Scala to C# – The Scala Developers Introduction to C# 4.0

I worked for years in .NET back in the 1.1 to 2.0 days, then moved off to Java, then Python, and finally Scala. However, now I am back to doing some .NET again after being away from it for almost four years. I work in Scala on a daily basis and when I heard I would be doing .NET again I was a bit sad, since I enjoy the syntax in Scala so much. However, since my last experience with .NET was in the 2.0 days, things have changed for the better. With .NET 4.0 a lot of functional style syntax has been incorporated into the language (They have done the opposite of Java and continued to evolve the language). So as I have been working with .NET 4.0, everyday I find myself thinking “If only I had Scala, I could write this in one line”. This has prompted me to actually look for equivalents, and it turns out there are a great deal of them! (Hint: LINQ is a great place to start!)

The funny thing is I am the only person in my office who is willing to work on both sides. I just want to use the best tool for the job, or the tool that makes the most business sense given what needs to be done, the resources available, and what code already exists. So I thought I would write up a few quick Scala and C# examples showing the equivalent syntax to help get you started.

Also, as a note, if you are using C# 4.0, I highly recommend using the re-factoring tool Resharper made by JetBrains who make IntelliJ. This tool has been very handy, often I will write a line or two of code in the old way in C#, and it will highlight and suggest the newer cleaner syntax, and in most cases will actually translate what I have written into the new style (e.g, if you write foreach() {} to loop through a list it will rewrite it as myList.ForEach() etc.). That is a trivial example, but for more complex statements it is a great teaching tool.

ForEach on a list

Let’s create a list of strings and loop through them, printing each string to the console.
Scala:

val someStrings = List("one","two","three")
someStrings.foreach(x => println(x))

C#:

var someStrings = new List<string> { "one", "two", "three" };
someStrings.ForEach(x => Console.WriteLine(x));

Scala has “_” but C# has Method Groups

In the above example any Scala developer would say “But you can write that example with the much cleaner syntax using the underscore”. Yes, that is true I totally agree. In fact I was annoyed that I did not have an equivalent until my ReSharper prompted me in Visual Studio to rewrite the above C# statement using Method Groups. So here they are:
Scala:

val someStrings = List("one","two","three")
someStrings.foreach(println(_))

C#:

var someStrings = new List<string> { "one", "two", "three" };
someStrings.ForEach(Console.WriteLine);

So in C#, since the parameter being passed is a string, and the method takes a string, there is no need to explicitly write it out. This is also very handy when using LINQexpressions, such as “Select”.

Tuples

I was working on a piece of code recently in C# and really wished I could use a Tuple in C#. Well, once again I was surprised to find out it was available:
Scala:

val myTuple = ("Hello", 123)
println(myTuple._1)
println(myTuple._2)

C#:

var myTuple = Tuple.Create("Hello", 123);
Console.WriteLine(myTuple.Item1);
Console.WriteLine(myTuple.Item2);

FoldLeft equivalent is Aggregate

Let’s use the basic introduction to FoldLeft example and calculate the sum of a list of numbers.
Scala:

val numbers = List(1,2,3,4,5)
val total = numbers.foldLeft(0)((result,current) => result + current)
println(total)
//Of course any Scala developer would lean towards the Syntax which I have not found an equivalent for in C#
val total2  = numbers.foldLeft(0)(_+_)
println(total2)

C#:

var numbers = new List<int> {1, 2, 3, 4, 5};
var total = numbers.Aggregate(0, (result, current) => result + current);
Console.WriteLine(total);

As you can see the syntax for the Aggregate function is very similar. In fact it took me a while to figure out what method call in C# was the equivalent to a foldLeft just because of the name. Also just as a note their are two types of Aggregate, “Simple” is where you do not need to specify an initial value, while “Seed” is where you do specify an initial value, so the example above is a “Seed” since we are starting our sum with an initial value of 0.

Yes, they both have the “sum” function as well

I know that is a trivial foldLeft and aggregate example, so just to be clear, yes they both have “sum” as well.
Scala:

val numbers = List(1,2,3,4,5)
val total = numbers.sum
println(total)

C#:

var numbers = new List<int> {1, 2, 3, 4, 5};
var total = numbers.Sum();
Console.WriteLine(total);

Filter equivalent is FindAll

Scala:

val numbers = List(1,2,3,4,5)
val numbersSubset = numbers.filter(x => x > 2)
println(numbersSubset.length)

C#:

var numbers = new List<int> {1, 2, 3, 4, 5};
var numbersSubset = numbers.FindAll(x => x > 2);
Console.WriteLine(numbersSubset.Count);

Let’s combine two list operations in one statement

