Last Time

Last time we walked through the Rating Kafka Streams architecture and also showed how we can query the local state stores. I also stated that the standard KafkaProducer used in the last post was more for demonstration purposes, and that long term we would like to swap it out for a Play Framework REST endpoint that allows us to publish a message straight from our app to the Kafka rating stream processing topology.



Just as a reminder, this is part of my ongoing set of posts, which I talk about here :, where we will be building up to a point where we have a full app using lots of different stuff, such as these:


  • WebPack
  • React.js
  • React Router
  • TypeScript
  • Babel.js
  • Akka
  • Scala
  • Play (Scala Http Stack)
  • MySql
  • SBT
  • Kafka
  • Kafka Streams

OK, so now that we have the introductions out of the way, let’s crack on with what we want to cover in this post.


Where is the code?

As usual the code is on GitHub here :


What Is This Post All About?

As stated in the last post “Kafka interactive queries”, we used a standard KafkaProducer to simulate what would have come from the end user via the Play Framework API. This time we will build out the Play Framework side of things, to include the ability to produce “rating” objects that are consumed via the rating Kafka Streams processing topology introduced in the last post.



So we already had an SBT file inside of the PlayBackEndApi project, but we need to expand that to include support for a couple of things

  • Reactive Kafka
  • Jackson JSON (Play already comes with its own JSON support, but for the Kafka Serialization-DeSerialization (Serdes) I wanted to make sure it was the same as the Kafka Streams project)

This means these additions to the build.sbt file


val configVersion = "1.0.1"

libraryDependencies ++= Seq(
  "org.reactivemongo" %% "play2-reactivemongo" % "0.11.12",
  // Reactive Kafka (Akka Streams Kafka connector)
  "com.typesafe.akka" % "akka-stream-kafka_2.11" % "0.17",
  // Jackson Scala module, used by the JSON Serdes
  "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.4",
  "com.typesafe"        % "config" % configVersion
)



We also want to ensure that we are using the same topics as the stream processing topology so I have just replicated that class (in reality I should have made this stuff a common JAR, but meh)


package kafka.topics

object RatingsTopics {
    val RATING_SUBMIT_TOPIC = "rating-submit-topic"
    val RATING_OUTPUT_TOPIC = "rating-output-topic"
}



As this is essentially a new route that would be called via the front end React app when a new Rating is given, we obviously need a new route/controller. The route is fairly simple, and is as follows:


# Rating page
POST  /rating/submit/new                       controllers.RatingController.submitNewRating()



The new Rating route expects a Rating object to be provided as a POST in JSON. Here is the actual Rating object and the Play JSON handling for it:


package Entities

import play.api.libs.json._
import play.api.libs.functional.syntax._

case class Rating(fromEmail: String, toEmail: String, score: Float)

object Rating {
  // Macro-generated Reads/Writes for Rating
  implicit val formatter = Json.format[Rating]
}

object RatingJsonFormatters {

  // Hand-written Writes: Rating -> JSON
  implicit val ratingWrites = new Writes[Rating] {
    def writes(rating: Rating) = Json.obj(
      "fromEmail" -> rating.fromEmail,
      "toEmail" -> rating.toEmail,
      "score" -> rating.score
    )
  }

  // Hand-written Reads: JSON -> Rating
  implicit val ratingReads: Reads[Rating] = (
      (JsPath \ "fromEmail").read[String] and
      (JsPath \ "toEmail").read[String] and
      ((JsPath \ "score").read[Float])
    )(Rating.apply _)
}
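To make the JSON handling a little more concrete, here is a minimal sketch of what validating an incoming payload looks like with Play JSON. It is not code from the project: RatingSketch, its parse helper and the sample email addresses are hypothetical names made up for illustration, and it assumes play-json is on the classpath.

```scala
import play.api.libs.json._

// Hypothetical stand-alone copy of the Rating entity, so the sketch compiles on its own
final case class RatingSketch(fromEmail: String, toEmail: String, score: Float)

object RatingSketch {
  // Macro-generated Reads/Writes, the same approach as Json.format[Rating]
  implicit val format: Format[RatingSketch] = Json.format[RatingSketch]

  // Returns Right(rating) for a well-formed payload, Left(message) when fields are missing or mistyped.
  // Note that Json.parse itself throws if the text is not JSON at all.
  def parse(raw: String): Either[String, RatingSketch] =
    Json.parse(raw).validate[RatingSketch] match {
      case JsSuccess(rating, _) => Right(rating)
      case JsError(errors)      => Left(s"Invalid rating JSON, ${errors.size} field error(s)")
    }
}
```

A payload such as {"fromEmail":"a@b.com","toEmail":"c@d.com","score":4.5} would come back as a Right, while a payload with only fromEmail would come back as a Left, which is the same success/failure split the controller relies on.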



So now that we have a route, let’s turn our attention to the new RatingController, which right now, to just accept a new Rating, looks like this:

package controllers

import javax.inject.Inject

import Actors.Rating.RatingProducerActor
import Entities.RatingJsonFormatters._
import Entities._
import akka.actor.{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import play.api.libs.json._
import play.api.mvc.{Action, Controller}
import utils.Errors

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
import scala.concurrent.duration._

class RatingController @Inject()
(
  implicit actorSystem: ActorSystem,
  ec: ExecutionContext
) extends Controller
{

  //Error handling for streams
  val decider: Supervision.Decider = {
    case _                      => Supervision.Restart
  }

  // Materializer handed to the child actor, restarting failed stream stages
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
  val childRatingActorProps = Props(classOf[RatingProducerActor],mat,ec)
  val rand = new Random()
  // Supervisor that recreates the child with an increasing delay once it stops
  val ratingSupervisorProps = BackoffSupervisor.props(
    Backoff.onStop(
      childRatingActorProps,
      childName = s"RatingProducerActor_${rand.nextInt()}",
      minBackoff = 3.seconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    ).withSupervisorStrategy(
      OneForOneStrategy() {
        case _ => SupervisorStrategy.Restart
      })
    )

  val ratingSupervisorActorRef = 
    actorSystem.actorOf(
      ratingSupervisorProps,
      name = "ratingSupervisor"
    )

  def submitNewRating = Action.async(parse.json) { request =>
    Json.fromJson[Rating](request.body) match {
      case JsSuccess(newRating, _) => {
        // Hand the rating to the supervised producer actor, then echo it back
        ratingSupervisorActorRef ! newRating
        Future.successful(Ok(Json.toJson(
          newRating.copy(toEmail = newRating.toEmail.toUpperCase))))
      }
      case JsError(errors) =>
        // Errors is this project's own helper (utils.Errors) for formatting validation errors
        Future.successful(BadRequest(
          "Could not build a Rating from the json provided. " +
          Errors.show(errors)))
    }
  }
}



The main points from the code above are

  • We use the standard Play Framework JSON handling for the unmarshalling/marshalling to JSON
  • That the controller route is async (see how it returns a Future[T])
  • That we will not process anything if the JSON is invalid
  • That the RatingController creates a supervisor actor that will supervise the creation of another actor, namely a RatingProducerActor. It may look like this happens each time the RatingController is instantiated, which is true. However this only happens once, as there is only one router in Play, and the controllers are created by the router. You can read more about this here : The short story is that the supervisor is created once, and the actor it supervises will be created using a BackoffSupervisor, where the creation of the actor will be retried using an incremental back off strategy. We also use the OneForOneStrategy to ensure only the single failed child actor is affected by the supervisor.
  • That this controller is also responsible for creating an ActorMaterializer with a supervision strategy (more on this in the next section). The ActorMaterializer is used to create actors within Akka Streams workflows.
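To give a feel for what the incremental back off means with the values used above (minBackoff of 3 seconds, maxBackoff of 30 seconds, randomFactor of 0.2), here is a small stand-alone sketch of the delay calculation. It only approximates the idea behind Akka's BackoffSupervisor and is not Akka's own code; BackoffSketch, delayFor and the explicit jitter parameter are hypothetical names introduced for illustration.

```scala
import scala.concurrent.duration._

object BackoffSketch {
  // jitter stands in for a random value in [0.0, 1.0); passing it in keeps the sketch deterministic
  def delayFor(
      restartCount: Int,
      minBackoff: FiniteDuration,
      maxBackoff: FiniteDuration,
      randomFactor: Double,
      jitter: Double): FiniteDuration = {
    // Random multiplier between 1.0 and (1.0 + randomFactor)
    val rnd = 1.0 + jitter * randomFactor
    // Double the delay on every restart, starting from minBackoff
    val exponential = minBackoff.toMillis * math.pow(2, restartCount)
    // Never grow the base delay past maxBackoff
    val capped = math.min(exponential, maxBackoff.toMillis.toDouble)
    (capped * rnd).toLong.millis
  }
}
```

With no jitter the delays under these assumptions would grow as 3s, 6s, 12s, 24s and then stay at 30s, with the random factor adding up to 20% on top so that many restarting actors do not all retry at the same moment.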




The final part of the pipeline for this post is obviously to be able to write a Rating to a Kafka topic, via a Kafka producer. As already stated, I chose to use Reactive Kafka (the Akka Streams Kafka producer), which builds upon Akka Streams ideas, where we have Sinks/Sources/Flow/RunnableGraph, all the good stuff. So here is the full code for the actor:

package Actors.Rating

import Entities.Rating
import Serialization.JSONSerde
import akka.Done
import akka.actor.{Actor, PoisonPill}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, MergeHub, Source}
import kafka.topics.RatingsTopics
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import utils.Settings

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

class RatingProducerActor(
    implicit materializer: ActorMaterializer,
    ec: ExecutionContext
) extends Actor {

  val jSONSerde = new JSONSerde[Rating]
  val ratingProducerSettings = ProducerSettings(
      context.system,
      new StringSerializer,
      new ByteArraySerializer)
    // bootStrapServers is assumed to be defined on utils.Settings alongside ACKS_CONFIG
    .withBootstrapServers(Settings.bootStrapServers)
    .withProperty(Settings.ACKS_CONFIG, "all")

  // MergeHub source -> serialize -> ProducerRecord -> KillSwitch -> Kafka sink
  val ((mergeHubSink, killswitch), kafkaSourceFuture) =
    MergeHub.source[Rating](perProducerBufferSize = 16)
      .map(rating => {
        val ratingBytes = jSONSerde.serializer().serialize("", rating)
        (rating, ratingBytes)
      })
      .map { ratingWithBytes =>
        val (rating, ratingBytes) = ratingWithBytes
        new ProducerRecord[String, Array[Byte]](
          RatingsTopics.RATING_SUBMIT_TOPIC, rating.toEmail, ratingBytes)
      }
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Producer.plainSink(ratingProducerSettings))(Keep.both)
      .run()

  // If the stream fails, stop this actor so the supervisor can recreate it
  kafkaSourceFuture.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => self ! PoisonPill
  }

  // If the actor stops, shut the hosted stream down too
  override def postStop(): Unit = {
    println(s"RatingProducerActor stopped, shutting down stream")
    killswitch.shutdown()
  }

  override def receive: Receive = {
    case (rating: Rating) => {
      println(s"RatingProducerActor seen ${rating}")
      // Push the rating into the running stream via the MergeHub sink
      Source.single(rating).runWith(mergeHubSink)
    }
    case Done => {
      println(s"RatingProducerActor seen 'Done'")
      self ! PoisonPill
    }
  }
}


I’ll be honest, there is a fair bit going on in that small chunk of code above, so let’s dissect it. What is happening exactly?

  • The most important point is that we simply use the actor as a vessel to host a Reactive Kafka Akka Streams RunnableGraph representing a graph of MergeHub -> Reactive Kafka producer sink. This is completely fine and a normal thing to do. Discussing Akka Streams is out of scope for this post, but if you want to know more you can read a previous post I did here : 
  • So we now know this actor hosts a stream, but the stream could fail, or the actor could fail. So what we want is that if the actor fails the stream is stopped, and if the stream fails the actor is stopped. To do that we need to do a couple of things
    • STREAM FAILING : Since the RunnableGraph can return a Future[T], we can hook a Success/Failure callback on that, and send a PoisonPill to the hosting actor. Then the supervisor actor we saw above would kick in and try to create a new instance of this actor. Another thing to note is that the stream hosted in this actor uses the ActorMaterializer that was supplied by the RatingController, where we provided a restart supervision strategy for the stream.
    • ACTOR FAILING : If the actor itself fails, the Akka framework will call the postStop() method, at which point we want to shut down the stream within this actor. So how can we shut down the hosted stream? Well, see in the middle of the stream setup there is this line .viaMat(KillSwitches.single)(Keep.both), which allows us to get a KillSwitch from the materialized values for the stream. Once we have a KillSwitch we can simply call its shutdown() method.
    • BELTS AND BRACES : I have also provided a way for the outside world to shut down this actor and its hosted stream. This is via sending this actor a Done message. I have not put this in yet, but the hook is there to demonstrate how you could do this.
  • We can see that there is a MergeHub source, which allows external code to push stuff through the MergeHub via the materialized Sink value from within the actor
  • We can also see that the Rating object that the actor sees is indeed pushed into the MergeHub materialized Sink via this actor, and then some transformation is done on it, to grab its raw bytes
  • We can see the final stage in the RunnableGraph is the Reactive Kafka Producer.plainSink, which results in each Rating object pushed from this actor into the hosted stream being published as a message to a Kafka topic

And I think that is the main set of points about how this actor works.
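If you want to play with the MergeHub and KillSwitch idea without Kafka in the picture, the shape of the stream can be sketched on its own. This is a minimal sketch under some assumptions: it needs akka-stream on the classpath, it swaps the Kafka sink for Sink.seq so the elements can be inspected, and MergeHubKillSwitchSketch and its run method are hypothetical names that are not part of the project.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, MergeHub, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object MergeHubKillSwitchSketch {

  // Pushes the items through a MergeHub, then stops the stream with the KillSwitch
  def run(items: List[String]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("merge-hub-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // Same materialized shape as the actor: ((hub sink, kill switch), sink result)
      val ((hubSink, killSwitch), collected) =
        MergeHub.source[String](perProducerBufferSize = 16)
          .viaMat(KillSwitches.single)(Keep.both)
          .toMat(Sink.seq[String])(Keep.both)
          .run()

      // Each push is its own small stream that feeds the hub, like the actor's receive
      items.foreach(item => Source.single(item).runWith(hubSink))

      // Crude pause so the pushes have a chance to arrive; delivery before shutdown is not guaranteed
      Thread.sleep(500)

      // Completes the stream, which in turn completes the Sink.seq future
      killSwitch.shutdown()
      Await.result(collected, 5.seconds)
    } finally {
      Await.result(system.terminate(), 5.seconds)
    }
  }
}
```

The point of the sketch is the materialized tuple: the hub sink is what the actor pushes into, the KillSwitch is what postStop() uses, and the final future is what the onComplete callback hangs off.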



The End Result

Just to prove that this is all working here is a screen shot of the new RatingController http://localhost:9000/rating/submit/new endpoint being called with a JSON payload representing the Rating




And here is the Akka Http endpoint that queries the Kafka Stream state store(s)


http://localhost:8080/ratingByEmail? this gives an empty list as we have NOT sent any Rating through for email “” yet




http://localhost:8080/ratingByEmail? this gives 1 result, which is consistent with the number of Rating(s) I created




http://localhost:8080/ratingByEmail? this gives 3 results, which is consistent with the number of Rating(s) I created




So that’s cool, this means that we have successfully integrated publishing of a JSON payload Rating object through Kafka to the Kafka Streams Rating processor. Happy days




Straight after the last article I decided to Dockerize everything (a decision I have now reversed, due to the flaky nature of Docker’s depends_on and it not truly waiting for the item depended on, even when using “condition: service_healthy” and “healthcheck – test” etc.), and some code must have become corrupt, as stuff from the last post stopped working.


An example from the Docker-Compose docs being


version: '2.1'
services:
  web:
    build: .
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
  redis:
    image: redis
  db:
    image: redis
    healthcheck:
      test: "exit 0"


I love Docker but for highly complex setups I think you are better off using a Docker-Compose file but just not trying to bring it all up in one go. I may bring the Docker bits back into the fold for anyone that is reading this that wants to play around, but I will have to think about that closely.

Once I realized that my Docker campaign was doing more harm than good, and I reverted back to my extremely hand coded, but deterministic, PowerShell startup script, I found that getting the Play Framework and Reactive Kafka (the Akka Streams Kafka producer) up and running was quite simple, and it kind of worked like a charm first time. Yay



Next Time

Next time we should be able to make the entire rating view page work as we now have the following things

So we should quite easily be able to turn that data into a simple bootstrap table in the React portion of this application

