MQTT client for Node.js
Tutorial to use the Node.js MQTT client with FairCom's MQTT broker engine
This section contains MQTT client tutorials for Node.js developers.
Introduction
This quick start guide includes two tutorials. Both are command-line programs that you can use to publish messages and monitor published messages. Thus, the tutorials perform double duty. They show you how to use MQTT and they create command-line utilities that you can use to learn and troubleshoot MQTT.
The first is a program that publishes MQTT messages to the broker.
The second is a program that subscribes to messages.
FairCom MQ is an MQTT broker. Programs use an MQTT client library to publish and subscribe to messages on the MQTT broker.
Connects to an MQTT broker.
Publishes a message to a topic.
Subscribes to a topic and receives all messages published to that topic by any client.
Installation
FairCom MQ (or another MQTT broker) must be running on 127.0.0.1 and listening on port 1883.
The latest stable version of Node.js must be installed.
In the directory, <faircom>/drivers/nodejs.mqtt/NodejsMQTTTutorial1, run
npm install
to load the required node_modules.
Note
It is important that your computer be connected to the internet when you run
npm install
because around 80 modules will be downloaded and installed.
Code dependencies
The tutorial uses the MQTT.js Client Library for Node.js to communicate with MQTT. The MQTT.js client library is an open-source project using the MIT license.
Note
FairCom chose the MQTT.js client library over the Paho MQTT library because it works well, supports all versions of MQTT, uses an MIT license and is an active, mature project on GitHub.
This tutorial is a command-line utility for publishing MQTT messages. It publishes a message to a specified topic on a specified broker. This is useful for testing and troubleshooting MQTT.
The source code is simple and self-explanatory and is located in the <faircom>/drivers/nodejs.mqtt/NodejsMQTTTutorial1/publish/publish.js
file.
Command line usage
node publish.js -t topic -f fileName -q qos -c clientid -s hostname:port -u username -p password
Command line options
Options may be present in any order, and if absent, default values will be used.
Option | Description |
---|---|
`-t` | An optional topic to publish to. It defaults to `testTopic`. |
`-f` | A required file name to use for the payload in the published message. The file may contain anything. |
`-q` | An optional quality of service to use when publishing the message. It defaults to `1`. |
`-c` | An optional client id that uniquely identifies the client to the MQTT broker. It defaults to `nodeJS_pub_` followed by a short random hexadecimal suffix. |
`-s` | An optional hostname and port of an MQTT broker. It defaults to `127.0.0.1:1883`. |
`-u` | An optional user name to use when connecting to the broker. If absent, a connection without a username and password is attempted. |
`-p` | An optional password to use with the username provided above. If absent, a username without a password is attempted. |
`-h` | Prints the usage and exits. |
node publish.js -t testTopic -f /Users/username/myjsonfiles/data.json
Connecting to broker at: 127.0.0.1:1883
Connected to broker at: 127.0.0.1:1883
Reading file at /Users/username/myjsonfiles/data.json
Publishing to topic 'testTopic'
Connection closed.
Code
// publish.js - command-line utility that publishes the contents of a file
// as a single MQTT message and then exits.

// Command line parsing and program defaults.
const argv = require( "yargs" )
  .version( "1.0.0" )
  .options( {
    t: {
      alias: "topic",
      type: "string",
      default: "testTopic",
      describe: "An optional topic to publish to.",
    },
    f: {
      alias: ["filename", "file"],
      type: "string",
      describe: "A required file name to use for the payload in the published message.",
      demandOption: true,
    },
    q: {
      alias: "qos",
      type: "number",
      default: 1,
      describe: "An optional quality of service to use when publishing the message.",
    },
    c: {
      alias: ["clientId", "client"],
      type: "string",
      describe: "An optional client id that uniquely identifies the client to the MQTT broker.",
    },
    s: {
      alias: ["server", "broker", "b"],
      type: "string",
      default: "127.0.0.1:1883",
      describe: "An optional hostname and port of an MQTT broker.",
    },
    u: {
      alias: ["username", "user"],
      type: "string",
      describe: "An optional user name to use when connecting to the broker. If absent, a connection without a username and password is attempted.",
    },
    p: {
      alias: ["password", "pass"],
      type: "string",
      describe: "An optional password to use with the username provided above. Ignored if the username is absent.",
    },
    h: {
      alias: "help",
      type: "boolean",
      describe: "Print the usage and exit.",
    }
  } )
  .help()
  .usage( "Usage: node $0 [options]" )
  .argv;

