Skip to main content

MQTT client for Java

Tutorial to use the Java MQTT client with FairCom's MQTT broker engine

Abstract

This section contains MQTT Client tutorials for Java developers.

This section contains MQTT Client tutorials for Java developers.

FairCom Edge is an MQTT broker. Programs use an MQTT client library to publish and subscribe to messages on the MQTT broker.

Introduction

This quick start guide includes two tutorials. Both tutorials are command-line programs that you can use to publish messages and monitor published messages. Thus, the tutorials perform double duty. They show you how to use MQTT and they are command-line utilities that you can use to learn and troubleshoot MQTT.

MQTT tutorials:
An MQTT client does the following:
  • Connects to an MQTT broker.

  • Publishes a message to a topic.

  • Subscribes to a topic and receives all messages published to that topic by any client.

Installation

Prerequisites:
  • FairCom MQ or FairCom Edge (or another MQTT broker) must be running on 127.0.0.1 and listening on port 1883.

  • Java version 8 or later must be installed.

  • Java should be in your command-line path.

  • Maven must be installed or you must use a Java IDE with Maven support.

Code dependencies

The tutorial uses the Eclipse Paho Java Client to communicate with MQTT. The Paho MQTT client is an open-source project using the Eclipse 2.0 license.

FairCom dependencies

Install FairCom MQ.

This tutorial shows you how to build a command-line utility for publishing MQTT messages to a specified topic on a specified broker. This is useful for testing and troubleshooting MQTT.

The source code is simple and self-explanatory:
  • The tutorial is located in the <faircom>/drivers/java.mqtt/JavaMQTTTutorial1 folder.

  • The source code is in the files, Publish.java and Utilities.java, which are located in the <faircom>/drivers/java.mqtt/JavaMQTTTutorial1/src/com/faircom/publish/ folder.

  • Use your Java IDE to open pom.xml and build the project, or build it using the Maven mvn package lifecycle.

Command line usage

copy target\Publish.jar .
java -jar Publish.jar -t topic -f fileName -q qos -c clientid -s hostname:port -u username -p password

Command line options

Options may be present in any order, and if absent, default values will be used.

Table 1. Command line options

Option

Description

-t topic

An optional topic to publish to. It defaults to testTopic.

-f fileName

A required file name to use for the payload in the published message. The file may contain anything.

-q QoS

An optional quality of service to use when publishing the message. It defaults to 1.

  • 0 provides no guarantee of delivery.

  • 1 guarantees delivery at least once.

  • 2 guarantees delivery at most once.

-c clientID

An optional client id that uniquely identifies the client to the MQTT broker. It defaults to testPubClientID.

-s hostname:port

An optional hostname and port of an MQTT broker. It defaults to tcp://127.0.0.1:1883.

-u username

An optional user name to use when connecting to the broker. If absent, a connection without a username and password is attempted.

-p password

An optional password to use with the username provided above. If absent a username without a password is attempted.

-h

Prints the usage and exit.



Example 1. Usage
java -jar Publish.jar -t testTopic -f data.json


Example 2. Output
Read 213 bytes from "data.json".

Connecting to the MQTT broker at address: "tcp://127.0.0.1:1883"

Published 213 bytes to testTopic2

Disconnected from the MQTT broker.


Code

package com.faircom.publish;


import org.eclipse.paho.client.mqttv3.*;

import java.util.Properties;

import static com.faircom.publish.Utilities.*;


/**
 * Command-line utility to publish a file as a payload.
 * args:
 * -s tcp://address:port or ssl://address:port
 * -t topic
 * -c clientID
 * -f file
 * -u username
 * -p password
 * -q qos
 * -h help
 *
 * Note: Paho will trigger a java.lang.reflect.InaccessibleObjectException if the broker address is set to an invalid IP address (e.g. 999.0.0.1).
 * That exception causes this program to exit awkwardly.
 * InaccessibleObjectException was added in Java version 9, but I am building for Java 8, so I am catching RuntimeException (the parent class).
 * Java 8 seems to handle this exception as a generic "MqttException", with no further details.
 */
