2. How Maxwell Works

Maxwell is an open-source, real-time MySQL change-capture tool written in Java by Zendesk. It reads MySQL's binary log (binlog) in real time, turns each change into a JSON message, and publishes it as a producer to Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, files, or other platforms.
Official website
2.1 The MySQL Master-Slave Replication Process

Maxwell's working principle is simple: it disguises itself as a MySQL slave and, acting as that slave, replicates data from the MySQL master.
- The master records every change in its binary log (binary log).
- The slave sends a dump request to the master and copies the master's binary log events into its relay log.
- The slave replays the events from its relay log, applying the changes to its own data.
Diagram: Maxwell's working principle
- What is the binlog?

The binary log is arguably MySQL's most important log. It records all DDL and DML statements (but not data queries) as events, including the time each statement took to execute, and it is transaction-safe.

Enabling the binary log generally costs about 1% in performance. It has two key use cases:

- Replication: the master writes its binlog and ships it to the slaves so that master and slaves stay consistent.
- Data recovery: the mysqlbinlog tool can replay the log to restore data.

The binary log consists of two kinds of files: an index file (suffix .index) that lists all the binary log files, and the log files themselves (suffix .00000*), which record all DDL and DML (excluding query) statement events of the database.
3. Using Maxwell

- Enabling the binlog
- Locate the MySQL configuration file

Linux: /etc/my.cnf. If it is not under /etc, find it with locate my.cnf.

Windows: my.ini in the MySQL installation directory.

- Edit the MySQL configuration file
In the [mysqld] block, set or add:

[mysqld]
# server id
server_id=1
# binlog file name prefix
log-bin=mysql-bin
# binlog format: STATEMENT, MIXED, or ROW
binlog_format=row
- Restart the MySQL service
3.1 Installing and Deploying Maxwell

Note: Maxwell 1.30 and later no longer support JDK 8, so this guide uses maxwell-1.29.2, the latest release below 1.30.
Download link (extraction code: yyds)
Before deploying, MySQL must be installed and the binlog enabled (see above); this is not repeated here.
- Upload maxwell-1.29.2.tar.gz to /opt/software

- Extract maxwell-1.29.2.tar.gz:

tar -zxvf maxwell-1.29.2.tar.gz
- In MySQL, create a maxwell database to store Maxwell's metadata
create database maxwell character set utf8mb4;
- Create an account with access to that database
-- Create a maxwell account limited to the maxwell database, with password 123456
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';
-- Grant the maxwell account the privileges needed to monitor the other databases
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'maxwell'@'%';
-- Refresh privileges
FLUSH PRIVILEGES;

3.2 Starting Maxwell
Maxwell can be started in two ways:

- with command-line arguments
- with a configuration file

1. Simple command-line startup
# change into the Maxwell root directory
cd maxwell-1.29.2
bin/maxwell --user='maxwell' --password='123456' --port=3306 --host='127.0.0.1' --producer=stdout
Explanation of the command:

bin/maxwell is the path of the Maxwell startup script
--user: the database user name
--password: the database password
--port: the database port
--host: the database host address
--producer: the producer mode (stdout: console output; kafka: Kafka cluster; redis: Redis; rabbitmq: RabbitMQ; and others)
- Startup with a configuration file
cd maxwell-1.29.2
bin/maxwell --config=./config.properties
The config.properties configuration file:
# tl;dr config
log_level=info
# producer mode (i.e. output mode)
producer=stdout
kafka.bootstrap.servers=localhost:9092

# mysql login info
# mysql host
host=127.0.0.1
# mysql user
user=maxwell
# mysql password
password=123456
# mysql port
port=3306

#     *** general ***
# choose where to produce data to. stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
#producer=kafka

# set the log level.  note that you can configure things further in log4j2.xml
#log_level=DEBUG # [DEBUG, INFO, WARN, ERROR]

# if set, maxwell will look up the scoped environment variables, strip off the prefix and inject the configs
#env_config_prefix=MAXWELL_

#     *** mysql ***
# mysql host to connect to
#host=hostname
# mysql port to connect to
#port=3306
# mysql user to connect as.  This user must have REPLICATION SLAVE permissions,
# as well as full access to the `maxwell` (or schema_database) database
#user=maxwell
# mysql password
#password=maxwell
# options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello
# name of the mysql database where maxwell keeps its own state
#schema_database=maxwell
# whether to use GTID or not for positioning
#gtid_mode=true

# maxwell will capture an initial "base" schema containing all table and column information,
# and then keep delta-updates on top of that schema.  If you have an inordinate amount of DDL changes,
# the table containing delta changes will grow unbounded (and possibly too large) over time.  If you
# enable this option Maxwell will periodically compact its tables.
#max_schemas=10000

# SSL/TLS options
# To use VERIFY_CA or VERIFY_IDENTITY, you must set the trust store with Java opts:
#   -Djavax.net.ssl.trustStore=<path-to-truststore> -Djavax.net.ssl.trustStorePassword=<password>
# or import the MySQL cert into the global Java cacerts.
# MODE must be one of DISABLED, PREFERRED, REQUIRED, VERIFY_CA, or VERIFY_IDENTITY
#
# turns on ssl for the maxwell-store connection, other connections inherit this setting unless specified
#ssl=DISABLED
# for binlog-connector
#replication_ssl=DISABLED
# for the schema-capture connection, if used
#schema_ssl=DISABLED

# maxwell can optionally replicate from a different server than where it stores
# schema and binlog position info.  Specify that different server here:
#replication_host=other
#replication_user=username
#replication_password=password
#replication_port=3306
# This may be useful when using MaxScale's binlog mirroring host.

# Specifies that Maxwell should capture schema from a different server than
# it replicates from:
#schema_host=other
#schema_user=username
#schema_password=password
#schema_port=3306

#       *** output format ***
# records include binlog position (default false)
#output_binlog_position=true
# records include a gtid string (default false)
#output_gtid_position=true
# records include fields with null values (default true).  If this is false,
# fields where the value is null will be omitted entirely from output.
#output_nulls=true
# records include server_id (default false)
#output_server_id=true
# records include thread_id (default false)
#output_thread_id=true
# records include schema_id (default false)
#output_schema_id=true
# records include row query, binlog option "binlog_rows_query_log_events" must be enabled (default false)
#output_row_query=true
# DML records include list of values that make up a row's primary key (default false)
#output_primary_keys=true
# DML records include list of columns that make up a row's primary key (default false)
#output_primary_key_columns=true
# records include commit and xid (default true)
#output_commit_info=true
# This controls whether maxwell will output JSON information containing
# DDL (ALTER/CREATE TABLE/ETC) information. (default: false)
# See also: ddl_kafka_topic
#output_ddl=true
# turns underscore naming style of fields to camel case style in JSON output
# default is none, which means the field name in JSON is the exact name in MySQL table
#output_naming_strategy=underscore_to_camelcase

#       *** kafka ***
# list of kafka brokers
#kafka.bootstrap.servers=hosta:9092,hostb:9092
# kafka topic to write to
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# in the latter case 'database' and 'table' will be replaced with the values for the row being processed
#kafka_topic=maxwell
# alternative kafka topic to write DDL (alter/create/drop) to.  Defaults to kafka_topic
#ddl_kafka_topic=maxwell_ddl
# hash function to use.  "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]
# how maxwell writes its kafka key.
#
# 'hash' looks like:
# {"database":"test","table":"tickets","pk.id":10001}
#
# 'array' looks like:
# ["test","tickets",[{"id":10001}]]
#
# default: "hash"
#kafka_key_format=hash # [hash, array]
# extra kafka options.  Anything prefixed "kafka." will get
# passed directly into the kafka-producer's config.

# a few defaults.
# These are 0.11-specific. They may or may not work with other versions.
kafka.compression.type=snappy
kafka.retries=0
kafka.acks=1
#kafka.batch.size=16384

# kafka+SSL example
# kafka.security.protocol=SSL
# kafka.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
# kafka.ssl.truststore.password=test1234
# kafka.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
# kafka.ssl.keystore.password=test1234
# kafka.ssl.key.password=test1234

# controls a heuristic check that maxwell may use to detect messages that
# we never heard back from.  The heuristic check looks for "stuck" messages, and
# will timeout maxwell after this many milliseconds.
#
# See https://github.com/zendesk/maxwell/blob/master/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java
# if you really want to get into it.
#producer_ack_timeout=120000 # default 0

#           *** partitioning ***
# What part of the data do we partition by?
#producer_partition_by=database # [database, table, primary_key, transaction_id, thread_id, column]
# specify what fields to partition by when using producer_partition_by=column
# column separated list.
#producer_partition_columns=id,foo,bar
# when using producer_partition_by=column, partition by this when
# the specified column(s) don't exist.
#producer_partition_by_fallback=database

#            *** kinesis ***
#kinesis_stream=maxwell
# AWS places a 256 unicode character limit on the max key length of a record
# http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
#
# Setting this option to true enables hashing the key with the md5 algorithm
# before we send it to kinesis so all the keys work within the key size limit.
# Values: true, false
# Default: false
#kinesis_md5_keys=true

#            *** sqs ***
#sqs_queue_uri=aws_sqs_queue_uri
# The sqs producer will need aws credentials configured in the default
# root folder and file format. Please check below link on how to do it.
# http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/setup-credentials.html

#            *** pub/sub ***
#pubsub_project_id=maxwell
#pubsub_topic=maxwell
#ddl_pubsub_topic=maxwell_ddl

#            *** rabbit-mq ***
#rabbitmq_host=rabbitmq_hostname
#rabbitmq_port=5672
#rabbitmq_user=guest
#rabbitmq_pass=guest
#rabbitmq_virtual_host=/
#rabbitmq_exchange=maxwell
#rabbitmq_exchange_type=fanout
#rabbitmq_exchange_durable=false
#rabbitmq_exchange_autodelete=false
#rabbitmq_routing_key_template=%db%.%table%
#rabbitmq_message_persistent=false
#rabbitmq_declare_exchange=true

#            *** redis ***
#redis_host=redis_host
#redis_port=6379
#redis_auth=redis_auth
#redis_database=0
# name of pubsub/list/whatever key to publish to
#redis_key=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_pub_channel=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
#redis_list_key=maxwell
# this can be static, e.g. 'maxwell', or dynamic, e.g. namespace_%{database}_%{table}
# Valid values for redis_type = pubsub|lpush. Defaults to pubsub
#redis_type=pubsub

#           *** custom producer ***
# the fully qualified class name for custom ProducerFactory
# see the following link for more details.
# http://maxwells-daemon.io/producers/#custom-producer
#custom_producer.factory=
# custom producer properties can be configured using the custom_producer.* property namespace
#custom_producer.custom_prop=foo

#          *** filtering ***
# filter rows out of Maxwell's output.  Command separated list of filter-rules, evaluated in sequence.
# A filter rule is:
#   <type> ":" <db> "." <tbl> [ "." <col> "=" <col_val> ]
#  type    ::= [ "include" | "exclude" | "blacklist" ]
#  db      ::= [ "/regexp/" | "string" | "`string`" | "*" ]
#  tbl     ::= [ "/regexp/" | "string" | "`string`" | "*" ]
#  col_val ::= "column_name"
#
# See http://maxwells-daemon.io/filtering for more details
#
#filter= exclude: *.*, include: foo.*, include: bar.baz, include: foo.bar.col_eg = "value_to_match"

# javascript filter
# maxwell can run a bit of javascript for each row if you need very custom filtering/data munging.
# See http://maxwells-daemon.io/filtering/#javascript_filters for more details
#
#javascript=/path/to/javascript_filter_file

#       *** encryption ***
# Encryption mode. Possible values are none, data, and all. (default none)
#encrypt=none
# Specify the secret key to be used
#secret_key=RandomInitVector

#       *** monitoring ***
# Maxwell collects metrics via dropwizard. These can be exposed through the
# base logging mechanism (slf4j), JMX, HTTP or pushed to Datadog.
# Options: [jmx, slf4j, http, datadog]
# Supplying multiple is allowed.
#metrics_type=jmx,slf4j
# The prefix maxwell will apply to all metrics
#metrics_prefix=MaxwellMetrics # default MaxwellMetrics
# Enable (dropwizard) JVM metrics, default false
#metrics_jvm=true
# When metrics_type includes slf4j this is the frequency metrics are emitted to the log, in seconds
#metrics_slf4j_interval=60
# When metrics_type includes http or diagnostic is enabled, this is the port the server will bind to.
#http_port=8080
# When metrics_type includes http or diagnostic is enabled, this is the http path prefix, default /.
#http_path_prefix=/some/path/

# ** The following are Datadog specific. **
# When metrics_type includes datadog this is the way metrics will be reported.
# Options: [udp, http]
# Supplying multiple is not allowed.
#metrics_datadog_type=udp
# datadog tags that should be supplied
#metrics_datadog_tags=tag1:value1,tag2:value2
# The frequency metrics are pushed to datadog, in seconds
#metrics_datadog_interval=60
# required if metrics_datadog_type = http
#metrics_datadog_apikey=API_KEY
# required if metrics_datadog_type = udp
#metrics_datadog_host=localhost # default localhost
#metrics_datadog_port=8125 # default 8125

# Maxwell exposes http diagnostic endpoint to check below in parallel:
# 1. binlog replication lag
# 2. producer (currently kafka) lag

# To enable Maxwell diagnostic
#http_diagnostic=true # default false
# Diagnostic check timeout in milliseconds, required if diagnostic = true
#http_diagnostic_timeout=10000 # default 10000

#    *** misc ***
# maxwell's bootstrapping functionality has a couple of modes.
#
# In "async" mode, maxwell will output the replication stream while it
# simultaneously outputs the database to the topic.  Note that it won't
# output replication data for any tables it is currently bootstrapping -- this
# data will be buffered and output after the bootstrap is complete.
#
# In "sync" mode, maxwell stops the replication stream while it
# outputs bootstrap data.
#
# async mode keeps ops live while bootstrapping, but carries the possibility of
# data loss (due to buffering transactions).  sync mode is safer but you
# have to stop replication.
#bootstrapper=async [sync, async, none]

# output filename when using the "file" producer
#output_file=/path/to/file
Maxwell's output data formats are as follows.

- Insert format
{
"database": "test_maxwell",
"table": "t_test01",
"type": "insert",
"ts": 1639969580,
"xid": 4948,
"commit": true,
"data": {
"id": 6,
"v_name": "ad"
}
}
database: database name
table: table name
type: type of change (insert, update, delete)
ts: timestamp of the change
xid: transaction id
commit: whether the transaction committed (true/false)
data: the row data
- Update format
{
"database": "test_maxwell",
"table": "t_test01",
"type": "update",
"ts": 1639969580,
"xid": 4948,
"commit": true,
"data": {
"id": 6,
"v_name": "ad"
},
"old": {
"v_name": "aa"
}
}
old: the previous values of the changed columns
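Because old carries only the columns that changed, the full pre-update row can be reconstructed by overlaying old on data. A minimal sketch with plain maps, using the field values from the example above (the class and method names are ours, not Maxwell's):

```java
import java.util.HashMap;
import java.util.Map;

public class MaxwellUpdateDemo {

    // For update events, "data" is the row after the update and "old"
    // holds the previous values of the changed columns only. Overlaying
    // "old" on "data" reconstructs the row as it was before the update.
    static Map<String, Object> previousRow(Map<String, Object> data, Map<String, Object> old) {
        Map<String, Object> prev = new HashMap<>(data);
        prev.putAll(old);
        return prev;
    }

    public static void main(String[] args) {
        Map<String, Object> data = new HashMap<>();
        data.put("id", 6);
        data.put("v_name", "ad"); // value after the update
        Map<String, Object> old = new HashMap<>();
        old.put("v_name", "aa");  // value before the update

        System.out.println(previousRow(data, old)); // e.g. {v_name=aa, id=6} (key order may vary)
    }
}
```

Note this relies on output_nulls being left at its default (true), so that data contains every column of the row.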
- Delete format
{
"database": "test_maxwell",
"table": "t_test01",
"type": "delete",
"ts": 1639969580,
"xid": 4948,
"commit": true,
"data": {
"id": 6,
"v_name": "ad"
}
}
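A consumer of these messages typically branches on the type field. The sketch below extracts top-level string fields with a regex, which is a deliberate simplification that only handles flat messages like the examples above; a real consumer should use a JSON library such as Jackson. The class and helper names are ours:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MaxwellMessageDemo {

    // Naive extraction of a top-level string field from a flat Maxwell
    // message; enough for these examples, not a general JSON parser.
    static String field(String json, String name) {
        Matcher m = Pattern.compile("\"" + name + "\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    // Branch on the "type" field, mirroring the insert/update/delete
    // formats shown above.
    static String describe(String json) {
        String db = field(json, "database");
        String table = field(json, "table");
        switch (field(json, "type")) {
            case "insert": return "row inserted into " + db + "." + table;
            case "update": return "row updated in " + db + "." + table;
            case "delete": return "row deleted from " + db + "." + table;
            default:       return "unhandled message";
        }
    }

    public static void main(String[] args) {
        String msg = "{\"database\":\"test_maxwell\",\"table\":\"t_test01\","
                + "\"type\":\"insert\",\"ts\":1639969580,\"xid\":4948,"
                + "\"commit\":true,\"data\":{\"id\":6,\"v_name\":\"ad\"}}";
        System.out.println(describe(msg)); // prints: row inserted into test_maxwell.t_test01
    }
}
```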
3.3 Configuring Maxwell Output to RabbitMQ

The configuration-file approach is used below.

- Set producer=rabbitmq
- Add the following settings
# RabbitMQ host
rabbitmq_host=127.0.0.1
# RabbitMQ port
rabbitmq_port=5672
# RabbitMQ user name (a dedicated maxwell user was created here)
rabbitmq_user=maxwell
# RabbitMQ password
rabbitmq_pass=123456
# virtual host to publish messages to
rabbitmq_virtual_host=/maxwell
# exchange name
rabbitmq_exchange=maxwell
# exchange type (see the RabbitMQ documentation for details)
rabbitmq_exchange_type=fanout
# whether the exchange is durable; defaults to false, default kept
#rabbitmq_exchange_durable=false
# whether the exchange is auto-deleted; defaults to false, default kept
#rabbitmq_exchange_autodelete=false
# routing key template for the exchange
#rabbitmq_routing_key_template=%db%.%table%
# whether messages are persistent; defaults to false, default kept
#rabbitmq_message_persistent=false
# whether to declare the exchange automatically; defaults to true, default kept
#rabbitmq_declare_exchange=true
- Start the Maxwell process

cd maxwell-1.29.2
# --daemon starts Maxwell in the background
bin/maxwell --config=./config.properties --daemon
- Consuming the data from the RabbitMQ exchange in Java

Maven dependency:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.8.0</version>
</dependency>
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveMessage01 {
    public static final String EXCHANGE_NAME = "maxwell";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel("maxwell", "123456", "/maxwell");
        // declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // declare a random queue, deleted once the consumer disconnects
        String queueName = channel.queueDeclare().getQueue();
        // bind the queue to the exchange
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        // message handler
        DeliverCallback handler = (consumerTag, message) ->
            System.out.println("ReceiveMessage01 received: " + new String(message.getBody()));
        // consume messages
        channel.basicConsume(queueName, true, handler, consumerTag -> {});
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMqUtil {
    public static Channel getChannel(String username, String password, String virtualHost)
            throws IOException, TimeoutException {
        if (virtualHost == null) {
            virtualHost = "/";
        }
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername(username);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        Connection connection = factory.newConnection();
        return connection.createChannel();
    }
}
4. Other Advanced Maxwell Configuration

4.1 Monitoring only a specific database or table

# monitor only the tables in the test_maxwell database ("exclude" filters out; *.* means everything)
filter= exclude: *.*, include: test_maxwell.*
# monitor only the test01 table in the test_maxwell database
filter= exclude: *.*, include: test_maxwell.test01

4.2 Outputting to Kafka

# remove the earlier RabbitMQ settings, then add or modify the following
producer=kafka
# address of the Kafka cluster to connect to
kafka.bootstrap.servers=localhost:9092
# Kafka topic name
kafka_topic=maxwell3

4.3 Full synchronization of a specified table in a MySQL database
- Insert a row into the bootstrap table of Maxwell's metadata database (maxwell) to request a full sync of a table

-- test_maxwell is the database to sync, test2 is the table name
insert into maxwell.bootstrap(database_name,table_name) values('test_maxwell','test2');
- Start the Maxwell process
- Once Maxwell is running, you can watch the row in maxwell.bootstrap change as the sync progresses
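The bootstrap row can also be scheduled from application code over JDBC. A minimal sketch, assuming the MySQL Connector/J driver is on the classpath and reusing the host, user, and password from the earlier steps (the BootstrapDemo class and bootstrapSql helper are our own illustration, not part of Maxwell):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class BootstrapDemo {

    // The row Maxwell's bootstrapper polls for; inserting it asks Maxwell
    // to emit a full snapshot of database_name.table_name.
    static String bootstrapSql() {
        return "insert into maxwell.bootstrap(database_name, table_name) values(?, ?)";
    }

    public static void main(String[] args) throws Exception {
        // Connection details follow the earlier examples; adjust for your setup.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/maxwell", "maxwell", "123456");
             PreparedStatement ps = conn.prepareStatement(bootstrapSql())) {
            ps.setString(1, "test_maxwell"); // database to snapshot
            ps.setString(2, "test2");        // table to snapshot
            ps.executeUpdate();
        }
    }
}
```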
Maxwell is a very handy database synchronization tool: it is simple to configure and supports many output targets, which gives users a great deal of flexibility. See the official website for more configuration options.