Scala – filter followed by forEach

val numbers = List(1,2,3,4,5)
numbers.filter(x => x > 2).foreach(println(_))

C# – FindAll followed by a ForEach

var numbers = new List<int> {1, 2, 3, 4, 5};
numbers.FindAll(x => x > 2).ForEach(Console.WriteLine);

Constructing a class with named parameters

Yes, I realize these classes are not equivalent, however, they are just for the examples to use below.
Scala:

case class Person(name: String)
val john = new Person(name = "John Smtih")

C#:

class Person { public string name { get; set; } }
var john = new Person { name = "John Smith" };

Construct a new list adding a new Person object to it at the same time

Scala:

case class Person(name: String)
val johns = List(new Person(name = "John Smith"))

C#:

class Person { public string name { get; set; } }
var johns = new List<Person> { new Person { name = "John Smith" } };

Converting a list of objects of one type into another

Now, just as an example of something a little more fun, let’s take the list of “Person” objects we created in the previous example and convert them into a list of “Employee” objects.
Scala – For Scala we will use foldLeft to accomplish this task

case class Employee(name: String)
val employees = johns.foldLeft(List[Employee]())((result,current) => Employee(current.name) :: result)
employees.foreach(x => println(x.name))

C# – For C# you could use Aggregate, but instead let’s try using the LINQ expression “Select”

class Employee { public string name { get; set; } }
var employees = johns.Select(person => new Employee { name = person.name }).ToList();
employees.ForEach(x => Console.WriteLine(x.name));

That’s all for now. As a side note I highly recommend getting familiar with LINQ, it is very useful. The site 101 LINQ Samples in C# is a great place to get started.

Scala – Connecting a Scala app to MS SQL Server with Squeryl

For all of our Scala applications up until now, we have being using MongoDB. However, recently we needed to connect one of our Scala applications to Microsoft SQL Server database. One of my excellent co-workers pointed us at a really nice project called Squeryl which provides a very simple library for doing just that. The documentation is excellent and great examples are provided to help get you up and running quickly.

Let’s look at a simple example.

Create a table in your SQL Server database:

CREATE TABLE [dbo].[Customer](
[id] [int] NOT NULL,
[name] [varchar] (100) NOT NULL,
CONSTRAINT [PK_Customer] PRIMARY KEY CLUSTERED
(
[id] ASC
)) ON [PRIMARY]

Insert a row:

INSERT INTO dbo.Customer(id, name)
VALUES(1,'John Smith')

Get all of the libraries you will need to make the program run:

  • Download Squeryl jar (2.9.0)
  • Download jtds driver jar (1.2.5)
  • Download cglib jar (2.2.2)
  • Download asm jar (3.1)

Now, let’s run our test code to select a row from our Customer table:

package spikes

import org.squeryl.Session
import org.squeryl.adapters.MSSQLServer
import org.squeryl.SessionFactory
import org.squeryl.PrimitiveTypeMode._
import org.squeryl.Schema

// Create a Customer class which has the same fields as the Customer table in SQL Server
case class Customer(id: Long, name: String) {}

object SqlSpike extends App with Schema {
val databaseConnectionUrl = "jdbc:jtds:sqlserver://myservername;DatabaseName=mydatabasename"
val databaseUsername = "myusername"
val databasePassword = "password"

// Set the jtds driver
Class.forName("net.sourceforge.jtds.jdbc.Driver")

// Connect to the database
SessionFactory.concreteFactory = Some(()=>
Session.create(
java.sql.DriverManager.getConnection(databaseConnectionUrl, databaseUsername, databasePassword),
new MSSQLServer))

// Setup the Customer class to be mapped to the "Customer" table in SQL Server
val customers = table[Customer]("Customer")

// Select Customer with id=1 from the Customer table
transaction {
val customer = customers.where(c=> c.id === 1).single
println("Customer name: " + customer.name)
}
}

Finally, the output of the program:

[info] Running spikes.SqlSpike
Customer name: John Smith
[success] Total time: 4 s, completed 29-Feb-2012 8:42:24 PM

Overall we have found the Squeryl library to be very useful. If you need to connect a Scala application to a SQL database I recommend you give it a try, it should save you some aggravation.

Scala – How to combine two lists of different types so they can be sorted by a common field

Recently I was pairing with a co-worker on a new view for our application. The purpose of this view was just to show a timeline of all activities (regardless of type) occurring in the system. We were confident there would be a way to accomplish this task easily in Scala!

For this example lets say we are writing a program for a car dealership. The manager of the dealership would like a page showing test drives and sales throughout the day. The point of this page is not to connect test drives to sales, but instead a view used to give a timeline of the activity taking place on the car lot.

