org.apache.rocketmq rocketmq-spark0.0.1-SNAPSHOT
可能maven仓库没有这个依赖,拉下代码,执行以下命令安装到本地仓库。
mvn clean install dependency:copy-dependencies -Dmaven.test.skip -Dcheckstyle.skipDemo
val df = spark.createDataframe(Seq(
("{}", "your_topic", "your_tag")
)).toDF("body", "topic", "tags")
df.write
.format("org.apache.spark.sql.rocketmq.RocketMQSourceProvider")
.option("nameServer", "host:port;host:port")
.option("topic", "new_topic")
.save()
说明
dataframe 应该包含以下列,其中body是必须要的,topic可被覆盖。
| column | type |
|---|---|
| body | string or binary |
| topic | string |
| tags | string |
option的nameServer参数必须设置,若设置了topic参数则会覆盖dataframe的topic列,且topic不存在会自动创建。
| key | value |
|---|---|
| nameServer | host:port;host:port |
https://github.com/apache/rocketmq-externals
https://rocketmq-1.gitbook.io/rocketmq-connector/



