# Communication Between Edge Modules

This is the fifth in a series of tutorials which explain how to accomplish various functions within the context of an ABB Ability™ Edge. On the ABB Ability™ Edge runtime, the proxy and broker modules provided by the Platform are responsible for handling any communication to or from the ABB Ability™ Edge environment. Thus, any code written within custom modules has built-in security and can be relatively simple.

This tutorial will build upon the first four modules and will demonstrate:

  1. How to send messages to other ABB Ability™ Edge modules
  2. How to subscribe to and receive messages from other modules

# Before You Start

The tutorial is written in C# using the .Net Core framework. A basic console application will be created. Familiarity with .NET C# is recommended.

Once the application is compiled, Docker will be used to create and push an image to a remote container registry.

Once the image is deployed, the ABB Ability™ Edge runtime is responsible for identifying custom modules to be loaded and then pulling down the appropriate image and creating a container instance. The corresponding code for this tutorial can be found at this ABB Codebits repository under the “05-inter-module-communication” folder. For this tutorial, there are two projects: in the Telemetry folder the ABB.Ability.IotEdge.CST.Modules.CSharp.InterModuleCommunication.Telemetry project is responsible for generating simulated data and sending messages to the aggregator module; in the Aggregator folder the ABB.Ability.IotEdge.CST.Modules.CSharp.InterModuleCommunication.Aggregator project is responsible for receiving those messages, aggregating the data, and sending the results to the cloud. Both projects reference the ABB.Ability.IotEdge.CST.Modules.CSharp.Common project, which can be found under the “common” folder in the same repository.

For this tutorial, we are using two MIT-licensed nuget packages: MQTTnet for the sending and receiving of MQTT (Message Queuing Telemetry Transport) messages, and Newtonsoft.Json to parse JSON objects.