public class Publish
{
  @SuppressWarnings( "squid:S106" )
  public static void main( String[] args )
  {
    // Program default settings.
    Properties settings = new Properties();
    settings.setProperty( "brokerAddress", "tcp://127.0.0.1:1883" );  // NOSONAR
    settings.setProperty( "username", "" );                           // NOSONAR
    settings.setProperty( "password", "" );
    settings.setProperty( "topic", "testTopic" );                     // NOSONAR
    settings.setProperty( "fileName", "" );
    settings.setProperty( "clientID", "testPubClientID" );
    settings.setProperty( "qos", "1" );                               // NOSONAR

    // Read and validate command-line arguments.
    processCLAs( args, settings );

    byte[] fileData = readFile( settings.getProperty( "fileName" ) );

    try( MqttClient mq = new MqttClient( settings.getProperty( "brokerAddress" ), settings.getProperty( "clientID" ) ) )
    {
      // Block on MQTT actions for up to 20 seconds, then continue the action in the background.
      mq.setTimeToWait( 20000 );
      mq.setCallback( new CallbackMethods() );

      System.out.println( "Connecting to the MQTT broker at address: \"" + settings.getProperty( "brokerAddress" ) + "\"" );
      MqttConnectOptions options = new MqttConnectOptions();
      // Change the keep-alive (PINGREQ) from the default of 60 seconds.
      options.setKeepAliveInterval( 30 );
      // Change the automatic-reconnect from the default of false.
      options.setAutomaticReconnect( true );
      if( !settings.getProperty( "username" ).isEmpty() )
      {
        options.setUserName( settings.getProperty( "username" ) );
        options.setPassword( settings.getProperty( "password" ).toCharArray() );
      }
      mq.connect( options );

      // Publish the data loaded from the input file.
      mq.publish( settings.getProperty( "topic" ), fileData, Integer.parseInt( settings.getProperty( "qos" ) ), false );

      System.out.println( "Published " + fileData.length + " bytes to " + settings.getProperty( "topic" ) );

      mq.disconnect();
      System.out.println( "Disconnected from the MQTT broker." );
    }
    catch( MqttSecurityException mqttSecurityException )
    {
      System.out.println( "\n\nUnable to connect to the broker due to a security exception." );
      System.out.println( "Verify the username and password is correct." );
      exiting( mqttSecurityException.getReasonCode(), mqttSecurityException.getLocalizedMessage() );
    }
    catch( MqttPersistenceException mqttPersistenceException )
    {
      System.out.println( "\n\nUnable to connect to the broker due to a persistence exception!" );
      exiting( mqttPersistenceException.getReasonCode(), mqttPersistenceException.getLocalizedMessage() );
    }
    catch( MqttException mqttException )
    {
      System.out.println( "Exiting due to a MQTT Exception." );
      exiting( mqttException.getReasonCode(), mqttException.getLocalizedMessage() );
    }
    catch( java.lang.RuntimeException exception )
    {
      // This exception can trigger if the broker address is set to an invalid IP address (e.g. 999.0.0.1).
      // The more specific exception is java.lang.reflect.InaccessibleObjectException, which was added in Java version 9, but I am building for Java 8.
      System.out.println( "\n\nThe address you provided is likely invalid!" );
      exiting( -1, "Invalid broker address!" );
    }
  }


  public static class CallbackMethods implements MqttCallback
  {
    public CallbackMethods(){ /* This constructor is needed when used as a callback class. */ }


    /**
     * Actions to take when a message arrives for a topic the client is subscribed to.
     *
     * @param topicName   The topic associated with this message.
     * @param mqttMessage The message that was received.
     */
    @SuppressWarnings( "squid:S106" )
    public void messageArrived( String topicName, MqttMessage mqttMessage ){ /* This method is deliberately left empty. */ }


    /**
     * Actions to take if the connection is lost.
     * This method is deliberately left empty.
     *
     * @param throwable the exception to throw.
     */
    public void connectionLost( Throwable throwable ){ /* This method is deliberately left empty. */ }


