MMQ broker 是一款完全开源,高度可伸缩,高可用的分布式 MQTT 消息服务器,适用于 IoT、M2M 和移动应用程序。
MMQ broker 完整支持MQTT V3.1 和 V3.1.1。
Github: https://github.com/MrHKing/mmqtt
Gitee: https://gitee.com/paperman/mmq
安装MMQ broker 是跨平台的,支持 Linux、Unix、macOS 以及 Windows。这意味着 MMQ broker 可以部署在 x86_64 架构的服务器上。由于使用raft一致性算法,集群部署三个节点以上。
从 Github 上下载源码方式
git clone https://github.com/MrHKing/mmq.git cd mmq mvn -Prelease-mmq -Dmaven.test.skip=true clean install -U 复制代码
直接安装
您可以从 最新稳定版本 下载 mmq-server-$version.zip 包。
unzip mmq-server-$version.zip 或者 tar -xvf mmq-server-$version.tar.gz cd mmq/bin 复制代码
Docker安装
docker run -d --name mmq -p 2883:2883 -p 1883:3883 -p 8888:8888 paperman/mmq:v1.0.8 复制代码
Kubernetes安装
快速入门 单机版启动cd mmqbin #windows start startup.cmd -m standalone 复制代码
cd mmq/bin #linux start sh startup.sh -m standalone #linux shutdown sh shutdown.sh 复制代码集群版启动
cd mmq/config #配置集群文件 cp cluster.conf.example cluster.conf 复制代码
#每个节点都需要配置其他节点的地址,如下: #example 192.168.31.9:7777 192.168.31.9:8848 192.168.31.9:8888 复制代码
cd mmqbin #windows start startup.cmd 复制代码
cd mmq/bin #linux start sh startup.sh #linux shutdown sh shutdown.sh 复制代码配置文件
mqtt tcp端口默认:3883
mqtt websocket端口:2883
#*************** Spring Boot Related Configurations ***************# ### Default web context path: server.servlet.contextPath=/ ### Default web server port: server.port=8888 #*************** mqtt broker Configurations ***************# mmq.broker.websocketPort=2883 mmq.broker.port=3883 mmq.broker.default.user=admin mmq.broker.default.password=admin@mmq mmq.broker.default.anonymous=true 复制代码10万客户端压力测试
10万连接两节点集群测试工具xmeter 服务器资源
服务端资源:两台4核8G服务器客户端资源:三台8核16G模拟测试服务器 测试结果
10万连接,一分钟发送一次数据。平均吞吐量:1302。成功率:100%服务器使用资源:CPU 10%, 内存6%平均响应时间:0.0158秒 规则引擎
通过SQL进行规则转发
选择发布到topic/#的消息,然后选择所有字段:
SELECt * FROM "topic/#" 复制代码
查询专门字段SQL
SELECt this.payload.value, this.payload.deviceName FROM "topic/#" 复制代码
条件查询SQL
SELECt this.payload.value, this.payload.deviceName FROM "topic/#" WHERe this.payload.deviceName like 'abc%' 复制代码
滚动窗口滑动窗口MySqlSqlserverPostgresqlTdengine 插入数据库模板通用字段
| 属性 | 说明 |
|---|---|
| uuid | uuid字符串 |
| date | yyyy-MM-dd格式日期 |
| datetime | yyyy-MM-dd HH:mm:ss格式日期 |
| utc | yyyy-MM-dd'T'HH:mm:ssZ格式日期 |
| timestamp | long 格式日期 |
| username | 发送消息的账户 |
| topic | 发送topic |
| topic[no] | 例:/topic1/topic2/topic3/... |
INSERT INTO test (uuid, date, datetime, utc, timestamp, topic, name) VALUES ('#{[uuid]}',#{[date]},#{[datetime]},#{[utc]},#{[timestamp]},#{[topic]},#{[topic1]})
复制代码
kafka资源桥接
- 添加kafka资源添加规则引擎
SELECt * FROM "topic/#" 复制代码
- 测试
Topic: topic/testMQTT 客户端上传 Json Demo 如下:
{
"msg": {
"name": "test",
"value": "12321"
}
}
复制代码
- 插入结果
{"address":"","qos":0,"payload":{"msg":{"name":"test","value":"12321"}},"topic":"topic/test"}
复制代码
Mysql资源桥接
MQTT 客户端上传 Json Demo 如下:
{
"msg": {
"name": "test",
"value": "12321"
}
}
复制代码
规则SQL Demo:
SELECt * FROM "topic/#" 复制代码
Mysql SQL Demo如下:
INSERT INTO history (name, value) VALUES ('#{[msg][name]}',#{[msg][value]})
复制代码
SqlServer资源桥接
MQTT 客户端上传 Json Demo 如下:
{
"msg": {
"name": "test",
"value": "12321"
}
}
复制代码
规则SQL Demo:
SELECt * FROM "topic/#" 复制代码
Mysql SQL Demo如下:
INSERT INTO history (name, value) VALUES ('#{[msg][name]}',#{[msg][value]})
复制代码
postgresql资源桥接
MQTT 客户端上传 Json Demo 如下:
{
"msg": {
"name": "test",
"value": "12321"
}
}
复制代码
规则SQL Demo:
SELECt * FROM "topic/#" 复制代码
Mysql SQL Demo如下:
INSERT INTO history (name, value) VALUES ('#{[msg][name]}',#{[msg][value]})
复制代码
认证方式
对外API接口,HTTP API 使用 Basic 认证 (opens new window)方式,id 和 password 须分别填写 AppID 和 AppSecret。 默认的 AppID 和 AppSecret 是:mmq/aaaaaa。使用需要在【模块】菜单中找到HTTP API模块,启用即可,如需修改 AppID 和 AppSecret,点击编辑修改即可。
返回响应码| 响应码 | 说明 |
|---|---|
| 102 | 未知错误 |
| 200 | 正常 |
| 401 | 没有权限 |
GET /v1/api/clients
返回集群下所有客户端的信息,支持分页
查询参数
| 名称 | 类型 | 是否必填 | 默认值 | 说明 |
|---|---|---|---|---|
| pageNo | int | 是 | 无 | 页码 |
| pageSize | int | 是 | 无 | 数据条数 |
| clientId | String | 否 | 无 | 客户端ID |
| address | String | 否 | 无 | 客户端IP |
| user | String | 否 | 无 | 客户端账户 |
| topic | String | 否 | 无 | 客户端订阅的Topic,用于like查询订阅此Topic的客户端 |
| 返回数据 | ||||
| 名称 | 类型 | 是否必填 | 默认值 | 说明 |
| ------ | ------ | ------ | ------ | ------ |
| code | int | 是 | 无 | 200 |
| message | int | 是 | 无 | 消息 |
| data | Array of Objects | 是 | 所有客户端信息 | |
| pageNo | int | 是 | 无 | 页码 |
| pageSize | int | 是 | 无 | 数据条数 |
| totalCount | int | 是 | 无 | 数据总数 |
| totalPage | int | 是 | 无 | 数据总页 |
| data[0].clientId | String | 是 | 无 | 客户端ID |
| data[0].address | String | 否 | 无 | 客户端IP |
| data[0].user | String | 否 | 无 | 客户端账户 |
| data[0].nodeIp | String | 否 | 无 | 客户端所在节点IP |
$ curl -i --basic -u mmq:aaaaaa -X GET "http://localhost:8888/v1/api/clients?pageNo=1&pageSize=10"
{
"code": 200,
"message": null,
"data": {
"pageSize": 10,
"pageNo": 1,
"totalCount": 1,
"totalPage": 0,
"data": [
{
"clientId": "paho1640921025845000001",
"user": "e",
"connectTiem": "2022-01-20T01:02:39.903+00:00",
"address": "127.0.0.1",
"nodeIp": "192.168.0.113",
"nodePort": 8888
}
]
}
}
复制代码
踢出客户端接口
GET /v1/api/rejectClient
通过客户端id,踢出客户端
查询参数
| 名称 | 类型 | 是否必填 | 默认值 | 说明 |
|---|---|---|---|---|
| clientId | String | 否 | 无 | 客户端ID |
| 返回数据 | ||||
| 名称 | 类型 | 是否必填 | 默认值 | 说明 |
| ------ | ------ | ------ | ------ | ------ |
| code | int | 是 | 无 | 200 |
| message | int | 是 | 无 | 消息 |
$ curl -i --basic -u mmq:aaaaaa -X GET "http://localhost:8888/v1/api/rejectClient?clientId=paho1640921025845000001"
{
"code": 200,
"message": null,
"data": null
}
复制代码
Dashboard --单机演示
启动后访问 http://101.43.4.211:8888/
默认账户:mmq
默认密码:aaaaaa
mqtt tcp端口默认:1883
mqtt websocket端口:2883
MQTT 规范你可以通过以下链接了解与查阅 MQTT 协议:
MQTT Version 3.1.1
开源许可Apache License 2.0, 详见 LICENSE。
群号QQ群: 1016132679
Github: https://github.com/MrHKing/mmqtt
Gitee: https://gitee.com/paperman/mmq



