Extracting data from Twitter with Scala, Akka Toolkit and Reactive Streams

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. The Akka team is working on an implementation of this standard, Akka Streams. At the time of writing (June 2015) its API is still experimental, but it is already functional. This post shows how to extract information from Twitter using Scala, the Akka toolkit and Reactive Streams.

Interacting with Twitter

There is no official Twitter client for Scala, but we can use Twitter4J, an unofficial Java library that gets the job done easily. Below is an example of how to use Twitter4J from Scala:

    import twitter4j.TwitterFactory
    import twitter4j.Twitter
    import twitter4j.conf.ConfigurationBuilder

    ...
    ...

    val cb = new ConfigurationBuilder()
    cb.setDebugEnabled(true)
      .setOAuthConsumerKey("YOUR KEY HERE")
      .setOAuthConsumerSecret("YOUR SECRET HERE")
      .setOAuthAccessToken("YOUR ACCESS TOKEN")
      .setOAuthAccessTokenSecret("YOUR ACCESS TOKEN SECRET")
    val tf = new TwitterFactory(cb.build())
    val twitter = tf.getInstance()
 

We will build a command-line application that searches for tweets matching a given term and then processes their text, authors and hashtags.

Akka Streams

At the time of publication of this post (June 2015), Akka Streams is still experimental, so in order to use it we need to add the experimental module as a dependency in our sbt build file:

    libraryDependencies += "com.typesafe.akka" % "akka-stream-experimental_2.11" % "1.0-RC3"

Basic concepts

The following basic concepts, taken from the experimental Akka Streams documentation, are essential to understand before implementing reactive streams applications:

  • Source: A processing stage with exactly one output emitting data elements whenever downstream processing stages are ready to receive them.
  • Sink: A processing stage with exactly one input, requesting and accepting data elements possibly slowing down the upstream producer of elements.
  • Flow: A processing stage which has exactly one input and output, which connects its up- and downstreams by transforming the data elements flowing through it.
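To make the three roles concrete, here is a toy model in plain Scala. It is a hypothetical sketch, not the Akka Streams API: ToySource is an invented name, and although via and runWith echo the names of real Akka methods, the implementation below is only an illustration that runs synchronously on one thread. It shows the shape of the idea: a source produces elements lazily, a flow transforms each element, and a sink consumes them one at a time.

```scala
// Hypothetical toy model of the three stage kinds; NOT the Akka Streams API.
// Everything runs synchronously on the calling thread.
final case class ToySource[A](make: () => Iterator[A]) {
  // Attaching a flow (one input, one output) yields a new source
  def via[B](flow: A => B): ToySource[B] = ToySource(() => make().map(flow))

  // Running with a sink: elements are pulled from the iterator one at a time,
  // only when the sink is ready for the next one
  def runWith(sink: A => Unit): Unit = make().foreach(sink)
}
```

For example, ToySource(() => Iterator("a", "b")).via(_.toUpperCase).runWith(println) prints A and then B.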

Building and Running a Flow from a String

In order to run a stream we need at least one input Source and one output Sink. In the example below we create a Source from the iterator produced by splitting a string on commas. A Source can be transformed, as this example shows: we remove surrounding whitespace with trim and then convert every line to uppercase. A Source on its own does nothing; a Flow or a Sink is required to continue the computation. In the code below we send the strings processed by the Source to a Sink using the method runForeach:

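    import akka.actor.ActorSystem
    import akka.stream.ActorFlowMaterializer
    import akka.stream.scaladsl.Source

    implicit val system = ActorSystem("Sys")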
    import system.dispatcher

    implicit val materializer = ActorFlowMaterializer()

    val text =
      """I saw the best minds of my generation destroyed by madness,
      starving hysterical naked,
      dragging themselves through the negro streets at dawn looking for an angry fix,
      angel headed hipsters burning for the ancient heavenly connection to the starry dynamo in the machinery of night"""

    //build a source from an iterator
    Source(() => text.split(",").iterator)
      //transform every line
      .map(s => s.trim())
      .map(_.toUpperCase)
      //send every line to a sink
      .runForeach(s => println( s + "\n") )
      //shutdown when finish
      .onComplete(_ => system.shutdown())
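Because each stage above is just a function, the same transformation can be written and checked in plain Scala without Akka. The following sketch (LineTransform is a hypothetical name) splits on commas, trims and uppercases, which is what the stream prints for each element:

```scala
// Plain-Scala version of the stream's transformation (no Akka involved):
// split on commas, trim surrounding whitespace, convert to uppercase.
object LineTransform {
  def transform(text: String): List[String] =
    text.split(",").iterator.map(_.trim).map(_.toUpperCase).toList
}
```

Keeping stage logic in plain functions like this makes it easy to unit test before wiring it into .map calls.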

Building a simple Graph

Graphs are useful when we want to perform fan-in ("multiple inputs") or fan-out ("multiple outputs") operations. In the code below we reuse the source created before and send every line both to a text file and to the console:

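    import java.io.File
    import scala.util.{Failure, Success}
    import akka.actor.ActorSystem
    import akka.stream.ActorFlowMaterializer
    import akka.stream.io.SynchronousFileSink
    import akka.stream.scaladsl.{Broadcast, Flow, FlowGraph, Sink, Source}
    import akka.util.ByteString

    implicit val system = ActorSystem("Sys")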
    import system.dispatcher

    implicit val materializer = ActorFlowMaterializer()

    val text =
      """I saw the best minds of my generation destroyed by madness,
      starving hysterical naked,
      dragging themselves through the negro streets at dawn looking for an angry fix,
      angel headed hipsters burning for the ancient heavenly connection to the starry dynamo in the machinery of night"""



    // take a ByteString as input and write to disk
    val fileSink = SynchronousFileSink(new File("/tmp/outscala.txt"), true)

    // take a String as input and print it to the console
    val consoleSink = Sink.foreach(println)

    // takes a String and converts it to a ByteString, one line per element
    val flowConverter = Flow[String].map(s => ByteString.fromString(s + "\n"))

    val g = FlowGraph.closed(fileSink, consoleSink)((_,cons) => cons) { implicit builder =>

      (f, console) =>
      import FlowGraph.Implicits._

      // build a source from an iterator over the comma-separated lines
      val in = Source(() => text.split(",").iterator)
        // transform every line
        .map(s => s.trim())
        .map(_.toUpperCase)

      // Broadcast takes 1 element and sends it to 2 outputs
      val bcast = builder.add(Broadcast[String](2))

      in ~> bcast
            bcast ~> flowConverter ~> f
            bcast ~> console
    }.run()

    g.onComplete {
      case Success(_) =>
        system.shutdown()
      case Failure(e) =>
        println(s"Failure: ${e.getMessage}")
        system.shutdown()
    }
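The fan-out performed by Broadcast can be pictured with a hypothetical, synchronous helper in plain Scala: every input element is handed to every output, in order. This is only an illustration; Akka's Broadcast also applies back pressure, emitting an element only when all of its outputs have signalled demand, which this toy does not model.

```scala
// Hypothetical fan-out helper mimicking what Broadcast does in the graph:
// each element from the input is delivered to every output, in order.
// Unlike Akka's Broadcast, it is synchronous and has no back pressure.
object ToyBroadcast {
  def broadcast[A](input: Iterator[A], outputs: Seq[A => Unit]): Unit =
    input.foreach(elem => outputs.foreach(out => out(elem)))
}
```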

Reactive tweets

Our goal is to build an Akka application that uses reactive streams to fetch tweets from Twitter and classify them by author and content. To do that, we will create a source of tweets by querying Twitter, and we will maintain three indexes: tweets, authors and hashtags. A simplified diagram of the process looks like this:



                                 +-----------+
                            +--->|  Tweets   |
                            |    +-----------+
    +----------------+      |    +-----------+
    | Twitter Source |------+--->|  Authors  |
    +----------------+      |    +-----------+
                            |    +-----------+
                            +--->| Hashtags  |
                                 +-----------+
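The post persists each index to a text file. As a hypothetical illustration of what the three indexes contain, here is an in-memory version in plain Scala; Tweet is a stand-in case class (not Twitter4J's Status) holding only the fields needed here:

```scala
// Hypothetical stand-in for a tweet; NOT twitter4j.Status.
final case class Tweet(id: Long, text: String, author: String, hashtags: List[String])

object TweetIndexes {
  // tweets index: tweet id -> text
  def tweetIndex(ts: Seq[Tweet]): Map[Long, String] =
    ts.map(t => t.id -> t.text).toMap

  // authors index: author -> ids of that author's tweets (in input order)
  def authorIndex(ts: Seq[Tweet]): Map[String, Seq[Long]] =
    ts.groupBy(_.author).map { case (a, group) => a -> group.map(_.id) }

  // hashtags index: hashtag -> ids of the tweets that use it
  def hashtagIndex(ts: Seq[Tweet]): Map[String, Seq[Long]] =
    ts.flatMap(t => t.hashtags.map(h => h -> t.id))
      .groupBy(_._1)
      .map { case (h, pairs) => h -> pairs.map(_._2) }
}
```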

Configuring the client

We read the access keys and tokens from environment variables:

    import twitter4j.{Twitter, TwitterFactory}
    import twitter4j.conf.ConfigurationBuilder

    // Builds a Twitter4J client from OAuth credentials stored in environment variables.
    // sys.env(...) throws NoSuchElementException if a variable is not set.
    def getClient(): Twitter = {
      val apiKey = sys.env("REACTIVE_TWITTER_API_KEY")
      val apiSecret = sys.env("REACTIVE_TWITTER_API_SECRET")
      val accessToken = sys.env("REACTIVE_TWITTER_ACCESS_TOKEN")
      val accessTokenSecret = sys.env("REACTIVE_TWITTER_ACCESS_TOKEN_SECRET")
      val cb = new ConfigurationBuilder()
      cb.setDebugEnabled(true)
        .setOAuthConsumerKey(apiKey)
        .setOAuthConsumerSecret(apiSecret)
        .setOAuthAccessToken(accessToken)
        .setOAuthAccessTokenSecret(accessTokenSecret)
      val tf = new TwitterFactory(cb.build())
      tf.getInstance()
    }
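sys.env(name) throws a NoSuchElementException naming only the first missing variable. A hypothetical alternative, sketched below in plain Scala, checks all four variables at once and returns either the list of missing names or the credentials; TwitterCredentials and Credentials.fromEnv are illustrative names, not part of Twitter4J:

```scala
// Hypothetical credentials holder; not a Twitter4J type.
final case class TwitterCredentials(
    apiKey: String,
    apiSecret: String,
    accessToken: String,
    accessTokenSecret: String)

object Credentials {
  // Same variable names used by getClient()
  val Names: List[String] = List(
    "REACTIVE_TWITTER_API_KEY",
    "REACTIVE_TWITTER_API_SECRET",
    "REACTIVE_TWITTER_ACCESS_TOKEN",
    "REACTIVE_TWITTER_ACCESS_TOKEN_SECRET")

  // Left(all missing names) if any variable is absent, otherwise Right(credentials)
  def fromEnv(env: Map[String, String]): Either[List[String], TwitterCredentials] = {
    val missing = Names.filterNot(env.contains)
    if (missing.nonEmpty) Left(missing)
    else Right(TwitterCredentials(env(Names(0)), env(Names(1)), env(Names(2)), env(Names(3))))
  }
}
```

In getClient() one could then call Credentials.fromEnv(sys.env) and fail with a single message listing everything that is missing.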

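Building the source

The source performs a single search request for the given term and emits each resulting tweet (a Twitter4J Status) downstream; since there is no pagination, only the first page of results is streamed: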

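    import scala.collection.JavaConverters._
    import akka.stream.scaladsl.Source
    import twitter4j.Query

    val client = getClient()
    // term is the search term, e.g. the value given with --query
    val query = new Query(term)
    // the function is evaluated when the stream runs; the Java list of
    // Status objects returned by the search is exposed as a Scala iterator
    val tweets = Source(() => client.search(query).getTweets.asScala.iterator)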
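Source receives a function that builds the iterator, not the iterator itself, so the search request is deferred until that function is invoked when the stream is materialized. The hypothetical sketch below replaces client.search with a fake that counts its calls, to show that defining the factory does not trigger a search:

```scala
// Hypothetical stand-in for client.search(query): it counts its calls
// so we can observe *when* the search actually happens.
object LazySourceDemo {
  var searches = 0

  def fakeSearch(term: String): List[String] = {
    searches += 1
    List(s"tweet about $term 1", s"tweet about $term 2")
  }

  // Same shape as the argument passed to Source(...): a function that
  // builds a fresh iterator each time it is invoked
  val tweetsFactory: () => Iterator[String] = () => fakeSearch("internet").iterator
}
```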

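Building the Graph

The graph broadcasts every tweet to three branches. Each branch formats the tweet as one line of text and appends it to its own file in dataDir: tweets.txt, authors.txt and hashtags.txt (the last one only for tweets that contain hashtags).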

    import java.io.File
    import java.nio.file.Paths
    import akka.stream.io.SynchronousFileSink
    import akka.stream.scaladsl.{Broadcast, Flow, FlowGraph}
    import akka.util.ByteString
    import twitter4j.Status

    // one append-mode file sink per index
    val tweetPath = Paths.get(dataDir, "tweets.txt")
    val tweetSink = SynchronousFileSink(new File(tweetPath.toString), true)
    val authorPath = Paths.get(dataDir, "authors.txt")
    val authorSink = SynchronousFileSink(new File(authorPath.toString), true)
    val hashtagPath = Paths.get(dataDir, "hashtags.txt")
    val hashtagSink = SynchronousFileSink(new File(hashtagPath.toString), true)

    // field separator used in the output files
    val DELIMITER = "==="

    // reusable blueprint: a Flow that passes Status elements through unchanged
    val tweetFlow = Flow[Status]

    val twitterGraph = FlowGraph.closed(tweets, tweetSink)((_, tSink) => tSink) { implicit builder =>
      (in, tSink) =>
      import FlowGraph.Implicits._

      // Broadcast takes 1 element and sends it to 3 outputs
      val bcast = builder.add(Broadcast[Status](3))

      in ~> bcast
            // tweets index: tweet id, text, source
            bcast ~> tweetFlow
              .map(t => ByteString.fromString(
                s"${t.getId}$DELIMITER${t.getText}$DELIMITER${t.getSource}\n")) ~> tSink

            // authors index: tweet id, user id, screen name, name
            bcast ~> tweetFlow
              .map(t => ByteString.fromString(
                s"${t.getId}$DELIMITER${t.getUser.getId}$DELIMITER" +
                  s"${t.getUser.getScreenName}$DELIMITER${t.getUser.getName}\n")) ~> authorSink

            // hashtags index: tweet id followed by each hashtag, only for tweets that have any
            bcast ~> tweetFlow
              .filter(t => t.getHashtagEntities.length > 0)
              .map(t => ByteString.fromString(
                t.getHashtagEntities.map(h => h.getText + DELIMITER)
                  .mkString(s"${t.getId}$DELIMITER", "", "\n"))) ~> hashtagSink
    }.run()
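The line formats written by the three branches can be checked without calling Twitter. In this sketch, User and SimpleStatus are hypothetical stand-ins for the Twitter4J Status and User fields the graph reads, and IndexLines reproduces the intended layouts: one line per tweet, fields separated by ===.

```scala
// Hypothetical stand-ins for the few Twitter4J Status/User fields used by the graph
final case class User(id: Long, screenName: String, name: String)
final case class SimpleStatus(id: Long, text: String, source: String, user: User, hashtags: List[String])

object IndexLines {
  val DELIMITER = "==="

  // tweets.txt: tweet id, text, source
  def tweetLine(t: SimpleStatus): String =
    s"${t.id}$DELIMITER${t.text}$DELIMITER${t.source}\n"

  // authors.txt: tweet id, user id, screen name, name
  def authorLine(t: SimpleStatus): String =
    s"${t.id}$DELIMITER${t.user.id}$DELIMITER${t.user.screenName}$DELIMITER${t.user.name}\n"

  // hashtags.txt: None for tweets without hashtags, mirroring the filter stage
  def hashtagLine(t: SimpleStatus): Option[String] =
    if (t.hashtags.isEmpty) None
    else Some(t.hashtags.map(_ + DELIMITER).mkString(s"${t.id}$DELIMITER", "", "\n"))
}
```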

Running the code

First, set up the environment variables with your Twitter access credentials:

    export REACTIVE_TWITTER_API_KEY=API_KEY
    export REACTIVE_TWITTER_API_SECRET=API_SECRET
    export REACTIVE_TWITTER_ACCESS_TOKEN=ACCESS_TOKEN
    export REACTIVE_TWITTER_ACCESS_TOKEN_SECRET=TOKEN_SECRET

Then you can run the application with sbt:

    sbt clean "run --dataDir /tmp --query internet&colombia"
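The post does not show how the --dataDir and --query arguments are parsed. A hypothetical parser for them, written as a plain-Scala sketch (Options and Args.parse are invented names), could look like this. sbt splits the text after run on whitespace, so the parser receives --dataDir, /tmp, --query and internet&colombia as four separate arguments.

```scala
// Hypothetical command-line options; not taken from the post's source code.
final case class Options(dataDir: String, query: String)

object Args {
  // Collects "--name value" pairs into a map, rejecting anything else
  private def pairs(rest: List[String], acc: Map[String, String]): Either[String, Map[String, String]] =
    rest match {
      case Nil => Right(acc)
      case flag :: value :: tail if flag.startsWith("--") =>
        pairs(tail, acc + (flag.drop(2) -> value))
      case other :: _ => Left(s"unexpected argument: $other")
    }

  // Both --dataDir and --query are required
  def parse(args: List[String]): Either[String, Options] =
    pairs(args, Map.empty) match {
      case Left(err) => Left(err)
      case Right(m) =>
        (m.get("dataDir"), m.get("query")) match {
          case (Some(dir), Some(q)) => Right(Options(dir, q))
          case _ => Left("usage: run --dataDir <dir> --query <term>")
        }
    }
}
```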

Source code can be found at this link

References