Skip to content

[Legacy, RxJava] Mantis Job - Writing Custom Source

Note

There is an alternate implementation that allows writing a Mantis Job as a series of operators operating directly on a MantisStream instance which abstracts information about RxJava, Observables from the user and offers a simpler way (hopefully 🤞) to write mantis jobs. Please see Mantis DSL docs for more details or this documentation for the sample job.

Our first tutorial primed us for writing and executing a job end-to-end but it wasn't particularly interesting from a data perspective because it just repeatedly looped over the contents of a book. In this example we'll explore writing a more involved source which reads an infinite stream of data from Twitter and performs the same word count in real-time. Mantis jobs can easily subscribe to one another using some built in sources but the technique in this tutorial can be used to pull external data into the Mantis ecosystem.

To proceed you'll need to head over to Twitter and grab yourself a pair of API keys.

The Source

The source is responsible for ingesting data to be processed within the job. Many Mantis jobs will subscribe to other jobs and can simply use a templatized source such as io.mantisrx.connectors.job.source.JobSource which handles all the minutiae of connecting to other jobs for us. If however your job exists on the edge of Mantis it will need to pull data in via a custom source. Since we're reading from the Twitter API we'll need to do this ourselves.

Our TwitterSource must implement io.mantisrx.runtime.source.Source which requires us to implement call and optionally init. Mantis provides some guarantees here in that init will be invoked exactly once and before call which will be invoked at least once. This makes init the ideal location to perform one time setup and configuration for the source and call the ideal location for performing work on the incoming stream. The objective of this entire class is to have call return an Observable<Observable<T>> which will be passed as a parameter to the first stage of our job.

Let's deconstruct the init method first. Here we will extract our parameters from the Context -- this allows us to write more generic sources which can be templatized and reused across many jobs. This is a very common pattern for writing Mantis jobs and allows you to iterate quickly testing various configurations as jobs can be resubmitted easily with new parameters.

/**
  * Init method is called only once during initialization. It is the ideal place to perform one time
  * configuration actions.
  *
  * @param context Provides access to Mantis system information like JobId, Job parameters etc
  * @param index   This provides access to the unique workerIndex assigned to this container. It also provides
  *                the total number of workers of this job.
  */
@Override
public void init(Context context, Index index) {

    String consumerKey = (String) context.getParameters().get(CONSUMER_KEY_PARAM);
    String consumerSecret = (String) context.getParameters().get(CONSUMER_SECRET_PARAM);
    String token = (String) context.getParameters().get(TOKEN_PARAM);
    String tokenSecret = (String) context.getParameters().get(TOKEN_SECRET_PARAM);
    String terms = (String) context.getParameters().get(TERMS_PARAM);

    Authentication auth = new OAuth1(consumerKey,
            consumerSecret,
            token,
            tokenSecret);

    StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();

    String[] termArray = terms.split(",");

    List<String> termsList = Arrays.asList(termArray);

    endpoint.trackTerms(termsList);

    client = new ClientBuilder()
            .name("twitter-source")
            .hosts(Constants.STREAM_HOST)
            .endpoint(endpoint)
            .authentication(auth)
            .processor(new StringDelimitedProcessor(twitterObservable))
            .build();


    client.connect();
}

Our call method is very simple thanks to the fact that our twitter client writes to a custom BlockingQueue adapter that we've written. We simply need to return an Observable<Observable<T>>.

@Override
public Observable<Observable<String>> call(Context context, Index index) {
    return Observable.just(twitterObservable.observe());
}

You may have noticed that our init method is pulling a bunch of parameters out of the Context. These are specified in Source#getParameters() and allow us to parameterize this source so that different instances of this job may work with different parameters. This is a very useful concept for designing reusable components for constructing jobs as well as completely reusable jobs.

    /**
     * Define parameters required by this source.
     *
     * @return
     */
    @Override
    public List<ParameterDefinition<?>> getParameters() {
        List<ParameterDefinition<?>> params = Lists.newArrayList();

        // Consumer key
        params.add(new StringParameter()
                .name(CONSUMER_KEY_PARAM)
                .description("twitter consumer key")
                .validator(Validators.notNullOrEmpty())
                .required()
                .build());

        params.add(new StringParameter()
                .name(CONSUMER_SECRET_PARAM)
                .description("twitter consumer secret")
                .validator(Validators.notNullOrEmpty())
                .required()
                .build());

        params.add(new StringParameter()
                .name(TOKEN_PARAM)
                .description("twitter token")
                .validator(Validators.notNullOrEmpty())
                .required()
                .build());

        params.add(new StringParameter()
                .name(TOKEN_SECRET_PARAM)
                .description("twitter token secret")
                .validator(Validators.notNullOrEmpty())
                .required()
                .build());

        params.add(new StringParameter()
                .name(TERMS_PARAM)
                .description("terms to follow")
                .validator(Validators.notNullOrEmpty())
                .defaultValue("Netflix,Dark")
                .build());

        return params;

    }

Now our primary class TwitterJob which implements MantisJobProvider needs to specify our new source so change the source line to match the following.

.source(new TwitterSource())

The Stage

The stage is nearly equivalent to the previous tutorial. We need to add a few lines to the beginning of the chain of operations to deserialize the string, filter for English Tweets and pluck out the text.

            .stage((context, dataO) -> dataO

                    // Deserialize data
                    .map(JsonUtility::jsonToMap)

                    // Filter for English Tweets
                    .filter((eventMap) -> {
                        if(eventMap.containsKey("lang") && eventMap.containsKey("text")) {
                            String lang = (String)eventMap.get("lang");
                            return "en".equalsIgnoreCase(lang);
                        }
                        return false;
                    })

                    // Extract Tweet body
                    .map((eventMap) -> (String)eventMap.get("text"))

                    // Same from here...

Conclusion

We've learned how to create a parameterized source which reads from Twitter and pulls data into the ecosystem. With some slight modifications our previous example's stage deserializes the messages and extracts the data to perform the same word count.

If you've checked out the mantis repository, then running following commands should begin running the job and expose a local port for SSE streaming.

$ cd mantis-examples/mantis-examples-twitter-sample
$ ../../gradlew execute --args='consumerKey consumerSecret token tokensecret'

As an exercise consider how you might begin to scale this work out over multiple machines if the workload were too large to perform on a single host. This will be the topic of the next tutorial.