Making ActiveMQ's Topics virtual for use with Queues


The problem

Before I get to the core part, let me explain why I am posting this. If you are in a hurry, you can skip directly to the solution.

Everything started with RabbitMQ 😉 in a major enterprise IoT project for a well-known German refrigerator manufacturer that was mentioned by Satya Nadella at the Hannover Messe in Germany in 2016.
The goal was to bring smart devices online with a common messaging protocol. But because of some issues with the broker (the handling of some sort of certificates), the decision was made to replace RabbitMQ with ActiveMQ.
The technical problem with that decision was that all destinations of messages sent to the broker via MQTT are handled by ActiveMQ as Topics. This makes the whole communication architecture non-scalable.

Please let me explain some details regarding Scalability and Topics vs. Queues…

If you want an IoT architecture that is scalable, then you should enable your environment to handle as many messages as possible in parallel. To achieve this, you have to decouple logic/functions into worker units. With this, you can increase the number of workers, which can then consume messages in parallel. As you can see… this is scaling! But it is very important to point out that these workers can only consume messages in parallel if the source they consume from hands out each message exactly once, following the FIFO pattern (one in, one out). This is also known as the queuing concept.

Queues

Queues are constructs that let a consumer handle only one message at a time, with each message going to exactly one consumer. This gives you a concurrency-enabled environment in which all consumers are “battling” for messages. With this pattern, you can keep queues “empty”. And if there is more load on the system, you can scale by starting more instances of the same worker.
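To make the “battling” consumers a bit more tangible, here is a minimal sketch in plain Python. It does not talk to a real broker; an in-process `queue.Queue` stands in for the broker queue, and the function and worker names are made up for illustration. The point it shows is only that every message ends up with exactly one worker, no matter how many workers you start.

```python
import queue
import threading


def run_competing_consumers(messages, worker_count):
    """Let several workers compete for messages on one shared queue.

    Returns a dict mapping a (hypothetical) worker name to the messages
    that this worker handled.
    """
    q = queue.Queue()
    handled = {f"worker-{i}": [] for i in range(worker_count)}

    def worker(name):
        while True:
            msg = q.get()
            if msg is None:  # sentinel: nothing left to do for this worker
                break
            handled[name].append(msg)  # only this thread writes to this list

    threads = [threading.Thread(target=worker, args=(name,)) for name in handled]
    for t in threads:
        t.start()
    for msg in messages:
        q.put(msg)
    for _ in threads:
        q.put(None)  # one sentinel per worker so that all of them stop
    for t in threads:
        t.join()
    return handled


if __name__ == "__main__":
    result = run_competing_consumers([f"msg-{i}" for i in range(10)], worker_count=3)
    for name, msgs in result.items():
        print(name, msgs)
```

How the messages are distributed across the workers differs from run to run, but no message is handled twice. Raising `worker_count` is the in-process equivalent of starting more worker instances.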

Topics

The other, let’s call it “message handling strategy”, is Topics. The idea behind this is to have multiple different workers consume topic-related messages. Think of the following scenario: you have a smart device sending notifications, alarms and other urgent messages, but it also sends some logging/monitoring messages. If you want backend modules that handle these different types of messages in different ways, then you need Topics. A message would then be routed to a topic called “Alerts” or “loggings” or whatever…. With that strategy, you deliver each message to all the consuming workers listening on the same topic.
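The difference to queues becomes obvious in a tiny in-memory sketch. Again, this is not ActiveMQ code; `TopicBroker` and its methods are hypothetical names that only model the fan-out behaviour described above.

```python
from collections import defaultdict


class TopicBroker:
    """Toy publish/subscribe broker: every subscriber of a topic gets every message."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self._subscribers[topic].append(callback)

    def publish(self, topic, message):
        """Deliver the message to all subscribers and return how many received it."""
        callbacks = self._subscribers.get(topic, [])
        for callback in callbacks:
            callback(message)
        return len(callbacks)


if __name__ == "__main__":
    broker = TopicBroker()
    instance_a, instance_b = [], []
    # Two instances of the same (hypothetical) alert worker
    broker.subscribe("Alerts", instance_a.append)
    broker.subscribe("Alerts", instance_b.append)
    broker.publish("Alerts", "overheat")
    print(instance_a, instance_b)
```

Both instances receive the same message. Starting a second instance of a worker therefore duplicates the work instead of sharing it, which is exactly why plain topics do not help with scaling.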

The problem with the change of broker

Maybe you can already see the main problem with the broker change… RabbitMQ handled all the messages sent via the MQTT protocol in queues; after the change, the concept switched to topics, because ActiveMQ handles MQTT messages with a topic strategy.

[Quote: “ActiveMQ is a JMS broker in its core, so there needs to be some mapping between MQTT subscriptions and JMS semantics. Subscriptions with QoS=0 (At Most Once) are directly mapped to plain JMS non-persistent topics. For reliable messaging, QoS=1 and QoS=2, by default subscriptions are transformed to JMS durable topic subscribers. This behaviour is desired in most scenarios. For some use cases, it is useful to map these subscriptions to virtual topics. Virtual topics provide a better scalability and are generally better solution if you want to use you MQTT subscribers over network of brokers.“, https://activemq.apache.org/mqtt.html ]

And that makes our architecture inefficient and not scalable. One way out of this situation is to change the smart device to send via AMQP (for example), but this would mean changing the whole project plan and, with that, the time to market. Or, as a second solution, you do some “magic” configuration on the broker side 🙂…

The solution

How to change Topics to virtual topics

As a developer in the Microsoft ecosystem (Visual Studio, C#, .NET, …), it is not trivial to get into this broker and understand the different concepts, “languages” and other “strange things” that are used with ActiveMQ. Also, the documentation of the broker is not that detailed or easy to understand if you only want to use the product (but that is understandable, because no one should use it as an OOB tool). But I think this solution could be useful for other devs like me who are only looking for a way around the problem I explained above.

First: Config

Somewhere in the install folder of ActiveMQ…

There is a “conf” folder containing the activemq.xml. There you can find your allowed and configured connections in the transportConnectors section.

In the excerpt below, you can see the line where the MQTT protocol is enabled. This line gets extended with the ActiveMQ parameter for the subscription strategy (see below). https://activemq.apache.org/mqtt.html

<transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://11.11.11.11:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <!--<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>-->
            <!-- <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
            <!-- Original line: <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883"/> -->
            <!-- Should be replaced by: -->
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions"/>
</transportConnectors>
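If you want to double-check that the change really ended up in the file, a small script can read the connector back. The following is only a sketch: the function name and the sample XML are made up for illustration, and it compares local tag names because a real activemq.xml usually declares XML namespaces.

```python
import xml.etree.ElementTree as ET
from urllib.parse import parse_qs, urlsplit


def mqtt_subscription_strategy(xml_text):
    """Return the subscription strategy of the 'mqtt' transportConnector, or None."""
    root = ET.fromstring(xml_text)
    for element in root.iter():
        local_name = element.tag.rsplit("}", 1)[-1]  # strip a "{namespace}" prefix
        if local_name == "transportConnector" and element.get("name") == "mqtt":
            query = urlsplit(element.get("uri", "")).query
            values = parse_qs(query).get("transport.subscriptionStrategy")
            return values[0] if values else None
    return None


# Shortened sample for illustration, not a complete activemq.xml
SAMPLE = """<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions"/>
</transportConnectors>"""

if __name__ == "__main__":
    print(mqtt_subscription_strategy(SAMPLE))  # prints mqtt-virtual-topic-subscriptions
```

For a real installation you would pass the content of your own activemq.xml instead of `SAMPLE`.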

If you haven’t already, then you should adjust your code as well, so that it reads from queues:

  • Create consumers for Queues instead of Topics and pass the new path pattern…
  • With virtual topics, the name of the queue follows this pattern: Consumer.Application.VirtualTopic.QueueName.
    An example… If you send a message to the topic Alert using the pattern smartDevice.number.Alert, this changes (from the broker’s or consumer’s perspective) to VirtualTopic.smartDevice.number.Alert
    This is then consumable via the path (e.g.): Consumer.AlertWorker.VirtualTopic.smartDevice.number.Alert (meaning: I have a worker (or multiple instances) handling alert messages from the queue “Alert” of a specific device).
    You can also use wildcard characters like *, e.g. Consumer.AlertWorker.VirtualTopic.smartDevice.*.Alert (meaning: I have a worker (or multiple instances) handling alert messages from the queue “Alert” of all devices).
    … and so on.
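The naming rules from the list above can be sketched in a few lines of Python. Both helper functions are hypothetical and only illustrate the pattern; in particular, `matches` is a simplified stand-in for the `*` wildcard (one dot-separated element) and not the broker's own matching logic. The device number 4711 is made up.

```python
def consumer_queue_name(consumer, topic):
    """Build the queue name a consumer reads from for a given virtual topic."""
    return f"Consumer.{consumer}.VirtualTopic.{topic}"


def matches(pattern, destination):
    """Simplified wildcard check: '*' stands for exactly one dot-separated element."""
    pattern_parts = pattern.split(".")
    destination_parts = destination.split(".")
    if len(pattern_parts) != len(destination_parts):
        return False
    return all(p == "*" or p == d for p, d in zip(pattern_parts, destination_parts))


if __name__ == "__main__":
    one_device = consumer_queue_name("AlertWorker", "smartDevice.4711.Alert")
    all_devices = consumer_queue_name("AlertWorker", "smartDevice.*.Alert")
    print(one_device)                        # Consumer.AlertWorker.VirtualTopic.smartDevice.4711.Alert
    print(matches(all_devices, one_device))  # True
```

All instances that use the same consumer name (here AlertWorker) read from the same queue and therefore share the messages, which is what brings the scaling back.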

I hope someone can make use of this solution; otherwise, it may lead to another one. Please share your thoughts!

By Thomas

As Chief Technology Officer at Xpirit Germany, I am responsible for driving productivity for our customers with a full stack of development and technology in modern times. I care not only about technologies from Microsoft's stack, like Azure, AI, and IoT, but also about delivering quality and expertise with DevOps.
