栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

window下kafka安装及其使用

window下kafka安装及其使用

目录

1.kafka安装

1.1安装JDK1.8

1.2安装Zookeeper3.7

1.3 Kafka2.13安装

2.命令行测试

3.客户端程序开发

3.1 openssl编译

3.3 生产者

3.4 消费者


1.kafka安装

本地装了一套kafka的环境:

序号

名称

备注

下载链接

1

JDK1.8

Java开发环境

https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

2

Zookeeper3.7

分布式应用程序协调服务

http://mirror.bit.edu.cn/apache/zookeeper/

3

Kafka2.13

Kafka开发环境

http://kafka.apache.org/downloads.html

下载如下:

 

1.1安装JDK1.8

步骤1:双击安装包,直到安装完成,

步骤2:需要添加以下的环境变量(右键点击“我的电脑” -> "高级系统设置" -> "环境变量" ),如下:

JAVA_HOME: C:Program FilesJavajdk1.8.0_171 (jdk的安装路径)

Path: 在现有的值后面添加"; %JAVA_HOME%bin"

步骤3:1.3 打开cmd运行 "java -version" 查看当前系统Java的版本:

1.2安装Zookeeper3.7

步骤1:解压安装包apache-zookeeper-3.7.0-bin.tar.gz,

步骤2:打开zookeeper-3.4.13conf,把zoo_sample.cfg重命名成zoo.cfg,从文本编辑器里打开zoo.cfg,把dataDir的值改成“./zookeeper-3.4.13/data”

步骤3:添加如下系统变量:

ZOOKEEPER_HOME: D:kafkaPrestduyapache-zookeeper-3.7.0-bin (zookeeper目录)

Path: 在现有的值后面添加 ";%ZOOKEEPER_HOME%bin;"

1.3 Kafka2.13安装

步骤1:kafka_2.13-3.1.0.tgz解压

步骤2:打开kafka_2.11-2.0.0config,从文本编辑器里打开 server.properties。把 log.dirs的值改成 “./logs”

2.命令行测试

步骤1:启动zookeeper,进入目录:D:kafkaPrestduyapache-zookeeper-3.7.0-binbin

执行如下:./zkserver

 

步骤2:启动kafka,进入目录:D:kafkaPrestduykafka_2.13-3.1.0binwindows

启动服务: ./kafka-server-start.bat ....configserver.properties

 

创建话题:

.kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

 

创建生产者:

kafka-console-producer.bat --broker-list localhost:9092 --topic test

创建消费者:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

查看话题:

 ./kafka-topics.bat --list   --bootstrap-server localhost:9092

3.客户端程序开发

librdkafka依赖openssl,zlib与zstd库,所以先编译依赖,然后编译librdkafka

各个库使用版本:

库名

下载地址

openssl-1.0.2a

/index.html

librdkafka-1.4.x

https://github.com/edenhill/librdkafka

zlib

NuGet获取(解决方案右键->获取NuGet)

zstd

NuGet获取(解决方案右键->获取NuGet)

备注:zlib与zstd:D:kafkaPrestduylibrdkafka-1.4.xlibrdkafka-1.4.xwin32packages

3.1 openssl编译

OpenSSL编译在vs2019,首先下载/index.html下,找并下载:openssl-1.0.2a

步骤1:下载编译环境Perl和NASM,双击安装即可

Download & Install Perl - ActiveState

https://www.nasm.us/pub/nasm/releasebuilds/2.14rc15/win64/nasm-2.14rc15-installer-x64.exe

步骤2:编译方式选择:在此x64的release版本:perl Configure VC-WIN64A

编译类别

平台

编译指令

debug

x86

perl Configure debug-VC-WIN32 --prefix=xxx

x64

perl Configure debug-VC-WIN64A --prefix=xxx

release

x86

perl Configure VC-WIN32 --prefix=xxx

x64

perl Configure VC-WIN64A  --prefix=

注意:没有安装nasm,加上no-asm

--prefix参数:库生成目录

步骤3:创建 makefile 文件

创建32位makefile文件:msdo_ms.bat

创建64位makefile文件 msdo_win64a.ba

步骤4:编译动态库和静态库

编译动态库:

nmake -f msntdll.mak

nmake -f msntdll.mak test

nmake -f msntdll.mak install