# Steps

  1. Open the ABB.Ability.IotEdge.CSTModules.CSharp.InterModuleCommunication.Telemetry project in Visual Studio, VS Code, or the IDE of your choice. The first point of focus in this tutorial is the TelemetryModule, which is created on startup in the Program.cs class.

    This module operates in the same manner as the modules covered in previous tutorials - by retrieving its configuration to determine how many simulated devices to create, and by using remote commands to start and stop the publishing of telemetry.

    The main difference is that in the StartSendingTelemetryAsync method, instead of sending telemetry messages on the “messages out” topic, they are now placed on a topic specific to the aggregator module. This will ensure that the telemetry messages are not sent to the cloud via the broker module, but rather directed to the aggregator module on an incoming queue:

    var topic = $"{this.Configuration.LocalOutTopic}/{AGGREGATOR_MODULE}";
    

    The first part of the topic is the “local out” topic as defined by the Edge runtime; the second part is the name of the module to which the message will be sent. The names of custom Edge modules are defined in the configuration model for the Edge itself, for example:

    "modules": {
         "dataType": "map",
         "value": {
             "telemetryModule": {
                     "device": {
                         "value": "abb.ability.device.telemetryModule@1"
                     }
             },
             "aggregatorModule": {
                     "device": {
                         "value": "abb.ability.device.aggregatorModule@1"
                     }
             }
         }
    }
    
  2. Next, open the AggregatorModule class, which is located in the ABB.Ability.IotEdge.CSTModules.CSharp.InterModuleCommunication.Aggregator project. In the StartModuleAsync method, the module first subscribes to the “local in” topic. The pound (#) character is used as a wildcard to ensure that any messages delivered on a topic beginning with this pattern are received by the module:

    var topic = $"{this.Configuration.LocalInTopic}/#";
    
  3. The OnApplicationMessageReceived defines how incoming messages are handled. In this case the code first checks to make sure the topic matches the “local in” topic. Since the TelemetryModule is the only expected source of messages, this project assumes that every incoming message originated in Step 1, above. Once the message is received, the code then parses it to create a TelemetryResult object, which is then placed into the _receivedTelemetry list. All received TelemetryResult objects are stored in memory so that aggregation can be performed later.

    var messageBody = Encoding.UTF8
        .GetString(e.ApplicationMessage.Payload);
    
    if ((e.ApplicationMessage.Topic ?? string.Empty)
        .StartsWith(this.Configuration.LocalInTopic,
        StringComparison.InvariantCultureIgnoreCase))
    {
        var jsonObj = JObject.Parse(messageBody);
    
        if (jsonObj["type"].Value<string>().Equals("timeSeries",
            StringComparison.InvariantCultureIgnoreCase))
        {
            _receivedTelemetry.Add(
                new TelemetryResult(
                    jsonObj["objectId"].Value<string>(),
                    jsonObj["variable"].Value<string>(),
                    jsonObj["value"].Value<double>(),
                    new DateTimeOffset(
                        jsonObj["timestamp"].Value<DateTime>()
                    )
                )
            );
        }
    }
    
  4. When the module is started, after it subscribes to the “local in” topic it begins to attempt to send telemetry by calling the StartSendingTelemetryAsync method. The implementation of this method varies from the modules in the other tutorials in that no telemetry is produced; instead, simple calculations are performed on the data in _receivedTelemetry, and the results of those calculations are sent to the cloud. The code loops once per minute and derives the average value of each of the variables across all devices. If no telemetry has been received in the last minute, then no aggregation message is sent.

    while (true)
    {
        //only send aggregation once a minute
        await Task.Delay(60000);
    
        var relevantData = _receivedTelemetry
            .Where(e => e.Timestamp > DateTimeOffset.Now.AddMinutes(-1));
        if (relevantData.Any())
        {
            var vars = relevantData.Select(e => e.Variable).Distinct();
            foreach (var v in vars)
            {
                var avg = Math.Round(relevantData
                    .Where(e => e.Variable.Equals(v))
                    .Select(e => e.Value).Average(), 2);
                var msg = JsonConvert.SerializeObject(
                    new TelemetryMessage<double>(
                        Guid.Parse(Configuration.ObjectId),
                        $"average{v}",
                        () => avg
                    )
                );
    
                await _mqttClient.PublishAsync(
                    Configuration.MessagesOutTopic, msg)
                    .ContinueWith(e => Console.WriteLine("Published to " +
                    $"topic '{Configuration.MessagesOutTopic}': {msg}"));
            }
    
            //sanity check to make sure all messages are being received
            Console.WriteLine("Telemetry messages received " +
                $"in past minute: {relevantData.Count()}");
        }
        else
        {
            Console.WriteLine($"{DateTime.Now.ToLongTimeString()}: " +
                "No telemetry received in the past minute");
        }
    }
    

    Aggregation such as this is often recommended when developing modules for ABB Ability™ Edge, as sending every data point to the cloud can become expensive and require a great deal of bandwidth. The aggregation demonstrated here is simple, but the .Net framework can be leveraged to create much more complex calculations as needed.

# Running the Sample Module

There are two ways to run this custom module. CST provides types for a given version of Ability Platform. If your CST contact has not already loaded these types into your Ability environment, these types can be manually uploaded to the environment. Once the types are loaded into the instance, any supported Edge setup can be used to run the Edge types provided by the tutorials.

Once you are familiar with the Edge, it is possible to build custom versions of the tutorials. Once a custom modules is created and the type(s) are loaded into an instance, any valid Edge can run the custom type.

# Verify Module Functionality

  1. Once the Edge starts, it should automatically pull the image for this module and start a container using this image.

  2. To verify that telemetry messages are being sent, the logs for this tutorial module can be viewed by running the following command:

    docker service logs -f telemetryModule
    
  3. Within the ABB Ability™ Instance API, to remotely invoke commands, a POST request must be made to the endpoint located at: /api/v1/objects/{objectId}/models/{modelId}/methods/{methodName}. A JSON-formatted payload can be provided but is not required. The values for each parameter can be determined as follows:

    • objectId: The object ID of the custom module which contains the method. This can be found by looking at the logs for telemetryModule in the step above. Upon module startup, the model definitions for the module are retrieved on the “model in” topic. The object ID can be found in the device model for the module’s type returned on the modules/referenceModule/model/desired/abb.ability.device topic.
    • modelId: The type of model on which the method is defined. As mentioned above, the start and stop methods (along with the interval parameter for start) are defined in the type definition for abb.ability.device.edge.modules.csharp.sample. The model for that type is abb.ability.device and should be the value here.
    • methodName: The name of the remote command that the user is invoking (in this case, either start or stop). The method name is case sensitive.
  4. When invoking the start method, a payload is required to include the value for the telemetry sending interval. This can be defined as the input property in a JSON object as follows:

    {
        "input": {
            "interval": 5
        }
    }
    
  5. Once the start method has been invoked on the telemetryModule, view the logs for the aggregatorModule in a separate terminal window with the command:

    docker service logs -f aggregatorModule
    

    Every minute, the aggregator module should be sending the average values for each variable across all devices. It should also log the total number of timeSeries messages received in the past minute, which should be equal to:

    (60 / {telemetryInterval}) * {numberOfVariables} * {numberOfDevices}
    
  6. Be sure to invoke the stop method on the telemetryModule to stop the sending of telemetry.

Last updated: 7/7/2021, 9:03:31 AM
Feedback