Publish/Subscribe queues are fun and useful. I first learned about them when tinkering with Redis a while back. One big drawback of RedisPub/Sub is that Redis project refuses to add some form of transport layer security, which means anything and everything is transferred into and out of Redis unencrypted.

I recently discovered MQTT, the Message Queue Telemetry Transport protocol, which is

a lightweight broker-based publish/subscribe messaging protocol designed to be open, simple, lightweight and easy to implement.

The MQTT protocol provides one-to-many message distribution over TCP/IP with a small transport overhead, and a method to notify parties of abnormal disconnection of a client.

I think of a Pub/Sub message queue as “Twitter for my network”. Clients subscribe to be notified of incoming messages pertaining to specific topics, and other clients publish on those topics. A topic (think of it as a kind of channel) classifies messages. For example, I could have topics called nagios/mta, nagios/disk, test/jp/private, etc. Clients can subscribe to any number of topics, and may include wild-cards when subscribing (e.g. nagios/#). In the context of MQTT, messages are blobs of opaque data (UTF-8, i.e. binary safe) with a maximum size of 256MB.

MQTT can be used in low-bandwidth or unreliable network environments, on embedded devices (e.g. on an mbed with the mbed MQTT library, on an Arduino with the Arduino client for MQTT), from Lua, Python, Perl, etc.

What I find particularly interesting with this technology, is that it enables me to employ a single (secured) transport mechanism for all sorts of data. I don’t have to pierce holes in a firewall to specifically allow this and that; a single TCP port suffices to transport all sorts of (smallish) stuff.

Mosquitto is an open source MQTT broker, which I chose over IBM’s RSMB because source code of the latter is not freely available. Mosquitto is easy to install and deploy, and its documentation is more than adequate.

The Mosquitto broker supports TLS out of the box, and provides authentication either via username/password, pre-shared keys or TLS client certificates. Furthermore, Mosquitto has a simple ACL by which the broker administrator can configure which clients may access which topics. (Clients identify themselves by a name they specify upon connecting.)

A small Python program connects to the broker and subscribes to a few topics:

#!/usr/bin/env python

import mosquitto

def on_message(mosq, obj, msg):
    print "%-20s %d %s" % (msg.topic, msg.qos, msg.payload)

    mosq.publish('pong', "Thanks", 0)

def on_publish(mosq, obj, mid):
    pass

cli = mosquitto.Mosquitto()
cli.on_message = on_message
cli.on_publish = on_publish

cli.tls_set('root.ca',
    certfile='c1.crt',
    keyfile='c1.key')

cli.connect("hippo", 1883, 60)

cli.subscribe("dns/all", 0)
cli.subscribe("nagios/#", 0)

while cli.loop() == 0:
    pass

If I run that on a client, the program will sit there waiting for messages published to the dns/all or nagios/# topics. In a different window, I can use the mosquitto_pub utility to fire off a message:

mosquitto_pub -d -h hippo.ww.mens.de \
    --cafile root.ca \
    --cert c1.crt \
    --key c1.key \
    -q 0 \                             # quality-of-service
    -t nagios/n1 \                     # topic
    -m 'mail server 001 is broke'      # message

Whereupon the Python program displays:

nagios/n1 0 mail server 001 is broke

(You’ll have noticed that the Python program publishes a “Thanks” on the pong topic; clients subscribed to that will receive it.)

Clients can optionally set a Will (in Python before the connect() call). This “last will and testament” (so to speak) is published by the broker when a client disconnects unexpectedly. This can be useful to notify of particular clients’ deaths. In other words, if a client issues the following request before connecting, and it dies at some later point, the broker will publish the payload on this client’s behalf.

cli.will_set('clients/w1', payload="Adios!", qos=0)

Bridge

Mosquitto can be configured as a so-called “bridge”. I could imagine this being useful in, say, different data centers.

In a bridge configuration, Mosquitto is configured to pass certain topics in certain directions. For example, I could configure a bridge to notify a “central” broker for messages of topic +/important. A bridge connects to a broker like any other client and subscribes to topics which are then “imported” into the bridge; clients connected to said bridge can thus be notified for particular subscriptions from the “main” broker.

Mosquitto periodically publishes statistics which interested parties can subscribe to, e.g. for monitoring purposes.

$SYS/broker/version mosquitto version 1.1
$SYS/broker/clients/total 368
$SYS/broker/clients/active 91
$SYS/broker/clients/inactive 277
$SYS/broker/clients/maximum 368
$SYS/broker/messages/received 13358099
$SYS/broker/messages/sent 16381123
$SYS/broker/messages/dropped 414180
$SYS/broker/messages/stored 10806
$SYS/broker/messages/sent 16381123
$SYS/broker/messages/sent 16381123
$SYS/broker/bytes/received 761223497
$SYS/broker/bytes/sent 476065843
$SYS/broker/load/bytes/sent/1min 28745.93
$SYS/broker/load/bytes/sent/5min 15418.24
$SYS/broker/load/bytes/sent/15min 6980.69
[...]

The Mosquitto project has a test server you can use if you don’t want to set up your own (just launch mosquitto_sub at it), and there’s a nice-looking Web-based interface you can use (on your public MQTT server) at http://mqtt.io.

Wishlist

There are a couple of things which would be neat to see implemented:

  • Nagios and Icinga typically “go out” and get statii of services, unless I configure them to accept passive checks. How about a plugin or component for Nagios and Icinga which subscribes to specific topics? That would allow for blindingly fast notification of problems within the network, without having to await a check interval? (It’s been a while since I did something with these monitoring processes, so this may be stupid, but it sounds good to me …) A notification could be wrapped into a JSON object containing a Nagios exit code and the reason.
  • Logstash should support MQTT as an input filter. That would allow me to use this message broker for shipping logs. (Logstash already supports AMQP and ZeroMQ.) On the other hand, I could easily create a Mosquitto to Redis “converter”, and use Redis as input to Logstash.

Does anybody feel like doing that? :-)

Further reading:

pubsub and broker :: 25 Feb 2013 :: e-mail