// Read settings from the command line or use defaults.
const settings = {
  topic: argv.t,
  fileName: argv.f,
  qos: argv.q,
  // Generate a short random suffix when no client id was supplied.
  clientId: argv.c || "nodeJS_pub_" + Math.random().toString( 16 ).substring( 2, 8 ),
  brokerAddress: argv.s,
  username: argv.u,
  password: argv.p,
  protocolId: "MQTT",
  protocolVersion: 4 // 4 is MQTT 3.1.1 and 5 is MQTT 5.0
};

// Variable declarations and initialization.
const brokerUrl = `mqtt://${settings.brokerAddress}`;
const fs = require( "fs" );
const mqtt = require( "mqtt" );
const process = require( "process" );
let client = null;

// Connect to the MQTT broker.
console.log( `Connecting to broker at: ${settings.brokerAddress}` );
client = mqtt.connect( brokerUrl, {
  clientId: settings.clientId,
  connectTimeout: 30 * 1000,
  reconnectPeriod: 1000,
  protocolId: settings.protocolId,
  protocolVersion: settings.protocolVersion,
  // Only add the credentials to the options when they were supplied.
  ...( settings.username && {username: settings.username} ),
  ...( settings.password && {password: settings.password} )
} );

// Interrupt handling.
process.on( "SIGINT", function() {
  console.log( "\nShutting down MQTT client after Ctrl-C" );
  if( client ) {
    // Wait for in-flight messages to be sent and exit the program.
    client.end( false, () => {
      process.exit();
    } );
  }
  else {
    process.exit();
  }
} );

// The "On Connect" listener. It is registered with once() so that an
// automatic reconnect cannot read and publish the file a second time.
client.once( "connect", function() {
  console.log( `Connected to broker at: ${settings.brokerAddress}` );

  // Read in the file from the fileName path passed in by the user.
  console.log( `Reading file at ${settings.fileName}` );
  let msgData;
  try {
    msgData = fs.readFileSync( settings.fileName );
  }
  catch( e ) {
    // Report the problem and shut down cleanly instead of throwing from
    // inside an event listener.
    console.error( `Error reading file '${settings.fileName}': ${e.message}` );
    process.exitCode = 1;
    client.end();
    return;
  }

  // Publish the raw bytes of the file. Passing the Buffer through
  // JSON.stringify() would send a {"type":"Buffer","data":[...]} description
  // of the Buffer object instead of the file contents.
  console.log( `Publishing to topic '${settings.topic}'` );
  client.publish( settings.topic, msgData, {qos: settings.qos}, function( err ) {
    if( err ) {
      console.error( `Error publishing to topic '${settings.topic}': ${err.message}` );
      process.exitCode = 1;
    }
    // Close the connection once the publish has completed or failed.
    client.end();
  } );
} );

// The "On Disconnect" listener.
client.on( "disconnect", function() {
  console.log( `Disconnected from: ${brokerUrl}` );
  if( client.reconnecting === false )
    client.reconnect(); // Only reconnect if not already reconnecting.
} );

// The "On Error" listener.
client.on( "error", function( err ) {
  console.log( "Error: " + err );
  // Give a hint for the two most common network failures: an unknown host
  // name and a refused connection.
  if( err && typeof err === "object" && "code" in err )
    if( err.code === "ENOTFOUND" || err.code === "ECONNREFUSED" )
      console.log( "Network error: Verify the passed hostname:port and that your MQTT server is running." );
} );

