"getMessagesFromMqTopic"
(jsonAction)
Use the "getMessagesFromMqTopic" action to retrieve messages published on a specific topic.
This action uses the event streaming technique to retrieve messages published on a specific topic by the MQ API and MQTT. It is stateless and does not remove messages from the topic's queue. Use it to tail the end of the queue, retrieve all messages from the beginning of the queue, retrieve messages starting from a timestamp, or paginate through a queue.
This action allows multiple processes to retrieve messages simultaneously from a topic without moving the client's message position or removing shared messages.
Instead of using a client's message position to retrieve undelivered messages, it retrieves messages from a specified starting location in the topic's message queue. You can retrieve messages from the end or beginning of the queue, a specific timestamp, or a queue position ID.
This action does not disconnect MQTT clients.
Call this action to retrieve up to "maxMessages" messages from a topic, starting from the position specified by "startFrom". The message payload is encoded according to "binaryFormat".
You must use one, but not both, of the following properties to identify the message queue from which you want to retrieve messages:
- Use "topic" to specify a topic from which you want to retrieve messages. A topic cannot contain wildcards.
- Use "tableName" to specify the name of an integration table containing messages. You may also set the table's "databaseName" and "ownerName"; otherwise, the server will use the JSON Action Session's default values.
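The one-but-not-both rule can be checked on the client before a request is sent. The following is a minimal sketch, not part of the API: `build_queue_selector` is a hypothetical helper name, and the choice to accept "databaseName" and "ownerName" only alongside "tableName" is an assumption based on how the text above introduces them.

```python
def build_queue_selector(topic=None, table_name=None,
                         database_name=None, owner_name=None):
    """Return the "params" entries that identify the message queue.

    Hypothetical helper: exactly one of topic or table_name must be given.
    """
    if (topic is None) == (table_name is None):
        raise ValueError('specify exactly one of "topic" or "tableName"')
    if topic is not None:
        return {"topic": topic}
    selector = {"tableName": table_name}
    # Assumption: these two are only meaningful together with "tableName".
    # When omitted, the server falls back to the session defaults.
    if database_name is not None:
        selector["databaseName"] = database_name
    if owner_name is not None:
        selector["ownerName"] = owner_name
    return selector
```

The returned dictionary can be merged into the "params" object of a request alongside properties such as "startFrom" and "maxMessages".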
The optional "startFrom" property controls the starting point from which the server delivers messages.
- Set "startFrom" to "tail" (the default) to retrieve the most recently sent messages.
- Set "startFrom" to "head" to retrieve messages from the beginning.
- Set "startFrom" to an ISO 8601 DateTime, such as "2024-07-11T21:18:01.000", to retrieve messages starting on or after that date and time. If the time component is missing, it defaults to "00:00:00.000".
- Set "startFrom" to an integer, such as 52233, to retrieve messages starting with the message with that ID number.
Tip
To retrieve the next set of messages, set "startFrom" to one plus the ID number of the last message you received.
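The pagination tip can be sketched as a loop that feeds the highest ID received back into "startFrom". This is an illustration under stated assumptions: `iter_pages` is a hypothetical name, and `send` is a hypothetical callable that posts the request dictionary to the server (adding the "authToken") and returns the decoded response dictionary. The sketch requests "includeMessageMetadata" so that each message carries an "id".

```python
def iter_pages(send, topic, start_from="head", page_size=50):
    """Yield lists of messages, advancing "startFrom" past the last ID seen.

    `send` is a hypothetical transport callable: request dict -> response dict.
    """
    while True:
        request = {
            "api": "mq",
            "action": "getMessagesFromMqTopic",
            "params": {
                "topic": topic,
                "startFrom": start_from,
                "maxMessages": page_size,
                # Needed so that each message includes its "id".
                "includeMessageMetadata": True,
            },
        }
        response = send(request)
        if response.get("errorCode", 0) != 0:
            raise RuntimeError(response.get("errorMessage") or "request failed")
        messages = response["result"]["messages"]
        if not messages:
            return
        yield messages
        # The response example shows "id" as a string, so convert it to an
        # integer before adding one.
        start_from = max(int(m["id"]) for m in messages) + 1
```

Passing the next ID as an integer rather than a string matters because the server distinguishes IDs from DateTime strings by JSON type.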
Optionally, set "expiredMessagesFilter" to "include", "exclude", or "only" to include expired messages in the results, exclude them, or return only expired messages. It defaults to "include".
Optionally, set "includeMqttProperties": true to include the message's MQTT properties. It defaults to false.
Optionally, set "includeTransformFields": true to include the custom fields used for the transformation process's inputs and outputs. It defaults to false.
Tip
For more query options, use the JSON DB "getRecords..." actions. You can use the "describeMqSession" action to return information about a client's subscriptions, which includes a map of topic names to table names. You can query those tables to see their messages. You can use SQL to run ad hoc queries. You can find a record using partial, exact, or inequality matches. You can move and skip forward and backward through the records in index-sorted order.
Request examples
Minimal
{
  "api": "mq",
  "action": "getMessagesFromMqTopic",
  "params": {
    "topic": "test/topic1"
  }
}
{
  "api": "mq",
  "action": "getMessagesFromMqTopic",
  "params":
  {
    "topic": "test/topic1",
    "databaseName": "faircom",
    "ownerName": "admin",
    "tableName": "mqtt_msg_test_topic1",
    "startFrom": number | "tail | head",
    "maxMessages": 1,
    "binaryFormat": "base64 | hex | utf8 | json | byteArray",
    "expiredMessagesFilter": "include",
    "includeMessageMetadata": true,
    "includeMqttProperties": true,
    "includeTransformFields": true,
    "includeOtherFields": true
  },
  "apiVersion": "1.0",
  "debug": "max",
  "requestId": "optionalUniqueRequestIdFromTheClient",
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
{
  "api": "mq",
  "action": "getMessagesFromMqTopic",
  "params":
  {
    "maxMessages": 1,
    "binaryFormat": "json",
    "topic": "test/topic1",
    "expiredMessagesFilter": "include",
    "includeMessageMetadata": true,
    "includeMqttProperties": true,
    "includeTransformFields": true
  },
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
{
  "api": "mq",
  "action": "getMessagesFromMqTopic",
  "params":
  {
    "topic": "test/topic1",
    "startFrom": "tail",
    "maxMessages": 10,
    "binaryFormat": "base64"
  },
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
{
  "api": "mq",
  "action": "getMessagesFromMqTopic",
  "params":
  {
    "topic": "test/topic1",
    "maxMessages": 1,
    "binaryFormat": "hex",
    "includeMessageMetadata": true,
    "expiredMessagesFilter": "include",
    "includeMqttProperties": true,
    "includeTransformFields": true
  },
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
{
  "api": "mq",
  "action": "getMessagesFromMqTopic",
  "params":
  {
    "topic": "test/topic1",
    "startFrom": "head",
    "maxMessages": 50,
    "binaryFormat": "json"
  },
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
{
  "api": "mq",
  "action": "getMessagesFromMqTopic",
  "params":
  {
    "topic": "test/topic1",
    "startFrom": 51,
    "maxMessages": 50,
    "binaryFormat": "json"
  },
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
{
  "api": "mq",
  "action": "getMessagesFromMqTopic",
  "params":
  {
    "topic": "test/topic1",
    "startFrom": "2024-07-11T21:18:01.000",
    "maxMessages": 5000,
    "binaryFormat": "json"
  },
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
Response example
{
  "result": {
    "messages":
    [
      {
        "clientName": "Client identifier",
        "topic": "test/topic1",
        "subscriptionIdentifiers": [123],
        "databaseName": "faircom",
        "ownerName": "admin",
        "tableName": "mqtt_msg_test_topic1",
        "payload": "C3A3",
        "binaryFormat": "base64 | hex | utf8 | json | byteArray",
        "id": "263",
        "timestamp": "2023-12-18T18:04:20.085",
        "error": false,
        "log": null,
        "contentType": "IanaMediaType or a CustomType",
        "correlationData": "any JSON value up to 65500 bytes",
        "expirySeconds": 60,
        "messageHasExpired": true,
        "payloadFormat": "binary | utf8",
        "publishedQoS": 2,
        "retain": true,
        "responseTopic": "topic/name/for/subscriber/to/respond/to",
        "userProperties": [
          {
            "key": "some key",
            "value": "some value"
          }
        ],
        "faircomClientId": 6732,
        "faircomTopicId": 457,
        "faircomForwardTopicId": 0,
        "faircomMqttPacketId": 53736,
        "faircomMqttSentReason": "Client | Retained | Will | Forwarded",
        "transformFields": [
          { "fieldName": "t1", "fieldData": "some value" }
        ]
      }
    ]
  },
  "errorCode": 0,
  "errorMessage": "",
  "debugInfo": {},
  "requestId": "optionalUniqueRequestIdFromTheClient",
  "authToken": "replaceWithAuthTokenFromCreateSession"
}
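A client typically checks "errorCode" and then decodes each message's "payload" according to its "binaryFormat". The sketch below is illustrative only. The helper names are hypothetical, and the mapping from each "binaryFormat" value to a payload representation (hex string, Base64 string, UTF-8 string, list of byte values, or embedded JSON value) is an assumption inferred from the format names, not something this page specifies.

```python
import base64
import json


def extract_messages(response):
    """Return the list of messages, or raise if the server reported an error."""
    if response.get("errorCode", 0) != 0:
        raise RuntimeError(response.get("errorMessage") or "request failed")
    return response["result"]["messages"]


def payload_to_bytes(payload, binary_format):
    """Convert a payload to raw bytes (assumed encodings, see lead-in)."""
    if binary_format == "hex":
        return bytes.fromhex(payload)
    if binary_format == "base64":
        return base64.b64decode(payload)
    if binary_format == "utf8":
        return payload.encode("utf-8")
    if binary_format == "byteArray":
        # Assumed to be a JSON array of integers in the range 0..255.
        return bytes(payload)
    if binary_format == "json":
        # Assumed to be an embedded JSON value; serialize it back to text.
        return json.dumps(payload).encode("utf-8")
    raise ValueError(f"unknown binaryFormat: {binary_format!r}")
```

Under these assumptions, the example payload "C3A3" with the "hex" format yields the two bytes 0xC3 0xA3.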
"params" property summaries

Property | Description | Default | Type | Limits (inclusive)
---|---|---|---|---
"binaryFormat" | (optional) specifies how the message payload is encoded | | string | One of the following: "base64", "hex", "utf8", "json", "byteArray"
"expiredMessagesFilter" | (optional) specifies whether or not to include expired messages in the result | "include" | string | "include", "exclude", "only"
"includeMessageMetadata" | (optional) returns the message's "id", "timestamp", "error", and "log" properties | | Boolean |
"includeMqttProperties" | (optional) returns MQTT properties in the response | false | Boolean |
"includeOtherFields" | (optional) returns additional fields that are not the built-in integration table, transform, and MQTT fields in the response | | Boolean |
"includeTransformFields" | (optional) returns custom transform fields in the response | false | Boolean |
"maxMessages" | (optional) specifies the number of messages to return from a "getMessages…" action | 20 | integer | 1 to 100000
"startFrom" | (optional) sets the starting message position for a message filter | "tail" | string | "head", "tail", an ISO 8601 DateTime, or a message ID number
"topic" | (optional) specifies the name of a topic | | UTF-8 string |
The "expiredMessagesFilter" property is optional. It is an enumerated string that contains one of the following values: "include", "exclude", or "only". It defaults to "include".
- "include" (default) returns expired messages in the results.
- "exclude" omits expired messages from the results.
- "only" returns only expired messages in the results.
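The three filter values can be pictured as a predicate over each message's expiry state. The sketch below mimics that behavior on the client purely for illustration; the server applies the real filter. `apply_expired_filter` is a hypothetical name, and the use of the "messageHasExpired" field from the response example as the expiry flag is an assumption.

```python
def apply_expired_filter(messages, expired_messages_filter="include"):
    """Client-side illustration of the "expiredMessagesFilter" semantics."""
    if expired_messages_filter == "include":
        # Expired messages stay in the results.
        return list(messages)
    if expired_messages_filter == "exclude":
        # Expired messages are dropped.
        return [m for m in messages if not m.get("messageHasExpired", False)]
    if expired_messages_filter == "only":
        # Only expired messages remain.
        return [m for m in messages if m.get("messageHasExpired", False)]
    raise ValueError('expected "include", "exclude", or "only"')
```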
Set the "includeMessageMetadata" property to true to include the message's "id", "timestamp", "error", and "log" properties.
Set the "includeMqttProperties" property to true to include the MQTT properties in the response. Otherwise, omit the property or set it to false or null. MQTT properties are helpful when you need to troubleshoot MQTT messages.
Set the "includeOtherFields" property to true to include additional fields that are not the built-in integration table fields, transform fields, and MQTT fields. Otherwise, omit the property or set it to false or null.
Set the "includeTransformFields" property to true to include custom transform fields (if any) that a customer added to the integration table. Otherwise, omit the property or set it to false or null. Custom transform fields are helpful when you want to see the inputs and outputs of transformations assigned to the integration table.
The "maxMessages" property is optional and defaults to 20. It has a minimum value of 1 and a maximum value of 100000. It specifies the maximum number of messages to return from a "getMessages…" action. The number of returned messages will be equal to or less than this maximum.
The "startFrom" property is optional. Its value is the string "head" or "tail", an ISO 8601 DateTime string, or a message ID number. The default value is "tail". It sets the starting message position for a message filter in the "getMessagesFromMqTopic" action. Use a DateTime value to retrieve historical messages on or after the specified date and time.
- When its value is "tail", the server returns the "maxMessages" number of messages before and including the last message. The server returns messages in order of the last message toward the first.
- When its value is "head", the server returns the "maxMessages" number of messages after and including the first message that has not been purged from the server. The server returns messages in order of the first message toward the last.
- When it is a string ISO 8601 DateTime value, such as "2024-07-11T21:18:01.000", the server returns the "maxMessages" number of messages starting on or after that date and time. If the time component is missing, it defaults to "00:00:00.000".
- When it is an ID number, such as 52233, the server returns the "maxMessages" number of messages starting on or after that ID. Messages are returned in ascending ID order. Do not embed an ID number in a string; the server relies on the JSON type to tell the difference between an ID and a DateTime.
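Because the server distinguishes an ID from a DateTime by JSON type, it can help to normalize "startFrom" before building a request. The following is a minimal sketch with a hypothetical helper name, `normalize_start_from`. It assumes naive (timezone-free) datetimes, matching the example value above, and rejects digit-only strings so that an ID is never sent as a string by accident.

```python
from datetime import date, datetime


def normalize_start_from(value="tail"):
    """Return a JSON-ready "startFrom" value: "head", "tail", str, or int."""
    if isinstance(value, bool):
        # bool is a subclass of int in Python, so reject it explicitly.
        raise TypeError('"startFrom" must not be a Boolean')
    if isinstance(value, int):
        return value  # message ID, sent as a JSON number
    if isinstance(value, datetime):
        # Assumes a naive datetime; yields e.g. "2024-07-11T21:18:01.000".
        return value.isoformat(timespec="milliseconds")
    if isinstance(value, date):
        # Date only; the server defaults the time to "00:00:00.000".
        return value.isoformat()
    if isinstance(value, str):
        if value in ("head", "tail"):
            return value
        if value.isdigit():
            raise ValueError("pass message IDs as integers, not strings")
        datetime.fromisoformat(value)  # raises ValueError if not ISO 8601
        return value
    raise TypeError('unsupported "startFrom" type')
```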
The "topic" property is a case-sensitive UTF-8 string containing the name of a topic. It is required unless you specify "tableName" instead. It must not contain the MQTT wildcard characters # or +. It may contain multiple / characters, which segment a topic into a set of positional properties.
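The topic rules lend themselves to a small pre-flight check. This sketch uses a hypothetical helper, `split_topic`, which rejects the wildcard characters and returns the /-separated segments. Rejecting an empty topic is an extra assumption that the text above does not state.

```python
def split_topic(topic):
    """Validate a topic name and return its /-separated segments."""
    if not isinstance(topic, str) or topic == "":
        # Assumption: an empty topic name is never valid.
        raise ValueError('"topic" must be a non-empty string')
    if "#" in topic or "+" in topic:
        raise ValueError('"topic" must not contain the wildcards # or +')
    # Topic names are case-sensitive, so the segments are returned unchanged.
    return topic.split("/")
```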