You can download the example code from github here.

In the example below, we have two lists of different types:

  • val testDrives = List(TestDrive(…)…)
  • val purchases = List(Purchase(…)…)

The first thing we will need to do is combine the two lists into one list using:

  • val activities = (testDrives ::: purchases)

Once we have created this new list containing both types of TestDrives and Purchases, we can then sort the list in descending order on the common field date.

  • val activitiesSorted = activities.asInstanceOf[List[ {def date: Date}]].sortBy(_.date).reverse

The following example shows how to do this:

import java.util.Date
import scala.language.reflectiveCalls

case class TestDrive(car: String, salesPerson: String, date: Date)

case class Purchase(car: String, salesPerson: String, soldPrice: Int, date: Date)

object SortTwoListsByCommonField {
  def main(args: Array[String]): Unit = {
    val date = new Date
    val dateFormat = new java.text.SimpleDateFormat("yyyy/MM/dd HH:mm:ss")

    val testDrives = List(
      TestDrive("Charger", "John Smith", new Date(date.getTime - 1000000)),
      TestDrive("Camero", "Jane Doe", new Date(date.getTime - 4000000)),
      TestDrive("Mustang", "Frank Smith", new Date(date.getTime - 10000000))
    )

    val purchases = List(
      Purchase("G35x", "Jane Doe", 19500, new Date(date.getTime - 500000)),
      Purchase("Altima", "Jane Doe", 28500, new Date(date.getTime - 6000000)),
      Purchase("Maxima", "Frank Smith", 35000, new Date(date.getTime - 9000000))
    )

    // Combine the TestDrives and Purchases lists
    val activities = (testDrives ::: purchases)

    // Sort the activities by the common field date
    val activitiesSorted = activities.asInstanceOf[List[ {def date: Date}]].sortBy(_.date).reverse

    // Now if we want to show fields other than "date", we can use pattern matching to cast each activity
    for (activity <- activitiesSorted) {
      activity match {
        case t: TestDrive =>
          println("%s\t|\t%s\t|\t%s\t|\t%s"
            .format(dateFormat.format(t.date), t.getClass.getSimpleName, t.car, t.salesPerson))
        case p: Purchase =>
          println("%s\t|\t%s\t|\t%s\t|\t%s\t|\t$%s"
            .format(dateFormat.format(p.date), p.getClass.getSimpleName, p.car, p.salesPerson, p.soldPrice))
      }
    }
  }
}

You can run the program from the command line with the following command:

scala SortTwoListsByCommonField.scala

As you can see in the output, the TestDrives and Purchases are in one list and ordered descending by date:

2020/12/09 21:37:38    |   Purchase    |   G35x    |   Jane Doe    |   $19500
2020/12/09 21:29:18    |   TestDrive   |   Charger |   John Smith
2020/12/09 20:39:18    |   TestDrive   |   Camero  |   Jane Doe
2020/12/09 20:05:58    |   Purchase    |   Altima  |   Jane Doe    |   $28500
2020/12/09 19:15:58    |   Purchase    |   Maxima  |   Frank Smith |   $35000
2020/12/09 18:59:18    |   TestDrive   |   Mustang |   Frank Smith

I hope that helps!

Scala – Handling failure in Actors with Akka Supervisors

We have been using Akka Actors in all of our Scala projects for about a year now and have been very impressed. However, the problem with applications that have concurrently running Actors is that if an exception occurs and the Actor dies, the application may appear to still be up and running, but an Actor is now missing and know one knows this has happened. What we want in our application is to follow the “Let it fail” design philosophy by being able to restart an actor if one happens to fail. We want to guarantee that every Actor in the application is alive and well, even if an exception occurs. The easy way to obtain this type of fault tolerance in concurrent code is by using Actor Supervisors. The documentation on the Akka site is excellent, I highly recommend you read through it.

The basic idea is that the Supervisor is responsible for managing any Actors you ask it to manage. The supervisor starts the Actors it has been told to manage and in the event of an Actor crashing or being stopped, the supervisor will immediately restart the Actor for you.

The following is a simple example showing how to use a supervisor to start an Actor, that will automatically restart the Actor if an exception is thrown. To run the example you will need to download the Akka Actors library from the Akka Downloads page. Just download the zip, unzip it, and put the akka-actor-1.2.jar file from the lib/akka folder on your class path.

NOTE: This example was written using: Akka-Actor 1.2 and Scala 2.9.1

import akka.actor.Actor._
import akka.actor.{SupervisorFactory, Actor, Supervisor}
import akka.config.Supervision._

case object AreYouAlive
case object Kaboom

