My Adventures in Coding

August 16, 2011

JMS – How to setup a DurableSubscriber with a MessageListener using ActiveMQ

When I first used JMS and ActiveMQ the first example I tried was a very simple Producer and Consumer that sent/received a single text message to/from ActiveMQ using a Queue. This was very easy to do, but then I wanted to see how to use a Topic and use a DurableSubscriber with a MessageListener. Also, rather than using auto acknowledge, which tells ActiveMQ to remove the message the moment the subscriber receives it, instead I wanted to wait until the subscriber had finished processing the message before acknowledging to ActiveMQ that the message was successfully processed. It turns out all of this was much easier to do than I had thought. So let’s set up a simple example.

Setup ActiveMQ

  • Download the latest version ActiveMQ here. (For this post 5.5.0 was used)
  • Now unpack the tar file and start ActiveMQ:
  •          tar -zxvf apache-activemq-5.5.0-bin.tar.gz
             cd apache-activemq-5.5.0/bin
             chmod 755 activemq
             cd ..
             bin/activemq start
           
  • You should now be able to access the ActiveMQ admin page: http://localhost:8161/admin/

Get the required libraries
To compile and run the publisher and subscriber, you will need the following two libraries on your class path:

  • From the apache-activemq-5.5.0 folder get the jar file: activemq-all-5-5-0.jar
  • You will also need SLF4J (Simple Logging Facade for Java): slf4j-simple-1.5.11.jar
    • NOTE: version 1.6+ will not work with the current version of ActiveMQ (5.5.0)
    • You will need to download version 1.5.11 here

NOTE: For this example I used scala 2.9.0.

I just used IntelliJ with a lib folder for this example, however if you are using maven, you can just use these dependencies:

<dependencies>
    <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-core</artifactId
          <version>5.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.5.11</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.9.0</version>
    </dependency>
</dependencies>

Producer
The producer code is fairly straight forward. Create a connection to ActiveMQ, create a Topic, create a MessageProducer, and send three messages. That’s all!

import javax.jms._
import org.apache.activemq.ActiveMQConnection
import org.apache.activemq.ActiveMQConnectionFactory

object Producer {
  val activeMqUrl: String = ActiveMQConnection.DEFAULT_BROKER_URL
  val topicName: String = "TOPIC_NAME"

  def main(args: Array[String]): Unit = {
    val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory(activeMqUrl)
    val connection: Connection = connectionFactory.createConnection
    connection.start
    val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val destination: Destination = session.createTopic(topicName)
    val messageProducer: MessageProducer = session.createProducer(destination)
    val textMessage: TextMessage = session.createTextMessage("Hello Subscriber!")
    for (i <- 0 until 3) {
      messageProducer.send(textMessage)
      println("Message sent to subscriber: '" + textMessage.getText + "'")
    }
    connection.close
  }

}

Durable Subscriber
The durable subscriber will register to receive messages on the same topic that the producer is sending messages to. Every message sent by the producer will be received and printed by the message listener. After processing the received message, the durable subscriber will acknowledge it telling ActiveMQ that the message was successfully received.

import javax.jms._
import org.apache.activemq.ActiveMQConnection
import org.apache.activemq.ActiveMQConnectionFactory

object DurableSubscriber {
  val activeMqUrl: String = ActiveMQConnection.DEFAULT_BROKER_URL
  val topicName: String = "TOPIC_NAME"

  def main(args: Array[String]): Unit = {
    // Set up the connection, same as the producer, however you also need to set a
    // unique ClientID which ActiveMQ uses to identify the durable subscriber
    val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory(activeMqUrl)
    val connection: Connection = connectionFactory.createConnection
    connection.setClientID("SomeClientID")
    connection.start

    // We don't want to use AUTO_ACKNOWLEDGE, instead we want to ensure the subscriber has successfully
    // processed the message before telling ActiveMQ to remove it, so we will use CLIENT_ACKNOWLEDGE
    val session: Session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)

    // Register to be notified of all messages on the topic
    val topic: Topic = session.createTopic(topicName)
    val durableSubscriber: TopicSubscriber = session.createDurableSubscriber(topic, "Test_Durable_Subscriber")

    // Create a listener to process each received message
    val listener: MessageListener = new MessageListener {
      def onMessage(message: Message): Unit = {
        try {
          if (message.isInstanceOf[TextMessage]) {
            val textMessage: TextMessage = message.asInstanceOf[TextMessage]
            println("Message received from producer: '" + textMessage.getText + "'")

            // Once we have successfully processed the message, send an acknowledge back to ActiveMQ
            message.acknowledge
          }
        }
        catch {
          case je: JMSException => {
            println(je.getMessage)
          }
        }
      }
    }

    // Add the message listener to the durable subscriber
    durableSubscriber.setMessageListener(listener)
  }

}

To run the program, first start the DurableSubscriber. It will connect to ActiveMQ and wait to receive a message. Now when you run the Producer you will see that the DurableSubscriber receives all three messages sent by the Producer.

Start DurableSubscriber

278 [ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616

Start Producer

277 [ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
Message sent to subscriber: 'Hello Subscriber!'
Message sent to subscriber: 'Hello Subscriber!'
Message sent to subscriber: 'Hello Subscriber!'

Messages Received by DurableSubscriber

Message received from producer: 'Hello Subscriber!'
Message received from producer: 'Hello Subscriber!'
Message received from producer: 'Hello Subscriber!'

ActiveMQ makes all of this very easy!

5 Comments »

  1. You Sir, are a lifesaver .. Thank you..

    Comment by zZoiram — December 1, 2011 @ 3:50 pm | Reply

  2. Thanks for the article, it was exactly what I was looking for!
    Any idea on why my Message dequeued count in the console stays at 0, even if messages are consumed and acknowledged as per your code?

    Comment by Kraon — April 30, 2012 @ 9:51 am | Reply

  3. thanks.. I was looking for an asynchronous subscriber.

    Comment by tom — October 4, 2013 @ 12:54 am | Reply

  4. It does not seem to me that you have illustrated the durableness of the subscriber here. If I understand the concept of durableness correctly (which is highly doubtful), you would start the subscriber, and then start the producer and send some messages. Then, while the producer is still running, you would stop the subscriber. The producer would send more messages. The subscriber would be restarted, and it would report the receipt of the messages that the producer sent while the subscriber is stopped.

    Comment by RobR — July 28, 2014 @ 11:37 am | Reply

    • You are right. In the article I don’t walk through demonstrating the durableness of a subscriber, instead the article was more about how to do the set and to get a simple example of ActiveMQ’s durable subscriber working so you can try it out.

      Comment by Brian — July 28, 2014 @ 11:50 am | Reply


RSS feed for comments on this post. TrackBack URI

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

Create a free website or blog at WordPress.com.

%d bloggers like this: