Last Time

Last time we looked at finishing the “View Rating” page, which queried the Kafka Stream stores via a new endpoint in the play backend. This post will see us finish the workings of the “Create Job” page



Just as a reminder this is part of my ongoing set of posts which I talk about here :, where we will be building up to a point where we have a full app using lots of different stuff, such as these


  • WebPack
  • React.js
  • React Router
  • TypeScript
  • Babel.js
  • Akka
  • Scala
  • Play (Scala Http Stack)
  • MySql
  • SBT
  • Kafka
  • Kafka Streams

Ok so now that we have the introductions out of the way, lets crack on with what we want to cover in this post.


Where is the code?

As usual the code is on GitHub here :


What Is This Post All About?

As stated above this post deals with the “Create Job” functions

  • Allowing the user to specify their geo location position on the map
  • Create a JSON Job object
  • Send it to a new Play backend endpoint
  • Published the Job out over a Kafka topic (using Akka Streams / Reactive Kafka)
  • Consume the Job over a Kafka Topic (using Akka Streams / Reactive Kafka)
  • Push the consumed Job out of the forever frame (Comet functionality in Play backend)
  • Have a new RxJs based Observable over the comet based forever frame, and ensure that is working


As you can see this post actually covers a lot. In fact this post can be thought of as the lynch pin to this entire project. With the code that is contained in this post, we now have everything we need to deal with the rather more complex visual aspects of the “View Job” page.

Initially I was thinking I would create separate Kafka streams (topics) for all the different interactions such as

  • Bid for job
  • Accept job
  • Complete job
  • Cancel job
  • Create new job geo location


But then I came to my senses and realized this could all be achieved using a single stream, that is a Job stream. The idea being is that the JSON payload of that single stream would just hold slightly different state and different points in time.


To see what I mean by this, this is the actual Scala class that we will be turning to/from JSON that is sent out over the SINGLE Kafka topic

case class Job
  jobUUID: String,
  clientFullName: String,
  clientEmail: String,
  driverFullName: String,
  driverEmail: String,
  vehicleDescription: String,
  vehicleRegistrationNumber: String,


With this simple Scala object we can do everything we want, for example


  • To determine if this is a new job that a driver may be assigned to : we just look for Job items that don’t yet have a “driverEmail” this tells us this job is free and has no driver yet
  • To determine if this job has been accepted by a client : we just filter/examine the “isAssigned” property which may be true/false


Anyway you get the idea.


So now that we know there is a single stream lets proceed with the rest of the post shall we.


What are we trying to do in this post?

It really is a continuous chain of a single process which follows these sequential steps



Allowing the user to specify their geo location position on the map

Before we send the actual job JSON payload we need to allow the user to specify their position such that the position can be retrieved later (right now the position is maintained in Local Storage not in the Job payload, I may include client/driver positions in the actual payload, we’ll see how that goes).


Once the client sets their OWN position, they are able to create a job, and push out a new job. If they already have a job in flight the client is NOT able to create a new job.


Thanks to the React map component that was picked some time ago the position update really just boils down to this code in the CreateJob.tsx file