// Simple Actor that handles two types of messages: AreYouAlive and Kaboom
class SomeActor extends Actor {
def receive = {
case AreYouAlive => println("Yes I am alive!")
case Kaboom => throw new RuntimeException("Kaboom!!!")
}
}

object SomeApp extends App {
val someActor = actorOf[SomeActor]

// Setup the supervisorFactory with a config and a list of Actors to supervise
val supervisorFactory = SupervisorFactory(
SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 1000),
Supervise(someActor, Permanent) :: Nil)
)
// Instantiate and start the supervisor, this also starts all supervised Actors
val supervisor = supervisorFactory.newInstance
supervisor.start

// Ping the Actor, yep it is working
someActor ! AreYouAlive

// Send the Kaboom message, oh no, the Actor will crash
someActor ! Kaboom

Thread.sleep(1000)

// Wait, our Actor is alive and well again
someActor ! AreYouAlive
supervisor.shutdown()
}

Our Actor supervisor example will produce the following output:

Yes I am alive!
[ERROR]   [11/22/11 10:30 PM] [akka:event-driven:dispatcher:global-1] [LocalActorRef] Kaboom!!!
java.lang.RuntimeException: Kaboom!!!
at SomeActor$$anonfun$receive$1.apply(ActorSupervisorExample.scala:12)
at SomeActor$$anonfun$receive$1.apply(ActorSupervisorExample.scala:10)
at akka.actor.Actor$class.apply(Actor.scala:545)
at SomeActor.apply(ActorSupervisorExample.scala:9)
at akka.actor.LocalActorRef.invoke(ActorRef.scala:905)
at akka.dispatch.MessageInvocation.invoke(MessageHandling.scala:25)
at akka.dispatch.ExecutableMailbox$class.processMailbox(ExecutorBasedEventDrivenDispatcher.scala:216)
at akka.dispatch.ExecutorBasedEventDrivenDispatcher$$anon$4.processMailbox(ExecutorBasedEventDrivenDispatcher.scala:122)
at akka.dispatch.ExecutableMailbox$class.run(ExecutorBasedEventDrivenDispatcher.scala:188)
at akka.dispatch.ExecutorBasedEventDrivenDispatcher$$anon$4.run(ExecutorBasedEventDrivenDispatcher.scala:122)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:680)
at akka.dispatch.MonitorableThread.run(ThreadPoolBuilder.scala:184)

Yes I am alive!

As you can see, even though the Actor threw an exception when it tried to handle the Kaboom message, the Actor was restarted and back to work right away. Great stuff!

I highly recommend reading the Fault Tolerance Through Supervisor Hierarchies documentation on the Akka website.

JMS – How to do Synchronous Messaging with ActiveMQ in Scala

The first question you will probably ask is “Aren’t queues designed for the purpose of asynchronous fire and forget style messaging?”. Well, yep, they are! Your goal for scaling should be to make anything that can be asynchronous, asynchronous. However, if a message bus such as ActiveMQ is how your applications talk to each other, sometimes an immediate response is necessary (although we try to avoid this when possible). The following is a simple example in Scala showing how to send such a message with ActiveMQ.

Setup ActiveMQ

  • Download the latest version ActiveMQ here. (For this post 5.5.0 was used)
  • Now unpack the tar file and start ActiveMQ:
  •          tar -zxvf apache-activemq-5.5.0-bin.tar.gz
             cd apache-activemq-5.5.0/bin
             chmod 755 activemq
             cd ..
             bin/activemq start
           
  • You should now be able to access the ActiveMQ admin page: http://localhost:8161/admin/

Get the required libraries
To compile and run the publisher and subscriber, you will need the following two libraries on your class path:

  • From the apache-activemq-5.5.0 folder get the jar file: activemq-all-5-5-0.jar
  • You will also need SLF4J (Simple Logging Facade for Java): slf4j-simple-1.5.11.jar
    • NOTE: version 1.6+ will not work with the current version of ActiveMQ (5.5.0)
    • You will need to download version 1.5.11 here

NOTE: For this example I used scala 2.9.0.

Synchronous Producer
The main difference between an asynchronous and synchronous producer is that the synchronous producer has two queues: a send queue and a reply queue. The send queue is the queue on which the producer will send a message to the consumer. The reply queue is the queue on which the producer will listen for a reply from the consumer. The producer when it sends a message sets two important pieces of information on the message:

  1. JMSCorrelationID: This is the uniqueID used by the producer to indentify the message
  2. JMSReplyTo: This tells the consumer on which queue to send the message reply

The producer then creates a “ReplyConsumer” on the reply queue and listens for a reply from the consumer that contains that “JMSCorrelationID”. When a message with that ID appears on the reply queue, the ReplyConsumer will receive that message and our synchronous message round trip has been completed!

