FairCom Edge concepts
FairCom Edge is an IoT data integration platform. It is an integration hub that combines FairCom’s MQTT broker, data collector plugins, data transformation engine, app server, and FairCom's database into a single executable. It is designed to receive, process, transform, and deliver data to and from external systems. It creates bridges across protocols – even fundamentally incompatible protocols – such as MQTT and SQL. Its core engine stores and forwards messages from many different inputs to many different outputs. New input and output protocols can be implemented using FairCom plugin technology.
The FairCom Edge plugin architecture provides any number and type of services to external systems. FairCom Edge ships with out-of-the-box support for MQTT, OPC UA, Modbus, SQL, ThingWorx, and FairCom’s JSON DB APIs. Using plugin services, FairCom Edge can receive data from any protocol and send it to any other protocol. You can build your own plugins using a simple C example program or FairCom can build them for you.
As shown in Figure 1, “Integrate data”, many input data streams can be bridged to many output data streams. Data coming in through any supported protocol can be transformed and delivered out to any supported protocol.
To help you apply these concepts, see the FairCom Edge API and the Tutorials.
The integration table is the heart of FairCom Edge. It is a database table that has been enhanced and optimized for inserts, transformation processes, queries, and forwarding data asynchronously to output services. It makes it possible to bridge data across otherwise incompatible protocols, data serialization formats, data shapes, and data types.
Input services insert data into integration tables. Output services read and deliver data from the integration tables.
Input services insert records; they do not typically change or delete records, because that would tamper with the original flow of data. Once an input source submits data for insertion, the results of automatic transforms are stored in the record with the original data. The record becomes read-only and can be queried by the SQL and JSON DB APIs. An integration table is like a log that can be queried.
When an input service is configured, it is assigned to an existing integration table or it creates a new integration table. MQTT automatically creates an integration table for each new topic it creates. As you refine integration processes, you can rename integration tables to better label the data they hold. You can use the "configureTopic" and "alterIntegrationTable" actions in the API to rename an integration table.
Figure 2, “The integration table” identifies the FairCom Edge components that interact with the integration table.
Each integration table record contains the fields shown in Figure 3, “Integration table fields”.
- Each record is automatically given a unique ID in the "id" field.
- The server's current UTC time is recorded in the "create_ts" field.
- The incoming payload is stored in the "source_payload" field.
- Up to nine transform processes can store their results in fields "t1" through "t9", and you can add more fields to store more transforms.
- Error messages and transform logs are stored in the "error" and "log" fields.
Since the MQTT engine requires additional fields to track message data and since any integration table can be connected to the MQTT engine, each integration table contains MQTT fields.
Plugins can also add fields to each integration table. This occurs automatically when such a plugin is added to FairCom Edge. It is done invisibly in the background with no performance impact because of the FairCom Hot Alter Table feature.
The SQL and JSON DB APIs should only write to the "source_payload" field and user-defined fields. They can query all fields.
| Field name | Field type | Nullable | Indexed | Description |
|---|---|---|---|---|
| "id" | BIGINT | N | N | The identity field provides a unique ID for each record. |
| "create_ts" | TIMESTAMP | N | Y | Millisecond UTC date and time when inserted. |
| "error" | BOOLEAN | N | | Set to true when a transform error occurs. |
| "log" | LVARCHAR | Y | | Contains transform error messages and logs. |
| "source_payload" | LVARBINARY | Y | | The payload from the source. |
| "t1" through "t9" | LVARBINARY | Y | | Location for a step in a transformation process to store its results. |
| Plugin fields | plugin-defined | Y | | A plugin can add additional fields. Each field name begins with a unique plugin prefix (for example, MQTT uses its own prefix). |
| User-defined fields | user-defined | Y | | An application can create additional fields. The field names must not conflict with FairCom Edge field names. |
FairCom Edge uses the same integration process across all its services — see Figure 4, “The integration data”.
1. Inputs insert a record into an integration table.
2. The engine optionally transforms the data in the record and stores the results in one of the record's transform fields.
3. The delivery engine pushes inserted data to outputs.
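The three steps above can be sketched as a minimal Python model. This is an illustrative sketch of the store-transform-forward flow, not FairCom Edge's actual implementation; the record layout mirrors the integration table fields described earlier, and the function names are assumptions.

```python
# Illustrative sketch of the store-transform-forward flow.
# The transform and output callables are hypothetical stand-ins.

def integrate(source_payload, transforms, outputs):
    # Step 1: an input service inserts a record into the integration table.
    record = {"source_payload": source_payload,
              **{f"t{i}": None for i in range(1, 10)}}

    # Step 2: the engine runs each configured transform and stores the
    # result in one of the record's transform fields (t1..t9).
    for out_field, fn in transforms:
        record[out_field] = fn(record)

    # Step 3: the delivery engine pushes the inserted record to outputs.
    return [output(record) for output in outputs]

# Usage: one transform that upper-cases the payload into t1,
# and one output that reads t1.
results = integrate(
    "hello",
    transforms=[("t1", lambda r: r["source_payload"].upper())],
    outputs=[lambda r: r["t1"]],
)
```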
The process starts with an input service receiving or gathering data from a device, user, or application and inserting a record into an integration table. The incoming data is placed in the "source_payload" field of the integration table record. Prior to committing the insert, FairCom Edge automatically runs nine or more transformations on the source data and places the results of those transformations in the record's transform fields "t1" through "t9" and, optionally, in other integration fields created by the user. The Table Transform Service can optionally extract data from JSON properties in the "source_payload" field and insert the data into the fields of a record in another table.
After the transform process finishes, FairCom Edge begins to asynchronously deliver each inserted record to the non-query-based output services. Output services deliver data to external systems. Query services, such as the SQL and JSON DB APIs, can query data in integration tables at any time.
FairCom Edge connectors read data from a variety of protocols, such as Modbus and OPC UA, and automatically transform the binary source data into JSON, which is inserted as a record into the "source_payload" field of an integration table. The inserted JSON standardizes subsequent data processing and simplifies subsequent transform processes. JSON also standardizes the delivery of data to output protocols used by IT and the cloud.
You configure each input service to insert records into one integration table. Multiple input services can insert records into the same integration table. Input services include MQTT, OPC UA, and Modbus.
The SQL and JSON DB APIs directly insert data into integration tables at any time without prior configuration. Thus, applications can use these APIs to insert and read records from any table that their account has permission to access — for example, an application or user can use the JSON DB API to insert records into the same integration table that a Modbus connector uses to collect data for a CNC device. Additionally, a user could use SQL to query records from that table.
During an insertion, the transform process automatically runs nine or more sequential transformations within a single database transaction. Transforms can take values from any table. They typically use values from the "source_payload" and "t1" to "t9" fields. Transforms reshape or enrich the data and place the results into the "t1" to "t9" fields as well as custom fields in the table or in another table. Custom fields can be indexed for fast, sorted queries, making it easy to query all data collected by FairCom Edge.
Once data is received and transformed, it is stored as a record and is ready for queries and asynchronous output. Results are then forwarded to output services.
Some output services provide traditional APIs, such as SQL and JSON DB API, that can query any integration table at any time without prior configuration — see Integration data.
Queries have access to all fields in an integration table, including the source's original payload in the "source_payload" field, the transformed values in the "t1" through "t9" fields, the "create_ts" field, user-added fields, and so forth.
FairCom's delivery engine is available for all integration tables. In Figure 5, “The delivery process”, the right side of the integration table is the forward part of the Store and Forward delivery model used by FairCom Edge.
The delivery engine pushes data to outputs with guaranteed delivery in the order data is received. It is used to deliver data to output services such as MQTT, ThingWorx, AWS IoT Core, and other MQTT brokers. The delivery engine creates multiple processes that read records from each integration table and push the data to output services. Each delivery process delivers data in the order it is inserted into the integration table. Each delivery process runs independently (asynchronously) from all other output services, which means no output service can block another output service.
The delivery engine guarantees the delivery of data to output services. If anything in the output chain is not working properly (such as a failed network, an offline MQTT client, or a failed external system) the integration table retains the stored data. When the external system is available, the delivery engine starts where it left off and forwards the data to the external output service.
Each integration table can be configured to retain data for its own period of time. It can also be configured to forward data to external systems at a maximum rate to avoid saturating the network or overwhelming the external system.
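The store-and-forward behavior described above can be modeled with a short sketch. This is an illustrative model, not FairCom code: records are retained in insert order, and each output keeps its own independent cursor so delivery resumes where it left off after an outage. All names here are assumptions.

```python
# Illustrative sketch of store-and-forward delivery with per-output
# cursors and resume-after-failure semantics.

class IntegrationTable:
    def __init__(self):
        self.records = []   # retained in insert order
        self.cursors = {}   # one independent cursor per output service

    def insert(self, record):
        self.records.append(record)

    def deliver(self, output_name, send):
        """Push undelivered records to `send` in insert order; stop on the
        first failure and keep the cursor so the next attempt resumes."""
        pos = self.cursors.get(output_name, 0)
        while pos < len(self.records):
            try:
                send(self.records[pos])
            except ConnectionError:
                break       # external system unavailable; retry later
            pos += 1
        self.cursors[output_name] = pos

table = IntegrationTable()
for i in range(3):
    table.insert(i)

delivered = []
attempts = {"n": 0}

def flaky_send(rec):
    attempts["n"] += 1
    if attempts["n"] == 2:  # the second attempt fails once
        raise ConnectionError
    delivered.append(rec)

table.deliver("mqtt", flaky_send)  # delivers record 0, fails on record 1
table.deliver("mqtt", flaky_send)  # resumes at record 1, finishes delivery
```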
When a record is inserted into an integration table, it triggers the transformation engine — see Figure 6, “The transform process”.
The transform engine runs up to nine transformations one after another within a single database transaction. Each transformation is configured to get its data from any field in the table, typically the "source_payload" field or one of the transformation fields ("t1" through "t9"). Multiple transformations may share the same input field or fields.
Each transformation is also configured to store its results in an output field, which can be any of the transformation fields ("t1" through "t9") or a user-created field. Once a transformation has written to a field, the transform engine prevents any other transformation from writing to that field: transformations cannot share output fields. If a transform fails, error processing is performed, and the remaining transformations continue to run as long as their input fields are not null. The record is committed regardless of transform failures.
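These rules can be sketched in a few lines of Python. This is an illustrative model of the behavior just described, not FairCom code: each output field may be written by only one transform, a failed transform logs an error, and later transforms still run when their input field is not null.

```python
# Illustrative sketch of the transform engine's rules: exclusive output
# fields, continue-on-failure, and the record committed regardless.

def run_transforms(record, transforms):
    written = set()
    for in_field, out_field, fn in transforms:
        if out_field in written:
            raise ValueError(f"{out_field} already written by a transform")
        if record.get(in_field) is None:
            continue                # input is null: skip this transform
        try:
            record[out_field] = fn(record[in_field])
            written.add(out_field)
        except Exception as exc:
            record["error"] = True  # failure noted; record still committed
            record["log"] = str(exc)
    return record

record = {"source_payload": "5", "t1": None, "t2": None,
          "error": False, "log": None}
run_transforms(record, [
    ("source_payload", "t1", int),            # succeeds: t1 = 5
    ("source_payload", "t2", lambda v: 1/0),  # fails: error is logged
])
```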
Note
Most transforms store their output in one or more fields ("t1" to "t9") in the current record of that table. For JSON, the "jsonPropertiesToTableFields" transform can be used to store the output as a new record in a different integration table.
Since data capture is vital, an unexpected failure in one transform does not prevent other transforms from running and does not prevent the record from being inserted. The transform process is synchronous and completes before the record is inserted. Once a record is inserted, the original source payload and the transformed field values can be queried by SQL and the JSON DB APIs.
Each MQTT topic in FairCom Edge and each integration table can be configured to run its own transform process.
This example shows how MQTT topics and integration tables might be configured to take a Siemens binary UDT payload, convert it into JSON, enrich it, change its shape, and convert it to SparkPlugB for delivery.
1. The input service stores a Siemens binary UDT payload in the "source_payload" field.
2. The first transform converts the UDT to JSON and stores it in "t1".
3. The second transform uses machine learning to enrich the value of "t1". It stores the results in "t4" and "t5".
4. The third transform changes the shape of the JSON in "t1" and stores the new JSON in "t2".
5. The last transform converts the JSON value in "t2" into the SparkPlugB format and stores it in "t3".
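The chain above can be sketched as follows. The UDT decoding, machine-learning enrichment, reshaping, and SparkPlugB encoding are stand-in stubs (the real formats are more involved); only the field-to-field wiring mirrors the example.

```python
# Illustrative sketch of the four-step transform chain. Every function
# body is a hypothetical stub; the t1/t2/t3/t4 wiring follows the example.

import json
import struct

def udt_to_json(payload):           # transform 1: binary UDT -> JSON (stub)
    temp, rpm = struct.unpack("<hh", payload)
    return json.dumps({"temp": temp, "rpm": rpm})

def enrich(doc):                    # transform 2: ML enrichment (stub)
    data = json.loads(doc)
    return json.dumps({"anomaly": data["rpm"] > 5000})

def reshape(doc):                   # transform 3: change the JSON shape (stub)
    return json.dumps({"metrics": json.loads(doc)})

def to_sparkplug(doc):              # transform 4: JSON -> SparkPlugB (stub)
    return b"spb:" + doc.encode()

record = {"source_payload": struct.pack("<hh", 72, 6000)}
record["t1"] = udt_to_json(record["source_payload"])
record["t4"] = enrich(record["t1"])
record["t2"] = reshape(record["t1"])
record["t3"] = to_sparkplug(record["t2"])
```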
The transform process involves the "source_payload", "t1" through "t9", "create_ts", "error", "log", and user-defined fields.
When a transformation error occurs, a transformation error is placed in the "log" field of the record being transformed, the transform's output field is set to null, and the "error" field is set to 1 (true).
The "log" field contains the error message "Transformation failed on table XXX on record with id YYY" together with a JSON document describing the errors:

```json
{
  "transformErrors": [
    {
      "errorCode": 0,
      "errorMessage": "",
      "errorData": {}
    }
  ]
}
```
The JSON-to-external-table transform extracts properties from a JSON payload in the "source_payload" field and inserts a record into another table. The destination integration table can then be used as a source for output services as shown in Figure 7, “Transform JSON to an external table”.
The "tableFieldsToJson" transform works differently than the other transforms. It takes values from a table's fields, constructs a JSON document, and puts the JSON document in the "source_payload" field.
This transform is useful when records are inserted into an integration table using the SQL and JSON DB APIs, or another input service that writes data directly into the fields of a table. The "tableFieldsToJson" transform is needed when transform processes and output services require the "source_payload" field to be populated with JSON.
If transform errors occur, the "transformErrors" property in the "log" field is also appended to the end of the JSON document in the "source_payload" field to notify recipients that a transform error occurred.
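A "tableFieldsToJson"-style transform can be sketched as follows. This is an illustrative model under assumed field handling, not FairCom's implementation: it builds a JSON document from a record's fields, places it in "source_payload", and surfaces any "transformErrors" from the "log" field.

```python
# Illustrative sketch of a tableFieldsToJson-style transform.
# The record shape and error-forwarding details are assumptions.

import json

def table_fields_to_json(record, fields):
    doc = {name: record[name] for name in fields}
    log = record.get("log")
    if log and "transformErrors" in log:
        # Append transform errors so payload recipients are notified.
        doc["transformErrors"] = log["transformErrors"]
    record["source_payload"] = json.dumps(doc)
    return record

record = {"machine": "cnc-1", "rpm": 4200, "log": None,
          "source_payload": None}
table_fields_to_json(record, ["machine", "rpm"])
```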
FairCom Edge uses integration tables to connect inputs to outputs. One or more inputs can be mapped to the same integration table for inserting data — for example, MQTT, Modbus, and OPC UA can all insert data into the same integration table.
One or more outputs can be mapped to the same integration table — for example, MQTT and ThingWorx can be mapped to the same integration table, and it will push each inserted record to each output.
Each integration table can be thought of as a topic — for example, each MQTT topic uses a different integration table to store its messages.
You can configure each input service to use a different integration table when each input represents a different type of data or a different MQTT topic. Conversely, you can configure multiple input services to feed data to the same integration table when each input represents the same type of data or the same MQTT topic.
Some input services automatically create and use their own integration tables. For example, each time a topic is created in MQTT, FairCom Edge automatically creates a new integration table to store its messages.
Some input services, such as OPC UA and Modbus, create a table when they are configured to collect and store data.
Each integration between a service and an integration table is configured separately. In Figure 8, “Connect inputs to outputs”, each arrow going to or coming from an integration table represents a different integration. An Input Integration connects an input service to an integration table. Likewise, an output integration connects an integration table to an output service.
Input integration 1
An application configures an OPC UA service to collect a specific set of data from a device and insert it into an integration table. FairCom Edge automatically creates the integration table. Most input services automatically create integration tables to store the data they collect, but they can also be configured to use an existing integration table.
Input integration 2
An IT message broker, such as RabbitMQ, ActiveMQ, or IBM MQ, receives enterprise messages. It is configured to use its MQTT plugin to publish messages to a topic on FairCom Edge's MQTT broker. In FairCom Edge, a topic is mapped to one integration table. Each MQTT message inserts a record into the appropriate integration table. The FairCom delivery engine delivers each record to each output that is mapped to that integration table.
Output integration 3
The MQTT output service is configured to use the integration table as a source for sending messages to a topic. This associates the integration table with the named MQTT topic. The MQTT output service uses the forward engine to read each inserted record and asynchronously publish it as an MQTT message to all clients that subscribed to that topic. Thus, any input service, such as OPC UA, Modbus, SQL, JSON DB API or MQTT, can insert a record into an integration table assigned to an MQTT topic, and the MQTT output service will publish a message to all subscribers of that topic.
Output integration 4
The ThingWorx connector service is configured to use the integration table as a source. The ThingWorx connector uses the forward engine to asynchronously deliver each inserted record to ThingWorx.
Output integration 5
FairCom Edge can forward MQTT messages to any other MQTT broker. Cloud providers, such as AWS IoT Core and Azure IoT Hub, provide MQTT brokers to receive IoT data and route it to cloud services, such as machine learning, data lakes, data warehouses, etc. Conversely, FairCom Edge can subscribe to any other MQTT broker, which allows it to receive messages back from cloud providers.
FairCom Edge uses the same integration process across all its services — see Figure 9, “Configure input and outputs”.
Input and output services can be modified at any time to route data where it needs to go.
The MQTT engine automatically maps MQTT topics to integration tables. Each MQTT topic can configure this mapping using the "configureTopic" action. Applications can add, modify, and delete these mappings at any time.
Use the "createInput" action to configure an input service. This causes FairCom Edge to create an integration table to store the input from that service. If you plan to attach multiple inputs to the same integration table, you can first use the "createIntegrationTable" action to create an integration table, and then use the "createInput" action to connect the integration table to one or more input services.
Some input and output services do not need configuration — see Use SQL and JSON DB APIs.
Output services, such as ThingWorx, read data from a table. They cannot insert, update, or delete data.
Input services insert data but cannot update or delete data. This is because FairCom Edge is designed to store an unalterable history of all data it collects and transforms. When FairCom Edge transforms data, it stores the original source data along with the result of each transformation. Transform services are not allowed to modify a field that was written to by another transform service. Thus, the history of all data inserts and transforms is always available for auditing and troubleshooting.
Note
To prevent FairCom Edge from consuming all disk space, each integration table can be configured differently to purge data automatically on a schedule. The default is to keep data for 30 days and to purge expired data once each day.
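The retention behavior in the note can be sketched as a simple purge function. This is an illustrative model, assuming a record schema with a "create_ts" field and the default 30-day window; it is not FairCom's purge implementation.

```python
# Illustrative sketch of scheduled retention: keep records for a
# configurable window (default 30 days) and drop expired ones.

from datetime import datetime, timedelta, timezone

def purge_expired(records, retention_days=30, now=None):
    """Return only records whose create_ts is inside the retention window."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=retention_days)
    return [r for r in records if r["create_ts"] >= cutoff]

now = datetime(2024, 6, 1, tzinfo=timezone.utc)
records = [
    {"id": 1, "create_ts": now - timedelta(days=45)},  # expired
    {"id": 2, "create_ts": now - timedelta(days=5)},   # retained
]
kept = purge_expired(records, retention_days=30, now=now)
```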
FairCom Edge separates MQTT into two parts, the MQTT Pub and MQTT Sub services. The MQTT Pub service is an input service that receives published MQTT messages and the MQTT Sub service is an output service that delivers MQTT messages to subscribers. These services are decoupled as shown in Figure 10, “Configure MQTT”.
In Figure 10, “Configure MQTT” the arrow between the MQTT Pub service (the input service) and the integration table represents the input integration. The arrow between the integration table and the MQTT Sub service (the output service) represents the output integration. All input and output services are connected through integration tables.
Each MQTT topic is bound to a dedicated integration table that stores messages sent to the topic and delivers messages to subscribers of the topic. FairCom Edge automatically creates this table when a message is first published to a topic or when the "configureTopic" action is used to create the topic.
Each time the MQTT input service receives a message, it inserts the message into the "source_payload" field of a new record in the topic's integration table. Transforms optionally transform this data and put the results in the transform fields, "t1" through "t9".
Each MQTT topic is configured using the "configureTopic" action, which configures the topic, the integration table, the MQTT input service, and the MQTT output service. If the topic being configured has not yet been published to FairCom Edge, the "configureTopic" action automatically creates the integration table, the topic, and its configuration.
Optionally, when "configureTopic" creates a new MQTT topic, it can assign an existing integration table to the topic. Thus, an integration table may be created using a protocol such as Modbus and later bound to an MQTT topic. Similarly, an integration table may be created by the "createIntegrationTable" action and later bound to an MQTT topic.
The MQTT output service uses FairCom's delivery engine to asynchronously read records that are in the integration table. The "configureTopic" action determines which field the MQTT payload is read from. By default, it is read from the "source_payload" field, but it can be configured to be read from one of the transform fields, "t1" through "t9", or a user-defined field. The MQTT output service then delivers the message to MQTT clients that are subscribed to the topic. FairCom's delivery engine guarantees the delivery of messages to subscribers in the order they are received.
The SQL and JSON DB APIs directly use integration tables without prior configuration. Thus, applications and users can use these APIs to insert and read from any table that their account has permissions to access. Because records can only be inserted into integration tables, accounts that use these APIs cannot typically update or delete records. The administrator account has the ability to update and delete records and it can grant permission to other accounts to do the same.
Each arrow in Figure 11, “Use SQL and JSON DB APIs” represents an account that is logged into these APIs with the appropriate permissions to insert data into and to query the integration table. Each integration table may grant different accounts different permissions.
FairCom Edge’s simple integration process can be repeated to form complex connections from any input to any output through integration tables.
Figure 12, “Integration engine process” shows the MQTT Pub service receiving messages sent to a topic and placing them into integration table A. The MQTT Subscriber service automatically retrieves these messages and delivers them to subscribers.
integration table A has been configured to run a payload transform that extracts a nested array of JSON objects from the payload of each record inserted into integration table A. It transforms these nested records into tabular data and inserts these records into integration table D. A machine learning application uses SQL Queries to read that data to provide feedback to factory processes.
OPC UA is configured to poll a device every minute for specific telemetry data. It inserts the collected data into integration table B. It runs a transform to convert the data into JSON. The MQTT Subscriber service is configured to use integration table B as the source of an MQTT topic. It sends each record inserted into integration table B to subscribers of that topic. Additionally, the ThingWorx connector is configured to use integration table B to get inserted records from this table and deliver them to a digital twin in ThingWorx.
A table transform process has been configured to use the delivery engine to get inserted records asynchronously from integration table A. It transforms these records into tabular data and inserts them into integration table C. An application uses the JSON DB API to gather data into a manufacturing execution system. In addition, the ThingWorx connector is configured to use the delivery engine to get inserted records asynchronously from this table and guarantee their delivery to a digital twin in ThingWorx.
FairCom Edge has standardized on JSON as the primary data interchange format. JSON is the preferred data format in FairCom Edge because it is easy to convert into other formats. It is also human-readable, which makes it good for troubleshooting. Standardizing on JSON simplifies the development and troubleshooting of input and output services. JSON is also the most common data format used in IoT and MQTT payloads.
FairCom Edge supports any type of data, such as XML, binary images, Siemens binary user-defined types, SparkplugB, and so forth. FairCom Edge supports transform services that can convert binary data to JSON and convert from JSON to binary data. FairCom Edge can also convert JSON to tabular data by extracting values from JSON properties into the fields of a record in a table.
Each output and transform service is designed to read, process, and transform a specific type of data. When you bind an output or a transform service to an integration table, you must designate one of its fields as its data source. This field is typically the "source_payload" field, but it may be a transformation field, "t1" through "t9". The maximum field size is 2GB, which is also the maximum record size. Each of these fields can contain any type of binary data, but the data type must match the data type expected by the output or transform services that are bound to it.
Because output and transform services expect their source data to be in a specific format, data often needs to be transformed. It is typical to use one transform to convert from a proprietary binary format into JSON and use another transform to convert that JSON into a different proprietary format. This leverages JSON as a universal intermediary.