// The "On Close" listener.
client.on( "close", function() {
  console.log( "Connection closed." );
  // MQTT.js exposes the connection state as "connected".
  if( client.connected )
    client.end();
} );
This tutorial is a utility for subscribing to MQTT messages in a topic. Each message sent to the topic is output to the console and optionally written as a file to the specified folder. This utility is useful for testing and troubleshooting MQTT.
The source code is simple and self-explanatory and is located in the <faircom>/drivers/nodejs.mqtt/NodejsMQTTTutorial1/subscribe/subscribe.js
file.
Command line usage
node subscribe.js -t topic -d directory -q QoS -c clientID -s hostname:port -u username -p password
Command line options
Options may be present in any order, and if absent, default values will be used.
Option | Description |
---|---|
`-t` | An optional topic to subscribe to. It defaults to `testTopic`. |
`-d` | An optional directory to save messages as files. If omitted, messages will be printed to screen and not saved to files. If you specify a directory that does not exist, it is created. Existing files in the directory with the same name are overwritten. |
`-q` | An optional quality of service to use when subscribing to the topic. It defaults to `1`. |
`-c` | An optional client id that uniquely identifies the client to the MQTT broker. It defaults to `nodeJS_sub_` followed by a short random hexadecimal suffix. |
`-s` | An optional hostname and port of an MQTT broker. It defaults to `127.0.0.1:1883`. |
`-u` | An optional user name to use when connecting to the broker. If absent, a connection without a username and password is attempted. |
`-p` | An optional password to use with the username provided above. Ignored if the username is absent. |
`-h` | Prints the usage and exits. |
node subscribe.js -t testTopic -d testDir -c testSubClientID
Connecting to broker at: 127.0.0.1:1883
Connected to broker at: mqtt://127.0.0.1:1883
Subscribing to topic: 'testTopic'
Messages that arrive will be printed to the screen.
Received message: { "property": "value", "string": "string", "number": 1, "boolean": true, "null": null, "array": [ "value", 1, 1.1, -0, true, false, null ], "emptyObject": { } }
Store incoming message
When a directory is specified, the utility writes the payload of each received message to a file. The name of the file is the name of the topic you specified followed by an underscore plus a sequential number. Topic names are case-sensitive. The file has no extension because an MQTT payload can be anything.
testTopic_000001
testTopic_000002
testTopic_000003
testTopic_000004
Code
// subscribe.js - command-line utility that subscribes to an MQTT topic,
// prints every received message and optionally saves each payload to a file.

// Command line parsing and program defaults.
const argv = require( "yargs" )
  .version( "1.0.0" )
  .options( {
    t: {
      alias: "topic",
      type: "string",
      default: "testTopic",
      describe: "An optional topic to subscribe to.",
    },
    d: {
      alias: ["directory", "dir"],
      type: "string",
      describe: "An optional directory to save messages as files.",
    },
    q: {
      alias: "qos",
      type: "number",
      default: 1,
      describe: "An optional quality of service to use when subscribing to the topic.",
    },
    c: {
      alias: ["clientId", "client"],
      type: "string",
      describe: "Client ID for the MQTT connection",
    },
    s: {
      alias: ["server", "broker", "b"],
      type: "string",
      default: "127.0.0.1:1883",
      describe: "An optional hostname and port of an MQTT broker.",
    },
    u: {
      alias: ["username", "user"],
      type: "string",
      describe: "An optional user name to use when connecting to the broker. If absent, a connection without a username and password is attempted.",
    },
    p: {
      alias: ["password", "pass"],
      type: "string",
      describe: "An optional password to use with the username provided above. Ignored if the username is absent.",
    },
    h: {
      alias: "help",
      type: "boolean",
      describe: "Print the usage and exit.",
    }
  } )
  .help()
  .usage( "Usage: node $0 [options]" )
  .argv;

// Read settings from the command line or use defaults.
const settings = {
  topic: argv.t,
  outputDirectory: argv.d || null,
  qos: argv.q,
  // Generate a short random suffix when no client id was supplied.
  clientId: argv.c || "nodeJS_sub_" + Math.random().toString( 16 ).substring( 2, 8 ),
  brokerAddress: argv.s,
  username: argv.u,
  password: argv.p,
  protocolId: "MQTT",
  protocolVersion: 4 // 4 is MQTT 3.1.1 and 5 is MQTT 5.0
};

// Variable declarations and initialization.
const fs = require( "fs" );
const path = require( 'path' );
const MAX_FILENAME_LEN = 500;
const brokerUrl = `mqtt://${settings.brokerAddress}`;
const mqtt = require( "mqtt" );
const process = require( "process" );
let client = null;
let counter = 0; // Number of messages received so far; used to name the files.

// Connect to the MQTT broker.
console.log( `Connecting to broker at: ${settings.brokerAddress}` );
client = mqtt.connect( brokerUrl, {
  clientId: settings.clientId,
  connectTimeout: 30 * 1000,
  keepalive: 20,
  reconnectPeriod: 1000,
  protocolId: settings.protocolId,
  protocolVersion: settings.protocolVersion,
  // Only add the credentials to the options when they were supplied.
  ...( settings.username && {username: settings.username} ),
  ...( settings.password && {password: settings.password} )
} );

