Skip to main content

Developer concepts

FairCom Edge's architecture and internals explained to engineers

FairCom Edge is an IoT data integration platform that collects, stores, transforms, and delivers data to and from external systems. It is an integration hub that combines FairCom’s MQTT broker, database engine, data connectors, data transformation engine, and app server into a single executable. It creates bridges across protocols – even fundamentally incompatible protocols – such as MQTT and SQL. It collects data from devices, stores collected data in its built-in database, optionally transforms collected data, and delivers resulting data to devices, software applications, cloud solutions, REST endpoints, MQTT clients, SQL clients, and so forth.

The FairCom Edge plugin architecture provides connectors and transform methods to collect, transform, and deliver data over a wide variety of protocols. FairCom Edge ships with out-of-the-box support for MQTT, OPC UA, Modbus, Siemens S7, Allen Bradley EtherNet/IP, REST, SQL, ThingWorx, and FairCom’s JSON DB APIs. You can use FairCom Edge plugin services to build additional connectors and transform methods. FairCom can also build additional connectors and transform methods for you. FairCom Edge is designed to receive data from any protocol, convert it to JSON, reshape JSON into a uniform namespace, enrich JSON with additional information, convert JSON to binary data as desired, and send the resulting data to any protocol.

As shown in Figure 1, “Integrate data, many input data streams can be bridged to many output data streams. Data coming in through any supported protocol can be transformed and delivered out to any supported protocol.

To help you apply these concepts, see the FairCom Edge API and the Tutorials.

Figure 1. Integrate data
Integrate data


Integration tables are the heart of FairCom Edge. They are database tables that are enhanced and optimized for inserts, transformation processes, queries, and forwarding data asynchronously to output services. They make it possible to bridge data across otherwise incompatible protocols, data serialization formats, data shapes, and data types. All integration tables are created with only insert and read permissions. This is because an integration table is an immutable, audit log of events.

Input services insert data into integration tables. Output services read and deliver data from the integration tables.

Input services insert records, they are not allowed to change or delete records because that would destroy the integrity of the audit log. When an input source submits data for insertion, automatic transforms are created and stored in the record with the original data. It becomes read-only and can be queried by SQL and JSON DB APIs.

When an input service is configured, it is assigned to an existing integration table or it creates a new integration table. MQTT automatically creates an integration table for each new topic it creates. Integration processes can be refined and integration tables can be renamed to better label the data they hold. The "configureTopic" and "alterIntegrationTable" actions in the API are used to rename an integration table.

Figure 2, “The integration table identifies the FairCom Edge components that interact with the integration table.

Figure 2. The integration table
The integration table


Each integration table record contains the fields shown in Figure 3, “Integration table fields.

Integration table core fields:
  • Each record is automatically given a unique ID in the "id" field.

  • The server’s current UTC time is recorded in the "create_ts" field.

  • The incoming payload is stored in the "source_payload" field.

  • Unlimited transform processes can store their results in any field (other than the built-in fields).

  • Error messages and transform logs are stored in the "error"and "log" fields.

Figure 3. Integration table fields
Integration table fields


Since the MQTT engine requires additional fields to track message data and since any integration table can be connected to the MQTT engine, each integration table contains MQTT fields.

Plugins can also add fields to each integration table. This occurs automatically when such a plugin is added to FairCom Edge. It is done invisibly in the background with no performance impact because of the FairCom Hot Alter Table feature.

The SQL and JSON DB APIs should only write to the "source_payload" field and user-defined fields. They can query all fields.

Table 1. Built-in fields in FairCom Edge integration tables

Field name

Field type

Nullable

Indexed

Description

id

BIGINT

N

N

The identity field provides a unique ID for each record.

create_ts

TIMESTAMP

N

Y

Millisecond UTC date and time when inserted.

error

BOOLEAN

N

true when there is an error.

log

LVARCHAR

Y

null when there are no issues; otherwise, it contains a JSON object that records information about all processing errors.

source_payload

LVARBINARY

Y

The payload from the source.

Plugin fields

plugin-defined

Y

A plugin can add additional fields. Each field is prefixed with a unique prefix — for example, MQTT uses the prefix "mqtt_".

User-defined fields

user-defined

Y

