
Building a self-managed Kafka connector



Setting up a Kinesis Source Connector with AWS MSK

  1. Launch an EC2 instance:

    I tested on an Ubuntu 16.04 instance.

  2. Install Java:

$ sudo apt-get update
$ sudo apt-get install default-jre
  3. Install the Confluent Platform:

Make sure to install Confluent Platform version 5.1.0, as it ships with Kafka v2.1.0, which is compatible with the Kafka version on AWS MSK.

Download and extract the 5.1.0 tarball from the link: https://www.confluent.io/previous-versions/
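For example (a sketch; the exact archive URL and Scala-version suffix may differ, so take the actual link from the previous-versions page above):

```shell
# Download and extract the Confluent Platform 5.1.0 tarball.
# URL is an assumption based on Confluent's archive naming; verify it first.
wget https://packages.confluent.io/archive/5.1/confluent-5.1.0-2.11.tar.gz
tar -xzf confluent-5.1.0-2.11.tar.gz -C ~/
```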

  4. Install the Confluent Hub client:

Download and unzip the tarball linked in the doc here: https://docs.confluent.io/current/connect/managing/confluent-hub/client.html

Set the PATH environment variable to point to the bin directory of your Confluent folder:

export PATH=~/confluent-5.1.0/bin:${PATH};
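The export above only lasts for the current shell session; to persist it and confirm the client resolves, something like:

```shell
# Persist the PATH change for future sessions (assumes bash).
echo 'export PATH=~/confluent-5.1.0/bin:${PATH}' >> ~/.bashrc

# Confirm the confluent-hub client is found on the PATH.
which confluent-hub
```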
  5. Install the Kafka Connect Kinesis Source Connector:

confluent-hub install confluentinc/kafka-connect-kinesis:latest

Create a properties file (any path):

sudo vim kinn.properties

Copy the following and update the parameters accordingly (specify the bootstrap servers of your MSK cluster):

name=KinesisSourceConnector1
connector.class=io.confluent.connect.kinesis.KinesisSourceConnector
tasks.max=1
aws.access.key.id=AccessKey
aws.secret.key.id=SecretKey
kafka.topic=sampleTopic
kinesis.stream=samplestream
confluent.topic.bootstrap.servers=172.xx.xx.xx:9092,172.xx.xx.xx:9092,172.xx.xx.xx:9092
confluent.topic.replication.factor=3

In the above properties file, provide the bootstrap broker string of your MSK cluster, which you can get by running the following in the CLI:

aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn

Note: make sure tasks.max matches the number of shards in your source Kinesis stream; otherwise an exception is thrown.

  • https://docs.confluent.io/current/connect/kafka-connect-kinesis/index.html
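One way to check the shard count that tasks.max must match is to query the stream (a sketch; assumes the AWS CLI is configured with credentials for the account owning the stream):

```shell
# Print the number of open shards in the source stream;
# tasks.max in kinn.properties should match this value.
aws kinesis describe-stream-summary \
    --stream-name samplestream \
    --query 'StreamDescriptionSummary.OpenShardCount'
```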
  6. Create a topic in your AWS MSK cluster:
cd ~/confluent-5.1.0

bin/kafka-topics --create --zookeeper ZookeeperConnectString --replication-factor 3 --partitions 1 --topic sampleTopic

Note: Make sure to create the topic before starting the connector, and provide the topic name in the connector properties file as shown in step 5.
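To confirm the topic exists before starting the connector, you can list topics against the same ZooKeeper connect string (run from ~/confluent-5.1.0):

```shell
# List topics on the MSK cluster; sampleTopic should appear in the output.
bin/kafka-topics --list --zookeeper ZookeeperConnectString
```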

  7. Create a Kinesis stream and put data into your source Kinesis stream:
aws kinesis put-record --stream-name samplestream --partition-key 123 --data test-message-1
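The put-record call assumes the stream already exists; a sketch of creating it first, with one shard to match tasks.max=1 from step 5:

```shell
# Create the source stream with a single shard (matches tasks.max=1).
aws kinesis create-stream --stream-name samplestream --shard-count 1

# Block until the stream is ACTIVE before putting records.
aws kinesis wait stream-exists --stream-name samplestream
```

Note that with AWS CLI v2 the --data argument must be base64-encoded; the plain-text form shown above works with CLI v1.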
  8. Configure your connect-standalone properties:
cd /home/ubuntu/confluent-5.1.0/etc/kafka

sudo vim connect-standalone.properties

Update the bootstrap servers:

bootstrap.servers=172.xx.xx.xx:9092,172.xx.xx.xx:9092,172.xx.xx.xx:9092
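Besides bootstrap.servers, the shipped connect-standalone.properties also carries converter and offset settings; a minimal sketch of the relevant keys (the values below are the stock defaults, shown for orientation only):

```properties
bootstrap.servers=172.xx.xx.xx:9092,172.xx.xx.xx:9092,172.xx.xx.xx:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
```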
  9. Start the standalone Connect worker, passing connect-standalone.properties and the connector properties file:
cd ~/confluent-5.1.0

bin/connect-standalone /home/ubuntu/confluent-5.1.0/etc/kafka/connect-standalone.properties /home/ubuntu/kinn.properties

You should see messages like:

[2019-04-18 16:19:05,983] INFO WorkerSourceTask{id=KinesisSourceConnector1-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)

  10. Start your consumer:
cd ~/confluent-5.1.0

bin/kafka-console-consumer --bootstrap-server "172.xx.xx.xx:9092,172.xx.xx.xx:9092,172.xx.xx.xx:9092" --topic sampleTopic --from-beginning

You should see the Kinesis data here; it will be base64-encoded.
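For example, a consumer line for the record put in step 7 can be decoded back to the original payload (the string below is the base64 encoding of test-message-1):

```shell
# Decode a base64-encoded record as printed by the console consumer.
printf '%s' 'dGVzdC1tZXNzYWdlLTE=' | base64 --decode
# prints: test-message-1
```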

Similarly, you can set up other connectors on the same client machine to communicate with AWS MSK.
