Tutorial for using the C MQTT client with FairCom's MQTT broker engine

This section contains MQTT Client tutorials for C programmers.

Introduction

This quick start guide includes two tutorials. Both are command-line programs: one publishes messages and the other monitors published messages. The tutorials therefore do double duty: they show you how to use MQTT, and they serve as utilities for learning and troubleshooting MQTT.

MQTT tutorials:
  • The first is a program that publishes one MQTT message at a time to the broker.

  • The second is a program that subscribes to messages.

FairCom Edge is an MQTT broker. Programs use an MQTT client library to publish and subscribe to messages on the MQTT broker.

An MQTT client does the following:
  • Connects to an MQTT broker.

  • Publishes a message to a topic.

  • Subscribes to a topic and receives all messages published to that topic by any client.

Installation

Prerequisites:
  • FairCom Edge (or another MQTT broker) must be running. By default, the tutorials connect to the broker at 127.0.0.1 on port 1883.

  • A modern C or C++ compiler must be installed.

Code dependencies

The tutorials use the Eclipse Paho C Client to communicate over MQTT. The Paho MQTT client is an open-source project released under the Eclipse Public License 2.0.

Download the latest precompiled library from the releases section of the GitHub client library. Binaries are provided for Windows, Linux, and macOS. Extract the package to a location appropriate for your project.

FairCom dependencies

Install FairCom MQ or FairCom Edge.

Compile the project

The details of compiling this project are beyond the scope of this document, but the key points are:
  • The "include" directory from the precompiled library is needed by your compiler to resolve functions you will call.

  • The "lib" directory from the precompiled library is needed by your linker. The specific library file used by these tutorials is the "paho-mqtt3c" variant, and the linker will need to reference this exact file, not just the directory containing that file.

  • The "lib" directory from the precompiled library also contains a shared library file (.dll/.so/.dylib), which will need to be distributed with your compiled binary if you use dynamic linking. The precompiled archive in release 1.3.10 lacks these shared libraries, but release 1.3.9 contains them.

  • The complete tutorial source and header files are located in the drivers\c.mqtt\tutorials directory in the FairCom MQ/FairCom Edge installation folder.

Publish the contents of a file to an MQTT broker.

Command line usage

publish -f fileName [options]...

Command line options

Table 1. Command line options

Option        Description

-s server     An optional server address. It defaults to tcp://127.0.0.1:1883.

-t topic      An optional topic to publish to. It defaults to testTopic.

-c clientID   An optional client ID that uniquely identifies the client to the MQTT broker. It defaults to testPubClientID.

-f fileName   A required file name to use for the payload in the published message. The file may contain anything.

-u username   An optional user name to use when connecting to the broker. If absent, a connection without username and password is attempted.

-p password   An optional password to use with the username provided above. If absent, a connection with the username and no password is attempted.

-q qos        An optional quality of service to use when publishing the message. It defaults to 1. 0 provides no guarantee of delivery, 1 guarantees delivery at least once, and 2 guarantees delivery exactly once.

-h help       Prints the usage and exits.
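
The publish utility rejects topics that contain # or + (the MQTT wildcard characters), and the -q option is only meaningful for the values 0, 1, and 2. The following is a minimal sketch of how such checks could be written. It is not the tutorial's own validateSettingsPublish() code, and isValidPublishTopic() and parseQos() are hypothetical helper names introduced here for illustration.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: returns 1 if 'topic' is usable for publishing, else 0.
 * '#' and '+' are subscription wildcards, so a publish topic must be
 * non-empty and contain neither character. */
int isValidPublishTopic( const char *topic )
{
	assert( topic != NULL );
	return *topic != '\0' && strpbrk( topic, "#+" ) == NULL;
}

/* Hypothetical helper: parses the text given to -q.
 * Returns 0, 1, or 2 on success and -1 if the text is not a valid QoS. */
int parseQos( const char *text )
{
	char *end = NULL;
	long value;

	assert( text != NULL );
	errno = 0;
	value = strtol( text, &end, 10 );
	/* Reject empty input, trailing junk, overflow, and out-of-range values. */
	if( errno != 0 || end == text || *end != '\0' || value < 0 || value > 2 )
		return -1;
	return ( int )value;
}
```

A caller would typically run both checks right after parsing the command line and print the usage text if either one fails.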



Example 1. Publish
publish -f data.json

publish -t testTopic -f data.json

publish -s tcp://127.0.0.1:1883 -t testTopic -c testPubClientID -f data.json -u admin -p ADMIN -q 1


Code

#include "utilities.h"

/*
 * Command-line utility to publish a file as a payload.
 *
 * Pseudocode:
 * 1. Load default settings.
 * 2. Process command-line arguments and use them to overwrite default settings.
 * 3. Validate all settings.  Topics containing an octothorpe (#) or a plus (+) will be rejected.
 * 4. Read input file into memory.
 * 5. Create MQTT objects.
 * 6. Log into the MQTT broker.
 * 7. Publish the contents of the input file.
 * 8. Disconnect from the MQTT broker.
 *
 * This example program has a simplistic error-handling system so the user can focus on how the MQTT
 * client is used.  When an error condition is detected, this program immediately prints an error message
 * and terminates by calling the 'exiting()' function in 'utilities.c'.
 *
 * args:
 * -s tcp://address:port or ssl://address:port
 * -t topic
 * -c clientID
 * -f file
 * -u username
 * -p password
 * -q qos
 * -h help
 *
 * Basic usage:
 *    publish -f data.json
 *
 * Complete usage:
 *    publish -s tcp://127.0.0.1:1883 -t testTopic -c tutorial1ID -f data.json -u admin -p ADMIN -q 1
 */
int main( int argc, char **argv )
{
	/* Create program objects and variables. */
	MQTTClient mq;
	MQTTClient_connectOptions connectionOptions = MQTTClient_connectOptions_initializer;
	MQTTClient_message publishMessage = MQTTClient_message_initializer;
	MQTTClient_deliveryToken token;
	int returnCode;
	size_t bytesRead = 0;
	settings_struct mySettings;


	/* Load default settings. */
	mySettings.brokerAddress = "tcp://127.0.0.1:1883";
	mySettings.topic = "testTopic";
	mySettings.clientId = "testPubClientID";
	mySettings.filename = "";
	mySettings.username = "";
	mySettings.password = "";
	mySettings.qos = 1;
	mySettings.maxFileSize = 4095;
	mySettings.timeout = 10000; /* Timeout for sending MQTT message */

	/* Process command-line arguments and then validate all settings. */
	returnCode = processCLAsPublish( argc, argv, &mySettings );
	if( returnCode )
		return returnCode;
	returnCode = validateSettingsPublish( mySettings );
	if( returnCode )
		return returnCode;

	/* Allocate a memory buffer and read the input file into it. */
	char *fileBuffer = readFileToBuffer( mySettings.filename, &bytesRead, mySettings.maxFileSize );
	printf( "Read %zu bytes from file '%s'.\n", bytesRead, mySettings.filename );

	/* Populate the client structure */
	MQTTClient_create( &mq, mySettings.brokerAddress, mySettings.clientId, MQTTCLIENT_PERSISTENCE_NONE, NULL );
	connectionOptions.keepAliveInterval = 20; /* default is 60 seconds */
	connectionOptions.cleansession = 1;
	connectionOptions.MQTTVersion = MQTTVERSION_3_1_1; /* Only try MQTT version 3.1.1.  Allows connection timeout to work if broker is not running. */
	connectionOptions.username = mySettings.username;	/* Specify the username and password from 'mySettings' */
	connectionOptions.password = mySettings.password;

	/* Log into the MQTT broker. */
	printf( "Connecting to the MQTT broker at address: '%s'\n", mySettings.brokerAddress );
	if( ( returnCode = MQTTClient_connect( mq, &connectionOptions ) ) != MQTTCLIENT_SUCCESS ) /* The default connect timeout is 30 seconds */
	{
		char logString[60];
		snprintf( logString, 60, "Failed to connect, return code %d\n", returnCode );
		exiting( returnCode, logString );
	}
	publishMessage.payload = fileBuffer;
	publishMessage.payloadlen = ( int )bytesRead;
	publishMessage.qos = mySettings.qos;
	publishMessage.retained = 0;

	/* Publish the contents of the input file. */
	returnCode = MQTTClient_publishMessage( mq, mySettings.topic, &publishMessage, &token );
	if( returnCode == MQTTCLIENT_SUCCESS )
	{
		printf( "Publishing %d bytes to topic '%s'.\n", publishMessage.payloadlen, mySettings.topic );
		/* Block until delivery completes or the timeout expires. */
		returnCode = MQTTClient_waitForCompletion( mq, token, mySettings.timeout );
	}
	if( returnCode != MQTTCLIENT_SUCCESS )
		printf( "There was an error publishing the message.  Return code: %d\n", returnCode );

	/* Disconnect from the MQTT broker. */
	MQTTClient_disconnect( mq, 10000 );
	MQTTClient_destroy( &mq );

	printf( "Disconnected from the MQTT broker.\n" );

	/* Free the buffer that stored the contents of the input file */
	free( fileBuffer );

	return returnCode;
} /* End of main() function. */
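
The listing calls readFileToBuffer(), which lives in utilities.c and is not shown on this page. As a rough sketch of what a helper with the same parameters might do, the function below reads a file into a heap buffer and refuses files larger than the limit. The name readFileSketch() and its return-NULL-on-error behavior are assumptions made for illustration; the real helper may behave differently, for example by calling exiting().

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-in for readFileToBuffer(): reads 'path' into a newly
 * allocated buffer and stores the byte count in '*bytesRead'. Returns NULL if
 * the file cannot be opened or read, is larger than 'maxSize', or memory
 * cannot be allocated. The caller frees the returned buffer. */
char *readFileSketch( const char *path, size_t *bytesRead, size_t maxSize )
{
	FILE *file;
	char *buffer;
	size_t count;

	assert( path != NULL && bytesRead != NULL );
	*bytesRead = 0;

	file = fopen( path, "rb" ); /* Binary mode: the payload may contain anything. */
	if( file == NULL )
		return NULL;

	/* Ask for one byte more than the limit so an oversized file can be detected. */
	buffer = malloc( maxSize + 1 );
	if( buffer == NULL )
	{
		fclose( file );
		return NULL;
	}

	count = fread( buffer, 1, maxSize + 1, file );
	if( ferror( file ) || count > maxSize )
	{
		free( buffer );
		fclose( file );
		return NULL;
	}

	fclose( file );
	*bytesRead = count;
	return buffer;
}
```

Because the payload is handed to the client with an explicit length (payloadlen), the buffer does not need to be null-terminated.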

Subscribe to a specific topic from an MQTT broker, print messages to the screen, and optionally persist messages to files in the specified directory.

Command line usage

subscribe [options]...

Command line options

Note

Options may be present in any order, and if absent, default values will be used.

Table 2. Command line options

Option        Description

-s server     An optional server address. It defaults to tcp://127.0.0.1:1883.

-t topic      An optional topic to subscribe to. It defaults to testTopic.

-c clientID   An optional client ID that uniquely identifies the client to the MQTT broker. It defaults to testSubClientID.

-d directory  An optional directory to save messages as files. If omitted, messages will be printed to screen and not saved to files. If you specify a directory that does not exist, it is created. Existing files in the directory with the same name are overwritten.

-u username   An optional user name to use when connecting to the broker. If absent, a connection without username and password is attempted.

-p password   An optional password to use with the username provided above. Ignored if the username is absent.

-q qos        An optional quality of service to use when subscribing to the topic. It defaults to 1. 0 provides no guarantee of delivery, 1 guarantees delivery at least once, and 2 guarantees delivery exactly once.

-h help       Prints the usage and exits.
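
When -d is given, the subscribe listing further down names each output file from the topic and a running message counter, using the format "%s_%06lu" after calling sanitizeTopic(). That helper is defined in utilities.c and is not shown here, so the sketch below only illustrates one plausible approach. sanitizeTopicSketch() and buildFileName() are hypothetical names, and the set of characters treated as safe is an assumption.

```c
#include <assert.h>
#include <ctype.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for sanitizeTopic(): replaces, in place, every
 * character that is not a letter, digit, '-', '_' or '.' with '_' so the
 * topic can be used as part of a file name. */
void sanitizeTopicSketch( char *topic )
{
	assert( topic != NULL );
	for( ; *topic != '\0'; topic++ )
	{
		unsigned char c = ( unsigned char )*topic;
		if( !isalnum( c ) && c != '-' && c != '_' && c != '.' )
			*topic = '_';
	}
}

/* Builds "<topic>_<six-digit counter>" in 'out', using the same format string
 * as the listing. Returns 0 on success and -1 if the name did not fit. */
int buildFileName( char *out, size_t outSize, const char *topic, unsigned long counter )
{
	int written = snprintf( out, outSize, "%s_%06lu", topic, counter );
	return ( written < 0 || ( size_t )written >= outSize ) ? -1 : 0;
}
```

Checking the snprintf() result, as buildFileName() does, guards against silently truncated names when a topic is longer than the file name buffer.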



Example 2. Subscribe
subscribe

subscribe -t testTopic

subscribe -s tcp://127.0.0.1:1883 -t testTopic -c testSubClientID -d testDir -u admin -p ADMIN -q 1


Code

#include "utilities.h"

void interruptHandler(int sig);
int messageArrived(void *context, char *topicName, int topicLength, MQTTClient_message *message);
void connectionLost(void *context, char *cause);
void deliveryComplete(void *context, MQTTClient_deliveryToken dt);


unsigned long g_counter = 0;	/* This is used to increment the message count, so output file names are unique. */
volatile sig_atomic_t g_keepRunning = 1;	/* This is changed to 0 (by the signal handler or a callback) to break out of the do/while() loop. */
char *g_outputDirectory = ""; /* This is needed for the callback to be able to write the output file. */


int main( int argc, char *argv[] )
{
	MQTTClient mq;
	MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
	int returnCode;
	settings_struct mySettings;

	/* Load default settings. */
	mySettings.brokerAddress = "tcp://127.0.0.1:1883";
	mySettings.topic = "testTopic";
	mySettings.clientId = "testSubClientID";
	mySettings.outputDirectory = "";
	mySettings.username = "";
	mySettings.password = "";
	mySettings.qos = 1;
	mySettings.listenSeconds = 0l; /* Default is to listen for messages until user hits <ctrl-c>. */

	printf( "\n" );

	/* Process command-line arguments and then validate all settings. */
	returnCode = processCLAsSubscribe( argc, argv, &mySettings);
	if( returnCode )
		return returnCode;
	returnCode = validateSettingsSubscribe(mySettings);
	if( returnCode )
		return returnCode;

	if( *mySettings.outputDirectory != '\0' )
		g_outputDirectory = mySettings.outputDirectory;

	/* Populate the client structure 'mq'. */
	returnCode = MQTTClient_create( &mq, mySettings.brokerAddress, mySettings.clientId, MQTTCLIENT_PERSISTENCE_NONE, NULL );
	if( returnCode == MQTTCLIENT_SUCCESS )
	{
		conn_opts.keepAliveInterval = 20;			 /* Default is 60 seconds. */
		conn_opts.cleansession = 1;					 /* Request a "clean" session. */
		conn_opts.MQTTVersion = MQTTVERSION_3_1_1; /* Use MQTT version 3.1.1.  Allows connection timeout to work if broker is not running. */
		conn_opts.username = mySettings.username;
		conn_opts.password = mySettings.password;

		/* Only the messageArrived callback is mandatory; connectionLost and deliveryComplete may be NULL, but this tutorial supplies all three. */
		returnCode = MQTTClient_setCallbacks( mq, NULL, connectionLost, messageArrived, deliveryComplete );
		if( returnCode == MQTTCLIENT_SUCCESS )
		{
			printf( "Connecting to the MQTT broker at address: '%s'\n", mySettings.brokerAddress );

			returnCode = MQTTClient_connect( mq, &conn_opts );

			if( returnCode == MQTTCLIENT_SUCCESS )
			{
				returnCode = MQTTClient_subscribe( mq, mySettings.topic, mySettings.qos );
				if( returnCode == MQTTCLIENT_SUCCESS )
				{
					printf( "Successfully subscribed to the '%s' topic.\n", mySettings.topic );

					/* If a listen duration was requested, compute the time at which to stop. */
					time_t now, stopTime=0l;
					if (mySettings.listenSeconds>0)
					{
						time(&stopTime);
						stopTime+=mySettings.listenSeconds;
						printf("I will stop listening for packets in %ld seconds.\n", mySettings.listenSeconds);
					}

					/* Set a signal interrupt handler to intercept <CTRL><C>, so the client can gracefully disconnect. */
					signal( SIGINT, interruptHandler );
					do
					{
						( void )0;
						if (stopTime)
						{
							time(&now); /* get the current time */
							if (now>stopTime) /* if it's after the stop time, exit this loop */
							{
								printf("Done listening for packets.\n");
								g_keepRunning = 0; 
							}
						}
					} while( g_keepRunning );
				}
				else
					printf( "Unable to subscribe to the '%s' topic!\nReturn code %d\n\n", mySettings.topic, returnCode );
			}
			else
				printf( "Failed to connect to the broker!.  Return code: %d\n", returnCode );
		}
		else
			printf( "Failed to set the callback functions!\nUnable to continue.\n" );

		returnCode = MQTTClient_disconnect( mq, 10000 );
		if( returnCode == MQTTCLIENT_SUCCESS )
			printf( "Successfully disconnected from the broker.\n" );
		else
			printf( "Unable to cleanly disconnect from the broker!\n" );
		MQTTClient_destroy( &mq );
	}
	else
		printf( "Failed to create the client object!\nUnable to continue.\n" );
	return returnCode;
} /* End of main() function. */


/*
 * This function will catch <CTRL><C> and prevent it from exiting the program.
 * It will instead set the g_keepRunning flag to zero, which will break out of the do/while() loop, 
 * allowing the client to gracefully disconnect from the broker.
 */
void interruptHandler( int sig )
{
	signal( sig, SIG_IGN );
	/* Setting this to 0 breaks out of the do/while() loop. */
	g_keepRunning = 0;
} /* End of interruptHandler() function. */


/*
 * The message arrived callback processes incoming messages on all subscribed topics.
 * Paho sets topicLength to zero unless topicName contains embedded null characters, in which case it holds the topic's length.
 */
int messageArrived( void *context, char *topicName, int topicLength, MQTTClient_message *message )
{
	g_counter++;
	char *payloadPointer;
	printf( "\nMessage arrived on topic '%s'\n", topicName );
	payloadPointer = message->payload;
	if( *g_outputDirectory != '\0' )
	{
		sanitizeTopic( topicName );
		char fileName[250];
		snprintf( fileName, 250, "%s_%06lu", topicName, g_counter );
		int returnValue = writeFile( g_outputDirectory, fileName, message->payload );
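		if( returnValue != 0 )
		{
			/* Stop the main loop, but still release the message and topic before returning. */
			g_keepRunning = 0;
			MQTTClient_freeMessage( &message );
			MQTTClient_free( topicName );
			return returnValue;
		}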
	}
	for( int i = 0; i < message->payloadlen; i++ )
		putchar( *payloadPointer++ );
	printf( "\n" );
	if( topicLength != 0 )
		printf( "Topic length was %d, context: %p\n", topicLength, context );
	MQTTClient_freeMessage( &message );
	MQTTClient_free( topicName );
	return 1;
} /* End of messageArrived() function. */


/*
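 * The connection lost callback is optional for MQTTClient_setCallbacks() (NULL is accepted), but it lets the program stop when the connection drops.
 */
void connectionLost( void *context, char *cause )
{
	printf( "\n\nConnection lost!\n" );
	printf( "  Cause: %s\n", cause ? cause : "(none given)" ); /* 'cause' may be NULL. */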
	printf( "  context: %p\n", context );
	g_keepRunning = 0;
} /* End of connectionLost() function. */


/*
 * The delivery complete callback is optional for MQTTClient_setCallbacks() (NULL is accepted); it is included here for completeness.
 */
void deliveryComplete( void *context, MQTTClient_deliveryToken deliveryToken )
{
	printf( "Confirmed delivery of message with token '%d'.\n", deliveryToken );
	printf( "Context: %p\n", context );
} /* End of deliveryComplete() function. */
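
The subscribe program relies on a flag that a signal handler clears so the main loop can exit and disconnect cleanly. The C standard designates volatile sig_atomic_t as the type for a flag written from a signal handler. The sketch below isolates that pattern; installInterruptHandler() and shouldKeepRunning() are hypothetical names used only for this illustration.

```c
#include <assert.h>
#include <signal.h>

/* Flag cleared by the handler. volatile sig_atomic_t can be written from a
 * signal handler and read from the main loop without tearing. */
static volatile sig_atomic_t keepRunning = 1;

/* The handler only clears the flag; all real work happens in the main loop. */
static void onInterrupt( int sig )
{
	( void )sig;
	keepRunning = 0;
}

/* Installs the SIGINT handler. Returns 0 on success and -1 on failure. */
int installInterruptHandler( void )
{
	return signal( SIGINT, onInterrupt ) == SIG_ERR ? -1 : 0;
}

/* Returns 1 while the program should keep looping and 0 after <CTRL><C>. */
int shouldKeepRunning( void )
{
	return keepRunning != 0;
}
```

A main loop built on this would test shouldKeepRunning() on each pass and ideally sleep briefly between checks, so that waiting for messages does not consume a full CPU core.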