_handleMapClick = (event) => {
    const newState = Object.assign({}, this.state, {
        currentPosition: new Position(, event.latLng.lng())

To deal with the users current position, I also created this simple service class

export class Position {

    lat: number;
    lng: number;

    constructor(lat: number, lng: number) { = lat;
        this.lng = lng;


import { injectable, inject } from "inversify";
import { Position } from "../domain/Position";

export class PositionService {

    constructor() {


    clearUserPosition = (email: string): void => {
        let key = 'currentUserPosition_' + email;

    storeUserPosition = (currentUser: any, position: Position): void => {

        if (currentUser == null || currentUser == undefined)

        if (position == null || position == undefined)

        let currentUsersPosition = {
            currentUser: currentUser,
            position: position
        let key = 'currentUserPosition_' +;
        sessionStorage.setItem(key, JSON.stringify(currentUsersPosition));

    currentPosition = (email: string): Position => {
        let key = 'currentUserPosition_' + email;
        var currentUsersPosition = JSON.parse(sessionStorage.getItem(key));
        return currentUsersPosition.position;

    hasPosition = (email: string): boolean => {
        let key = 'currentUserPosition_' + email;
        var currentUsersPosition = JSON.parse(sessionStorage.getItem(key));
        return currentUsersPosition != null && currentUsersPosition != undefined;


Create a JSON Job object

The next step is to create a Job object that may be posted for the new Job to the Play backend. This is done via a standard JQuery POST, as follows:

_handleCreateJobClick = () => {

    var self = this;
    var currentUser = this._authService.user();

    var newJob = {

        clientFullName: currentUser.fullName,
        driverFullName: '',
        driverEmail: '',
        vehicleDescription: '',
        vehicleRegistrationNumber: '',
        isAssigned: false,
        isCompleted: false


        type: 'POST',
        url: 'job/submit',
        data: JSON.stringify(newJob),
        contentType: "application/json; charset=utf-8",
        dataType: 'json'
    .done(function (jdata, textStatus, jqXHR) {

        const newState = Object.assign({}, self.state, {
            hasIssuedJob: self._jobService.hasIssuedJob()
        self._positionService.storeUserPosition(currentUser, self.state.currentPosition);
    .fail(function (jqXHR, textStatus, errorThrown) {
        const newState = Object.assign({}, self.state, {
            okDialogHeaderText: 'Error',
            okDialogBodyText: jqXHR.responseText,
            okDialogOpen: true,
            okDialogKey: Math.random()

This will also store the users current position using the PositionService we just saw, and redirect the user (this page is only available to clients as its all about creating new jobs, which drivers cant do). We also redirect to the “ViewJob” page on successfully sending a new job.


Send it to a new Play backend endpoint

There is a new route to support the Job creation, so obviously we need a new route entry


POST  /job/submit                              controllers.JobController.submitJob()


Published the Job out over a Kafka topic (using Akka Streams / Reactive Kafka)

Ok so we now know that we have a new endpoint that can accept a “job” JSON object. What does it do with this Job JSON object. Well quite simply it does this

  • Converts the JSON into a Scala object
  • Sends it out over Kafka using Reactive Kafka publisher


You may be asking yourself why we want to burden ourselves with Kafka here at all if all we are going to do is get a Job JSON payload in them send it out via Kafka only to have it come back in via Kafka. This seems weird why bother. The reason we want to involve Kafka here is for the audit an commit log facility that it provided. We want a record of the events, that’s what kafka given us, a nice append only log

Anyway what does the new endpoint code look like that accepts the job. Here it is

package controllers

import javax.inject.Inject

import entities.Job
import entities.JobJsonFormatters._
import entities._
import actors.job.{JobConsumerActor, JobProducerActor}
import{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import{BroadcastHub, Keep, MergeHub}
import{ActorMaterializer, ActorMaterializerSettings, Supervision}
import play.api.http.ContentTypes
import play.api.libs.Comet
import play.api.libs.json._
import play.api.libs.json.Json
import play.api.libs.json.Format
import play.api.libs.json.JsSuccess
import play.api.libs.json.Writes
import play.api.mvc.{Action, Controller}
import utils.Errors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
import scala.concurrent.duration._

class JobController @Inject()
  implicit actorSystem: ActorSystem,
  ec: ExecutionContext
) extends Controller
  val rand = new Random()

  //Error handling for streams
  val decider: Supervision.Decider = {
    case _ => Supervision.Restart

  implicit val mat = ActorMaterializer(

  val (sink, source) =
    MergeHub.source[JsValue](perProducerBufferSize = 16)
      .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)

  //job producer
  val childJobProducerActorProps = Props(classOf[JobProducerActor],mat,ec)
  val jobProducerSupervisorProps = createBackoffSupervisor(childJobProducerActorProps,
  val jobProducerSupervisorActorRef = actorSystem.actorOf(jobProducerSupervisorProps, 
    name = "jobProducerSupervisor")

  //job consumer
  val childJobConsumerActorProps = Props(new JobConsumerActor(sink)(mat,ec))
  val jobConsumerSupervisorProps = createBackoffSupervisor(childJobConsumerActorProps,
  val jobConsumerSupervisorActorRef = actorSystem.actorOf(jobConsumerSupervisorProps, 
    name = "jobConsumerSupervisor")
  jobConsumerSupervisorActorRef ! Init

  def streamedJob() = Action {
    Ok.chunked(source via Comet.json("parent.jobChanged")).as(ContentTypes.HTML)

  def submitJob = Action.async(parse.json) { request =>
    Json.fromJson[Job](request.body) match {
      case JsSuccess(job, _) => {
        jobProducerSupervisorActorRef ! job
        Future.successful(Ok(Json.toJson(job.copy(clientEmail = job.clientEmail.toUpperCase))))
      case JsError(errors) =>
        Future.successful(BadRequest("Could not build a Job from the json provided. " +


  private def createBackoffSupervisor(childProps:Props, actorChildName: String) : Props = {
        childName = actorChildName,
        minBackoff = 3.seconds,
        maxBackoff = 30.seconds,
        randomFactor = 0.2
        OneForOneStrategy() {
          case _ => SupervisorStrategy.Restart



There is a fair bit going on in that code. Lets dissect it a bit


  • We create a backoff supervisor for both the Kafka producer/consumer actors
  • We create a stream that is capable of writing to the Comet frame socket
  • We provide the sink side (MergeHub) of the stream to the consumer actor, such that when it reads a value from Kafka it will be pumped into the sink which will then travel through the Akka stream back to the web page via the BroadcastHub and Comet forever frame back to the HTML (and ultimately RxJs Subject)


Push the consumed Job out of the forever frame (Comet functionality in Play backend)

Ok so we just saw how the 2 actors are created under back off supervisors, and how the consumer (the one that reads from Kafka) gets the ability to essentially write back to the forever frame in the HTML.

So how does the job go out into Kafka land?

That part is quite simple, here it is

package actors.job

import kafka.topics.JobTopics
import serialization.JSONSerde
import akka.Done
import{Actor, PoisonPill}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import{Keep, MergeHub, Source}
import{ActorMaterializer, KillSwitches}
import entities.Job
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import utils.Settings

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

class JobProducerActor(
  implicit materializer: ActorMaterializer,
  ec: ExecutionContext
) extends Actor {

  val jSONSerde = new JSONSerde[Job]
  val jobProducerSettings = ProducerSettings(
    new StringSerializer,
    new ByteArraySerializer)

  val ((mergeHubSink, killswitch), kafkaSourceFuture) =
    MergeHub.source[Job](perProducerBufferSize = 16)
      .map(job => {
        val jobBytes = jSONSerde.serializer().serialize("", job)
        (job, jobBytes)
      .map { jobWithBytes =>
        val (job, jobBytes) = jobWithBytes
        new ProducerRecord[String, Array[Byte]](
          JobTopics.JOB_SUBMIT_TOPIC, job.clientEmail, jobBytes)

  kafkaSourceFuture.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => {
      self ! PoisonPill

  override def postStop(): Unit = {
    println(s"JobProducerActor seen 'Done'")

  override def receive: Receive = {
    case (job: Job) => {
      println(s"JobProducerActor seen ${job}")
    case Done => {
      println(s"JobProducerActor seen 'Done'")
      self ! PoisonPill

We covered a lot of how this worked in the last post, when we talked about how to create a new Rating. The mechanism is essentially the same but this time for Job JSON data.


Consume the Job over a Kafka Topic (using Akka Streams / Reactive Kafka) 

Lets see the JobConsumerActor which takes this Sink (MergeHub from JobController) and pushes the value out to it, when it sees a new value from Kafka on the job topic “job-submit-topic”. This then travels through the Akka stream where it goes via the BroadcastHub out to the forever from in the HTML.


Here is the code, it may look scary but really its just reading a value of the Kafka topic and pushing it out via the Sink (MergeHub)

package actors.job

import entities.{Job, Init}
import kafka.topics.JobTopics
import serialization.JSONSerde
import akka.{Done, NotUsed}
import{Actor, ActorSystem, PoisonPill}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import{Keep, MergeHub, Sink, Source}
import{ActorMaterializer, KillSwitches}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import play.api.libs.json.{JsValue, Json}
import utils.Settings

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

//TODO : This actor shouls take in a way of pushing back to Websocket
class JobConsumerActor
  (val sink:Sink[JsValue, NotUsed])
  (implicit materializer: ActorMaterializer, ec: ExecutionContext
) extends Actor {

  val jSONSerde = new JSONSerde[Job]
  val jobConsumerSettings = ConsumerSettings(
    context.system,new StringDeserializer(),new ByteArrayDeserializer())
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val ((_, killswitch), kafkaConsumerFuture) =
    Consumer.committableSource(jobConsumerSettings, Subscriptions.topics(JobTopics.JOB_SUBMIT_TOPIC))
      .mapAsync(1) { msg => {
        val jobBytes = msg.record.value
        val job = jSONSerde.deserializer().deserialize(JobTopics.JOB_SUBMIT_TOPIC,jobBytes)
        self ! job

  kafkaConsumerFuture.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => {
      self ! PoisonPill

  override def postStop(): Unit = {
    println(s"JobConsumerActor seen 'Done'")

  override def receive: Receive = {
    case (job: Job) => {
      println(s"JobConsumerActor seen ${job}")
      val finalJsonValue = Json.toJson(job)
    case Done => {
      println(s"JobConsumerActor seen 'Done'")
      self ! PoisonPill
    case Init => {
      println("JobConsumerActor saw init")


Have a new RxJs based Observable over the comet based forever frame, and ensure that is working

So at the end of the pipeline, we have a forever frame in the browser (always available) that we wish to get events from. Ideally we want to turn this rather bland event into a better RxJs Observable. So how do we do that. Its quite simple we use this little service that is able to create a new Observable from the incoming event for us

import { injectable, inject } from "inversify";
import { JobEventArgs } from "../domain/JobEventArgs";
import Rx from 'rx';

export class JobStreamService {

    private _jobSourceObservable: Rx.Observable<any>;

    constructor() {


    init = (): void => {

        window['jobChanged'] = function (incomingJsonPayload: any) {
            let evt = new CustomEvent('onJobChanged', new JobEventArgs(incomingJsonPayload));

        this._jobSourceObservable = Rx.Observable.fromEvent(window, 'onJobChanged');

    getJobStream = (): Rx.Observable<any> => {
        return this._jobSourceObservable;

Where the JobEventArgs looks like this

export class JobEventArgs {

    detail: any;

    constructor(detail: any) {
        this.detail = detail;


We can this use this service in other code and subscribe to this RxJs Observable that the above service exposes. Here is an example of subscribing to it. We will talk much more about this in the next post

componentWillMount() {
    this._subscription =
        jobArgs => {

                //TODO : 1. This should not be hard coded
                //TODO : 2. We should push out current job when we FIRST LOAD this page
                //          if we are a client, and we should enrich it if we are a driver
                //       3. The list of markers should be worked out again every time based
                //          on RX stream messages
                console.log('RX saw onJobChanged');
                console.log('RX x = ', jobArgs.detail);
            error => {
                console.log('RX saw ERROR');
                console.log('RX error = ', error);
            () => {
                console.log('RX saw COMPLETE');



I am aware this post has taken a while to get out there. I had an issue in the middle of this one where I broke something and I had to unwind a whole bunch of commits and bring them back in one by one to see when it broke. This caused a bit of friction. The other reason this post took so long is that life just gets in the way some times. Stupid life huh


Next Time

Next time we will focus our attention on the “View Job” page which is probably the most complex visual aspect of this project, but we now have all the plumbing to support it, so its just a matter of getting it done. After that page is done, this project it pretty much there. Yay


Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s