Relationships勾选failure
Database Connection Pooling Service 配置连接池,包含:连接地址、驱动、驱动地址、用户名、密码,配置完成后将状态置为Enable
- GenerateTableFetch和ExecuteSQLRecord之间的连接(Connection)勾选success
Relationships勾选failure;Database Connection Pooling Service选择连接池;Record Writer 配置为JsonRecordSetWriter(我们需要转为json格式,所以还需要配置avro的结构);
JsonRecordSetWriter中 Schema Write Strategy配置为Set ‘schema.name’ Attribute;
Schema Registry为AvroSchemaRegistry;
Schema Access Strategy配置为 Use ‘Schema Name’ Property;
Schema Name为avro.schema; Schema Text为${avro.schema};
AvroSchemaRegistry 中添加属性avro.schema,value为数据对应的avro格式
Schema示例:
{
"namespace": "iot_firmaster_new",
"type": "record",
"name": "user_test",
"fields": [
{
"name": "id",
"type": "int",
"default": 0
},
{
"name": "name",
"type": "string",
"default": ""
},
{
"name": "age",
"type": "int",
"default": 0
},
{
"name": "sex",
"type": "int",
"default": 0
}
]
}
ExecuteSQLRecord和SplitJson之间的连接(Connection)勾选success
Relationships勾选failure和original
JsonPath expression配置为$.*,从根节点开始解析
SplitJson和PublishKafka_2_0之间的连接(Connection)勾选split
Relationships勾选failure,Kafka Brokers 配置kafka连接信息;
Topic Name配置topic 名称;Delivery Guarantee 配置 Guarantee Replicated Delivery,对应kafka的acks机制;
PublishKafka_2_0和LogAttribute之间的连接(Connection)勾选success
- Relationships勾选success