import javax.jms._
import org.apache.activemq.ActiveMQConnectionFactory
import util.Random

object ProducerSynchronous {
  val activeMqUrl: String = "tcp://localhost:61616"

  def main(args: Array[String]): Unit = {
    val connectionFactory = new ActiveMQConnectionFactory(activeMqUrl)
    val connection = connectionFactory.createConnection
    connection.setClientID("ProducerSynchronous")
    connection.start

    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val sendQueue = session.createQueue("SendSynchronousMsgQueue")
    val replyQueue = session.createQueue("ReplyToSynchronousMsgQueue")

    val producer = session.createProducer(sendQueue)
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

    val correlationId = new Random().nextString(32)
    val replyConsumer = session.createConsumer(replyQueue, "JMSCorrelationID = '%s'".format(correlationId))

    val textMessage = session.createTextMessage("Hello, please reply immediately to my message!")
    textMessage.setJMSReplyTo(replyQueue)
    textMessage.setJMSCorrelationID(correlationId)

    println("Sending message...")

    producer.send(textMessage)

    println("Waiting for reply...")

    val reply = replyConsumer.receive(1000)
    replyConsumer.close()

    reply match {
      case txt: TextMessage => println("Received reply: " + txt.getText())
      case _ => throw new Exception("Invalid Response:" + reply)
    }

    connection.close
  }
}

Synchronous Consumer
The synchronous consumer listens for messages on the send queue. When a message is received, it creates a “ReplyProducer” and connects it to the reply queue specified in the message’s “JMSReplyTo” field. The consumer then creates a reply message and copies the JMSCorrelationID from the received message. The consumer then sends the reply message with the “ReplyProducer”.

import javax.jms._
import org.apache.activemq.ActiveMQConnectionFactory

object ConsumerSynchronous {
  val activeMqUrl: String = "tcp://localhost:61616"
  
  def main(args: Array[String]): Unit = {
    val connectionFactory  = new ActiveMQConnectionFactory(activeMqUrl)
    val connection = connectionFactory.createConnection
    connection.setClientID("ConsumerSynchronous")
    connection.start

    println("Started")

    val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val queue  = session.createQueue("SendSynchronousMsgQueue")
    val consumer = session.createConsumer(queue)

    val listener = new MessageListener {
      def onMessage(message: Message) {
        message match {
          case text: TextMessage => {
          val replyProducer = session.createProducer(text.getJMSReplyTo())
          replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

          println("Received message: " + text.getText)

          val replyMessage = session.createTextMessage("Yes I received your message!")
          replyMessage.setJMSCorrelationID(text.getJMSCorrelationID())

          println("Reply sent!")
            
          replyProducer.send(replyMessage)
          }
          case _ => {
            throw new Exception("Unhandled Message Type: " + message.getClass.getSimpleName)
          }
        }
      }
    }
    consumer.setMessageListener(listener)
  }
}

That’s all!

Scala – Hello World REST API with Scalatra and Simple Build Tool

I am new to Simple Build Tool and also to building REST APIs in Scala. Mostly I have been using Python with Bottle for REST APIs up until this point. For our current project our REST API will be written in Scala, so I wanted to try out Scalatra. They have excellent documentation and examples, however they assume you have a good understanding of Simple Build Tool (known as sbt or xsbt). I will walk you through running the Scalatra example with Simple Build Tool, assuming you are using it for the first time.

The example we are using is the main example from Scalatra’s website http://www.scalatra.org/.

Get the example application

Clone the Scalatra Hello World REST API example application using git.

git clone http://github.com/scalatra/scalatra-sbt-prototype.git

Download Simple Build Tool

Download version 10 (used for the Scalatra example) of Simple Build Tool into the “scalatra-sbt-prototype” directory.

cd scalatra-sbt-prototype
wget http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-tools.sbt/sbt-launch/0.10.0/sbt-launch.jar

NOTE: If you use version 0.11.0 of Simple Build Tool with this example you will get the error: “sbt.ResolveException: unresolved dependency: com.github.siasia#xsbt-web-plugin_2.9.1;0.1.0-0.11.0: not found”.

Create a script to run Simple Build Tool

In the “scalatra-sbt-prototype” directory create a file called “sbt.sh” containing the following text:

java -Xmx512M -XX:MaxPermSize=128M -jar `dirname $0`/sbt-launch.jar "$@"

Also, make sure the script is executable:

chmod 755 sbt.sh

NOTE: setting MaxPermSize is important. This example will sometimes throw a “Error during sbt execution: java.lang.OutOfMemoryError: PermGen space” error if the PermGen memory size has not been increased.

