My Adventures in Coding

October 15, 2011

JMS – How to do Synchronous Messaging with ActiveMQ in Scala

Filed under: ActiveMQ,JMS,Scala — Brian @ 6:49 pm
Tags: , , ,

The first question you will probably ask is “Aren’t queues designed for the purpose of asynchronous fire and forget style messaging?”. Well, yep, they are! Your goal for scaling should be to make anything that can be asynchronous, asynchronous. However, if a message bus such as ActiveMQ is how your applications talk to each other, sometimes an immediate response is necessary (although we try to avoid this when possible). The following is a simple example in Scala showing how to send such a message with ActiveMQ.

Setup ActiveMQ

  • Download the latest version ActiveMQ here. (For this post 5.5.0 was used)
  • Now unpack the tar file and start ActiveMQ:
  •          tar -zxvf apache-activemq-5.5.0-bin.tar.gz
             cd apache-activemq-5.5.0/bin
             chmod 755 activemq
             cd ..
             bin/activemq start
           
  • You should now be able to access the ActiveMQ admin page: http://localhost:8161/admin/

Get the required libraries
To compile and run the publisher and subscriber, you will need the following two libraries on your class path:

  • From the apache-activemq-5.5.0 folder get the jar file: activemq-all-5-5-0.jar
  • You will also need SLF4J (Simple Logging Facade for Java): slf4j-simple-1.5.11.jar
    • NOTE: version 1.6+ will not work with the current version of ActiveMQ (5.5.0)
    • You will need to download version 1.5.11 here

NOTE: For this example I used scala 2.9.0.

Synchronous Producer
The main difference between an asynchronous and synchronous producer is that the synchronous producer has two queues: a send queue and a reply queue. The send queue is the queue on which the producer will send a message to the consumer. The reply queue is the queue on which the producer will listen for a reply from the consumer. The producer when it sends a message sets two important pieces of information on the message:

  1. JMSCorrelationID: This is the uniqueID used by the producer to indentify the message
  2. JMSReplyTo: This tells the consumer on which queue to send the message reply

The producer then creates a “ReplyConsumer” on the reply queue and listens for a reply from the consumer that contains that “JMSCorrelationID”. When a message with that ID appears on the reply queue, the ReplyConsumer will receive that message and our synchronous message round trip has been completed!

import javax.jms._
import org.apache.activemq.ActiveMQConnectionFactory
import util.Random

object ProducerSynchronous {
  val activeMqUrl: String = "tcp://localhost:61616"

  def main(args: Array[String]): Unit = {
    val connectionFactory = new ActiveMQConnectionFactory(activeMqUrl)
    val connection = connectionFactory.createConnection
    connection.setClientID("ProducerSynchronous")
    connection.start

    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val sendQueue = session.createQueue("SendSynchronousMsgQueue")
    val replyQueue = session.createQueue("ReplyToSynchronousMsgQueue")

    val producer = session.createProducer(sendQueue)
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

    val correlationId = new Random().nextString(32)
    val replyConsumer = session.createConsumer(replyQueue, "JMSCorrelationID = '%s'".format(correlationId))

    val textMessage = session.createTextMessage("Hello, please reply immediately to my message!")
    textMessage.setJMSReplyTo(replyQueue)
    textMessage.setJMSCorrelationID(correlationId)

    println("Sending message...")

    producer.send(textMessage)

    println("Waiting for reply...")

    val reply = replyConsumer.receive(1000)
    replyConsumer.close()

    reply match {
      case txt: TextMessage => println("Received reply: " + txt.getText())
      case _ => throw new Exception("Invalid Response:" + reply)
    }

    connection.close
  }
}

Synchronous Consumer
The synchronous consumer listens for messages on the send queue. When a message is received, it creates a “ReplyProducer” and connects it to the reply queue specified in the message’s “JMSReplyTo” field. The consumer then creates a reply message and copies the JMSCorrelationID from the received message. The consumer then sends the reply message with the “ReplyProducer”.

import javax.jms._
import org.apache.activemq.ActiveMQConnectionFactory

object ConsumerSynchronous {
  val activeMqUrl: String = "tcp://localhost:61616"
  
