# Communication Between Edge Modules
This is the fifth in a series of tutorials which explain how to accomplish various functions within the context of an ABB Ability™ Edge. On the ABB Ability™ Edge runtime, the proxy and broker modules provided by the Platform are responsible for handling any communication to or from the ABB Ability™ Edge environment. Thus, any code written within custom modules has built-in security and can be relatively simple.
This tutorial will build upon the first four modules and will demonstrate:
- How to send messages to other ABB Ability™ Edge modules
- How to subscribe to and receive messages from other modules
# Before You Start
The tutorial is written in C# using the .Net Core framework. A basic console application will be created. Familiarity with .NET C# is recommended.
Once the application is compiled, Docker will be used to create and push an image to a remote container registry.
Once the image is deployed, the ABB Ability™ Edge runtime is responsible for identifying custom modules to be loaded and then pulling down the appropriate image and creating a container instance. The corresponding code for this tutorial can be found at this ABB Codebits repository under the “05-inter-module-communication” folder. For this tutorial, there are two projects: in the Telemetry folder the ABB.Ability.IotEdge.CST.Modules.CSharp.InterModuleCommunication.Telemetry project is responsible for generating simulated data and sending messages to the aggregator module; in the Aggregator folder the ABB.Ability.IotEdge.CST.Modules.CSharp.InterModuleCommunication.Aggregator project is responsible for receiving those messages, aggregating the data, and sending the results to the cloud. Both projects reference the ABB.Ability.IotEdge.CST.Modules.CSharp.Common project, which can be found under the “common” folder in the same repository.
For this tutorial, we are using two MIT-licensed nuget packages: MQTTnet for the sending and receiving of MQTT (Message Queuing Telemetry Transport) messages, and Newtonsoft.Json to parse JSON objects.
# Steps
Open the
ABB.Ability.IotEdge.CSTModules.CSharp.InterModuleCommunication.Telemetry
project in Visual Studio, VS Code, or the IDE of your choice. The first point of focus in this tutorial is theTelemetryModule
, which is created on startup in theProgram.cs
class.This module operates in the same manner as the modules covered in previous tutorials - by retrieving its configuration to determine how many simulated devices to create, and by using remote commands to start and stop the publishing of telemetry.
The main difference is that in the
StartSendingTelemetryAsync
method, instead of sending telemetry messages on the “messages out” topic, they are now placed on a topic specific to the aggregator module. This will ensure that the telemetry messages are not sent to the cloud via the broker module, but rather directed to the aggregator module on an incoming queue:var topic = $"{this.Configuration.LocalOutTopic}/{AGGREGATOR_MODULE}";
The first part of the topic is the “local out” topic as defined by the Edge runtime; the second part is the name of the module to which the message will be sent. The names of custom Edge modules are defined in the configuration model for the Edge itself, for example:
"modules": { "dataType": "map", "value": { "telemetryModule": { "device": { "value": "abb.ability.device.telemetryModule@1" } }, "aggregatorModule": { "device": { "value": "abb.ability.device.aggregatorModule@1" } } } }
Next, open the
AggregatorModule
class, which is located in theABB.Ability.IotEdge.CSTModules.CSharp.InterModuleCommunication.Aggregator
project. In theStartModuleAsync
method, the module first subscribes to the “local in” topic. The pound (#) character is used as a wildcard to ensure that any messages delivered on a topic beginning with this pattern are received by the module:var topic = $"{this.Configuration.LocalInTopic}/#";
The
OnApplicationMessageReceived
defines how incoming messages are handled. In this case the code first checks to make sure the topic matches the “local in” topic. Since theTelemetryModule
is the only expected source of messages, this project assumes that every incoming message originated in Step 1, above. Once the message is received, the code then parses it to create aTelemetryResult
object, which is then placed into the_receivedTelemetry
list. All receivedTelemetryResult
objects are stored in memory so that aggregation can be performed later.var messageBody = Encoding.UTF8 .GetString(e.ApplicationMessage.Payload); if ((e.ApplicationMessage.Topic ?? string.Empty) .StartsWith(this.Configuration.LocalInTopic, StringComparison.InvariantCultureIgnoreCase)) { var jsonObj = JObject.Parse(messageBody); if (jsonObj["type"].Value<string>().Equals("timeSeries", StringComparison.InvariantCultureIgnoreCase)) { _receivedTelemetry.Add( new TelemetryResult( jsonObj["objectId"].Value<string>(), jsonObj["variable"].Value<string>(), jsonObj["value"].Value<double>(), new DateTimeOffset( jsonObj["timestamp"].Value<DateTime>() ) ) ); } }
When the module is started, after it subscribes to the “local in” topic it begins to attempt to send telemetry by calling the
StartSendingTelemetryAsync
method. The implementation of this method varies from the modules in the other tutorials in that no telemetry is produced; instead, simple calculations are performed on the data in_receivedTelemetry
, and the results of those calculations are sent to the cloud. The code loops once per minute and derives the average value of each of the variables across all devices. If no telemetry has been received in the last minute, then no aggregation message is sent.while (true) { //only send aggregation once a minute await Task.Delay(60000); var relevantData = _receivedTelemetry .Where(e => e.Timestamp > DateTimeOffset.Now.AddMinutes(-1)); if (relevantData.Any()) { var vars = relevantData.Select(e => e.Variable).Distinct(); foreach (var v in vars) { var avg = Math.Round(relevantData .Where(e => e.Variable.Equals(v)) .Select(e => e.Value).Average(), 2); var msg = JsonConvert.SerializeObject( new TelemetryMessage<double>( Guid.Parse(Configuration.ObjectId), $"average{v}", () => avg ) ); await _mqttClient.PublishAsync( Configuration.MessagesOutTopic, msg) .ContinueWith(e => Console.WriteLine("Published to " + $"topic '{Configuration.MessagesOutTopic}': {msg}")); } //sanity check to make sure all messages are being received Console.WriteLine("Telemetry messages received " + $"in past minute: {relevantData.Count()}"); } else { Console.WriteLine($"{DateTime.Now.ToLongTimeString()}: " + "No telemetry received in the past minute"); } }
Aggregation such as this is often recommended when developing modules for ABB Ability™ Edge, as sending every data point to the cloud can become expensive and require a great deal of bandwidth. The aggregation demonstrated here is simple, but the .Net framework can be leveraged to create much more complex calculations as needed.
# Running the Sample Module
There are two ways to run this custom module. CST provides types for a given version of Ability Platform. If your CST contact has not already loaded these types into your Ability environment, these types can be manually uploaded to the environment. Once the types are loaded into the instance, any supported Edge setup can be used to run the Edge types provided by the tutorials.
Once you are familiar with the Edge, it is possible to build custom versions of the tutorials. Once a custom modules is created and the type(s) are loaded into an instance, any valid Edge can run the custom type.
# Verify Module Functionality
Once the Edge starts, it should automatically pull the image for this module and start a container using this image.
To verify that telemetry messages are being sent, the logs for this tutorial module can be viewed by running the following command:
docker service logs -f telemetryModule
Within the ABB Ability™ Instance API, to remotely invoke commands, a POST request must be made to the endpoint located at:
/api/v1/objects/{objectId}/models/{modelId}/methods/{methodName}
. A JSON-formatted payload can be provided but is not required. The values for each parameter can be determined as follows:- objectId: The object ID of the custom module which contains the method.
This can be found by looking at the logs for
telemetryModule
in the step above. Upon module startup, the model definitions for the module are retrieved on the “model in” topic. The object ID can be found in the device model for the module’s type returned on themodules/referenceModule/model/desired/abb.ability.device
topic. - modelId: The type of model on which the method is defined. As mentioned
above, the
start
andstop
methods (along with the interval parameter forstart
) are defined in the type definition forabb.ability.device.edge.modules.csharp.sample
. The model for that type isabb.ability.device
and should be the value here. - methodName: The name of the remote command that the user is invoking (in
this case, either
start
orstop
). The method name is case sensitive.
- objectId: The object ID of the custom module which contains the method.
This can be found by looking at the logs for
When invoking the
start
method, a payload is required to include the value for the telemetry sending interval. This can be defined as the input property in a JSON object as follows:{ "input": { "interval": 5 } }
Once the
start
method has been invoked on thetelemetryModule
, view the logs for theaggregatorModule
in a separate terminal window with the command:docker service logs -f aggregatorModule
Every minute, the aggregator module should be sending the average values for each variable across all devices. It should also log the total number of timeSeries messages received in the past minute, which should be equal to:
(60 / {telemetryInterval}) * {numberOfVariables} * {numberOfDevices}
Be sure to invoke the
stop
method on thetelemetryModule
to stop the sending of telemetry.