Run the example

In the “scalatra-sbt-prototype” folder execute the sbt script:

./sbt.sh

Once the Simple Build Tool prompt has opened, you can now start the web app which uses Jetty:

jetty-run

Confirm it is working

If everything went as planned, you should be able to navigate to http://localhost:8080/ and see “Hello World”.

That’s all!

JMS – How to setup a DurableSubscriber with a MessageListener using ActiveMQ

When I first used JMS and ActiveMQ the first example I tried was a very simple Producer and Consumer that sent/received a single text message to/from ActiveMQ using a Queue. This was very easy to do, but then I wanted to see how to use a Topic and use a DurableSubscriber with a MessageListener. Also, rather than using auto acknowledge, which tells ActiveMQ to remove the message the moment the subscriber receives it, instead I wanted to wait until the subscriber had finished processing the message before acknowledging to ActiveMQ that the message was successfully processed. It turns out all of this was much easier to do than I had thought. So let’s set up a simple example.

Setup ActiveMQ

  • Download the latest version ActiveMQ here. (For this post 5.5.0 was used)
  • Now unpack the tar file and start ActiveMQ:
  •          tar -zxvf apache-activemq-5.5.0-bin.tar.gz
             cd apache-activemq-5.5.0/bin
             chmod 755 activemq
             cd ..
             bin/activemq start
           
  • You should now be able to access the ActiveMQ admin page: http://localhost:8161/admin/

Get the required libraries
To compile and run the publisher and subscriber, you will need the following two libraries on your class path:

  • From the apache-activemq-5.5.0 folder get the jar file: activemq-all-5-5-0.jar
  • You will also need SLF4J (Simple Logging Facade for Java): slf4j-simple-1.5.11.jar
    • NOTE: version 1.6+ will not work with the current version of ActiveMQ (5.5.0)
    • You will need to download version 1.5.11 here

NOTE: For this example I used scala 2.9.0.

I just used IntelliJ with a lib folder for this example, however if you are using maven, you can just use these dependencies:

<dependencies>
    <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-core</artifactId
          <version>5.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.5.11</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.9.0</version>
    </dependency>
</dependencies>

Producer
The producer code is fairly straight forward. Create a connection to ActiveMQ, create a Topic, create a MessageProducer, and send three messages. That’s all!

import javax.jms._
import org.apache.activemq.ActiveMQConnection
import org.apache.activemq.ActiveMQConnectionFactory

object Producer {
  val activeMqUrl: String = ActiveMQConnection.DEFAULT_BROKER_URL
  val topicName: String = "TOPIC_NAME"

  def main(args: Array[String]): Unit = {
    val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory(activeMqUrl)
    val connection: Connection = connectionFactory.createConnection
    connection.start
    val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val destination: Destination = session.createTopic(topicName)
    val messageProducer: MessageProducer = session.createProducer(destination)
    val textMessage: TextMessage = session.createTextMessage("Hello Subscriber!")
    for (i <- 0 until 3) {
      messageProducer.send(textMessage)
      println("Message sent to subscriber: '" + textMessage.getText + "'")
    }
    connection.close
  }

}

Durable Subscriber
The durable subscriber will register to receive messages on the same topic that the producer is sending messages to. Every message sent by the producer will be received and printed by the message listener. After processing the received message, the durable subscriber will acknowledge it telling ActiveMQ that the message was successfully received.

import javax.jms._
import org.apache.activemq.ActiveMQConnection
import org.apache.activemq.ActiveMQConnectionFactory

object DurableSubscriber {
  val activeMqUrl: String = ActiveMQConnection.DEFAULT_BROKER_URL
  val topicName: String = "TOPIC_NAME"

  def main(args: Array[String]): Unit = {
    // Set up the connection, same as the producer, however you also need to set a
    // unique ClientID which ActiveMQ uses to identify the durable subscriber
    val connectionFactory: ConnectionFactory = new ActiveMQConnectionFactory(activeMqUrl)
    val connection: Connection = connectionFactory.createConnection
    connection.setClientID("SomeClientID")
    connection.start

    // We don't want to use AUTO_ACKNOWLEDGE, instead we want to ensure the subscriber has successfully
    // processed the message before telling ActiveMQ to remove it, so we will use CLIENT_ACKNOWLEDGE
    val session: Session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)

    // Register to be notified of all messages on the topic
    val topic: Topic = session.createTopic(topicName)
    val durableSubscriber: TopicSubscriber = session.createDurableSubscriber(topic, "Test_Durable_Subscriber")

