Skip to main content

MQTT client for Node.js

Tutorial to use the Node.js MQTT client with FairCom's MQTT broker engine

Abstract

This section contains MQTT Client tutorials for Node.js developers.

This section contains MQTT Client tutorials for Node.js developers.

Introduction

This quick start guide includes two tutorials. Both are command-line programs that you can use to publish messages and monitor published messages. Thus, the tutorials perform double duty. They show you how to use MQTT and they create command-line utilities that you can use to learn and troubleshoot MQTT.

MQTT tutorials:

FairCom MQ is an MQTT broker. Programs use an MQTT client library to publish and subscribe to messages on the MQTT broker.

An MQTT client does the following:
  • Connects to an MQTT broker.

  • Publishes a message to a topic.

  • Subscribes to a topic and receives all messages published to that topic by any client.

Installation

Prerequisites:
  • Install FairCom MQ.

  • FairCom MQ (or another MQTT broker) must be running on 127.0.0.1 and listening on port 1883.

  • The latest stable version of Node.js must be installed.

  • In the directory, <faircom>/drivers/nodejs.mqtt/NodejsMQTTTutorial1, run npm install to load the required node_modules.

    Note

    It is important that your computer be connected to the internet when you run npm install because around 80 modules will be downloaded and installed.

Code dependencies

The tutorial uses the MQTT.js Client Library for Node.js to communicate with MQTT. The MQTT.js client library is an open-source project using the MIT license.

Note

FairCom chose the MQTT.js client library over the Paho MQTT library because it works well, supports all versions of MQTT, uses an MIT license and is an active, mature project on GitHub.

This tutorial is a command-line utility for publishing MQTT messages. It publishes a message to a specified topic on a specified broker. This is useful for testing and troubleshooting MQTT.

The source code is simple and self-explanatory and is located in the <faircom>/drivers/nodejs.mqtt/NodejsMQTTTutorial1/publish/publish.js file.

Command line usage

node publish.js -t topic -f fileName -q qos -c clientid -s hostname:port -u username -p password

Command line options

Options may be present in any order, and if absent, default values will be used.

Table 1. Command line options

Option

Description

-t topic

An optional topic to publish to. It defaults to testTopic.

-f fileName

A required file name to use for the payload in the published message. The file may contain anything.

-q QoS

An optional quality of service to use when publishing the message. It defaults to 1.

  • 0 provides no guarantee of delivery.

  • 1 guarantees delivery at least once.

  • 2 guarantees delivery at most once.

-c clientID

An optional client id that uniquely identifies the client to the MQTT broker. It defaults to testPubClientID.

-s hostname:port

An optional hostname and port of an MQTT broker. It defaults to 127.0.0.1:1883.

-u username

An optional user name to use when connecting to the broker. If absent, a connection without a username and password is attempted.

-p password

An optional password to use with the username provided above. If absent a username without a password is attempted.

-h

Prints the usage and exit.



Example 1. Command line
node publish.js -t testTopic -f /Users/username/myjsonfiles/data.json


Example 2. Output
connecting to client at: 127.0.0.1:1883

connected to client at: 127.0.0.1:1883

Reading file at data.json

publishing to topic 'test/Topic1'

Connection closed.


Code

// Command line parsing and program defaults.
const argv = require( "yargs" )
      .version( "1.0.0" )
      .options( {
         t: {
            alias:    "topic",
            type:     "string",
            default:  "testTopic",
            describe: "An optional topic to subscribe to.",
         },
         f: {
            alias:        ["filename", "file"],
            type:         "string",
            describe:     "A required file name to use for the payload in the published message.",
            demandOption: true,
         },
         q: {
            alias:    "qos",
            type:     "number",
            default:  1,
            describe: "An optional quality of service to use when publishing the message.",
         },
         c: {
            alias:    ["clientId", "client"],
            type:     "string",
            describe: "An optional client id that uniquely identifies the client to the MQTT broker.",
         },
         s: {
            alias:    ["server", "broker", "b"],
            type:     "string",
            default:  "127.0.0.1:1883",
            describe: "An optional hostname and port of an MQTT broker.",
         },
         u: {
            alias:    ["username", "user"],
            type:     "string",
            describe: "An optional user name to use when connecting to the broker. If absent, a connection without a username and password is attempted.",
         },
         p: {
            alias:    ["password", "pass"],
            type:     "string",
            describe: "An optional password to use with the username provided above. Ignored if the username is absent.",
         },
         h: {
            alias:    "help",
            type:     "boolean",
            describe: "Print the usage and exit.",
         }
      } )
      .help()
      .usage( "Usage: node $0 [options]" )
      .argv;


