栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

ThinkPHP RdKafka 公共类封装

ThinkPHP RdKafka 公共类封装

推荐Weary

在 extend/RdKafkaClient.php 封装 RdKafkaClient:

base_config = config('game.kafka') ?? [];

            // 实例化Conf
            $this->kafka_conf = new RdkafkaConf();
            
            // 实例化topicConf
            $topicConf = new RdKafkaTopicConf();
            $topicConf->set('auto.commit.interval.ms', 100);
            $topicConf->set('offset.store.method', 'broker');
            $topicConf->set('auto.offset.reset', 'earliest');

            $this->kafka_topic_conf = $topicConf;
        } catch (Exception $e) {
            error_log($e->getMessage());
            throw new Exception($e->getMessage());
        }

        return true;
    }

    
    /**
     * Return the cached client instance, creating it on first use.
     *
     * Instances are stored in self::$instance under an md5 key derived from
     * an empty array, so every call resolves to the same cache slot.
     *
     * @return self
     */
    public static function getInstance()
    {
        $cacheKey = md5(json_encode([]));

        // Fast path: an instance has already been built for this key.
        if (isset(self::$instance[$cacheKey])) {
            return self::$instance[$cacheKey];
        }

        self::$instance[$cacheKey] = new self();

        return self::$instance[$cacheKey];
    }

    /**
     * Write the configured broker list into the shared configuration object
     * and hand that object back to the caller.
     *
     * @return mixed the configuration object stored in $this->kafka_conf
     */
    public function getConf() {
        $brokerList = $this->base_config['brokers'];
        $this->kafka_conf->set('metadata.broker.list', $brokerList);

        return $this->kafka_conf;
    }

    /**
     * Create a producer from the shared configuration.
     *
     * The broker list is written into the configuration before the producer
     * is built; a producer created without any bootstrap brokers cannot
     * deliver messages, and flush() would only time out.
     *
     * @return \RdKafka\Producer the new producer, also kept in $this->producer
     */
    public function getProducer() {
        $brokers = $this->base_config['brokers'] ?? null;

        // Only set the property when a broker list is actually configured,
        // so a missing config entry does not pass null into Conf::set().
        if (!empty($brokers)) {
            $this->kafka_conf->set('metadata.broker.list', $brokers);
        }

        // The extension's class lives in the RdKafka namespace.
        $this->producer = new \RdKafka\Producer($this->kafka_conf);

        return $this->producer;
    }

    /**
     * Create a low-level consumer bound to the given consumer group.
     *
     * Sets `group.id` on the shared configuration, builds the consumer from
     * it, and registers the configured brokers on the new consumer.
     *
     * @param string $group_id consumer group id written to `group.id`
     *
     * @return \RdKafka\Consumer the new consumer, also kept in $this->consumer
     */
    public function getConsumer($group_id) {
        $this->kafka_conf->set('group.id', $group_id);

        // The extension's class lives in the RdKafka namespace.
        $this->consumer = new \RdKafka\Consumer($this->kafka_conf);
        $this->consumer->addBrokers($this->base_config['brokers']);

        return $this->consumer;
    }

    /**
     * Open a topic on the current consumer and start consuming from it.
     *
     * Uses $this->consumer, so getConsumer() must have been called first.
     * Consumption is started on partition 0 at the stored offset.
     *
     * @param string $topic name of the topic to consume
     *
     * @return mixed the topic handle returned by the consumer's newTopic()
     */
    public function setConsumeTopic($topic) {
        $consumerTopic = $this->consumer->newTopic($topic, $this->kafka_topic_conf);
        $consumerTopic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

        return $consumerTopic;
    }

    
    /**
     * Catch calls to undefined methods and record them in a log file.
     *
     * The method name is appended, with a timestamp, to
     * data/logs/rdkafka_method_not_exist.log one directory above this file.
     * Nothing is written when the log directory does not exist.
     *
     * @param string $name      name of the method that was called
     * @param array  $arguments arguments passed to that call (not logged)
     *
     * @return bool always false
     */
    public function __call($name, $arguments)
    {
        $logFile = dirname(__DIR__) . "/data/logs/rdkafka_method_not_exist.log";

        if (is_dir(dirname($logFile))) {
            // One entry per line: "<timestamp> <method>" terminated by a real newline.
            file_put_contents($logFile, date("Y-m-d H:i:s") . " " . $name . "\n", FILE_APPEND);
        }

        return false;
    }

}

?>

生产者调用方法:

/**
 * Example producer: send ten messages to "mytopic" and flush them.
 *
 * flush() is attempted up to ten times with a 10000 ms timeout each; if it
 * never reports RD_KAFKA_RESP_ERR_NO_ERROR a RuntimeException is thrown.
 *
 * @throws RuntimeException when the producer queue could not be flushed
 */
public function rd_producer() {
    $client = RdKafkaClient::getInstance();
    $producer = $client->getProducer();
    $topic = $producer->newTopic("mytopic");

    foreach (range(0, 9) as $i) {
        // RD_KAFKA_PARTITION_UA lets the library pick the partition.
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
        $producer->poll(0);
    }

    $attemptsLeft = 10;
    do {
        $result = $producer->flush(10000);
        $attemptsLeft--;
    } while ($result !== RD_KAFKA_RESP_ERR_NO_ERROR && $attemptsLeft > 0);

    if ($result !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        throw new RuntimeException('Was unable to flush, messages might be lost!');
    }
}

消费者调用方法:

/**
 * Example consumer: read "mytopic" in consumer group "aaa" forever.
 *
 * Each received message is dumped; end-of-partition and timeout conditions
 * are reported and the loop keeps polling. Any other error code is raised
 * as an exception, which is the only way the loop ends.
 *
 * @throws Exception when a message carries an unexpected error code
 */
public function rd_consumer() {
    $kafka = RdKafkaClient::getInstance();
    $kafka->getConsumer("aaa");
    $topic = $kafka->setConsumeTopic("mytopic");

    while (true) {
        $message = $topic->consume(0, 120*10000);

        // consume() may return null instead of a message object; without
        // this guard null->err loosely matches the NO_ERROR case below.
        if (null === $message) {
            continue;
        }

        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new Exception($message->errstr(), $message->err);
        }
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/731089.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号