  def main(args: Array[String]): Unit = {
    val connectionFactory  = new ActiveMQConnectionFactory(activeMqUrl)
    val connection = connectionFactory.createConnection
    connection.setClientID("ConsumerSynchronous")
    connection.start

    println("Started")

    val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val queue  = session.createQueue("SendSynchronousMsgQueue")
    val consumer = session.createConsumer(queue)

    val listener = new MessageListener {
      def onMessage(message: Message) {
        message match {
          case text: TextMessage => {
          val replyProducer = session.createProducer(text.getJMSReplyTo())
          replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

          println("Received message: " + text.getText)

          val replyMessage = session.createTextMessage("Yes I received your message!")
          replyMessage.setJMSCorrelationID(text.getJMSCorrelationID())

          println("Reply sent!")
            
          replyProducer.send(replyMessage)
          }
          case _ => {
            throw new Exception("Unhandled Message Type: " + message.getClass.getSimpleName)
          }
        }
      }
    }
    consumer.setMessageListener(listener)
  }
}

That’s all!

Advertisements

October 2, 2011

MongoDB – Geospatial Queries

Filed under: MongoDB — Brian @ 10:11 pm
Tags: , ,

Most of the data we work with has geographic longitude/latitude information so we have recently started exploring doing some work with geospatial queries using different database technologies (MySQL, Oracle). We currently use MongoDB as our primary data store and we were surprised to find out that MongoDB already has this functionality built in. After doing some reading we also realized that it handles each of our use cases. Even though we use MongoDB everyday, we just never thought a document store would come with this functionality. For more information go to MongoDB Geospatial Indexing.

Download and Setup MongoDB

For this tutorial, we will just use the MongoDB interactive shell for interacting with the database because it is simple. There is no need to complicate this example by using the MongoDB Scala or Python drivers. They are simple to use once you gain an understanding of MongoDB. The basics are that MongoDB is a document store, data is stored as JSON documents, and queries are made by using subsets of the JSON from the documents you wish to match, with query commands mixed in.

Download MongoDB (Downloads):

wget http://fastdl.mongodb.org/osx/mongodb-osx-x86_64-2.0.0.tgz
tar -zxvf mongodb-osx-x86_64-2.0.0.tgz

Create a folder for MongoDB to store it’s database files (We are just using the default location for this example):

mkdir -p /data/db

NOTE: On windows this would be “C:\data\db”.

Start the MongoDB database:

cd mongodb-osx-x86_64-2.0.0/bin
./mongod

MongoDB provides an interactive shell which can be used to query your MongoDB database. For the rest of this tutorial, we will use the interactive shell.

Start the interactive shell:

cd mongodb-osx-x86_64-2.0.0/bin
./mongo
MongoDB shell version: 2.0.0
connecting to: test
>

You should see the shell start up and display “connecting to: Test”. This means you are connected to the default database “Test” which will be fine for this tutorial.

That’s all, we are ready to explore geospatial querying in MongoDB!

Geospatial indexes and queries

Let’s for these examples assume we are creating a database for a website where a customer can browse and search for automotive dealerships in a given area by different types of map configurations. Let’s explore the most common use cases for retrieving dealership information by a geospatial query.

Defining documents with geospatial co-ordinates

The latitude/longitude elements in a document must be stored in a field called “loc” and follow a certain format. Either it can be stored as an array of two elements such as “loc:[51,-114]” or as a dictionary with two elements such as “loc:{lat:51,lon:-114}”. I decided to use the dictionary since it more closely matches our existing data. So let’s create some documents for car dealerships that each contain latitude/longitude information.

db.dealerships.save({"name":"Frank's Fords", "affiliation":"Ford", "loc":{"lon":51.10682735591432,"lat":-114.11773681640625}})
db.dealerships.save({"name":"Steve's Suzukis", "affiliation":"Suzuki", "loc":{"lon":51.09144802136697,"lat":-114.11773681640625}})
db.dealerships.save({"name":"Charlie's Chevrolets", "affiliation":"Chevrolet", "loc":{"lon":51.08282186160978,"lat":-114.10400390625}})
db.dealerships.save({"name":"Nick's Nissans", "affiliation":"Nissan", "loc":{"lon":51.12076493195686,"lat":-113.98040771484375}})
db.dealerships.save({"name":"Tom's Toyotas", "affiliation":"Toyota", "loc":{"lon":50.93939251390387,"lat":-113.98040771484375}})