// Read settings from the command line or use defaults.
let settings = {
   topic:           argv.t,
   fileName:        argv.f,
   qos:             argv.q,
   clientId:        argv.c || "nodeJS_pub_" + Math.random().toString( 16 ).substring( 2, 8 ),
   brokerAddress:   argv.s,
   username:        argv.u,
   password:        argv.p,
   protocolId:      "MQTT",
   protocolVersion: 4  // 4 is MQTT 3.1.1 and 5 is MQTT 5.0
};


// Variable declarations and initialization.
const brokerUrl = `mqtt://${settings.brokerAddress}`;
let mqtt = require( "mqtt" );
let process = require( "process" );
let client = null;


// Connect to the MQTT broker.
console.log( `Connecting to broker at: ${settings.brokerAddress}` )
client = mqtt.connect(
      brokerUrl,
      {
         clientId:        settings.clientId,
         connectTimeout:  30 * 1000,
         reconnectPeriod: 1000,
         protocolId:      settings.protocolId,
         protocolVersion: settings.protocolVersion,
         ...( settings.username && {username: settings.username} ),
         ...( settings.password && {password: settings.password} )
      } );


// Interrupt handling.
process.on( "SIGINT", function()
{
   console.log( "\nShutting down MQTT client after Ctrl-C" );
   if( client )
   {
      // Wait for in-flight messages to be sent and exit the program.
      client.end( false, () => { process.exit(); } );
   }
   else {
      process.exit();
   }
} );


// The "On Connect" listener.
client.on( "connect", function()
{
   console.log( `Connected to broker at: ${settings.brokerAddress}` );

   // read in file from fileName path passed in by user.
   console.log( `Reading file at ${settings.fileName}` );
   try {
      const fs = require( "fs" );
      const msgData = fs.readFileSync( settings.fileName );
      console.log( `Publishing to topic '${settings.topic}'` );
      client.publish(
            settings.topic,
            JSON.stringify( msgData ),
            {qos: settings.qos} );
      client.end();
   }
   catch( e )
   {
      if( e.code !== "MODULE_NOT_FOUND" )
      {
         client.end();
         throw e;
      }
   }
}, )


// The "On Disconnect" listener.
client.on( "disconnect", function()
{
   console.log( `Disconnected from: ${brokerUrl}` );
   if( client.reconnecting === false )
      client.reconnect(); // Only reconnect if not already reconnecting.
} );


// The "On Error" listener.
client.on( "error", function( err )
{
   console.log( "Error: " + err );
   if( err && typeof err === "object" && "code" in err )
      if( err.code === "ENOTFOUND" )
         console.log( "Network error: Verify the passed hostname:port and that your MQTT server is running." );
} );


// The "On Close" listener.
client.on( "close", function()
{
   console.log( "Connection closed." );
   if( client.isConnected )
      client.end();
} );

This tutorial is a utility for subscribing to MQTT messages in a topic. Each message sent to the topic is output to the console and optionally written as a file to the specified folder. This utility is useful for testing and troubleshooting MQTT.

The source code is simple and self-explanatory and is located in the <faircom>/drivers/nodejs.mqtt/NodejsMQTTTutorial1/subscribe/subscribe.js file.

Command line usage

node subscribe.js -t topic -d directory -q QoS -c clientID -s hostname:port -u username -p password

Command line options