编译静态库:

lib nmake -f msnt.mak

nmake -f msnt.mak test

nmake -f msnt.mak install

备注:重新编译需要:

nmake -f msnt.mak clean

nmake -f msntdll.mak clean

步骤5:在指定的目录下获取到库文件

3.2 librdkafka编译

步骤1:https://github.com/edenhill/librdkafka,下载并解压

步骤2:librdkafka-1.4.xwin32librdkafka.sln下使用vs2019打开

步骤3:编译librdkafka项目,点击编译,报错如下:

 Error This project references NuGet package(s) that are missing on this computer. Use NuGet Package Restore to download them. For more information, see 必应

解决方法:

找到 项目文件librdkafka.csproj,打开后,移除下面:


  
    This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them.  For more information, see http://go.microsoft.com/fwlink/?linkID=322105. The missing file is {0}.
  
  

2.zlib和zstd连接失败

在librdkafka-1.4.xwin32packages目录下,有zlib和libzstd编译好的库,只添加包含头文件和附加库目录,以及链接lib文件。

Zlib头文件:

librdkafka-1.4.xwin32packageszlib.v140.windesktop.msvcstl.dyn.rt-dyn.1.2.8.8buildnativeinclude

zlib库文件:

librdkafka-1.4.xwin32packageszlib.v140.windesktop.msvcstl.dyn.rt-dyn.1.2.8.8libnativev140windesktopmsvcstldynrt-dynx64Release

libzstd头文件:

librdkafka-1.4.xwin32packagesconfluent.libzstd.redist.1.3.8-g9f9630f4-test1buildnativeinclude

Libzstd库文件:

librdkafka-1.4.xwin32packagesconfluent.libzstd.redist.1.3.8-g9f9630f4-test1buildnativelibwinx64

3.确实openssl缺失

找到opensssl编译生成目录,添加包含头文件和附加库目录,以及链接lib文件

步骤4:编译libkafka,文件生产在librdkafka-1.4.xwin32outdirv142x64Release

 

3.3 生产者

官方精简:

#define _CRT_SECURE_NO_WARNINGS
#include 
#include 
#include 
#include "rdkafka.h"

static volatile sig_atomic_t run = 1;
static void stop(int sig) {
    run = 0;
    fclose(stdin);
}

static void dr_msg_cb(rd_kafka_t* rk,const rd_kafka_message_t* rkmessage, void* opaque) {
    if (rkmessage->err)
    {
        fprintf(stderr, "%% Message delivery failed: %sn", rd_kafka_err2str(rkmessage->err));
    }
    else
    {
        fprintf(stderr, "Message delivered % d bytes partition % dn", rkmessage->len, rkmessage->partition);
    }
}

int main(int argc, char** argv) 
{
    rd_kafka_t* rk;       
    rd_kafka_conf_t* conf; 
    char errstr[512];       
    char buf[512];          
    const char* brokers;    
    const char* topic;      

    brokers = "localhost:9092";
    topic = "test_2";

    conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) 
    {
        fprintf(stderr, "%sn", errstr);
        return 1;
    }

    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
    rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) 
    {
        fprintf(stderr,"Failed to create new producer: %sn", errstr);
        return 1;
    }
    signal(SIGINT, stop);

    fprintf(stderr,"Type some text and hit enter to produce messagen");
    while (run && fgets(buf, sizeof(buf), stdin)) 
    {
        size_t len = strlen(buf);
        rd_kafka_resp_err_t err;

        if (buf[len - 1] == 'n')
        {
            buf[--len] = '';
        }

        if (len == 0) {
            rd_kafka_poll(rk, 0);
            continue;
        }

    retry:
        err = rd_kafka_producev(rk,RD_KAFKA_V_TOPIC(topic),RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),RD_KAFKA_V_VALUE(buf, len),RD_KAFKA_V_OPAQUE(NULL),RD_KAFKA_V_END);
        if(err) 
        {
            fprintf(stderr,"Failed to produce to topic %s: %sn",topic, rd_kafka_err2str(err));
            if(err == RD_KAFKA_RESP_ERR__QUEUE_FULL) 
            {
                rd_kafka_poll(rk, 1000);
                goto retry;
            }
        }
        rd_kafka_poll(rk, 0);
    }

    rd_kafka_flush(rk, 10 * 1000 );
    if (rd_kafka_outq_len(rk) > 0)
    {
        fprintf(stderr, "%d message(s) were not deliveredn", rd_kafka_outq_len(rk));
    }

    rd_kafka_destroy(rk);

    return 0;
}