An application can create additional fields. The field names must not conflict with FairCom Edge field names.



FairCom Edge uses the same integration process across all its services — see Figure 4, “The integration data.

The process consists of:
  1. Inputs insert a record into an integration table.

  2. The engine optionally transforms the data in the record and stores the results in one of the record's transform fields.

  3. The delivery engine pushes inserted data to outputs.

Figure 4. The integration data
The integration data


The process starts with an input service receiving or gathering data from a device, user, or application and inserting a record into an integration table. The incoming data is placed in the "source_payload" field of the integration table record. Prior to committing the insert, FairCom Edge automatically runs transformations on the source data and places the results of those transformations in fields created by the user. The Table Transform Service can optionally extract data from JSON properties in the "source_payload" and insert the data into the fields of a record in another table.Table Transform Service

After the transform process finishes, FairCom begins to asynchronously deliver each inserted record to the non-query-based output services. Output services deliver data to external systems. Query services, such as SQL and JSON DB APIs can query data in integration tables at any time.

FairCom Edge connectors read data from a variety of protocols, such as Modbus and OPC UA, and transform the data into JSON, which is then inserted as a record into an integration table. FairCom Edge connectors automatically transform binary source data into JSON, which is inserted into the "source_payload" field of an integration table. The inserted JSON standardizes subsequent data processing and simplifies subsequent transform processes. JSON also standardizes the delivery of data to output protocols used by IT and the cloud.

You configure each input service to insert records into one integration table. Multiple input services can insert records into the same integration table. Input services include MQTT, OPC UA, and Modbus.

The SQL and JSON DB APIs directly insert data into integration tables at any time without prior configuration. Thus, applications can use these APIs to insert and read records from any table that their account has permission to access — for example, an application or user can use the JSON DB API to insert records into the same integration table that a Modbus connector uses to collect data for a CNC device. Additionally, a user could use SQL to query records from that table.

During an insertion, the transform process automatically runs nine or more sequential transformations for each single database transaction. Transforms can take values from any table. They typically use values from the "source_payload" and user-defined fields. Transforms reshape or enrich the data, and place the results into the user-defined ffields as well as custom fields in the table or in another table. Custom fields can be indexed for fast, sorted queries making it easy to query all data collected by FairCom Edge.

Once data is received and transformed it is stored as a record and is ready for queries and asynchronous output. Results are then forwarded to output services.

Some output services provide traditional APIs, such as SQL and JSON DB API, that can query any integration table at any time without prior configuration — see Integration data.

Queries have access to all fields in an integration table, including the source’s original payload in the "source_payload" field, the "create_ts" field, user-defined fields, and so forth.

FairCom's delivery engine is available for all integration tables. In Figure 5, “The delivery process, the right side of the integration table is the forward part of the Store and Forward delivery model used by FairCom Edge.

The delivery engine pushes data to outputs with guaranteed delivery in the order data is received. It is used to deliver data to output services such as MQTT, ThingWorx, AWS IoT Core, and other MQTT brokers. The delivery engine creates multiple processes that read records from each integration table and push the data to output services. Each delivery process delivers data in the order it is inserted into the integration table. Each delivery process runs independently (asynchronously) from all other output services, which means no output service can block another output service.

Figure 5. The delivery process
The delivery process


The delivery engine guarantees the delivery of data to output services. If anything in the output chain is not working properly (such as a failed network, an offline MQTT client, or a failed external system) the integration table retains the stored data. When the external system is available, the delivery engine starts where it left off and forwards the data to the external output service.

Each integration table is configured to retain data for a different period of time. It can also be configured to forward data to external systems at maximum rates to avoid saturating the network or overwhelming the external system.

When a record is inserted into an integration table, it triggers the transformation engine for that table — see Figure 6, “The transform process.

Figure 6. The transform process
The transform process


A transform process consists of one or more steps. All steps run within a single database transaction. You configure each transform step to get its data from one or more fields in the table. The "source_payload" field is the most common source for transform steps because most input processes, such as MQTT, store their data in "source_payload". Multiple transform steps may share the same input field or fields.

You configure each transform step to run a transform method with specific parameters.

