My Adventures in Coding

October 15, 2011

JMS – How to do Synchronous Messaging with ActiveMQ in Scala

Filed under: ActiveMQ,JMS,Scala — Brian @ 6:49 pm

The first question you will probably ask is “Aren’t queues designed for asynchronous, fire-and-forget messaging?” Well, yes, they are! When scaling, your goal should be to make anything that can be asynchronous, asynchronous. However, if a message bus such as ActiveMQ is how your applications talk to each other, sometimes an immediate response is necessary (although we try to avoid this when possible). The following is a simple example in Scala showing how to send such a message with ActiveMQ and wait for the reply.

Set up ActiveMQ

  • Download the latest version of ActiveMQ (5.5.0 was used for this post).
  • Now unpack the tar file and start ActiveMQ:

             tar -zxvf apache-activemq-5.5.0-bin.tar.gz
             cd apache-activemq-5.5.0/bin
             chmod 755 activemq
             cd ..
             bin/activemq start

  • You should now be able to access the ActiveMQ admin page: http://localhost:8161/admin/

Get the required libraries
To compile and run the producer and consumer, you will need the following two libraries on your classpath:

  • From the apache-activemq-5.5.0 folder get the jar file: activemq-all-5.5.0.jar
  • You will also need SLF4J (Simple Logging Facade for Java): slf4j-simple-1.5.11.jar
    • NOTE: version 1.6+ will not work with the current version of ActiveMQ (5.5.0)
    • You will need to download version 1.5.11 instead

NOTE: For this example I used Scala 2.9.0.

Synchronous Producer
The main difference between an asynchronous and a synchronous producer is that the synchronous producer uses two queues: a send queue and a reply queue. The send queue is the queue on which the producer sends a message to the consumer. The reply queue is the queue on which the producer listens for the consumer's reply. When it sends a message, the producer sets two important pieces of information on it:

  1. JMSCorrelationID: This is the unique ID the producer uses to identify the message and to match the reply to it
  2. JMSReplyTo: This tells the consumer which queue to send the reply to

The producer then creates a “ReplyConsumer” on the reply queue and listens for a reply from the consumer that contains that “JMSCorrelationID”. When a message with that ID appears on the reply queue, the ReplyConsumer will receive that message and our synchronous message round trip has been completed!
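The filtering on the reply queue is done with a JMS message selector, which is just a string. As a minimal sketch (the object and method names here are hypothetical, not part of the original example), the selector can be built like this. It assumes a UUID is an acceptable correlation ID and doubles any embedded single quote, since selector string literals follow SQL-92 quoting rules:

```scala
import java.util.UUID

object CorrelationSelector {
  // Hypothetical helper: a fresh correlation ID that is safe to embed in a selector.
  def newCorrelationId(): String = UUID.randomUUID().toString

  // Builds the selector string passed to session.createConsumer(queue, selector).
  // An embedded single quote is doubled so the string literal stays well-formed.
  def replySelector(correlationId: String): String =
    "JMSCorrelationID = '" + correlationId.replace("'", "''") + "'"

  def main(args: Array[String]): Unit = {
    println(replySelector(newCorrelationId()))
    println(replySelector("order'42")) // prints: JMSCorrelationID = 'order''42'
  }
}
```

An ID that can contain arbitrary characters (quotes, control characters) is the case the escaping guards against; with UUIDs it never triggers, but it keeps the helper safe for IDs supplied by callers.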

import javax.jms._
import org.apache.activemq.ActiveMQConnectionFactory
import util.Random

object ProducerSynchronous {
  val activeMqUrl: String = "tcp://localhost:61616"

  def main(args: Array[String]): Unit = {
    val connectionFactory = new ActiveMQConnectionFactory(activeMqUrl)
    val connection = connectionFactory.createConnection
    connection.setClientID("ProducerSynchronous")
    connection.start

    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val sendQueue = session.createQueue("SendSynchronousMsgQueue")
    val replyQueue = session.createQueue("ReplyToSynchronousMsgQueue")

    val producer = session.createProducer(sendQueue)
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

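    // Use a UUID: Random.nextString can produce quotes or control characters that break the selector below
    val correlationId = java.util.UUID.randomUUID().toString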
    val replyConsumer = session.createConsumer(replyQueue, "JMSCorrelationID = '%s'".format(correlationId))

    val textMessage = session.createTextMessage("Hello, please reply immediately to my message!")
    textMessage.setJMSReplyTo(replyQueue)
    textMessage.setJMSCorrelationID(correlationId)

    println("Sending message...")

    producer.send(textMessage)

    println("Waiting for reply...")

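    // Block for up to one second; receive() returns null if no reply arrives in time
    val reply = replyConsumer.receive(1000)
    replyConsumer.close()

    reply match {
      case txt: TextMessage => println("Received reply: " + txt.getText())
      case null => throw new Exception("Timed out waiting for a reply")
      case _ => throw new Exception("Invalid Response:" + reply)
    }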

    connection.close
  }
}
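If you need this round trip in more than one place, the same steps can be wrapped in a helper. The following is only a sketch under a few assumptions: the `RequestReply` object and its `request` method are hypothetical names, the connection that owns the session has already been started, and a temporary queue (a standard JMS feature created with `session.createTemporaryQueue()`) is swapped in for the fixed reply queue used above. Because the temporary queue is private to this request, no selector is needed, but the correlation ID is still checked:

```scala
import java.util.UUID
import javax.jms._

object RequestReply {
  // Hypothetical helper: sends `body` to `destination` and waits up to
  // `timeoutMs` for a text reply. Returns None on timeout or on an unexpected reply.
  // Assumes the connection that created `session` has been started.
  def request(session: Session, destination: Destination,
              body: String, timeoutMs: Long): Option[String] = {
    val replyQueue = session.createTemporaryQueue()
    val correlationId = UUID.randomUUID().toString
    val producer = session.createProducer(destination)
    val replyConsumer = session.createConsumer(replyQueue)
    try {
      val message = session.createTextMessage(body)
      message.setJMSReplyTo(replyQueue)
      message.setJMSCorrelationID(correlationId)
      producer.send(message)

      // A type pattern never matches null, so a timeout falls through to None
      replyConsumer.receive(timeoutMs) match {
        case txt: TextMessage if txt.getJMSCorrelationID == correlationId =>
          Some(txt.getText)
        case _ => None
      }
    } finally {
      // Close the consumer before deleting the temporary queue
      replyConsumer.close()
      producer.close()
      replyQueue.delete()
    }
  }
}
```

A call such as `RequestReply.request(session, sendQueue, "Hello", 1000)` would then return `Some(replyText)` or `None`, which leaves the decision about how to treat a timeout to the caller.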

Synchronous Consumer
The synchronous consumer listens for messages on the send queue. When a message arrives, it creates a “ReplyProducer” connected to the reply queue named in the message’s “JMSReplyTo” field. It then creates a reply message, copies the JMSCorrelationID from the received message onto it, and sends the reply with the “ReplyProducer”.

import javax.jms._
import org.apache.activemq.ActiveMQConnectionFactory

object ConsumerSynchronous {
  val activeMqUrl: String = "tcp://localhost:61616"
  
  def main(args: Array[String]): Unit = {
    val connectionFactory  = new ActiveMQConnectionFactory(activeMqUrl)
    val connection = connectionFactory.createConnection
    connection.setClientID("ConsumerSynchronous")
    connection.start

    println("Started")

    val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val queue  = session.createQueue("SendSynchronousMsgQueue")
    val consumer = session.createConsumer(queue)

    val listener = new MessageListener {
      def onMessage(message: Message) {
        message match {
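          case text: TextMessage => {
            println("Received message: " + text.getText)

            // Reply on the queue named by the request's JMSReplyTo header
            val replyProducer = session.createProducer(text.getJMSReplyTo())
            replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

            // Copy the correlation ID so the producer can match this reply to its request
            val replyMessage = session.createTextMessage("Yes I received your message!")
            replyMessage.setJMSCorrelationID(text.getJMSCorrelationID())

            replyProducer.send(replyMessage)
            println("Reply sent!")

            // Release the per-message producer so producers do not accumulate
            replyProducer.close()
          }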
          case _ => {
            throw new Exception("Unhandled Message Type: " + message.getClass.getSimpleName)
          }
        }
      }
    }
    consumer.setMessageListener(listener)
  }
}
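One thing the listener above takes for granted is that every incoming message carries a JMSReplyTo header. A message sent by an ordinary fire-and-forget producer will not, and `getJMSReplyTo` then returns null. As a rough sketch (the `ReplyHandler` object and `reply` method are hypothetical names, not part of the original example), the reply step could be guarded like this:

```scala
import javax.jms._

object ReplyHandler {
  // Hypothetical helper: replies to `request` with `body` if the request names
  // a reply destination. Returns true if a reply was sent, false otherwise.
  def reply(session: Session, request: TextMessage, body: String): Boolean =
    Option(request.getJMSReplyTo) match {
      case Some(destination) =>
        val replyProducer = session.createProducer(destination)
        try {
          val replyMessage = session.createTextMessage(body)
          // Echo the correlation ID so the sender can match the reply
          replyMessage.setJMSCorrelationID(request.getJMSCorrelationID)
          replyProducer.send(replyMessage)
          true
        } finally {
          replyProducer.close()
        }
      case None =>
        // No JMSReplyTo header: the sender does not expect a reply
        false
    }
}
```

Inside `onMessage`, the `TextMessage` case could then call `ReplyHandler.reply(session, text, "Yes I received your message!")` and log when it returns false.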

That’s all!

4 Comments »

  1. Hey good example…exactly I was looking for 🙂

    Regards,
    Kanhaiya

    Comment by Kanhaiya — February 17, 2012 @ 12:11 am | Reply

    • Thanks, I am glad it helped!

      Comment by Brian — February 17, 2012 @ 12:29 am | Reply

  2. Hey can you please post other useful programs associated with the same.
    Some other useful links for Scala and ActiveMQ will also do.

    Thanks and regards,
    Kanhaiya

    Comment by Kanhaiya — February 17, 2012 @ 12:12 am | Reply

  3. https://github.com/fancellu/jmsScala

    Comment by Dino Fancellu — July 30, 2013 @ 6:10 am | Reply

