推荐Weary
在 extend/RdKafkaClient.php 封装 RdKafkaClient:
base_config = config('game.kafka') ?? [];
// 实例化Conf
$this->kafka_conf = new RdkafkaConf();
// 实例化topicConf
$topicConf = new RdKafkaTopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'broker');
$topicConf->set('auto.offset.reset', 'earliest');
$this->kafka_topic_conf = $topicConf;
} catch (Exception $e) {
error_log($e->getMessage());
throw new Exception($e->getMessage());
}
return true;
}
/**
 * Return the shared instance of this class, creating it on first use.
 *
 * Instances are cached in the static $instance array. The cache key is the
 * md5 of a JSON-encoded empty array, so it is the same on every call and
 * only one instance is ever created.
 *
 * @return self
 */
public static function getInstance()
{
    $cacheKey = md5(json_encode(array()));

    // Fast path: an instance was already built by an earlier call.
    if (isset(self::$instance[$cacheKey])) {
        return self::$instance[$cacheKey];
    }

    $created = new self();
    self::$instance[$cacheKey] = $created;

    return $created;
}
/**
 * Return the shared Kafka configuration object with the broker list applied.
 *
 * The 'brokers' entry of the base configuration is written to the
 * 'metadata.broker.list' setting on every call.
 *
 * @return mixed The configuration object held in $this->kafka_conf.
 */
public function getConf() {
    $conf = $this->kafka_conf;
    $brokerList = $this->base_config['brokers'];

    $conf->set('metadata.broker.list', $brokerList);

    return $conf;
}
/**
 * Create a new producer from the shared configuration and remember it
 * in $this->producer.
 *
 * The configured brokers are registered on the producer here, mirroring
 * getConsumer(), so the producer is usable even when getConf() has not been
 * called beforehand. Without a broker list nothing can be delivered and
 * flush() only times out.
 *
 * @return \RdKafka\Producer
 */
public function getProducer() {
    // The class lives in the RdKafka namespace; the separator was missing.
    $this->producer = new \RdKafka\Producer($this->kafka_conf);
    $this->producer->addBrokers($this->base_config['brokers']);

    return $this->producer;
}
/**
 * Create a new low-level consumer for the given consumer group and remember
 * it in $this->consumer.
 *
 * Note that 'group.id' is written to the shared configuration object, so it
 * stays set for anything created from that configuration afterwards.
 *
 * @param string $group_id Consumer group id to set as 'group.id'.
 *
 * @return \RdKafka\Consumer
 */
public function getConsumer($group_id) {
    $this->kafka_conf->set('group.id', $group_id);

    // The class lives in the RdKafka namespace; the separator was missing.
    $this->consumer = new \RdKafka\Consumer($this->kafka_conf);
    $this->consumer->addBrokers($this->base_config['brokers']);

    return $this->consumer;
}
/**
 * Open a topic on the current consumer and start consuming one partition.
 *
 * getConsumer() must have been called first, because the topic handle is
 * created from $this->consumer.
 *
 * @param string $topic     Name of the topic to consume.
 * @param int    $partition Partition to start consuming (defaults to 0, the
 *                          value that was previously hard-coded).
 * @param int    $offset    Start offset; defaults to RD_KAFKA_OFFSET_STORED,
 *                          the value that was previously hard-coded.
 *
 * @return mixed The topic handle returned by the consumer's newTopic().
 *
 * @throws \RuntimeException When no consumer has been created yet.
 */
public function setConsumeTopic($topic, $partition = 0, $offset = RD_KAFKA_OFFSET_STORED) {
    // Fail with a clear message instead of a fatal "member function on null".
    if (!isset($this->consumer)) {
        throw new \RuntimeException('Consumer is not initialised; call getConsumer() before setConsumeTopic().');
    }

    $consumerTopic = $this->consumer->newTopic($topic, $this->kafka_topic_conf);
    $consumerTopic->consumeStart($partition, $offset);

    return $consumerTopic;
}
/**
 * Fallback for calls to methods that do not exist on this object.
 *
 * The call is recorded (timestamp and method name, one entry per line) in
 * data/logs/rdkafka_method_not_exist.log under the parent of this file's
 * directory, provided the log directory exists. Logging is best effort.
 *
 * @param string $name      Name of the method that was called.
 * @param array  $arguments Arguments passed to the call (not used).
 *
 * @return bool Always false.
 */
public function __call($name, $arguments)
{
    $logFile = dirname(__DIR__) . "/data/logs/rdkafka_method_not_exist.log";

    // Only write when the target is really a directory; a plain file with
    // that name would make the write fail.
    if (is_dir(dirname($logFile))) {
        // Separate timestamp and name, and end each entry with a real newline
        // (the terminator was the literal letter "n").
        file_put_contents($logFile, date("Y-m-d H:i:s") . " " . $name . "\n", FILE_APPEND);
    }

    return false;
}
}
?>
生产者调用方法:
/**
 * Example producer: send ten messages to "mytopic" and flush them.
 *
 * Flushing is attempted up to ten times, with a 10000 ms timeout per attempt.
 *
 * @throws RuntimeException When the last flush attempt still reports an error.
 */
public function rd_producer() {
    $producer = RdKafkaClient::getInstance()->getProducer();
    $topic = $producer->newTopic("mytopic");

    foreach (range(0, 9) as $n) {
        // RD_KAFKA_PARTITION_UA leaves the partition unassigned.
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message {$n}");
        // Non-blocking poll after each message.
        $producer->poll(0);
    }

    $attempts = 0;
    do {
        $result = $producer->flush(10000);
        $attempts++;
    } while (RD_KAFKA_RESP_ERR_NO_ERROR !== $result && $attempts < 10);

    if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
        throw new RuntimeException('Was unable to flush, messages might be lost!');
    }
}
消费者调用方法:
/**
 * Example consumer: read partition 0 of "mytopic" as group "aaa" forever.
 *
 * Each received message is dumped; end-of-partition and timeouts are
 * reported on standard output and the loop continues. Any other error
 * terminates the loop with an exception.
 *
 * @throws Exception When a message carries an error other than
 *                   end-of-partition or timeout.
 */
public function rd_consumer() {
    $kafka = RdKafkaClient::getInstance();
    $kafka->getConsumer("aaa");
    $topic = $kafka->setConsumeTopic("mytopic");

    while (true) {
        // Arguments: partition, timeout in milliseconds (120*10000 = 1,200,000 ms).
        $message = $topic->consume(0, 120*10000);

        // consume() may return null when nothing arrives within the timeout.
        // Without this guard, null would loosely match
        // RD_KAFKA_RESP_ERR_NO_ERROR (0) in the switch below.
        if (null === $message) {
            echo "Timed out\n";
            continue;
        }

        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new Exception($message->errstr(), $message->err);
        }
    }
}



