Akka http

Last time we talked about routing within Akka. This time we will be looking at Akka’s support for http.

But just before that, a bit of history. Before Akka.Http there was already a fairly successful Akka-based HTTP option available to you as a Scala developer, called Spray. There is a lot of Spray documentation available here: http://spray.io/

This framework was extremely well thought of, so much so that the good people at Akka have taken on much of the good work done by this team, and it now forms much of the codebase for Akka Http.

In fact if you are familiar with Spray, you will certainly notice quite a lot of similarities in the way routes and JSON are handled in Akka.Http, as it is pretty much the Spray code.

 

Introduction

Akka.Http comes with server side and client side libraries. It also comes with good support for standard serialization such as JSON/XML and the ability to roll your own serialization should you want to.

It also comes with a fairly nifty routing DSL which is very much inspired by the work done in Spray.

This post will concentrate on the common use cases that you may come across when working with HTTP.

 

SBT Dependencies

As usual we need to make sure we have the correct JARs referenced. Here is the SBT file that I am using for the server side, the client side and the common messages that pass between them:

import sbt._
import sbt.Keys._


lazy val allResolvers = Seq(
  "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
)


lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" % "akka-actor_2.11" % "2.4.12",
    "com.typesafe.akka" % "akka-http_2.11" % "3.0.0-RC1",
    "com.typesafe.akka" % "akka-http-core_2.11" % "3.0.0-RC1",
    "com.typesafe.akka" % "akka-http-spray-json_2.11" % "3.0.0-RC1"
  )


lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.11.8",
  resolvers := allResolvers,
  libraryDependencies := AllLibraryDependencies
)


lazy val serverside =(project in file("serverside")).
  settings(commonSettings: _*).
  settings(
    name := "serverside"
  )
  .aggregate(common, clientside)
  .dependsOn(common, clientside)

lazy val common = (project in file("common")).
  settings(commonSettings: _*).
  settings(
    name := "common"
  )

lazy val clientside = (project in file("clientside")).
  settings(commonSettings: _*).
  settings(
    name := "clientside"
  )
  .aggregate(common)
  .dependsOn(common)

It can be seen that the JSON dependency is contained in this JAR

akka-http-spray-json_2.11

Told you it was inspired by Spray a fair bit.
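
A side note on the artifact names above: the `_2.11` suffix is the Scala binary version, and sbt can append it for you if you use `%%` instead of `%`. Here is a minimal sketch of the same dependency list written that way, assuming the same versions as the build file above (the val name `akkaDependencies` is made up for this illustration):

```scala
// build.sbt fragment: %% appends the Scala binary version (here _2.11,
// derived from scalaVersion) to each artifact name.
lazy val akkaDependencies = Seq(
  "com.typesafe.akka" %% "akka-actor"           % "2.4.12",
  "com.typesafe.akka" %% "akka-http"            % "3.0.0-RC1",
  "com.typesafe.akka" %% "akka-http-core"       % "3.0.0-RC1",
  "com.typesafe.akka" %% "akka-http-spray-json" % "3.0.0-RC1"
)
```

This should resolve to the same JARs, and it saves you from editing every line if you later change scalaVersion.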

 

Server Side

This section will talk about the server side element of Akka.Http

 

Hosting The Service

To have a correctly formed, hostable server side we need a couple of things in place, namely the following:

  • An actor system
  • A materializer (Akka http uses flows which is the subject of the next and final post)
  • An execution context
  • Routing

Once we have these things it is really just a question of binding the route to a host name and port.

Shown below is a barebones skeleton of what this may look like

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives
import akka.stream.scaladsl.Flow
import common.{Item, JsonSupport}
import scala.io.StdIn
import scala.concurrent.Future
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream._
import akka.stream.scaladsl._


object Demo extends App with Directives with JsonSupport {

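  implicit val system = ActorSystem("my-system")
  implicit val materializer = ActorMaterializer()
  // needed by onFailure / flatMap / onComplete on the binding future below
  implicit val executionContext = system.dispatcher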


  val route = .....

  val (host, port) = ("localhost", 8080)
  val bindingFuture = Http().bindAndHandle(route, host, port)

  bindingFuture.onFailure {
    case ex: Exception =>
      println(s"$ex Failed to bind to $host:$port!")
  }

  println(s"Server online at http://$host:$port/\nPress RETURN to stop...")
  StdIn.readLine() // let it run until user presses return
  bindingFuture
    .flatMap(_.unbind()) // trigger unbinding from the port
    .onComplete(_ => system.terminate()) // and shutdown when done
}

We will be looking at the routing DSL separately

 

Routing DSL

As stated, Akka.Http owes much to Spray, and the routing DSL in particular is practically unchanged from Spray, so it is well worth reading the Spray routing documentation which is available here : http://spray.io/documentation/1.2.4/spray-routing/ and for completeness here is the Akka.Http docs link too : http://doc.akka.io/docs/akka/2.4.7/scala/http/introduction.html#routing-dsl-for-http-servers

There are far too many possible routes to go into in a single post. Let’s consider a few basic examples and deconstruct them.

Some of these examples do rely on JSON, which is the next topic, so for now just understand that there is a way to accept/return JSON.

Let’s consider the following use cases:

  • GET that returns a simple string
  • GET that returns a JSON representation of an Item
  • POST that accepts a new Item

In all these cases this is what an Item looks like

package common

final case class Item(name: String, id: Long)

So let’s see the routing DSL that makes the above examples work:

val route =
  path("hello") {
    get {
      complete(HttpEntity(
        ContentTypes.`text/html(UTF-8)`,
        "<h1>Say hello to akka-http</h1>"))
    }
  } ~
  path("randomitem") {
    get {
      // will marshal Item to JSON
      complete(Item("thing", 42))
    }
  } ~
  path("saveitem") {
    post {
      // will unmarshal JSON to Item
      entity(as[Item]) { item =>
        println(s"Server saw Item : $item")
        complete(item)
      }
    }
  }

It can be seen that there are some common routing DSL bits and bobs in there, such as:

  • path : matches the path of the request URI, so path("hello") matches a request to /hello
  • get : passes the request on to its inner route only if it is a GET http request
  • post : passes the request on to its inner route only if it is a POST http request
  • complete : completes the request; this is the final result from the route

These parts of the DSL are known as directives. The general anatomy of a directive is as follows:

name(arguments) { extractions =>
  ... // inner route
}

It has a name, zero or more arguments and optionally an inner route (The RouteDirectives are special in that they are always used at the leaf-level and as such cannot have inner routes). Additionally directives can “extract” a number of values and make them available to their inner routes as function arguments. When seen “from the outside” a directive with its inner route form an expression of type Route.

Taken from http://doc.akka.io/docs/akka/2.4.7/scala/http/routing-dsl/directives/index.html#directives (retrieved 15/11/16)

What Directives Do?

A directive can do one or more of the following:

  • Transform the incoming RequestContext before passing it on to its inner route (i.e. modify the request)
  • Filter the RequestContext according to some logic, i.e. only pass on certain requests and reject others
  • Extract values from the RequestContext and make them available to its inner route as “extractions”
  • Chain some logic into the RouteResult future transformation chain (i.e. modify the response or rejection)
  • Complete the request

 

This means a Directive completely wraps the functionality of its inner route and can apply arbitrarily complex transformations, both (or either) on the request and on the response side.
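
To make the anatomy a little more concrete, here is a small sketch of directives that filter, extract and complete. The `items` and `ping` routes and the `tag` query parameter are invented for this illustration and are not part of the demo server:

```scala
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

object DirectiveAnatomy {
  // Hypothetical routes, invented for this illustration only.
  val route: Route =
    path("items" / LongNumber) { id =>   // path: filters on the URI and extracts a Long
      get {                               // get: filters on the HTTP method, extracts nothing
        parameter("tag".?) { tag =>       // parameter: extracts an optional query parameter
          // complete: a leaf directive, so it has no inner route
          complete(s"item $id, tag ${tag.getOrElse("none")}")
        }
      }
    } ~                                   // ~ : try the next route if this one rejects
    path("ping") {
      get {
        complete("pong")
      }
    }
}
```

Each extraction arrives as a function argument of the inner route (`id` and `tag` above), which is exactly the name(arguments) { extractions => ... } shape described in the quote.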

Ok, so now that we have taken a whistle-stop tour of the routing DSL and directives, let’s have a look at the few we discussed above.

 

For this work I would strongly recommend the use of the “Postman” Chrome app, which you can grab from here:

https://chrome.google.com/webstore/detail/postman/fhbjgbiflinjbdggehcddcbncdddomop?hl=en

GET

We can see this route looks like this

path("hello") {
  get {
    complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
  }
}

So we use the path and get directives to establish a GET route. We then use complete to complete the route with a static string representing the HTML we would like to return.

So let’s see this one in postman

image

 

GET Item (as JSON)

We can see this route looks like this

path("randomitem") {
  get {
    // will marshal Item to JSON
    complete(Item("thing", 42))
  }
} 

So again we use the path/get directives, but this time we complete with an Item. This works because the JSON support is able to marshal the Item into the right serialized form for us. We will look at this in the next section.

So let’s see this one in postman

image

POST Item

We can see this route looks like this

path("saveitem") {
  post {
    // will unmarshal JSON to Item
    entity(as[Item]) { item =>
      println(s"Server saw Item : $item")
      complete(item)
    }
  }
} 

So again we use the path directive, but this time we use post, where the post expects an Item to be provided as JSON. The conversion from the incoming JSON string to an Item is done using an Unmarshaller. We will look at this in the next section.

So let’s see this one in postman

image

 

JSON Support

Akka.Http provides JSON support through the akka-http-spray-json library (it was called akka-http-spray-json-experimental in earlier releases), which you can grab from the Maven Central Repo.

JsonProtocol

When using Spray JSON we use SprayJsonSupport and DefaultJsonProtocol to create the JSON protocol for our custom objects.

Let’s consider the Item class we have seen in the demos so far:

package common

final case class Item(name: String, id: Long)

This is how we might write the JSON protocol code for this simple class

package common

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.DefaultJsonProtocol

trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
  implicit val itemFormat = jsonFormat2(Item)
}

It can be seen that there are jsonFormatXX helpers that can be used for very simple cases. In this case jsonFormat2 is used, as our Item class has 2 parameters.

Most of the time these inbuilt helpers are all we need. If however you want something more elaborate, you are free to create your own jsonFormat read / write methods.
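
As a rough idea of what rolling your own might look like, here is a sketch of a hand-written spray-json format for Item. The JSON field names `itemName` and `itemId` and the object names are assumptions made up for this example; the demo project itself just uses jsonFormat2:

```scala
import spray.json._

final case class Item(name: String, id: Long)

// Hypothetical hand-written protocol. The field names "itemName" and
// "itemId" are invented here to show a shape jsonFormat2 would not produce.
object CustomItemProtocol extends DefaultJsonProtocol {
  implicit object ItemJsonFormat extends RootJsonFormat[Item] {
    // Item -> JSON
    def write(item: Item): JsValue =
      JsObject(
        "itemName" -> JsString(item.name),
        "itemId"   -> JsNumber(item.id))

    // JSON -> Item, failing with a DeserializationException on any other shape
    def read(json: JsValue): Item =
      json.asJsObject.getFields("itemName", "itemId") match {
        case Seq(JsString(name), JsNumber(id)) => Item(name, id.toLong)
        case _ => deserializationError("Item expected")
      }
  }
}

object CustomFormatDemo {
  import CustomItemProtocol._

  val json: JsValue = Item("thing", 42).toJson   // uses write
  val roundTripped: Item = json.convertTo[Item]  // uses read
}
```

Because the format is a RootJsonFormat, mixing it in alongside SprayJsonSupport should let the routing DSL pick it up in the same way as the jsonFormat2 version.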

 

Marshalling

Marshalling is the process of taking objects and creating a JSON string representation of them to send across the wire.

The akka-http-spray-json JAR comes with default marshallers that allow us to take custom classes and turn them into JSON.

These are the marshaller type aliases that you will most likely come across:

type ToEntityMarshaller[T] = Marshaller[T, MessageEntity]
type ToHeadersAndEntityMarshaller[T] = Marshaller[T, (immutable.Seq[HttpHeader], MessageEntity)]
type ToResponseMarshaller[T] = Marshaller[T, HttpResponse]
type ToRequestMarshaller[T] = Marshaller[T, HttpRequest]

You can read more about this here : http://doc.akka.io/docs/akka/2.4.7/scala/http/common/marshalling.html

Luckily you don’t have to get involved with these very often, as the routing DSL does most of the heavy lifting for you. When you call complete, marshalling is taken care of for you, provided a suitable marshaller can be found implicitly.
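
If you do want to see the marshaller at work outside of a route, a minimal sketch looks something like this. It assumes the same Item and jsonFormat2 setup as the JsonSupport trait, redefined here so the snippet stands on its own; the object name is made up:

```scala
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model.{ContentTypes, MessageEntity}
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

final case class Item(name: String, id: Long)

object MarshalDemo {
  implicit val itemFormat: RootJsonFormat[Item] = jsonFormat2(Item)

  // Marshal(...).to[...] finds the spray-json ToEntityMarshaller[Item] implicitly,
  // which is the same lookup that complete(item) relies on inside a route.
  val entity: MessageEntity =
    Await.result(Marshal(Item("thing", 42)).to[MessageEntity], 3.seconds)

  val isJson: Boolean = entity.contentType == ContentTypes.`application/json`
}
```

Blocking with Await is only there to keep the demo short; in real code you would normally stay inside the Future.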

Unmarshalling

Unmarshalling is the process of turning the on-the-wire format (a JSON string in these examples) back into a Scala class (the Item class in this case).

You can read more about this at the official Akka docs page : http://doc.akka.io/docs/akka/2.4.7/scala/http/common/unmarshalling.html

Luckily you don’t have to get involved with these very often, as the routing DSL does most of the heavy lifting for you. That is what this part of the routing DSL does: it uses an unmarshaller to create the Item from the JSON string on the wire.

entity(as[Item]) { item =>
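
The same unmarshaller can also be driven by hand, which is handy for experimenting. Here is a minimal sketch, again assuming the Item / jsonFormat2 setup from above (redefined so the snippet is self-contained); the actor system name and the sample JSON are made up:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat
import scala.concurrent.Await
import scala.concurrent.duration._

final case class Item(name: String, id: Long)

object UnmarshalDemo {
  implicit val itemFormat: RootJsonFormat[Item] = jsonFormat2(Item)

  def run(): Item = {
    implicit val system = ActorSystem("unmarshal-demo")
    // Unmarshal needs a materializer because an entity body is a stream of bytes
    implicit val materializer = ActorMaterializer()
    import system.dispatcher
    try {
      // Sample payload standing in for what a client would POST to /saveitem
      val entity = HttpEntity(ContentTypes.`application/json`, """{"name":"thing","id":42}""")
      Await.result(Unmarshal(entity).to[Item], 3.seconds)
    } finally system.terminate()
  }
}
```

This is also what the client side code later in this post does with response.entity.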

WebSockets

Akka Http also supports web sockets. Let’s start this investigation by looking at what is required from the routing DSL perspective, which starts like this:

path("websocket") {
  get {
    handleWebSocketMessages(websocketFlow)
  }
} ~

If we look at this special directive a bit more, what exactly does the handleWebSocketMessages directive look like?

Well, it looks like this:

def handleWebSocketMessages(handler: Flow[Message, Message, Any]): Route

So we need to supply a Flow. A Flow is part of Akka Streams, which we will look at in the next part. But for now just be aware that you can create a Flow from a Sink and a Source, and that a Materializer is needed to actually run it.

For this websocket example here is what the Flow looks like

val (websocketSink, websocketSource) =
  MergeHub.source[String].toMat(BroadcastHub.sink[String])(Keep.both).run()

val websocketFlow: Flow[Message, Message, NotUsed] =
  Flow[Message].mapAsync(1) {
    // transform websocket message to domain message (string)
    case TextMessage.Strict(text) =>       Future.successful(text)
    case streamed: TextMessage.Streamed => streamed.textStream.runFold("")(_ ++ _)
  }.via(Flow.fromSinkAndSource(websocketSink, websocketSource))
    .map[Message](string => TextMessage(string))

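The idea is that every TextMessage a websocket client sends is pushed into the MergeHub and comes back out of the BroadcastHub, so the client gets its text sent back to it over the websocket (as does any other client that is connected at the time).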

This uses some pretty new akka stream stages namely

  • MergeHub : Creates a Source that emits elements merged from a dynamic set of producers.
  • BroadcastHub : Creates a Sink whose elements are emitted to a dynamic set of consumers, each of which receives every element
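
To see the two hubs working together without any websocket in the way, here is a small sketch that wires them up exactly as above and then attaches one consumer and two producers. The object name, actor system name and message strings are made up for the illustration:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object HubDemo {
  def run(): Set[String] = {
    implicit val system = ActorSystem("hub-demo")
    implicit val materializer = ActorMaterializer()
    try {
      // Same wiring as the websocket example: everything pushed into hubSink
      // comes out of hubSource.
      val (hubSink, hubSource) =
        MergeHub.source[String].toMat(BroadcastHub.sink[String])(Keep.both).run()

      // A consumer attached to the broadcast side; it stops after 2 elements
      val received = hubSource.take(2).runWith(Sink.seq)

      // Two independent producers attached to the merge side
      Source.single("from producer A").runWith(hubSink)
      Source.single("from producer B").runWith(hubSink)

      // Arrival order across producers is not guaranteed, hence the Set
      Await.result(received, 3.seconds).toSet
    } finally system.terminate()
  }
}
```

In the websocket case every connected client plays both roles: its incoming messages are a producer and its outgoing side is a consumer.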

 

Let’s start by running the server, and then opening the “WebSocketTestClient.html” page, which should look like this:

image

image

Once the page is open, type something in the textbox and hit the “Send” button. You should see this:

image

All fairly normal socket type stuff so far, we send a message from the web page client side to the server and the server responds with the text we sent.

But what if we wanted to send messages to the client on demand, say from another route which could be a command to do some work, which then notifies the clients of the websocket?

With this Flow in place, we are also able to push back messages to the client end of the websocket.

Let’s see another route which will simulate some work, which results in messages being sent down the websocket back to the client (if it’s still connected).

Here is the route

path("sendmessagetowebsocket" / IntNumber) { msgCount =>
  post {
    for(i <- 0 until msgCount)
    {
      Source.single(s"sendmessagetowebsocket $i").runWith(websocketSink)
    }
    complete("done")
  }
}

It can be seen that we simply create a new source which is run with the existing Sink that was part of the Flow used by the websocket

Here is what this would look like in postman

image

And here is what the web page client side websocket example looks like after this route has been called as above

image

 

 

Client Side

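Akka Http comes with 3 types of client API that one can use:

  • Connection-level client API
  • Host-level client API
  • Request-level client API

In my opinion the last of these is the most sensible client side choice. Note though that the examples below build their client flow with Http().outgoingConnection, which strictly speaking is the entry point of the connection-level API; the request-level entry point is Http().singleRequest.

So what does the client code look like?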

GET

If we consider that we want to conduct this request

http://localhost:8080/randomitem

which when run via postman gives the following JSON response

image

So let’s see what the code looks like to do this using the client API:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.{Await, Future}
import concurrent.ExecutionContext.Implicits.global
import common.{Item, JsonSupport}
import concurrent.duration._
import scala.io.StdIn

class RegularRoutesDemo extends JsonSupport {

  def Run() : Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    val httpClient = Http().outgoingConnection(host = "localhost", port = 8080)

    //+++++++++++++++++++++++++++++++++++++++++++++++
    // GET http://localhost:8080/randomitem
    //+++++++++++++++++++++++++++++++++++++++++++++++
    val randomItemUrl = s"""/randomitem"""
    val flowGet : Future[Item] =
      Source.single(
        HttpRequest(
          method = HttpMethods.GET,
          uri = Uri(randomItemUrl))
        )
        .via(httpClient)
        .mapAsync(1)(response => Unmarshal(response.entity).to[Item])
        .runWith(Sink.head)
    val start = System.currentTimeMillis()
    val result = Await.result(flowGet, 5 seconds)
    val end = System.currentTimeMillis()
    println(s"Result in ${end-start} millis: $result")

  }
}

There are a couple of take away points in the code above

  • We use a Source that emits a single HttpRequest, on which we can specify the HTTP verb and other request details
  • We use Unmarshal to convert the incoming JSON string to an Item. We discussed Marshalling/Unmarshalling above.
  • This obviously relies on the Spray JSON support that we discussed above
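
For comparison, here is a sketch of the same GET going through Http().singleRequest, which is the request-level entry point and hands back a Future[HttpResponse] without you having to build a stream yourself. To keep the snippet self-contained it starts its own stand-in server with the /randomitem route on an OS-assigned port; the object and actor system names are made up, and this is not code from the demo repo:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat
import scala.concurrent.Await
import scala.concurrent.duration._

final case class Item(name: String, id: Long)

object SingleRequestDemo {
  implicit val itemFormat: RootJsonFormat[Item] = jsonFormat2(Item)

  def run(): Item = {
    implicit val system = ActorSystem("single-request-demo")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher
    try {
      // Stand-in for the demo server: same /randomitem route, port 0 = any free port
      val route = path("randomitem") { get { complete(Item("thing", 42)) } }
      val binding = Await.result(Http().bindAndHandle(route, "localhost", 0), 5.seconds)
      val uri = Uri(s"http://localhost:${binding.localAddress.getPort}/randomitem")

      // Request-level API: one call, connection handling is done for us
      val item = Http().singleRequest(HttpRequest(uri = uri))
        .flatMap(response => Unmarshal(response.entity).to[Item])

      Await.result(item, 5.seconds)
    } finally system.terminate()
  }
}
```

Against the real demo server you would drop the stand-in route and simply point the Uri at http://localhost:8080/randomitem.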

 

POST

If we consider that we want to conduct this request

http://localhost:8080/saveitem

which when run via postman gives the following JSON response

image

So let’s see what the code looks like to do this using the client API:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.{Await, Future}
import concurrent.ExecutionContext.Implicits.global
import common.{Item, JsonSupport}
import concurrent.duration._
import scala.io.StdIn

class RegularRoutesDemo extends JsonSupport {

  def Run() : Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    val httpClient = Http().outgoingConnection(host = "localhost", port = 8080)

    //+++++++++++++++++++++++++++++++++++++++++++++++
    // POST http://localhost:8080/saveitem
    //+++++++++++++++++++++++++++++++++++++++++++++++
    val saveItemUrl = s"""/saveitem"""
    val itemToSave = Item("newItemHere",12)
    val flowPost = for {
      requestEntity <- Marshal(itemToSave).to[RequestEntity]
      response <-
      Source.single(
        HttpRequest(
          method = HttpMethods.POST,
          uri = Uri(saveItemUrl),
          entity = requestEntity)
        )
        .via(httpClient)
        .mapAsync(1)(response => Unmarshal(response.entity).to[Item])
        .runWith(Sink.head)
    } yield response
    val startPost = System.currentTimeMillis()
    val resultPost = Await.result(flowPost, 5 seconds)
    val endPost = System.currentTimeMillis()
    println(s"Result in ${endPost-startPost} millis: $resultPost")
  }
}

The only thing that is different this time is that we need to pass a JSON representation of an Item as the entity of the HttpRequest.

This is done using a JSON marshaller, which must be in scope implicitly.

 

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples
