My Adventures in Coding

November 22, 2011

Scala – Handling failure in Actors with Akka Supervisors

Filed under: Actors,Scala — Brian @ 10:53 pm
Tags: , , ,

We have been using Akka Actors in all of our Scala projects for about a year now and have been very impressed. However, the problem with applications that have concurrently running Actors is that if an exception occurs and the Actor dies, the application may appear to still be up and running, but an Actor is now missing and know one knows this has happened. What we want in our application is to follow the “Let it fail” design philosophy by being able to restart an actor if one happens to fail. We want to guarantee that every Actor in the application is alive and well, even if an exception occurs. The easy way to obtain this type of fault tolerance in concurrent code is by using Actor Supervisors. The documentation on the Akka site is excellent, I highly recommend you read through it.

The basic idea is that the Supervisor is responsible for managing any Actors you ask it to manage. The supervisor starts the Actors it has been told to manage and in the event of an Actor crashing or being stopped, the supervisor will immediately restart the Actor for you.

The following is a simple example showing how to use a supervisor to start an Actor, that will automatically restart the Actor if an exception is thrown. To run the example you will need to download the Akka Actors library from the Akka Downloads page. Just download the zip, unzip it, and put the akka-actor-1.2.jar file from the lib/akka folder on your class path.

NOTE: This example was written using: Akka-Actor 1.2 and Scala 2.9.1

import akka.actor.Actor._
import akka.actor.{SupervisorFactory, Actor, Supervisor}
import akka.config.Supervision._

case object AreYouAlive
case object Kaboom

// Simple Actor that handles two types of messages: AreYouAlive and Kaboom
class SomeActor extends Actor {
  def receive = {
    case AreYouAlive => println("Yes I am alive!")
    case Kaboom => throw new RuntimeException("Kaboom!!!")
  }
}

object SomeApp extends App {
  val someActor = actorOf[SomeActor]

  // Setup the supervisorFactory with a config and a list of Actors to supervise
  val supervisorFactory = SupervisorFactory(
    SupervisorConfig(OneForOneStrategy(List(classOf[Exception]), 3, 1000),
    Supervise(someActor, Permanent) :: Nil)
  )
  // Instantiate and start the supervisor, this also starts all supervised Actors
  val supervisor = supervisorFactory.newInstance
  supervisor.start

  // Ping the Actor, yep it is working
  someActor ! AreYouAlive

  // Send the Kaboom message, oh no, the Actor will crash
  someActor ! Kaboom
  
  Thread.sleep(1000)

  // Wait, our Actor is alive and well again
  someActor ! AreYouAlive
  supervisor.shutdown()
}

Our Actor supervisor example will produce the following output:

Yes I am alive!
[ERROR]   [11/22/11 10:30 PM] [akka:event-driven:dispatcher:global-1] [LocalActorRef] Kaboom!!!
java.lang.RuntimeException: Kaboom!!!
	at SomeActor$$anonfun$receive$1.apply(ActorSupervisorExample.scala:12)
	at SomeActor$$anonfun$receive$1.apply(ActorSupervisorExample.scala:10)
	at akka.actor.Actor$class.apply(Actor.scala:545)
	at SomeActor.apply(ActorSupervisorExample.scala:9)
	at akka.actor.LocalActorRef.invoke(ActorRef.scala:905)
	at akka.dispatch.MessageInvocation.invoke(MessageHandling.scala:25)
	at akka.dispatch.ExecutableMailbox$class.processMailbox(ExecutorBasedEventDrivenDispatcher.scala:216)
	at akka.dispatch.ExecutorBasedEventDrivenDispatcher$$anon$4.processMailbox(ExecutorBasedEventDrivenDispatcher.scala:122)
	at akka.dispatch.ExecutableMailbox$class.run(ExecutorBasedEventDrivenDispatcher.scala:188)
	at akka.dispatch.ExecutorBasedEventDrivenDispatcher$$anon$4.run(ExecutorBasedEventDrivenDispatcher.scala:122)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:680)
	at akka.dispatch.MonitorableThread.run(ThreadPoolBuilder.scala:184)

Yes I am alive!

As you can see, even though the Actor threw an exception when it tried to handle the Kaboom message, the Actor was restarted and back to work right away. Great stuff!

I highly recommend reading the Fault Tolerance Through Supervisor Hierarchies documentation on the Akka website.

Advertisements

June 23, 2011

Scala – Implementing an Actor with a blocking mailbox using Akka

Filed under: Actors,Scala — Brian @ 10:32 pm
Tags: , ,

