# MQTT Communication v2

Before you start exchanging messages with the cloud via the Edge Agent, you need to connect with the MQTT broker that is running on your Ability Edge. We have listed the environment variables that will allow you to get all the necessary information (MQTT broker URL, username, password). In this article, we will show you an example of how you can use these values to connect to the broker. Although we only show you examples in C# and Node.js, you could use any other programming language that supports the MQTT protocol.

# Environment variables

Every Edge module has a number of environment variables, which are listed below:

# Common variables

The variables listed below are required for every single Edge to work properly:

  • object_id - objectId of the module in the Ability Information Model
  • mqtt_client_id - the client ID of the module for MQTT communication
  • module_id - the login to establish the MQTT communication
  • mqtt_password_file - the path to the file that contains a password for establishing MQTT communication
  • mqtt_url - URL to the MQTT broker

# Edge secrets variables

With API V2 supporting Edge secrets, here are two variables you can use to enable secrets:

  • private_key_path=/run/secrets/module_id-private
  • public_key_path=/run/secrets/module_id-public

# MQTT Topics

These variables contain the MQTT topics that your module will use to communicate with the Edge Agent.

  • topicsV2_ack_in - used for all ack messages either from cloud or edge processing. For more details on ack for actions (equivalent to Device API v2 operations), check the corresponding descriptions.

Successful retrieval

Agent processing result|status is only sent in the ack endpoint when a given message is sent with qos > 0 (1 or 2). It is especially useful for telemetry (that is, variables, events, and alarms), as Cloud processing doesn't support any ack for those kinds of messages.

  • topicsV2_action_out - used in all Device API V2 events
  • topicsV2_alarm_out - used to publish alarm type messages to the platform
  • topicsV2_event_in - this topic allows to read all Device API v2 notifications. Note this is not the same as ACK response
  • topicsV2_event_out - used to publish event type messages to the platform
  • topicsV2_local_in - used for local communication between Edge modules - messages from other modules will be delivered here
  • topicsV2_local_out - used for local communication between Edge modules - send messages to other modules here
  • topicsV2_methods_in - whenever a method/command is invoked from the cloud, it is delivered via this topic
  • topicsV2_methods_out - use this topic to provide outputs/responses for method/command invocations
  • topicsV2_variable_out - used to publish variable type messages to the platform
  • topicsV2_platformevent - used to broadcast the Information Model Public Key

You can find more detailed information about each of the topics in their corresponding articles.

# Example of environment variables

Here is an example of the environment variables that an Ability Edge module is given. Note that only variables configured by the Ability Edge framework are shown). In this example, the module's name is "my_edge_module".

"module_id=my_edge_module",
"mqtt_client_id=my_edge_module",
"mqtt_password_file=/run/secrets/my_edge_module",
"mqtt_url=mqtt://edge-broker:1883",
"object_id=0d7a78c3-6aa1-4325-b30b-e99534868e38",
"private_key_path=/run/secrets/my_edge_module-private",
"public_key_path=/run/secrets/my_edge_module-public",
"topicsV2_ack_in=to/module/my_edge_module/ack",
"topicsV2_action_out=from/module/my_edge_module/action",
"topicsV2_alarm_out=from/module/my_edge_module/alarm",
"topicsV2_event_in=to/module/my_edge_module/event",
"topicsV2_event_out=from/module/my_edge_module/event",
"topicsV2_local_in=to/module/my_edge_module/local",
"topicsV2_local_out=from/module/my_edge_module/local",
"topicsV2_methods_in=to/module/my_edge_module/action/method/invoke",
"topicsV2_methods_out=from/module/my_edge_module/ack/method/invoke",
"topicsV2_variable_out=from/module/my_edge_module/variable",

The example below shows the following operations:

  • establishment of MQTT communication using parameters taken from the environment variables;
  • subscription to all of the topics where messages could come from;
  • publishing of one message - a single value for a variable called "temperature".

# C#

In this example, we used MQTTnet (version 3.0.14) for MQTT comunication.

using System.Linq;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
var mqttClient = new MqttFactory().CreateMqttClient();
_mqttClient.UseApplicationMessageReceivedHandler(message => {
    //handle messages delivered from the topics you subscribe to
}); 
var mqttUrl = new System.Uri(Environment.GetEnvironmentVariable("mqtt_url");
var options = new MqttClientOptionsBuilder()
    .WithClientId(Environment.GetEnvironmentVariable("mqtt_client_id"))
    .WithTcpServer(mqttUrl.Host, mqttUrl.Port)
    .WithCredentials(
        Environment.GetEnvironmentVariable("module_id"),
        File.ReadAllText(Environment.GetEnvironmentVariable("mqtt_password_file")))
    .WithCleanSession()
    .Build();
await mqttClient.ConnectAsync(options, CancellationToken.None);
// Topics subscription
var topics = new[]
{
    Environment.GetEnvironmentVariable("topicsV2_local_in"),
    Environment.GetEnvironmentVariable("topicsV2_methods_in")
    Environment.GetEnvironmentVariable("topicsV2_event_in"),
    Environment.GetEnvironmentVariable("topicsV2_ack_in")
};
var topicFilters = topics.Select(t => 
    new MqttTopicFilterBuilder().WithTopic($"{t}/#").Build());
await _mqttClient.SubscribeAsync(topicFilters);
// Sending a message (example of telemetry)
var message = new MqttApplicationMessageBuilder()
    .WithTopic($"{Environment.GetEnvironmentVariable("topicsV2_variable_out")}")
    .WithPayload("{\"objectId\": \"116fff1d-d1c6-469b-8e8d-c0f6fd1a7f8e\", \"model\": \"abb.ability.device\", \"timestamp\": \"2021-02-09T13:52:21.4954232Z\",  \"value\": \"10.54\", \"variable\": \"temperature\"}")
    .WithExactlyOnceQoS()
    .WithRetainFlag()
    .Build();
await _mqttClient.PublishAsync(message, CancellationToken.None);

# Node.js

const mqtt = require("async-mqtt");
const fs = require('fs');
const util = require('util');
const readFile = util.promisify(fs.readFile);
const mqttClient = await mqtt.connectAsync(
    process.env.mqtt_url,
    { 
        username: process.env.mqtt_client_id, 
        password: (await readFile(process.env.mqtt_password_file)).toString('utf8') 
    })
mqttClient.on("message", receivedMessage) //callback for any incoming messages
// Topics subscription
await mqttClient.subscribe(`${process.env.topicsV2_local_in}/#`)
await mqttClient.subscribe(`${process.env.topicsV2_methods_in}/#`)
await mqttClient.subscribe(`${process.env.topicsV2_event_in}/#`)
await mqttClient.subscribe(`${process.env.topics_ack_in}/#`)
// Sending a message (example of telemetry)
const message = {
    objectId: '116fff1d-d1c6-469b-8e8d-c0f6fd1a7f8e',
    variable: 'temperature',
    value: 10.54,
    timestamp: new Date().toISOString(),
    model: 'abb.ability.device'
}
await mqttClient.publish(
    `${process.env.topicsV2_variable_out}`, 
    JSON.stringify(message()))
Last updated: 12/22/2021, 8:36:15 AM
Feedback