    /**
     * Actions to take when a published message is delivered.
     * This method is deliberately left empty.
     *
     * @param iMqttDeliveryToken the delivery token.
     */
    public void deliveryComplete( IMqttDeliveryToken iMqttDeliveryToken ){ /* This method is deliberately left empty. */ }
  }
}
package com.faircom.publish;


import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Locale;
import java.util.Properties;


public class Utilities
{
  Utilities(){ /* This constructor hides the implicit public constructor. */ }


  /**
   * processCLAs will process the Command-Line Arguments passed to the program.
   *
   * @param args     the arguments passed to the program.
   * @param settings a JSONObject class to set options.
   */
  static void processCLAs( String[] args, Properties settings )
  {
    int claCount = args.length;
    if( claCount > 0 )
    {
      for( int i = 0; i < claCount; i += 2 )
      {
        String claValue = processCLA( args[i] );
        if( claCount > ( i + 1 ) )
          settings.put( claValue, args[i + 1] );
        else
        {
          usage();
          exiting( -2, "Option #" + ( i + 1 ) + ", \"" + claValue + "\", is invalid.  This switch must be followed by a value!" );
        }
      }
    }
    else
    {
      usage();
      exiting( -3, "Missing required parameters!" );
    }
    // Check that all required settings are fulfilled.
    validateSettings( settings );
  }


  /**
   * validateSettings() will ensure that requirements are met for all configurable options.
   *
   * @param settings the settings object to validate.
   */
  private static void validateSettings( Properties settings )
  {
    // Check that all required settings are fulfilled.
    if( settings.getProperty( "brokerAddress" ).isEmpty() )   // NOSONAR
    {
      exiting( -5, "Empty \"brokerAddress\"!" );
    }
    if( !settings.getProperty( "brokerAddress" ).startsWith( "tcp://" ) && !settings.getProperty( "brokerAddress" ).startsWith( "ssl://" ) )
    {
      exiting( -6, "Broker address must begin with the protocol \"tcp://\" or \"ssl://\"!" );
    }
    if( settings.getProperty( "clientID" ).isEmpty() )
    {
      exiting( -7, "Empty \"clientID\"!" );
    }
    if( settings.getProperty( "topic" ).isEmpty() )  // NOSONAR
    {
      exiting( -8, "Empty \"topic\"!" );
    }
    if( settings.getProperty( "fileName" ).isEmpty() )
    {
      exiting( -9, "Empty \"fileName\"!" );
    }
    // The username and password are optional, but if a username is set, the password must also be set.
    if( !settings.getProperty( "username" ).isEmpty() && settings.getProperty( "password" ).isEmpty() ) // NOSONAR
    {
      exiting( -10, "Username is provided, but password is missing!" );
    }
  }


  /**
   * processCLA will analyze one command-line argument and return the setting name for it.
   *
   * @param arg the command-line argument to analyze.
   * @return a String representing the setting.
   */
  private static String processCLA( String arg )
  {
    switch( arg.toLowerCase( Locale.ROOT ) )
    {
      case "h":
      case "-h":
      case "-?":
      case "/h":
      case "/?":
      case "help":
      case "-help":
        usage();
        exiting( 0, "" );
        return null;
      case "-s":
      case "server":
      case "broker":
      case "brokeraddress":
        return "brokerAddress";
      case "-t":
      case "topic":
        return "topic";
      case "-c":
      case "client":
      case "clientid":
        return "clientID";
      case "-f":
      case "file":
      case "filename":
        return "fileName";
      case "-u":
      case "user":
      case "username":
        return "username";
      case "-p":
      case "pass":
      case "password":
        return "password";
      case "-q":
      case "-qos":
      case "qos":
        return "qos";
      default:
        usage();
        exiting( -11, "Unrecognized option \"" + arg + "\"!" );
        return null;
    }
  }


