"getMessagesFromMqttTopic"
Use the "getMessagesFromMqttTopic" action to retrieve messages published on a specific topic.
This action uses the event streaming technique to retrieve messages published on a specific topic. It is stateless and does not remove messages from the topic's queue. Use it to tail the end of the queue, retrieve all messages from the beginning of the queue, retrieve messages starting from a timestamp, or paginate through a queue.
This action allows multiple processes to retrieve messages simultaneously from a topic without moving the client's message position or removing shared messages.
Instead of using a client's message position to retrieve undelivered messages, it retrieves messages from a specified starting location in the topic's message queue. You can retrieve messages from the end or beginning of the queue, a specific timestamp, or a queue position ID.
This action does not disconnect MQTT clients.
Call this action to retrieve up to "maxMessages" messages from an MQTT topic, starting from the position specified by "startFrom". The message payload is encoded according to "binaryFormat".
You must use one, but not both, of the following properties to identify the message queue from which you want to retrieve messages:
Use "topic" to specify an MQTT topic from which you want to retrieve messages. A topic cannot contain MQTT wildcards, so it matches exactly one MQTT topic.
Use "tableName" to specify the name of an integration table containing messages. You may also set the table's "databaseName" and "ownerName"; otherwise, the server uses the JSON Action Session's default values.
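The one-but-not-both rule can be checked on the client before a request is sent. The following is a minimal Python sketch, not part of the FairCom API; the helper name `build_queue_selector` and its argument names are hypothetical.

```python
def build_queue_selector(topic=None, table_name=None,
                         database_name=None, owner_name=None):
    """Return the "params" entries that identify the message queue.

    Hypothetical client-side helper: exactly one of topic or table_name
    must be supplied, mirroring the rule described above.
    """
    if (topic is None) == (table_name is None):
        raise ValueError('set exactly one of "topic" or "tableName"')

    if topic is not None:
        return {"topic": topic}

    params = {"tableName": table_name}
    # "databaseName" and "ownerName" are optional; when omitted, the server
    # uses the JSON Action Session's default values.
    if database_name is not None:
        params["databaseName"] = database_name
    if owner_name is not None:
        params["ownerName"] = owner_name
    return params
```

Merging the returned dictionary into the request's "params" object keeps the queue selection in one place.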
The optional "startFrom" property controls the starting point from which the server delivers messages.
Set "startFrom" to "tail" (the default) to retrieve the most recently sent messages.
Set "startFrom" to "head" to retrieve messages from the beginning.
Set "startFrom" to an ISO 8601 DateTime, such as "2024-07-11T21:18:01.000", to retrieve messages starting on or after that date and time. If the time component is missing, it defaults to "00:00:00.000".
Set "startFrom" to an integer, such as 52233, to retrieve messages starting with the message that has that ID number.
Tip
To retrieve the next set of messages, set "startFrom" to one plus the ID number of the last message you received.
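The pagination tip can be sketched in Python as follows. This is an illustration only: the function name `next_start_from` is hypothetical, and it assumes each returned message carries an "id" property (which requires "includeMessageMetadata" to be true) that may arrive as a string.

```python
def next_start_from(messages):
    """Return the "startFrom" value for the next page, or None if the
    page was empty.

    Assumes every message has an "id" property; the value is converted
    with int() because it may be delivered as a string such as "263".
    """
    if not messages:
        return None
    # Use the highest ID on the page so the result does not depend on the
    # order in which the server returned the messages.
    last_id = max(int(message["id"]) for message in messages)
    return last_id + 1
```

A caller would repeat the request with this value until the function returns `None`.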
Optionally, set "expiredMessagesFilter" to "include", "exclude", or "only" to control whether expired messages appear in the results. Use "include" to return expired messages along with unexpired ones, "exclude" to omit them, and "only" to return only expired messages. It defaults to "include".
Optionally, set "includeMqttProperties": true to include the message's MQTT properties. It defaults to false.
Optionally, set "includeTransformFields": true to include the custom fields used for the transformation process's inputs and outputs. It defaults to false.
Tip
For more query options, use the JSON DB "getRecords..." actions. You can use the "describeMqttSession" action to return information about a client's subscriptions, which includes a map of topic names to table names. You can then query those tables to see their messages: run ad hoc queries with SQL; find a record using partial, exact, or inequality matches; and move or skip forward and backward through the records in index-sorted order.
Request examples
Maximal request
{
  "api": "mq",
  "action": "getMessagesFromMqttTopic",
  "params": {
    "topic": "topic1",
    "databaseName": "faircom",
    "ownerName": "admin",
    "tableName": "mqtt_msg_topic1",
    "startFrom": number | "tail | head",
    "maxMessages": 1,
    "binaryFormat": "base64 | hex | utf8 | json | byteArray",
    "expiredMessagesFilter": "include",
    "includeMessageMetadata": true,
    "includeMqttProperties": true,
    "includeTransformFields": true,
    "includeOtherFields": true
  },
  "apiVersion": "1.0",
  "debug": "max",
  "requestId": "optionalUniqueRequestIdFromTheClient",
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
{ "api": "mq", "action": "getMessagesFromMqttTopic", "params": { "maxMessages": 1, "binaryFormat": "json", "topic": "topic1", "expiredMessagesFilter": "include", "includeMessageMetadata": true, "includeMqttProperties": true, "includeTransformFields": true }, "authToken": "replaceWithAuthTokenFromCreateSession" }
{ "api": "mq", "action": "getMessagesFromMqttTopic", "params": { "topic": "topic1", "startFrom": "tail", "maxMessages": 10, "binaryFormat": "base64" }, "authToken": "replaceWithAuthTokenFromCreateSession" }
{ "api": "mq", "action": "getMessagesFromMqttTopic", "params": { "topic": "topic1", "maxMessages": 1, "binaryFormat": "hex", "includeMessageMetadata": true, "expiredMessagesFilter": "include", "includeMqttProperties": true, "includeTransformFields": true }, "authToken": "replaceWithAuthTokenFromCreateSession" }
{ "api": "mq", "action": "getMessagesFromMqttTopic", "params": { "topic": "topic1", "startFrom": "head", "maxMessages": 50, "binaryFormat": "json" }, "authToken": "replaceWithAuthTokenFromCreateSession" }
{ "api": "mq", "action": "getMessagesFromMqttTopic", "params": { "topic": "topic1", "startFrom": 51, "maxMessages": 50, "binaryFormat": "json" }, "authToken": "replaceWithAuthTokenFromCreateSession" }
{ "api": "mq", "action": "getMessagesFromMqttTopic", "params": { "topic": "topic1", "startFrom": "2024-07-11T21:18:01.000", "maxMessages": 5000, "binaryFormat": "json" }, "authToken": "replaceWithAuthTokenFromCreateSession" }
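The request bodies above can also be assembled programmatically. The sketch below builds one as a JSON string in Python. The function name `build_get_messages_request` is hypothetical, and how the body is sent to the server (endpoint, HTTP client) is outside the scope of this section and deliberately not shown.

```python
import json


def build_get_messages_request(auth_token, topic, start_from="tail",
                               max_messages=20, binary_format="json"):
    """Return a "getMessagesFromMqttTopic" request body as a JSON string.

    The defaults mirror the documented defaults for "startFrom",
    "maxMessages", and "binaryFormat".
    """
    request = {
        "api": "mq",
        "action": "getMessagesFromMqttTopic",
        "params": {
            "topic": topic,
            # A string ("head", "tail", or a DateTime) or an integer ID.
            "startFrom": start_from,
            "maxMessages": max_messages,
            "binaryFormat": binary_format,
        },
        "authToken": auth_token,
    }
    return json.dumps(request)
```

For example, `build_get_messages_request(token, "topic1", start_from=51, max_messages=50)` produces a body equivalent to the ID-based request shown above.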
Response examples
{
  "result": {
    "messages": [
      {
        "clientName": "MQTT client identifier in mqtt_client_id",
        "topic": "topic1",
        "subscriptionIdentifiers": [123],
        "databaseName": "faircom",
        "ownerName": "admin",
        "tableName": "mqtt_msg_topic1",
        "payload": "C3A3",
        "binaryFormat": "base64 | hex | utf8 | json | byteArray",
        "id": "263",
        "timestamp": "2023-12-18T18:04:20.085",
        "error": false,
        "log": null,
        "contentType": "IanaMediaType or a CustomType",
        "correlationData": "any JSON value up to 65500 bytes",
        "expirySeconds": 60,
        "messageHasExpired": true,
        "payloadFormat": "binary | utf8",
        "publishedQoS": 2,
        "retain": true,
        "responseTopic": "topic/name/for/subscriber/to/respond/to",
        "userProperties": [
          { "key": "some key", "value": "some value" }
        ],
        "faircomClientId": 6732,
        "faircomTopicId": 457,
        "faircomForwardTopicId": 0,
        "faircomMqttPacketId": 53736,
        "faircomMqttSentReason": "Client | Retained | Will | Forwarded",
        "transformFields": [
          { "fieldName": "t1", "fieldData": "some value" }
        ]
      }
    ]
  },
  "errorCode": 0,
  "errorMessage": "",
  "debugInfo": { },
  "requestId": "optionalUniqueRequestIdFromTheClient",
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
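A client typically checks the response for an error before reading the messages. The following Python sketch shows one way to do that. The function name `extract_messages` is hypothetical, and treating any nonzero "errorCode" as a failure is an assumption based on the example response, where a successful call returns 0.

```python
import json


def extract_messages(response_text):
    """Parse a response body and return its list of messages.

    Assumption: an "errorCode" other than 0 indicates that the action
    failed, in which case "errorMessage" describes the problem.
    """
    body = json.loads(response_text)
    error_code = body.get("errorCode", 0)
    if error_code != 0:
        raise RuntimeError(
            f'request failed: {error_code} {body.get("errorMessage", "")}'
        )
    # "result" may be missing or null when there is nothing to return.
    result = body.get("result") or {}
    return result.get("messages", [])
```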
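Property | Description | Default | Type | Limits (inclusive)
---|---|---|---|---
"binaryFormat" | specifies how the "payload" property is encoded | "json" | enum | "base64", "hex", "utf8", "json", "byteArray"
"expiredMessagesFilter" | specifies whether or not to include expired messages in the result | "include" | enum | "include", "exclude", "only"
"includeMessageMetadata" | returns the message's "id", "timestamp", "error", and "log" properties | | Boolean | true, false
"includeMqttProperties" | returns MQTT properties in the response | false | Boolean | true, false
"includeOtherFields" | returns additional fields that are not the built-in integration table, transform, and MQTT fields in the response | false | Boolean | true, false
"includeTransformFields" | returns custom transform fields in the response | false | Boolean | true, false
"maxMessages" | specifies the number of messages to return from a "getMessages…" action | 20 | integer | 1 to 100000
"startFrom" | sets the starting message position for a message filter | "tail" | enum | "head", "tail", an ISO 8601 DateTime string, or an integer message ID
"topic" | specifies an MQTT topic | | UTF-8 string |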
The "binaryFormat" property specifies how the "payload" property is encoded. It also applies to the format of all binary fields of messages included in a response.
It defaults to "json". When the MQTT payload is not JSON, the API returns the following error message: "MQTT payload cannot be encoded as JSON. Set binaryFormat to base64 or hex to retrieve the payload's binary value."
Note
In other FairCom APIs, the default value of "binaryFormat" is "base64". In this API, it defaults to "json" because JSON is the most common payload in MQTT messages.
You may set "binaryFormat" to one of the following values:
"base64" tells the server you set the payload to a string containing a base64-encoded value.
"hex" tells the server you set the payload to a string containing a hexadecimal-encoded value.
"byteArray" tells the server you set the payload to an array of integers. Each integer must be between 0 and 255, representing one byte in the payload.
"utf8" tells the server you set the payload to a string. The server reads the UTF-8 encoded bytes in the string and stores them directly in the payload without conversion. The value stored in the payload does not include the double quotes before and after the string. For example, "payload": "I am a string" is stored in the payload as I am a string. Notice that the server does not store the double quotes.
"json" tells the server you set the payload to a JSON value. A JSON value may be one of the following:
A JSON object assigned to the payload, such as "payload": { "am": "an object" }, causes the server to store the UTF-8 characters { "am": "an object" } in the payload. The server must store this text, including the curly braces, but it may alter the object's whitespace, such as {"am":"an object"}.
A JSON array assigned to the payload, such as "payload": [ "am", "array" ], causes the server to store the UTF-8 characters [ "am", "array" ] in the payload. The server must store this text, including the square brackets, but it may alter the array's whitespace, such as ["am","array"].
A JSON string assigned to the payload, such as "payload": "I am a JSON string", causes the server to store the UTF-8 characters "I am a JSON string" in the payload. The server must store this text, including the double quotes, because a JSON string type includes the double-quote characters before and after the string's value.
A JSON number assigned to the payload, such as "payload": -12.34, causes the server to store the UTF-8 characters -12.34 in the payload.
A JSON true assigned to the payload, such as "payload": true, causes the server to store the UTF-8 characters true in the payload.
A JSON false assigned to the payload, such as "payload": false, causes the server to store the UTF-8 characters false in the payload.
A JSON null assigned to the payload, such as "payload": null, causes the server to store the UTF-8 characters null in the payload.
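The mapping between each "binaryFormat" value and the bytes of the payload can be illustrated with a short Python sketch. The function name `payload_to_bytes` is hypothetical, and the compact whitespace chosen for the "json" case is only one of the forms the description above permits.

```python
import base64
import json


def payload_to_bytes(payload, binary_format="json"):
    """Convert a JSON "payload" value into the bytes it represents."""
    if binary_format == "base64":
        return base64.b64decode(payload)
    if binary_format == "hex":
        return bytes.fromhex(payload)
    if binary_format == "byteArray":
        # bytes() rejects integers outside the range 0 to 255.
        return bytes(payload)
    if binary_format == "utf8":
        # The surrounding double quotes are not part of the payload.
        return payload.encode("utf-8")
    if binary_format == "json":
        # A JSON string keeps its double quotes; objects and arrays keep
        # their braces and brackets. Whitespace is compacted here.
        text = json.dumps(payload, separators=(",", ":"), ensure_ascii=False)
        return text.encode("utf-8")
    raise ValueError(f"unsupported binaryFormat: {binary_format!r}")
```

For example, the hex payload "C3A3" and the byte array [195, 163] both describe the same two bytes.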
The "expiredMessagesFilter" property is optional. It is an enumerated string that contains one of the following values: "include", "exclude", or "only". It defaults to "include".
"include" (default) returns expired messages in the results.
"exclude" omits expired messages from the results.
"only" returns only expired messages in the results.
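The server applies this filter itself, but the semantics of the three values can be mirrored locally in a few lines of Python. This sketch is illustrative only: the function name `apply_expired_filter` is hypothetical, and it assumes each message carries the "messageHasExpired" property shown in the example response.

```python
def apply_expired_filter(messages, mode="include"):
    """Filter messages the way "expiredMessagesFilter" is described.

    Assumes each message is a dict with a Boolean "messageHasExpired"
    property; a missing property is treated as not expired.
    """
    if mode == "include":
        return list(messages)
    if mode == "exclude":
        return [m for m in messages if not m.get("messageHasExpired", False)]
    if mode == "only":
        return [m for m in messages if m.get("messageHasExpired", False)]
    raise ValueError(f"unsupported expiredMessagesFilter: {mode!r}")
```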
Set the "includeMessageMetadata" property to true to include the message's "id", "timestamp", "error", and "log" properties.
Set the "includeMqttProperties" property to true to include the MQTT properties in the response. Otherwise, omit the property or set it to false or null. MQTT properties are helpful when you need to troubleshoot MQTT messages.
Set the "includeOtherFields" property to true to include additional fields that are not the built-in integration table fields, transform fields, and MQTT fields. Otherwise, omit the property or set it to false or null.
Set the "includeTransformFields" property to true to include custom transform fields (if any) that a customer added to the integration table. Otherwise, omit the property or set it to false or null. Custom transform fields are helpful when you want to see the inputs and outputs of transformations assigned to the integration table.
The "maxMessages" property is optional and specifies the maximum number of messages the server returns from a "getMessages…" action. When omitted or set to null, it defaults to 20. It has a minimum value of 1 and a maximum value of 100000. The number of returned messages is equal to or less than this maximum.
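The default and the limits described above can be applied on the client before a request is sent. This is a minimal Python sketch; the function name `resolve_max_messages` is hypothetical and the server performs its own validation regardless.

```python
def resolve_max_messages(value=None):
    """Return the effective "maxMessages" value for a request.

    None stands for an omitted or null property and resolves to the
    documented default of 20; other values must lie in 1..100000.
    """
    if value is None:
        return 20
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, int):
        raise TypeError('"maxMessages" must be an integer')
    if not 1 <= value <= 100000:
        raise ValueError('"maxMessages" must be between 1 and 100000')
    return value
```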
The "startFrom" property is optional. Its value is the string "head" or "tail", an ISO 8601 DateTime string, or an integer message ID. The default value is "tail". It sets the starting message position for a message filter in the "getMessagesFromMqttTopic" action. Use it, for example, to retrieve historical messages on or after a specified date and time.
When its value is "tail", the server returns up to "maxMessages" messages before and including the last message. The server returns messages in order from the last message toward the first.
When its value is "head", the server returns up to "maxMessages" messages after and including the first message that has not been purged from the server. The server returns messages in order from the first message toward the last.
When it is an ISO 8601 DateTime string, such as "2024-07-11T21:18:01.000", the server returns up to "maxMessages" messages starting on or after that date and time. If the time component is missing, it defaults to "00:00:00.000".
When it is an ID number, such as 52233, the server returns up to "maxMessages" messages starting on or after that ID. Messages are returned in ascending ID order. Do not embed an ID number in a string; the server relies on the JSON type to tell the difference between an ID and a DateTime.
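The way a "startFrom" value is interpreted depends on its JSON type, which the following Python sketch makes explicit. The function name `classify_start_from` and the returned labels are hypothetical; the sketch only illustrates the rules above and is not the server's implementation.

```python
from datetime import datetime


def classify_start_from(value="tail"):
    """Return a (kind, detail) pair describing a "startFrom" value."""
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool):
        raise TypeError('"startFrom" must be a string or an integer')
    if isinstance(value, int):
        # An integer is a message ID; it must not be wrapped in a string.
        return ("id", value)
    if isinstance(value, str):
        if value in ("head", "tail"):
            return (value, None)
        # Any other string is read as an ISO 8601 DateTime. A date without
        # a time component parses to midnight, matching the documented
        # default of 00:00:00.000. Invalid strings raise ValueError.
        return ("timestamp", datetime.fromisoformat(value))
    raise TypeError('"startFrom" must be a string or an integer')
```

Because the distinction rests on the type, `52233` is treated as an ID while the string `"2024-07-11"` is treated as a DateTime.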
The "topic" property is an optional UTF-8 string containing an MQTT topic. It may not contain the MQTT wildcard characters # and +. It may contain multiple / characters, which segment an MQTT topic into a set of positional properties.
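A client can reject wildcard topics early and split a topic into its positional segments with a few lines of Python. This is a sketch under stated assumptions: the function name `split_topic` is hypothetical, and the server remains the authority on which topics are valid.

```python
def split_topic(topic):
    """Validate a topic for this action and return its segments.

    Raises ValueError when the topic contains the MQTT wildcard
    characters "#" or "+", which this action does not accept.
    """
    if "#" in topic or "+" in topic:
        raise ValueError("topic must not contain MQTT wildcards (# or +)")
    # Each "/" separates one positional segment from the next.
    return topic.split("/")
```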