Category Archives: Scala

MADCAP IDEA 9 : KAFKA STREAMS INTERACTIVE QUERIES

Last Time

 

So last time we came up with a sort of half-way-house post that paved the way for this one, where we examined several different REST frameworks for use with Scala. The requirements were that we would be able to use JSON and that there would be both a client and a server side API in the chosen library. Both http4s and Akka Http fitted the bill, and I decided to go with Akka Http as I am more familiar with it.

That examination of REST APIs is what allows this post to happen. In this post we will be looking at:

  • How to install Kafka/Zookeeper and get them running on Windows
  • Walking through a KafkaProducer
  • Walking through a Kafka Streams processing node, and the duality of streams
  • Walking through Kafka Streams interactive queries

 

Preamble

Just as a reminder, this is part of my ongoing set of posts which I talk about here:

https://sachabarbs.wordpress.com/2017/05/01/madcap-idea/, where we will be building up to a point where we have a full app using lots of different technologies, such as the following:

 

  • WebPack
  • React.js
  • React Router
  • TypeScript
  • Babel.js
  • Akka
  • Scala
  • Play (Scala Http Stack)
  • MySql
  • SBT
  • Kafka
  • Kafka Streams

 

Ok, so now that we have the introductions out of the way, let's crack on with what we want to cover in this post.

 

Where is the code?

As usual the code is on GitHub here : https://github.com/sachabarber/MadCapIdea

 

Before we start, what is this post all about?

 

Well, I don't know if you recall, but we are attempting to create an ultra simple Uber-style application where there are drivers and clients. A client can put out a new job and drivers bid for it, and at the end of a job they can rate each other; see the job completion/rating/view rating sections of this previous post: https://sachabarbs.wordpress.com/2017/06/27/madcap-idea-part-6-static-screen-design/

 

The ratings will be placed onto a stream and aggregated into permanent storage, and will be available for querying later on to display in the React front end. The ratings should be grouped by email, and also searchable by email.

 

So that is what we are attempting to cover in this post. However, since this is the first time we have had to use Kafka, this post will also talk through what you have to go through to get Kafka set up on Windows. In a subsequent post I will attempt to get EVERYTHING up and working (including the Play front end) in Docker containers, but for now we will assume a local install of Kafka; if nothing else it's good to know how to set this up.

 

How to install Kafka/Zookeeper and get them running on Windows

This section will talk you through how to get Kafka and Zookeeper up and running on Windows.

 

Step 1 : Download Kafka

Grab the Confluent Open Source Platform 3.3.0: http://packages.confluent.io/archive/3.3/confluent-oss-3.3.0-2.11.zip

 

Step 2 : Update Dodgy BAT Files

The official Kafka Windows BAT files don’t seem to work in the Confluent Platform 3.3.0 Open Source download. So replace the official [YOUR INSTALL PATH]\confluent-3.3.0\bin\windows BAT files with the ones found here: https://github.com/renukaradhya/confluentplatform/tree/master/bin/windows

 

Step 3 : Make Some Minor Changes To Log Locations etc etc

Kafka/Zookeeper as installed are set up for Linux, so the default paths won’t work on Windows; we need to adjust them a bit. So let's do that now:

 

  • Modify the [YOUR INSTALL PATH]\confluent-3.3.0\etc\kafka\zookeeper.properties file to change the dataDir to something like dataDir=c:/temp/zookeeper
  • Modify the [YOUR INSTALL PATH]\confluent-3.3.0\etc\kafka\server.properties file to uncomment the line delete.topic.enable=true
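
After those edits the relevant lines in the two files end up looking like this (adjust the dataDir path to taste):

# [YOUR INSTALL PATH]\confluent-3.3.0\etc\kafka\zookeeper.properties
dataDir=c:/temp/zookeeper

# [YOUR INSTALL PATH]\confluent-3.3.0\etc\kafka\server.properties
delete.topic.enable=true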

 

Step 4 : Running Zookeeper + Kafka + Creating Topics

Now that we have installed everything, it’s just a matter of running stuff up. Sadly, before we can run Kafka we need to run Zookeeper, and before Kafka can send messages we need to ensure that the Kafka topics are created. Topics must exist before messages can be sent to them.

 

Mmm, that sounds like a fair bit of work. Well it is, so I decided to wrap it all up in a little PowerShell script, which is available within the source code at https://github.com/sachabarber/MadCapIdea/tree/master/PowerShellProject/PowerShellProject and is called RunPipeline.ps1.

 

So all we need to do is change to the directory that the RunPipeline.ps1 is in, and run it.

Obviously it is set up for my installation folders; you WILL have to change the variables at the top of the script if you want to run this on your own machine.

 

Here are the contents of the RunPipeline.ps1 file:

 

$global:mongoDbInstallationFolder = "C:\Program Files\MongoDB\Server\3.5\bin\"
$global:kafkaWindowsBatFolder = "C:\Apache\confluent-3.3.0\bin\windows\"
$global:kafkaTopics = 
	"rating-submit-topic",
	"rating-output-topic"
$global:ProcessesToKill = @()



function RunPipeLine() 
{
	WriteHeader "STOPPING PREVIOUS SERVICES"
	StopZookeeper
	StopKafka

	Start-Sleep -s 20
	
	WriteHeader "STARTING NEW SERVICE INSTANCES"
	StartZookeeper
	StartKafka
	
	Start-Sleep -s 20

	CreateKafkaTopics
    RunMongo

	WaitForKeyPress

	WriteHeader "KILLING PROCESSES CREATED BY SCRIPT"
	KillProcesses
}

function WriteHeader($text) 
{
	Write-Host "========================================`r`n"
	Write-Host "$text`r`n"
	Write-Host "========================================`r`n"
}


function StopZookeeper() {
	$zookeeperCommandLine = $global:kafkaWindowsBatFolder + "zookeeper-server-stop.bat"
	Write-Host "> Zookeeper Command Line : $zookeeperCommandLine`r`n"
    $global:ProcessesToKill += start-process $zookeeperCommandLine -WindowStyle Normal -PassThru
}

function StopKafka() {
	$kafkaServerCommandLine = $global:kafkaWindowsBatFolder + "kafka-server-stop.bat" 
	Write-Host "> Kafka Server Command Line : $kafkaServerCommandLine`r`n"
    $global:ProcessesToKill += start-process $kafkaServerCommandLine  -WindowStyle Normal -PassThru
}

function StartZookeeper() {
	$zookeeperCommandLine = $global:kafkaWindowsBatFolder + "zookeeper-server-start.bat"
	$arguments = $global:kafkaWindowsBatFolder + "..\..\etc\kafka\zookeeper.properties"
	Write-Host "> Zookeeper Command Line : $zookeeperCommandLine args: $arguments `r`n"
    $global:ProcessesToKill += start-process $zookeeperCommandLine $arguments -WindowStyle Normal -PassThru
}

function StartKafka() {
	$kafkaServerCommandLine = $global:kafkaWindowsBatFolder + "kafka-server-start.bat" 
	$arguments = $global:kafkaWindowsBatFolder + "..\..\etc\kafka\server.properties"
	Write-Host "> Kafka Server Command Line : $kafkaServerCommandLine args: $arguments `r`n"
    $global:ProcessesToKill += start-process $kafkaServerCommandLine $arguments -WindowStyle Normal -PassThru
}

function CreateKafkaTopics() 
{
	Foreach ($topic in $global:kafkaTopics )
	{
		$kafkaCommandLine = $global:kafkaWindowsBatFolder + "kafka-topics.bat"
		$arguments = "--zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic $topic"
		Write-Host "> Create Kafka Topic Command Line : $kafkaCommandLine args: $arguments `r`n"
		$global:ProcessesToKill += start-process $kafkaCommandLine $arguments -WindowStyle Normal -PassThru
	}
}

function RunMongo() {
	$mongoexe = $global:mongoDbInstallationFolder + "mongod.exe"
	Write-Host "> Mongo Command Line : $mongoexe `r`n" 
	$global:ProcessesToKill += Start-Process -FilePath $mongoexe  -WindowStyle Normal -PassThru
}

function WaitForKeyPress
{
	Write-Host -NoNewLine "Press any key to continue....`r`n"
	[Console]::ReadKey()
}


function KillProcesses() 
{
	Foreach ($processToKill in $global:ProcessesToKill )
	{
		$name = $processToKill | Get-ChildItem -Name
		Write-Host "Killing Process : $name `r`n" 
		$processToKill | Stop-Process -Force
	}
}


# Kick off the entire pipeline
RunPipeLine

 

 

As you can see this script does a bit more than just run up Zookeeper and Kafka; it also creates the topics and runs MongoDB, which is also required by the main Play application (remember we are using Reactive Mongo for the login/registration side of things).

 

So far I have not had many issues with the script, though occasionally when trying out new code I do tend to clear out all the Zookeeper/Kafka state, which for me is stored here:

 

[Image: the local Zookeeper/Kafka state folders on disk]

 

That just allows me to start with a clean slate, as it were; you shouldn't need to do this that often.

 

Walking through a KafkaProducer

So the Kafka producer I present here will send a String key and a JSON-serialized Ranking object as the payload. Let's have a quick look at the Ranking object and how it gets turned to and from JSON before we look at the producer code.

 

The Domain Entities

Here is what the domain entities look like. It can be seen that these use Spray JSON formatters together with Akka Http's marshalling/unmarshalling support. This is not required by the producer, but is required by the REST API, which we will look at later. The producer and Kafka Streams code work with a different serialization abstraction known as a serde, which as far as I know is a term only used in Kafka. It's quite simple: it stands for Serializer-Deserializer (serde).

 

Anyway, here are the domain entities and Spray formatters that allow Akka Http (but not Kafka Streams, more on this later) to work with JSON.

 

package Entities

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._

case class Ranking(fromEmail: String, toEmail: String, score: Float)
case class HostStoreInfo(host: String, port: Int, storeNames: List[String])

object AkkaHttpEntitiesJsonFormats {
  implicit val RankingFormat = jsonFormat3(Ranking)
  implicit val HostStoreInfoFormat = jsonFormat3(HostStoreInfo)
}
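
Just to make the JSON handling concrete (this snippet is not in the repo, it is purely illustrative), here is a quick round trip of a Ranking to a JSON string and back using those Spray formatters:


import spray.json._
import Entities._
import Entities.AkkaHttpEntitiesJsonFormats._

object RankingJsonRoundTrip extends App {

  val ranking = Ranking("sacha@here.com", "mary@here.com", 4.5f)

  //case class -> JSON string, e.g. {"fromEmail":"sacha@here.com","toEmail":"mary@here.com","score":4.5}
  val json = ranking.toJson.compactPrint

  //JSON string -> case class
  val roundTripped = json.parseJson.convertTo[Ranking]

  println(json)
  println(roundTripped == ranking) //true
}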

 

Serdes

So as we just described, Kafka Streams doesn't care about the standard Akka Http/Spray JSON formatters; Spray is not part of Kafka Streams after all. However Kafka Streams still has some of the same concerns, in that it needs to serialize and de-serialize data (such as when it re-partitions, i.e. shuffles, data), so how does it do that? Well, it uses a weirdly named thing called a “serde”. There are MANY inbuilt “serde” types, but you can of course create your own to represent your on-the-wire format. I am using JSON, so below is what my generic “Serde” implementation looks like. I should point out that I owe a great deal to a chap called Jendrik, whom I made contact with and who has been doing some great stuff with Kafka; his blog has really helped me out: https://www.madewithtea.com/category/kafka-streams.html

 

Anyway, here is my/his/your/our “Serde” code for JSON:

 

import java.lang.reflect.{ParameterizedType, Type}
import java.util
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.exc.{UnrecognizedPropertyException => UPE}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}


package Serialization {

  object Json {

    type ParseException = JsonParseException
    type UnrecognizedPropertyException = UPE

    private val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)

    private def typeReference[T: Manifest] = new TypeReference[T] {
      override def getType = typeFromManifest(manifest[T])
    }

    private def typeFromManifest(m: Manifest[_]): Type = {
      if (m.typeArguments.isEmpty) {
        m.runtimeClass
      }
      else new ParameterizedType {
        def getRawType = m.runtimeClass

        def getActualTypeArguments = m.typeArguments.map(typeFromManifest).toArray

        def getOwnerType = null
      }
    }

    object ByteArray {
      def encode(value: Any): Array[Byte] = mapper.writeValueAsBytes(value)

      def decode[T: Manifest](value: Array[Byte]): T =
        mapper.readValue(value, typeReference[T])
    }

  }

  /**
    * JSON serializer for JSON serde
    *
    * @tparam T
    */
  class JSONSerializer[T] extends Serializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def serialize(topic: String, data: T): Array[Byte] =
      Json.ByteArray.encode(data)

    override def close(): Unit = ()
  }

  /**
    * JSON deserializer for JSON serde
    *
    * @tparam T
    */
  class JSONDeserializer[T >: Null <: Any : Manifest] extends Deserializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def close(): Unit = ()

    override def deserialize(topic: String, data: Array[Byte]): T = {
      if (data == null) {
        return null
      } else {
        Json.ByteArray.decode[T](data)
      }
    }
  }

  /**
    * JSON serde for local state serialization
    *
    * @tparam T
    */
  class JSONSerde[T >: Null <: Any : Manifest] extends Serde[T] {
    override def deserializer(): Deserializer[T] = new JSONDeserializer[T]

    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def close(): Unit = ()

    override def serializer(): Serializer[T] = new JSONSerializer[T]
  }

}
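
And just to show the serde in action (again, a purely illustrative snippet rather than repo code), here is a Ranking being round-tripped to bytes and back; the topic name argument is not used by this serde, so an empty string will do:


import Entities.Ranking
import Serialization.JSONSerde

object SerdeRoundTrip extends App {

  val rankingSerde = new JSONSerde[Ranking]
  val ranking = Ranking("sacha@here.com", "mary@here.com", 4.5f)

  //Ranking -> JSON bytes
  val bytes: Array[Byte] = rankingSerde.serializer().serialize("", ranking)

  //JSON bytes -> Ranking
  val roundTripped: Ranking = rankingSerde.deserializer().deserialize("", bytes)

  println(roundTripped == ranking) //true
}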

 

And finally here is what the producer code looks like

 

As you can see it is actually fairly straightforward: it simply produces a small batch of messages (though you can uncomment the line in the code to make it endless), with a String as the key and a Ranking as the message. The key is who the ranking is destined for. These will then be aggregated by the streams processing node, and stored as a list against a key (i.e. the email address).

 


package Processing.Ratings {

  import java.util.concurrent.TimeUnit

  import Entities.Ranking
  import Serialization.JSONSerde
  import Topics.RatingsTopics

  import scala.util.Random
  import org.apache.kafka.clients.producer.ProducerRecord
  import org.apache.kafka.clients.producer.KafkaProducer
  import org.apache.kafka.common.serialization.Serdes
  import Utils.Settings
  import org.apache.kafka.clients.producer.ProducerConfig

  object RatingsProducerApp extends App {

   run()

    private def run(): Unit = {

      val jSONSerde = new JSONSerde[Ranking]
      val random = new Random
      val producerProps = Settings.createBasicProducerProperties
      val rankingList = List(
        Ranking("jarden@here.com","sacha@here.com", 1.5f),
        Ranking("miro@here.com","mary@here.com", 1.5f),
        Ranking("anne@here.com","margeret@here.com", 3.5f),
        Ranking("frank@here.com","bert@here.com", 2.5f),
        Ranking("morgan@here.com","ruth@here.com", 1.5f))

      producerProps.put(ProducerConfig.ACKS_CONFIG, "all")

      System.out.println("Connecting to Kafka cluster via bootstrap servers " +
        s"${producerProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)}")

      // send a random ranking from the list every 100 milliseconds
      val rankingProducer = new KafkaProducer[String, Array[Byte]](
        producerProps, Serdes.String.serializer, Serdes.ByteArray.serializer)

      //while (true) {
      for (i <- 0 to 10) {
        val ranking = rankingList(random.nextInt(rankingList.size))
        val rankingBytes = jSONSerde.serializer().serialize("", ranking)
        System.out.println(s"Writing ranking ${ranking} to input topic ${RatingsTopics.RATING_SUBMIT_TOPIC}")
        rankingProducer.send(new ProducerRecord[String, Array[Byte]](
          RatingsTopics.RATING_SUBMIT_TOPIC, ranking.toEmail, rankingBytes))
        Thread.sleep(100)
      }

      Runtime.getRuntime.addShutdownHook(new Thread(() => {
        rankingProducer.close(10, TimeUnit.SECONDS)
      }))
    }
  }
}
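
The Settings and RatingsTopics helpers used above live in the GitHub repo and are not reproduced in this post. Purely so the example reads end to end, a minimal stand-in might look like the following (the topic names match the ones created by RunPipeline.ps1; the Settings values are hypothetical and NOT the repo's actual code):


package Utils {

  import java.util.Properties
  import org.apache.kafka.clients.producer.ProducerConfig

  //hypothetical stand-in for the real Settings object in the repo
  object Settings {
    val bootStrapServers = "localhost:9092"

    def createBasicProducerProperties: Properties = {
      val props = new Properties()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
      props
    }
  }
}

package Topics {

  //topic names match those created by the RunPipeline.ps1 script earlier
  object RatingsTopics {
    val RATING_SUBMIT_TOPIC = "rating-submit-topic"
    val RATING_OUTPUT_TOPIC = "rating-output-topic"
  }
}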

 

 

 

Walking through a Kafka Streams processing node, and the duality of streams

Before we get started I just wanted to include several excerpts taken from the official Kafka docs (http://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables), which talk about the KStream and KTable objects (the stream and table abstractions inside Kafka Streams).

 

When implementing stream processing use cases in practice, you typically need both streams and also databases. An example use case that is very common in practice is an e-commerce application that enriches an incoming stream of customer transactions with the latest customer information from a database table. In other words, streams are everywhere, but databases are everywhere, too.

Any stream processing technology must therefore provide first-class support for streams and tables. Kafka’s Streams API provides such functionality through its core abstractions for streams and tables, which we will talk about in a minute. Now, an interesting observation is that there is actually a close relationship between streams and tables, the so-called stream-table duality. And Kafka exploits this duality in many ways: for example, to make your applications elastic, to support fault-tolerant stateful processing, or to run interactive queries against your application’s latest processing results. And, beyond its internal usage, the Kafka Streams API also allows developers to exploit this duality in their own applications.

 

A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows:

 

    [Image: streams-table-duality-01.jpg]

     

    The stream-table duality describes the close relationship between streams and tables.

     

    • Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a “real” table by replaying the changelog from beginning to end to reconstruct the table. Similarly, aggregating data records in a stream will return a table. For example, we could compute the total number of pageviews by user from an input stream of pageview events, and the result would be a table, with the table key being the user and the value being the corresponding pageview count.
    • Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream’s data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a “real” stream by iterating over each key-value entry in the table.

     

    Let’s illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time – and different revisions of the table – can be represented as a changelog stream (second column).

     

    [Image: table state over time alongside the corresponding changelog stream]

     

    Because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):

     

    [Image: streams-table-duality-03.jpg]

     

    The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault tolerance. The stream-table duality is such an important concept for stream processing applications in practice that Kafka Streams models it explicitly via the KStream and KTable abstractions, which we describe in the next sections.

     

     

    I would STRONGLY urge you all to read the section of the official docs quoted above, as it will really help you should you want to get into Kafka Streams.

     

    Anyway, with all that in mind, how does it relate to the use case we are trying to solve? So far we have a publisher that pushes out Ranking objects, and as stated, ideally we would like to query these across all processor nodes. As such we should now know that this will involve a KStream and some sort of aggregation into an eventual KTable (backed by a state store).

     

    Probably the easiest thing to do is to start with the code, which looks like this for the main stream processing part of the ratings section of the final app.

      import java.util.concurrent.TimeUnit
      import org.apache.kafka.common.serialization._
      import org.apache.kafka.streams._
      import org.apache.kafka.streams.kstream._
      import Entities.Ranking
      import Serialization.JSONSerde
      import Topics.RatingsTopics
      import Utils.Settings
      import Stores.StateStores
      import org.apache.kafka.streams.state.HostInfo
      import scala.concurrent.ExecutionContext
    
    
      package Processing.Ratings {
    
        class RankingByEmailInitializer extends Initializer[List[Ranking]] {
          override def apply(): List[Ranking] = List[Ranking]()
        }
    
        class RankingByEmailAggregator extends Aggregator[String, Ranking,List[Ranking]] {
          override def apply(aggKey: String, value: Ranking, aggregate: List[Ranking]) = {
            value :: aggregate
          }
        }
    
    
        object RatingStreamProcessingApp extends App {
    
          implicit val ec = ExecutionContext.global
    
          run()
    
          private def run() : Unit = {
            val stringSerde = Serdes.String
            val rankingSerde = new JSONSerde[Ranking]
            val listRankingSerde = new JSONSerde[List[Ranking]]
            val builder: KStreamBuilder = new KStreamBuilder
            val rankings = builder.stream(stringSerde, rankingSerde, RatingsTopics.RATING_SUBMIT_TOPIC)
    
            //aggregate by (user email -> their rankings)
            val rankingTable = rankings.groupByKey(stringSerde,rankingSerde)
              .aggregate(
                new RankingByEmailInitializer(),
                new RankingByEmailAggregator(),
                listRankingSerde,
                StateStores.RANKINGS_BY_EMAIL_STORE
              )
    
            //useful debugging aid, print KTable contents
            rankingTable.toStream.print()
    
            val streams: KafkaStreams = new KafkaStreams(builder, Settings.createRatingStreamsProperties)
            val restEndpoint:HostInfo  = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
            System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
            System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")
    
            // Always (and unconditionally) clean local state prior to starting the processing topology.
            // We opt for this unconditional call here because this will make it easier for you to 
            // play around with the example when resetting the application for doing a re-run 
            // (via the Application Reset Tool,
            // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
            //
            // The drawback of cleaning up local state prior is that your app must rebuild its local 
            // state from scratch, which will take time and will require reading all the state-relevant 
            // data from the Kafka cluster over the network.
            // Thus in a production scenario you typically do not want to clean up always as we do 
            // here but rather only when it is truly needed, i.e., only under certain conditions 
            // (e.g., the presence of a command line flag for your app).
            // See `ApplicationResetExample.java` for a production-like example.
            streams.cleanUp();
            streams.start()
            val restService = new RatingRestService(streams, restEndpoint)
            restService.start()
    
            Runtime.getRuntime.addShutdownHook(new Thread(() => {
              streams.close(10, TimeUnit.SECONDS)
              restService.stop
            }))
    
            //return unit
            ()
          }
        }
      }
    

     

     

    Remember, the idea is to get a Ranking for a user (based on their email address), and store all the Rankings associated with them in some sequence/list such that they can be retrieved in one go based on a key, where the key is the user's email and the value is this list of Ranking objects. I think with the formal discussion from the official Kafka docs and my actual rating requirement, the above should hopefully be pretty clear.

     

     

    Walking through Kafka Streams interactive queries

    So now that we have gone through how data is produced, and transformed (well actually I did not do too much transformation other than a simple map, but trust me you can), and how we aggregate results from a KStream to a KTable (and its state store), we will move on to see how we can use Kafka interactive queries to query the state stores.

     

    One important concept is that if you used multiple partitions for your original topic, the state may be spread across n-many processing nodes. For this project I have chosen to use only 1 partition, but have written the code to support n-many.

     

    So let's assume that each node could read a different segment of data, or that each node must read from n-many partitions (there is not actually a one-to-one mapping of nodes to partitions; the elastic-scaling-of-your-application and parallelism-model chapters of the docs are must-reads on this). We would need each node to expose a REST API to allow its OWN state store to be read. By reading ALL the state stores we are able to get a total view of ALL the persisted data across ALL the partitions. I urge all of you to read this section of the official docs: http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application

     

    This diagram has also been shamelessly stolen from the official docs:

     

    [Image: streams-interactive-queries-api-02.png]

    I think this diagram does an excellent job of showing 3 separate processor nodes, each of which may have a bit of state. ONLY by assembling ALL the data from these nodes are we able to see the ENTIRE dataset.

     

    Kafka allows this via metadata about the streams, where we can use the exposed metadata to help us gather the state store data. To do this we first need a MetadataService, which for me is as follows:

     

    package Processing.Ratings
    
    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.state.StreamsMetadata
    import java.util.stream.Collectors
    import Entities.HostStoreInfo
    import org.apache.kafka.common.serialization.Serializer
    import org.apache.kafka.connect.errors.NotFoundException
    import scala.collection.JavaConverters._
    
    
    /**
      * Looks up StreamsMetadata from KafkaStreams
      */
    class MetadataService(val streams: KafkaStreams) {
    
    
       /**
        * Get the metadata for all of the instances of this Kafka Streams application
        *
        * @return List of { @link HostStoreInfo}
        */
      def streamsMetadata() : List[HostStoreInfo] = {
    
        // Get metadata for all of the instances of this Kafka Streams application
        val metadata = streams.allMetadata
        return mapInstancesToHostStoreInfo(metadata)
      }
    
    
      /**
        * Get the metadata for all instances of this Kafka Streams application that currently
        * has the provided store.
        *
        * @param store The store to locate
        * @return List of { @link HostStoreInfo}
        */
      def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {
    
        // Get metadata for all of the instances of this Kafka Streams application hosting the store
        val metadata = streams.allMetadataForStore(store)
        return mapInstancesToHostStoreInfo(metadata)
      }
    
    
      /**
        * Find the metadata for the instance of this Kafka Streams Application that has the given
        * store and would have the given key if it exists.
        *
        * @param store Store to find
        * @param key   The key to find
        * @return { @link HostStoreInfo}
        */
      def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
        // Get metadata for the instances of this Kafka Streams application hosting the store and
        // potentially the value for key
        val metadata = streams.metadataForKey(store, key, serializer)
        if (metadata == null)
          throw new NotFoundException(
            s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")
    
        HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
      }
    
    
      def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {
    
        metadatas.stream.map[HostStoreInfo](metadata =>
          HostStoreInfo(
            metadata.host(),
            metadata.port,
            metadata.stateStoreNames.asScala.toList))
          .collect(Collectors.toList())
          .asScala.toList
      }
    
    }
    

     

    This metadata service is used to obtain the state store information, which we can then use to extract the state data we want (it’s a key value store really).
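
    As a quick aside before the REST API: once a store has become queryable you can also read the local state store directly from the KafkaStreams instance. Here is a minimal sketch of that (it assumes the store name and types used above, and that the streams instance is already running):

    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.state.QueryableStoreTypes
    import Entities.Ranking
    import Stores.StateStores

    object LocalStoreQueryExample {

      //sketch only : assumes the streams instance is running and the store is queryable
      def rankingsFor(streams: KafkaStreams, email: String): List[Ranking] = {
        val store = streams.store(
          StateStores.RANKINGS_BY_EMAIL_STORE,
          QueryableStoreTypes.keyValueStore[String, List[Ranking]]())

        //get returns null when the key has never been seen
        Option(store.get(email)).getOrElse(List[Ranking]())
      }
    }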

     

    The next thing we need to do is expose a REST API to allow us to get at the state. Let's see that now.

     

    package Processing.Ratings
    
    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.state.HostInfo
    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.server.Directives._
    import akka.stream.ActorMaterializer
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
    import spray.json.DefaultJsonProtocol._
    import Entities.AkkaHttpEntitiesJsonFormats._
    import Entities._
    import Stores.StateStores
    import akka.http.scaladsl.marshalling.ToResponseMarshallable
    import org.apache.kafka.common.serialization.Serdes
    import scala.concurrent.{Await, ExecutionContext, Future}
    import akka.http.scaladsl.unmarshalling.Unmarshal
    import spray.json._
    import scala.util.{Failure, Success}
    import org.apache.kafka.streams.state.QueryableStoreTypes
    import scala.concurrent.duration._
    
    
    
    object RestService {
      val DEFAULT_REST_ENDPOINT_HOSTNAME  = "localhost"
    }
    
    
    class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {
    
      val metadataService = new MetadataService(streams)
      var bindingFuture: Future[Http.ServerBinding] = null
    
      implicit val system = ActorSystem("rating-system")
      implicit val materializer = ActorMaterializer()
      implicit val executionContext = system.dispatcher
    
    
      def start() : Unit = {
        val emailRegexPattern =  """\w+""".r
        val storeNameRegexPattern =  """\w+""".r
    
        val route =
    
    
          path("test") {
            get {
              parameters('email.as[String]) { (email) =>
                complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,
                            s"<h1>${email}</h1>"))
              }
            }
          } ~
          path("ratingByEmail") {
            get {
              parameters('email.as[String]) { (email) =>
                try {
    
                  val host = metadataService.streamsMetadataForStoreAndKey[String](
                    StateStores.RANKINGS_BY_EMAIL_STORE,
                    email,
                    Serdes.String().serializer()
                  )
    
                  var future:Future[List[Ranking]] = null
    
                  //store is hosted on another process, REST Call
                  if(!thisHost(host))
                    future = fetchRemoteRatingByEmail(host, email)
                  else
                    future = fetchLocalRatingByEmail(email)
    
                  val rankings = Await.result(future, 20 seconds)
                  complete(rankings)
                }
                catch {
                  case (ex: Exception) => {
                    val finalList:List[Ranking] = scala.collection.immutable.List[Ranking]()
                    complete(finalList)
                  }
                }
              }
            }
          } ~
          path("instances") {
            get {
              complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
            }
          }~
          path("instances" / storeNameRegexPattern) { storeName =>
            get {
              complete(ToResponseMarshallable.apply(metadataService.streamsMetadataForStore(storeName)))
            }
          }
    
        bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
        println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")
    
        Runtime.getRuntime.addShutdownHook(new Thread(() => {
          bindingFuture
            .flatMap(_.unbind()) // trigger unbinding from the port
            .onComplete(_ => system.terminate()) // and shutdown when done
        }))
      }
    
    
      def fetchRemoteRatingByEmail(host:HostStoreInfo, email: String) : Future[List[Ranking]] = {
    
        val requestPath = s"http://${host.host}:${host.port}/ratingByEmail?email=${email}"
        println(s"Client attempting to fetch from online at ${requestPath}")
    
        val responseFuture: Future[List[Ranking]] = {
          Http().singleRequest(HttpRequest(uri = requestPath))
            .flatMap(response => Unmarshal(response.entity).to[List[Ranking]])
        }
    
        responseFuture
      }
    
      def fetchLocalRatingByEmail(email: String) : Future[List[Ranking]] = {
    
        val ec = ExecutionContext.global
    
        val host = metadataService.streamsMetadataForStoreAndKey[String](
          StateStores.RANKINGS_BY_EMAIL_STORE,
          email,
          Serdes.String().serializer()
        )
    
        val f = StateStores.waitUntilStoreIsQueryable(
          StateStores.RANKINGS_BY_EMAIL_STORE,
          QueryableStoreTypes.keyValueStore[String,List[Ranking]](),
          streams
        ).map(_.get(email))(ec)
    
        val mapped = f.map(ranking => {
          if (ranking == null)
            List[Ranking]()
          else
            ranking
        })
    
        mapped
      }
    
      def stop() : Unit = {
        bindingFuture
          .flatMap(_.unbind()) // trigger unbinding from the port
          .onComplete(_ => system.terminate()) // and shutdown when done
      }
    
      def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
        hostStoreInfo.host.equals(hostInfo.host()) &&
          hostStoreInfo.port == hostInfo.port
      }
    }
    

     

    With that final class we are able to run the application and query it using the URL http://localhost:8080/ratingByEmail?email=sacha@here.com (the key to the Kafka store here is “sacha@here.com”, and the value will be either an empty list or a list of Ranking objects as JSON). The results are shown below after we have run the producer and used Chrome (or any other REST tool of your choosing) to get them.
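
    For example, given the sample rankings sent by the producer earlier, a query for sacha@here.com comes back with JSON along these lines (the exact contents depend on which rankings the producer randomly picked):

    [
      { "fromEmail": "jarden@here.com", "toEmail": "sacha@here.com", "score": 1.5 },
      { "fromEmail": "jarden@here.com", "toEmail": "sacha@here.com", "score": 1.5 }
    ]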

     

    [Image: the List[Ranking] JSON results shown in Chrome]

     

     

    Conclusion

    I have found the journey to get here an interesting one. The main issue is that the Kafka docs and examples are all written in Java, and some are not even using Java lambdas (Java 1.8), so the translation from that to Scala code (where there are lambdas everywhere) is sometimes trickier than you might think.

     

    The other thing that has caught me out a few times is that the Scala type system is pretty good at inferring the correct types, so you kind of let it get on with its job. But occasionally it doesn't/can't infer the type correctly; this may surface at compile time if you are lucky, or at run time. In the case of a runtime issue, I found it fairly hard to see exactly which part of the Kafka Streams API needed to be told a bit more type information.

     

    As a general rule of thumb, if there is an overloaded method that takes a serde and one that doesn't, ALWAYS use the one that takes a serde and specify the generic type parameters explicitly. The methods that take serdes are usually the ones that involve some sort of shuffling of data between partitions, so they need serdes to serialize and deserialize correctly.
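
    To make that concrete, this is the sort of thing I mean, using the rankings KStream and the serdes from the stream processing code earlier (a sketch, not new functionality):

      // prefer the overload that takes serdes, and be explicit with the generic types...
      val grouped: KGroupedStream[String, Ranking] =
        rankings.groupByKey(stringSerde, rankingSerde)

      // ...rather than leaning on type inference with the no-arg overload
      // val grouped = rankings.groupByKey()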

     

    Other than that I am VERY happy working with Kafka Streams, and once you get into it, it's not that different from working with Apache Spark and RDDs.

     

     

    Next time

    Next time we will be turning our attention back to the web site, where we will expose an endpoint that can be called from the ratings dialog launched at the end of a job. This endpoint will take the place of the RatingsProducerApp demonstrated in this post. For that we will be using https://github.com/akka/reactive-kafka. We will also expose a new endpoint to fetch the ratings (via email address) from the Kafka stream processor node.

     

    The idea being that when a job is completed, a rating from driver to passenger is given; this is sent to the Kafka stream processor node, and the combined ratings are accumulated per user (by email address) and exposed to be queried. As you can see, this post covers the latter part of this requirement already. The only thing we would need to do (as stated above) is replace the RatingsProducerApp demonstrated in this post with a new reactive kafka producer in the main Play application.


    scala environment config options

    This is going to be a slightly weird post in a way, as it is going to go round the houses a bit and not contain any actual code, but it shall talk about possible techniques for how best to manage environment-specific config values for a multi project Scala setup.

    Coming from .NET

    So as many of you know I came from .NET, where we have a simple config model. We have App.Config or Web.Config.

    We have tools at our disposal such as the XmlTransformation MSBuild task, which allows us to maintain a set of different App.Config values that will be transformed for your different environments.

    I wrote a post about this here

    https://sachabarbs.wordpress.com/2015/07/07/app-config-transforms-outside-of-web-project/

    Here is a small example of what this might look like

     

    [Image: an App.Config transform example]

    So then I started doing multi project Scala setups using SBT, where I might have the following project requirements:

    [Image: diagram of the multi project setup]

    In the above diagram the following is assumed

    • There is a Sacha.Common library that we want to use across multiple projects
    • That Sacha.Rest.Endpoints and Sacha.Play.FrontEnd are both apps which will need some environment-specific configuration in them
    • That there is going to be more than 1 environment that we wish to use such as
      • PROD
      • QA
      • DEV

    Now, coming from .NET, my initial instinct was to put a bunch of folders in the 2 apps, so taking the Sacha.Rest.Endpoints app as an example we may have something like this:

    [Image: per-environment application.conf folders inside the app]

    So the idea would be that we would have specific application.conf files for the different environments that we need to target (I am assuming there is some build process which takes care of putting the correct file in place for the correct build within the CI server).

    This is very easy to understand: if we want QA, we would end up using the QA version of the application.conf file.

    This is a very common way of thinking about this problem in .NET.

    Why Is This Bad?

    But hang on, this is only 1 app; what if we had 100 apps that made up our software in total? That would mean maintaining all these separate config files for all the environments in ALL the separate apps.

    Wow that doesn’t sound so cool anymore.

    Another Approach!

    A colleague and I were talking about this for some Scala code that was being written for a new project, and this is kind of what was discussed.

    I should point out that this idea was not in fact mine, but my colleague Andy Sprague's, which is not something I credited him for in the 1st draft of this post. Which is bad; sorry Andy.

    Anyway, how about this for another idea: the Sacha.Common JAR holds just the specific bits of changing config in separate config files, such as

    • “Qa.conf”
    • “Prod.conf”
    • “Dev.conf”

    And then the individual apps that already reference the Sacha.Common JAR just include the environment config they need.

    This is entirely possible thanks to the way the Typesafe Config library works, which is designed to allow extra config files to be included. These extra config files in this case just live inside an external JAR: Sacha.Common.

    Here is what this might look like for a consumer of the Sacha.Common JAR

    [Image: consumer application.conf that includes an environment config file from Sacha.Common]

    Where we just include the relevant environment config from Sacha.Common in the application.conf for the current app
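
    Purely as an illustration (this is not lifted from a real project), the consumer's application.conf would contain something like this, pulling in the environment file it needs from the Sacha.Common JAR on the classpath:

    # application.conf inside Sacha.Rest.Endpoints
    include "Qa.conf"

    # app specific settings/overrides go below the include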

    And this is what Sacha.Common may look like, where it provides the separate environment config files that consumers may use:

    [Image: the environment config files inside Sacha.Common]

    This diagram may help to illustrate this further

    [Image: diagram of apps including environment config from Sacha.Common]

    Why Is This Cool?

    The good thing about this design, compared with separate config files per environment per application, is that we now ONLY need to maintain one set of environment-specific settings: those in the common JAR, Sacha.Common.

    I wish we could do this within the .NET configuration system.

    Hope this helps; I personally think this is a fairly nice way to manage your configs for multiple applications and multiple environments.

     

     

     

     

     

     

    Apache Kafka 0.9 Scala Producer/Consumer

    For my job at the moment I am spending roughly 50% of my time working on .NET and the other 50% working with Scala. As such, a lot of Scala/JVM toys have piqued my interest of late. My latest quest was to try and learn Apache Kafka well enough that I at least understood the core concepts. I have even read a book or two on Apache Kafka now, so I feel I am at least talking partial sense in this article.

    So what is Apache Kafka, exactly?

    Here is what the Apache Kafka folks have to say about their own tool.

    Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.
    Fast
    A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients.

    Scalable
    Kafka is designed to allow a single cluster to serve as the central data backbone for a large organization. It can be elastically and transparently expanded without downtime. Data streams are partitioned and spread over a cluster of machines to allow data streams larger than the capability of any single machine and to allow clusters of co-ordinated consumers

    Durable
    Messages are persisted on disk and replicated within the cluster to prevent data loss. Each broker can handle terabytes of messages without performance impact.

    Distributed by Design
    Kafka has a modern cluster-centric design that offers strong durability and fault-tolerance guarantees.

    Taken from http://kafka.apache.org/ on 11/03/16

    Apache Kafka was designed and built by a team of engineers at LinkedIn, where I am sure you will agree they probably had to deal with quite a bit of data.

     

    I decided to learn a bit more about all this and have written an article on this over at code project :

     

    http://www.codeproject.com/Articles/1085758/Apache-Kafka-Scala-Producer-Consumer-With-Some-RxS

     

    In this article I will talk you through some of the core Apache Kafka concepts, and will also show how to create a Scala Apache Kafka Producer and a Scala Apache Kafka Consumer. I will also sprinkle some RxScala pixie dust on top of the Apache Kafka Consumer code, such that Rx operators can be applied to the incoming Apache Kafka messages.

    CASSANDRA + SPARK 2 OF 2

    Last time I walked you through how to install Cassandra in the simplest manner possible, which was as a single node installation using the DataStax Community Edition.

    All good stuff. So I have also just written up how you might use Scala and the DataStax Cassandra/Spark connector to allow data from Cassandra tables to be hydrated into Spark RDDs.

    These 2 Cassandra articles and the 1st Spark one kind of form a series of articles, which you can find using the series links at the top of the articles.

    Anyway here is the latest installment

    Apache Spark/Cassandra 2 of 2

    CASSANDRA + SPARK 1 OF 2

    A while ago I wrote about using Apache Spark, which is a great tool. I have been using Cassandra for a bit at work now, so I thought it might be nice to revisit that article and talk through how to use Spark with Cassandra.

    Here is the 1st part of that ; http://www.codeproject.com/Articles/1073158/Apache-Spark-Cassandra-of

    Scala : multi project sbt setup

    A while ago I wrote a post about how to use SBT (Scala Build Tool):

    https://sachabarbs.wordpress.com/2015/10/13/sbt-wheres-my-nuget/

    In that post I showed simple usages of SBT. The thing is, that was not really that realistic, so I wanted to have a go at a more real-world example: one where we might have multiple projects, say like this:

     

    SachasSBTDemo-App which depends on 2 sub projects

    • SachasSBTDemo-Server
    • SachasSBTDemo-Common

    So how do we go about doing this with SBT?

    There are 5 main steps to do this, which we will look at in turn.

    SBT Directory Structure

    The first thing that we need to do is create a new project folder (if you are from a Visual Studio / .NET background, think of this as the solution folder) called “project”.

    In here we will create 2 files

    build.properties which just lists the version of SBT we will use. It looks like this

    sbt.version=0.13.8
    

    SachaSBTDemo.scala is what I have called the other file, but you can call it what you like. Here are the contents of that file; this is the main SBT build definition that governs how it all hangs together. I will be explaining each of these parts as we go.

      import sbt._
      import Keys._
    
    object BuildSettings {
    
    
      val buildOrganization = "sas"
      val buildVersion      = "1.0"
      val buildScalaVersion = "2.11.5"
    
      val buildSettings = Defaults.defaultSettings ++ Seq (
        organization := buildOrganization,
        version      := buildVersion,
        scalaVersion := buildScalaVersion
      )
    }
    
    
    object Dependencies {
      val jacksonjson = "org.codehaus.jackson" % "jackson-core-lgpl" % "1.7.2"
      val scalatest = "org.scalatest" % "scalatest_2.9.0" % "1.4.1" % "test"
    }
    
    
    object SachasSBTDemo extends Build {
    
      import Dependencies._
      import BuildSettings._
    
      // Sub-project specific dependencies
      val commonDeps = Seq (
         jacksonjson,
         scalatest
      )
    
      val serverDeps = Seq (
         scalatest
      )
    
    
      lazy val demoApp = Project (
        "SachasSBTDemo-App",
        file ("SachasSBTDemo-App"),
        settings = buildSettings
      )
      //build these projects when main App project gets built
      .aggregate(common, server)
      .dependsOn(common, server)
    
      lazy val common = Project (
        "common",
        file ("SachasSBTDemo-Common"),
        settings = buildSettings ++ Seq (libraryDependencies ++= commonDeps)
      )
    
      lazy val server = Project (
        "server",
        file ("SachasSBTDemo-Server"),
        settings = buildSettings ++ Seq (libraryDependencies ++= serverDeps)
      ) dependsOn (common)
      
    }
    

     

    Projects

    In order to have separate projects we need to use the Project item from the SBT library JARs. A minimal Project setup will tell SBT where to create the new Project. Here is an example of a Project, where the folder we expect SBT to create will be called “SachasSBTDemo-App”.

    lazy val demoApp = Project (
        "SachasSBTDemo-App",
        file ("SachasSBTDemo-App"),
        settings = buildSettings
      )
    

    Project Dependencies

    We can also specify Project dependencies using “dependsOn” which takes a Seq of other projects that this Project depends on.

    That means that when we apply an action to the Project that is depended on, the Project that has the dependency will also have the action applied.

    lazy val demoApp = Project (
        "SachasSBTDemo-App",
        file ("SachasSBTDemo-App"),
        settings = buildSettings
      )
      //build these projects when main App project gets built
      .aggregate(common, server)
      .dependsOn(common, server)
    

    Project Aggregation

    We can also specify that a Project aggregates results from other projects, using “aggregate”, which takes a Seq of other projects that this Project aggregates.

    What “aggregate” means is that whenever we apply an action on the aggregating Project we should also see the same action applied to the aggregated Projects.

    lazy val demoApp = Project (
        "SachasSBTDemo-App",
        file ("SachasSBTDemo-App"),
        settings = buildSettings
      )
      //build these projects when main App project gets built
      .aggregate(common, server)
      .dependsOn(common, server)
    

    Library Dependencies

    Just like the simple post I did before, we still need to bring in our JAR files using SBT. But this time we come up with a nicer way to manage them. We simply wrap them all up in a simple object, and then use the object to satisfy the various dependencies of the Projects. Much neater.

    import sbt._
    import Keys._
    
    
    object Dependencies {
      val jacksonjson = "org.codehaus.jackson" % "jackson-core-lgpl" % "1.7.2"
      val scalatest = "org.scalatest" % "scalatest_2.9.0" % "1.4.1" % "test"
    }
    
      // Sub-project specific dependencies
      val serverDeps = Seq (
         scalatest
      )
    
      .....
      .....
      lazy val server = Project (
        "server",
        file ("SachasSBTDemo-Server"),
        //bring in the library dependencies
        settings = buildSettings ++ Seq (libraryDependencies ++= serverDeps)
      ) dependsOn (common)
    

    The Finished Product

    The final product once run through SBT should be something like this if viewed in IntelliJ IDEA:

    [Image: the project structure viewed in IntelliJ IDEA]

     

    Or like this on the file system:

    [Image: the project structure on the file system]

    If you want to grab my source files, they are available here at GitHub : https://github.com/sachabarber/SBT_MultiProject_Demo

    SCALA mocking

     

    Last time we looked at writing unit tests for our code, where we looked at using ScalaTest. This time we will be looking at mocking.

    In .NET there are several choices available that I like (and a couple that I don’t), such as :

    • Moq
    • FakeItEasy
    • RhinoMocks (this is one I am not keen on)

    I personally am most familiar with Moq, so when I started looking at JVM based mocking frameworks I kind of wanted one that used roughly the same syntax as the ones that I had used in .NET land.

    There are several choices available that I think are quite nice, namely:

    • ScalaMock
    • EasyMock
    • JMock
    • Mockito

    These all play nicely with ScalaTest (which I am sure you are all very pleased to hear).

    So with that list, what did I decide upon? I personally opted for Mockito, as I liked the syntax the best. That is not to say the others are not fine and dandy; it is just that I personally liked Mockito, and it seemed to have good documentation and favourable Google search results, so Mockito it is.

    So for the rest of this post I will talk about how to use Mockito to write our mocks. I will be using Mockito alongside ScalaTest, which we looked at last time.

    SBT Requirements

    As with most of the previous posts you will need to grab the libraries using SBT. As such your SBT file will need to use the following:

    libraryDependencies ++= Seq(
      "org.mockito" % "mockito-core" % "1.8.5",
      "org.scalatest" %% "scalatest" % "2.2.5" % "test"
    )
    

     

    Our First Example

    So with all that stated above, let's have a look at a simple example. This trivial example mocks out a java.util.ArrayList[String], and also sets up a few verifications.

    import org.scalatest._
    import org.scalatest.mock._
    import org.mockito.Mockito._
    
    
    class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {
    
    
      "Testing using Mockito " should "be easy" in {
    
    
        //mock creation
        val mockedList = mock[java.util.ArrayList[String]]
    
        //using mock object
        mockedList.add("one");
        mockedList.clear
    
        //verification
        verify(mockedList).add("one")
        verify(mockedList).clear
    
      }
    }
    

    One thing you may notice straight away is: how the F*k am I able to mock an ArrayList[T], which is a class, and not even an abstract one at that? This is pretty cool.

     

    Stubbing

    Using Mockito we can also stub out things just as you would expect with any 1/2 decent mocking framework. Here is an example where we try and mock out a simple trait.

    import java.util.Date
    import org.scalatest._
    import org.scalatest.mock._
    import org.mockito.Mockito._
    
    
    trait DumbFormatter {
    
      def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
        s"date : $date : $inputString"
      }
    
      def getDate() : String = {
        new Date().toString
      }
    }
    
    
    
    class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {
    
      "Stubbing using Mockito " should "be easy" in {
    
        var mockDumbFormatter = mock[DumbFormatter]
        when(mockDumbFormatter.getDate()).thenReturn("01/01/2015")
        assert("01/01/2015" === mockDumbFormatter.getDate())
      }
    }
    

    It can be seen above that it is quite easy to mock a trait. You can also see how we stub the mock out using the Mockito functions:

    • when
    • thenReturn

     

    Return Values

    We just saw an example above of how to use the “thenReturn” Mockito function, which is what you would use to set up your return value. If you want a dynamic return value, this could quite easily call some other function which deals with creating the return values; kind of a return-value factory method.
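
    For example, building on the DumbFormatter trait from the stubbing example above, Mockito also lets you queue up consecutive return values, which covers a lot of simple "dynamic" cases:

    import org.scalatest._
    import org.scalatest.mock._
    import org.mockito.Mockito._
    
    
    class FlatSpec_ReturnValue_Tests extends FlatSpec with Matchers with MockitoSugar {
    
      "Consecutive return values using Mockito " should "be easy" in {
    
        val mockDumbFormatter = mock[DumbFormatter]
    
        //first call returns the first value, later calls return the last value supplied
        when(mockDumbFormatter.getDate()).thenReturn("01/01/2015", "02/01/2015")
    
        assert("01/01/2015" === mockDumbFormatter.getDate())
        assert("02/01/2015" === mockDumbFormatter.getDate())
      }
    }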

     

    Argument Matching

    Mockito comes with something that allows you to match against any argument value. It also comes with regex matchers, and allows you to write custom matchers if the ones out of the box don’t quite fit your needs.

    Here is an example of writing a mock where we use the standard argument matchers:

    import java.util.Date
    import org.scalatest._
    import org.scalatest.mock._
    import org.mockito.Mockito._
    import org.mockito.Matchers._
    
    
    trait DumbFormatter {
    
      def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
        s"date : $date : $inputString"
      }
    
      def getDate() : String = {
        new Date().toString
      }
    }
    
    
    
    class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {
    
      "Stubbing using Mockito " should "be easy" in {
    
        var mockDumbFormatter = mock[DumbFormatter]
        when(mockDumbFormatter.formatWithDataTimePrefix(anyString(),any[Date]())).thenReturn("01/01/2015 Something")
        assert("01/01/2015 Something" === mockDumbFormatter.formatWithDataTimePrefix("blah blah blah", new Date()))
      }
    }
    

    Exceptions

    To throw exceptions with Mockito we simply need to use the “thenThrow(…)” function. Here is how:

    import java.util.Date
    import org.scalatest._
    import org.scalatest.mock._
    import org.mockito.Mockito._
    import org.mockito.Matchers._
    
    
    trait DumbFormatter {
    
      def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
        s"date : $date : $inputString"
      }
    
      def getDate() : String = {
        new Date().toString
      }
    }
    
    
    
    class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {
    
      "Stubbing using Mockito " should "be easy" in {
    
        var mockDumbFormatter = mock[DumbFormatter]
        when(mockDumbFormatter.formatWithDataTimePrefix(anyString(),any[Date]()))
    	.thenThrow(new RuntimeException())
    
        //use the ScalaTest intercept to test for exceptions
        intercept[RuntimeException] {
          mockDumbFormatter.formatWithDataTimePrefix("blah blah blah", new Date())
        }
      }
    }
    

Note how we also have to use the ScalaTest “intercept” function for the actual testing of the exception.

     

    CallBacks

Callbacks are useful when you want to see what a method was called with, so that you can make informed decisions about what to return.

Here is how you do callbacks in Mockito. Note the use of the “thenAnswer” function, and how we use an anonymous Answer object.

    import java.util.Date
    import org.mockito.invocation.InvocationOnMock
    import org.mockito.stubbing.Answer
    import org.scalatest._
    import org.scalatest.mock._
    import org.mockito.Mockito._
    import org.mockito.Matchers._
    
    
    trait DumbFormatter {
    
      def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
        s"date : $date : $inputString"
      }
    
      def getDate() : String = {
        new Date().toString
      }
    }
    
    
    
    class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {
    
      "Stubbing using Mockito " should "be easy" in {
    
        var mockDumbFormatter = mock[DumbFormatter]
        when(mockDumbFormatter.formatWithDataTimePrefix(anyString(),any[Date]()))
          .thenAnswer(new Answer[String] {
            override def answer(invocation: InvocationOnMock): String = {
              val result = "called back nicely sir"
              println(result)
              result
            }
          })
    
        assert("called back nicely sir" === mockDumbFormatter.formatWithDataTimePrefix("blah blah blah", new Date()))
    
    
    
      }
    }
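
The Answer above ignores what it was called with. Since the usual reason for a callback is to inspect the arguments, here is a small variation (again reusing the DumbFormatter trait) where the returned value is built from the first argument, obtained via InvocationOnMock.getArguments:

import java.util.Date
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._
import org.mockito.Matchers._


class FlatSpec_CallbackArguments_Tests extends FlatSpec with Matchers with MockitoSugar {

  "A callback " should "be able to inspect the arguments it was called with" in {

    val mockDumbFormatter = mock[DumbFormatter]
    when(mockDumbFormatter.formatWithDataTimePrefix(anyString(), any[Date]()))
      .thenAnswer(new Answer[String] {
        override def answer(invocation: InvocationOnMock): String = {
          //getArguments returns the raw argument array, so cast the one we want
          val inputString = invocation.getArguments()(0).asInstanceOf[String]
          s"echoed : $inputString"
        }
      })

    assert("echoed : blah blah blah" === mockDumbFormatter.formatWithDataTimePrefix("blah blah blah", new Date()))
  }
}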
    

     

    Verification

The last thing I wanted to talk about was verification, which includes verifying that functions got called, and that they were called the right number of times.

    Here is a simple example of this:

    import java.util.Date
    import org.mockito.invocation.InvocationOnMock
    import org.mockito.stubbing.Answer
    import org.scalatest._
    import org.scalatest.mock._
    import org.mockito.Mockito._
    import org.mockito.Matchers._
    
    
    trait DumbFormatter {
    
      def formatWithDataTimePrefix(inputString : String, date : Date) : String = {
        s"date : $date : $inputString"
      }
    
      def getDate() : String = {
        new Date().toString
      }
    }
    
    
    
    class FlatSpec_Mocking_Tests extends FlatSpec with Matchers with MockitoSugar {
    
      "Stubbing using Mockito " should "be easy" in {
    
        var mockDumbFormatter = mock[DumbFormatter]
        when(mockDumbFormatter.formatWithDataTimePrefix(anyString(),any[Date]()))
          .thenReturn("someString")
    
        val theDate = new Date()
        val theResult = mockDumbFormatter.formatWithDataTimePrefix("blah blah blah", theDate)
        val theResult2 = mockDumbFormatter.formatWithDataTimePrefix("no no no", theDate)
    
        verify(mockDumbFormatter, atLeastOnce()).formatWithDataTimePrefix("blah blah blah", theDate)
        verify(mockDumbFormatter, times(1)).formatWithDataTimePrefix("no no no", theDate)
    
    
      }
    }
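
Mockito can also verify the opposite, i.e. that something was NOT called, and that nothing happened beyond what you have already verified. A small sketch of that (reusing the DumbFormatter trait once more), using the “never” and “verifyNoMoreInteractions” functions:

import java.util.Date
import org.scalatest._
import org.scalatest.mock._
import org.mockito.Mockito._


class FlatSpec_Verify_Never_Tests extends FlatSpec with Matchers with MockitoSugar {

  "Verification " should "also be able to check that things were NOT called" in {

    val mockDumbFormatter = mock[DumbFormatter]
    val theDate = new Date()

    mockDumbFormatter.formatWithDataTimePrefix("blah blah blah", theDate)

    //getDate() was never called on the mock
    verify(mockDumbFormatter, never()).getDate()
    verify(mockDumbFormatter, times(1)).formatWithDataTimePrefix("blah blah blah", theDate)
    //and nothing else happened beyond what we have verified above
    verifyNoMoreInteractions(mockDumbFormatter)
  }
}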
    

     

     

    Further Reading

    You can read more about how to use Mockito from the docs : https://docs.google.com/document/d/15mJ2Qrldx-J14ubTEnBj7nYN2FB8ap7xOn8GRAi24_A/edit

     

     

    End Of The Line

    Personally my quest goes on, I am going to keep going until I consider myself  good at Scala (which probably means I know nothing).

    Anyway behind the scenes I will be studying more and more stuff about how to get myself to that point. As such I guess it is only natural that I may post some more stuff about Scala in the future.

But for now this is it; this is the end of the line for this brief series of posts on Scala. I hope you have all enjoyed the posts, and if you have please feel free to leave a comment, they are always appreciated.

     

    SCALA : TESTING OUR CODE

     

    So last time we looked at how to use Slick to connect to a SQL server database.

    This time we look at how to use one of the 2 popular Scala testing frameworks.

    The 2 big names when it comes to Scala testing are

    • ScalaTest
    • Specs2

I have chosen to use ScalaTest as it seems slightly more popular (judging by a quick Google search), and I quite liked the syntax. That said, Specs2 is also very good, so if you fancy having a look at that you should.

    SBT for ScalaTest

So what do we need to get started with ScalaTest? As always we need to grab the JAR, which we do using SBT.

At the time of writing this was accomplished using this SBT entry:

    name := "ClassesDemo"
    
    version := "1.0"
    
    scalaVersion := "2.11.7"
    
    libraryDependencies ++= Seq(
      "org.scalatest" %% "scalatest" % "2.2.5" % "test"
    )
    

With that in place, SBT should pull down the JAR from Maven Central for you. So once you are happy that you have the ScalaTest JAR installed, we can now proceed to write some tests.

     

    Writing Some Tests

I come from a .NET background, and as such I am used to working with tools such as

    • NUnit
      • TestFixture
      • Setup : To setup the test
      • TearDown : To teardown the test
    • Moq / FakeItEasy : Mocking frameworks

    As such I wanted to make sure I could do everything that I was used to in .NET using ScalaTest.

    This article will concentrate on the testing side of things, while the next post will be on the mocking side of things.

    So let’s carry on for now shall we.

Choosing Your Test Style

    ScalaTest allows you to use 2 different styles of writing tests.

    • FunSuite : This is more in line with what you get with NUnit say. We would write something like Test(“testing should be easy”)
    • FlatSpec : This is more of a BDD style test declaration, where we would write something like this: “Testing” should “be easy”
       

We will see an example of both of these styles in just a minute, but before that let’s carry on and look at some of the common things you may want to do with your tests.

    Setup / TearDown

You may want some startup/teardown code that runs before and after each test. Typically startup would be used to set up mocks for your test cases, and that sort of thing.

    In things like NUnit this would simply be done by creating a method and attributing it to say it is the Setup/TearDown methods.

In ScalaTest things are slightly different, in that we need to mix in the “BeforeAndAfter” trait to do this. Let’s see an example:

    import org.scalatest.{FunSuite, BeforeAndAfter}
    import scala.collection.mutable.ListBuffer
    
    class FunSuite_Example_Tests extends FunSuite with BeforeAndAfter {
    
      val builder = new StringBuilder
      val buffer = new ListBuffer[String]
    
      before {
        builder.append("ScalaTest is ")
      }
    
      after {
        builder.clear()
        buffer.clear()
      }
    }
    

It can be seen in this example that the BeforeAndAfter trait gives you 2 additional functions:

    • before
    • after

    You can use these to perform your startup/teardown logic.

This example uses the FunSuite style, but the “BeforeAndAfter” trait mixin is done in exactly the same way for the FlatSpec style of testing.

     

    Writing A Test Using FunSuite

    I think if you have come from a NUnit / XUnit type of background you will probably identify more with the FunSuite style of testing.

    Here is an example of a set of FunSuite tests.

    import org.scalatest.{FunSuite, BeforeAndAfter}
    
    import scala.collection.mutable.ListBuffer
    
    class FunSuite_Example_Tests extends FunSuite with BeforeAndAfter {
    
      val builder = new StringBuilder
      val buffer = new ListBuffer[String]
    
      before {
        builder.append("ScalaTest is ")
      }
    
      after {
        builder.clear()
        buffer.clear()
      }
    
      test("Testing should be easy") {
        builder.append("easy!")
        assert(builder.toString === "ScalaTest is easy!")
        assert(buffer.isEmpty)
        buffer += "sweet"
      }
    
      test("Testing should be fun") {
        builder.append("fun!")
        assert(builder.toString === "ScalaTest is fun!")
        assert(buffer.isEmpty)
      }
    }
    

It can be seen that they follow the tried and tested approach of tools like NUnit, where you have a test(…) function, in which “…” is the text that describes your test case.

Nothing much more to say there, apart from making sure you mix in the FunSuite trait.

     

    Writing A Test Using FlatSpec

ScalaTest also supports another way of writing your tests, which is to use the FlatSpec trait, which you would mix in instead of the FunSuite trait.

    When you use FlatSpec you would be writing your tests more like this:

    • “Testing” should “be easy” in {…}
    • it should “be fun” in {…}

It’s more of a BDD style way of creating your test cases.

Here is the exact same test suite that we saw above, but this time written using FlatSpec instead of FunSuite.

import org.scalatest.{FlatSpec, BeforeAndAfter}
import scala.collection.mutable.ListBuffer
     
    class FlatSpec_Example_Tests extends FlatSpec with BeforeAndAfter {
     
        val builder = new StringBuilder
        val buffer = new ListBuffer[String]
     
         before {
             builder.append("ScalaTest is ")
           }
     
         after {
             builder.clear()
             buffer.clear()
           }
     
        "Testing" should "be easy" in {
             builder.append("easy!")
             assert(builder.toString === "ScalaTest is easy!")
             assert(buffer.isEmpty)
             buffer += "sweet"
           }
     
         it should "be fun" in {
             builder.append("fun!")
             assert(builder.toString === "ScalaTest is fun!")
             assert(buffer.isEmpty)
           }
    }
    

I don’t mind either style; I guess it’s down to personal choice/taste at the end of the day.

    Using Matchers

Matchers are ScalaTest’s way of providing additional constraints to assert against. In some testing frameworks we would just use the Assert class for that, along with things like

    • Assert.AreEqual(..)
    • Assert.IsNotNull(..)

    In ScalaTest you can still use the assert(..) function, but matchers are also a good way of expressing your test conditional.

    So what exactly are matchers?

    In the words of the ScalaTest creators:

    ScalaTest provides a domain specific language (DSL) for expressing assertions in tests using the word should.

    So what do we need to do to use these ScalaTest matchers? Well quite simply we need to just mix in Matchers, like this:

    import org.scalatest._
    
    class ExampleSpec extends FlatSpec with Matchers { ...}
    

You can alternatively import the members of the trait, a technique particularly useful when you want to try out matcher expressions in the Scala interpreter. Here’s an example where the members of Matchers are imported:

    import org.scalatest._
    import Matchers._
    
class ExampleSpec extends FlatSpec { // Can use matchers here ...
}
    

So that gives us the ability to use the ScalaTest matchers DSL. So what do these things look like? Let’s see a couple of examples:

    import org.scalatest._
    
    
    class FlatSpec_Example_Tests extends FlatSpec with Matchers {
    
        "Testing" should "probably use some matchers" in {
    
              //equality examples
              Array(1, 2) should equal (Array(1, 2))
          val resultInt = 3
              resultInt should equal (3) // can customize equality
              resultInt should === (3)   // can customize equality and enforce type constraints
              resultInt should be (3)    // cannot customize equality, so fastest to compile
              resultInt shouldEqual 3    // can customize equality, no parentheses required
              resultInt shouldBe 3       // cannot customize equality, so fastest to compile, no parentheses required
    
              //length examples
              List(1,2) should have length 2
              "cat" should have length 3
    
              //string examples
          val helloWorld = "Hello world"
              helloWorld should startWith ("Hello")
              helloWorld should endWith ("world")
    
              val sevenString ="six seven eight"
              sevenString should include ("seven")
    
              //greater than / less than
              val one = 1
              val zero = 0
              val seven = 7
              one should be < seven
              one should be > zero
              one should be <= seven
              one should be >= zero
    
              //emptiness
              List() shouldBe empty
              List(1,2) should not be empty
           }
    
    }
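
The comments in the example above mention that “equal” and “===” let you customise equality. A common case is comparing doubles with a tolerance. Assuming you are on a ScalaTest version that ships with the Scalactic TolerantNumerics helper (the 2.2.x line does, as far as I recall), a rough sketch looks like this:

import org.scalatest._
import org.scalactic.TolerantNumerics


class FlatSpec_Tolerance_Tests extends FlatSpec with Matchers {

  "Doubles" should "be comparable using a tolerance" in {

    val result = 3.14159

    //the +- syntax builds a spread that equal/be/=== understand
    result should equal (3.14 +- 0.01)

    //alternatively supply an implicit Equality, so that === itself becomes tolerant
    implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
    result should === (3.14)
  }
}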
    

     

     

    For more information on using matchers, you should consult this documentation, which you can find here:

    http://www.scalatest.org/user_guide/using_matchers

     

     

    SCALA : Connecting to a database

     

    This time we will proceed to look at using Scala to connect to SQL server.

    In .NET we have quite a few ORM choices available, as well as standard ADO.NET. For example we could use any of the following quite easily

    • Linq to SQL
    • Entity Framework
    • Dapper
    • NHibernate
    • ADO .NET

In Scala things are a bit more tame on the ORM front; the main player is called “Slick”. The rest of this post will be about how to use Slick.

     

    Slick

The good thing about Slick is that it works with a wide range of SQL dialects. For this post I will be using what I know, which is MS SQL Server. As such I will be using an MS SQL Server driver, and there may be differences between the driver I use and other Slick drivers, but hopefully you will get the idea.

     

    Notes on MS SQL Server

The following notes assume you have a local install of MS SQL Server.

    I found that I had to do the following to get Slick to work with MS SQL Server

• Turn on the TCP/IP protocol for the SQL Server instance
• Ensure that the full set of SQL Server services is running, so that the Slick Extensions SQL Server driver works

    Demo IntelliJ IDEA Project

As this one is quite a lot bigger than the previous Scala posts, I have decided to upload this one to GitHub.

    You can grab the project from here :

    https://github.com/sachabarber/ScalaSlickTest

    But before you try and run it you should make sure you have done the following :

    • Created a MS SQL Server DB
• Run the schema creation scripts included in the IntelliJ IDEA project
• Changed the “application.conf” file to point to YOUR SQL Server installation (there is a rough sketch of what that involves just after this list)
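I won’t reproduce the project’s actual config here, and the exact keys and path names in the GitHub repo may well differ, but roughly speaking the “application.conf” just holds the JDBC connection details, and Slick (3.x) picks them up via Database.forConfig. A minimal sketch of that idea (the “sqlserver” path, database name, driver class and credentials below are purely hypothetical placeholders):

import com.typesafe.slick.driver.ms.SQLServerDriver.api._

object Db {

  // application.conf would contain something roughly like:
  //
  //   sqlserver {
  //     url      = "jdbc:sqlserver://localhost;databaseName=SLICKTEST"
  //     driver   = "com.microsoft.sqlserver.jdbc.SQLServerDriver"   // or whichever JDBC driver the project uses
  //     user     = "yourUser"
  //     password = "yourPassword"
  //   }
  //
  // and we point Slick at that config path:
  val db: Database = Database.forConfig("sqlserver")
}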

     

    The rest of this post will deal with how to do various things using Slick such as:

    • Use direct SQL commands (sql strings)
    • Use the slick ORM for CRUD
• Use a stored procedure with Slick

    But before we get on to any of that lets just outline the schema we will be working with. The one and only table we will be using is this one :

(Image: the Items table, which has Id, Description, Cost and WarehouseLocationId columns)

So now that we know what the single (I know, lame, we should have had more, but meh) table looks like, let’s crack on.

    NOTE : In the examples shown in this post I am using the Scala Async Library that I have talked about before.

     

    Using Direct SQL Commands

In this section we will see how we can use Slick to run arbitrary SQL commands. Let’s see some examples.

    Return a Scalar value

Say we only want 1 value back, perhaps the count of the rows. We can just do this:

    def selectScalarObject(db:Database) : Unit = {
    
      val action = sql"""Select count(*) as 'sysobjectsCount'  from sysobjects""".as[Int]
      val futureDB : Future[Vector[Int]] = db.run(action)
    
      async {
        val sqlData = await(futureDB)
        val count = sqlData.head
        println(s"PlainSQLHelper.selectScalarObject() sysobjectsCount: $count")
      } onFailure {
        case e => {
          println(s"ERROR : $e")
        }
      }
    }
    

    Return more than 1 value

    We may of course want a couple of values, but we are not quite ready to return a brand new entity. So we can use a Tuple.

    Here is an example:

    def selectTupleObject(db: Database) : Unit = {
    
      val action = sql"""Select count(*)  as 'sysobjectsCount', count(*)/10  as 'sysobjectsCountDiv10' from sysobjects""".as[(Int,Int)]
      val futureDB : Future[Vector[(Int,Int)]] = db.run(action)
    
      async {
        val sqlData = await(futureDB)
        val (x,y) = sqlData.head
        println(s"PlainSQLHelper.selectTupleObject() sysobjectsCount: $x, sysobjectsCountDiv10: $y")
      } onFailure {
        case e => {
          println(s"ERROR : $e")
        }
      }
    }
    

    Return a case class

We can obviously make things more formal and return a nice case class. Here is an example of that:

    def selectRawTableObject(db: Database) : Unit = {
    
      val action = sql"""Select * from Items""".as[(Int,String, Double, Int)]
      val futureDB : Future[Vector[(Int,String, Double, Int)]] = db.run(action)
    
      async {
        val sqlData = await(futureDB)
        val (id,desc, cost, location) = sqlData.head
        val item = RawSQLItem(id,desc, cost, location)
        println(s"PlainSQLHelper.selectRawTableObject() Id: ${item.id}, Description: ${item.description}, Cost: ${item.cost}, WarehouseLocation: ${item.warehouseLocationId}")
      } onFailure {
        case e => {
          println(s"ERROR : $e")
        }
      }
    }
    
    
    case class RawSQLItem(id: Int, description: String, cost: Double,  warehouseLocationId: Int)
    

     

     

    Using The Slick ORM For CRUD

    These examples show how you can do the basic CRUD operations with Slick.

However before we start to look at the CRUD operations, let’s just see a bit of basic Slick code. Slick uses an abstract class called Table which you MUST extend. It is also common practice to use a companion object to create a TableQuery[T]. Here is the one for the CRUD operations we will be looking at next:

    package org.com.barbers.slicktest
    
    import com.typesafe.slick.driver.ms.SQLServerDriver.api._
    
    object Items {
      val items = TableQuery[Items]
    }
    
    case class DBItem(id: Int, description: String, cost: Double,  warehouseLocationId: Int)
    
class Items(tag: Tag) extends Table[DBItem](tag, "Items") {
      def id = column[Int]("Id", O.PrimaryKey, O.AutoInc)
      def description = column[String]("Description")
      def cost = column[Double]("Cost")
      def warehouseLocationId = column[Int]("WarehouseLocationId")
      def * = (id, description, cost, warehouseLocationId) <> (DBItem.tupled, DBItem.unapply)
    }
    

    Create

Ok so now we have seen that Slick uses a Table class, and that there is a TableQuery[T] at play. Let’s move on to see how we can create some data.

This is quite weird to do. Normally what we want back from an INSERT is an Id. How Slick does that is a bit strange: we use the Slick DSL to say what we would like returned (the “Id”), which we do using “returning” followed by a map over the Items table. This may sound weird, but the example below should help to illustrate it. Here is how we do it:

    def saveItem(db: Database, item: DBItem) = {
    
      val action =(Items.items returning Items.items.map(_.id)) +=
        DBItem(-1, item.description, item.cost, item.warehouseLocationId)
      val futureDB : Future[Int] = db.run(action)
    
      async {
        val savedItemId = await(futureDB)
        println(s"TableResultRunner.saveItem() savedItem.Id ${savedItemId}")
      } onFailure {
        case e => {
          println(s"ERROR : $e")
        }
      }
    }
    

And here is how we store several items. For a bulk insert we can’t really get the inserted Ids back, but we can add all the Items in one go using the standard Scala collection operator ++=, which appends a new collection to the current collection.

Again, an example will make this clearer:

    def insertSeveralItems(db: Database, items : List[DBItem]) : Unit = {
    
  val insertActions = DBIO.seq(
    (Items.items ++= items.toSeq).transactionally
  )
  val futureDB : Future[Unit] = db.run(insertActions)
    
      async {
        await(futureDB)
        println(s"TableResultRunner.insertSeveralItems() DONE")
      } onFailure {
        case e => {
          println(s"ERROR : $e")
        }
      }
    }
    

     

    Retrieve

    So we now have some Items, so how do we get them back from the DB?

There are many ways to do this with Slick. Let’s use a simple take(2) operation to start with:

    def selectTwoItems(db: Database) : Unit = {
    
  val q = Items.items.take(2)
      val futureDB : Future[Seq[DBItem]] = db.run(q.result)
    
      async {
        val sqlData = await(futureDB)
        val item = sqlData.head
        println(s"TableResultRunner.selectTwoItems()[0] " +
          s"Id: ${item.id}, Description: ${item.description}, " +
          s"Cost: ${item.cost}, WarehouseLocationId: ${item.warehouseLocationId}")
      } onFailure {
        case e => {
          println(s"ERROR : $e")
        }
      }
    }
    

We can also use queries to filter out what we want from the DB. Here is an example of using a query, where we use a filter to get the Item whose Id matches a given Id:

    def findItemById(db: Database,id : Int) = {
    
      async {
        val q = for { p <- Items.items if p.id === id } yield p
        val futureDBQuery : Future[Option[DBItem]] = db.run(q.result.headOption)
        val item : Option[DBItem] = await(futureDBQuery)
        println(s"OPTION ${item}")
        item match {
          case Some(x) =>  println(s"TableResultRunner.findItemById The item is $x")
          case _ => ()
        }
      } onFailure {
        case e => {
          println(s"ERROR : $e")
        }
      }
    }
    

     

    Update

Update is a stranger one, where we select only the attributes we want using a query, and then use Slick’s inbuilt update(..) function to perform the update on the columns we want. This is clearer with an example.

    In this example we want to update ONLY the “cost” column of an Item.

    def updateItemCost(db: Database, description : String, cost : Double) = {
    
      async {
        val q = Items.items
          .filter(_.description === description)
          .map(_.cost)
          .update(cost)
    
        val futureDB = db.run(q)
        val done = await(futureDB)
        println(s"Update cost of ${description}, to ${cost}")
    
        val q2 = for { p <- Items.items if p.description === description } yield p
        val futureDBQuery : Future[Seq[DBItem]] = db.run(q2.result)
        val items = await(futureDBQuery)
        items.map(item => println(s"TableResultRunner.updateItemCost The item is now $item") )
      } onFailure {
        case e => {
          println(s"ERROR : $e")
        }
      }
    }
    

    Delete

Lastly we would like to delete an Item, so let’s see how we can do that. Again we use some Slick magic for this, namely the .delete function. Here is an example where I delete a random Item from the DB.

    def deleteRandomItem(db: Database) = {
    
      async {
        val q =  Items.items.take(1)
        val futureDB : Future[Seq[DBItem]] = db.run(q.result)
        val sqlData = await(futureDB)
        val item = sqlData.head
        val deleteFuture : Future[Unit] = db.run(
          Items.items.filter(_.id === item.id).delete).map(_ => ())
        await(deleteFuture)
        println(s"TableResultRunner.deleteRandomItem() deleted item.Id ${item.id}")
      } onFailure {
        case e => {
          println(s"ERROR : $e")
        }
      }
    }
    

     

    Calling A Stored Procedure

Calling a stored procedure is as simple as grabbing a JDBC connection from the db, and building out the call to the right stored procedure.

    Say we have this stored procedure:

    USE [SLICKTEST]
    GO
    
    SET ANSI_NULLS ON
    GO
    
    SET QUOTED_IDENTIFIER ON
    GO
    
    CREATE PROCEDURE [dbo].[sp_SelectItemsByDescription]
        (
          @description NVARCHAR(MAX)
        )
    AS
    BEGIN
    	SET NOCOUNT ON;
    
    	select * from Items i where i.[Description] LIKE '%' + @description + '%'
    
    END
    
    GO
    
    
    

This is how we would call it, using the JDBC connection obtained from the Slick Database object:

    def selectItems(db: Database, description: String): Unit = {
    
      val sqlStatement = db.source.createConnection().prepareCall(
        "{ call [dbo].[sp_SelectItemsByDescription](?) }",
        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    
  sqlStatement.setFetchDirection(ResultSet.FETCH_FORWARD)
  //bind the stored procedure's @description parameter (by position)
  sqlStatement.setString(1, description)
    
      val rs = sqlStatement.executeQuery()
    
      while (rs.next()) {
        val item = new DBItem(
          rs.getInt("Id"),
          rs.getString("Description"),
          rs.getDouble("Cost"),
          rs.getInt("WarehouseLocationId"))
    
        println(s"StoredProcedureHelper.selectProducts " +
          "using description set to ${desc} got this result : " +
          s"Id: ${item.id}, Description: ${item.description}, " +
          s"Cost: ${item.cost}, WarehouseLocationId: ${item.warehouseLocationId}")
      }
    
      rs.close()
      sqlStatement.close()
    }
    

     

     

SCALA : Dependency Injection / IOC

    In software engineering, dependency injection is a software design pattern that implements inversion of control for resolving dependencies. Dependency injection means giving an object its instance variables. Really. That’s it.

    However there are several ways of doing this, and as such it is a fairly big topic, and I will not be able to go into the very specific details of DI/ IOC in one post.

    Instead I shall attempt to outline some of the ways you could do DI / IOC in Scala (and like I say there are a few).

    I will play nice though, and will try and point out good resources along the way, that you can follow for more information

     

    Factories

One way of doing simple poor man’s DI is to use factories, which decouple the client from the actual instance class that it may need to fulfill its role.

    Here is an example of a factory and a class that needs a service inside it. We simply use the factory to get the service we need.

    
    
    
    
    trait Processor {
      def Process() : Unit
    }
    
    class ActualProcessor() extends Processor {
      override def Process(): Unit = {
          println("ActualProcessor")
      }
    }
    
    
    object ProcessorFactory {
    
      var _processor: Processor = new ActualProcessor()
    
      // Getter
      def processor = _processor
    
      // Setter
      def processor_=(newProcessor: Processor): Unit = _processor = newProcessor
    }
    
    
    class OrderService {
    
      def processOrder(): Unit = {
        val processor = ProcessorFactory.processor
        processor.Process
      }
      
    }
    
    
    
    object ClassesDemo {
    
      def main(args: Array[String]) : Unit =
      {
        new OrderService().processOrder();
        System.in.read()
      }
    }
    
    
    

    Factories typically use static methods, such that they act as singletons and can be used from anywhere, and have a new instance set from anywhere (which is typically at the start of the app, or a test case)

    Here is how we might change the factory to use a mock/test double before the testing starts. I am using ScalaTest in this example

    package org.scalatest.examples.flatspec.beforeandafter
    
    import org.scalatest._
    
    
    class ExampleSpec extends FlatSpec with BeforeAndAfter {
    
  before {
    //for the tests we could use a mock, or a test double
    //(MockProcessor here would be a Processor implementation written just for the tests)
    ProcessorFactory.processor = new MockProcessor()
  }
    }
    

    Google Guice

    Google Guice is a DI library primarily for Java. However since Scala is a JVM language we may use it from Scala.

You can read more about Google Guice here : https://github.com/google/guice/wiki (correct as of 17/11/15)

You will need the following SBT libraryDependencies:

     "com.google.inject" % "guice" % "3.0"
    

    Typical usage can be thought of as 4 separate things

    • Defining an abstraction, that our client code will depend on
    • Stating that the client code wants a dependency injected. This is done with annotations in Java/Scala using the @Inject annotation
    • Providing the wire up code to wire the abstraction that the client code wanted satisfied with the actual implementation instance that the client code will get at runtime
    • Get the item from the Google Guice DI framework

    Let’s see an example of these 4 points

    import com.google.inject.{ Inject, Module, Binder, Guice }
    
    //The abstraction
    trait Processor {
      def Process() : Unit
    }
    
    class ActualProcessor() extends Processor {
      override def Process(): Unit = {
          println("ActualProcessor")
      }
    }
    
    
    // OrderService needs a Processor abstraction
    class OrderService @Inject()(processor : Processor) {
    
      def processOrder(): Unit = {
        processor.Process
      }
    
    }
    
    
    //Declare a Google guice module that provides the wire up code
    class DependencyModule extends Module {
      def configure(binder: Binder) = {
        binder.bind(classOf[Processor]).to(classOf[ActualProcessor])
      }
    }
    
    
    object ClassesDemo {
    
      def main(args: Array[String]) : Unit =
      {
        //get the item from the DI framework
        val injector = Guice.createInjector(new DependencyModule)
        val orderService = injector.getInstance(classOf[OrderService])
        orderService.processOrder()
        System.in.read()
      }
    }
    
    
    

     

    This is a very very quick introduction to DI using Google Guice, but as you can see it is quite similar to other DI frameworks such as Spring (or Castle, Autofac, Unity in the .NET world). You should certainly read the wiki a bit more on this one.
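One thing worth sketching before moving on: for testing/mocking, the only thing that needs to change is the module. We could bind the same Processor abstraction to a test double, and the OrderService code stays untouched. A rough sketch (TestProcessor and TestDependencyModule are hypothetical names; Processor and OrderService are the ones defined above):

import com.google.inject.{ Module, Binder, Guice }

//a test double for the Processor abstraction defined above
class TestProcessor() extends Processor {
  override def Process(): Unit = {
      println("TestProcessor")
  }
}

//a module that wires the abstraction up to the test double instead
class TestDependencyModule extends Module {
  def configure(binder: Binder) = {
    binder.bind(classOf[Processor]).to(classOf[TestProcessor])
  }
}

object TestClassesDemo {

  def main(args: Array[String]) : Unit =
  {
    //same resolution code as before, just with the test module
    val injector = Guice.createInjector(new TestDependencyModule)
    val orderService = injector.getInstance(classOf[OrderService])
    orderService.processOrder()   //prints "TestProcessor"
  }
}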

     

     

    MacWire

    We will now spend a bit more time looking at another framework called “macwire” which you can read more about at this GitHub project :

https://github.com/adamw/macwire (correct as of 17/11/15)

So how do we use this MacWire framework? Well, to be honest it is not that different from Google Guice in the code you write, but it uses the idea of Scala Macros under the hood, though you don’t really need to get involved with that to use it.

    We need to include the following SBT libraryDependencies before we start

    libraryDependencies ++= Seq(
      "com.softwaremill.macwire" %% "macros" % "2.1.0" % "provided",
      "com.softwaremill.macwire" %% "util" % "2.1.0",
      "com.softwaremill.macwire" %% "proxy" % "2.1.0"
    )
    

So let’s see an example usage, shall we:

    package com.barbersDemo
    
    import com.softwaremill.macwire._
    
    //The abstraction
    trait Processor {
      def Process() : Unit
    }
    
    class ActualProcessor() extends Processor {
      override def Process(): Unit = {
          println("ActualProcessor")
      }
    }
    
    
    class MyApp {
      val processor = new ActualProcessor()
    }
    
    
    // OrderService needs a Processor abstraction
    class OrderService(processor : Processor) {
    
      def processOrder(): Unit = {
        processor.Process
      }
    
    }
    
    object ClassesDemo {
    
      def main(args: Array[String]) : Unit =
      {
    
    // we would substitute this line for a line that loads a test
    // module with a set of test services instead, if we were
    // interested in testing/mocking (there is a sketch of this below)
        val wired = wiredInModule(new MyApp)
    
        val orderService = wired.wireClassInstance[OrderService](classOf[OrderService])
        orderService.processOrder()
        System.in.read()
      }
    }
    
    
    

     

As you can see, from a usability point of view it is not that different from using Google Guice. What is different is that we DO NOT have to use the @Inject annotation.
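To make the comment inside the main method above a bit more concrete, here is a rough sketch of that test-module substitution (TestProcessor and MyTestApp are hypothetical names; Processor and OrderService are the ones defined above):

import com.softwaremill.macwire._

//a test double for the Processor abstraction defined above
class TestProcessor() extends Processor {
  override def Process(): Unit = {
      println("TestProcessor")
  }
}

//a module holding the test services instead of the real ones
class MyTestApp {
  val processor = new TestProcessor()
}

object TestWiringDemo {

  def main(args: Array[String]) : Unit =
  {
    //same wiring calls as before, just pointed at the test module
    val wired = wiredInModule(new MyTestApp)
    val orderService = wired.wireClassInstance[OrderService](classOf[OrderService])
    orderService.processOrder()   //prints "TestProcessor"
  }
}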

     

    Cake Pattern

The cake pattern, for me, is the hardest one of the lot to get my head around, but it seems to be the de facto way of doing DI in Scala.

You do get used to it though. I managed to do this with a colleague today without the internet to refer to, so it is something that comes with time.

    So here is the example:

    package com.barbersDemo
    
    
    // This trait is how you would express a dependency
    // Any class that needs a Processor would mix in this trait
// along with using a self type, allowing us to mix in either
// a real implementation or a mock / test double
    trait ProcessorComponent {
    
      //abstract implementation, inheritors provide implementation
      val processor : Processor
    
      trait Processor {
        def Process() : Unit
      }
    }
    
    
    // An actual Processor
    trait ActualProcessorComponent extends ProcessorComponent {
    
      val processor = new ActualProcessor()
    
      class ActualProcessor() extends Processor {
        def Process(): Unit = {
          println("ActualProcessor")
        }
      }
    }
    
    
// A test double Processor
    trait TestProcessorComponent extends ProcessorComponent {
    
      val processor = new TestProcessor()
    
      class TestProcessor() extends Processor {
        def Process(): Unit = {
          println("TestProcessor")
        }
      }
    }
    
    
    
    // The service that needs the Processor dependency
// satisfied. This happens via the use of mixins
    // and the use of a self type
    class OrderService {
    
  // NOTE : The self type that allows us to
  // mix in and use a ProcessorComponent
      this: ProcessorComponent =>
    
      def ProcessOrder() {
        processor.Process()
      }
    
    }
    
    
    object ClassesDemo {
    
      def main(args: Array[String]) : Unit =
      {
    //To use the real implementation we would write this
    //val defaultOrderServiceComponent = new OrderService with ActualProcessorComponent
    
        // To use the test double or mock we would use a line similar to this
        val defaultOrderServiceComponent = new OrderService with TestProcessorComponent
    
        defaultOrderServiceComponent.ProcessOrder()
        System.in.read()
      }
    }
    
    
    

     

There are a couple of things to note here:

• We want to make use of a trait called “Processor”, which others may extend to do something real, or to provide a mock/test implementation
    • We wrap the trait we want to inject in a xxxComponent (this appears to be some sort of convention), and we also have an abstract val that the inheritor of the trait will provide an implementation for. You can see this in the ProcessorComponent trait (which is abstract)
    • We then have an ActualProcessorComponent / TestProcessorComponent which implement the trait ProcessorComponent
• The place where we want to make use of the service, we make use of the self type within the OrderService, which is this part “this: ProcessorComponent =>”. What this really means is that the OrderService NEEDS a ProcessorComponent mixed in to work correctly. And since we know we will have a ProcessorComponent mixed in (either a real implementation or a mock / test double), we can make use of it in the OrderService class.
    • All that is left is to wire up the OrderService with either a real implementation or mock / test double. This is done in the ClassesDemo.main(..) method shown above

     


     

     

    Structural Typing

The last example I wanted to look at was using structural typing. To my mind this is kind of like duck typing: if you are expecting something that has a Print method, and you get something that has a Print method, you should be able to use it.

    NOTE : this approach USES reflection so will have a performance impact if used a lot

    Here is an example of using structural typing

    package com.barbersDemo
    
    
    //The abstraction
    trait Processor {
      def Process() : Unit
    }
    
    class ActualProcessor() extends Processor {
      override def Process(): Unit = {
          println("ActualProcessor")
      }
    }
    
    
    class TestProcessor() extends Processor {
      override def Process(): Unit = {
        println("TestProcessor")
      }
    }
    
    
    
    // OrderService needs a Processor abstraction
// but this time we use structural typing; if it looks like
// a duck and quacks like a duck, it's a duck, kind of thing
    class OrderService(env: { val processor: Processor }) {
    
      def processOrder(): Unit = {
        //this time we use the env parameter to obtain the dependency
        env.processor.Process
      }
    
    }
    
    
    
    object Config {
      lazy val processor = new ActualProcessor() // this is where injection happens
    }
    
    object TestConfig {
      lazy val processor = new TestProcessor() // this is where injection happens
    }
    
    object ClassesDemo {
    
      def main(args: Array[String]) : Unit =
      {
        new OrderService(Config).processOrder()
        new OrderService(TestConfig).processOrder()
        System.in.read()
      }
    }
    
    
    

As this is a bit stranger, I have included a call which uses the actual implementation and also a call that uses a test implementation.

The good thing about this is that there are no extra libraries involved, it is all standard Scala, and it is immutable and type safe.

A nice way to go about things, if you ask me.