  /**
   * usage() will print to screen how to use this program.
   */
  @SuppressWarnings( "squid:S106" )
  public static void usage()
  {
    System.out.println();
    System.out.println( "Usage: java -jar Publish.jar -t topic -f fileName [options]..." );
    System.out.println( "Publish the contents of a file to a broker using the specified topic." );
    System.out.println();
    System.out.println( "Options:" );
    System.out.println( "   -s server     Server address.  Defaults to \"tcp://127.0.0.1:1883\"." );
    System.out.println( "   -t topic      The topic to publish to.  Defaults to \"testTopic\"." );
    System.out.println( "   -c clientID   The unique ID to use when connecting to the broker.  Defaults to \"testPubClientID\"." );
    System.out.println( "   -f fileName   The file to use as a payload to publish." );
    System.out.println( "   -u username   An optional username to authenticate with when connecting to the broker.  If absent, a connection without credentials will be attempted." );
    System.out.println( "   -p password   The password to use with the username provided above.  Ignored if the username is absent." );
    System.out.println( "   -q qos        The Quality Of Service to publish with.  Defaults to 1." );
    System.out.println( "   -h help       Print the usage and exit." );
    System.out.println();
    System.out.println( "Options may be present in any order, and if absent, default values will be used." );
  }


  /**
   * readFileToByteArray() will open the file specified by fileName, read that into an array of bytes, and return that array.
   *
   * @param fileName the name of the file to open.
   * @return an array of bytes.
   */
  @SuppressWarnings( "squid:S106" )
  public static byte[] readFile( String fileName )
  {
    try( InputStream inputStream = new FileInputStream( fileName ) )
    {
      long fileSize = new File( fileName ).length();
      byte[] allBytes = new byte[( int ) fileSize];

      int bytesRead = inputStream.read( allBytes );
      System.out.println( "Read " + bytesRead + " bytes from \"" + fileName + "\"." );
      return allBytes;
    }
    catch( IOException ioException )
    {
      System.out.println( "Unable to read from \"" + fileName + "\"!" );
      exiting( -12, ioException.getLocalizedMessage() );
    }
    // This is unreachable because of the call to exiting() in the catch block.
    return new byte[0];
  }


  /**
   * exiting() will print an error message and return an exit code to the JVM.
   *
   * @param message  the text to display.
   * @param exitCode the exit code to return to the JVM.
   */
  @SuppressWarnings( "squid:S106" )
  static void exiting( int exitCode, String message )
  {
    System.out.println();
    System.out.println( message );
    System.out.println( "Exit code: " + exitCode );

    System.exit( exitCode );
  } // End of exiting() method.
}

This tutorial shows you how to build a command-line utility for subscribing to MQTT messages. Each message sent to the topic is output to the console and optionally written as a file to a specified folder. This utility is useful for testing and troubleshooting MQTT.

The source code is simple and self-explanatory:
  • The tutorial is located in the <faircom>/drivers/java.mqtt/JavaMQTTTutorial2 folder.

  • The source code is in the files, Subscribe.java and Utilities.java, which are located in the <faircom>/drivers/java.mqtt/JavaMQTTTutorial2/src/com/faircom/subscribe/ folder.

  • Use your Java IDE to open pom.xml and build the project, or build it using the Maven mvn package lifecycle.

Command line usage

copy target\Subscribe.jar .
java -jar Subscribe.jar -t topic -d directory -q QoS -c clientid -s hostname:port -u username -p password

Command line options

Options may be present in any order, and if absent, default values will be used.

Table 2. Command line options

Option

Description

-t topic

An optional topic to subscribe to. It defaults to testTopic.

-d directory

An optional directory to save messages as files. If omitted, messages will be printed to screen and not saved to files. If you specify a directory that does not exist, it is created. Existing files in the directory with the same name are overwritten.

-q QoS

An optional quality of service to use when when subscribing to the topic. It defaults to 1.

  • 0 provides no guarantee of delivery.

  • 1 guarantees delivery at least once.

  • 2 guarantees delivery only once.

-c clientID

An optional client id that uniquely identifies the client to the MQTT broker. It defaults to testSubClientID.

-s hostname:port

An optional hostname and port of an MQTT broker. It defaults to tcp://127.0.0.1:1883.

-u username

An optional user name to use when connecting to the broker. If absent, a connection without a username and password is attempted.

-p password

