Akka toolkit introduction

Akka is an open-source toolkit and runtime that simplifies building concurrent and distributed applications on the JVM. Akka supports multiple programming models for concurrency, but it emphasizes actor-based concurrency, drawing inspiration from Erlang. In this post we will build a service that reads comma-separated lines, splits every line into words, and maintains an index on disk that records how many times each word has been seen.
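To make the goal concrete before any actors are involved, the sketch below performs the same computation sequentially in plain Scala: split each line on commas, trim the words, and count occurrences. It is an illustration only; `SequentialCount.countWords` is a hypothetical name, and the counts stay in memory instead of in an on-disk index.

```scala
object SequentialCount {
  // Hypothetical helper (illustration only, no Akka):
  // split comma-separated lines into trimmed, non-empty words and count each word.
  def countWords(lines: Iterator[String]): Map[String, Int] =
    lines.toList
      .flatMap(_.split(",").toList)
      .map(_.trim)
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }
}
```

The actor version below spreads the splitting across workers and persists the counts, but the result it maintains is the same kind of word-to-count mapping.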

Creating the project

We are going to use sbt, a build tool for Java and Scala, to manage our project. The first step is to create a directory named word-service and, inside it, a file named build.sbt with the lines below:

    name := "Word service"

    version := "1.0"

    scalaVersion := "2.11.6"

    // Akka is published to Maven Central, so no extra resolver is needed.
    // %% appends the Scala binary version, so this resolves to akka-actor_2.11.
    libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.4"

After creating the build.sbt file, we need to create the directory src/main/scala, where we will store the Scala source files. Inside this directory, create a file named WordCount.scala and add some initial imports to it:

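    import akka.actor._
    import akka.routing.RoundRobinRouter
    import scala.concurrent.duration._
    import scala.collection.mutable.ArrayBuffer
    import scala.io.Source
    import java.io._
    import java.nio.file.{Files, Paths}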

Creating the messages

The proposed design consists of one Master actor responsible for starting the computation by creating a set of Worker actors; it then splits the work into discrete chunks (one line per message) and sends them to the Workers following a round-robin policy. The Master waits until all the Workers have completed their work and sent back their results for aggregation. When the computation is complete, the Master sends the result to the Listener, which is responsible for maintaining a file-system index of every word.
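Conceptually, a round-robin policy hands the first message to the first worker, the second message to the second worker, and so on, wrapping around after the last worker. The plain-Scala sketch below reproduces that assignment for a list of lines; `RoundRobinSketch.assignRoundRobin` is a hypothetical helper written for illustration, not Akka's router implementation.

```scala
object RoundRobinSketch {
  // Hypothetical helper (illustration only, not Akka's router):
  // line i goes to worker (i mod nrOfWorkers), preserving the order of lines per worker.
  def assignRoundRobin(lines: Seq[String], nrOfWorkers: Int): Map[Int, Seq[String]] = {
    require(nrOfWorkers > 0, "need at least one worker")
    lines.zipWithIndex
      .groupBy { case (_, i) => i % nrOfWorkers }
      .map { case (worker, indexed) => worker -> indexed.map(_._1) }
  }
}
```

With five lines and two workers, worker 0 gets lines 1, 3 and 5 and worker 1 gets lines 2 and 4, so the load stays balanced without the Master tracking which worker is busy.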

In order to execute these calculations we will need to define four messages:

  • Calculate: sent to the Master actor to start the calculation.
  • Work: sent from the Master actor to the Worker actors containing the work assignment.
  • Result: sent from the Worker actors to the Master actor containing the result from the worker’s calculation.
  • Index: sent from the Master actor to the Listener actor, containing the collected words and how long the calculation took.

Scala case classes are well suited for creating immutable objects, so we will use them (together with a case object) to define the messages.

    sealed trait WordIndexMessage

    case object Calculate extends WordIndexMessage

    case class Work(line: String) extends WordIndexMessage

    case class Result(words: Array[String]) extends WordIndexMessage

    case class Index(words: ArrayBuffer[String], duration: Duration) extends WordIndexMessage
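One caveat when case classes are used as messages: their equality is structural only if the fields themselves compare structurally. `Result` wraps an `Array[String]`, and Scala arrays compare by reference (and are mutable), so two `Result` values holding the same words are not equal. This rarely matters for fire-and-forget messages, but it can surprise you in tests. The sketch below uses hypothetical `ArrayResult` and `SeqResult` classes standing in for `Result` to show the difference.

```scala
object MessageEquality {
  // Hypothetical stand-in with the same shape as Result(words: Array[String]):
  // arrays use reference equality, so equal contents do not make equal messages.
  case class ArrayResult(words: Array[String])

  // Hypothetical alternative using an immutable Vector: structural equality.
  case class SeqResult(words: Vector[String])

  val arraysEqual: Boolean = ArrayResult(Array("a", "b")) == ArrayResult(Array("a", "b"))
  val vectorsEqual: Boolean = SeqResult(Vector("a", "b")) == SeqResult(Vector("a", "b"))
}
```

Here `arraysEqual` is `false` and `vectorsEqual` is `true`. An immutable collection such as `Vector` also fits the goal of immutable messages better than an array.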

Creating the worker

The Worker is created by mixing in the Actor trait and defining the receive method, which implements the message handler.

    class Worker extends Actor {
      // split each received line into trimmed, non-empty words
      def receive = {
        // ! means “fire-and-forget”. Also known as tell.
        case Work(line) => sender() ! Result(splitLine(line)) // perform the work
      }

      def splitLine(line: String): Array[String] =
        line.split(",").map(_.trim).filter(_.nonEmpty)
    }
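A bare `line.split(",")` keeps any spaces around each word and returns empty strings for consecutive or leading commas (only trailing empty strings are dropped). Empty or space-padded words would otherwise end up as odd file names in the index. The sketch below contrasts the two behaviours; `SplitSketch` and its methods are hypothetical names used only to show the difference in plain Scala.

```scala
object SplitSketch {
  // Hypothetical helper: plain split, keeps spaces and inner empty strings
  def splitRaw(line: String): Array[String] = line.split(",")

  // Hypothetical helper: trim each word and drop empty ones (from ",," or a leading comma)
  def splitTrimmed(line: String): Array[String] =
    line.split(",").map(_.trim).filter(_.nonEmpty)
}
```

For the input `"red, green,,blue,"`, the raw split yields `red`, `" green"`, `""` and `blue`, while the trimmed version yields only `red`, `green` and `blue`.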

Creating the master

The Master actor receives three parameters in its constructor:

  • nrOfWorkers: how many workers to start.
  • itLines: an Iterator[String] over the input lines to send to the workers.
  • listener: the ActorRef of the Listener that receives the final result.

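When it receives the Calculate message, the Master reads the itLines iterator and sends every line to the workers created by its round-robin router. It counts the results that come back, and once every line has been processed it sends the collected words to the listener and stops itself. The code below describes the Master actor: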

    class Master(nrOfWorkers: Int, itLines: Iterator[String], listener: ActorRef)
      extends Actor {
      val messagesPerCall = 100
      val words = ArrayBuffer.empty[String]
      val start: Long = System.currentTimeMillis
      var nrOfResults: Int = 0
      var nrOfLines: Int = 0

      // create a round-robin router to make it easier to spread out the work between the workers
      val workerRouter = context.actorOf(
        Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")

      def receive = {
        case Calculate =>
          // send every line to the workers, consuming the iterator in batches of messagesPerCall lines
          for (batch <- itLines.grouped(messagesPerCall); line <- batch) {
            nrOfLines += 1
            workerRouter ! Work(line)
          }

        case Result(value) =>
          words.appendAll(value)
          nrOfResults += 1
          if (nrOfResults == nrOfLines) {
            // send the collected words and the elapsed time to the listener
            listener ! Index(words, duration = (System.currentTimeMillis - start).millis)
            // stop this actor and all its supervised children
            context.stop(self)
          }
      }
    }
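The Master's bookkeeping can be checked without actors: it counts the lines it sends, counts the results it receives, and reports once the two numbers match. The plain-Scala simulation below assumes one reply per line, uses a queue in place of the worker mailboxes, and `MasterSimulation.run` is a hypothetical name. It consumes the input with `grouped`, because the Scala `Iterator` documentation says the original iterator should be discarded after calling `take`, so calling `take` repeatedly on the same iterator is not a reliable way to batch.

```scala
import scala.collection.mutable.{ArrayBuffer, Queue}

object MasterSimulation {
  // Hypothetical simulation of the Master's bookkeeping, with no actors involved.
  // Returns the number of batches read and the words collected once every line is answered.
  def run(lines: Iterator[String], messagesPerCall: Int): (Int, List[String]) = {
    val inFlight = Queue.empty[String] // stands in for Work messages sent to the workers
    var nrOfBatches = 0
    var nrOfLines = 0

    // "Calculate": consume the iterator in fixed-size batches and send every line
    for (batch <- lines.grouped(messagesPerCall)) {
      nrOfBatches += 1
      batch.foreach { line =>
        nrOfLines += 1
        inFlight.enqueue(line)
      }
    }

    // "Result": aggregate one reply per line until the counts match
    val words = ArrayBuffer.empty[String]
    var nrOfResults = 0
    while (nrOfResults < nrOfLines) {
      words ++= inFlight.dequeue().split(",") // stand-in for a worker's Result
      nrOfResults += 1
    }
    (nrOfBatches, words.toList)
  }
}
```

With 250 lines and batches of 100, three batches are read. The simulation also exposes an edge case of this counting scheme: with an empty input, nrOfLines stays 0 and no Result ever arrives, so in the actor version the Listener is never notified and the system is not shut down.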

Creating the result listener

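The result Listener receives the list of words produced by the workers and updates a file-system-based index, in which each word has its own file under /tmp/wordcount holding the number of times the word has been seen: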

    class Listener extends Actor {
      val rootDir = new File("/tmp/wordcount")

      // make sure the index directory exists before any word file is written
      override def preStart(): Unit = rootDir.mkdirs()

      // current count stored for a word, or 0 if the word has not been indexed yet
      def currentCount(file: File): Int =
        if (!file.exists()) 0
        else {
          val src = Source.fromFile(file)
          try src.getLines().toList.headOption.map(_.trim.toInt).getOrElse(0)
          finally src.close()
        }

      // each word has its own file holding the number of times it has been seen
      def indexWord(word: String): Unit = {
        val file = new File(rootDir, word)
        val newCount = currentCount(file) + 1
        val writer = new BufferedWriter(new FileWriter(file))
        try writer.write(newCount.toString)
        finally writer.close()
      }

      def receive = {
        case Index(words, duration) =>
          words.foreach(indexWord)
          println("\nCalculation time: %s".format(duration))
          context.system.shutdown()
      }
    }
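The read-increment-write cycle of the index can be exercised on its own. Here is a plain-Scala sketch of a one-file-per-word counter using `java.nio.file`; it assumes each file holds a single integer, takes the index directory as a parameter so a temporary directory can stand in for /tmp/wordcount, and `FileIndex.increment` is a hypothetical name.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object FileIndex {
  // Hypothetical helper: one file per word, each holding a single integer count.
  // Reads the stored count (0 if the file is missing or empty), adds one,
  // writes the new value back and returns it.
  def increment(rootDir: Path, word: String): Int = {
    Files.createDirectories(rootDir)
    val file = rootDir.resolve(word)
    val current =
      if (Files.exists(file)) {
        val text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim
        if (text.isEmpty) 0 else text.toInt
      } else 0
    val updated = current + 1
    Files.write(file, updated.toString.getBytes(StandardCharsets.UTF_8))
    updated
  }
}
```

Keeping all index updates inside one actor means words are processed one at a time, so two updates to the same file never overlap within a run; the sketch makes the same single-writer assumption.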

Bootstrap the calculation

Now we can create an object named Indexer that extends Scala's App trait, so that it can be run from the command line:

    object Indexer extends App {

      calculate(nrOfWorkers = 4)

      // actors and messages ...
      def calculate(nrOfWorkers: Int): Unit = {

        // Read the input lines from the classpath resource /words.txt (src/main/resources/words.txt)
        val wordLines = Source.fromInputStream(getClass.getResourceAsStream("/words.txt")).getLines()

        // Create an Akka system
        val system = ActorSystem("WordCountSystem")

        // create the result listener, which updates the index, prints the elapsed time and shuts down the system
        val listener = system.actorOf(Props[Listener], name = "listener")

        // create the master
        val master = system.actorOf(Props(new Master(
          nrOfWorkers, wordLines, listener)),
          name = "master")

        // start the calculation
        master ! Calculate
      }
    }
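`getResourceAsStream` returns `null` when /words.txt is not on the classpath (in an sbt project that usually means the file is missing from src/main/resources), and passing that `null` on to `Source.fromInputStream` leads to a less obvious failure later. A small sketch of a guarded loader follows; `ResourceLines` and its methods are hypothetical helpers and the error message is illustrative.

```scala
import java.io.InputStream
import scala.io.Source

object ResourceLines {
  // Hypothetical helper: wrap a possibly-null stream and fail fast with a clear message.
  def linesFrom(stream: InputStream, name: String): Iterator[String] =
    Option(stream) match {
      case Some(in) => Source.fromInputStream(in, "UTF-8").getLines()
      case None     => throw new IllegalStateException(s"Resource $name not found on the classpath")
    }

  // Hypothetical call site mirroring the bootstrap code, e.g. loadLines("/words.txt")
  def loadLines(name: String): Iterator[String] =
    linesFrom(getClass.getResourceAsStream(name), name)
}
```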

Below is the full source code:

    package com.notempo1320.demo

    import akka.actor._
    import akka.routing.RoundRobinRouter
    import scala.concurrent.duration._

    import java.nio.file.{Paths, Files}
    import java.io._

    import scala.collection.mutable.ArrayBuffer
    import scala.io.{Source, BufferedSource}


    object Indexer extends App {

      calculate(nrOfWorkers = 4)


      sealed trait WordIndexMessage

      case object Calculate extends WordIndexMessage

      case class Work(line: String) extends WordIndexMessage

      case class Result(words: Array[String]) extends WordIndexMessage

      case class Index(words: ArrayBuffer[String], duration: Duration) extends WordIndexMessage

      class Worker extends Actor {

        def splitLine(line: String): Array[String] =
          line.split(",").map(_.trim).filter(_.nonEmpty)

        // split each received line into trimmed, non-empty words
        def receive = {
          // ! means “fire-and-forget”. Also known as tell.
          case Work(line) => sender() ! Result(splitLine(line)) // perform the work
        }

      }


      class Master(nrOfWorkers: Int, itLines: Iterator[String], listener: ActorRef)
        extends Actor {
        val messagesPerCall = 100
        val words = ArrayBuffer.empty[String]
        val start: Long = System.currentTimeMillis
        var nrOfResults: Int = 0
        var nrOfLines: Int = 0

        // create a round-robin router to make it easier to spread out the work between the workers
        val workerRouter = context.actorOf(
          Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")

        def receive = {
          case Calculate =>
            // send every line to the workers, consuming the iterator in batches of messagesPerCall lines
            for (batch <- itLines.grouped(messagesPerCall); line <- batch) {
              nrOfLines += 1
              workerRouter ! Work(line)
            }

          case Result(value) =>
            words.appendAll(value)
            nrOfResults += 1
            if (nrOfResults == nrOfLines) {
              // send the collected words and the elapsed time to the listener
              listener ! Index(words, duration = (System.currentTimeMillis - start).millis)
              // stop this actor and all its supervised children
              context.stop(self)
            }
        }
      }



      class Listener extends Actor {
        val rootDir = new File("/tmp/wordcount")

        // make sure the index directory exists before any word file is written
        override def preStart(): Unit = rootDir.mkdirs()

        // current count stored for a word, or 0 if the word has not been indexed yet
        def currentCount(file: File): Int =
          if (!file.exists()) 0
          else {
            val src = Source.fromFile(file)
            try src.getLines().toList.headOption.map(_.trim.toInt).getOrElse(0)
            finally src.close()
          }

        // each word has its own file holding the number of times it has been seen
        def indexWord(word: String): Unit = {
          val file = new File(rootDir, word)
          val newCount = currentCount(file) + 1
          val writer = new BufferedWriter(new FileWriter(file))
          try writer.write(newCount.toString)
          finally writer.close()
        }

        def receive = {
          case Index(words, duration) =>
            words.foreach(indexWord)
            println("\nCalculation time: %s".format(duration))
            context.system.shutdown()
        }
      }

      def calculate(nrOfWorkers: Int): Unit = {

        // Read the input lines from the classpath resource /words.txt (src/main/resources/words.txt)
        val wordLines = Source.fromInputStream(getClass.getResourceAsStream("/words.txt")).getLines()

        // Create an Akka system
        val system = ActorSystem("WordCountSystem")

        // create the result listener, which updates the index, prints the elapsed time and shuts down the system
        val listener = system.actorOf(Props[Listener], name = "listener")

        // create the master
        val master = system.actorOf(Props(new Master(
          nrOfWorkers, wordLines, listener)),
          name = "master")

        // start the calculation
        master ! Calculate
      }

    }

Compiling and running the code

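Before running, place the input file at src/main/resources/words.txt so that it is on the classpath; the program reads it as the resource /words.txt and writes the index to /tmp/wordcount.

    $ sbt clean compile
    $ sbt run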

References