
Maxwell Usage Tutorial (maxwell tutorial)


Official quickstart guide: https://maxwells-daemon.io/quickstart/

Installing Maxwell

Download maxwell-1.25.0.tar.gz (note: newer Maxwell releases may require a newer JDK).

Extract the archive.
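As a sketch, the download-and-extract step might look like the following. The release URL follows the pattern of the Maxwell GitHub releases page and is an assumption; adjust it and the paths to your environment:

```
# Download the 1.25.0 release (URL pattern assumed from the zendesk/maxwell GitHub releases page)
wget https://github.com/zendesk/maxwell/releases/download/v1.25.0/maxwell-1.25.0.tar.gz

# Extract and enter the installation directory
tar -zxvf maxwell-1.25.0.tar.gz
cd maxwell-1.25.0
```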

Configure MySQL by adding the following to my.cnf:

binlog_format = ROW
server_id = 1
log-bin=/usr/local/mysql/arch/mysql-bin

Note: the Maxwell process must be able to read the log-bin file. A simple way to arrange this is to put the Maxwell OS user in the same group as the MySQL user.

Restart MySQL, then check the log-bin location: show variables like '%log_bin%';
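A quick sanity check inside the mysql client might look like this (both are standard MySQL statements; the second confirms the binlog format set above):

```
mysql> show variables like '%log_bin%';
mysql> show variables like 'binlog_format';   -- should report ROW
```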

Create the Maxwell user

mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '!maxwell001';
mysql> CREATE USER 'maxwell'@'localhost' IDENTIFIED BY '!maxwell001';
mysql> CREATE USER 'maxwell'@'hadoop02' IDENTIFIED BY '!maxwell001';

mysql> grant all privileges on *.* to maxwell@'%' identified by '!maxwell001';
mysql> grant all privileges on *.* to maxwell@localhost identified by '!maxwell001';
mysql> grant all privileges on *.* to maxwell@hadoop02 identified by '!maxwell001';
mysql> flush privileges;
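Note that the `GRANT ... IDENTIFIED BY` form above is MySQL 5.x syntax; MySQL 8.0 removed it. On 8.0+ the equivalent is a sketch like this (same user and password as above, create the user first, then grant without a password clause):

```
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '!maxwell001';
mysql> GRANT ALL PRIVILEGES ON *.* TO 'maxwell'@'%';
mysql> FLUSH PRIVILEGES;
```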

Maxwell configuration file (driver.properties, passed to --conf at startup)

# Maxwell settings
log_level=info
client_id=ODS_NEW
# Kafka settings
producer=kafka
kafka.bootstrap.servers=hadoop02:9092
kafka_topic=maxwell
# kafka_partition_hash=murmur3
kafka_key_format=hash
kafka.compression.type=snappy
kafka.retries=100
kafka.acks=all
producer_partition_by=primary_key
# MySQL settings
host=hadoop02
port=3306
user=maxwell
password=!maxwell001
jdbc_options=zeroDateTimeBehavior=convertToNull&connectTimeout=600000
replica_server_id=202
# filter (whitelist)
filter=exclude: *.*, include: test.*
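The filter line above excludes everything and then whitelists the whole test database. Maxwell's filter syntax also supports table-level rules, so a more selective whitelist is possible; for example (database and table names here are illustrative):

```
# whitelist one specific table plus one whole database
filter=exclude: *.*, include: test.orders, include: mydb.*
```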

Create the Kafka topic and start a console consumer

bin/kafka-topics.sh \
  --create \
  --zookeeper hadoop02:2181/kafka \
  --partitions 3 \
  --replication-factor 1 \
  --topic maxwell

bin/kafka-console-consumer.sh --bootstrap-server hadoop02:9092 --topic maxwell --from-beginning
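The --zookeeper flag matches older Kafka releases such as the 1.0.0 broker used here. On Kafka 2.2+ topic creation can go through the brokers directly, and from Kafka 3.0 the --zookeeper option was removed entirely, so the equivalent command is roughly:

```
bin/kafka-topics.sh \
  --create \
  --bootstrap-server hadoop02:9092 \
  --partitions 3 \
  --replication-factor 1 \
  --topic maxwell
```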

Start in the background: bin/maxwell --conf=driver.properties --daemon --client_id=ODS_NEW
If you instead run it in the foreground (without --daemon), output like the following confirms the installation succeeded:

[hadoop@hadoop02 maxwell-1.25.0]$ bin/maxwell --conf=driver.properties  --client_id=ODS_NEW
Using kafka version: 1.0.0
14:55:44,389 WARN  MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
14:55:44,758 INFO  ProducerConfig - ProducerConfig values: 
	acks = all
	batch.size = 16384
	bootstrap.servers = [hadoop02:9092]
	buffer.memory = 33554432
	client.id = 
	compression.type = snappy
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 100
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

14:55:44,803 INFO  AppInfoParser - Kafka version : 1.0.0
14:55:44,803 INFO  AppInfoParser - Kafka commitId : aaa7af6d4a11b29d
14:55:44,821 INFO  Maxwell - Maxwell v1.25.0 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[mysql-bin.000001:99110], lastHeartbeat=1647500121094]
14:55:44,938 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[mysql-bin.000001:5360], lastHeartbeat=0])
14:55:45,050 INFO  BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000001:99110
14:55:45,140 INFO  BinaryLogClient - Connected to hadoop02:16609 at mysql-bin.000001/99110 (sid:202, cid:260)
14:55:45,140 INFO  BinlogConnectorLifecycleListener - Binlog connected.

Once startup succeeds, Maxwell creates its metadata database and tables in MySQL (by default a database named maxwell, with tables such as positions and schemas).

The console consumer started above then prints the row events Maxwell publishes to the maxwell topic.
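Each change arrives as one JSON document. An insert into a table in the test database looks roughly like this (table name and field values are illustrative; updates additionally carry an "old" field with the previous values):

```json
{"database":"test","table":"orders","type":"insert","ts":1647500145,"xid":8201,"commit":true,"data":{"id":1,"name":"foo"}}
```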

From here the stream can be consumed by Flink or Spark for downstream processing.

Reprinted from www.mshxw.com; original article: https://www.mshxw.com/it/771975.html