An optional password to use with the username provided above. Ignored if the username is absent.

-h

Prints the usage and exit.



Example 3. Usage
java -jar Subscribe.jar -t testTopic -d testDir


Example 4. Output
Connecting to the MQTT broker at address: "tcp://127.0.0.1:1883"

Using "testDir" as the output directory.

Successfully subscribed to the "testTopic" topic.

Going into an infinite loop to listen for published messages...

Message arrived: "

{

    "property":    "value",

    "string":      "string",

    "number":      1,

    "boolean":     true,

    "null":        null,

    "array":       [

        "value",

        1,

        1.1,

        -0,

        true,

        false,

        null

    ],

    "emptyObject": { }

}

" on topic "testTopic", QoS 1

Wrote to "C:\testDir\testTopic_000001".

The client has disconnected from the broker and all client resources have been freed.


Store incoming messages

When a directory is specified, the utility writes the payload of each received message to a file. The name of the file is the name of the topic you specified followed by an underscore plus a sequential number. If the topic name contains characters that are invalid for a file name, they are replaced with underscores. Topic names are case-sensitive. The file has no extension because an MQTT payload can be anything.

Examples of file/topic names:
  • testTopic_000001

  • testTopic_000002

  • testTopic_000003

  • testTopic_000004

Code

package com.faircom.subscribe;


import org.eclipse.paho.client.mqttv3.*;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

import static com.faircom.subscribe.Utilities.exiting;
import static com.faircom.subscribe.Utilities.processCLAs;


/**
 * Command-line utility to subscribe to a topic.
 * This can optionally save messages to files in a specified directory.
 * args:
 * -s tcp://hostname:port or ssl://address:port
 * -t topic
 * -c clientID
 * -d directory
 * -u username
 * -p password
 * -q qos
 * -h help
 *
 * Note: Paho will trigger a java.lang.reflect.InaccessibleObjectException if the broker address is set to an invalid IP address (e.g. 999.0.0.1).
 * That exception causes this program to exit awkwardly.
 * InaccessibleObjectException was added in Java version 9, but I am building for Java 8, so I am catching RuntimeException (the parent class).
 * Java 8 seems to handle this exception as a generic "MqttException", with no further details.
 */
public class Subscribe
{
  private static File outDirFile;
  // This limit is the number of bytes a file name can be on most modern filesystems.
  private static final int MAX_NAME_LEN = 255;
  private static final Charset CHARSET = StandardCharsets.UTF_8;


  @SuppressWarnings( "squid:S106" )
  public static void main( String[] args )
  {
    // Program default settings.
    Properties settings = new Properties();
    settings.setProperty( "brokerAddress", "tcp://127.0.0.1:1883" );  // NOSONAR
    settings.setProperty( "username", "" );                           // NOSONAR
    settings.setProperty( "password", "" );                           // NOSONAR
    settings.setProperty( "topic", "testTopic" );                     // NOSONAR
    settings.setProperty( "clientID", "testSubClientID" );            // NOSONAR
    settings.setProperty( "outputDirectory", "" );                    // NOSONAR
    settings.setProperty( "qos", "1" );                               // NOSONAR

    // Read and validate command-line arguments.
    processCLAs( args, settings );

    try( MqttClient subClient = new MqttClient( settings.getProperty( "brokerAddress" ), settings.getProperty( "clientID" ) ) )
    {
      // Block on MQTT actions for up to 20 seconds, then continue the action in the background.
      subClient.setTimeToWait( 20000 );
      subClient.setCallback( new CallbackMethods() );

      System.out.println( "Connecting to the MQTT broker at address: \"" + settings.getProperty( "brokerAddress" ) + "\"" );
      MqttConnectOptions options = new MqttConnectOptions();
      // Change the keep-alive from the default of 60 seconds.
      options.setKeepAliveInterval( 30 );
      // Change the automatic-reconnect from the default of false.
      options.setAutomaticReconnect( true );
      if( !settings.getProperty( "username" ).isEmpty() )
      {
        options.setUserName( settings.getProperty( "username" ) );
        options.setPassword( settings.getProperty( "password" ).toCharArray() );
      }
      subClient.connect( options );

      if( !settings.getProperty( "outputDirectory" ).isEmpty() )
      {
        // The existence of this directory will already have been verified by the validateSettings() method.
        outDirFile = new File( settings.getProperty( "outputDirectory" ) );
        System.out.println( "Using \"" + settings.getProperty( "outputDirectory" ) + "\" as the output directory." );
      }

      subClient.subscribe( settings.getProperty( "topic" ), Integer.parseInt( settings.getProperty( "qos" ) ) );
      System.out.println( "Successfully subscribed to the \"" + settings.getProperty( "topic" ) + "\" topic." );
      System.out.println( "Going into an infinite loop to listen for published messages..." );

      // Since the while loop below has no exit condition, set a shutdown hook to close the connection gracefully.
      Thread disconnectHook = new Thread( () -> {
        if( cleanup( subClient, settings.getProperty( "topic" ) ) )
          System.out.println( "The client has unsubscribed from the topic, disconnected from the broker, and all client resources have been freed." );
        else
          System.out.println( "The client was not able to properly disconnect from the broker. It may be necessary to remove temporary directories." );
      } );
      Runtime.getRuntime().addShutdownHook( disconnectHook );

      while( subClient.isConnected() )
      {
        try // NOSONAR
        {
          //noinspection BusyWait
          Thread.sleep( 200 );
        }
        catch( InterruptedException interruptedException )
        {
          Thread.currentThread().interrupt();
        }
      }
      System.out.println( "connected? " + subClient.isConnected() );

      System.out.println( "Disconnected from the MQTT broker." );
    }
    catch( MqttException mqttException )
    {
      System.out.println( "Exiting due to a MQTT Exception." );
      exiting( mqttException.getReasonCode(), mqttException.getLocalizedMessage() );
    }
    catch( java.lang.RuntimeException exception )
    {
      // This exception can trigger if the broker address is set to an invalid IP address (e.g. 999.0.0.1).
      // The more specific exception is java.lang.reflect.InaccessibleObjectException, which was added in Java version 9, but I am building for Java 8.
      System.out.println( "\n\nThe address you provided is likely invalid!" );
      exiting( -1, "Invalid broker address!" );
    }
  }


  /**
   * cleanup() will disconnect from the broker and release all client resources.
   *
   * @param client the client to disconnect and free.
   * @return true on success.
   */
  private static boolean cleanup( MqttClient client, String topic )
  {
    try
    {
      client.unsubscribe( topic );
      client.disconnect();
      return true;
    }
    catch( MqttException mqttException )
    {
      mqttException.printStackTrace();
    }
    return false;
  }


  public static class CallbackMethods implements MqttCallback
  {
    private int counter = 0;


    public CallbackMethods(){ /* This constructor is needed when used as a callback class. */ }


    /**
     * Actions to take when a message arrives for a topic the client is subscribed to.
     *
     * @param topicName   The topic associated with this message.
     * @param mqttMessage The message that was received.
     */
    @SuppressWarnings( "squid:S106" )
    public void messageArrived( String topicName, MqttMessage mqttMessage )
    {
      String logString = "Message arrived: \"\n" + new String( mqttMessage.getPayload() ) + "\" on topic \"" + topicName + "\", QoS " + mqttMessage.getQos();
      System.out.println( logString );
      counter++;
      if( outDirFile != null )
      {
        String sanitizedTopic = sanitizeTopic( topicName );
        // Set a limit to the maximum path length, to prevent recursion issues from symbolic links.
        if( outDirFile.getAbsolutePath().getBytes( CHARSET ).length + sanitizedTopic.getBytes( CHARSET ).length > ( MAX_NAME_LEN * 8 ) )
        {
          System.out.println( "Output file name would be too large for this filesystem.  Logging to file will be disabled for this topic: \"" + topicName + "\"." );
          return;
        }
        // Create a temporary file in the output directory, using the topic and a suffixed number as the file name.
        File tempFile = new File( outDirFile.getAbsolutePath(), sanitizedTopic + "_" + String.format( "%06d", counter ) );
        Path temp = Paths.get( tempFile.getAbsolutePath() );
        try
        {
          Path tempPath = Files.write( temp, mqttMessage.getPayload() );
          System.out.println( "Wrote to \"" + outDirFile.getAbsolutePath() + System.getProperty( "file.separator" ) + tempPath.getFileName() + "\"." );
          System.out.println();
        }
        catch( IOException ioException )
        {
          ioException.printStackTrace();
          exiting( -55, "Unable to write to the output file!" );
        }
      }
    }


