Saturday, April 14, 2012

Hub: Interesting but undocumented feature of Playframework 2

WARNING: 

Hubs are deprecated in Play 2.1. They are replaced by Concurrent.broadcast (which is also undocumented). I wrote a very small example that demonstrates the concept and posted it on GitHub: https://github.com/benqua/BroadcasterInPlay2

The Problem

Write a stream-based (Server Sent Events, Comet or WebSocket) web application in which many clients need to receive the same information (a news feed or any kind of dashboard, for example).
Why push the same information to all clients? Because this information may be costly to build, or because it is important that every client gets the exact same, time-dependent information.

The Concepts

To deal with information streams, Play provides many new (to me) and interesting concepts, such as Iteratees, Enumerators and Enumeratees. Check the documentation.
Basically, we want to push an Enumerator (think: “input data stream”) to many web clients.
To do so, Play 2 provides an undocumented object called “hub”. Let's see it in action.

The Code

For this example, we will use “Server Sent Events” to push information from the server to many clients.

First, a simple example without a hub. The controller code could be:
  
  // Imports assumed at the top of the controller file (Play 2.0 package layout):
  //   import java.util.concurrent.TimeUnit
  //   import play.api.Logger
  //   import play.api.mvc._
  //   import play.api.libs.EventSource
  //   import play.api.libs.concurrent.Promise
  //   import play.api.libs.iteratee.Enumerator

  // no hub: each client gets its own events
  def serverevents = Action {
    val timestep = 1000
    // getmtime is an Enumerator that provides
    // a new number (time) every second
    val getmtime = Enumerator.fromCallback { () =>
      Promise.timeout({
        // time in tenths of a second
        val currentTime = java.lang.System.currentTimeMillis() / 100
        Logger.debug(currentTime.toString)
        Some(currentTime + ":ds")
      }, timestep, TimeUnit.MILLISECONDS)
    }

    // the getmtime Enumerator is pushed through the Enumeratee returned
    //  by the apply method of the EventSource object,
    //  to be transformed into a Server Sent Event.
    // The event is then sent to the client.
    Ok.stream(getmtime &> EventSource()).as("text/event-stream")
  }
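
For context, the text/event-stream format that EventSource() targets is simple: each event is one or more "data:" lines followed by a blank line. The sketch below is plain Scala and does not use Play. SseFraming and toSse are hypothetical names chosen for illustration, and Play's own encoder may differ in details.

```scala
// Hypothetical helper, not part of Play: frames a payload the way the
// Server Sent Events wire format expects.
object SseFraming {
  // Each line of the payload becomes its own "data:" line;
  // a blank line terminates the event.
  def toSse(payload: String): String =
    payload
      .split("\n", -1)
      .map(line => "data: " + line)
      .mkString("", "\n", "\n\n")
}
```

With this sketch, a payload such as "13343:ds" is framed as a single "data:" line followed by an empty line, which is what the browser's EventSource API parses as one message.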

If you run this example in two browsers, you will see that they get different results. It is also obvious, from the log in the Play console, that a new sequence of timestamps is generated for each client.
There is one line per client per second in the log.
If this small function (getting the current time) were resource intensive, resource usage would grow linearly with the number of clients: there is one call to the function per client at every time step.
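
The linear growth can be illustrated without Play. The following is only a sketch in plain Scala: PerClientCost, expensive and serve are hypothetical names, and the loop merely stands in for what happens when every client drives its own callback-based source.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a callback-driven source: every client that
// consumes it triggers its own calls to the (possibly costly) callback.
object PerClientCost {
  val calls = new AtomicInteger(0)

  // Stands in for the expensive computation behind each event.
  def expensive(): String = {
    calls.incrementAndGet()
    (System.currentTimeMillis() / 100).toString + ":ds"
  }

  // Each client pulls `steps` events on its own, so nothing is shared.
  // Returns the total number of calls made to `expensive`.
  def serve(clients: Int, steps: Int): Int = {
    calls.set(0)
    for (_ <- 1 to clients; _ <- 1 to steps) expensive()
    calls.get()
  }
}
```

In this model, the number of calls is simply clients times steps, which matches the one log line per client per second seen in the Play console.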

Now, let's add the hub and refactor the controller code a little:
 
  val timestep = 1000
  val timeEnum = Enumerator.fromCallback{ () =>
    Promise.timeout( {
      val currentTime = java.lang.System.currentTimeMillis()
      Logger.debug(currentTime.toString)
      Some(currentTime +":ds")},
      timestep, TimeUnit.MILLISECONDS )
  } 
  
  // now, the hub that lets us
  // "share" the timeEnum between all clients
  val hub = Concurrent.hub[String](timeEnum)
  
  // with hub: one event generation for all clients
  def serverhubevents = Action {
    implicit val encoder = Comet.CometMessage.stringMessages
    Ok.stream(hub.getPatchCord &> EventSource()).as("text/event-stream")
  }

Note the use of the magic and undocumented hub.getPatchCord function which, at least conceptually, returns an Enumerator that each client can plug into the shared timeEnum stream.

With this code, all clients get the exact same output. The (potentially) resource-intensive function that feeds the timeEnum is called only once per time step, regardless of the number of clients.
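
To make the fan-out idea concrete, here is a minimal, single-threaded sketch in plain Scala. It is not Play's hub implementation: MiniHub, subscribe, tick and MiniHubDemo are hypothetical names, and subscribe is only a rough analogue of getPatchCord.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer

// Hypothetical, single-threaded stand-in for a hub: one producer call per
// time step, with the result pushed to every subscribed client.
final class MiniHub[A](produce: () => A) {
  private val subscribers = ListBuffer.empty[A => Unit]

  // Rough analogue of getPatchCord: registers a client callback.
  def subscribe(onEvent: A => Unit): Unit = {
    subscribers += onEvent
    ()
  }

  // One time step: compute the value once, then fan it out.
  def tick(): Unit = {
    val value = produce()
    subscribers.foreach(_(value))
  }
}

object MiniHubDemo {
  val calls = new AtomicInteger(0)

  // Returns (number of producer calls, events received by each client).
  def run(clients: Int, steps: Int): (Int, Seq[List[String]]) = {
    calls.set(0)
    val hub = new MiniHub[String](() => "tick-" + calls.incrementAndGet())
    val inboxes = Seq.fill(clients)(ListBuffer.empty[String])
    inboxes.foreach { inbox =>
      hub.subscribe { event => inbox += event; () }
    }
    (1 to steps).foreach(_ => hub.tick())
    (calls.get(), inboxes.map(_.toList))
  }
}
```

In this sketch, the producer is called once per step however many clients are subscribed, and every client sees the same sequence of events.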

Conclusion

Hubs are a great way to multiplex an answer to many clients. 
Sadly, there is no documentation for the moment.


Open Questions

I don't understand everything in my own (“try and test” based) code :( Hopefully, someone can shed some light on the open points below:
  1. First, a Scala question: why do I have to add the “implicit” encoder value in the hub version while it is not necessary in the version without a hub?
  2. In the first example, if I move the Enumerator definition out of the endpoint function and into the controller object scope, I get the exact same result. In this case, shouldn't the Enumerator be shared between the clients? I don't know exactly what should happen, but I don't understand why each client gets its own Enumerator:
 
  val timestep = 1000
  val timeEnum = Enumerator.fromCallback{ () =>
    Promise.timeout( {
      val currentTime = java.lang.System.currentTimeMillis()
      Logger.debug(currentTime.toString)
      Some(currentTime +":ds")},
      timestep, TimeUnit.MILLISECONDS )
  } 

  // why does each client have its own timeEnum
  // (declared in the controller object scope)??
  def servereventsshared = Action {
    implicit val encoder = Comet.CometMessage.stringMessages
    Ok.stream(timeEnum &> EventSource()).as("text/event-stream")
  }

This gives the exact same result as the version without a hub.

Last question: how does the server know that a client's browser window has been closed and that the stream should no longer be pushed to this client? Is there no way?

Thanks

Thanks to Gaëtan Renaudeau, who wrote a very interesting experiment and blog post using, among other things, hub. His experiment drew my attention to this undocumented Play 2 feature.

2 comments:

  1. I am trying to implement this in Java-Play Framework. Any pointers?

  2. How to implement SSE with PlayFramework 1.2.7 ?