3.4 消费者
#define  _CRT_SECURE_NO_WARNINGS
#include 
#include 
#include 
#include 

#include "rdkafka.h"

static volatile sig_atomic_t run = 1;
static void stop(int sig) {
    run = 0;
}

static int is_printable(const char* buf, size_t size) {
    size_t i;

    for (i = 0; i < size; i++)
        if (!isprint((int)buf[i]))
            return 0;

    return 1;
}

int main(int argc, char** argv) {
    rd_kafka_t* rk=NULL;         
    rd_kafka_conf_t* conf = NULL;
    rd_kafka_resp_err_t err; 
    char errstr[512];        
    const char* brokers = NULL;
    const char* groupid = NULL;
    char** topics = NULL;
    int topic_cnt;           
    rd_kafka_topic_partition_list_t* subscription = NULL;
    int i;

    brokers = "localhost:9092";
    groupid = "101";
    
    char *name = (char*)malloc(10 * sizeof(char*));
    strcpy(name,"test_2");
    topics = &name;
    topic_cnt = 1;

    conf = rd_kafka_conf_new();
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%sn", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "group.id", groupid,errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%sn", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "%sn", errstr);
        rd_kafka_conf_destroy(conf);
        return 1;
    }

    rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "Failed to create new consumer: % sn", errstr);
        return 1;
    }

    conf = NULL;
    rd_kafka_poll_set_consumer(rk);

    subscription = rd_kafka_topic_partition_list_new(topic_cnt);
    for (i = 0; i < topic_cnt; i++)
        rd_kafka_topic_partition_list_add(subscription,topics[i],RD_KAFKA_PARTITION_UA);

    err = rd_kafka_subscribe(rk, subscription);
    if (err) {
        fprintf(stderr,"Failed to subscribe to %d topics: %sn",subscription->cnt, rd_kafka_err2str(err));rd_kafka_topic_partition_list_destroy(subscription);
        rd_kafka_destroy(rk);
        return 1;
    }

    fprintf(stderr,"Subscribed to %d topic(s), " "waiting for rebalance and messages...n",subscription->cnt);

    rd_kafka_topic_partition_list_destroy(subscription);
    signal(SIGINT, stop);

    while (run) {
        rd_kafka_message_t* rkm;
        rkm = rd_kafka_consumer_poll(rk, 100);
        if (!rkm)
            continue; 

        if (rkm->err) {
            fprintf(stderr,"%% Consumer error: %sn",rd_kafka_message_errstr(rkm));rd_kafka_message_destroy(rkm);
            continue;
        }
        printf("Message on %s  %d at offset %d:n",rd_kafka_topic_name(rkm->rkt), rkm->partition,rkm->offset);

        if (rkm->key && is_printable((const char*)rkm->key, rkm->key_len))
        {
            printf(" Key: %.*sn", (int)rkm->key_len, (const char*)rkm->key);
        }
        else if (rkm->key)
        {
            printf(" Key: (%d bytes)n", (int)rkm->key_len);
        }

        if (rkm->payload && is_printable((const char*)rkm->payload, rkm->len))
        {
            printf(" Value: %.*sn", (int)rkm->len, (const char*)rkm->payload);
        }
        else if (rkm->key)
        {
            printf(" Value: (%d bytes)n", (int)rkm->len);
        }

        rd_kafka_message_destroy(rkm);
    }

    fprintf(stderr, "%% Closing consumern");
    rd_kafka_consumer_close(rk);


    
    rd_kafka_destroy(rk);

    return 0;
}

      运行结果:

参考:

librdkafka编译及简单使用过程简介_一缕阳光宣泄、整个世界的博客-CSDN博客_rdkafka编译

Windows 下编译 OpenSSL_青春不老,奋斗不止!-CSDN博客_openssl编译

用VS2019编译librdkafka库_eamon100的博客-CSDN博客

编译OpenSSL 动态库/静态库以及运行时库的选择_YuHengZuo的博客-CSDN博客

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/742417.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号