A problem we ran into a couple of years ago with Scala Actors was their inability to set a maximum mailbox size. The problem we had was that our program was sending messages to an Actor faster than the Actor could process each message. Since Scala Actors do not have a mailbox size limit, eventually the mailbox on the Actor would grow until the program ran out of memory.

What we really needed was an Actor with a mailbox that would block when it reached some maximum mailbox size. At the time, some brilliant people I work with wrote a custom Actor with blocking queues for our application. However, now a package called Akka is available and it is precisely this problem (and many others) that Akka solves.

As an example lets write a simple program that sends messages to an Akka Actor with a maximum mailbox size and demonstrate that the Actor’s mailbox blocks when the maximum mailbox size is reached.

Get Scala

Make sure you have Scala 2.9.0 or newer installed.

Get Akka

Download the Akka Actors library, you will need the akka-actors-*.jar file. For this example we will be using akka-actors-1.1.2.jar.

Create an Akka config file

Create an akka.conf file. If no config file is used, Akka uses it’s default settings, which is an unlimited mailbox size. Use this config file and just modify the following two settings:

  • mailbox-capacity = 10
    • Set a maximum mailbox queue size. This number can be anything you want.
  • mailbox-push-timeout-time = -1
    • Set a timeout of -1 (unlimited), this will prevent a caller from timing out when the mailbox on the actor is blocking.

Simple Akka Example to Demonstrate a Blocking Mailbox

A simple application that sends messages to our Akka Actor which has a limited mailbox size. In our example the Actor will sleep for one second each time it receives a message. This will cause our Actor to quickly reach it’s mailbox limit of 10 messages.

import akka.actor.Actor
import Actor._

case class Message(msg:String)
case class Shutdown()

object BlockingActorExample extends App {
  val blockingActor = actorOf[BlockingActor].start
  var counter = 0
  while (counter < 25) {
    println("Sending message number: " + counter)
    blockingActor ! Message(counter.toString)
    counter+=1
  }
  blockingActor !!! Shutdown
}

class BlockingActor extends Actor {
  def receive = {
    case message : Message => {
      println("Received message: %s, mailbox size: %s".format(message.msg, self.getMailboxSize()))
      Thread.sleep(1000)
    }
    case Shutdown => {
      self reply (Shutdown)
      self.stop()
    }
  }
}

Let’s Run the Program!

Run the program from your favorite IDE or from the command line. Make sure to include the akka jar file akka-actors-1.1.2.jar on your classpath and also ensure you include the parameter for the akka config file -Dakka.config=akka.conf.

Output

Our Actor has a maximum mailbox size of 10. Notice how the Actor’s mailbox never exceeds 10 messages, and the application sending it messages is blocked when the mailbox size has been reached.

Loading config from -Dakka.config=akka.conf
Sending message number: 0
Sending message number: 1
Sending message number: 2
Sending message number: 3
Sending message number: 4
Sending message number: 5
Sending message number: 6
Sending message number: 7
Sending message number: 8
Sending message number: 9
Sending message number: 10
Sending message number: 11
Received message: 0, mailbox size: 10
Sending message number: 12
Received message: 1, mailbox size: 9
Sending message number: 13
Received message: 2, mailbox size: 9
Sending message number: 14
Received message: 3, mailbox size: 9
Sending message number: 15
Received message: 4, mailbox size: 10
Sending message number: 16
Received message: 5, mailbox size: 9
Sending message number: 17
Received message: 6, mailbox size: 9
Sending message number: 18
Received message: 7, mailbox size: 9
Sending message number: 19
Received message: 8, mailbox size: 9
Sending message number: 20
Received message: 9, mailbox size: 10
Sending message number: 21
Received message: 10, mailbox size: 9
Sending message number: 22
Received message: 11, mailbox size: 9
Received message: 12, mailbox size: 9
Sending message number: 23
Sending message number: 24
Received message: 13, mailbox size: 9
Received message: 14, mailbox size: 9
Received message: 15, mailbox size: 9
Received message: 16, mailbox size: 9
Received message: 17, mailbox size: 8
Received message: 18, mailbox size: 7
Received message: 19, mailbox size: 6
Received message: 20, mailbox size: 5
Received message: 21, mailbox size: 4
Received message: 22, mailbox size: 3
Received message: 23, mailbox size: 2
Received message: 24, mailbox size: 1

Process finished with exit code 0

Akka is a great addition to Scala and has many other important features (e.g., Remote Actors). I encourage you to go to the Akka Documentation and read more about it.

Thats all!

Create a free website or blog at WordPress.com.