The first question you will probably ask is “Aren’t queues designed for asynchronous, fire-and-forget style messaging?” Well, yep, they are! Your goal for scaling should be to make anything that can be asynchronous, asynchronous. However, if your applications talk to each other over a message bus such as ActiveMQ, sometimes an immediate response is necessary (although we try to avoid this when possible). The following is a simple example in Scala showing how to send a message with ActiveMQ and wait for its reply.
Set up ActiveMQ
- Download ActiveMQ from the Apache ActiveMQ site. (This post uses version 5.5.0.)
- Now unpack the tar file and start ActiveMQ:
tar -zxvf apache-activemq-5.5.0-bin.tar.gz
cd apache-activemq-5.5.0/bin
chmod 755 activemq
cd ..
bin/activemq start
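Before moving on, it can help to confirm that the broker is actually accepting connections. The following is only a minimal sketch: `BrokerCheck` and `isListening` are hypothetical names of my own, and it assumes the broker listens on localhost port 61616, the same address used in the connection URL in the examples further down. It only checks that the TCP port is open, not that ActiveMQ itself is healthy.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

object BrokerCheck {
  // Returns true if a TCP connection to host:port succeeds within timeoutMs.
  def isListening(host: String, port: Int, timeoutMs: Int): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Connection refused, timeout, unknown host, etc.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumption: the broker uses the port from tcp://localhost:61616
    println("Broker reachable: " + isListening("localhost", 61616, 1000))
  }
}
```

If this reports false right after starting ActiveMQ, give the broker a few seconds to finish starting and try again.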
Get the required libraries
To compile and run the producer and consumer, you will need the following two libraries on your classpath:
- From the apache-activemq-5.5.0 folder, get the jar file: activemq-all-5.5.0.jar
- You will also need SLF4J (Simple Logging Facade for Java): slf4j-simple-1.5.11.jar
  - NOTE: SLF4J version 1.6+ will not work with the version of ActiveMQ used here (5.5.0)
  - Download version 1.5.11 instead
NOTE: For this example I used Scala 2.9.0.
Synchronous Producer
The main difference between an asynchronous and a synchronous producer is that the synchronous producer uses two queues: a send queue and a reply queue. The send queue is the queue on which the producer sends a message to the consumer. The reply queue is the queue on which the producer listens for a reply from the consumer. When the producer sends a message, it sets two important pieces of information on it:
- JMSCorrelationID: a unique ID the producer uses to identify the message and to match the reply to it
- JMSReplyTo: tells the consumer which queue to send the reply to
The producer then creates a “ReplyConsumer” on the reply queue and listens for a reply from the consumer that carries that “JMSCorrelationID”. When a message with that ID appears on the reply queue, the ReplyConsumer receives it and our synchronous message round trip is complete!
import java.util.UUID
import javax.jms._
import org.apache.activemq.ActiveMQConnectionFactory

object ProducerSynchronous {
  val activeMqUrl: String = "tcp://localhost:61616"

  def main(args: Array[String]): Unit = {
    val connectionFactory = new ActiveMQConnectionFactory(activeMqUrl)
    val connection = connectionFactory.createConnection()
    connection.setClientID("ProducerSynchronous")
    connection.start()
    try {
      val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
      val sendQueue = session.createQueue("SendSynchronousMsgQueue")
      val replyQueue = session.createQueue("ReplyToSynchronousMsgQueue")

      val producer = session.createProducer(sendQueue)
      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

      // A UUID contains only hex digits and hyphens, so it is safe inside the
      // quoted selector below (a random string could contain a quote character)
      val correlationId = UUID.randomUUID().toString
      // Only receive replies that carry our correlation ID
      val replyConsumer = session.createConsumer(replyQueue, "JMSCorrelationID = '%s'".format(correlationId))

      val textMessage = session.createTextMessage("Hello, please reply immediately to my message!")
      textMessage.setJMSReplyTo(replyQueue)
      textMessage.setJMSCorrelationID(correlationId)

      println("Sending message...")
      producer.send(textMessage)

      println("Waiting for reply...")
      // receive returns null if no reply arrives within 1000 ms
      val reply = replyConsumer.receive(1000)
      replyConsumer.close()

      reply match {
        case txt: TextMessage => println("Received reply: " + txt.getText())
        case _ => throw new Exception("Invalid Response:" + reply)
      }
    } finally {
      // Close the connection even if no valid reply arrived
      connection.close()
    }
  }
}
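One detail worth spelling out: the selector wraps the correlation ID in single quotes, so an ID that happens to contain a quote character (which a randomly generated string can) would produce an invalid selector. Here is a small sketch of one way to guard against that. `CorrelationSelector` and its two methods are hypothetical helpers of my own, not part of JMS or ActiveMQ; the only JMS facts relied on are the `JMSCorrelationID` header name and the rule that a quote inside a selector string literal is written as two quotes.

```scala
import java.util.UUID

object CorrelationSelector {
  // UUID strings contain only hex digits and hyphens, so they never need escaping.
  def newCorrelationId(): String = UUID.randomUUID().toString

  // Builds the selector for a correlation ID, doubling any single quote so that
  // an arbitrary ID still yields a well-formed string literal.
  def selectorFor(correlationId: String): String =
    "JMSCorrelationID = '%s'".format(correlationId.replace("'", "''"))
}
```

The result of `selectorFor` can be passed as the second argument to `session.createConsumer`.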
Synchronous Consumer
The synchronous consumer listens for messages on the send queue. When a message is received, it creates a “ReplyProducer” and connects it to the reply queue specified in the message’s “JMSReplyTo” field. It then creates a reply message, copies the JMSCorrelationID from the received message onto it, and sends the reply with the “ReplyProducer”.
import javax.jms._
import org.apache.activemq.ActiveMQConnectionFactory

object ConsumerSynchronous {
  val activeMqUrl: String = "tcp://localhost:61616"

  def main(args: Array[String]): Unit = {
    val connectionFactory = new ActiveMQConnectionFactory(activeMqUrl)
    val connection = connectionFactory.createConnection()
    connection.setClientID("ConsumerSynchronous")
    connection.start()
    println("Started")

    val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val queue = session.createQueue("SendSynchronousMsgQueue")
    val consumer = session.createConsumer(queue)

    val listener = new MessageListener {
      def onMessage(message: Message): Unit = {
        message match {
          case text: TextMessage =>
            println("Received message: " + text.getText)
            // Reply on whichever queue the producer named in JMSReplyTo
            val replyProducer = session.createProducer(text.getJMSReplyTo())
            try {
              replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
              val replyMessage = session.createTextMessage("Yes I received your message!")
              // Copy the correlation ID so the producer's selector matches this reply
              replyMessage.setJMSCorrelationID(text.getJMSCorrelationID())
              replyProducer.send(replyMessage)
              println("Reply sent!")
            } finally {
              // One producer is created per message, so release it each time
              replyProducer.close()
            }
          case _ =>
            throw new Exception("Unhandled Message Type: " + message.getClass.getSimpleName)
        }
      }
    }
    // The connection stays open, so the listener keeps handling messages
    // until the process is stopped
    consumer.setMessageListener(listener)
  }
}
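If you want to see the round trip without a broker running, the same idea can be sketched with two in-memory queues. This is an illustration only, not ActiveMQ code: `Msg`, `RoundTripSketch`, `serveOne` and `request` are hypothetical names, and `LinkedBlockingQueue` merely stands in for the send and reply queues. One simplification to be aware of: a JMS selector leaves non-matching replies on the queue, whereas this sketch simply discards them.

```scala
import java.util.UUID
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// In-memory stand-in for a JMS message; the fields mirror the JMS headers.
case class Msg(body: String, correlationId: String, replyTo: Option[LinkedBlockingQueue[Msg]])

object RoundTripSketch {
  // Consumer side: take one request, copy its correlation ID onto the reply,
  // and put the reply on the queue named in replyTo.
  def serveOne(sendQueue: LinkedBlockingQueue[Msg]): Unit = {
    val request = sendQueue.take()
    request.replyTo.foreach { queue =>
      queue.put(Msg("Yes I received your message!", request.correlationId, None))
    }
  }

  // Producer side: send a request, then wait up to timeoutMs for a reply with
  // the same correlation ID. Returns None on timeout or on a mismatched reply.
  def request(sendQueue: LinkedBlockingQueue[Msg],
              replyQueue: LinkedBlockingQueue[Msg],
              body: String,
              timeoutMs: Long): Option[String] = {
    val correlationId = UUID.randomUUID().toString
    sendQueue.put(Msg(body, correlationId, Some(replyQueue)))
    // poll returns null on timeout, which Option(...) turns into None
    Option(replyQueue.poll(timeoutMs, TimeUnit.MILLISECONDS))
      .filter(_.correlationId == correlationId)
      .map(_.body)
  }

  def main(args: Array[String]): Unit = {
    val sendQueue = new LinkedBlockingQueue[Msg]()
    val replyQueue = new LinkedBlockingQueue[Msg]()
    val consumer = new Thread(new Runnable {
      def run(): Unit = serveOne(sendQueue)
    })
    consumer.start()
    println(request(sendQueue, replyQueue, "Hello, please reply immediately to my message!", 1000))
    consumer.join()
  }
}
```

Returning an `Option` instead of throwing on a missing reply is just one design choice; it makes the timeout case explicit for the caller.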
That’s all!