    /**
     * sanitizeTopic() will replace characters in the topic which are invalid for a file name.
     *
     * @param topicName the String to check for invalid characters.
     * @return a String devoid of invalid characters.
     */
    private String sanitizeTopic( String topicName )
    {
      topicName = topicName.replace( '\\', '_' );
      topicName = topicName.replace( '/', '_' );
      topicName = topicName.replace( '*', '_' );
      topicName = topicName.replace( '"', '_' );
      topicName = topicName.replace( '?', '_' );
      topicName = topicName.replace( ':', '_' );
      topicName = topicName.replace( '<', '_' );
      topicName = topicName.replace( '>', '_' );
      topicName = topicName.replace( '|', '_' );
      topicName = topicName.replace( ' ', '_' );
      topicName = topicName.replace( '.', '_' );
      topicName = topicName.replace( '&', '_' );
      topicName = topicName.replace( '%', '_' );
      topicName = topicName.replace( '#', '_' );
      // Return at most 255 bytes minus the size in bytes of 6 numbers (for the affixed message sequence number).
      return topicName.substring( topicName.getBytes( CHARSET ).length - MAX_NAME_LEN - "000001".getBytes( CHARSET ).length );
    }


    /**
     * Actions to take if the connection is lost.
     * This method is deliberately left empty.
     *
     * @param throwable the exception to throw.
     */
    public void connectionLost( Throwable throwable ){ /* This method is deliberately left empty. */ }


    /**
     * Actions to take when a published message is delivered.
     * This method is deliberately left empty.
     *
     * @param iMqttDeliveryToken the delivery token.
     */
    public void deliveryComplete( IMqttDeliveryToken iMqttDeliveryToken ){ /* This method is deliberately left empty. */ }
  }
}
package com.faircom.subscribe;


import java.io.File;
import java.util.Locale;
import java.util.Properties;


public class Utilities
{
  Utilities(){ /* This constructor hides the implicit public constructor. */ }


  /**
   * processCLAs will process the Command-Line Arguments passed to the program.
   *
   * @param args     the arguments passed to the program.
   * @param settings a JSONObject class to set options.
   */
  static void processCLAs( String[] args, Properties settings )
  {
    int claCount = args.length;
    if( claCount > 0 )
    {
      for( int i = 0; i < claCount; i += 2 )
      {
        String claValue = processCLA( args[i] );
        if( claCount > ( i + 1 ) )
          settings.put( claValue, args[i + 1] );
        else
        {
          usage();
          exiting( -2, "Option #" + ( i + 1 ) + ", \"" + claValue + "\", is invalid.  This switch must be followed by a value!" );
        }
      }
    }
    // Check that all required settings are fulfilled.
    validateSettings( settings );
  }


  /**
   * processCLA will analyze one command-line argument and return the setting name for it.
   *
   * @param arg the command-line argument to analyze.
   * @return a String representing the setting.
   */
  private static String processCLA( String arg )
  {
    switch( arg.toLowerCase( Locale.ROOT ) )
    {
      case "h":
      case "-h":
      case "-?":
      case "/h":
      case "/?":
      case "help":
      case "-help":
        usage();
        exiting( 0, "" );
        return null;
      case "-s":
      case "brokeraddress":
      case "server":
      case "broker":
        return "brokerAddress"; // NOSONAR
      case "-t":
      case "topic":  // NOSONAR
        return "topic";
      case "-c":
      case "client":
      case "clientid":
        return "clientID";
      case "-d":
      case "dir":
      case "directory":
        return "outputDirectory";  // NOSONAR
      case "-u":
      case "user":
      case "username":  // NOSONAR
        return "username";
      case "-p":
      case "pass":
      case "password":  // NOSONAR
        return "password";
      case "-q":
      case "-qos":
      case "qos":
        return "qos";
      default:
        usage();
        exiting( -4, "Unrecognized option \"" + arg + "\"!" );
        return null;
    }
  }