You also configure each transform step to store its results in one or more output fields. Once a transform step has written to a field, the transform engine prevents any other transform step from writing to that field. Transform steps cannot be written to the same output fields because that would make it impossible to troubleshoot transform problems. Likewise, transform steps cannot be written to the "source_payload" field.

Note

Most transforms store their output to one or more user-defined fields in the current record of that table. In contrast, the "jsonToDifferentTableFields" transform can be used to store the output as a new record in a different integration table.

If a transform fails, error processing is performed, which logs the error in the log field and sets the error field to 1 (true). A failure in one transform does not prevent other transforms from running and does not prevent the record from being inserted.

The transform process is synchronous and completes before the record is inserted.  Once a record is inserted, the source payload and transformed field values can be queried by SQL and the JSON DB APIs, and FairCom's engine starts delivering the record to all configured outputs.

Each MQTT topic in FairCom Edge and each integration table can be configured to run its own transform process.

Example 1. A multi-step transformation process:

This example shows how five transform steps could take a binary payload from a Siemens user-defined type, convert it into JSON, enrich it, change its shape, and convert it to SparkPlugB for delivery. For the purpose of this example, the table contains user-defined fields named "t1" through "t5".

  1. The input service stores a Siemens binary UDT payload in the "source_payload" field.

  2. The first transform step converts the UDT to JSON and stores it in a user-defined field "t1".

  3. The second transform step looks up data in the database and adds the information to the JSON value in "t1". It stores the result in "t2".

  4. The third transform step changes the shape of the JSON in "t2" and stores the new JSON in "t3".

  5. The last transform step converts the JSON value in "t2" into the SparkPlugB format and stores it in "t4".



Output services can be configured to use any field as their data source:
  • source_payload

  • create_ts

  • error

  • log

  • user-defined fields

When a transformation error occurs, a transformation error is placed in the "log" field in the record that is being transformed, the "output" field is nulled, and the "error" field is set to 1 (true).

The "log" field contains the error message "Transformation failed on table XXX on record with id YYY".

Example 2. Structure of the JSON object in the "log" field
{
  "transformErrors": 
  [
    { 
      "errorCode": 0, 
      "errorMessage": "", 
      "errorData": {} }
  ]
}


The JSON-to-table transform extracts properties from a JSON payload in the "source_payload" field and inserts a record into another table. The destination integration table can then be used as a source for output services as shown in Figure 7, “Transform JSON to a table.

Figure 7. Transform JSON to a table
Transform JSON to a table


The "tableFieldsToJson" transform works differently than the other transforms. It takes values from a table’s fields, constructs a JSON document, and puts the JSON document in the "source_payload" field.

This transform is useful when records are inserted into an integration table using the SQL and JSON DB APIs, or another input service that writes data directly into the fields of a table. The "tableFieldsToJson" transform is needed when the transform processes and output services require the "source_payload" field to be populated with JSON.

If transform errors occur, the "transformErrors" property in the "log" field is also appended to the end of the JSON document in the "source_payload" field to notify recipients that a transform error occurred.

The JSON-to-field transform extracts properties from a JSON payload in the "source_payload" field and inserts a record into a field in the integration table.

FairCom Edge uses integration tables to connect inputs to outputs. One or more inputs can be mapped to the same integration table for inserting data — for example, MQTT, Modbus, and OPC UA can all insert data into the same integration table.

One or more outputs can be mapped to the same integration table — for example, MQTT and Thingworx can be mapped to the same integration table, and it will push each inserted record to each output.

Each integration table can be thought of as a topic — for example, each MQTT topic uses a different integration table to store its messages.

You can configure each input service to use a different integration table when each input represents a different type of data or a different MQTT topic. Conversely, you can configure multiple input services to feed data to the same integration table when each input represents the same type of data or the same MQTT topic.

Some input services automatically create and use their own integration tables. So, each time a topic is created in MQTT, FairCom Edge automatically creates a new integration table to store its messages.

Some input services, such as OPC UA and Modbus, create a table when they are configured to collect and store data.

Example 3. Connect input and output services

Each integration between a service and an integration table is configured separately. In Figure 8, “Connect inputs to outputs, each arrow going to or coming from an integration table represents a different integration. An Input Integration connects an input service to an integration table. Likewise, an output integration connects an integration table to an output service.