Options may be present in any order, and if absent, default values will be used.

Table 2. Command line options

Option

Description

-t topic

An optional topic to subscribe to. It defaults to testTopic.

-d directory

An optional directory to save messages as files. If omitted, messages will be printed to screen and not saved to files. If you specify a directory that does not exist, it is created. Existing files in the directory with the same name are overwritten.

-q QoS

An optional quality of service to use when when subscribing to the topic. It defaults to 1.

  • 0 provides no guarantee of delivery.

  • 1 guarantees delivery at least once.

  • 2 guarantees delivery only once.

-c clientID

An optional client id that uniquely identifies the client to the MQTT broker. It defaults to testSubClientID.

-s hostname:port

An optional hostname and port of an MQTT broker. It defaults to 127.0.0.1:1883.

-u username

An optional user name to use when connecting to the broker. If absent, a connection without a username and password is attempted.

-p password

An optional password to use with the username provided above. Ignored if the username is absent.

-h

Prints the usage and exit.



Example 3. Command line
node subscribe.js -t testTopic -d testDir -c testSubClientID


Example 4. Output
connecting to broker at: 127.0.0.1:1883

connected to broker at: mqtt://127.0.0.1:1883

subscribing to topic: 'test/Topic1'

 

Messages that arrive will be printed to the screen.

received message: {

       "property":    "value",

       "string":      "string",

       "number":      1,

       "boolean":     true,

       "null":        null,
  
       "array":       [
       
            "value",
               
            1,

            1.1,

            -0,
   
            true,

            false,

            null

      ],

      "emptyObject": { }

}


Store incoming message

When a directory is specified, the utility writes the payload of each received message to a file. The name of the file is the name of the topic you specified followed by an underscore plus a sequential number. Topic names are case-sensitive. The file has no extension because an MQTT payload can be anything.

Examples of file/topic names:
  • testTopic_000001

  • testTopic_000002

  • testTopic_000003

  • testTopic_000004

Code

// Command line parsing and program defaults.
const argv = require( "yargs" )
      .version( "1.0.0" )
      .options( {
         t: {
            alias:    "topic",
            type:     "string",
            default:  "testTopic",
            describe: "An optional topic to subscribe to.",
         },
         d: {
            alias:    ["directory", "dir"],
            type:     "string",
            describe: "An optional directory to save messages as files.",
         },
         q: {
            alias:    "qos",
            type:     "number",
            default:  1,
            describe: "An optional quality of service to use when when subscribing to the topic.",
         },
         c: {
            alias:    ["clientId", "client"],
            type:     "string",
            describe: "Client ID for the MQTT connection",
         },
         s: {
            alias:    ["server", "broker", "b"],
            type:     "string",
            default:  "127.0.0.1:1883",
            describe: "An optional hostname and port of an MQTT broker.",
         },
         u: {
            alias:    ["username", "user"],
            type:     "string",
            describe: "An optional user name to use when connecting to the broker. If absent, a connection without a username and password is attempted.",
         },
         p: {
            alias:    ["password", "pass"],
            type:     "string",
            describe: "An optional password to use with the username provided above. Ignored if the username is absent.",
         },
         h: {
            alias:    "help",
            type:     "boolean",
            describe: "Print the usage and exit.",
         }
      } )
      .help()
      .usage( "Usage: node $0 [options]" )
      .argv;


// Read settings from the command line or use defaults.
let settings = {
   topic:           argv.t,
   outputDirectory: argv.d || null,
   qos:             argv.q,
   clientId:        argv.c || "nodeJS_sub_" + Math.random().toString( 16 ).substring( 2, 8 ),
   brokerAddress:   argv.s,
   username:        argv.u,
   password:        argv.p,
   protocolId:      "MQTT",
   protocolVersion: 4  // 4 is MQTT 3.1.1 and 5 is MQTT 5.0
};


// Variable declarations and initialization.
const fs = require( "fs" );
const path = require( 'path' );
const MAX_FILENAME_LEN = 500;
const brokerUrl = `mqtt://${settings.brokerAddress}`;
let mqtt = require( "mqtt" );
let process = require( "process" );
let client = null;
let counter = 0;


