The Sink Component

The Sink component of a Mantis Job serves two purposes:

  1. Trigger job execution when a client subscribes to a Job

    RxJava cold Observables have a lazy execution model: execution begins only when something subscribes to the Observable. A Mantis Job is a complex Observable chain, so before such a Job can run, something has to subscribe to it. That subscription happens in the Sink component.

  2. Output the results of job execution in a streaming fashion

    Once the Processing Stage component has processed the input stream, you need to decide what to do with the results. Most jobs that write their results to some other system do so within the Processing Stage component itself; in that case, the Sink component is usually used for debugging.
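The lazy execution model described in the first point can be illustrated without Mantis or RxJava. The following is a minimal plain-Java sketch that assumes a cold Observable can be modeled as a stored "recipe" that runs once per subscriber; ColdStream and LazyExecutionDemo are hypothetical names, not part of the Mantis or RxJava APIs. Building the chain with map() runs nothing, and the subscription (the Sink's role) is what starts execution.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for an RxJava cold Observable (not the RxJava API):
// it stores a recipe for producing items and replays it for every subscriber.
final class ColdStream<T> {
    private final Consumer<Consumer<T>> producer;

    ColdStream(Consumer<Consumer<T>> producer) {
        this.producer = producer;
    }

    // Deriving a new stream only wraps the recipe; no items are produced here.
    <R> ColdStream<R> map(Function<T, R> f) {
        return new ColdStream<R>(downstream ->
                producer.accept(item -> downstream.accept(f.apply(item))));
    }

    // Subscribing is the step that actually runs the chain.
    void subscribe(Consumer<T> subscriber) {
        producer.accept(subscriber);
    }
}

final class LazyExecutionDemo {
    // Counts how many times the upstream "job" has actually executed.
    static int sourceRuns = 0;

    static ColdStream<String> buildJob() {
        ColdStream<Integer> source = new ColdStream<Integer>(emit -> {
            sourceRuns++;
            for (int i = 1; i <= 3; i++) {
                emit.accept(i);
            }
        });
        return source.map(i -> "event-" + i);
    }

    public static void main(String[] args) {
        ColdStream<String> job = buildJob();
        System.out.println(sourceRuns); // 0: the chain is only described so far

        // Plays the role of the Sink: its subscription triggers execution.
        List<String> received = new ArrayList<>();
        job.subscribe(received::add);
        System.out.println(sourceRuns); // 1
        System.out.println(received);   // [event-1, event-2, event-3]
    }
}
```

Subscribing a second time replays the upstream recipe, so sourceRuns goes to 2; that per-subscriber replay is what makes the stream "cold".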

To create a Sink component, you implement the Sink interface:

import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.PortRequest;
import rx.Observable;
import rx.functions.Action3;

public interface Sink<T> extends Action3<Context, PortRequest, Observable<T>> { }

Built-in Sinks

Some Sinks are provided by Mantis. To get access to these Sinks, add this line to your source:

import io.mantisrx.runtime.sink.Sinks;

SSE Sink

The SSE Sink is commonly used. It makes the results of your last Processing Stage available to clients as a Server-Sent Events (SSE) stream.

To get an SSE Sink, pass an encoder function to the sse() method. The encoder accepts each item of data to be streamed and returns that data encoded as needed. The following example attaches a Sink to a Mantis Job that passes the data from the last Processing Stage of the job, unchanged, to the SSE stream:

return MantisJob
    …
    .sink(Sinks.sse((String data) -> data))
    …
    .create();
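Conceptually, the SSE Sink applies your encoder to each item and writes the result to connected clients in the Server-Sent Events wire format, where each line of an event is sent as a "data:" field and a blank line ends the event. The sketch below assumes that framing; it models the idea in plain Java and is not the Mantis implementation (SseEncodingSketch, toSseFrame and encodeAll are hypothetical names).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class SseEncodingSketch {
    // Frames one encoded payload as a Server-Sent Events message: each payload
    // line gets a "data: " prefix, and an empty line terminates the event.
    static String toSseFrame(String payload) {
        StringBuilder frame = new StringBuilder();
        for (String line : payload.split("\n", -1)) {
            frame.append("data: ").append(line).append('\n');
        }
        return frame.append('\n').toString();
    }

    // Applies a user-supplied encoder (the role of the lambda passed to
    // Sinks.sse()) to each item, then frames the result.
    static <T> List<String> encodeAll(List<T> items, Function<T, String> encoder) {
        List<String> frames = new ArrayList<>();
        for (T item : items) {
            frames.add(toSseFrame(encoder.apply(item)));
        }
        return frames;
    }

    public static void main(String[] args) {
        // Identity encoder, mirroring (String data) -> data
        List<String> frames = encodeAll(Arrays.asList("a", "b"), (String data) -> data);
        frames.forEach(System.out::print);
    }
}
```

A non-identity encoder, such as one that serializes a domain object to JSON, would slot into the same position as the identity lambda.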

sysout Sink

The sysout Sink simply writes the results of the previous Processing Stage directly to standard output (System.out). For example:

return MantisJob
    …
    .sink(Sinks.sysout())
    …
    .create();

eagerSubscribe Sink

A typical Sink subscribes to the output of the previous Processing Stage at some point while its call() method runs. However, you may want some of your Mantis Jobs to start executing as soon as you launch them, such as Jobs that run perpetually and power alerts, dashboards, and so forth.

You can wrap a Sink with eagerSubscribe() to create a Sink that instead subscribes to the output of the previous Processing Stage immediately when its call() method is called, even if it still has more work to do within that method before it can handle the output. This starts your Mantis Job more quickly, but some of its initial data may be dropped.

For example:

return MantisJob
    …
    .sink(Sinks.eagerSubscribe(Sinks.sse((String data) -> data)))
    …
    .create();
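The trade-off described above can be modeled in a few lines. In this plain-Java sketch (ForwardingSink and the eager()/lazy() scenarios are hypothetical), an eager Sink lets the upstream run before any client connects, so those early items are processed but reach no client, while a lazy Sink waits for the first client and then delivers everything from the start. It illustrates the timing only, not how Mantis schedules subscriptions.

```java
import java.util.ArrayList;
import java.util.List;

final class EagerVsLazySketch {
    // Hypothetical sink: hands each upstream item to the connected client, if any.
    static final class ForwardingSink {
        final List<Integer> delivered = new ArrayList<>();
        final List<Integer> dropped = new ArrayList<>();
        boolean clientConnected = false;

        void onNext(int item) {
            if (clientConnected) {
                delivered.add(item);
            } else {
                dropped.add(item); // processed upstream, but nobody receives it
            }
        }
    }

    // Eager: upstream starts as soon as the Sink's call() runs, so items 1-3
    // are produced before the first client connects.
    static ForwardingSink eager() {
        ForwardingSink sink = new ForwardingSink();
        for (int i = 1; i <= 3; i++) sink.onNext(i);
        sink.clientConnected = true; // first client connects
        for (int i = 4; i <= 5; i++) sink.onNext(i);
        return sink;
    }

    // Lazy: nothing runs until the first client connects, which then triggers
    // the subscription, so that client sees every item.
    static ForwardingSink lazy() {
        ForwardingSink sink = new ForwardingSink();
        sink.clientConnected = true;
        for (int i = 1; i <= 5; i++) sink.onNext(i);
        return sink;
    }

    public static void main(String[] args) {
        ForwardingSink e = eager();
        System.out.println(e.dropped + " " + e.delivered); // [1, 2, 3] [4, 5]
        ForwardingSink l = lazy();
        System.out.println(l.dropped + " " + l.delivered); // [] [1, 2, 3, 4, 5]
    }
}
```

For a perpetual alerting Job, the "dropped" items in the eager case are usually acceptable: the upstream stages still processed them, which is the point of starting early.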

Here is a more complete example:

return MantisJob
        // Reuse an existing class that does all the plumbing of connecting to source jobs
        .source(new JobSource())

        // Groups requests by client IP
        .stage(new GroupByStage(), GroupByStage.config())

        // Computes a sum over a window
        .stage(new FastAggregateStage(), FastAggregateStage.config())

        // Collects the data and makes it available over SSE
        .stage(new CollectStage(), CollectStage.config())

        // Reuse the built-in sink that eagerly subscribes and delivers data over SSE
        .sink(Sinks.eagerSubscribe(
                Sinks.sse((String data) -> data)
        ))

        // Job parameter overrides:
        // the query sent to the source job. Here we fetch the customer ID and
        // client IP for all requests hitting the source.
        .parameterDefinition(new StringParameter()
                .name(MantisSourceJobConnector.MANTIS_SOURCEJOB_CRITERION)
                .validator(Validators.notNullOrEmpty())
                .defaultValue("SELECT customer_id, client_ip WHERE true")
                .build())

        .metadata(new Metadata.Builder()
                .name("GroupByIp")
                .description("Connects to a source job and counts the number of requests sent from each ip within a window")
                .build())
        .create();
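For intuition about what the three stages above compute together, here is a single-process plain-Java sketch of counting requests per client IP within one window. The Request type and sample values are hypothetical, the list stands in for the contents of one window, and how GroupByStage, FastAggregateStage and CollectStage actually divide this work across workers and define windows is not shown in this document.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class CountByIpSketch {
    // Hypothetical event type holding the two fields the example query selects.
    static final class Request {
        final String customerId;
        final String clientIp;

        Request(String customerId, String clientIp) {
            this.customerId = customerId;
            this.clientIp = clientIp;
        }
    }

    // Groups one window's requests by client IP and counts them. A TreeMap keeps
    // the keys sorted so the rendered output is deterministic.
    static Map<String, Long> countPerIp(List<Request> window) {
        return window.stream()
                .collect(Collectors.groupingBy((Request r) -> r.clientIp,
                        TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Request> window = Arrays.asList(
                new Request("c1", "10.0.0.1"),
                new Request("c2", "10.0.0.2"),
                new Request("c3", "10.0.0.1"));
        // The String an identity SSE encoder would pass through unchanged
        System.out.println(countPerIp(window).toString()); // {10.0.0.1=2, 10.0.0.2=1}
    }
}
```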

toMany Sink

You can hook up multiple Sinks to the same final Stage of a Job by using the toMany Sink. To do this, pass each Sink to the toMany() method. For example:

return MantisJob
    …
    .sink(Sinks.toMany(Sinks.sysout(), Sinks.sse((String data) -> data)))
    …
    .create();
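The fan-out idea behind toMany can be sketched with plain Consumers: every item from the single upstream is handed to each attached Sink in turn. FanOutSketch.toMany here is a hypothetical helper, not Sinks.toMany(), and it does not model how Mantis shares the upstream Observable among the Sinks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class FanOutSketch {
    // Hypothetical helper in the spirit of Sinks.toMany(): each upstream item
    // is passed to every attached consumer, in the order they were given.
    @SafeVarargs
    static <T> Consumer<T> toMany(Consumer<T>... sinks) {
        List<Consumer<T>> all = Arrays.asList(sinks);
        return item -> all.forEach(sink -> sink.accept(item));
    }

    public static void main(String[] args) {
        List<String> sseBuffer = new ArrayList<>();
        Consumer<String> sysout = System.out::println; // stand-in for Sinks.sysout()
        Consumer<String> sse = sseBuffer::add;          // stand-in for an SSE Sink
        Consumer<String> both = toMany(sysout, sse);

        Arrays.asList("a", "b").forEach(both);
        System.out.println(sseBuffer); // [a, b]
    }
}
```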

Custom Sinks

If none of the provided Sinks fits your needs, or you need to customize one of them, you can create your own Sink by implementing the Sink interface. Typical reasons include needing access to the query parameters supplied by a client that connects to your Job, or needing pre- and post-hooks that perform operations as each client connects or disconnects.

A convenient way to build such a Sink is to wrap a ServerSentEventsSink. In addition to an encoder, its builder accepts three functions:

  1. Preprocess function (set with withRequestPreprocessor())

    This callback gives you access to the client's query parameters; it is invoked before job execution begins.

  2. Postprocess function (set with withRequestPostprocessor())

    This callback lets you perform any clean-up actions; it is invoked just after the client connection is terminated.

  3. Predicate function (set with withPredicate())

    This lets you filter your output stream with a predicate, so that each client receives only the events that match the query parameters it sent.
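The order in which these three callbacks act on a single client connection can be sketched in plain Java. This is a simplified model under stated assumptions, not the ServerSentEventsSink API: the hooks are java.util.function types instead of Func2 and the Mantis Predicate, and the "contains" query parameter used in main() is hypothetical. The preprocessor sees the query parameters first, a predicate built from those parameters filters the stream, and the postprocessor runs once the connection ends, even if delivery fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

final class SseClientLifecycleSketch {
    // Hypothetical stand-ins for the three builder hooks.
    final Consumer<Map<String, List<String>>> preProcessor;
    final Consumer<Map<String, List<String>>> postProcessor;
    final Function<Map<String, List<String>>, Predicate<String>> predicateFactory;

    SseClientLifecycleSketch(Consumer<Map<String, List<String>>> preProcessor,
                             Consumer<Map<String, List<String>>> postProcessor,
                             Function<Map<String, List<String>>, Predicate<String>> predicateFactory) {
        this.preProcessor = preProcessor;
        this.postProcessor = postProcessor;
        this.predicateFactory = predicateFactory;
    }

    // Serves one client connection and returns the events that client received.
    List<String> serve(Map<String, List<String>> queryParams, List<String> stream) {
        preProcessor.accept(queryParams);                        // hook 1: before data flows
        Predicate<String> filter = predicateFactory.apply(queryParams);
        List<String> sent = new ArrayList<>();
        try {
            for (String event : stream) {
                if (filter.test(event)) sent.add(event);         // hook 3: per-client filter
            }
        } finally {
            postProcessor.accept(queryParams);                   // hook 2: after disconnect
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        SseClientLifecycleSketch sink = new SseClientLifecycleSketch(
                params -> log.add("connect " + params),
                params -> log.add("disconnect"),
                // Hypothetical "contains" parameter: keep events containing any given value.
                params -> {
                    List<String> values = params.getOrDefault("contains", Collections.emptyList());
                    return event -> values.isEmpty() || values.stream().anyMatch(event::contains);
                });

        Map<String, List<String>> query = new HashMap<>();
        query.put("contains", Arrays.asList("error"));
        System.out.println(sink.serve(query, Arrays.asList("ok 1", "error 2", "ok 3"))); // [error 2]
        System.out.println(log); // [connect {contains=[error]}, disconnect]
    }
}
```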

Here is an example of a custom Sink that uses the ServerSentEventsSink builder:

package com.netflix.mantis.sourcejob;

import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.PortRequest;
import io.mantisrx.runtime.sink.ServerSentEventsSink;
import io.mantisrx.runtime.sink.Sink;
import io.mantisrx.runtime.sink.predicate.Predicate;

import java.util.List;
import java.util.Map;

import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;

import com.google.common.base.Charsets;

public class SourceSink implements Sink<String> {

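  // Hooks run when a client connects (pre) and after it disconnects (post); both default to no-ops
  private Func2<Map<String,List<String>>,Context,Void> preProcessor = new NoOpProcessor();
  private Func2<Map<String,List<String>>,Context,Void> postProcessor = new NoOpProcessor();
  // Passed to EventFilter when building the per-client predicate
  private String clientId = "DEFAULT_CLIENT_ID";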

  static class NoOpProcessor implements Func2<Map<String,List<String>>,Context,Void> {
    @Override
    public Void call(Map<String, List<String>> t1, Context t2) {
      return null;
    }
  }

  public SourceSink() {
  }

  public SourceSink(Func2<Map<String,List<String>>,Context,Void> preProcessor,
           Func2<Map<String,List<String>>,Context,Void> postProcessor,
           String mantisClientId) {
    this.postProcessor = postProcessor;
    this.preProcessor = preProcessor;
    this.clientId = mantisClientId;
  }

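  @Override
  public void call(Context context, PortRequest portRequest,
                   Observable<String> observable) {

    // Drop empty events before they reach any client
    observable = observable.filter(new Func1<String, Boolean>() {
      @Override
      public Boolean call(String t1) {
        return !t1.isEmpty();
      }
    });

    ServerSentEventsSink<String> sink = new ServerSentEventsSink.Builder<String>()
        // Identity encoder: events are already Strings
        .withEncoder(new Func1<String, String>() {
          @Override
          public String call(String data) {
            return data;
          }
        })
        // Per-client filter built from the query parameters; EventFilter is not
        // shown here and is assumed to be defined elsewhere in this package
        .withPredicate(new Predicate<String>("description", new EventFilter(clientId)))
        .withRequestPreprocessor(preProcessor)
        .withRequestPostprocessor(postProcessor)
        .build();

    // Subscribe immediately (discarding the items) so the upstream chain starts
    // executing before any SSE client connects
    observable.subscribe();

    // Delegate serving SSE clients to the wrapped ServerSentEventsSink
    sink.call(context, portRequest, observable);
  }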

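  // Development aid: prints the UTF-8 encoded size of a sample event.
  // It is not used by the Sink itself.
  public static void main(String [] args) {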
    String s = "{\"amazon.availability-zone\":\"us-east-1e\",\"status\":200,\"type\":\"EVENT\",\"matched-clients\":\"client6\",\"currentTime\":1409595016697,\"duration-millis\":172}";

    byte [] barr = s.getBytes(Charsets.UTF_8);
    System.out.println("size: " + barr.length);
  }
}