物联网AIoT之MQTT与MODBUS在开源低代码物联网平台的相爱相杀
现在我们应用Node-RED 搭建一个MQTT网关,把一个水箱的水位和水温发送到物联网平台。网关通过Modbus总线连接一个水位传感器和一个温度传感器,采集水位和温度数据,再通过MQTT客户端把水位和温度信息发布到MQTT代理。
1.0 网关用途
为了便于理解,我们这个示例非常简单,在日常的应用场景中,会比较复杂。这个网关的主要用途是协议转换和信息汇聚。
2011年客户定制的4路无线DVR通过CANBUS走MODBUS协议读取瓦斯等传感器数据
https://www.besovideo.com/detail?t=1&i=141

协议转化:
网关可以通过Modbus总线把多个485串口的传感器,以轮询的方式采集,通过MQTT协议发布给信息平台。
信息汇聚:
网关可以把多个传感器数据,聚合为一个对象,为数据附加语义描述,使信息具有结构化,易可识别。
2.0 网关配置
2.1 模拟两个传感器
-
用虚拟串口软件VSPD配置两对虚拟串口com3-com4,com5-6。
-
用Modbus Slave模拟两个传感器分别使用com3和com5。
2.2 实现网关
2.2.1 配置MQTT代理
拖拽一个Aedes MQTT broker 节点到工作区,双击节点打开节点编辑器:
这里使用了默认的配置,端口号为1883。您可以根据需要修改配置。
2.2.2 通过Modbus分别采集水箱的水位和温度,合并为水箱参数,由MQTT客户端发布
2.2.2.1配置MQTT客户端
拖拽两个Modbus-Getter节点到工作区,双击节点打开节点编辑器,如下图所示配置:
双击右下角的铅笔图标,编辑Sever(服务端):
这里串口端口分别使用com4和com6,波特率与传感器端一致。
2.2.2.2 配置轮询
如下图所示,在Modbus-Getter节点左侧添加注入(inject)节点和延迟(delay)节点:
双击打开注入节点编辑器,按下图配置节点周期注入消息:
双击延迟(delay)节点,设置信息延迟10毫秒:
2.22.3 为消息添加主题
在Modbus-Getter节点右侧,添加函数节点,为消息添加主题:
打开函数编辑器,编写函数体,为消息添加节点:
水位的函数体:
msg={topic:"WaterLevel",payload:msg.payload[0]}return msg;
水温的函数体:
msg={topic:"WaterTemp",payload:msg.payload[0],complete:true}return msg;
2.2.2.4 聚合并发布消息
两个函数节点的消息都传递到后边的连接(join)节点:
打开连接节点编辑器:
连接节点把消息组合成键值对对象输出给mqtt out节点,下面配置发布主题,如下图:
到这里网关功能完成。
2.3 测试
下面配置一个mqtt in节点,测试消息:
双击打开mqtt in节点,配置订阅主题,如下图:
完成后部署,会在右侧调试面板看到收到的消息:
3.0 完整源码
[{"id": "47697d9d8a564c16","type": "tab","label": "流程 2","disabled": false,"info": "","env": []},{"id": "67b13033ddc0d223","type": "modbus-getter","z": "47697d9d8a564c16","name": "水位","showStatusActivities": true,"showErrors": false,"logIOActivities": false,"unitid": "1","dataType": "HoldingRegister","adr": "0","quantity": "1","server": "ce08db4b7d9f20f0","useIOFile": false,"ioFile": "","useIOForPayload": false,"emptyMsgOnFail": false,"keepMsgProperties": false,"x": 310,"y": 180,"wires": [["d4cba5fe6569d8c3"],[]]},{"id": "844ad47ff9474f12","type": "modbus-getter","z": "47697d9d8a564c16","name": "水温","showStatusActivities": true,"showErrors": false,"logIOActivities": false,"unitid": "1","dataType": "HoldingRegister","adr": "0","quantity": "1","server": "c3bdb25d487433e4","useIOFile": false,"ioFile": "","useIOForPayload": false,"emptyMsgOnFail": false,"keepMsgProperties": false,"x": 350,"y": 280,"wires": [["06514971ca3ad6ed"],[]]},{"id": "36e7a1c989d23266","type": "inject","z": "47697d9d8a564c16","name": "","props": [{"p": "payload"},{"p": "topic","vt": "str"}],"repeat": "5","crontab": "","once": true,"onceDelay": "10","topic": "","payload": "","payloadType": "str","x": 150,"y": 220,"wires": [["67b13033ddc0d223","cb911d4fa59973ac"]]},{"id": "16aa2453141586b1","type": "debug","z": "47697d9d8a564c16","name": "debug 4","active": true,"tosidebar": true,"console": false,"tostatus": false,"complete": "payload","targetType": "msg","statusVal": "","statusType": "auto","x": 420,"y": 380,"wires": []},{"id": "cb911d4fa59973ac","type": "delay","z": "47697d9d8a564c16","name": "","pauseType": "delay","timeout": "10","timeoutUnits": "milliseconds","rate": "1","nbRateUnits": "1","rateUnits": "second","randomFirst": "1","randomLast": "5","randomUnits": "seconds","drop": false,"allowrate": false,"outputs": 1,"x": 210,"y": 280,"wires": [["844ad47ff9474f12"]]},{"id": "d4cba5fe6569d8c3","type": "function","z": "47697d9d8a564c16","name": "水位","func": "msg={topic:\"WaterLevel\",payload:msg.payload[0]}\nreturn msg;","outputs": 1,"noerr": 0,"initialize": "","finalize": "","libs": [],"x": 450,"y": 180,"wires": [["47bd8c57dc676aeb"]]},{"id": "06514971ca3ad6ed","type": "function","z": "47697d9d8a564c16","name": "水温","func": "msg={topic:\"WaterTemp\",payload:msg.payload[0],complete:true}\nreturn msg;","outputs": 1,"noerr": 0,"initialize": "","finalize": "","libs": [],"x": 470,"y": 280,"wires": [["47bd8c57dc676aeb"]]},{"id": "2229838c3375166e","type": "mqtt out","z": "47697d9d8a564c16","name": "","topic": "WaterTank","qos": "0","retain": "false","respTopic": "","contentType": "","userProps": "","correl": "","expiry": "","broker": "0222eb30f00ea5a5","x": 750,"y": 220,"wires": []},{"id": "d731c49d3fbee150","type": "aedes broker","z": "47697d9d8a564c16","name": "","mqtt_port": 1883,"mqtt_ws_bind": "port","mqtt_ws_port": "","mqtt_ws_path": "","cert": "","key": "","certname": "","keyname": "","dburl": "","usetls": false,"x": 210,"y": 80,"wires": [[],[]]},{"id": "5323672ec49be0fd","type": "mqtt in","z": "47697d9d8a564c16","name": "","topic": "WaterTank","qos": "0","datatype": "auto-detect","broker": "0222eb30f00ea5a5","nl": false,"rap": true,"rh": 0,"inputs": 0,"x": 140,"y": 380,"wires": [["16aa2453141586b1"]]},{"id": "47bd8c57dc676aeb","type": "join","z": "47697d9d8a564c16","name": "","mode": "custom","build": "object","property": "payload","propertyType": "msg","key": "topic","joiner": "\\n","joinerType": "str","accumulate": true,"timeout": "","count": "","reduceRight": false,"reduceExp": "","reduceInit": "","reduceInitType": "","reduceFixup": "","x": 610,"y": 220,"wires": [["2229838c3375166e"]]},{"id": "7b17b1f6c871629b","type": "comment","z": "47697d9d8a564c16","name": "1. 用于演示的MQTT代理","info": "","x": 190,"y": 40,"wires": []},{"id": "0a22dc0691ff8231","type": "comment","z": "47697d9d8a564c16","name": "2. 通过Modbus分别采集水箱的水位和温度,合并为水箱参数,由MQTT客户端发布","info": "","x": 370,"y": 140,"wires": []},{"id": "c3e9a93a84b8059c","type": "comment","z": "47697d9d8a564c16","name": "3. 订阅水箱参数","info": "","x": 160,"y": 340,"wires": []},{"id": "ce08db4b7d9f20f0","type": "modbus-client","name": "","clienttype": "simpleser","bufferCommands": true,"stateLogEnabled": false,"queueLogEnabled": false,"failureLogEnabled": true,"tcpHost": "127.0.0.1","tcpPort": "502","tcpType": "DEFAULT","serialPort": "com4","serialType": "RTU","serialBaudrate": "9600","serialDatabits": "8","serialStopbits": "1","serialParity": "none","serialConnectionDelay": "100","serialAsciiResponseStartDelimiter": "0x3A","unit_id": 1,"commandDelay": 1,"clientTimeout": 1000,"reconnectOnTimeout": true,"reconnectTimeout": 2000,"parallelUnitIdsAllowed": true},{"id": "c3bdb25d487433e4","type": "modbus-client","name": "","clienttype": "simpleser","bufferCommands": true,"stateLogEnabled": false,"queueLogEnabled": false,"failureLogEnabled": true,"tcpHost": "127.0.0.1","tcpPort": "502","tcpType": "DEFAULT","serialPort": "com6","serialType": "RTU","serialBaudrate": "9600","serialDatabits": "8","serialStopbits": "1","serialParity": "none","serialConnectionDelay": "100","serialAsciiResponseStartDelimiter": "0x3A","unit_id": "1","commandDelay": "1","clientTimeout": "1000","reconnectOnTimeout": true,"reconnectTimeout": "2000","parallelUnitIdsAllowed": true},{"id": "0222eb30f00ea5a5","type": "mqtt-broker","name": "","broker": "localhost","port": "1883","clientid": "","autoConnect": true,"usetls": false,"protocolVersion": "4","keepalive": "60","cleansession": true,"birthTopic": "","birthQos": "0","birthPayload": "","birthMsg": {},"closeTopic": "","closeQos": "0","closePayload": "","closeMsg": {},"willTopic": "","willQos": "0","willPayload": "","willMsg": {},"userProps": "","sessionExpiry": ""}]
