- MQTT 数据上送应用示例
MQTT 数据上送应用示例
- 此应用使用lua-mosquitto模块(lua-mosquitto)
代码:
local class = require 'middleclass'local mosq = require 'mosquitto'local cjson = require 'cjson.safe'local sub_topics = {"app/#","sys/#","output/#","command/#",}local mqtt_reconnect_timeout = 100--- 注册对象(请尽量使用唯一的标识字符串)local app = class("BAIDU_IOT_CLOUD")--- 设定应用最小运行接口版本(目前版本为1,为了以后的接口兼容性)app.API_VER = 1----- 应用对象初始化函数-- @param name: 应用本地安装名称。 如modbus_com_1-- @param sys: 系统sys接口对象。参考API文档中的sys接口说明-- @param conf: 应用配置参数。由安装配置中的json数据转换出来的数据对象function app:initialize(name, sys, conf)self._name = nameself._sys = sysself._conf = conf--- 获取数据接口self._api = sys:data_api()--- 获取日志接口self._log = sys:logger()self._nodes = {}self._mqtt_id = conf.mqtt_it or sys:id() -- using system iot idself._username = conf.username or "symlinkdemo/demo"self._password = conf.password or "q1Z/lxXqz2W33NZir6MW13RpCPAFELSiirVvGDfaaQw="self._mqtt_host = conf.server or "symlinkdemo.mqtt.iot.bj.baidubce.com"self._mqtt_port = conf.port or "1883"self._enable_tls = conf.enable_tls or falseself._close_connection = falseend-- @param app: 应用实例对象local function create_handler(app)local api = app._apilocal server = app._serverlocal log = app._loglocal self = appreturn {--- 处理设备对象添加消息on_add_device = function(app, sn, props)return self:fire_devices(1000)end,--- 处理设备对象删除消息on_del_device = function(app, sn)return self:fire_devices(1000)end,--- 处理设备对象修改消息on_mod_device = function(app, sn, props)return self:fire_devices()end,--- 处理设备输入项数值变更消息on_input = function(app, sn, input, prop, value, timestamp, quality)return self:handle_input(app, sn, input, prop, value, timestamp, quality)end,on_event = function(app, sn, level, data, timestamp)return self:handle_event(app, sn, level, data, timestamp)end,on_stat = function(app, sn, stat, prop, value, timestamp)return self:handle_stat(app, sn, stat, prop, value, timestamp)end,}endfunction app:start_reconnect()self._mqtt_client = nilself._sys:timeout(mqtt_reconnect_timeout, function() self:connect_proc() end)mqtt_reconnect_timeout = mqtt_reconnect_timeout * 2if mqtt_reconnect_timeout > 10 * 60 * 100 thenmqtt_reconnect_timeout = 100endendfunction app:handle_input(app, sn, input, prop, value, timestamp, quality)local msg = {app = app,sn = sn,input = input,prop = prop,value = value,timestamp = timestamp,quality = quality}if self._mqtt_client thenself._mqtt_client:publish("/data", cjson.encode(msg), 1, false)endendfunction app:handle_event(app, sn, level, data, timestamp)local msg = {app = app,sn = sn,level = level,data = data,timestamp = timestamp,}if self._mqtt_client thenself._mqtt_client:publish("/event", cjson.encode(msg), 1, false)endendfunction app:handle_stat(app, sn, stat, prop, value, timestamp)local msg = {app = app,sn = sn,stat = stat,prop = prop,value = value,timestamp = timestamp,}if self._mqtt_client thenself._mqtt_client:publish("/statistics", cjson.encode(msg), 1, false)endendfunction app:fire_devices(timeout)local timeout = timeout or 100if self._fire_device_timer thenreturnendself._fire_device_timer = function()local devs = self._api:list_devices() or {}if self._mqtt_client thenself._mqtt_client:publish("/devices", cjson.encode(devs), 1, true)endendself._sys:timeout(timeout, function()if self._fire_device_timer thenself._fire_device_timer()self._fire_device_timer = nilendend)endfunction app:connect_proc()local log = self._loglocal sys = self._syslocal mqtt_id = self._mqtt_idlocal mqtt_host = self._mqtt_hostlocal mqtt_port = self._mqtt_portlocal clean_session = self._clean_session or truelocal username = self._usernamelocal password = self._password-- 创建MQTT客户端实例log:debug("Baidu Cloud MQTT", mqtt_id, mqtt_host, mqtt_port, username, password)local client = assert(mosq.new(mqtt_id, clean_session))client:version_set(mosq.PROTOCOL_V311)client:login_set(username, password)if self._enable_tls thenclient:tls_set(sys:app_dir().."/root_cert.pem")end-- 注册回调函数client.ON_CONNECT = function(success, rc, msg)if success thenlog:notice("ON_CONNECT", success, rc, msg)client:publish("/status", cjson.encode({device=mqtt_id, status="ONLINE"}), 1, true)self._mqtt_client = clientself._mqtt_client_last = sys:time()for _, v in ipairs(sub_topics) doclient:subscribe("/"..v, 1)end--client:subscribe("+/#", 1)--mqtt_reconnect_timeout = 100self:fire_devices(1000)elselog:warning("ON_CONNECT", success, rc, msg)self:start_reconnect()endendclient.ON_DISCONNECT = function(success, rc, msg)log:warning("ON_DISCONNECT", success, rc, msg)if self._mqtt_client thenself:start_reconnect()endendclient.ON_LOG = function(...)--print(...)endclient.ON_MESSAGE = function(...)print(...)endclient:will_set("/status", cjson.encode({device=mqtt_id, status="OFFLINE"}), 1, true)self._close_connection = falselocal r, errlocal ts = 1while not r dor, err = client:connect(mqtt_host, mqtt_port, mqtt_keepalive)if not r thenlog:error(string.format("Connect to broker %s:%d failed!", mqtt_host, mqtt_port), err)sys:sleep(ts * 500)ts = ts * 2if ts >= 64 thenclient:destroy()sys:timeout(100, function() self:connect_proc() end)-- We meet bug that if client reconnect to broker with lots of failures, it's socket will be broken.-- So we will re-create the clientreturnendendendself._mqtt_client = client--- Worker threadwhile self._mqtt_client and not self._close_connection dosys:sleep(0)if self._mqtt_client thenself._mqtt_client:loop(50, 1)elsesys:sleep(50)endendif self._mqtt_client thenself._mqtt_client:disconnect()self._mqtt_client = nillog:notice("Cloud Connection Closed!")endendfunction app:disconnect()if not self._mqtt_client thenreturnendself._log:debug("Cloud Connection Closing!")self._close_connection = truewhile self._mqtt_client doself._sys:sleep(10)endreturn trueend--- 应用启动函数function app:start()--- 设定回调处理对象self._handler = create_handler(self)self._api:set_handler(self._handler, true)self._sys:fork(function()self:connect_proc()end)self._log:debug("Baidu Cloud connector started!")return trueend--- 应用退出函数function app:close(reason)mosq.cleanup()end--- 应用运行入口function app:run(tms)return 1000 * 10 -- 10 secondsend--- 返回应用对象return app