Figure 8. Connect inputs to outputs
Connect inputs to outputs


  • Input integration 1

    An application configures an OPC UA service to collect a specific set of data from a device and insert it into an integration table. FairCom Edge automatically creates the integration table. Most input services automatically create integration tables to store the data they collect, but they can also be configured to use an existing integration table.

  • Input integration 2

    An IT message broker, such as RabbitMQ, ActiveMQ, or IBM MQ, receives enterprise messages. It is configured to use its MQTT plugin to publish messages to a topic on FairCom Edge's MQTT broker. In FairCom Edge, a topic is mapped to one integration table. Each MQTT message inserts a record into the appropriate integration table. The FairCom delivery engine delivers each record to each output that is mapped to that integration table.

  • Output integration 3

    The MQTT output service is configured to use the integration table as a source for sending messages to a topic. This associates the integration table with the named MQTT topic. The MQTT output service uses the forward engine to read each inserted record and asynchronously publish it as an MQTT message to all clients that subscribed to that topic. Thus, any input service, such as OPC UA, Modbus, SQL, JSON DB API or MQTT, can insert a record into an integration table assigned to an MQTT topic, and the MQTT output service will publish a message to all subscribers of that topic.

  • Output integration 4

    The ThingWorx connector service is configured to use the integration table as a source. The ThingWorx connector uses the forward engine to asynchronously deliver each inserted record to ThingWorx.

  • Output integration 5

    FairCom Edge can forward MQTT messages to any other MQTT broker. Cloud providers, such as AWS IoT Core and Azure IoT Hub, provide MQTT brokers to receive IoT data and route it to cloud services, such as machine learning, data lakes, data warehouses, etc. Conversely, FairCom Edge can subscribe to any other MQTT broker, which allows it to receive messages back from cloud providers.



FairCom Edge uses the same integration process across all its services — see Figure 9, “Configure input and outputs.

Figure 9. Configure input and outputs
Configure input and outputs


Input and output services can be modified at any time to route data where it needs to go.

Actions that map an input service to an integration table:

The MQTT engine automatically maps MQTT topics to integration tables. Each MQTT topic can configure this mapping using the "configureTopic" action. Applications can add, modify, and delete these mappings at any time.

Use the "createInput" action to configure an input service. This causes FairCom Edge to create an integration table to store the input from that service. If you plan on attaching multiple inputs to the same integration table, you can first use the "createIntegrationTable" action to create an integration table, and then use the "createInput" action to connect the integration table to one or more input services.

Some input and output services do not need configuration — see Use SQL and JSON DB APIs.

Output services, such as ThingWorx, read data from a table. They cannot insert, update, or delete data.

Input services insert data but cannot update or delete data. This is because FairCom Edge is designed to store an unalterable history of all data it collects and transforms. When FairCom Edge transforms data, it stores the original source data along with the result of each transformation. Transform services are not allowed to modify a field that was written to by another transform service. Thus, the history of all data inserts and transforms is always available for auditing and troubleshooting.

Note

To prevent FairCom Edge from consuming all disk space, each integration table can be configured differently to purge data automatically on a schedule. The default is to keep data for 30 days and to purge expired data once each day.

FairCom Edge separates MQTT into two parts, the MQTT Pub and MQTT Sub services. The MQTT Pub service is an input service that receives published MQTT messages and the MQTT Sub service is an output service that delivers MQTT messages to subscribers. These services are decoupled as shown in Figure 10, “Configure MQTT.

Figure 10. Configure MQTT
Configure MQTT


In Figure 10, “Configure MQTT the arrow between the MQTT Pub service (the input service) and the integration table represents the input integration. The arrow between the integration table and the MQTT Sub service (the output service) represents the output integration. All input and output services are connected through integration tables.

Each MQTT topic is bound to a dedicated integration table that stores messages sent to a topic and delivers messages to subscribers of the topic. FairCom Edge automatically creates this table when a message is first published to a topic or when the "configureTopic" action is used to create the topic.

Each time the MQTT input service receives a message, it inserts the message into the "source_payload" field of a new record in the topic’s integration table. Transforms optionally transform this data and put results in the user-defined transform fields.