    // Create a listener to process each received message
    val listener: MessageListener = new MessageListener {
      def onMessage(message: Message): Unit = {
        try {
          if (message.isInstanceOf[TextMessage]) {
            val textMessage: TextMessage = message.asInstanceOf[TextMessage]
            println("Message received from producer: '" + textMessage.getText + "'")

            // Once we have successfully processed the message, send an acknowledge back to ActiveMQ
            message.acknowledge
          }
        }
        catch {
          case je: JMSException => {
            println(je.getMessage)
          }
        }
      }
    }

    // Add the message listener to the durable subscriber
    durableSubscriber.setMessageListener(listener)
  }

}

To run the program, first start the DurableSubscriber. It will connect to ActiveMQ and wait to receive a message. Now when you run the Producer you will see that the DurableSubscriber receives all three messages sent by the Producer.

Start DurableSubscriber

278 [ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616

Start Producer

277 [ActiveMQ Task-1] INFO org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://localhost:61616
Message sent to subscriber: 'Hello Subscriber!'
Message sent to subscriber: 'Hello Subscriber!'
Message sent to subscriber: 'Hello Subscriber!'

Messages Received by DurableSubscriber

Message received from producer: 'Hello Subscriber!'
Message received from producer: 'Hello Subscriber!'
Message received from producer: 'Hello Subscriber!'

ActiveMQ makes all of this very easy!

Scala – Implementing an Actor with a blocking mailbox using Akka

A problem we ran into a couple of years ago with Scala Actors was their inability to set a maximum mailbox size. The problem we had was that our program was sending messages to an Actor faster than the Actor could process each message. Since Scala Actors do not have a mailbox size limit, eventually the mailbox on the Actor would grow until the program ran out of memory.

What we really needed was an Actor with a mailbox that would block when it reached some maximum mailbox size. At the time, some brilliant people I work with wrote a custom Actor with blocking queues for our application. However, now a package called Akka is available and it is precisely this problem (and many others) that Akka solves.

As an example lets write a simple program that sends messages to an Akka Actor with a maximum mailbox size and demonstrate that the Actor’s mailbox blocks when the maximum mailbox size is reached.

Get Scala

Make sure you have Scala 2.9.0 or newer installed.

Get Akka

Download the Akka Actors library, you will need the akka-actors-*.jar file. For this example we will be using akka-actors-1.1.2.jar.

Create an Akka config file

Create an akka.conf file. If no config file is used, Akka uses it’s default settings, which is an unlimited mailbox size. Use this config file and just modify the following two settings:

  • mailbox-capacity = 10
    • Set a maximum mailbox queue size. This number can be anything you want.
  • mailbox-push-timeout-time = -1
    • Set a timeout of -1 (unlimited), this will prevent a caller from timing out when the mailbox on the actor is blocking.

Simple Akka Example to Demonstrate a Blocking Mailbox

A simple application that sends messages to our Akka Actor which has a limited mailbox size. In our example the Actor will sleep for one second each time it receives a message. This will cause our Actor to quickly reach it’s mailbox limit of 10 messages.

import akka.actor.Actor
import Actor._

case class Message(msg:String)
case class Shutdown()

object BlockingActorExample extends App {
val blockingActor = actorOf[BlockingActor].start
var counter = 0
while (counter < 25) {
println("Sending message number: " + counter)
blockingActor ! Message(counter.toString)
counter+=1
}
blockingActor !!! Shutdown
}

class BlockingActor extends Actor {
def receive = {
case message : Message => {
println("Received message: %s, mailbox size: %s".format(message.msg, self.getMailboxSize()))
Thread.sleep(1000)
}
case Shutdown => {
self reply (Shutdown)
self.stop()
}
}
}

Let’s Run the Program!

Run the program from your favorite IDE or from the command line. Make sure to include the akka jar file akka-actors-1.1.2.jar on your classpath and also ensure you include the parameter for the akka config file -Dakka.config=akka.conf.

Output

Our Actor has a maximum mailbox size of 10. Notice how the Actor’s mailbox never exceeds 10 messages, and the application sending it messages is blocked when the mailbox size has been reached.

Loading config from -Dakka.config=akka.conf
Sending message number: 0
Sending message number: 1
Sending message number: 2
Sending message number: 3
Sending message number: 4
Sending message number: 5
Sending message number: 6
Sending message number: 7
Sending message number: 8
Sending message number: 9
Sending message number: 10
Sending message number: 11
Received message: 0, mailbox size: 10
Sending message number: 12
Received message: 1, mailbox size: 9
Sending message number: 13
Received message: 2, mailbox size: 9
Sending message number: 14
Received message: 3, mailbox size: 9
Sending message number: 15
Received message: 4, mailbox size: 10
Sending message number: 16
Received message: 5, mailbox size: 9
Sending message number: 17
Received message: 6, mailbox size: 9
Sending message number: 18
Received message: 7, mailbox size: 9
Sending message number: 19
Received message: 8, mailbox size: 9
Sending message number: 20
Received message: 9, mailbox size: 10
Sending message number: 21
Received message: 10, mailbox size: 9
Sending message number: 22
Received message: 11, mailbox size: 9
Received message: 12, mailbox size: 9
Sending message number: 23
Sending message number: 24
Received message: 13, mailbox size: 9
Received message: 14, mailbox size: 9
Received message: 15, mailbox size: 9
Received message: 16, mailbox size: 9
Received message: 17, mailbox size: 8
Received message: 18, mailbox size: 7
Received message: 19, mailbox size: 6
Received message: 20, mailbox size: 5
Received message: 21, mailbox size: 4
Received message: 22, mailbox size: 3
Received message: 23, mailbox size: 2
Received message: 24, mailbox size: 1

Process finished with exit code 0

Akka is a great addition to Scala and has many other important features (e.g., Remote Actors). I encourage you to go to the Akka Documentation and read more about it.

Thats all!

Scala – Extending a built in class with Implicit Conversions

The String class in Scala has a “replaceFirst” method, but what if you really want it to have a “replaceLast” method as well in your code. One way to accomplish this task would be:

class StringHelper(str: String) {
def replaceLast(regex: String, replacement: String) = str.reverse.replaceFirst(regex, replacement.reverse).reverse
}

val myString = new StringHelper("Hello:")
println(myString.replaceLast(":",""))

Now that looks kind of ugly, having to directly use our StringHelper in the code. Fortunately Scala’s Implicit Conversions allows you to keep the code clean and let Scala do the work for you. All we have to do is create an implicit wrapper for StringHelper:

class StringHelper(str: String) {
def replaceLast(regex: String, replacement: String) = str.reverse.replaceFirst(regex, replacement.reverse).reverse
}

implicit def stringWrapper(string: String) = new StringHelper(string)

val myString = "Hello:"
println(myString.replaceLast(":",""))

So now whenever the method “replaceLast” is called on a String, Scala is smart enough to do the implicit conversion of the String to StringHelper for me automatically. Scala Implicit Conversions allows me to add additional functionality to an existing built in library class, without having to clutter my code with references to messy wrapper classes.

That’s all!

Tail Recursion in Scala: A Simple Example

While at UberConf I had the great opportunity to attend a talk titled “Scala for Java Programmers” put on by Venkat Subramaniam, the author of the book “Scala Programming“. In my previous post I gave examples of using XML in Scala. In this post I will walk you through the examples Venkat used to show how to properly do tail recursion in Scala.

Recursion
def factorial(number:Int) : Int = {
if (number == 1)
return 1
number * factorial (number - 1)
}
println(factorial(5))

In order for a recursive call to be tail recursive, the call back to the function must be the last action performed in the function. In the example above, because the total returned from the recursive call is being multiplied by number, the recursive call is NOT the last action performed in the function, so the recursive call is NOT tail recursive. The method calls itself until the parameter passed in equals 1. In this recursive method call, the sequence of calls looks like:

5 * total (5 – 1)
4 * total (4 – 1) = 20
3 * total (3 – 1) = 60
2 * total (2 – 1) = 120

Tail Recursion

To take this example and make it tail recursive, we must make sure that the last action performed in the function is the recursive call. To do this we update the factorial function to have two parameters. The new accumulator parameter stores the intermediate value, so we are no longer doing a calculation against the value returned from the function like we were before.

def factorial(accumulator: Int, number: Int) : Int = {
if(number == 1)
return accumulator
factorial(number * accumulator, number - 1)
}
println(factorial(1,5))

However, we can still do better! Scala allows a function to be declared inside another function. So here we take the code inside the function and wrap it in a new inner function called factorialWithAccumulator. By making this change we avoid changing the parameters of the factorial method so no one calling it needs to do anything different.

def factorial(number: Int) : Int = {
def factorialWithAccumulator(accumulator: Int, number: Int) : Int = {
if (number == 1)
return accumulator
else
factorialWithAccumulator(accumulator * number, number - 1)
}
factorialWithAccumulator(1, number)
}
println(factorial(5))
Why Tail Recursion?

In the recursion example, notice how the result of each call must be remembered, to do this each recursive call requires an entry on the stack until all recursive calls have been made. This makes the recursive call more expensive in terms of memory. While in the tail recursive example, there are no intermediate values that need to be stored on the stack, the intermediate value is always passed back as a parameter.