N2O Apps over
MQ Telemetry Transport

Web Framework w/o Web Server?

Usually, Web Framework uses HTTP or WebSocket endpoints. But what if we need to connect application on devices with TCP/UDP transports? One of the protocols addressing this usecase is MQTT that was designed by Andy Stanford-Clark (IBM) and Arlen Nipper in 1999 for connecting Oil Pipeline telemetry systems over satellite. The protocol is designed to work over connectionless unreliable environments, and provides three levels of QoS delivery: at least once, at most once, exactly once.

On the other hand, N2O is a federation of protocols, like XMPP/CORBA/WS. Although N2O supports several encoders (BERT/XML/JSON/TEXT) the extension to other channels beyond WebSockets would involve creating a new Socket Server. So we have decided to take MQTT as a transport/session protocol for N2O Federation.

EMQ is an open-source MQTT broker implementation by Feng Lee. It's clean, concise and robust. We've developed an EMQ compatible plugin that works entirely within MQTT sessions just like N2O already does within Cowboy's ranch sessions. This is the main difference from Phoenix's gen_server channels.

Picture 1. Protocol Modules
Picture 2. Protocol Messages

N2O: Protocol Server for MQTT

Starting from 4.5, N2O reduce everything duplicating MQTT features. On the other hand, 4.5 is a completely N2O-compatible embeddable protocol relay with limited features. And it takes only 25KB in Erlang.

N2O 4.5 provides real smooth experience for developers: clean git history, small codebase, nitro compatible, man/html docs.

Listing 1. N2O 4.5 MQTT
    $ time git clone git://github.com/synrc/mqtt
    real       0m0.794s

    $ cloc .

    $ cat rebar.config
    {deps, [{nitro, "git://github.com/synrc/nitro"},
            {n2o,   "git://github.com/synrc/mqtt"}]}.

    $ man ./man/n2o.1


N2O things deprecated due to MQTT

N2O_start and n2o.js are no longer used in MQTT version of N2O. Instead of N2O_start, MQTT_start and mqtt.js should be used for session control replacement. We traded HEART protocol and session facilities for built-in MQTT features. N2O authentication and authorization mechanisms are also abandoned as MQTT could provide AUTH features too. Obviously wf:reg and wf:send APIs have also been abandoned as we can use emqttd API directly and {deliver,_} protocol of ws_client gen_server.



All N2O/MQTT messages which go directly to TCP and/or WebSocket. However there are some endpoints which are not TCP sockets, even non-sockets, like gen_server endpoint, HTTP REST endpoint, which is non-WebSocket endpoint, and there is a room for other endpoint types.

Here is a list of types of endpoints which are supported by EMQ and accesible to N2O apps: WebSockets, MQTT, MQTT-SN, TCP, UDP, CoAP. Normal use of N2O as a Web Framework or a Web Application Server is through WebSockets, but for IoT and MQTT applications it could be served through UDP or SCTP protocols providing application level message delivery consistency. By using MQTT as a transport we extend the supported set of endpoint protocols.

Also N2O is able to work under cowboy Erlang web server and mochiweb web server. There is rest library to deal with REST endpoint.

MQTT Sessions

N2O is working entirely in the context of ws_client processes of EMQ, just as it is working on top of ranch processes of cowboy. No additional gen_server is being introduced.

The only official transparent way with zero abstraction is to use EMQ hooks mechanism. For N2O we need to implement only two cases: client.subscribe and message.delivered. On client.subscribe we deliver all persistent data that are ready for the client. On message.delivered we unpack any N2O BERT protocol message inside MQTT session.

N2O and MQTT Formatters

N2O supports TEXT, JSON, XML, BERT, MessagePack formatters but you can add your own implementations either. They are terminal formatters before socket interface library (usually ranch or esockd). For example one may format IO message with BERT encoding n2o:format({io,Eval,Data},bert).

MQTT format messages only in MQTT format. You can wrap binary B in MQTT packet as emqttd_message:make(Name, 0, Name, B).

Sample Application