Create the geospatial index

Now, inorder to query by geo co-ordinates, we need to create an index over the “loc” field of our dealership documents.

db.dealerships.ensureIndex({loc:"2d"})

Common Use Cases

What if I want the two dealerships closest to a specific co-ordinate?

This can be done using the “near” and “limit” query options. This query finds the points closest to the co-ordinate provided and returns them sorted by distance from the point given (Yep, MongoDB handles that all for you, returns the data exactly as you would expect).

db.dealerships.find({loc: {$near:[51,-114]}}).limit(2)

Query returns:

{ "_id" : ObjectId("4e8927066f9caf7713a8421b"), "name" : "Tom's Toyotas", "affiliation" : "Toyota", "loc" : { "lon" : 50.93939251390387, "lat" : -113.98040771484375 } }
{ "_id" : ObjectId("4e8926f96f9caf7713a8421a"), "name" : "Nick's Nissans", "affiliation" : "Nissan", "loc" : { "lon" : 51.12076493195686, "lat" : -113.98040771484375 } }
What if I want to filter by dealership affiliation in the query?

No problem, the MongoDB people have thought of that as well. They call these “Compound Indexes”. When creating the geospatial index you can also include other fields in your document in that index. So for example if you wanted to have your application query for all “Ford” affiliated dealerships available close to the co-ordinates provided, you would create the following index:

Add the Compound index:

db.dealerships.ensureIndex({loc:"2d", affiliation:1})

Then your application would be able to query by dealership affiliation as well:

db.dealerships.find({loc: {$near:[51,-114]}, "affiliation":"Ford"})

Query returns:

{ "_id" : ObjectId("4e8926696f9caf7713a84215"), "name" : "Frank's Fords", "affiliation" : "Ford", "loc" : { "lon" : 51.10682735591432, "lat" : -114.11773681640625 } }

You can see the value in being able to do these geospatial queries so easily. For example, if your website has a map, showing dealership locations, the customer can click on and zoom in on any area of the map. When they do, the items displayed on the map will be refreshed based on a geospatial query, returning the N number of items closest to the point selected. Of course, this can also be filtered further by allowing the customer to select filter criteria such as “Affiliation”.

What if I want to search for all dealerships within a given area of town?

Well MongoDB handles that as well with “Bounded Queries”. With bounded queries you can use either a rectangle, circle, or polygon. Since areas of cities are best represented by a polygon, we will use that for this example.

Let’s define a polygon for a specific area of town:

areaoftown = { a : { x : 51.12335082548444, y : -114.19052124023438 }, b : { x : 51.11904092252057, y : -114.05593872070312 }, c : { x : 51.02325750523972, y : -114.02435302734375 }, d : { x : 51.01634653617311, y : -114.1644287109375 } }

Once this polygon has been defined, we can then search our dealerships collection for dealers that fall within this boundary.

db.dealerships.find({ "loc" : { "$within" : { "$polygon" : areaoftown } } })

NOTE: Polygon searches are only available in versions >=1.9

Query returns:

{ "_id" : ObjectId("4e892d8c7f369ee980a3662b"), "name" : "Charlie's Chevrolets", "affiliation" : "Chevrolet", "loc" : { "lon" : 51.08282186160978, "lat" : -114.10400390625 } }
{ "_id" : ObjectId("4e892d797f369ee980a36629"), "name" : "Frank's Fords", "affiliation" : "Ford", "loc" : { "lon" : 51.10682735591432, "lat" : -114.11773681640625 } }
{ "_id" : ObjectId("4e892d837f369ee980a3662a"), "name" : "Steve's Suzukis", "affiliation" : "Suzuki", "loc" : { "lon" : 51.09144802136697, "lat" : -114.11773681640625 } }

MongoDB makes this simple and easy to use, good job!

Create a free website or blog at WordPress.com.