Skip to main content

MQTT client for Python

Tutorial to use the Python MQTT client with FairCom's MQTT broker engine

Abstract

This section contains MQTT Client tutorials for Python developers.

This section contains MQTT client tutorials for Python developers.

Introduction

This quick start guide includes two tutorials. Both are command-line programs that you can use to publish messages and monitor published messages. Thus, the tutorials perform double duty. They show you how to use MQTT and they create command-line utilities that you can use to learn and troubleshoot MQTT.

MQTT tutorials:
  • The first is a program that publishes MQTT messages to the broker.

  • The second is a program that subscribes to messages.

FairCom Edge and FairCom MQ are MQTT brokers. Programs use an MQTT client library to publish and subscribe to messages on the MQTT broker.

An MQTT client does the following:
  • Connects to an MQTT broker.

  • Publishes a message to a topic.

  • Subscribes to a topic and receives all messages published to that topic by any client.

Installation

Prerequisites:

Code dependencies

Install paho using pip install paho-mqtt.

This tutorial is a command-line utility for publishing MQTT messages to a specified topic on a specified broker. This is useful for testing and troubleshooting MQTT.

The source code is simple and self-explanatory and is located in the <faircom>/drivers/python.mqtt/pythonMQTTTutorial1/publish/publish.py file.

Command line usage

python3 publish.py -t topic -f fileName -q qos -c clientid -s hostname:port -u username -p password

Command line options

Options may be present in any order, and if absent, default values will be used.

Table 1. Command line options

Option

Description

-t topic

An optional topic to publish to. It defaults to testTopic.

-f fileName

A required file name to use for the payload in the published message. The file may contain anything.

-q QoS

An optional quality of service to use when publishing the message. It defaults to 1.

  • 0 provides no guarantee of delivery.

  • 1 guarantees delivery at least once.

  • 2 guarantees delivery at most once.

-c clientID

An optional client id that uniquely identifies the client to the MQTT broker. It defaults to testPubClientID.

-s hostname:port

An optional hostname and port of an MQTT broker. It defaults to tcp://127.0.0.1:1883.

-u username

An optional user name to use when connecting to the broker. If absent, a connection without a username and password is attempted.

-p password

An optional password to use with the username provided above. If absent a username without a password is attempted.

-h

Prints the usage and exit.



Example 1. Command line
python3 publish.py -t testTopic -f data.json


Example 2. Output
Connecting to MQTT broker on host "127.0.0.1", port 1883, sending your file (data.json) to topic "testTopic", and then disconnecting.

After publish.single() call.  Goodbye!


Code

# -*- coding: Latin-1 -*-
#
# publish.py
#
# This Python script connects to the specified FairCom EdgeMQ MQTT broker and publishes one MQTT message to the specified topic.  The contents of that message is read from disk.
#

import getopt
import os
import sys

try:
  import paho.mqtt.publish as publish
except ImportError:
  print('Error!  This script requires the Paho MQTT module, which does not appear to be installed.  Please install it, using a command such as "pip3 install paho-mqtt".')
  sys.exit(2)


def usage():
  print('This script connects to the specified MQTT broker, publishes one MQTT message (from the specified file), and then disconnects from the broker.\nCommand line options:\n  -h, --help : display help\n  -s MQTT server information - hostName or IPAddress : portNumber.  Default="127.0.0.1:1883".\n  -u user name.  No Default.\n  -p password.  No Default.  Ignored if user name is not specified.\n  -t MQTT topic name.  Default="testTopic".\n  -c MQTT Client ID.  Default="testPubClientID".\n  -q publish QoS.  Default=1.\n-f file that contains the message which will be published.  Required.')
  print("Example: python publish.py -s 127.0.0.1:1883 -u admin -p ADMIN -t testTopic -c testPubClientID -q 1 -f data.json\n")



# Default values for the command-line arguments
server_data = '127.0.0.1:1883'
user_name = None
user_pass = None
topic = 'testTopic'
client_id = 'testPubClientID'
publish_qos = 1
file_name = None  # Required argument - File to publish to the topic

# Read the command-line arguments
try:
  opts, args = getopt.getopt(sys.argv[1:], 'hs:u:p:t:c:f:q:', longopts=['help'])
