My Adventures in Coding

August 16, 2011

JMS – How to setup a DurableSubscriber with a MessageListener using ActiveMQ

When I first used JMS and ActiveMQ the first example I tried was a very simple Producer and Consumer that sent/received a single text message to/from ActiveMQ using a Queue. This was very easy to do, but then I wanted to see how to use a Topic and use a DurableSubscriber with a MessageListener. Also, rather than using auto acknowledge, which tells ActiveMQ to remove the message the moment the subscriber receives it, instead I wanted to wait until the subscriber had finished processing the message before acknowledging to ActiveMQ that the message was successfully processed. It turns out all of this was much easier to do than I had thought. So let’s set up a simple example.

Setup ActiveMQ

  • Download the latest version ActiveMQ here. (For this post 5.5.0 was used)
  • Now unpack the tar file and start ActiveMQ:
  •          tar -zxvf apache-activemq-5.5.0-bin.tar.gz
             cd apache-activemq-5.5.0/bin
             chmod 755 activemq
             cd ..
             bin/activemq start
           
  • You should now be able to access the ActiveMQ admin page: http://localhost:8161/admin/

Get the required libraries
To compile and run the publisher and subscriber, you will need the following two libraries on your class path:

  • From the apache-activemq-5.5.0 folder get the jar file: activemq-all-5-5-0.jar
  • You will also need SLF4J (Simple Logging Facade for Java): slf4j-simple-1.5.11.jar
    • NOTE: version 1.6+ will not work with the current version of ActiveMQ (5.5.0)
    • You will need to download version 1.5.11 here

NOTE: For this example I used scala 2.9.0.

I just used IntelliJ with a lib folder for this example, however if you are using maven, you can just use these dependencies:

<dependencies>
    <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-core</artifactId
          <version>5.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.5.11</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.9.0</version>
    </dependency>
</dependencies>

Producer
The producer code is fairly straight forward. Create a connection to ActiveMQ, create a Topic, create a MessageProducer, and send three messages. That’s all!

import javax.jms._
import org.apache.activemq.ActiveMQConnection
import org.apache.activemq.ActiveMQConnectionFactory

object Producer {
  val activeMqUrl: String = ActiveMQConnection.DEFAULT_BROKER_URL
  val topicName: String = "TOPIC_NAME"

  def main(args: Array[String]): Unit = {
    val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory(activeMqUrl)
    val connection: Connection = connectionFactory.createConnection
    connection.start
    val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val destination: Destination = session.createTopic(topicName)
    val messageProducer: MessageProducer = session.createProducer(destination)
    val textMessage: TextMessage = session.createTextMessage("Hello Subscriber!")
    for (i <- 0 until 3) {
      messageProducer.send(textMessage)
      println("Message sent to subscriber: '" + textMessage.getText + "'")
    }
    connection.close
  }

}

Durable Subscriber
The durable subscriber will register to receive messages on the same topic that the producer is sending messages to. Every message sent by the producer will be received and printed by the message listener. After processing the received message, the durable subscriber will acknowledge it telling ActiveMQ that the message was successfully received.

import javax.jms._
import org.apache.activemq.ActiveMQConnection
import org.apache.activemq.ActiveMQConnectionFactory

object DurableSubscriber {
  val activeMqUrl: String = ActiveMQConnection.DEFAULT_BROKER_URL
  val topicName: String = "TOPIC_NAME"

  def main(args: Array[String]): Unit = {
    // Set up the connection, same as the producer, however you also need to set a
    // unique ClientID which ActiveMQ uses to identify the durable subscriber
    val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory(activeMqUrl)
    val connection: Connection = connectionFactory.createConnection
    connection.setClientID("SomeClientID")
    connection.start

    // We don't want to use AUTO_ACKNOWLEDGE, instead we want to ensure the subscriber has successfully
    // processed the message before telling ActiveMQ to remove it, so we will use CLIENT_ACKNOWLEDGE
    val session: Session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)

    // Register to be notified of all messages on the topic
    val topic: Topic = session.createTopic(topicName)
    val durableSubscriber: TopicSubscriber = session.createDurableSubscriber(topic, "Test_Durable_Subscriber")

    // Create a listener to process each received message
    val listener: MessageListener = new MessageListener {
      def onMessage(message: Message): Unit = {
        try {
          if (message.isInstanceOf[TextMessage]) {
            val textMessage: TextMessage = message.asInstanceOf[TextMessage]
            println("Message received from producer: '" + textMessage.getText + "'")

            // Once we have successfully processed the message, send an acknowledge back to ActiveMQ
            message.acknowledge
          }
        }
        catch {
          case je: JMSException => {
            println(je.getMessage)
          }
        }
      }
    }

    // Add the message listener to the durable subscriber
    durableSubscriber.setMessageListener(listener)
  }

}

To run the program, first start the DurableSubscriber. It will connect to ActiveMQ and wait to receive a message. Now when you run the Producer you will see that the DurableSubscriber receives all three messages sent by the Producer.

Start DurableSubscriber

278 [ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616

Start Producer

277 [ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
Message sent to subscriber: 'Hello Subscriber!'
Message sent to subscriber: 'Hello Subscriber!'
Message sent to subscriber: 'Hello Subscriber!'

Messages Received by DurableSubscriber

Message received from producer: 'Hello Subscriber!'
Message received from producer: 'Hello Subscriber!'
Message received from producer: 'Hello Subscriber!'

ActiveMQ makes all of this very easy!

Blog at WordPress.com.