My Adventures in Coding

December 30, 2011

Scala – How to combine two lists of different types so they can be sorted by a common field

Filed under: Scala — Brian @ 1:48 pm

Recently I was pairing with a co-worker on a new view for our application. The purpose of this view was just to show a timeline of all activities (regardless of type) occurring in the system. We were confident there would be a way to accomplish this task easily in Scala!

For this example let’s say we are writing a program for a car dealership. The manager of the dealership would like a page showing test drives and sales throughout the day. The point of this page is not to connect test drives to sales, but to give a timeline of the activity taking place on the car lot.

In the example below, even though we have two lists of different types, testDrives: List[TestDrive] and purchases: List[Purchase], both types have a common field, “date”. To do the sort we first combine the two lists with (testDrives ::: purchases) and then cast the result to the structural type List[{def date: Date}]. Every object in the new list is known to have a “date”, so now sorting the list is easy!

import java.util.Date

case class TestDrive(car: String, priceQuoted: Int, date: Date)
case class Purchase(car: String, priceSold: Int, date: Date)

val date = new Date
val dateFormat = new java.text.SimpleDateFormat("yyyy/MM/dd HH:mm:ss")

val testDrives = List(TestDrive("Charger", 22000, new Date(date.getTime - 1000000)), TestDrive("Camero", 250000, new Date(date.getTime - 4000000)), TestDrive("Pony", 2500, new Date(date.getTime - 10000000)))

val purchases = List(Purchase("Infiniti G35x", 19500, new Date(date.getTime - 500000)), Purchase("Infiniti G37x", 28500, new Date(date.getTime - 6000000)), Purchase("Altima", 35000, new Date(date.getTime - 9000000)))

// Combine the TestDrives and Purchases lists and sort descending by date
for (something <- (testDrives ::: purchases).asInstanceOf[List[{def date: Date}]].sortBy(_.date).reverse) {
	// Now if we want to show fields other than "date", we can match on the type of each object to get typed access to its fields
	something match {
		case testDrive: TestDrive =>
			println("Test Drive: %s\tQuoted price: %d \t Date: %s".format(testDrive.car, testDrive.priceQuoted, dateFormat.format(testDrive.date)))
		case purchase: Purchase =>
			println("Purchase: %s\tSold price: %d \t Date: %s".format(purchase.car, purchase.priceSold, dateFormat.format(purchase.date)))
	}
}

The program creates the following output: all test drives and purchases combined into a single timeline, sorted descending by date.

>scala example.scala
Purchase: Infiniti G35x Sold price: 19500        Date: 2011/12/23 14:48:41
Test Drive: Charger     Quoted price: 22000      Date: 2011/12/23 14:40:21
Test Drive: Camero      Quoted price: 250000     Date: 2011/12/23 13:50:21
Purchase: Infiniti G37x Sold price: 28500        Date: 2011/12/23 13:17:01
Purchase: Altima        Sold price: 35000        Date: 2011/12/23 12:27:01
Test Drive: Pony        Quoted price: 2500       Date: 2011/12/23 12:10:21
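
If you control both case classes, an alternative worth considering is to give them a shared trait that declares “date”. The sketch below assumes a trait called Activity (a name made up for this illustration, not part of the example above) and uses a shortened data set. The combined list then already has a common element type, so neither the structural type nor the cast is needed.

```scala
import java.util.Date

// Hypothetical shared trait: not part of the original example
sealed trait Activity { def date: Date }
case class TestDrive(car: String, priceQuoted: Int, date: Date) extends Activity
case class Purchase(car: String, priceSold: Int, date: Date) extends Activity

val now = new Date

val testDrives = List(
  TestDrive("Charger", 22000, new Date(now.getTime - 1000000)),
  TestDrive("Pony", 2500, new Date(now.getTime - 10000000)))
val purchases = List(
  Purchase("Altima", 35000, new Date(now.getTime - 9000000)))

// Both element types are Activities, so the combined list needs no cast; sort newest first
val timeline: List[Activity] = (testDrives ::: purchases).sortBy(_.date.getTime).reverse

// Pattern matching gives typed access to the type-specific fields
val lines = timeline.map {
  case TestDrive(car, priceQuoted, _) => "Test Drive: %s\tQuoted price: %d".format(car, priceQuoted)
  case Purchase(car, priceSold, _)    => "Purchase: %s\tSold price: %d".format(car, priceSold)
}
lines.foreach(println)
```

Because the trait is sealed, the compiler can also warn you if a new activity type is added later and the match is not updated.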

That is all!

December 22, 2011

Linux – How to ssh between two linux computers without needing a password

Filed under: Linux — Brian @ 10:44 pm

Having to constantly type in your password on a Linux server that you ssh to often can get to be an annoyance. Luckily this is an easy problem to solve. Since I always end up forgetting how to do this setup, I thought I would finally write this down, even if just for my own reference :) .

1. ssh to server1

Connect to server1 and generate a public/private key pair.

 
ssh myusername@server1
password:
ssh-keygen -t rsa

When you run this command you will be prompted to answer several questions. Just hit enter each time until you are returned to a prompt.

Generating public/private rsa key pair.
Enter file in which to save the key (/home/local/myusername/.ssh/id_rsa): 
Created directory '/home/local/myusername/.ssh'.
Enter passphrase (empty for no passphrase): 
Enter same passphrase again: 
Your identification has been saved in /home/local/myusername/.ssh/id_rsa.
Your public key has been saved in /home/local/myusername/.ssh/id_rsa.pub.
The key fingerprint is:
15:68:47:67:0d:40:e1:7c:9a:1c:25:18:be:ab:f1:3a myusername@server1
The key's randomart image is:
+--[ RSA 2048]----+
|        .*Bo=o   |
|       .+o.*  .  |
|       ...= .    |
|         + =     |
|        S +      |
|         .       |
|      . .        |
|      E+         |
|      oo.        |
+-----------------+

Now you will need to copy the public key you just generated and save it somewhere, because you will need it later. Also ensure when you copy the key that the text is all on one line. If there are line breaks in the text, it will cause problems later when you try to use the key.

cd .ssh
cat id_rsa.pub
ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEAyFS7YkakcjdyCDOKpE4RrBecRUWShgmwWnxhbVNHmDtJtK
PqdiLcsVG5PO94hv3A0QqlB1MX33vnP6HzPPS7L4Bq+5plSTyNHiDBIqmZqVVxRbRUKbP44BaA9RsW2ROu
8qdzmXRPupkyFBBOLa23RJJojBieFGygR2OwjS8cq0kpZh1I3c1fbU9I5j38baUK0naTBe2v7s/C8allnJ
hwkfds+Q9/kjaV55pMZIh+9jhoA8acCA6B55DYrgPSycW6fEyV/1PIER+a5lOXp1QCn0U+XFTb85dp5fW0
/rUnu0F9nBJFlo7Rvc1cMuSUiul/wvJ8tzlOhU8FUlHvHqoUUw== myusername@server1

2. ssh to server2

Now we will copy the public key from server1 to server2.

ssh myusername@server2
password:
mkdir -p .ssh
chmod 700 .ssh
cd .ssh
vi authorized_keys
# paste the public key
chmod 600 authorized_keys

3. Test that your setup is working

ssh myusername@server1
password:
ssh myusername@server2
# you should not be prompted for a password!

That is all! (Thanks Dave!)

November 22, 2011

Scala – Handling failure in Actors with Akka Supervisors

Filed under: Actors,Scala — Brian @ 10:53 pm

We have been using Akka Actors in all of our Scala projects for about a year now and have been very impressed. However, the problem with applications that have concurrently running Actors is that if an exception occurs and the Actor dies, the application may appear to still be up and running, but an Actor is now missing and no one knows this has happened. What we want in our application is to follow the “Let it crash” design philosophy by being able to restart an Actor if one happens to fail. We want to guarantee that every Actor in the application is alive and well, even if an exception occurs. The easy way to obtain this type of fault tolerance in concurrent code is by using Actor Supervisors. The documentation on the Akka site is excellent, and I highly recommend you read through it.

The basic idea is that the Supervisor is responsible for managing any Actors you ask it to manage. The supervisor starts the Actors it has been told to manage and in the event of an Actor crashing or being stopped, the supervisor will immediately restart the Actor for you.

The following is a simple example showing how to use a supervisor to start an Actor and automatically restart it if an exception is thrown. To run the example you will need to download the Akka Actors library from the Akka Downloads page. Just download the zip, unzip it, and put the akka-actor-1.2.jar file from the lib/akka folder on your class path.

NOTE: This example was written using: Akka-Actor 1.2 and Scala 2.9.1

import akka.actor.Actor._
import akka.actor.{SupervisorFactory, Actor, Supervisor}
import akka.config.Supervision._

case object AreYouAlive
case object Kaboom

// Simple Actor that handles two types of messages: AreYouAlive and Kaboom
class SomeActor extends Actor {
  def receive = {
    case AreYouAlive => println("Yes I am alive!")
    case Kaboom => throw new RuntimeException("Kaboom!!!")
  }
}

object SomeApp extends App {
  val someActor = actorOf[SomeActor]

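  // Set up the supervisorFactory with a config and a list of Actors to supervise.
  // OneForOneStrategy restarts only the Actor that failed: here on any Exception,
  // allowing at most 3 restarts within 1000 ms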
  val supervisorFactory = SupervisorFactory(
    SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 1000),
    Supervise(someActor, Permanent) :: Nil)
  )
  // Instantiate and start the supervisor, this also starts all supervised Actors
  val supervisor = supervisorFactory.newInstance
  supervisor.start

  // Ping the Actor, yep it is working
  someActor ! AreYouAlive

  // Send the Kaboom message, oh no, the Actor will crash
  someActor ! Kaboom
  
  Thread.sleep(1000)

  // Wait, our Actor is alive and well again
  someActor ! AreYouAlive
  supervisor.shutdown()
}

Our Actor supervisor example will produce the following output:

Yes I am alive!
[ERROR]   [11/22/11 10:30 PM] [akka:event-driven:dispatcher:global-1] [LocalActorRef] Kaboom!!!
java.lang.RuntimeException: Kaboom!!!
	at SomeActor$$anonfun$receive$1.apply(ActorSupervisorExample.scala:12)
	at SomeActor$$anonfun$receive$1.apply(ActorSupervisorExample.scala:10)
	at akka.actor.Actor$class.apply(Actor.scala:545)
	at SomeActor.apply(ActorSupervisorExample.scala:9)
	at akka.actor.LocalActorRef.invoke(ActorRef.scala:905)
	at akka.dispatch.MessageInvocation.invoke(MessageHandling.scala:25)
	at akka.dispatch.ExecutableMailbox$class.processMailbox(ExecutorBasedEventDrivenDispatcher.scala:216)
	at akka.dispatch.ExecutorBasedEventDrivenDispatcher$$anon$4.processMailbox(ExecutorBasedEventDrivenDispatcher.scala:122)
	at akka.dispatch.ExecutableMailbox$class.run(ExecutorBasedEventDrivenDispatcher.scala:188)
	at akka.dispatch.ExecutorBasedEventDrivenDispatcher$$anon$4.run(ExecutorBasedEventDrivenDispatcher.scala:122)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:680)
	at akka.dispatch.MonitorableThread.run(ThreadPoolBuilder.scala:184)

Yes I am alive!

As you can see, even though the Actor threw an exception when it tried to handle the Kaboom message, the Actor was restarted and back to work right away. Great stuff!

I highly recommend reading the Fault Tolerance Through Supervisor Hierarchies documentation on the Akka website.

October 15, 2011

JMS – How to do Synchronous Messaging with ActiveMQ in Scala

Filed under: ActiveMQ,JMS,Scala — Brian @ 6:49 pm

The first question you will probably ask is “Aren’t queues designed for the purpose of asynchronous fire and forget style messaging?”. Well, yep, they are! Your goal for scaling should be to make anything that can be asynchronous, asynchronous. However, if a message bus such as ActiveMQ is how your applications talk to each other, sometimes an immediate response is necessary (although we try to avoid this when possible). The following is a simple example in Scala showing how to send such a message with ActiveMQ.

Setup ActiveMQ

  • Download the latest version of ActiveMQ here. (For this post 5.5.0 was used)
  • Now unpack the tar file and start ActiveMQ:

             tar -zxvf apache-activemq-5.5.0-bin.tar.gz
             cd apache-activemq-5.5.0/bin
             chmod 755 activemq
             cd ..
             bin/activemq start

  • You should now be able to access the ActiveMQ admin page: http://localhost:8161/admin/

Get the required libraries
To compile and run the publisher and subscriber, you will need the following two libraries on your class path:

  • From the apache-activemq-5.5.0 folder get the jar file: activemq-all-5.5.0.jar
  • You will also need SLF4J (Simple Logging Facade for Java): slf4j-simple-1.5.11.jar
    • NOTE: version 1.6+ will not work with the current version of ActiveMQ (5.5.0)
    • You will need to download version 1.5.11 here

NOTE: For this example I used Scala 2.9.0.

Synchronous Producer
The main difference between an asynchronous and synchronous producer is that the synchronous producer has two queues: a send queue and a reply queue. The send queue is the queue on which the producer will send a message to the consumer. The reply queue is the queue on which the producer will listen for a reply from the consumer. When the producer sends a message, it sets two important pieces of information on it:

  1. JMSCorrelationID: This is the unique ID used by the producer to identify the message
  2. JMSReplyTo: This tells the consumer on which queue to send the message reply

The producer then creates a “ReplyConsumer” on the reply queue and listens for a reply from the consumer that contains that “JMSCorrelationID”. When a message with that ID appears on the reply queue, the ReplyConsumer will receive that message and our synchronous message round trip has been completed!
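
To make the round trip concrete without a broker, here is a minimal single-threaded sketch of the same request/reply pattern. It uses plain in-memory queues in place of ActiveMQ, and the Msg class, the queues and the request and consumeOne helpers are all invented for this illustration, so it is not JMS code. The filter on the reply queue plays the role of the JMSCorrelationID selector.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical stand-in for a JMS message; replyTo names the queue to answer on
case class Msg(text: String, correlationId: String, replyTo: Option[mutable.Queue[Msg]] = None)

val sendQueue  = mutable.Queue[Msg]()
val replyQueue = mutable.Queue[Msg]()

// Consumer side: answer one request on its replyTo queue, copying the correlation ID
def consumeOne(): Unit =
  if (sendQueue.nonEmpty) {
    val request = sendQueue.dequeue()
    request.replyTo.foreach(_.enqueue(Msg("Yes I received your message!", request.correlationId)))
  }

// Producer side: send a request, then pick out only the reply carrying our ID
def request(text: String): Option[Msg] = {
  val correlationId = UUID.randomUUID().toString
  sendQueue.enqueue(Msg(text, correlationId, Some(replyQueue)))
  consumeOne() // in a real system the consumer runs in another process
  replyQueue.dequeueFirst(_.correlationId == correlationId)
}

// A stale reply meant for some other request is skipped, much like the selector would do
replyQueue.enqueue(Msg("stale reply", "some-other-id"))
val reply = request("Hello, please reply immediately to my message!")
println(reply.map(_.text).getOrElse("No reply"))
```

The stale reply stays on the reply queue untouched, which is the point of matching on the correlation ID rather than taking whatever message arrives first.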

import javax.jms._
import org.apache.activemq.ActiveMQConnectionFactory
import util.Random

object ProducerSynchronous {
  val activeMqUrl: String = "tcp://localhost:61616"

  def main(args: Array[String]): Unit = {
    val connectionFactory = new ActiveMQConnectionFactory(activeMqUrl)
    val connection = connectionFactory.createConnection
    connection.setClientID("ProducerSynchronous")
    connection.start

    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val sendQueue = session.createQueue("SendSynchronousMsgQueue")
    val replyQueue = session.createQueue("ReplyToSynchronousMsgQueue")

    val producer = session.createProducer(sendQueue)
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

    // A UUID avoids quote characters that could break the message selector below
    val correlationId = java.util.UUID.randomUUID().toString
    val replyConsumer = session.createConsumer(replyQueue, "JMSCorrelationID = '%s'".format(correlationId))

    val textMessage = session.createTextMessage("Hello, please reply immediately to my message!")
    textMessage.setJMSReplyTo(replyQueue)
    textMessage.setJMSCorrelationID(correlationId)

    println("Sending message...")

    producer.send(textMessage)

    println("Waiting for reply...")

    val reply = replyConsumer.receive(1000)
    replyConsumer.close()

    reply match {
      case txt: TextMessage => println("Received reply: " + txt.getText())
      case _ => throw new Exception("Invalid Response:" + reply)
    }

    connection.close
  }
}

Synchronous Consumer
The synchronous consumer listens for messages on the send queue. When a message is received, it creates a “ReplyProducer” and connects it to the reply queue specified in the message’s “JMSReplyTo” field. The consumer then creates a reply message and copies the JMSCorrelationID from the received message. The consumer then sends the reply message with the “ReplyProducer”.

import javax.jms._
import org.apache.activemq.ActiveMQConnectionFactory

object ConsumerSynchronous {
  val activeMqUrl: String = "tcp://localhost:61616"
  
  def main(args: Array[String]): Unit = {
    val connectionFactory  = new ActiveMQConnectionFactory(activeMqUrl)
    val connection = connectionFactory.createConnection
    connection.setClientID("ConsumerSynchronous")
    connection.start

    println("Started")

    val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val queue  = session.createQueue("SendSynchronousMsgQueue")
    val consumer = session.createConsumer(queue)

    val listener = new MessageListener {
      def onMessage(message: Message): Unit = {
        message match {
          case text: TextMessage => {
            val replyProducer = session.createProducer(text.getJMSReplyTo())
            replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

            println("Received message: " + text.getText)

            val replyMessage = session.createTextMessage("Yes I received your message!")
            replyMessage.setJMSCorrelationID(text.getJMSCorrelationID())

            replyProducer.send(replyMessage)
            replyProducer.close()

            println("Reply sent!")
          }
          case _ => {
            throw new Exception("Unhandled Message Type: " + message.getClass.getSimpleName)
          }
        }
      }
    }
    consumer.setMessageListener(listener)
  }
}

That’s all!

October 2, 2011

MongoDB – Geospatial Queries

Filed under: MongoDB — Brian @ 10:11 pm

Most of the data we work with has geographic longitude/latitude information, so we have recently started exploring geospatial queries using different database technologies (MySQL, Oracle). We currently use MongoDB as our primary data store and we were surprised to find out that MongoDB already has this functionality built in. After doing some reading we also realized that it handles each of our use cases. Even though we use MongoDB every day, we just never thought a document store would come with this functionality. For more information go to MongoDB Geospatial Indexing.

Download and Setup MongoDB

For this tutorial, we will just use the MongoDB interactive shell for interacting with the database because it is simple. There is no need to complicate this example by using the MongoDB Scala or Python drivers. They are simple to use once you gain an understanding of MongoDB. The basics are that MongoDB is a document store, data is stored as JSON documents, and queries are made by using subsets of the JSON from the documents you wish to match, with query commands mixed in.

Download MongoDB (Downloads):

wget http://fastdl.mongodb.org/osx/mongodb-osx-x86_64-2.0.0.tgz
tar -zxvf mongodb-osx-x86_64-2.0.0.tgz

Create a folder for MongoDB to store its database files (we are just using the default location for this example):

mkdir -p /data/db

NOTE: On Windows this would be “C:\data\db”.

Start the MongoDB database:

cd mongodb-osx-x86_64-2.0.0/bin
./mongod

MongoDB provides an interactive shell which can be used to query your MongoDB database. For the rest of this tutorial, we will use the interactive shell.

Start the interactive shell:

cd mongodb-osx-x86_64-2.0.0/bin
./mongo
MongoDB shell version: 2.0.0
connecting to: test
>

You should see the shell start up and display “connecting to: test”. This means you are connected to the default database “test”, which will be fine for this tutorial.

That’s all, we are ready to explore geospatial querying in MongoDB!

Geospatial indexes and queries

For these examples, let’s assume we are creating a database for a website where a customer can browse and search for automotive dealerships in a given area using different types of map configurations. Let’s explore the most common use cases for retrieving dealership information by a geospatial query.

Defining documents with geospatial co-ordinates

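By convention the latitude/longitude elements in a document are stored in a field called “loc”, and the value must follow a certain format: either an array of two elements such as “loc:[51,-114]” or a dictionary with two elements such as “loc:{lat:51,lon:-114}”. I decided to use the dictionary since it more closely matches our existing data. Note that the 2d index reads the two values by position and does not look at the key names: in the documents below the value stored under “lon” is actually the latitude, which is harmless here only because every query passes its co-ordinates in the same order. So let’s create some documents for car dealerships that each contain latitude/longitude information.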

db.dealerships.save({"name":"Frank's Fords", "affiliation":"Ford", "loc":{"lon":51.10682735591432,"lat":-114.11773681640625}})
db.dealerships.save({"name":"Steve's Suzukis", "affiliation":"Suzuki", "loc":{"lon":51.09144802136697,"lat":-114.11773681640625}})
db.dealerships.save({"name":"Charlie's Chevrolets", "affiliation":"Chevrolet", "loc":{"lon":51.08282186160978,"lat":-114.10400390625}})
db.dealerships.save({"name":"Nick's Nissans", "affiliation":"Nissan", "loc":{"lon":51.12076493195686,"lat":-113.98040771484375}})
db.dealerships.save({"name":"Tom's Toyotas", "affiliation":"Toyota", "loc":{"lon":50.93939251390387,"lat":-113.98040771484375}})

Create the geospatial index

Now, in order to query by geo co-ordinates, we need to create an index over the “loc” field of our dealership documents.

db.dealerships.ensureIndex({loc:"2d"})

Common Use Cases

What if I want the two dealerships closest to a specific co-ordinate?

This can be done using the “$near” query operator together with “limit”. The query finds the points closest to the co-ordinate provided and returns them sorted by distance from that point, nearest first (yep, MongoDB handles that all for you and returns the data exactly as you would expect).

db.dealerships.find({loc: {$near:[51,-114]}}).limit(2)

Query returns:

{ "_id" : ObjectId("4e8927066f9caf7713a8421b"), "name" : "Tom's Toyotas", "affiliation" : "Toyota", "loc" : { "lon" : 50.93939251390387, "lat" : -113.98040771484375 } }
{ "_id" : ObjectId("4e8926f96f9caf7713a8421a"), "name" : "Nick's Nissans", "affiliation" : "Nissan", "loc" : { "lon" : 51.12076493195686, "lat" : -113.98040771484375 } }
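
As a rough cross-check of that ordering, the same result can be reproduced in a few lines of Scala. This sketch assumes plain flat (Euclidean) distance between the stored pairs, which as far as I know is what a 2d index uses for $near by default. The Dealership class and the distance and nearest helpers are made up for the illustration, and this is not how MongoDB implements the query internally.

```scala
// Hypothetical in-memory copy of the dealership documents (x, y in stored order)
case class Dealership(name: String, x: Double, y: Double)

val dealerships = List(
  Dealership("Frank's Fords", 51.10682735591432, -114.11773681640625),
  Dealership("Steve's Suzukis", 51.09144802136697, -114.11773681640625),
  Dealership("Charlie's Chevrolets", 51.08282186160978, -114.10400390625),
  Dealership("Nick's Nissans", 51.12076493195686, -113.98040771484375),
  Dealership("Tom's Toyotas", 50.93939251390387, -113.98040771484375))

// Flat distance from a dealership to the query point
def distance(d: Dealership, x: Double, y: Double): Double =
  math.sqrt(math.pow(d.x - x, 2) + math.pow(d.y - y, 2))

// Sort by distance and keep the closest n, like $near followed by limit(n)
def nearest(x: Double, y: Double, n: Int): List[Dealership] =
  dealerships.sortBy(d => distance(d, x, y)).take(n)

nearest(51, -114, 2).foreach(d => println(d.name))
// prints Tom's Toyotas, then Nick's Nissans
```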

What if I want to filter by dealership affiliation in the query?

No problem, the MongoDB people have thought of that as well. They call these “Compound Indexes”. When creating the geospatial index you can also include other fields in your document in that index. So for example if you wanted to have your application query for all “Ford” affiliated dealerships available close to the co-ordinates provided, you would create the following index:

Add the Compound index:

db.dealerships.ensureIndex({loc:"2d", affiliation:1})

Then your application would be able to query by dealership affiliation as well:

db.dealerships.find({loc: {$near:[51,-114]}, "affiliation":"Ford"})

Query returns:

{ "_id" : ObjectId("4e8926696f9caf7713a84215"), "name" : "Frank's Fords", "affiliation" : "Ford", "loc" : { "lon" : 51.10682735591432, "lat" : -114.11773681640625 } }

You can see the value in being able to do these geospatial queries so easily. For example, if your website has a map, showing dealership locations, the customer can click on and zoom in on any area of the map. When they do, the items displayed on the map will be refreshed based on a geospatial query, returning the N number of items closest to the point selected. Of course, this can also be filtered further by allowing the customer to select filter criteria such as “Affiliation”.

What if I want to search for all dealerships within a given area of town?

Well MongoDB handles that as well with “Bounded Queries”. With bounded queries you can use either a rectangle, circle, or polygon. Since areas of cities are best represented by a polygon, we will use that for this example.

Let’s define a polygon for a specific area of town:

areaoftown = { a : { x : 51.12335082548444, y : -114.19052124023438 }, b : { x : 51.11904092252057, y : -114.05593872070312 }, c : { x : 51.02325750523972, y : -114.02435302734375 }, d : { x : 51.01634653617311, y : -114.1644287109375 } }

Once this polygon has been defined, we can then search our dealerships collection for dealers that fall within this boundary.

db.dealerships.find({ "loc" : { "$within" : { "$polygon" : areaoftown } } })

NOTE: Polygon searches are only available in versions >=1.9

Query returns:

{ "_id" : ObjectId("4e892d8c7f369ee980a3662b"), "name" : "Charlie's Chevrolets", "affiliation" : "Chevrolet", "loc" : { "lon" : 51.08282186160978, "lat" : -114.10400390625 } }
{ "_id" : ObjectId("4e892d797f369ee980a36629"), "name" : "Frank's Fords", "affiliation" : "Ford", "loc" : { "lon" : 51.10682735591432, "lat" : -114.11773681640625 } }
{ "_id" : ObjectId("4e892d837f369ee980a3662a"), "name" : "Steve's Suzukis", "affiliation" : "Suzuki", "loc" : { "lon" : 51.09144802136697, "lat" : -114.11773681640625 } }
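
If you are curious what a polygon test involves, here is a small sketch using the classic ray casting approach on the same co-ordinates. The Point class and the inside helper are names invented for this illustration, and I am not claiming this is the algorithm MongoDB itself uses. It only shows that the three dealerships returned above do fall inside the polygon.

```scala
case class Point(x: Double, y: Double)

// The four corners of "areaoftown" from the query above
val areaOfTown = List(
  Point(51.12335082548444, -114.19052124023438),
  Point(51.11904092252057, -114.05593872070312),
  Point(51.02325750523972, -114.02435302734375),
  Point(51.01634653617311, -114.1644287109375))

val dealerships = List(
  "Frank's Fords" -> Point(51.10682735591432, -114.11773681640625),
  "Steve's Suzukis" -> Point(51.09144802136697, -114.11773681640625),
  "Charlie's Chevrolets" -> Point(51.08282186160978, -114.10400390625),
  "Nick's Nissans" -> Point(51.12076493195686, -113.98040771484375),
  "Tom's Toyotas" -> Point(50.93939251390387, -113.98040771484375))

// Ray casting: count the polygon edges crossed by a ray from p towards larger x;
// an odd number of crossings means the point is inside
def inside(p: Point, polygon: List[Point]): Boolean = {
  val edges = polygon.zip(polygon.tail :+ polygon.head)
  edges.count { case (a, b) =>
    (a.y > p.y) != (b.y > p.y) &&
      p.x < (b.x - a.x) * (p.y - a.y) / (b.y - a.y) + a.x
  } % 2 == 1
}

val found = dealerships.collect { case (name, loc) if inside(loc, areaOfTown) => name }
found.foreach(println)
```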

MongoDB makes this simple and easy to use, good job!

September 29, 2011

Scala – Hello World REST API with Scalatra and Simple Build Tool

Filed under: REST,Scala,Simple Build Tool — Brian @ 9:00 pm

I am new to Simple Build Tool and also to building REST APIs in Scala. Mostly I have been using Python with Bottle for REST APIs up until this point. For our current project our REST API will be written in Scala, so I wanted to try out Scalatra. They have excellent documentation and examples, however they assume you have a good understanding of Simple Build Tool (known as sbt or xsbt). I will walk you through running the Scalatra example with Simple Build Tool, assuming you are using it for the first time.

The example we are using is the main example from Scalatra’s website http://www.scalatra.org/.

Get the example application

Clone the Scalatra Hello World REST API example application using git.

git clone http://github.com/scalatra/scalatra-sbt-prototype.git

Download Simple Build Tool

Download version 0.10.0 (used for the Scalatra example) of Simple Build Tool into the “scalatra-sbt-prototype” directory.

cd scalatra-sbt-prototype
wget http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-tools.sbt/sbt-launch/0.10.0/sbt-launch.jar

NOTE: If you use version 0.11.0 of Simple Build Tool with this example you will get the error: “sbt.ResolveException: unresolved dependency: com.github.siasia#xsbt-web-plugin_2.9.1;0.1.0-0.11.0: not found”.

Create a script to run Simple Build Tool

In the “scalatra-sbt-prototype” directory create a file called “sbt.sh” containing the following text:

java -Xmx512M -XX:MaxPermSize=128M -jar `dirname $0`/sbt-launch.jar "$@"

Also, make sure the script is executable:
chmod 755 sbt.sh

NOTE: Setting MaxPermSize is important. This example will sometimes throw an “Error during sbt execution: java.lang.OutOfMemoryError: PermGen space” error if the PermGen memory size has not been increased.

Run the example

In the “scalatra-sbt-prototype” folder execute the sbt script:

./sbt.sh

Once the Simple Build Tool prompt has opened, you can now start the web app which uses Jetty:
jetty-run

Confirm it is working

If everything went as planned, you should be able to navigate to http://localhost:8080/ and see “Hello World”.

That’s all!

September 11, 2011

Python – Upgrading Python with easy_install, pip, and virtualenv on a Mac

Filed under: Mac,Python — Brian @ 1:45 pm

For this tutorial I am upgrading from python 2.6 to python 2.7 on my Mac. Every time I have to upgrade Python versions, it always takes me a few minutes to remember the steps since I do this task so rarely. So here is a simple set of instructions that seem to work for me!

Install python 2.7

  • Download the Python Mac dmg file from the Python Downloads page
    Just open the dmg and follow the installer as you would with any application
  • You should now have Python 2.7 installed in:
    /Library/Frameworks/Python.framework/Versions/2.7/bin/python

NOTE: Even though we have installed Python 2.7, your /usr/bin/python is still pointing to an older version of Python. So, for example, if you run easy_install, it will run from the older version of Python and install packages into the library of the old Python installation.

So, on to step two of the install:
Let’s switch the /usr/bin/python link to point to the newest version of Python, and at the same time let’s create a soft link in the Python install folder that always points to the current install of Python. So the next time we upgrade, we will only need to change the “Current” version link.

#Create a soft link in /Library/Frameworks/Python.framework/Versions/
cd /Library/Frameworks/Python.framework/Versions/
ln -s 2.7 Current

#Switch the /usr/bin/python link to point to current python link
cd /usr/bin
rm -f python
ln -s /Library/Frameworks/Python.framework/Versions/Current/bin/python python

Install Setup Tools (this includes easy_install)

Download the egg file for your version of python (for this example, 2.7) from the Setup Tools Downloads page. To install setup tools just run the egg file like any other shell script:

sudo sh setuptools-0.6c11-py2.7.egg

Install pip (Using easy_install)

Open a new Terminal window session. You will need a new terminal session for easy_install to show up on the path.

sudo easy_install pip

Now we can install any python packages we want using pip!

Install Virtualenv (Using pip)

Now that we have pip, let’s install a very important package called virtualenv!

pip install virtualenv

That’s it!

Now you will have Python 2.7 on your Mac, with easy_install, pip, and virtualenv ready to go.

August 16, 2011

JMS – How to setup a DurableSubscriber with a MessageListener using ActiveMQ

When I first used JMS and ActiveMQ the first example I tried was a very simple Producer and Consumer that sent/received a single text message to/from ActiveMQ using a Queue. This was very easy to do, but then I wanted to see how to use a Topic and use a DurableSubscriber with a MessageListener. Also, rather than using auto acknowledge, which tells ActiveMQ to remove the message the moment the subscriber receives it, instead I wanted to wait until the subscriber had finished processing the message before acknowledging to ActiveMQ that the message was successfully processed. It turns out all of this was much easier to do than I had thought. So let’s set up a simple example.

Setup ActiveMQ

  • Download the latest version of ActiveMQ here. (For this post 5.5.0 was used)
  • Now unpack the tar file and start ActiveMQ:

             tar -zxvf apache-activemq-5.5.0-bin.tar.gz
             cd apache-activemq-5.5.0/bin
             chmod 755 activemq
             cd ..
             bin/activemq start

  • You should now be able to access the ActiveMQ admin page: http://localhost:8161/admin/

Get the required libraries
To compile and run the publisher and subscriber, you will need the following two libraries on your class path:

  • From the apache-activemq-5.5.0 folder get the jar file: activemq-all-5.5.0.jar
  • You will also need SLF4J (Simple Logging Facade for Java): slf4j-simple-1.5.11.jar
    • NOTE: version 1.6+ will not work with the current version of ActiveMQ (5.5.0)
    • You will need to download version 1.5.11 here

NOTE: For this example I used Scala 2.9.0.

I just used IntelliJ with a lib folder for this example. However, if you are using Maven, you can just use these dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.5.11</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.9.0</version>
    </dependency>
</dependencies>

Producer
The producer code is fairly straightforward. Create a connection to ActiveMQ, create a Topic, create a MessageProducer, and send three messages. That’s all!

import javax.jms._
import org.apache.activemq.ActiveMQConnection
import org.apache.activemq.ActiveMQConnectionFactory

object Producer {
  val activeMqUrl: String = ActiveMQConnection.DEFAULT_BROKER_URL
  val topicName: String = "TOPIC_NAME"

  def main(args: Array[String]): Unit = {
    val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory(activeMqUrl)
    val connection: Connection = connectionFactory.createConnection
    connection.start
    val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val destination: Destination = session.createTopic(topicName)
    val messageProducer: MessageProducer = session.createProducer(destination)
    val textMessage: TextMessage = session.createTextMessage("Hello Subscriber!")
    for (i <- 0 until 3) {
      messageProducer.send(textMessage)
      println("Message sent to subscriber: '" + textMessage.getText + "'")
    }
    connection.close
  }

}

Durable Subscriber
The durable subscriber will register to receive messages on the same topic that the producer is sending messages to. Every message sent by the producer will be received and printed by the message listener. After processing the received message, the durable subscriber will acknowledge it, telling ActiveMQ that the message was successfully processed.

import javax.jms._
import org.apache.activemq.ActiveMQConnection
import org.apache.activemq.ActiveMQConnectionFactory

object DurableSubscriber {
  val activeMqUrl: String = ActiveMQConnection.DEFAULT_BROKER_URL
  val topicName: String = "TOPIC_NAME"

  def main(args: Array[String]): Unit = {
    // Set up the connection the same way as the producer, but also set a
    // unique ClientID, which ActiveMQ uses to identify the durable subscriber
    val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory(activeMqUrl)
    val connection: Connection = connectionFactory.createConnection
    connection.setClientID("SomeClientID")
    connection.start

    // We don't want AUTO_ACKNOWLEDGE here. To ensure the subscriber has successfully processed
    // a message before ActiveMQ removes it, we use CLIENT_ACKNOWLEDGE instead
    val session: Session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)

    // Register to be notified of all messages on the topic
    val topic: Topic = session.createTopic(topicName)
    val durableSubscriber: TopicSubscriber = session.createDurableSubscriber(topic, "Test_Durable_Subscriber")

    // Create a listener to process each received message
    val listener: MessageListener = new MessageListener {
      def onMessage(message: Message): Unit = {
        try {
          if (message.isInstanceOf[TextMessage]) {
            val textMessage: TextMessage = message.asInstanceOf[TextMessage]
            println("Message received from producer: '" + textMessage.getText + "'")

            // Once we have successfully processed the message, send an acknowledgement back to ActiveMQ
            message.acknowledge
          }
        }
        catch {
          case je: JMSException => {
            println(je.getMessage)
          }
        }
      }
    }

    // Add the message listener to the durable subscriber
    durableSubscriber.setMessageListener(listener)
  }

}

To run the program, first start the DurableSubscriber. It will connect to ActiveMQ and wait to receive a message. Now when you run the Producer you will see that the DurableSubscriber receives all three messages sent by the Producer.

Start DurableSubscriber

278 [ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616

Start Producer

277 [ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
Message sent to subscriber: 'Hello Subscriber!'
Message sent to subscriber: 'Hello Subscriber!'
Message sent to subscriber: 'Hello Subscriber!'

Messages Received by DurableSubscriber

Message received from producer: 'Hello Subscriber!'
Message received from producer: 'Hello Subscriber!'
Message received from producer: 'Hello Subscriber!'

ActiveMQ makes all of this very easy!

June 23, 2011

Scala – Implementing an Actor with a blocking mailbox using Akka

Filed under: Actors,Scala — Brian @ 10:32 pm

A problem we ran into a couple of years ago with Scala Actors was that there was no way to set a maximum mailbox size. Our program was sending messages to an Actor faster than the Actor could process them. Since Scala Actors do not have a mailbox size limit, the Actor’s mailbox would eventually grow until the program ran out of memory.

What we really needed was an Actor with a mailbox that would block when it reached some maximum mailbox size. At the time, some brilliant people I work with wrote a custom Actor with blocking queues for our application. However, now a package called Akka is available and it is precisely this problem (and many others) that Akka solves.

As an example, let’s write a simple program that sends messages to an Akka Actor with a maximum mailbox size and demonstrates that the Actor’s mailbox blocks when that size is reached.

Get Scala

Make sure you have Scala 2.9.0 or newer installed.

Get Akka

Download the Akka Actors library. You will need the akka-actors-*.jar file; for this example we will be using akka-actors-1.1.2.jar.

Create an Akka config file

Create an akka.conf file. If no config file is used, Akka uses its default settings, which include an unlimited mailbox size. Use this config file and just modify the following two settings:

  • mailbox-capacity = 10
    • Set a maximum mailbox queue size. This number can be anything you want.
  • mailbox-push-timeout-time = -1
    • Set a timeout of -1 (unlimited). This prevents a caller from timing out while the actor’s mailbox is full and blocking.
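
As a rough sketch, the two settings would look something like the fragment below. This assumes they sit under akka.actor.default-dispatcher, which is how I recall the reference config for Akka 1.1.x being laid out, so check the nesting against the config file you downloaded rather than trusting this fragment. Everything else in the file stays as it is.

```
akka {
  actor {
    default-dispatcher {
      # Assumed location; verify against the reference config for your Akka version
      mailbox-capacity = 10            # maximum number of messages queued in the mailbox
      mailbox-push-timeout-time = -1   # negative value: wait indefinitely when the mailbox is full
    }
  }
}
```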

Simple Akka Example to Demonstrate a Blocking Mailbox

This simple application sends messages to an Akka Actor that has a limited mailbox size. In our example the Actor sleeps for one second each time it receives a message, which causes it to quickly reach its mailbox limit of 10 messages.

import akka.actor.Actor
import Actor._

case class Message(msg:String)
case object Shutdown

object BlockingActorExample extends App {
  val blockingActor = actorOf[BlockingActor].start
  var counter = 0
  while (counter < 25) {
    println("Sending message number: " + counter)
    blockingActor ! Message(counter.toString)
    counter+=1
  }
  blockingActor !!! Shutdown
}

class BlockingActor extends Actor {
  def receive = {
    case message : Message => {
      println("Received message: %s, mailbox size: %s".format(message.msg, self.getMailboxSize()))
      Thread.sleep(1000)
    }
    case Shutdown => {
      self reply (Shutdown)
      self.stop()
    }
  }
}

Let’s Run the Program!

Run the program from your favorite IDE or from the command line. Make sure to include the akka jar file akka-actors-1.1.2.jar on your classpath and also ensure you include the parameter for the akka config file -Dakka.config=akka.conf.

Output

Our Actor has a maximum mailbox size of 10. Notice how the Actor’s mailbox never exceeds 10 messages, and the application sending it messages is blocked whenever that maximum has been reached.

Loading config from -Dakka.config=akka.conf
Sending message number: 0
Sending message number: 1
Sending message number: 2
Sending message number: 3
Sending message number: 4
Sending message number: 5
Sending message number: 6
Sending message number: 7
Sending message number: 8
Sending message number: 9
Sending message number: 10
Sending message number: 11
Received message: 0, mailbox size: 10
Sending message number: 12
Received message: 1, mailbox size: 9
Sending message number: 13
Received message: 2, mailbox size: 9
Sending message number: 14
Received message: 3, mailbox size: 9
Sending message number: 15
Received message: 4, mailbox size: 10
Sending message number: 16
Received message: 5, mailbox size: 9
Sending message number: 17
Received message: 6, mailbox size: 9
Sending message number: 18
Received message: 7, mailbox size: 9
Sending message number: 19
Received message: 8, mailbox size: 9
Sending message number: 20
Received message: 9, mailbox size: 10
Sending message number: 21
Received message: 10, mailbox size: 9
Sending message number: 22
Received message: 11, mailbox size: 9
Received message: 12, mailbox size: 9
Sending message number: 23
Sending message number: 24
Received message: 13, mailbox size: 9
Received message: 14, mailbox size: 9
Received message: 15, mailbox size: 9
Received message: 16, mailbox size: 9
Received message: 17, mailbox size: 8
Received message: 18, mailbox size: 7
Received message: 19, mailbox size: 6
Received message: 20, mailbox size: 5
Received message: 21, mailbox size: 4
Received message: 22, mailbox size: 3
Received message: 23, mailbox size: 2
Received message: 24, mailbox size: 1

Process finished with exit code 0
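
The back-pressure in the output above is the same behavior you get from any bounded blocking queue. Here is a minimal sketch of that idea using java.util.concurrent.ArrayBlockingQueue. It is not how Akka implements its mailbox; the object name BoundedMailboxSketch, the capacity of 3, and the 100 ms delay are all made up for illustration.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BoundedMailboxSketch {
  def main(args: Array[String]): Unit = {
    val capacity = 3
    // Hypothetical stand-in for a bounded actor mailbox
    val mailbox = new ArrayBlockingQueue[Int](capacity)

    // Slow consumer: handles one message every 100 ms, like the sleeping actor
    val consumer = new Thread(new Runnable {
      def run(): Unit = {
        var received = 0
        while (received < 10) {
          val msg = mailbox.take() // blocks while the queue is empty
          println("Received message: " + msg + ", mailbox size: " + mailbox.size)
          Thread.sleep(100)
          received += 1
        }
      }
    })
    consumer.start()

    // Fast producer: put() blocks whenever the queue already holds `capacity` items
    for (i <- 0 until 10) {
      println("Sending message number: " + i)
      mailbox.put(i)
    }
    consumer.join()
  }
}
```

The exact interleaving of the "Sending" and "Received" lines varies from run to run, but the reported mailbox size never goes above the capacity, because put() makes the sender wait until the consumer has taken a message.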

Akka is a great addition to Scala and has many other important features (e.g., Remote Actors). I encourage you to go to the Akka Documentation and read more about it.

That’s all!

May 19, 2011

MongoDB – Negative Regex Query in Mongo shell

Filed under: MongoDB — Brian @ 8:26 am

MongoDB’s interactive shell supports the use of Regular Expressions as one of its Advanced Query options. The other day it came up that I needed to do a query for all documents where the text of a field did NOT start with a given pattern. It took a little bit of trial and error to get the regex right in the mongo shell, so here is what worked for us.

So let’s say for example we have a MongoDB database with a collection called “cars”. The primary key for each document is “make:model”, for example “Ford:Fusion”.

To find all cars made by “Ford”, you would just query for all documents where the id starts with the text “Ford”:

db.cars.find({"_id":/^Ford/})

But, if you want to query for all cars that are NOT made by “Ford”, you can do this with a negative regex such as:

db.cars.find({"_id":/^((?!Ford).)/})

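This type of regex is not efficient, since unlike a simple prefix match such as /^Ford/ it typically cannot narrow the search to a small range of index keys. But if you just need to do a quick query from the Mongo shell, it can come in handy.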
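
If you want to check the pattern itself before running it against the database, here is a small sketch that tries the same negative lookahead in Scala. Keep in mind that this uses java.util.regex rather than MongoDB’s regex engine, so it only confirms the lookahead logic, and that the ids and the isNotFord helper are made up for this example.

```scala
object NegativeRegexSketch {
  // Same pattern as the shell query above: match a first character only
  // when the text at the start of the string is not "Ford"
  val notFord = "^((?!Ford).)".r

  // Hypothetical helper: true when the id does NOT start with "Ford"
  def isNotFord(id: String): Boolean = notFord.findFirstIn(id).isDefined

  def main(args: Array[String]): Unit = {
    val ids = List("Ford:Fusion", "Toyota:Camry", "Honda:Civic", "Ford:Focus")
    println(ids.filter(isNotFord)) // prints List(Toyota:Camry, Honda:Civic)
  }
}
```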

I hope that saves you some time!