Each MQTT topic is configured using the "configureTopic" action which configures the topic, the integration table, the MQTT input service, and the MQTT output service.  If the topic being configured has not yet been published to FairCom Edge, the "configureTopic" action automatically creates the integration table, the topic, and its configuration.

Optionally, when "configureTopic" creates a new MQTT topic, it can assign an existing integration table to the topic. Thus, an integration table may be created using a protocol such as Modbus, and later bound to an MQTT topic. Similarly, an integration table may be created by the "createIntegrationTable" action and later bound to an MQTT topic.

The MQTT output service uses FairCom’s delivery engine to asynchronously read records that are in the integration table. The "configureTopic" action reads the MQTT payload from a field in the table. By default, it reads from the "source_payload" field, but it can be configured to read from one of the user-defined transform fields. The MQTT output service then delivers the message to MQTT clients that are subscribed to the topic. FairCom’s delivery engine guarantees the delivery of messages to subscribers in the order they are received.

The SQL and JSON DB APIs directly use integration tables without prior configuration. Thus, applications and users can use these APIs to insert and read from any table that their account has permissions to access. Because records can only be inserted into integration tables, accounts that use these APIs cannot typically update or delete records. The administrator account has the ability to update and delete records and it can grant permission to other accounts to do the same.

Each arrow in Figure 11, “Use SQL and JSON DB APIs represents an account that is logged into these APIs with the appropriate permissions to insert data into and to query the integration table. Each integration table may grant different accounts different permissions.

Figure 11. Use SQL and JSON DB APIs
Use SQL and JSON DB APIs


FairCom Edge’s simple integration process can be repeated to form complex connections from any input to any output through integration tables.

Example 4. Complex integration example

Figure 12, “Integration engine process shows the MQTT Pub service receiving messages sent to a topic and placing them into integration table A. The MQTT Subscriber service automatically retrieves these messages and delivers them to subscribers.

Figure 12. Integration engine process
Integration engine process


integration table A has been configured to run a payload transform that extracts a nested array of JSON objects from the payload of each record inserted into integration table A. It transforms these nested records into tabular data and inserts these records into integration table D. A machine learning application uses SQL Queries to read that data to provide feedback to factory processes.

OPC UA is configured to poll a device every minute for specific telemetry data. It inserts the collected data into integration table B. It runs a transform to convert the data into JSON. The MQTT Subscriber service is configured to use integration table B as the source of an MQTT topic. It sends each record inserted into integration table B to subscribers of that topic. Additionally, the ThingWorx connector is configured to use integration table B to get inserted records from this table and deliver them to a digital twin in ThingWorx.

A table transform process has been configured to use the delivery engine to get inserted records asynchronously from integration table A. It transforms these records into tabular data and inserts them into integration table C. An application uses the JSON DB API to gather data into a manufacturing execution system. In addition, the ThingWorx connector is configured to use the delivery engine to get inserted records asynchronously from this table and guarantee their delivery to a digital twin in ThingWorx.



FairCom Edge has standardized on JSON as the primary data interchange format. JSON is the preferred data format in FairCom Edge because it is easy to convert into other formats. It is also human-readable, which makes it good for troubleshooting. Standardizing on JSON simplifies the development and troubleshooting of input and output services. For these reasons, JSON is the most common data format used in IoT and MQTT payloads.

FairCom Edge supports any type of data, such as XML, binary images, Siemens binary user-defined types, SparkplugB, and so forth. FairCom Edge supports transform services that can convert binary data to JSON and convert from JSON to binary data. FairCom Edge can also convert JSON to tabular data by extracting values from JSON properties into the fields of a record in a table.

Each output and transform service is designed to read, process, and transform a specific type of data. When you bind an output or a transform service to an integration table, you must designate one of its fields as its data source. This field is typically the "source_payload" field, but it may be a user-defined transformation field. The maximum field size is 2GB which is also the maximum record size. Each of these fields can contain any type of binary data, but the data type must match the data type expected by the output or transform Services that are bound to it.

Because output and transform services expect their source data to be in a specific format, data often needs to be transformed. It is typical to use one transform to convert from a proprietary binary format into JSON and use another transform to convert that JSON into a different proprietary format. This leverages JSON as a universal intermediary.