  /**
   * validateSettings() will ensure that requirements are met for all configurable options.
   *
   * @param settings the settings object to validate.
   */
  private static void validateSettings( Properties settings )
  {
    // Check that all required settings are fulfilled.
    if( settings.getProperty( "brokerAddress" ).isEmpty() )
    {
      exiting( -5, "Empty \"brokerAddress\"!" );
    }
    if( !settings.getProperty( "brokerAddress" ).startsWith( "tcp://" ) && !settings.getProperty( "brokerAddress" ).startsWith( "ssl://" ) )
    {
      exiting( -6, "Broker address must begin with the protocol \"tcp://\" or \"ssl://\"!" );
    }
    if( settings.getProperty( "clientID" ).isEmpty() )
    {
      exiting( -7, "Empty \"clientID\"!" );
    }
    if( settings.getProperty( "topic" ).isEmpty() )
    {
      exiting( -8, "Empty \"topic\"!" );
    }
    // The username and password are optional, but if a username is set, the password must also be set.
    if( !settings.getProperty( "username" ).isEmpty() && settings.getProperty( "password" ).isEmpty() )
    {
      exiting( -9, "Username is provided, but password is missing!" );
    }
    if( !settings.getProperty( "outputDirectory" ).isEmpty() )
    {
      File outDirFile = new File( settings.getProperty( "outputDirectory" ) );
      // If the directory does not exist, or if it is not a directory, attempt to create the directory.  Exit on failure.
      if( ( !outDirFile.exists() || !outDirFile.isDirectory() ) && !outDirFile.mkdir() )
        exiting( -99, "Unable to create the output directory!" );
    }
  }


  /**
   * usage() will print to screen how to use this program.
   */
  @SuppressWarnings( "squid:S106" )
  public static void usage()
  {
    System.out.println();
    System.out.println( "Usage: java -jar Subscribe.jar -t topic [options]..." );
    System.out.println( "Subscribe to a specified topic from a specified broker, print messages to the screen, and persist messages to files in the specified directory." );
    System.out.println();
    System.out.println( "Options:" );
    System.out.println( "   -s server      Server address.  Defaults to \"tcp://127.0.0.1:1883\"." );
    System.out.println( "   -t topic       The topic to subscribe to.  Defaults to \"testTopic\"." );
    System.out.println( "   -c clientID    The unique ID to use when connecting to the broker.  Defaults to \"testSubClientID\"." );
    System.out.println( "   -d directory   An optional directory to save message files to.  If absent, no messages will be saved to disk." );
    System.out.println( "   -u username    An optional username to authenticate with when connecting to the broker.  If absent, a connection without credentials will be attempted." );
    System.out.println( "   -p password    The password to use with the username provided above.  Ignored if the username is absent." );
    System.out.println( "   -q qos         The Quality Of Service to subscribe with.  Defaults to 1." );
    System.out.println( "   -h help        Print the usage and exit." );
    System.out.println();
    System.out.println( "Options may be present in any order, and if absent, default values will be used." );
  }


  /**
   * exiting() will print an error message and return an exit code to the JVM.
   *
   * @param message  the text to display.
   * @param exitCode the exit code to return to the JVM.
   */
  @SuppressWarnings( "squid:S106" )
  static void exiting( int exitCode, String message )
  {
    System.out.println();
    System.out.println( message );
    System.out.println( "Exit code: " + exitCode );

    System.exit( exitCode );
  } // End of exiting() method.
}

Don't hesitate to contact us with questions, suggestions, and bug reports. We want you to be successful.

Contact info:
  • Address:

    6300 W. Sugar Creek Drive

    Columbia, Missouri 65203-9052

  • Phone:

    800.234.8180