Gradient Descent Algorithm in Akka Toolkit

Gradient descent is an algorithm that minimizes functions, it starts at some initial random point, and iterates until it finds the smallest error margin. This algorithm is commonly used to calculate margin error on statistical anaylisis shuch as linear regression. In this post We will implement gradient descent algorithm using akka toolkit.

Linear Regresion and Cost Function

We can use standard equation y = mx + b to model our set of points, nevertheless, the precision of our model dependens of how accurate the values m and b are, in other words, we will need to find the best m and b values. A cost function J (θ0, θ1) is a function that is used to measure how good a line fits into the data.

In order to calculate the cost function we will iterate over all (x, y) points and we will sume the square distances between each point’s y value and the candidate line’s y value (computed at mx + b).

  case class Point(x: Double, y: Double)
  case class Intercept(theta0: Double, theta1: Double, cost: Double)

  //y = mx + b
  //hθ(x)= θ0 + θ1x
  def calculateCost(intercept: Intercept, numberList: List[Point]): Intercept = {
    // Calculte error for a particular set of θ0  and θ1
    var totalCost: Double = 0.0
    for (i <- 0 to numberList.size) {
      totalCost += math.pow( (numberList{i}.y - (intercept.theta1 *  numberList{i}.x + intercept.theta0)), 2)
    }
    val result = Intercept(intercept.theta0, intercept.theta1, totalCost / numberList.size)
    return result
  }

Gradient descent Algorithm

This Algorithm is used to minimize the cost function finding the the best values (θ0, θ1) for our standard equation hθ(x)= θ0 + θ1x .

Before start to code we will write the steps required to implement this algorithm:

  1. Select a random starting values θ0 and θ1.
  2. Take the gradient at your location.
  3. Move your location in the opposite direction of your gradient just a bit. Specifically, take your gradient and subtract some value alpha . This variable alpha is a small number that ideally should be passed by configuration so that would be easy change this value in order to adjust (tune) the algorithm.
  4. Repeat steps 2 and 3 until you’re satisfied and repeating them more doesn’t help you too much.

  def stepGradient(
    intercept: Intercept, numberList: List[Point], learningRate: Double): Intercept = {

    var theta0Gradient: Double = 0.0
    var theta1Gradient: Double = 0.0
    var x: Double = 0.0
    var y: Double = 0.0
    val N = numberList.size.toDouble

    for (i <- 0 to numberList.size) {
      x = numberList{i}.x
      y = numberList{i}.y
      //b gradient
      theta0Gradient += -(2/N) * (y - ((intercept.theta1 * x) + intercept.theta0))
      //m gradient
      theta1Gradient += -(2/N) * x * (y - ((intercept.theta1 * x) + intercept.theta0))
    }
    return Intercept(
      intercept.theta0 - (learningRate * theta0Gradient),
      intercept.theta1 - (learningRate * theta1Gradient), 0.0)
  }

  def calculateGradient(
    intercept: Intercept, numberList: List[Point],
    learningRate: Double, nrOfIterations: Int): Intercept = {

    log.info("Calculating gradient with starting values {} {}", intercept.theta0, intercept.theta1)
    var result: Intercept = intercept
    for (i <- 0 to nrOfIterations) {
      result = stepGradient(result, numberList, learningRate)
    }
    log.info("Gradient result {} {}", result.theta0, result.theta1)
    return result;
  }

Algorithm implementation using Akka toolkit

We are going to follow the steps described on my previous post akka toolkit introduction. so I won't cover basic steps such as intial setup.

In order implement the algorithm we will need to define 6 messages:

  • ComputeIntercept: sent to the Master actor to start the calculation.
  • CalculateGradient: sent from the Master actor to the Worker actors to compute the gradient.
  • CalculateCost: sent from the Master actor to the Worker actors to compute error cost.
  • GradientResult: sent from the Worker actors to the Master actor containing the result from the worker’s gradient calculation.
  • CostResult: sent from the Worker actors to the Master actor containing the result from the worker’s cost calculation.
  • InterceptResult: Sent from the Master actor to the Listener actor contaning values (θ0, θ1) for our standard equation hθ(x)= θ0 + θ1x , the cost of use these values and how long time the calculation took.

