Sunday, 4 January 2015

Creating Reactive Streams components on Akka Streams

Creating custom publishers and subscribers on Akka Streams

Reactive Streams is an ongoing joint effort to create a JVM-wide standard for stream processing with backpressure. It addresses the problem of limited resources on the nodes that take part in a data processing chain or graph. Such a chain might consist of two or more nodes - senders and receivers - communicating with each other via messaging. Some of the nodes might be slower than others: they might not be able to process incoming messages at the rate at which the senders (producers) transfer them. Any such receiving node (a consumer) is then forced to either skip messages or buffer them and risk running out of memory.

To avoid flooding slower nodes with data, Reactive Streams introduces backpressure in the form of an upstream (receiver to sender) flow of information: the consumer periodically informs the producer of the maximum number of messages it can receive. The producer can send data only on demand. In general, this mode of operation, where the receiver asks the sender for data, is called pull-based. In Reactive Streams it specifically refers to the situation where the consumer is the slower side and slows the producer down. It contrasts with the push-based mode, where the producer is the slower side: demand is always outstanding, so the producer can send each element as soon as it is ready. In Reactive Streams, the processing line between two nodes can dynamically switch between push and pull mode, depending on which side is slower at a given moment.

The Reactive Streams specification consists of several rather simple Java interfaces and an extensive set of rules every implementation must conform to. Let's look more closely at the interfaces representing the different components of the protocol:

A Publisher is the producer of data. It declares only one method:
  • void subscribe(Subscriber<? super T> s) - invoked when a Subscriber (a consumer) attempts to connect. After the method is called, the Publisher must in turn call the onSubscribe method on the Subscriber, passing a Subscription it creates. The stream is then formed, allowing for the exchange of data

A Subscriber defines 4 methods that are invoked by the Publisher:
  • void onSubscribe(Subscription s) - to store the Subscription created by the Publisher
  • void onNext(T t) - to handle an element of data sent by the Publisher
  • void onComplete() - to handle the end of stream signaled by the Publisher
  • void onError(Throwable t) - to handle an error that occurred on the publishing side

A Subscription consists of 2 methods invoked by the Subscriber:
  • void request(long n) - to signal demand to the Publisher - the number of messages the Subscriber is able to handle
  • void cancel() - to indicate a desire to cancel the subscription and let the Publisher know not to send more data

A Processor is an interface that is both a Publisher and a Subscriber; it declares no methods of its own.

During the stream lifetime, the receiving side repeatedly requests more elements using the request method. The publisher responds by invoking the onNext method on the subscriber at most as many times as the number of elements requested. Streaming can end by either normal completion (when the publisher calls onComplete) or abnormal termination (the publisher calls onError, passing an exception). After that, no more elements may be transferred.
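
The exchange described above can be sketched in plain Scala. This is only an illustration under stated assumptions: the traits inside ProtocolSketch are local stand-ins that mirror the Reactive Streams interfaces (with simplified generics), and RangePublisher and CollectingSubscriber are hypothetical names invented for this example. Everything runs synchronously on one thread, so many rules of the specification (thread safety in particular) are not addressed.

```scala
object ProtocolSketch {
  // Local stand-ins mirroring the org.reactivestreams interfaces (simplified
  // generics), so that the sketch compiles without any dependency.
  trait Subscription {
    def request(n: Long): Unit
    def cancel(): Unit
  }
  trait Subscriber[T] {
    def onSubscribe(s: Subscription): Unit
    def onNext(t: T): Unit
    def onComplete(): Unit
    def onError(t: Throwable): Unit
  }
  trait Publisher[T] {
    def subscribe(s: Subscriber[T]): Unit
  }

  /** Hypothetical publisher of the integers in [start, end). */
  final class RangePublisher(start: Int, end: Int) extends Publisher[Int] {
    def subscribe(s: Subscriber[Int]): Unit =
      s.onSubscribe(new Subscription {
        private var next = start
        private var demand = 0L
        private var done = false
        private var emitting = false // prevents re-entrant emission loops

        def request(n: Long): Unit =
          if (done) ()
          else if (n <= 0) {
            done = true
            s.onError(new IllegalArgumentException("request must be positive"))
          } else {
            // Accumulate demand, saturating instead of overflowing.
            demand = if (demand + n < 0) Long.MaxValue else demand + n
            if (!emitting) {
              emitting = true
              // Never call onNext more often than was requested.
              while (!done && demand > 0 && next < end) {
                demand -= 1
                next += 1
                s.onNext(next - 1)
              }
              emitting = false
              if (!done && next >= end) { done = true; s.onComplete() }
            }
          }

        def cancel(): Unit = { done = true }
      })
  }

  /** Hypothetical subscriber that requests `batch` elements at a time. */
  final class CollectingSubscriber(batch: Int) extends Subscriber[Int] {
    private var subscription: Option[Subscription] = None
    private var outstanding = 0
    val received = scala.collection.mutable.ArrayBuffer.empty[Int]
    var completed = false

    def onSubscribe(s: Subscription): Unit = {
      subscription = Some(s)
      outstanding = batch
      s.request(batch.toLong)
    }
    def onNext(t: Int): Unit = {
      received += t
      outstanding -= 1
      // Signal new demand only after the previous batch has been consumed.
      if (outstanding == 0) {
        outstanding = batch
        subscription.foreach(_.request(batch.toLong))
      }
    }
    def onComplete(): Unit = { completed = true }
    def onError(t: Throwable): Unit = throw t
  }
}
```

Subscribing a CollectingSubscriber(2) to a RangePublisher(0, 5) delivers the elements 0 to 4 in batches of two, followed by onComplete. The emitting flag is one possible way to keep a synchronous request call made from inside onNext from recursing; other designs are equally valid.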

Using the Reactive Streams interfaces, it is possible to implement many different approaches to exchanging data. However, while powerful, this approach is relatively low-level. Moreover, a correct implementation of the raw Publisher and Subscriber interfaces can be quite tricky. For example, the Publisher must keep track of connected Subscriber(s) and monitor the demand signaled through any Subscription it created. The Subscriber must in turn balance the rate of elements received against the demand for future elements. The implementation choices for these aspects also tend to be similar across many different subscribers and publishers.

Fortunately, we can let Akka Streams manage some of this complexity for us and make the implementation much more straightforward.

Creating an ActorPublisher 

Akka Streams is one of the implementations of Reactive Streams, building a high-level data processing DSL on top of the protocol. Besides providing a convenient way of transforming flows of data, Akka Streams defines two traits for implementing custom publishers and subscribers based on actors: ActorPublisher[T] and ActorSubscriber (as of akka-stream-experimental, version 1.0-M2, subject to change).

A skeleton implementation of a publisher may look like the following:

import akka.stream.actor.{ActorPublisher, ActorPublisherMessage}
import scala.util.{Failure, Success, Try}

class StringActorPublisher extends ActorPublisher[String] {

   // Produces the next element, None at the end of the sequence, or a Failure
   def generateElement(): Try[Option[String]] = ???
   def cleanupResources(): Unit = ???

   def receive = {
      case ActorPublisherMessage.Request(_) =>
         // totalDemand already includes the newly requested amount
         while (isActive && totalDemand > 0) {
            generateElement() match {
               case Success(Some(element)) => onNext(element)
               case Success(None)          => onComplete()
               case Failure(ex)            => onError(ex)
            }
         }
      case ActorPublisherMessage.Cancel =>
         cleanupResources()
      case ActorPublisherMessage.SubscriptionTimeoutExceeded =>
         cleanupResources()
   }
}

Producing data 

After the stream is established, the ActorPublisher starts receiving Request messages from the subscriber it is connected to, each carrying the number of elements the subscriber requests. Internally, ActorPublisher keeps track of the outstanding demand by adding each requested amount to a counter and decrementing it on each call to onNext. The current aggregated demand is exposed via the always up-to-date totalDemand method, making it easy to react to.
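
The bookkeeping described above can be pictured with a small stand-alone class. DemandCounter and its method names are hypothetical and exist only for this illustration; ActorPublisher keeps equivalent state internally and exposes it through totalDemand, so nothing like this has to be written by hand.

```scala
/** Hypothetical illustration of the demand accounting done inside ActorPublisher. */
final class DemandCounter {
  private var demand = 0L

  /** Elements the subscriber has asked for but not yet received. */
  def totalDemand: Long = demand

  /** Called for every Request(n) message: demand accumulates. */
  def onRequest(n: Long): Unit = {
    require(n > 0, "requested amount must be positive")
    val sum = demand + n
    demand = if (sum < 0) Long.MaxValue else sum // saturate on overflow
  }

  /** Called for every element pushed downstream: demand shrinks by one. */
  def onElementSent(): Unit = {
    if (demand <= 0) throw new IllegalStateException("no outstanding demand")
    demand -= 1
  }
}
```

Two requests for 3 and 2 elements add up to a total demand of 5, and each element sent afterwards lowers it by one.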

Let's imagine we have a side-effecting data producer method called generateElement that does some computation to generate the next element of a sequence. Suppose it normally returns Success(Some(element)), but returns Success(None) if there are no more elements to generate, or a Failure in case of a fatal error. That models the three cases of passing information downstream, as described by Reactive Streams. Each call to onNext transfers one element to the subscriber. However, only one of onComplete and onError can be signaled, and either of them ends streaming, putting the Publisher in the non-active state.
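
One possible shape for such a method, assuming the elements come from an ordinary Iterator[String], is sketched below. LineSource is a hypothetical name; the only point is to show how the three outcomes map onto Try[Option[String]].

```scala
import scala.util.Try

/** Hypothetical element source backed by an iterator of strings. */
final class LineSource(lines: Iterator[String]) {
  // Success(Some(element)) while elements remain, Success(None) once the
  // iterator is exhausted, Failure(ex) if the iterator throws.
  def generateElement(): Try[Option[String]] =
    Try(if (lines.hasNext) Some(lines.next()) else None)
}
```

Wrapping the whole expression in Try means that an exception thrown by either hasNext or next() ends up as a Failure, which the publisher can forward with onError.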

Subscription management

Apart from the SubscriptionTimeoutExceeded message (received when no subscriber subscribes within a certain time after the publisher actor is created), there is no concept of a subscription or subscriptions to manage. All logic related to the subscribe method from Reactive Streams is hidden in the internal implementation of the ActorPublisher trait. Request and Cancel messages, which correspond to the methods of the Subscription, are delivered to the actor itself. ActorPublisher thus relieves us from manually wiring up the subscriber and later managing the subscription.

As is also apparent from the messages it receives, an ActorPublisher can handle only one subscriber. This is the approach the Akka team has chosen: Akka Streams has other means of connecting a producer to multiple subscribers (a fan-out component in the form of a Broadcast or Balance). The pattern of connecting multiple subscribers to one publisher is in fact a special case, which can be abstracted away from typical publisher implementations - the algorithm for distributing elements can be decoupled from their production.

Creating an ActorSubscriber

An ActorSubscriber implementation is even more straightforward. In this case too, handling the subscription is already taken care of, and what remains is mostly handling the stream of data and lifecycle changes:

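import akka.stream.actor.{ActorSubscriber, ActorSubscriberMessage, WatermarkRequestStrategy}

class StringActorSubscriber extends ActorSubscriber {
   // Requests more elements in batches, with a high watermark of 10
   protected def requestStrategy = WatermarkRequestStrategy(10)

   def processElement(element: String): Unit = ???
   def handleError(ex: Throwable): Unit = ???
   def streamFinished(): Unit = ???

   def receive = {
      // OnNext carries an untyped element, so match on the expected type
      case ActorSubscriberMessage.OnNext(element: String) =>
         processElement(element)
      case ActorSubscriberMessage.OnError(ex) =>
         handleError(ex)
      case ActorSubscriberMessage.OnComplete =>
         streamFinished()
   }
}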

Request strategy

The ActorSubscriber trait also declares the requestStrategy method, which must return an instance of RequestStrategy. This is where the logic for generating upstream demand lives - deciding when to request more elements from the publisher. Manual invocation of the request method on a subscription is abstracted away. Akka Streams already defines several strategies, such as WatermarkRequestStrategy (sending more demand in batches) or OneByOneRequestStrategy (requesting the next element only after the previous one arrives); it is also simple to create a custom one.
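
As a rough idea of what a custom strategy could look like, here is a sketch that tops demand back up whenever it falls below a threshold. It rests on an assumption: that RequestStrategy declares a single method of the shape requestDemand(remainingRequested: Int): Int. To keep the example compilable on its own, that trait is reproduced as a local stand-in, and TopUpRequestStrategy is a hypothetical name; the signature should be checked against the Akka Streams version in use.

```scala
object RequestStrategySketch {
  // Local stand-in; assumed to match the shape of akka.stream.actor.RequestStrategy.
  trait RequestStrategy {
    /** Given the demand still outstanding, return how many more elements to request. */
    def requestDemand(remainingRequested: Int): Int
  }

  /** Hypothetical strategy: refill demand up to `high` once fewer than `low` remain. */
  final case class TopUpRequestStrategy(high: Int, low: Int) extends RequestStrategy {
    require(0 < low && low <= high, "expected 0 < low <= high")

    def requestDemand(remainingRequested: Int): Int =
      if (remainingRequested < low) high - remainingRequested // request a batch
      else 0                                                  // enough is outstanding
  }
}
```

With TopUpRequestStrategy(10, 5), nothing is requested while 5 or more elements are still outstanding; once only 4 remain, 6 more are requested to bring the total back to 10.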

Actor lifecycle

The stream lifecycle is actually not tied to the actor lifecycle. Internally, both traits keep their state in storage external to the actor, so that it survives actor restarts. Also, when streaming ends, the actor stays alive until it is stopped by custom code. Therefore, any cleanup or release of resources must be done explicitly.

Exposing publishers and subscribers

After creating a custom ActorPublisher or ActorSubscriber, what remains is to convert it into a regular publisher or subscriber that can be used with any library implementing the Reactive Streams protocol. This is done by passing the actor's ActorRef to ActorPublisher.apply[T] or ActorSubscriber.apply[T]:

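import akka.actor.{ActorSystem, Props}
import akka.stream.actor.{ActorPublisher, ActorSubscriber}
import org.reactivestreams.{Publisher, Subscriber}

val system = ActorSystem()

val subscriber: Subscriber[String] = ActorSubscriber[String](system.actorOf(
                     Props(classOf[StringActorSubscriber])))

val publisher: Publisher[String] = ActorPublisher[String](system.actorOf(
                     Props(classOf[StringActorPublisher])))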

Conclusion

Akka Streams takes the low-level SPI of Reactive Streams and exposes a more programmer-friendly API for managing streams. One side of it is the stream transformation DSL (not covered in this article), used for consuming existing streams. But when the need arises for a new stream producer or consumer, the ActorPublisher and ActorSubscriber traits make implementing it more straightforward and help bridge other components into Reactive Streams.

References