MQTT client for Python
Tutorial to use the Python MQTT client with FairCom's MQTT broker engine
This section contains MQTT client tutorials for Python developers.
Introduction
This quick start guide includes two tutorials. Both are command-line programs that you can use to publish messages and monitor published messages. Thus, the tutorials perform double duty. They show you how to use MQTT and they create command-line utilities that you can use to learn and troubleshoot MQTT.
The first is a program that publishes MQTT messages to the broker.
The second is a program that subscribes to messages.
FairCom Edge and FairCom MQ are MQTT brokers. Programs use an MQTT client library to publish and subscribe to messages on the MQTT broker.
An MQTT client does the following:
Connects to an MQTT broker.
Publishes a message to a topic.
Subscribes to a topic and receives all messages published to that topic by any client.
Installation
Install FairCom Edge.
FairCom Edge (or another MQTT broker) must be running on 127.0.0.1 and listening on port 1883.
Python 3 must be installed; the commands in these tutorials use python3.
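Before running the tutorials, it can help to confirm that something is listening on the broker address. The following is a minimal sketch using only the Python standard library; the function name broker_is_reachable is hypothetical and is not part of the FairCom tutorials. It checks only that a TCP connection can be opened and does not perform an MQTT handshake.

```python
import socket


def broker_is_reachable(host: str = "127.0.0.1", port: int = 1883, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    This proves only that something is listening on the port.
    It does not verify that the listener speaks MQTT.
    """
    try:
        # create_connection raises OSError (including timeouts) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("Broker reachable:", broker_is_reachable())
```

If the function returns False, check that FairCom Edge (or your other broker) is running and that no firewall blocks port 1883.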
Code dependencies
Install the Paho MQTT client library using pip install paho-mqtt.
This tutorial is a command-line utility for publishing MQTT messages to a specified topic on a specified broker. This is useful for testing and troubleshooting MQTT.
The source code is simple and self-explanatory and is located in the <faircom>/drivers/python.mqtt/pythonMQTTTutorial1/publish/publish.py file.
Command line usage
python3 publish.py -t topic -f fileName -q qos -c clientid -s hostname:port -u username -p password
Command line options
Options may be present in any order, and if absent, default values will be used.
Option | Description |
---|---|
-t topic | An optional topic to publish to. It defaults to testTopic. |
-f fileName | A required file name to use for the payload in the published message. The file may contain anything. |
-q qos | An optional quality of service (0, 1, or 2) to use when publishing the message. It defaults to 1. |
-c clientid | An optional client id that uniquely identifies the client to the MQTT broker. It defaults to testPubClientID. |
-s hostname:port | An optional hostname and port of an MQTT broker. It defaults to 127.0.0.1:1883. |
-u username | An optional user name to use when connecting to the broker. If absent, a connection without a username and password is attempted. |
-p password | An optional password to use with the username provided above. The script sends credentials only when both a username and a password are supplied; if either is absent, it connects without credentials. |
-h, --help | Prints the usage and exits. |
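The tutorial script parses these options with getopt. As an alternative, here is a minimal sketch of the same option set using argparse from the standard library. This is not the tutorial's code: the function name build_parser and the attribute names (topic, file_name, qos, client_id, server, username, password) are assumptions chosen for illustration. The defaults mirror those in the table above.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser that mirrors the publish.py options (illustrative only)."""
    parser = argparse.ArgumentParser(
        description="Publish one MQTT message whose payload is read from a file.")
    parser.add_argument("-t", dest="topic", default="testTopic",
                        help="topic to publish to")
    parser.add_argument("-f", dest="file_name", required=True,
                        help="file that contains the message payload")
    # argparse rejects values outside 0..2 and prints the usage automatically.
    parser.add_argument("-q", dest="qos", type=int, choices=[0, 1, 2], default=1,
                        help="publish quality of service")
    parser.add_argument("-c", dest="client_id", default="testPubClientID",
                        help="client id presented to the broker")
    parser.add_argument("-s", dest="server", default="127.0.0.1:1883",
                        help="broker address as hostname:port")
    parser.add_argument("-u", dest="username", default=None, help="user name")
    parser.add_argument("-p", dest="password", default=None, help="password")
    return parser


if __name__ == "__main__":
    # Parse an explicit argument list so the sketch runs without real arguments.
    args = build_parser().parse_args(["-f", "data.json"])
    print(args.topic, args.qos, args.client_id, args.server)
```

With argparse, the -h and --help options are generated automatically, and an invalid -q value produces a usage error without extra validation code.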
python3 publish.py -t testTopic -f data.json
Connecting to MQTT broker on host "127.0.0.1", port 1883, sending your file (data.json) to topic "testTopic", and then disconnecting.
After publish.single() call. Goodbye!
Code
# -*- coding: Latin-1 -*-
#
# publish.py
#
# This Python script connects to the specified FairCom EdgeMQ MQTT broker and publishes one MQTT message to the specified topic.
# The contents of that message are read from disk.
#
import getopt
import os
import sys

try:
    import paho.mqtt.publish as publish
except ImportError:
    print('Error! This script requires the Paho MQTT module, which does not appear to be installed. Please install it, using a command such as "pip3 install paho-mqtt".')
    sys.exit(2)


def usage():
    print('This script connects to the specified MQTT broker, publishes one MQTT message (from the specified file), and then disconnects from the broker.\n'
          'Command line options:\n'
          ' -h, --help : display help\n'
          ' -s MQTT server information - hostName or IPAddress : portNumber. Default="127.0.0.1:1883".\n'
          ' -u user name. No Default.\n'
          ' -p password. No Default. Ignored if user name is not specified.\n'
          ' -t MQTT topic name. Default="testTopic".\n'
          ' -c MQTT Client ID. Default="testPubClientID".\n'
          ' -q publish QoS. Default=1.\n'
          ' -f file that contains the message which will be published. Required.')
    print("Example: python publish.py -s 127.0.0.1:1883 -u admin -p ADMIN -t testTopic -c testPubClientID -q 1 -f data.json\n")


# Default values for the command-line arguments
server_data = '127.0.0.1:1883'
user_name = None
user_pass = None
topic = 'testTopic'
client_id = 'testPubClientID'
publish_qos = 1
file_name = None  # Required argument - File to publish to the topic

# Read the command-line arguments
try:
    opts, args = getopt.getopt(sys.argv[1:], 'hs:u:p:t:c:f:q:', longopts=['help'])
except getopt.GetoptError as e:
    print(str(e))
    usage()
    sys.exit(2)

for opt, arg in opts:
    if opt in ('-h', '--help'):
        usage()
        sys.exit(1)
    if opt == '-s':
        server_data = arg
    elif opt == '-u':
        user_name = arg
    elif opt == '-p':
        user_pass = arg
    elif opt == '-t':
        topic = arg
    elif opt == '-c':
        client_id = arg
    elif opt == '-f':
        file_name = arg
    elif opt == '-q':
        # Make sure the publish QoS is valid
        if not arg.isdigit():
            print('Error! The "-q" (publish QoS) argument must be a number from 0 to 2 inclusive.')
            usage()
            sys.exit(2)
        publish_qos = int(arg)
        if publish_qos < 0 or publish_qos > 2:
            print('Error! The "-q" (publish QoS) argument must be a number from 0 to 2 inclusive.')
            usage()
            sys.exit(2)
    else:
        print('Error! Unhandled option: "%s"' % opt)

# Make sure the required argument is present
if file_name is None:
    print('Error! The "-f" (file) argument is required.')
    usage()
    sys.exit(2)

# Separate the host name and port from the 'server_data' variable, because Paho needs them individually.
host = '127.0.0.1'
port = 1883
parts = server_data.split(":")
if len(parts) == 1:
    # No colon. Assume the entire string is the host name / IP Address
    host = server_data
elif len(parts) == 2:
    # One colon. Assume hostname:port or IPAddress:port
    host = parts[0]
    port = int(parts[1])
else:
    print('Error! I do not understand the -s (server) parameter you specified. Please use one of the following:\n hostname:port\n IPAddress:port.')
    sys.exit(2)

# Create the authentication data dictionary which will be passed to the Paho MQTT client
if (user_name is None) or (user_pass is None):
    auth_data = None
else:
    auth_data = {'username': user_name, 'password': user_pass}

# Verify that the specified file actually exists
if not os.path.isfile(file_name):
    print('Error! The specified file "%s" does not exist.' % file_name)
    sys.exit(2)

# Read in the file, all at once
with open(file_name, "rb") as inFile:
    message_payload = inFile.read()

# This is a blocking call unless there is an exception while connecting to the MQTT broker.
# This connects to the MQTT broker and sends the message contained in the file to the specified topic.
# https://www.eclipse.org/paho/index.php?page=clients/python/docs/index.php#id2
print('Connecting to MQTT broker on host "%s", port %d, sending your file (%s) to topic "%s", and then disconnecting.' % (host, port, file_name, topic))
try:
    publish.single(topic, payload=message_payload, qos=publish_qos, retain=False,
                   hostname=host, port=port, client_id=client_id, keepalive=60,
                   will=None, auth=auth_data, tls=None,
                   protocol=publish.paho.MQTTv311, transport="tcp")
except ConnectionRefusedError as e:
    print("Connection Refused error. Is your FairCom EDGE server running? %s" % str(e))
except KeyboardInterrupt:
    print('User pressed <ctrl-c> to exit script.')
except Exception as e:
    print("MQTT exception while connecting to broker %s." % str(e))
    # try to print a little more information about the problem
    if len(e.args) > 0:
        print("[%s]" % publish.paho.connack_string(e.args[0]))

print('After publish.single() call. Goodbye!')
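The -s handling in the script above can be isolated into a small function that is easier to test. The sketch below follows the same rules (no colon means host only, one colon means host and port). As an addition that is not in the tutorial script, it also raises ValueError for a non-numeric port instead of letting int() fail. The function name split_server and the constant names are hypothetical.

```python
from typing import Tuple

DEFAULT_PORT = 1883  # Default MQTT port used by the tutorial


def split_server(server_data: str) -> Tuple[str, int]:
    """Split 'hostname' or 'hostname:port' into (host, port)."""
    parts = server_data.split(":")
    if len(parts) == 1:
        # No colon: the whole string is the host name or IP address.
        return server_data, DEFAULT_PORT
    if len(parts) == 2:
        host, port_text = parts
        if not port_text.isdigit():
            raise ValueError('Port must be a number, got "%s".' % port_text)
        return host, int(port_text)
    # More than one colon is not understood by this simple parser.
    raise ValueError('Expected hostname:port or IPAddress:port, got "%s".' % server_data)


if __name__ == "__main__":
    print(split_server("127.0.0.1:1883"))
```

Like the tutorial script, this simple split does not handle IPv6 literals, which themselves contain colons.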
This tutorial is a utility for subscribing to MQTT messages in a topic. Each message sent to the topic is output to the console and optionally written as a file to the specified folder. This utility is useful for testing and troubleshooting MQTT.
The source code is simple and self-explanatory and is located in the <faircom>/drivers/python.mqtt/pythonMQTTTutorial1/subscribe/subscribe.py file.
Command line usage
python3 subscribe.py -t topic -d directory -q QoS -c clientid -s hostname:port -u username -p password
Command line options
Options may be present in any order, and if absent, default values will be used.
Option | Description |
---|---|
-t topic | An optional topic to subscribe to. It defaults to testTopic. |
-d directory | An optional directory to save messages as files. If omitted, messages will be printed to screen and not saved to files. If you specify a directory that does not exist, it is created. Existing files in the directory with the same name are overwritten. |
-q QoS | An optional quality of service (0, 1, or 2) to use when subscribing to the topic. It defaults to 1. |
-c clientid | An optional client id that uniquely identifies the client to the MQTT broker. It defaults to testSubClientID. |
-s hostname:port | An optional hostname and port of an MQTT broker. It defaults to 127.0.0.1:1883. |
-u username | An optional user name to use when connecting to the broker. If absent, a connection without a username and password is attempted. |
-p password | An optional password to use with the username provided above. Ignored if the username is absent. |
-h, --help | Prints the usage and exits. |
python3 subscribe.py -t testTopic -d testDir
Connecting to MQTT broker on host "127.0.0.1", port 1883 and subscribing to topic "testTopic". Messages that arrive will be printed to the screen.
Msg 1: { "property": "value", "string": "string", "number": 1, "boolean": true, "null...
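The trailing dots in the sample output come from the way the utility shortens long payloads for display. A minimal sketch of that display rule, based on the on_message logic in the subscribe script, is shown here. The function name preview and the limit parameter are hypothetical names introduced for illustration.

```python
def preview(payload: bytes, limit: int = 100) -> str:
    """Return a printable, possibly shortened, version of an MQTT payload."""
    try:
        # Most test payloads are text, so try UTF-8 first.
        text = payload.decode("utf8")
    except UnicodeDecodeError:
        # Binary payloads are shown as a hex string instead.
        text = payload.hex()
    if len(text) > limit:
        return text[:limit] + "..."
    return text


if __name__ == "__main__":
    print(preview(b'{ "property": "value" }'))
```

Only the console output is shortened. When a directory is specified, the full payload is written to the file.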
Store incoming messages
When a directory is specified, the utility writes the payload of each received message to a file. The name of the file is the name of the topic you specified followed by an underscore plus a sequential number. Topic names are case-sensitive. The file has no extension because an MQTT payload can be anything.
testTopic_000001
testTopic_000002
testTopic_000003
testTopic_000004
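The naming rule can be sketched as a single function. This follows the sanitizing and numbering steps in the subscribe script (replace characters that are unsafe in file names with underscores, keep the last 500 characters, then append a six-digit sequence number). The function name message_file_name and the constant name ILLEGAL_CHARS are hypothetical.

```python
# Characters the subscribe script replaces with underscores.
ILLEGAL_CHARS = ['/', '\\', '*', '"', '?', ':', '>', '<', '|', ' ', '.', '&', '%', '#']


def message_file_name(topic: str, message_number: int) -> str:
    """Build the file name used to store the Nth message received on a topic."""
    safe_name = topic
    for c in ILLEGAL_CHARS:
        safe_name = safe_name.replace(c, '_')
    # Keep only the last 500 characters to avoid overly long paths.
    safe_name = safe_name[-500:]
    return "%s_%06d" % (safe_name, message_number)


if __name__ == "__main__":
    print(message_file_name("testTopic", 1))
```

Because topic levels are separated by slashes, a topic such as plant/line 1/temp is stored under a name that begins with plant_line_1_temp. Two different topics can therefore map to the same file name prefix.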
Code
# -*- coding: Latin-1 -*-
#
# subscribe.py
#
# This Python script connects to the specified FairCom EdgeMQ MQTT broker and subscribes to the specified topic.
# It then prints each message which arrives from the broker to the console and (optionally) writes the message to a numbered file in the specified directory.
#
import sys
import getopt
import os

try:
    import paho.mqtt.subscribe as subscribe
except ImportError:
    print('Error! This script requires the Paho MQTT module, which does not appear to be installed. Please install it, using a command such as "pip3 install paho-mqtt".')
    sys.exit(2)


def usage():
    print('This script connects to the specified MQTT broker and subscribes to the specified topic. It then prints each message that arrives to the console and (if desired) writes it to a numbered file in the specified directory. Warning! Existing files are overwritten!\n'
          'Command line options:\n'
          ' -h, --help : display help\n'
          ' -s MQTT server information - hostName or IPAddress : portNumber. Default="127.0.0.1:1883".\n'
          ' -u user name. No Default.\n'
          ' -p password. No Default. Ignored if user name is not specified.\n'
          ' -t MQTT topic name. Default="testTopic".\n'
          ' -c MQTT Client ID. Default="testSubClientID".\n'
          ' -q subscribe QoS. Default=1.\n'
          ' -d directory to store files in, if this is desired. Defaults to not writing files.')
    print("Example: python subscribe.py -s 127.0.0.1:1883 -u admin -p ADMIN -t testTopic -c testSubClientID -q 1 -d messages\n")


message_number = 0


# This function gets invoked each time a message arrives from the MQTT broker.
def on_message(client, userdata, msg):
    global message_number, directory, safe_file_name
    message_number += 1

    # Write the message to a new file in the specified directory, if a directory was specified.
    if directory is not None:
        with open(os.path.join(directory, "%s_%06d" % (safe_file_name, message_number)), "wb") as f:
            f.write(msg.payload)

    # Now try to decode the message as ASCII / UTF-8 Unicode for display.
    # If that fails, just show a hex dump of the message.
    try:
        payload = msg.payload.decode('utf8')  # convert the bytes to a string.
    except UnicodeDecodeError:
        payload = msg.payload.hex()

    # Print the first 100 characters of the string-version of the payload.
    if len(payload) > 100:
        print("Msg %d: %s..." % (message_number, payload[:100]))
    else:
        print("Msg %d: %s" % (message_number, payload))


# Default values for the command-line arguments
server_data = '127.0.0.1:1883'
user_name = None
user_pass = None
topic = 'testTopic'
client_id = 'testSubClientID'
subscribe_qos = 1
directory = None  # Optional argument - Folder to store message payloads in

# Read the command-line arguments
try:
    opts, args = getopt.getopt(sys.argv[1:], 'hs:u:p:t:c:d:q:', longopts=['help'])
except getopt.GetoptError as e:
    print(str(e))
    usage()
    sys.exit(2)

for opt, arg in opts:
    if opt in ('-h', '--help'):
        usage()
        sys.exit(1)
    if opt == '-s':
        server_data = arg
    elif opt == '-u':
        user_name = arg
    elif opt == '-p':
        user_pass = arg
    elif opt == '-t':
        topic = arg
    elif opt == '-c':
        client_id = arg
    elif opt == '-d':
        directory = arg
    elif opt == '-q':
        # Make sure the subscribe QoS is valid
        if not arg.isdigit():
            print('Error! The "-q" (subscribe QoS) argument must be a number from 0 to 2 inclusive.')
            usage()
            sys.exit(2)
        subscribe_qos = int(arg)
        if subscribe_qos < 0 or subscribe_qos > 2:
            print('Error! The "-q" (subscribe QoS) argument must be a number from 0 to 2 inclusive.')
            usage()
            sys.exit(2)
    else:
        print('Error! Unhandled option: "%s"' % opt)

# Separate the host name and port from the 'server_data' variable, because Paho needs them individually.
host = '127.0.0.1'
port = 1883
parts = server_data.split(":")
if len(parts) == 1:
    # No colon. Assume the entire string is the host name / IP Address
    host = server_data
elif len(parts) == 2:
    # One colon. Assume hostname:port or IPAddress:port
    host = parts[0]
    port = int(parts[1])
else:
    print('Error! I do not understand the -s (server) parameter you specified. Please use one of the following:\n hostname:port\n IPAddress:port.')
    sys.exit(2)

# Create the authentication data dictionary which will be passed to the Paho MQTT client
if (user_name is None) or (user_pass is None):
    auth_data = None
else:
    auth_data = {'username': user_name, 'password': user_pass}

# If the user specified an output directory, create it if it does not already exist.
# Then come up with a safe name to use for the files I create.
if directory is not None:
    if not os.path.exists(directory):
        print('The specified directory "%s" does not exist. Attempting to create it...' % directory)
        os.makedirs(directory)

    # Sanitize the topic name to replace characters that are not allowed in file names
    # with underscores. I will be rather aggressive with this...
    safe_file_name = topic
    illegal_chars = ['/', '\\', '*', '"', '?', ':', '>', '<', '|', ' ', '.', '&', '%', '#']
    for c in illegal_chars:
        safe_file_name = safe_file_name.replace(c, '_')

    # Now, to prevent path overflows, save the last 500 chars of the file name.
    safe_file_name = safe_file_name[-500:]

# This is a blocking call unless there is an exception while connecting to the MQTT broker.
# This connects to the MQTT broker and subscribes to the specified topic.
# Each time an MQTT packet arrives, the on_message() callback will be invoked.
# This automatically handles re-connects to the broker.
# https://www.eclipse.org/paho/index.php?page=clients/python/docs/index.php#id2
print('Connecting to MQTT broker on host "%s", port %d and subscribing to topic "%s". Messages that arrive will be printed to the screen' % (host, port, topic), end='')
if directory is None:
    print(".")
else:
    # The leading space separates this text from the message printed above.
    print(' and stored in the "%s" directory.' % (directory))

try:
    subscribe.callback(on_message, topic, qos=subscribe_qos, userdata=None,
                       hostname=host, port=port, client_id=client_id, keepalive=60,
                       will=None, auth=auth_data, tls=None,
                       protocol=subscribe.paho.MQTTv311)
except ConnectionRefusedError as e:
    print("Connection Refused error. Is your FairCom EDGE server running? %s" % str(e))
except KeyboardInterrupt:
    print('User pressed <ctrl-c> to exit script.')
except Exception as e:
    print("MQTT exception while connecting to broker %s." % str(e))
    # try to print a little more information about the problem
    if len(e.args) > 0:
        print("[%s]" % subscribe.paho.connack_string(e.args[0]))

print('After subscribe.callback() call. Goodbye!')