// Connect to the MQTT broker.
console.log( `Connecting to broker at: ${settings.brokerAddress}` );
client = mqtt.connect(
      brokerUrl,
      {
         clientId:        settings.clientId,
         connectTimeout:  30 * 1000,
         keepalive:       20,
         reconnectPeriod: 1000,
         protocolId:      settings.protocolId,
         protocolVersion: settings.protocolVersion,
         ...( settings.username && {username: settings.username} ),
         ...( settings.password && {password: settings.password} )
      } );


// Interrupt handling.
process.on( "SIGINT", function()
{
   console.log( "\nShutting down MQTT client after Ctrl-C" );
   if( client )
   {
      // Wait for in-flight messages to be sent and exit the program.
      client.end( false, () => { process.exit(); } );
   }
   else {
      process.exit();
   }
} );


// The "On Connect" listener.
client.on( "connect", function()
{
   console.log( `Connected to broker at: ${brokerUrl}` );
   console.log( `Subscribing to topic: '${settings.topic}'` );
   client.subscribe( settings.topic, {qos: settings.qos} );
} );


// The "On Disconnect" listener.
client.on( "disconnect", function()
{
   console.log( `Disconnected from: ${brokerUrl}` );
   if( client.reconnecting === false )
      client.reconnect(); // Only reconnect if not already reconnecting.
} );


// The "On Close" listener.
client.on( "close", function()
{
   console.log( "Connection closed." );
   if( client.isConnected )
      client.end();
} );


// The "On Error" listener.
client.on( "error", function( err )
{
   console.log( "Error: " + err );
   if( err && typeof err === "object" && "code" in err )
      if( err.code === "ENOTFOUND" )
         console.log( "Network error: Verify the passed hostname:port and that your MQTT server is running." );
} );


// The "On Message" listener.
client.on( "message", function( topic, message )
{
   let formattedMsg = Buffer.isBuffer( message ) ? message.toString() : message;
   console.log( `Received message: ${formattedMsg}` );

   // Save the message to disc if the user set an output directory.
   if( "outputDirectory" in settings && settings.outputDirectory !== null )
   {
      const absoluteOutputDirectory = path.join( process.cwd(), settings.outputDirectory );
      // Check if the directory exists and create it if absent.
      fs.access( absoluteOutputDirectory, fs.constants.F_OK, ( err ) =>
      {
         if( err )
         {
            // The directory does not exist, so create it.
            fs.mkdir( absoluteOutputDirectory, {recursive: true}, ( mkdirErr ) =>
            {
               if( mkdirErr )
                  console.error( `Error creating directory "${settings.outputDirectory}":`, mkdirErr );
               else
                  console.log( `Directory "${settings.outputDirectory}" created successfully.` );
               // Write the file only after the directory is created.
               writeFile( absoluteOutputDirectory, formattedMsg );
            } );
         }
         else {
            // Directory already exists.
            writeFile( absoluteOutputDirectory, formattedMsg );
         }
      } );
   }
} );


function writeFile( directory, data )
{
   let seq = String( ++counter ).padStart( 6, "0" );
   let filePrefix;
   let topic = settings.topic;

   topic = ( topic.length > MAX_FILENAME_LEN ) ? topic.substring( topic.length - MAX_FILENAME_LEN ) : topic;
   filePrefix = topic.replaceAll( /[/\\|· &%#*<>?:"]/g, "_" );
   let absoluteFileName = path.join( directory, `${filePrefix}_${seq}` );

   fs.writeFile(
         absoluteFileName,
         Buffer.from( data ),
         {flag: "w"},
         err => err ? console.error( err ) : null
   );
}

Don't hesitate to contact us with questions, suggestions, and bug reports. We want you to be successful.

Contact info:
  • Address:

    6300 W. Sugar Creek Drive

    Columbia, Missouri 65203-9052

  • Phone:

    800.234.8180