Listing 2. Sample Web Application written in NITRO DSL
main() -> []. event(init) -> nitro:update(loginButton, #button{id=loginButton,body="Login", postback=login,source=[user,pass]}); event(login) -> nitro:redirect("index.htm?room=" ++ nitro:to_list( n2o:q(pass)));
Picture 3. N2O Review Application

EMQ Plugin Implementation

Here you will see the implementation of N2O bridge for EMQ. We will inject N2O loop inside EMQ session handlers. As you may know, the single point of entrance in N2O is the event(init) message handler. It can only be reached by calling {init,_} message of NITRO protocol.

Listing 3. Emulating "N2O," init handshake
on_client_subscribe(ClientId, Username, TopicTable, _Env) -> Name = binary_to_list(iolist_to_binary(ClientId)), BinTopic = element(1,hd(TopicTable)), put(topic,BinTopic), n2o_cx:context(#cx{module=route:select(BinTopic), formatter=bert,params=[]}), case n2o_proto:info({init,<<>>},[],?CTX) of {reply, {binary, M}, _, #cx{}} -> self() ! {deliver, emqttd_message:make(Name, 0, Name, M)}; _ -> skip end, {ok, TopicTable}.

However N2O MQTT is a protocol federation so we need to handle not only N2O messages, but also KVS, ROSTER, BPE, REST protocols. Thus after initialization during client.subscribe we call n2o_proto:info — the entire N2O protocol chain recursor inside message.delivered hook. This is the best place to put the federation relay for N2O modules.

Listing 4. Emulating N2O message delivery
on_message_delivered(ClientId, Username, Message = #mqtt_message{topic = Topic, payload = Payload}, _Env) -> Name = binary_to_list(ClientId), case n2o_proto:info(binary_to_term(Payload),[],?CTX) of {reply, {binary, M}, R, #cx{}} -> case binary_to_term(M) of {io,X,_} -> Msg = emqttd_message:make(Name, 0, Name, M), self() ! {deliver, Msg}, {ok, Message}; _ -> {ok, Message} end; _ -> {ok, Message} end.

MQTT JavaScript Client

Listing 5. mq.js
var topic = module + "_" + params.room || "lobby", mqtt = new Paho.MQTT.Client("", 8083, ''), subscribeOptions = { qos: 0, onSuccess: function () { console.log("N2O Subscribed"); }, onFailure: function (m) { console.log("SUB Failure: " + message.errorMessage); }, timeout: 3 }, options = { timeout: 3, onFailure: function (m) { console.log("SND Failure: " + m.errorMessage); }, onSuccess: function () { console.log("N2O Connected"); mqtt.subscribe(topic, subscribeOptions); } }, ws = { send: function (payload) { var message = new Paho.MQTT.Message(payload); message.destinationName = topic; message.qos = 0; mqtt.send(message); } }; function MQTT_start() { mqtt.onConnectionLost = function (o) { }; mqtt.onMessageArrived = function (m) { var BERT = m.payloadBytes.buffer.slice( m.payloadBytes.byteOffset, m.payloadBytes.byteOffset + m.payloadBytes.length); try { erlang = dec(BERT); console.log(utf8_dec(erlang.v[1].v)); for (var i=0;i<$bert.protos.length;i++) { p = $bert.protos[i]; if (p.on(erlang, p.do).status == "ok") return; } } catch (e) { console.log(e); } }; mqtt.connect(options); } MQTT_start();

Enable EMQ Plugin

The N2O over MQTT bridge is a drop-in plugin for EMQ broker. After starting the server you should enable N2O over MQTT bridge EMQ plugin on plugin page and then open the application sample

Picture 4. N2O 4.5 MQTT as EMQ Plugin

The nice thing about EMQ is that it enables session introspection for N2O connections. The EMQ Dashboards is written in vue.js and a bit complex for admin application. The possible and expecting hackaton is on rewriting EMQ admin with hyperapp and n2o. The IBM library Paho is also too old and fat to fit N2O sizes. Some parts should be replaced with more modern and compact pinger and connection respawning from n2o.js. This is expecting further research and imporevement.

Picture 5. Inspecting N2O sessions with EMQ

EMQ — Lightweight Broker as Service Container for N2O Apps

EMQ is the latest software that has robust iOS and Android MQTT client libraries, just have a look at https://github.com/emqtt Github organization.

Picture 6. MQTT 3.1.1 vs MQTT 5.0

We made a compatible EMQ plugin and N2O Review sample application with built-in EMQ with minimal dependencies. Our fork is based on the latest EMQ master and is dedicated for building with our MAD build tool.

Listing 6. Setup MQTT flavoured N2O.
$ brew install erlang $ curl -fsSL https://raw.github.com/synrc/mad/master/mad > mad \ && chmod +x mad \ && sudo cp mad /usr/local/bin $ git clone git://github.com/voxoz/mq && cd mq $ time mad dep com Writing /apps/review/ebin/review.app OK real 1m45.357s user 0m17.166s sys 0m5.065s $ mad rep Configuration: [{n2o, [{port,8000}, {app,review}, {pickler,n2o_secret}, {formatter,bert}, {log_modules,config}, {log_level,config}]}, {emq_dashboard, [{listeners_dash, [{http,18083,[{acceptors,4},{max_clients,512}]}]}]}, {emqttd, [{listeners, [{http,8083,[{acceptors,4},{max_clients,512}]}, {tcp,1883,[{acceptors,4},{max_clients,512}]}]}, {sysmon, [{long_gc,false}, {long_schedule,240}, {large_heap,8000000}, {busy_port,false}, {busy_dist_port,true}]}, {session, [{upgrade_qos,off}, {max_inflight,32}, {retry_interval,20}, {max_awaiting_rel,100}, {await_rel_timeout,20}, {enable_stats,off}]}, {queue,[]}, {allow_anonymous,true}, {protocol, [{max_clientid_len,1024},{max_packet_size,64000}]}, {acl_file,"etc/acl.conf"}, {plugins_etc_dir,"etc/plugins/"}, {plugins_loaded_file,"etc/loaded_plugins"}, {pubsub, [{pool_size,8},{by_clientid,true},{async,true}]}]}, {kvs, [{dba,store_mnesia}, {schema,[kvs_user,kvs_acl,kvs_feed,kvs_subscription]}]}] Applications: [kernel,stdlib,gproc,lager_syslog,pbkdf2,asn1,fs,ranch,mnesia, compiler,inets,crypto,syntax_tools,xmerl,gen_logger,esockd, cowlib,goldrush,public_key,lager,ssl,cowboy,mochiweb,emqttd, erlydtl,kvs,mad,emqttc,nitro,rest,sh,syslog,review] Erlang/OTP 19 [erts-8.3] [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace] Eshell V8.3 (abort with ^G) starting emqttd on node '[email protected]' Nonexistent: [] Plugins: [{mqtt_plugin,emq_auth_username,"2.1.1", "Authentication with Username/Password",false}, {mqtt_plugin,emq_dashboard,"2.1.1","EMQ Web Dashboard",false}, {mqtt_plugin,emq_modules,"2.1.1","EMQ Modules",false}, {mqtt_plugin,n2o,"4.5-mqtt","N2O Server",false}] Names: [emq_dashboard,n2o] dashboard:http listen on with 4 acceptors. Async Start Attempt {handler,"timer",n2o,system,n2o,[],[]} Proc Init: init mqtt:ws listen on with 4 acceptors. mqtt:tcp listen on with 4 acceptors. emqttd 2.1.1 is running now >

Related Documents