• MQTT 数据上送应用示例

    MQTT 数据上送应用示例

    • 此应用使用lua-mosquitto模块(lua-mosquitto)

    代码:

    1. local class = require 'middleclass'
    2. local mosq = require 'mosquitto'
    3. local cjson = require 'cjson.safe'
    -- Topic filters subscribed to after a successful connect; connect_proc
    -- prefixes each entry with "/" before subscribing.
    local sub_topics = {
    "app/#",
    "sys/#",
    "output/#",
    "command/#",
    }
    -- Delay handed to sys:timeout() for the next reconnect attempt. It is doubled
    -- by start_reconnect() and reset to 100 after a successful connect.
    -- NOTE(review): the time unit is whatever sys:timeout() expects — confirm.
    local mqtt_reconnect_timeout = 100
    --- Register the application class (use a unique identifier string where possible)
    local app = class("BAIDU_IOT_CLOUD")
    --- Minimum application API version required (currently 1; kept for future interface compatibility)
    app.API_VER = 1
    ---
    -- Application object initializer.
    -- @param name Local installation name of the application, e.g. "modbus_com_1".
    -- @param sys System `sys` interface object (see the sys section of the API documentation).
    -- @param conf Application configuration table decoded from the JSON install
    --   configuration. May be nil, in which case every setting falls back to its default.
    function app:initialize(name, sys, conf)
        -- Tolerate a missing configuration instead of failing on the first field read.
        conf = conf or {}

        self._name = name
        self._sys = sys
        self._conf = conf
        --- Data API
        self._api = sys:data_api()
        --- Logger
        self._log = sys:logger()
        self._nodes = {}

        -- `mqtt_id` is the intended key; the misspelled `mqtt_it` is still honoured
        -- so existing configurations keep working. Falls back to the system id.
        self._mqtt_id = conf.mqtt_id or conf.mqtt_it or sys:id()

        -- NOTE(review): these defaults are demo credentials embedded in source.
        -- They are kept for backward compatibility; real deployments should always
        -- supply `username` / `password` in the configuration.
        self._username = conf.username or "symlinkdemo/demo"
        self._password = conf.password or "q1Z/lxXqz2W33NZir6MW13RpCPAFELSiirVvGDfaaQw="
        self._mqtt_host = conf.server or "symlinkdemo.mqtt.iot.bj.baidubce.com"

        -- Keep the port numeric: it is later formatted with "%d" and passed to the
        -- client as a port number. Numeric strings from JSON are converted.
        self._mqtt_port = tonumber(conf.port) or 1883
        self._enable_tls = conf.enable_tls or false
        self._close_connection = false
    end
    --- Build the callback table that is registered with the data API.
    -- Every callback simply forwards to the matching method of the application
    -- instance captured here.
    -- @param app Application instance
    -- @return table with on_add_device / on_del_device / on_mod_device /
    --   on_input / on_event / on_stat callbacks
    local function create_handler(app)
        -- The first callback argument is named `src_app` so it does not shadow the
        -- captured application instance `app`.
        return {
            --- Device added
            on_add_device = function(src_app, sn, props)
                return app:fire_devices(1000)
            end,
            --- Device removed
            on_del_device = function(src_app, sn)
                return app:fire_devices(1000)
            end,
            --- Device modified (uses fire_devices' default delay)
            on_mod_device = function(src_app, sn, props)
                return app:fire_devices()
            end,
            --- Device input value changed
            on_input = function(src_app, sn, input, prop, value, timestamp, quality)
                return app:handle_input(src_app, sn, input, prop, value, timestamp, quality)
            end,
            --- Device event
            on_event = function(src_app, sn, level, data, timestamp)
                return app:handle_event(src_app, sn, level, data, timestamp)
            end,
            --- Device statistics value
            on_stat = function(src_app, sn, stat, prop, value, timestamp)
                return app:handle_stat(src_app, sn, stat, prop, value, timestamp)
            end,
        }
    end
    --- Drop the current client and schedule a new connection attempt.
    -- The delay used for this attempt is the current module-level
    -- `mqtt_reconnect_timeout`; afterwards that value is doubled, and wraps back
    -- to 100 once it exceeds 10 * 60 * 100.
    function app:start_reconnect()
        local delay = mqtt_reconnect_timeout

        -- Work out the delay for the attempt after this one.
        local next_delay = delay * 2
        if next_delay > 10 * 60 * 100 then
            next_delay = 100
        end

        -- Clearing the client also makes connect_proc's worker loop exit.
        self._mqtt_client = nil
        self._sys:timeout(delay, function()
            self:connect_proc()
        end)

        mqtt_reconnect_timeout = next_delay
    end
    --- Encode `msg` as JSON and publish it (QoS 1, not retained) when a client
    -- is currently attached.
    -- cjson.safe reports encode failures by returning nil plus an error message
    -- instead of raising, so a failed encode is logged and the message skipped
    -- rather than being published with a nil payload.
    -- @param self Application instance
    -- @param topic MQTT topic string
    -- @param msg Table to serialise
    local function publish_json(self, topic, msg)
        local client = self._mqtt_client
        if not client then
            return
        end
        local payload, err = cjson.encode(msg)
        if not payload then
            self._log:error("Failed to encode message for " .. topic, err)
            return
        end
        client:publish(topic, payload, 1, false)
    end

    --- Publish a device input value change on "/data".
    -- @param app Source application name/object as delivered by the data API callback
    -- @param sn Device serial number
    -- @param input Input name
    -- @param prop Property name
    -- @param value New value
    -- @param timestamp Time of the change
    -- @param quality Quality indicator of the value
    function app:handle_input(app, sn, input, prop, value, timestamp, quality)
        publish_json(self, "/data", {
            app = app,
            sn = sn,
            input = input,
            prop = prop,
            value = value,
            timestamp = timestamp,
            quality = quality,
        })
    end

    --- Publish a device event on "/event".
    -- @param app Source application name/object as delivered by the data API callback
    -- @param sn Device serial number
    -- @param level Event level
    -- @param data Event payload
    -- @param timestamp Time of the event
    function app:handle_event(app, sn, level, data, timestamp)
        publish_json(self, "/event", {
            app = app,
            sn = sn,
            level = level,
            data = data,
            timestamp = timestamp,
        })
    end

    --- Publish a device statistics value on "/statistics".
    -- @param app Source application name/object as delivered by the data API callback
    -- @param sn Device serial number
    -- @param stat Statistics item name
    -- @param prop Property name
    -- @param value New value
    -- @param timestamp Time of the sample
    function app:handle_stat(app, sn, stat, prop, value, timestamp)
        publish_json(self, "/statistics", {
            app = app,
            sn = sn,
            stat = stat,
            prop = prop,
            value = value,
            timestamp = timestamp,
        })
    end
    --- Schedule a (coalesced) publish of the device list on "/devices".
    -- While a publish is already pending, further calls are ignored, so a burst
    -- of device changes results in a single retained message.
    -- @param timeout Delay passed to sys:timeout() before publishing; defaults to 100.
    function app:fire_devices(timeout)
        if self._fire_device_timer then
            return
        end

        self._fire_device_timer = function()
            local devs = self._api:list_devices() or {}
            local client = self._mqtt_client
            if not client then
                return
            end
            -- cjson.safe returns nil, err on failure; publishing a nil payload as a
            -- retained message would wipe the previously retained device list.
            local payload, err = cjson.encode(devs)
            if not payload then
                self._log:error("Failed to encode device list", err)
                return
            end
            client:publish("/devices", payload, 1, true)
        end

        self._sys:timeout(timeout or 100, function()
            local fire = self._fire_device_timer
            if fire then
                -- Clear the pending marker BEFORE running the callback: if it raised
                -- with the marker still set, every later fire_devices() call would be
                -- ignored forever. It also lets changes that arrive while the list is
                -- being published schedule a fresh publish.
                self._fire_device_timer = nil
                fire()
            end
        end)
    end
    --- Create an MQTT client, connect it to the broker and run its network loop.
    -- This function blocks (using sys:sleep) until the connection is closed via
    -- app:disconnect() or dropped by start_reconnect(); it is meant to run in its
    -- own forked task.
    -- Optional configuration key read here: `keepalive` (number). When absent,
    -- nil is passed and the client library applies its own default.
    function app:connect_proc()
        local log = self._log
        local sys = self._sys
        local conf = self._conf or {}
        local mqtt_id = self._mqtt_id
        local mqtt_host = self._mqtt_host
        -- The port is formatted with "%d" below, so make sure it is a number even
        -- when it was configured as a string.
        local mqtt_port = tonumber(self._mqtt_port) or 1883
        -- Previously an undeclared global was read here (always nil).
        local mqtt_keepalive = tonumber(conf.keepalive)
        -- `x or true` is always true; only default to true when nothing was set so
        -- an explicit `false` is honoured.
        local clean_session = self._clean_session
        if clean_session == nil then
            clean_session = true
        end
        local username = self._username
        local password = self._password

        -- Create the MQTT client instance. The password is masked in the log.
        log:debug("Baidu Cloud MQTT", mqtt_id, mqtt_host, mqtt_port, username, password and "******" or "<none>")
        local client = assert(mosq.new(mqtt_id, clean_session))
        client:version_set(mosq.PROTOCOL_V311)
        client:login_set(username, password)
        if self._enable_tls then
            client:tls_set(sys:app_dir().."/root_cert.pem")
        end

        -- Register callbacks
        client.ON_CONNECT = function(success, rc, msg)
            if success then
                log:notice("ON_CONNECT", success, rc, msg)
                client:publish("/status", cjson.encode({device=mqtt_id, status="ONLINE"}), 1, true)
                self._mqtt_client = client
                self._mqtt_client_last = sys:time()
                for _, v in ipairs(sub_topics) do
                    client:subscribe("/"..v, 1)
                end
                --client:subscribe("+/#", 1)
                -- Connected: restart the reconnect back-off from its initial value.
                mqtt_reconnect_timeout = 100
                self:fire_devices(1000)
            else
                log:warning("ON_CONNECT", success, rc, msg)
                self:start_reconnect()
            end
        end
        client.ON_DISCONNECT = function(success, rc, msg)
            log:warning("ON_DISCONNECT", success, rc, msg)
            -- Only reconnect when the client is still considered active; after a
            -- deliberate close self._mqtt_client has already been cleared.
            if self._mqtt_client then
                self:start_reconnect()
            end
        end
        client.ON_LOG = function(...)
            --print(...)
        end
        client.ON_MESSAGE = function(...)
            print(...)
        end

        -- Last-will message, set before connecting.
        client:will_set("/status", cjson.encode({device=mqtt_id, status="OFFLINE"}), 1, true)

        self._close_connection = false
        local r, err
        local ts = 1
        while not r do
            r, err = client:connect(mqtt_host, mqtt_port, mqtt_keepalive)
            if not r then
                log:error(string.format("Connect to broker %s:%d failed!", mqtt_host, mqtt_port), err)
                -- Back off between attempts: the sleep doubles each time.
                sys:sleep(ts * 500)
                ts = ts * 2
                if ts >= 64 then
                    -- We meet bug that if client reconnect to broker with lots of failures, it's socket will be broken.
                    -- So we will re-create the client
                    client:destroy()
                    sys:timeout(100, function() self:connect_proc() end)
                    return
                end
            end
        end

        self._mqtt_client = client

        --- Worker loop: runs until the client is cleared (start_reconnect) or a
        -- close is requested (disconnect).
        while self._mqtt_client and not self._close_connection do
            sys:sleep(0)
            -- The client may have been cleared while this task was suspended.
            if self._mqtt_client then
                self._mqtt_client:loop(50, 1)
            else
                sys:sleep(50)
            end
        end

        -- Close requested: disconnect and clear the client so that
        -- app:disconnect(), which waits for it to become nil, can return.
        if self._mqtt_client then
            self._mqtt_client:disconnect()
            self._mqtt_client = nil
            log:notice("Cloud Connection Closed!")
        end
    end
    --- Ask the worker loop in connect_proc to close the connection and wait
    -- until it has cleared the client.
    -- @return true once the client has been cleared; nothing when there was no
    --   client to close.
    function app:disconnect()
        if self._mqtt_client then
            self._log:debug("Cloud Connection Closing!")
            -- Signal the worker loop; it disconnects and sets _mqtt_client to nil.
            self._close_connection = true
            while self._mqtt_client do
                self._sys:sleep(10)
            end
            return true
        end
    end
    --- Application start hook.
    -- Registers the data API callbacks and forks the MQTT connection task.
    -- @return true
    function app:start()
        --- Install the callback handler object
        local handler = create_handler(self)
        self._handler = handler
        self._api:set_handler(handler, true)

        -- connect_proc blocks while the connection is alive, so run it in its own task.
        local function connection_task()
            self:connect_proc()
        end
        self._sys:fork(connection_task)

        self._log:debug("Baidu Cloud connector started!")
        return true
    end
    --- Application shutdown hook.
    -- Stops the MQTT worker through self:disconnect() before releasing the
    -- mosquitto library, so the worker loop is not still driving a client after
    -- mosq.cleanup() has run. disconnect() returns immediately when no client
    -- is active.
    -- @param reason Passed by the caller; not used here.
    function app:close(reason)
        self:disconnect()
        mosq.cleanup()
    end
    --- Application run entry point.
    -- Does no work itself; it only returns a fixed interval.
    -- @param tms Not used here.
    -- @return 10000 (10 seconds expressed in milliseconds)
    function app:run(tms)
        local interval_ms = 10 * 1000
        return interval_ms
    end
    --- Return the application class to the loader
    return app