// Interrupt handling.
process.on( "SIGINT", function() {
  console.log( "\nShutting down MQTT client after Ctrl-C" );
  if( client ) {
    // Wait for in-flight messages to be sent and exit the program.
    client.end( false, () => {
      process.exit();
    } );
  }
  else {
    process.exit();
  }
} );

// The "On Connect" listener. It runs again after every reconnect, which
// re-issues the subscription.
client.on( "connect", function() {
  console.log( `Connected to broker at: ${brokerUrl}` );
  console.log( `Subscribing to topic: '${settings.topic}'` );
  client.subscribe( settings.topic, {qos: settings.qos}, function( err ) {
    // Report a failed subscription instead of waiting silently for
    // messages that will never arrive.
    if( err )
      console.error( `Error subscribing to topic '${settings.topic}': ${err.message}` );
  } );
} );

// The "On Disconnect" listener.
client.on( "disconnect", function() {
  console.log( `Disconnected from: ${brokerUrl}` );
  if( client.reconnecting === false )
    client.reconnect(); // Only reconnect if not already reconnecting.
} );

// The "On Close" listener.
client.on( "close", function() {
  console.log( "Connection closed." );
  // MQTT.js exposes the connection state as "connected".
  if( client.connected )
    client.end();
} );

// The "On Error" listener.
client.on( "error", function( err ) {
  console.log( "Error: " + err );
  if( err && typeof err === "object" && "code" in err )
    if( err.code === "ENOTFOUND" )
      console.log( "Network error: Verify the passed hostname:port and that your MQTT server is running." );
} );

// The "On Message" listener.
client.on( "message", function( topic, message ) {
  const formattedMsg = Buffer.isBuffer( message ) ? message.toString() : message;
  console.log( `Received message: ${formattedMsg}` );

  // Nothing more to do unless the user set an output directory.
  if( settings.outputDirectory === null )
    return;

  // Reserve the sequence number now so that files are numbered in arrival
  // order even if the asynchronous directory checks complete out of order.
  const seq = ++counter;

  // path.resolve() keeps an absolute directory as it is and resolves a
  // relative one against the current working directory.
  const absoluteOutputDirectory = path.resolve( settings.outputDirectory );

  // Check if the directory exists and create it if absent.
  fs.access( absoluteOutputDirectory, fs.constants.F_OK, ( err ) => {
    if( !err ) {
      // Directory already exists. Write the original payload, not its
      // string form, so that binary payloads are stored unchanged.
      writeFile( absoluteOutputDirectory, message, seq );
      return;
    }

    // The directory does not exist, so create it.
    fs.mkdir( absoluteOutputDirectory, {recursive: true}, ( mkdirErr ) => {
      if( mkdirErr ) {
        console.error( `Error creating directory "${settings.outputDirectory}":`, mkdirErr );
        return; // Without the directory there is nowhere to write the file.
      }
      console.log( `Directory "${settings.outputDirectory}" created successfully.` );

      // Write the file only after the directory is created.
      writeFile( absoluteOutputDirectory, message, seq );
    } );
  } );
} );

/**
 * Writes one message payload to a file named "<topic>_<sequence>" in the
 * given directory. The topic is the one given on the command line; characters
 * that are unsuitable in a file name are replaced with underscores.
 *
 * @param {string} directory - Directory that receives the file.
 * @param {Buffer|string} data - Payload to write.
 * @param {number} [seq] - Sequence number for the file name. When omitted,
 *   the next value of the message counter is used.
 */
function writeFile( directory, data, seq = ++counter ) {
  const paddedSeq = String( seq ).padStart( 6, "0" );

  // Keep only the tail of a topic that is longer than MAX_FILENAME_LEN.
  let topic = settings.topic;
  if( topic.length > MAX_FILENAME_LEN )
    topic = topic.substring( topic.length - MAX_FILENAME_LEN );

  const filePrefix = topic.replaceAll( /[/\\|· &%#*<>?:"]/g, "_" );
  const absoluteFileName = path.join( directory, `${filePrefix}_${paddedSeq}` );
  const payload = Buffer.isBuffer( data ) ? data : Buffer.from( data );

  fs.writeFile( absoluteFileName, payload, {flag: "w"}, ( err ) => {
    if( err )
      console.error( err );
  } );
}
Don't hesitate to contact us with questions, suggestions, and bug reports. We want you to be successful.
Address:
6300 W. Sugar Creek Drive
Columbia, Missouri 65203-9052
Phone:
800.234.8180