Below the complete implementation:


    package com.notempo1320

    import akka.actor._
    import akka.event.Logging
    import akka.routing.RoundRobinRouter

    import org.clapper.argot._
    import ArgotConverters._

    import scala.concurrent.duration._
    import scala.collection.mutable.ArrayBuffer
    import scala.io.{Source, BufferedSource}

    import java.nio.file.{Paths, Files}
    import java.io._

    case class Point(x: Double, y: Double)
    case class Intercept(theta0: Double, theta1: Double, cost: Double)

    object InterceptApp extends App {

      sealed trait InterceptMessage

      case object ComputeIntercept extends InterceptMessage

      case class CalculateCost(intercept: Intercept,
            numberList: List[Point]) extends InterceptMessage

      case class CostResult(intercept: Intercept) extends InterceptMessage

      case class CalculateGradient(
        intercept: Intercept, numberList: List[Point], learningRate: Double,
        nrOfIterations: Int) extends InterceptMessage

      case class GradientResult(intercept: Intercept) extends InterceptMessage

      case class InterceptResult(interceptResult: Intercept, duration: Duration)


      class Worker extends Actor {
          val log = Logging(context.system, this)

          //y = mx + b
          //hθ(x)= θ0 + θ1x
          def calculateCost(intercept: Intercept, numberList: List[Point]): Intercept = {
            // Calculte error for a particular set of θ0  and θ1
            var totalCost: Double = 0.0
            for (i <- 0 to numberList.size -1) {
              totalCost += math.pow( (numberList{i}.y - (intercept.theta1 *  numberList{i}.x + intercept.theta0)), 2)
            }
            val result = Intercept(intercept.theta0, intercept.theta1, totalCost / numberList.size)
            log.info("CalculateCost for intercept {} {} = {}", result.theta0, result.theta1, result.cost)
            return result
          }

          def stepGradient(
            intercept: Intercept, numberList: List[Point], learningRate: Double): Intercept = {

            var theta0Gradient: Double = 0.0
            var theta1Gradient: Double = 0.0
            var x: Double = 0.0
            var y: Double = 0.0
            val N = numberList.size.toDouble

            for (i <- 0 to numberList.size - 1) {
              x = numberList{i}.x
              y = numberList{i}.y
              //b gradient
              theta0Gradient += -(2/N) * (y - ((intercept.theta1 * x) + intercept.theta0))
              //m gradient
              theta1Gradient += -(2/N) * x * (y - ((intercept.theta1 * x) + intercept.theta0))
            }
            return Intercept(
              intercept.theta0 - (learningRate * theta0Gradient),
              intercept.theta1 - (learningRate * theta1Gradient), 0.0)
          }

          def calculateGradient(
            intercept: Intercept, numberList: List[Point],
            learningRate: Double, nrOfIterations: Int): Intercept = {

            log.info("Calculating gradient with starting values {} {}", intercept.theta0, intercept.theta1)
            var result: Intercept = intercept
            for (i <- 0 to nrOfIterations) {
              result = stepGradient(result, numberList, learningRate)
            }
            log.info("Gradient result {} {}", result.theta0, result.theta1)
            return result;
          }

          def receive = {
            // ! means “fire-and-forget”. Also known as tell.
            case CalculateGradient(intercept: Intercept, numberList: List[Point],
              learningRate: Double, nrOfIterations: Int) =>
              sender ! GradientResult(calculateGradient(intercept, numberList, learningRate, nrOfIterations))

            case CalculateCost(intercept: Intercept, numberList: List[Point]) =>
              sender ! CostResult(calculateCost(intercept, numberList))
          }

      }


      class Master(nrOfWorkers: Int, nrOfIterations: Int,
          learningRate: Double, numberList: List[Point], listener: ActorRef)
        extends Actor {
        val log = Logging(context.system, this)
        var initialIntercept: Intercept = _
        var partialResults = ArrayBuffer.empty[Intercept]
        // Every worker will perform nrOfIterations / nrOfWorkers
        var nrOfCalculations: Int = nrOfIterations / nrOfWorkers
        var nrOfResults: Int = 0
        var theta1: Double = 0.0
        val start: Long = System.currentTimeMillis

        override def preStart() = {
          log.info("Starting master")
          super.preStart()
        }

        // create a round-robin router to make it easier to spread out the work between the workers
        val workerRouter = context.actorOf(
          Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")

        def generate_random_intercept_point(): Double = {
          // Semi random generation based on number of x,y points
          return (
            numberList.size.toDouble /
            ((util.Random.nextInt(numberList.size) + 1) * (numberList.size + util.Random.nextInt(numberList.size))))
        }


        def receive = {
          case ComputeIntercept =>
            for(i <- 1 to nrOfWorkers + 1) {
              //get random initial intercept
              initialIntercept = Intercept(
                generate_random_intercept_point(), generate_random_intercept_point(), 0.0)

              if (i == 1) {
                // First execution loop computes initial value of cost function
                log.info("CalculateCost call for initial intercept theta0:{} theta1:{}",
                  initialIntercept.theta0, initialIntercept.theta1)
                workerRouter ! CalculateCost(initialIntercept, numberList)
              }

              workerRouter ! CalculateGradient(
                initialIntercept, numberList, learningRate, nrOfIterations)
            }

          case GradientResult(intercept) =>
            // calculate cost for every result
            log.info("CalculateCost call for intercept theta0:{} theta1:{}", intercept.theta0, intercept.theta1)
            workerRouter ! CalculateCost(intercept, numberList)

          case CostResult(result) =>
            log.info("CostResult ...")
            partialResults.append(result)
            nrOfResults += 1
            if (nrOfResults == nrOfWorkers + 1) {
              // Find the point with minimum error
              var error: Double = partialResults{0}.cost
              var position: Int = 0
              for (i <-0 to partialResults.size -1) {
                log.info("partial cost {} i {}", partialResults{i}.cost, i)
                if (partialResults{i}.cost < error) {
                  error = partialResults{i}.cost
                  position = i
                }
              }

              // Send the result to the listener
              listener ! InterceptResult(
                partialResults{position}, duration = (System.currentTimeMillis - start).millis)

              // Stops this actor and all its supervised children
              context.stop(self)
            }
        }
      }

      class Listener extends Actor {
        val log = Logging(context.system, this)

        override def preStart() = {
          log.info("Starting listener")
          super.preStart()
        }

        def receive = {
          case InterceptResult(intercept: Intercept, duration: Duration) =>
            log.info("total duration {}", duration)
            log.info(
              "GradientDescent final result theta0: {} theta1: {} cost: {}",
              intercept.theta0, intercept.theta1, intercept.cost)
            context.system.shutdown()
        }
      }


      def calculate(nrOfWorkers: Int, nrOfIterations: Int, learningRate: Double, fileName: String) {
        val numberList = Source.fromFile(fileName).getLines().map(
          s => s.split(",")).map(x => Point( x{0}.toDouble, x{1}.toDouble)).toList

        // Create an Akka system
        val system = ActorSystem("GradientDescentSystem")

        // create the result listener, which will log the result and will shutdown the system
        val listener = system.actorOf(Props[Listener], name = "listener")

       // create the master
        val master = system.actorOf(Props(new Master(
          nrOfWorkers, nrOfIterations, learningRate, numberList, listener)),
          name = "master")

        // start the calculation
        master ! ComputeIntercept

      }

       // Main program
      override def main(args: Array[String]) {
        val parser = new ArgotParser("gradient", preUsage=Some("Version 1.0"))
        val fileName = parser.option[String](List("f", "filename"), "filename", "filename")
        val nrOfWorkers = parser.option[Int](List("w", "workers"), "workers", "workers")
        val nrOfIterations = parser.option[Int](List("i", "iterations"), "itetarions", "iterations")
        val learningRate = parser.option[Double](List("r", "rate"), "rate", "rate")
        parser.parse(args)
        calculate(
          nrOfWorkers=nrOfWorkers.value.get, nrOfIterations=nrOfIterations.value.get,
          learningRate=learningRate.value.get, fileName=fileName.value.get)
      }
    }

Running the application

I am going to use the same dataset that I used on a previous post about linear regression . In that post, I used Octave to compute the cost function for my dataset and I got the results θ0 = 2.1368e+05 and θ1x = 2.3589e+03.

To run the akka application as a console command all you have to do is call it usign sbt

 sbt "run --rate 0.0002  --iterations 200000 --workers 4 --filename ../total_by_period.csv"

I tried different parameters combinations and this was the result that I got:

Learning Rate Number of Iterations Number of Workers θ0 Result θ1 Result Cost Execution time
0.0001 1000 2 13444.522450507255 15733.153261356721 1.4915513928506067E10 82 milliseconds
0.0001 10000 2 96714.66045646855 10171.263617220522 6.991494692323004E9 170 milliseconds
0.0001 10000 4 96714.64937053746 10171.264357686756 6.9914914322857065E9 290 milliseconds
0.0002 10000 4 149323.8010837764 6657.324029165672 4.129770277424713E9 250 milliseconds
0.0001 100000 4 213137.42909120218 2394.999439961037 2.8873926496464314E9 795 milliseconds
0.0002 100000 4 213677.0403639862 2358.957007201501 2.887304849208148E9 854 milliseconds
0.0002 200000 4 213678.41665775442 2358.865079960366 2.887304848639881E9 997 milliseconds
0.0002 400000 4 213678.41666654532 2358.8650793731726 2.887304848639881E9 861 milliseconds

The data above showed me that this algorithm works better if I increase the number of iterations and after 20000 iterations there are not significants improvements. Maybe the outcome could be different with another set of data. At this You can find the source code of this example

References