
MADCAP IDEA PART 8 : INTERMEDIATE STEP, REST API FOR INTERACTIVE KAFKA STREAM KTABLE QUERIES

Last Time

 

So this one has taken a very long time to get up; I have been on holiday for a few weeks and a few other things cropped up. This is also an intermediate post that the next one will build on. I thought it might be useful to show this one in isolation before we move on to the real goal, which is to allow interactive queries over live Kafka streams. In order to carry out the live queries we need to expose a REST endpoint over the Kafka stream nodes, which will be the focus of the next article. This one will focus on a simple REST example in Scala.

 

PreAmble

Just as a reminder, this is part of my ongoing set of posts, which I talk about here:

https://sachabarbs.wordpress.com/2017/05/01/madcap-idea/, where we will be building up to a point where we have a full app using lots of different stuff, such as these

 

  • WebPack
  • React.js
  • React Router
  • TypeScript
  • Babel.js
  • Akka
  • Scala
  • Play (Scala Http Stack)
  • MySql
  • SBT
  • Kafka
  • Kafka Streams

 

Ok, so now that we have the introductions out of the way, let's crack on with what we want to cover in this post, which is how to create a simple Client/Server REST API in Scala.

 

There are several choices available that I wanted to consider

  • Finch
  • Http4s
  • Akka Http

I did actually try out all 3 of these, with varying degrees of success. I am using Scala 2.12 and IntelliJ IDEA 17. Even though I settled on Akka Http (as I have used it before and it just works), I thought it may be useful to show a simple hello world of the server examples in the other 2.

 

Finch Server example

So this would be a simple REST server written in Finch

 

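import com.twitter.finagle.Http
import com.twitter.util.Await
import io.finch._

object FinchServer extends App {

  val api: Endpoint[String] = get("hello") { Ok("Hello, World!") }

  // Block the main thread so the server keeps running
  Await.ready(Http.server.serve(":8080", api.toServiceAs[Text.Plain]))
}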

 

 

The main points that I liked about using Finch were

  • It's functional
  • I liked the ScalaZ composable style endpoints
  • It had good JSON support (Circe, Argonaut, Jackson, Json4s, PlayJson)
  • It was very concise

The reason I chose not to use it

  • The endpoint composition seems to confuse IntelliJ IDEA badly. I had to resort to the SBT command line to build, and IntelliJ IDEA would still complain about the syntax
  • No client library

 

 

HTTP4S Server/Client example

So this would be a simple REST server/client written in Http4s

 

SBT

You will need an SBT file something like this. I opted to go for a version that was considered stable, not the current development version

name := "SImpleRestApi"

version := "1.0"

scalaVersion := "2.12.1"

val http4sVersion = "0.15.16"


libraryDependencies ++= Seq(
  "org.http4s" %% "http4s-dsl"          % http4sVersion,
  "org.http4s" %% "http4s-blaze-server" % http4sVersion,
  "org.http4s" %% "http4s-blaze-client" % http4sVersion,
  "org.http4s" %% "http4s-circe"        % http4sVersion,
  "io.circe"   %% "circe-generic"       % "0.6.1",
  "io.circe"   %% "circe-literal"       % "0.6.1"

)
        

 

Common Entities

case class User(name: String)
case class Hello(greeting: String)

 

The Server

This is a simple http4s server that will accept a “User” case class JSON payload on the “hello” route, and will return a “Hello” case class as JSON

import Entities.{Hello, User}
import io.circe.generic.auto._
import io.circe.syntax._
import org.http4s._
import org.http4s.circe._
import org.http4s.dsl._
import org.http4s.server.blaze._

object RestServer extends App {

  run()


  def run() : Unit = {

    val jsonService = HttpService {
      case req @ POST -> Root / "hello" =>
        for {
          // Decode a User request
          user <- req.as(jsonOf[User])
          // Encode a hello response
          resp <- Ok(Hello(user.name).asJson)
        } yield (resp)

      case req @ GET -> Root / "test" =>
        for {

          resp <- Ok(Hello("Monkey").asJson)
        } yield (resp)

    }

    val builder = BlazeBuilder.bindHttp(8080).mountService(jsonService, "/")
    val blazeServer = builder.run

    scala.io.StdIn.readLine()
    ()
  }

}

 

There are a couple of things to note here

  • It uses Blaze server
  • The routes are constructed in a pretty intuitive manner
  • The JSON encoding/decoding is done seamlessly and did not require anything other than including circe.generic (this is nice)
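
To see what the circe.generic import is doing outside of http4s, here is a minimal sketch of the same encoding/decoding on its own. It assumes the circe-generic dependency from the SBT file above; the object name CirceRoundTrip is just for illustration, and the User case class is redefined locally so the snippet stands alone.

```scala
import io.circe.generic.auto._
import io.circe.syntax._

object CirceRoundTrip {

  // Same shape as the User entity used by the server
  final case class User(name: String)

  // Encoder derived automatically by circe.generic.auto
  def encode(user: User): String = user.asJson.noSpaces

  // Decoder derived the same way; None if the JSON does not match the case class
  def roundTrip(user: User): Option[User] = user.asJson.as[User].toOption

  def main(args: Array[String]): Unit = {
    println(encode(User("foo")))
    println(roundTrip(User("foo")))
  }
}
```

No encoder or decoder is written by hand here, which is the same derivation that jsonOf[User] and asJson rely on in the server.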

 

 

The Client

 

import Entities.{Hello, User}
import org.http4s.client.blaze.PooledHttp1Client
import scalaz.concurrent.Task
import io.circe.generic.auto._
import io.circe.syntax._
import org.http4s.circe._
import org.http4s.dsl._
import org.http4s.client._


object RestClient extends App {

  val httpClient = PooledHttp1Client()

  run()


  def run() : Unit = {

    val hello = helloClient("foo").run
    println(s"Hello ${hello}")
    scala.io.StdIn.readLine()
    ()
  }

  def helloClient(name: String): Task[Hello] = {
    // Encode a User request
    val req = POST(uri("http://localhost:8080/hello"), User(name).asJson)
    // Decode a Hello response
    httpClient.expect(req)(jsonOf[Hello])
  }
}

The client API is quite nice and required very little code, and just like the server portion the encoding/decoding to/from JSON is seamless. I also like the use of scalaz.concurrent.Task to manage the asynchrony

 

 

The main points that I liked about using http4s were

  • It's functional
  • It looked like a nice API to use
  • It had good JSON support (Circe, Argonaut, Json4s)
  • It was very concise
  • It had a server API + client API

The reason I chose not to use it

  • It took me a fair while to get the client to work, because the documentation is a mish mash of different versions and has not been kept for specific releases. I had to dig around quite a bit to get the version I was working with to work. Once it did work I was happy with it though

 

So that really only left me one choice, the tried and tested but rather unadventurous Akka-Http, so let's look at that next

 

Akka Http Server/Client example

So this would be a simple REST server/client written in Akka Http

 

SBT

This is the SBT file that I am using

name := "AkkaHttpRESTExample"

version := "0.1"

scalaVersion := "2.12.3"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.9",
  "com.typesafe.akka" %% "akka-http" % "10.0.9"
)
        

 

Common Entities

These are the common entities.

package Entities

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._

final case class Item(name: String, id: Long)
final case class Order(items: List[Item])

object DomainEntitiesJsonFormats {
  implicit val itemFormat = jsonFormat2(Item)
  implicit val orderFormat = jsonFormat1(Order)
}

 

There are a couple of things to note above.

  • We need the JSON formats created by jsonFormat2 / jsonFormat1, which come from spray.json.DefaultJsonProtocol. These build formats for case class(es) with 2 and 1 parameters respectively, and SprayJsonSupport uses them as marshallers/unmarshallers
  • The actual entities are simple case class(es)
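
As a rough illustration of what those formats give you, the sketch below round-trips an Order through spray-json directly, without any HTTP involved. It assumes spray-json is on the classpath (it comes in with akka-http-spray-json); the object name SprayJsonRoundTrip is made up for the example, and the entities are redefined locally so the snippet stands alone.

```scala
import spray.json._
import spray.json.DefaultJsonProtocol._

object SprayJsonRoundTrip {

  final case class Item(name: String, id: Long)
  final case class Order(items: List[Item])

  // itemFormat must be defined first, as orderFormat needs it for List[Item]
  implicit val itemFormat: RootJsonFormat[Item] = jsonFormat2(Item)
  implicit val orderFormat: RootJsonFormat[Order] = jsonFormat1(Order)

  // Case class -> JSON string
  def encode(order: Order): String = order.toJson.compactPrint

  // JSON string -> case class (throws if the JSON does not match)
  def decode(json: String): Order = json.parseJson.convertTo[Order]

  def main(args: Array[String]): Unit = {
    val order = Order(List(Item("beans", 1)))
    val json = encode(order)
    println(json)
    println(decode(json) == order)
  }
}
```

The explicit RootJsonFormat type annotations are optional, but they make it obvious what jsonFormat2 / jsonFormat1 actually return.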

 

 

The Server

This is a simple server

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.io.StdIn


import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import Entities.DomainEntitiesJsonFormats._
import Entities._

object RestServer extends App {

  run()


  def run() {
    val itemNameRegexPattern =  """\w+""".r
    implicit val system = ActorSystem("my-system")
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher

    val route =
      path("hello") {
        get {
          complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,
            "<h1>Say hello to akka-http</h1>"))
        }
      }~
      path("order" / itemNameRegexPattern) { itemName =>
        get {
          complete(Order(List[Item](Item(itemName, 1))))
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)
    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")

    StdIn.readLine() // let it run until user presses return

    // Pressing RETURN unbinds the port and then shuts the actor system down,
    // otherwise the actor system's threads would keep the JVM alive
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}

The main points to note above are:

  • How the routing DSL works, and we can build multiple routes which are chained together
  • How we send JSON payload back using the complete routing directive
  • How we bind the server to the port and address
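
The same routing DSL also works in the other direction, where the server receives JSON. The following is only a sketch and not part of the server above: the POST route on "order" and the object name OrderRoutes are assumptions made for illustration, and the entities are redefined locally so the snippet stands alone. It uses the entity(as[...]) directive to unmarshal the request body into a case class.

```scala
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

object OrderRoutes {

  final case class Item(name: String, id: Long)
  final case class Order(items: List[Item])

  implicit val itemFormat: RootJsonFormat[Item] = jsonFormat2(Item)
  implicit val orderFormat: RootJsonFormat[Order] = jsonFormat1(Order)

  // Hypothetical route: POST /order with an Order JSON body
  val route: Route =
    path("order") {
      post {
        // Unmarshal the JSON request body into an Order
        entity(as[Order]) { order =>
          complete(s"Received ${order.items.size} item(s)")
        }
      }
    }
}
```

A route like this could be chained onto the existing ones with the ~ operator, in the same way the "hello" and "order" GET routes are combined.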

 

The Client

This is a client that goes hand in hand with the server code above

import Entities.DomainEntitiesJsonFormats._
import Entities._
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import scala.concurrent.Future
import scala.io.StdIn
import akka.http.scaladsl.unmarshalling.Unmarshal
import spray.json._

import scala.util.{Failure, Success}

object RestClient extends App {

  run()


  def run() {

    implicit val system = ActorSystem("my-system")
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher

    println(s"Client attempting to fetch from online " +
      "at http://localhost:8080/order/beans\nPress RETURN to stop...")

    val responseFuture: Future[Order] = {
      Http().singleRequest(HttpRequest(uri = "http://localhost:8080/order/beans"))
        .flatMap(response => Unmarshal(response.entity).to[Order])
    }

    responseFuture onComplete {
      case Failure(ex) => System.out.println(s"Failed to perform GET, reason: $ex")
      case Success(response) => System.out.println(s"Server responded with $response")
    }

    StdIn.readLine() // let it run until user presses return
    system.terminate() // then shut the actor system down so the JVM can exit
  }
}

There is not too much to note here. The main points are

  • How the responses from the Http methods are Future[T], and as such must be handled with standard Future code
  • How we need to Unmarshal the JSON response entity back into a case class. This is thanks to the jsonFormat2/jsonFormat1 formats (marshallers/unmarshallers) we saw earlier
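
For anyone less familiar with "standard Future code", here is a small sketch using only the Scala standard library. The fetchOrder function is a hypothetical stand-in for the Http().singleRequest call plus Unmarshal step (it does no real HTTP), and the object name FutureHandlingSketch is made up for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FutureHandlingSketch {

  final case class Item(name: String, id: Long)
  final case class Order(items: List[Item])

  // Hypothetical stand-in for the HTTP call; fails for an empty item name
  def fetchOrder(itemName: String): Future[Order] =
    if (itemName.isEmpty) Future.failed(new IllegalArgumentException("empty item name"))
    else Future.successful(Order(List(Item(itemName, 1))))

  // Transform the result without blocking, falling back to Nil on that failure
  def itemNames(itemName: String): Future[List[String]] =
    fetchOrder(itemName)
      .map(_.items.map(_.name))
      .recover { case _: IllegalArgumentException => Nil }

  def main(args: Array[String]): Unit = {
    // Callback style, as used in the client above
    itemNames("beans").onComplete {
      case Success(names) => println(s"Items: $names")
      case Failure(ex)    => println(s"Failed: $ex")
    }
    // Blocking is fine in a small demo, but should be avoided in real request handling
    println(Await.result(itemNames(""), 2.seconds))
  }
}
```

The map / recover combinators are usually preferable to onComplete when the result needs to be passed on, since they return a new Future rather than Unit.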

 

That’s It For Now

So I have shown a few options here. Both http4s and Akka Http are fairly easy to use. It’s a shame about Finch and IntelliJ, but at the end of the day I also wanted something with a client API too

 

 

Next Time

Next time we will look at the actual Ratings workflow in terms of Kafka Streams, and how we can aggregate incoming rating messages inside a Kafka KStream (which will come from the React front end in the end). We will then look at how we can store these rolling stream values into a Kafka KTable and query the results using Kafka interactive queries