except getopt.GetoptError as e:
  print(str(e))
  usage()
  sys.exit(2)
for opt, arg in opts:
  if opt in ('-h', '--help'):
    usage()
    sys.exit(1)
  if opt == '-s':
    server_data = arg
  elif opt == '-u':
    user_name = arg
  elif opt == '-p':
    user_pass = arg
  elif opt == '-t':
    topic = arg
  elif opt == '-c':
    client_id = arg
  elif opt == '-f':
    file_name = arg
  elif opt == '-q':
    # Make sure the publish QoS is valid
    if not arg.isdigit():
      print('Error!  The "-q" (publish QoS) argument must be a number from 0 to 2 inclusive.')
      usage()
      sys.exit(2)
    publish_qos = int(arg)
    if publish_qos<0 or publish_qos>2:
      print('Error!  The "-q" (publish QoS) argument must be a number from 0 to 2 inclusive.')
      usage()
      sys.exit(2)
  else:
    print('Error!  Unhandled option: "%s"' % opt)

# Make sure the required argument is present
if file_name is None:
  print('Error!  The "-f" (file) argument is required.')
  usage()
  sys.exit(2)

# Separate the host name and port from the 'server_data' variable, because Paho needs them individually.
host = '127.0.0.1'
port = 1883
parts = server_data.split(":")
if len(parts) == 1:  # No colon.  Assume the entire string is the host name / IP Address
  host = server_data
elif len(parts) == 2:  # One colon.  Assume hostname:port or IPAddress:port
  host = parts[0]
  port = int(parts[1])
else:
  print('Error!  I do not understand the -s (server) parameter you specified.  Please use one of the following:\n  hostname:port\n  IPAddress:port.')
  sys.exit(2)

# Create the authentication data dictionary which will be passed to the Paho MQTT client
if (user_name is None) or (user_pass is None):
  auth_data = None
else:
  auth_data = {'username': user_name, 'password': user_pass}

# Verify that the specified file actually exists
if not os.path.isfile(file_name):
  print('Error!  The specified file "%s" does not exist.' % file_name)
  sys.exit(2)

# Read in the file, all at once
with open(file_name, "rb") as inFile:
  message_payload = inFile.read()

# This is a blocking call unless there is an exception while connecting to the MQTT broker.
# This connects to the MQTT broker and sends the message contained in the file to the specified topic.
# https://www.eclipse.org/paho/index.php?page=clients/python/docs/index.php#id2
print('Connecting to MQTT broker on host "%s", port %d, sending your file (%s) to topic "%s", and then disconnecting.' % (host, port, file_name, topic))
try:
  publish.single(topic, payload=message_payload, qos=publish_qos, retain=False, hostname=host, port=port, client_id=client_id, keepalive=60, will=None, auth=auth_data, tls=None, protocol=publish.paho.MQTTv311, transport="tcp")
except ConnectionRefusedError as e:
  print("Connection Refused error.  Is your FairCom EDGE server running?  %s" % str(e))
except KeyboardInterrupt:
  print('User pressed <ctrl-c> to exit script.')
except Exception as e:
  print("MQTT exception while connecting to broker %s." % str(e))
  # try to print a little more information about the problem
  if len(e.args) > 0:
    print( "[%s]" % publish.paho.connack_string( e.args[0] ) )


print('After publish.single() call.  Goodbye!')

This tutorial is a utility for subscribing to MQTT messages in a topic. Each message sent to the topic is output to the console and optionally written as a file to the specified folder. This utility is useful for testing and troubleshooting MQTT.

The source code is simple and self-explanatory and is located in the <faircom>/drivers/python.mqtt/pythonMQTTTutorial1/subscribe/subscribe.py file

Command line usage

python3 subscribe.py -t topic -d directory -q QoS -c clientid -s hostname:port -u username -p password

Command line options

Options may be present in any order, and if absent, default values will be used.

Table 2. Command line options

Option

Description

-t topic

An optional topic to subscribe to. It defaults to "testTopic".

-d directory

An optional directory to save messages as files. If omitted, messages will be printed to screen and not saved to files. If you specify a directory that does not exist, it is created. Existing files in the directory with the same name are overwritten.

-q QoS

An optional quality of service to use when subscribing to the topic. It defaults to 1.

  • 0 provides no guarantee of delivery.

  • 1 guarantees delivery at least once.

  • 2 guarantees delivery only once.

-c clientID

An optional client id that uniquely identifies the client to the MQTT broker. It defaults to testSubClientID.

-s hostname:port

An optional hostname and port of an MQTT broker. It defaults to tcp://127.0.0.1:1883.

-u username

An optional user name to use when connecting to the broker. If absent, a connection without a username and password is attempted.

-p password

An optional password to use with the username provided above. Ignored if the username is absent.

-h

Prints the usage and exit.



Example 3. Command line
python3 subscribe.py -t testTopic -d testDir


Example 4. Output
Connecting to MQTT broker on host "127.0.0.1", port 1883 and subscribing to topic "testTopic".  Messages that arrive will be printed to the screen.

Msg 1: {

        "property":    "value",

        "string":      "string",

        "number":      1,

        "boolean":     true,

        "null...


Store incoming messages

When a directory is specified, the utility writes the payload of each received message to a file. The name of the file is the name of the topic you specified followed by an underscore plus a sequential number. Topic names are case-sensitive. The file has no extension because an MQTT payload can be anything.

Examples of file/topic names:
  • testTopic_000001

  • testTopic_000002

  • testTopic_000003

  • testTopic_000004

Code

# -*- coding: Latin-1 -*-
#
# subscribe.py
#
# This Python script connects to the specified FairCom EdgeMQ MQTT broker and subscribes to the specified topic.
# It then prints each message which arrives from the broker to the console and (optionally) writes the message to a numbered file in the specified directory.
#

import sys
import getopt
import os

try:
  import paho.mqtt.subscribe as subscribe
except ImportError:
  print('Error!  This script requires the Paho MQTT module, which does not appear to be installed.  Please install it, using a command such as "pip3 install paho-mqtt".')
  sys.exit(2)


def usage():
  print('This script connects to the specified MQTT broker and subscribes to the specified topic.  It then prints each message that arrives to the console and (if desired) writes it to a numbered file in the specified directory.  Warning!  Existing files are overwritten!\nCommand line options:\n  -h, --help : display help\n  -s MQTT server information - hostName or IPAddress : portNumber.  Default="127.0.0.1:1883".\n  -u user name.  No Default.\n  -p password.  No Default.  Ignored if user name is not specified.\n  -t MQTT topic name.  Default="testTopic".\n  -c MQTT Client ID.  Default="testSubClientID".\n  -q subscribe QoS.  Default=1.\n  -d directory to store files in, if this is desired.  Defaults to not writing files.')
  print("Example: python subscribe.py -s 127.0.0.1:1883 -u admin -p ADMIN -t testTopic -c testSubClientID -q 1 -d messages\n")


message_number = 0


# This function gets invoked each time a message arrives from the MQTT broker.
def on_message(client, userdata, msg):
  global message_number, directory, safe_file_name
  message_number += 1
  # Write the message to a new file in the specified directory, if a directory was specified.
  if not directory is None:
    with open(os.path.join(directory, "%s_%06d" % (safe_file_name, message_number)), "wb") as f:
      f.write(msg.payload)
  # Now try to decode the message as ASCII / UTF-8 Unicode for display.
  # If that fails, just show a hex dump of the message.
  try:
    payload=msg.payload.decode('utf8') # convert the bytes to a string.
  except UnicodeDecodeError:
    payload=msg.payload.hex()

  # Print the first 100 characters of the string-version of the payload.
  if len(payload)>100:
    print("Msg %d: %s..." % (message_number, payload[:100]))
  else:
    print("Msg %d: %s" % (message_number, payload))


# Default values for the command-line arguments
server_data = '127.0.0.1:1883'
user_name = None
user_pass = None
topic = 'testTopic'
client_id = 'testSubClientID'
subscribe_qos = 1
directory = None  # Required argument - Folder to store message payloads in

# Read the command-line arguments
try:
  opts, args = getopt.getopt(sys.argv[1:], 'hs:u:p:t:c:d:q:', longopts=['help'])
except getopt.GetoptError as e:
  print(str(e))
  usage()
  sys.exit(2)
for opt, arg in opts:
  if opt in ('-h', '--help'):
    usage()
    sys.exit(1)
  if opt == '-s':
    server_data = arg
  elif opt == '-u':
    user_name = arg
  elif opt == '-p':
    user_pass = arg
  elif opt == '-t':
    topic = arg
  elif opt == '-c':
    client_id = arg
  elif opt == '-d':
    directory = arg
  elif opt == '-q':
    # Make sure the subscribe QoS is valid
    if not arg.isdigit():
      print('Error!  The "-q" (subscribe QoS) argument must be a number from 0 to 2 inclusive.')
      usage()
      sys.exit(2)
    subscribe_qos = int(arg)
    if subscribe_qos<0 or subscribe_qos>2:
      print('Error!  The "-q" (subscribe QoS) argument must be a number from 0 to 2 inclusive.')
      usage()
      sys.exit(2)
  else:
    print('Error!  Unhandled option: "%s"' % opt)

# Separate the host name and port from the 'server_data' variable, because Paho needs them individually.
host = '127.0.0.1'
port = 1883
parts = server_data.split(":")
if len(parts) == 1:  # No colon.  Assume the entire string is the host name / IP Address
  host = server_data
elif len(parts) == 2:  # One colon.  Assume hostname:port or IPAddress:port
  host = parts[0]
  port = int(parts[1])
else:
  print('Error!  I do not understand the -s (server) parameter you specified.  Please use one of the following:\n  hostname:port\n  IPAddress:port.')
  sys.exit(2)

# Create the authentication data dictionary which will be passed to the Paho MQTT client
if (user_name is None) or (user_pass is None):
  auth_data = None
else:
  auth_data = {'username': user_name, 'password': user_pass}

# If the user specified an output directory, create it if it does not already exist.
# Then come up with a safe name to use for the files I create.
if not directory is None:
  if not os.path.exists(directory):
    print('The specified directory "%s" does not exist.  Attempting to create it...' % directory)
    os.makedirs(directory)
  # Sanitize the topic name to replace characters that are not allowed in file names
  # with underscores.  I will be rather agressive with this...
  safe_file_name = topic
  illegal_chars = ['/', '\\', '*', '"', '?', ':', '>', '<', '|', ' ', '.', '&', '%', '#']
  for c in illegal_chars:
    safe_file_name = safe_file_name.replace(c, '_')
  # Now, to prevent path overflows, save the last 500 chars of the file name.
  safe_file_name = safe_file_name[-500:]

# This is a blocking call unless there is an exception while connecting to the MQTT broker.
# This connects to the MQTT broker and subscribes to the specified topic.
# Each time an MQTT packet arrives, the on_message() callback will be invoked.
# This automatically handles re-connects to the broker.
# https://www.eclipse.org/paho/index.php?page=clients/python/docs/index.php#id2
print('Connecting to MQTT broker on host "%s", port %d and subscribing to topic "%s".  Messages that arrive will be printed to the screen' %  (host, port, topic), end='')
if directory is None:
  print(".")
else:
  print('and stored in the "%s" directory.' % (directory))

try:
  subscribe.callback(on_message, topic, qos=subscribe_qos, userdata=None, hostname=host, port=port, client_id=client_id, keepalive=60, will=None, auth=auth_data, tls=None, protocol=subscribe.paho.MQTTv311)
except ConnectionRefusedError as e:
  print("Connection Refused error.  Is your FairCom EDGE server running?  %s" % str(e))
except KeyboardInterrupt:
  print('User pressed <ctrl-c> to exit script.')
except Exception as e:
  print("MQTT exception while connecting to broker %s." % str(e))
  # try to print a little more information about the problem
  if len(e.args) > 0:
    print("[%s]" % subscribe.paho.connack_string(e.args[0]))


print('After subscribe.callback() call.  Goodbye!')

Don't hesitate to contact us with questions, suggestions, and bug reports. We want you to be successful.

Contact info:
  • Address:

    6300 W. Sugar Creek Drive

    Columbia, Missouri 65203-9052

  • Phone:

    800.234.8180