
Source Jobs

Mantis Source Jobs are Mantis Jobs that fetch data from external sources. There are two types of Source Jobs:

  1. Mantis Publish Source Jobs read from their sources by using the Mantis Publish client library. As such, they do not apply MQL on events themselves. Instead, they propagate the MQL queries upstream to Mantis Publish running on the external source, which applies the MQL queries to the events it produces and only then pushes the matching events downstream to the Mantis Publish Source Job.

  2. Kafka Source Jobs consume events from Kafka and apply MQL to each incoming event.

Composition of a Source Job

A Source Job is composed of three components:

  1. the default Source
  2. custom Processing Stages including a tagging operator
  3. the default Sink consisting of an SSE operator

For example, here is how you might declare a Kafka Source Job:

MantisJob
  .source(KafkaSource)
  .stage(
     getAckableTaggingStage(),
     CustomizedAutoAckTaggingStage.config())
  .sink(new TaggedDataSourceSink(
     new QueryRequestPreProcessor(),
     new QueryRequestPostProcessor()))
  .lifecycle(...)
  .create();

Source (RxFunction)

The Source in this example contains code that creates and manages connections to Kafka using the Kafka 0.10 high-level consumer. It creates an Observable with backpressure semantics by leveraging the SyncOnSubscribe class, so that records are pulled from Kafka only as fast as the downstream stages request them.
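The pull-on-demand behavior that SyncOnSubscribe provides can be modeled with plain Java. The sketch below is a loose stdlib analogy, not Mantis or RxJava code: a Supplier stands in for the Kafka consumer's poll, and the downstream's request(n) drives exactly how many records are fetched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Loose model of SyncOnSubscribe-style backpressure: items are pulled
 *  from the upstream source only when the downstream requests them. */
class PullSource<T> {
    private final Supplier<T> poll; // stands in for a Kafka consumer poll

    PullSource(Supplier<T> poll) {
        this.poll = poll;
    }

    /** Emit exactly n items, the downstream's requested amount;
     *  nothing is consumed from the source beyond that. */
    List<T> request(int n) {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(poll.get());
        }
        return out;
    }
}
```

In the real Source, the request amount comes from the RxJava subscriber's demand, which is how slow consumers avoid being overrun by a fast Kafka topic.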

Processing Stage (RxFunction)

The next stage in this Job is the Processing Stage which enriches events with metadata. This stage transforms events in the following way:

  1. Applies a user-defined pre-mapping function.
    • This is a Groovy function that takes a Map<String, Object> and returns a Map<String, Object>; within the function, the incoming event is referenced by a variable named e.
  2. Filters out empty events.
  3. Inspects its internal subscription registry and enriches each event with all matching subscriptions.
    • Subscriptions are represented by an MQL query and are registered when a consumer (e.g. Mantis Job) subscribes to the Source Job.
    • Each event is enriched with the fields specified by the projections of a subscription’s MQL query.

      If an event matches multiple subscriptions, the event is enriched with the projections from every matching subscription’s MQL query.
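The three steps above can be sketched as follows. This is an illustration, not Mantis's actual tagging stage: the Subscription class, the plain Predicate standing in for an MQL WHERE clause, and the "matched-clients" tag key are all assumptions made for the example.

```java
import java.util.*;
import java.util.function.Function;
import java.util.function.Predicate;

/** Illustrative subscription: an MQL-query stand-in paired with
 *  the fields its query projects. */
class Subscription {
    final String subscriptionId;
    final Predicate<Map<String, Object>> matches; // stands in for the MQL WHERE clause
    final List<String> projection;                // fields the MQL SELECT keeps

    Subscription(String id, Predicate<Map<String, Object>> matches, List<String> projection) {
        this.subscriptionId = id;
        this.matches = matches;
        this.projection = projection;
    }
}

class TaggingStage {
    /** Applies the pre-mapping function, drops empty events, and enriches
     *  the event with the projections of every matching subscription. */
    static Optional<Map<String, Object>> process(
            Map<String, Object> event,
            Function<Map<String, Object>, Map<String, Object>> preMap,
            List<Subscription> registry) {
        Map<String, Object> mapped = preMap.apply(event);                 // 1. pre-mapping
        if (mapped == null || mapped.isEmpty()) return Optional.empty();  // 2. filter empties
        Map<String, Object> enriched = new HashMap<>();
        List<String> matched = new ArrayList<>();
        for (Subscription s : registry) {                                 // 3. tag every match
            if (s.matches.test(mapped)) {
                matched.add(s.subscriptionId);
                for (String field : s.projection) {
                    if (mapped.containsKey(field)) enriched.put(field, mapped.get(field));
                }
            }
        }
        if (matched.isEmpty()) return Optional.empty();
        enriched.put("matched-clients", matched); // tag key is an assumption for this sketch
        return Optional.of(enriched);
    }
}
```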

Sink (RxAction)

In order for a consumer to consume events from a Source Job, the consumer connects to the Job’s Sink.

Consumers subscribe to a Source Job by sending a subscription request over HTTP to the Source Job’s Sink.

When a consumer connects to a Sink, the consumer must provide three query parameters:

  1. criterion — An MQL query string
  2. clientId — This is automatically generated if you use the Mantis client library; it defaults to the Mantis Job ID
  3. subscriptionId — This is used as a load-balancing mechanism for clientId

A consumer (represented as a client through clientId) may have many consumer instances (represented as subscriptions through subscriptionId). Source Jobs use clientId and subscriptionId to broadcast and/or load balance events to consumers.

Source Jobs will broadcast an event to all clientIds. This means that consumer instances with different clientIds will each receive the same event.

However, Source Jobs will load balance an event within a clientId. This means that consumer instances with the same clientId but different subscriptionIds are effectively grouped together. Events with the same clientId are load balanced among its subscriptionIds.
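The broadcast-across-clients, load-balance-within-a-client delivery described above can be sketched as follows. Class and method names are illustrative, and round-robin is used here for determinism where the real router distributes randomly.

```java
import java.util.*;

/** Illustrative router: every clientId receives each event (broadcast);
 *  within a clientId, exactly one subscriptionId receives it (load balance). */
class SinkRouter {
    // clientId -> that client's subscriptionIds
    private final Map<String, List<String>> clients = new LinkedHashMap<>();
    private int cursor = 0; // round-robin for the sketch; the real router is random

    void subscribe(String clientId, String subscriptionId) {
        clients.computeIfAbsent(clientId, k -> new ArrayList<>()).add(subscriptionId);
    }

    /** Returns one receiving subscriptionId per clientId for this event. */
    List<String> route(String event) {
        List<String> receivers = new ArrayList<>();
        for (List<String> subs : clients.values()) {
            receivers.add(subs.get(cursor % subs.size()));
        }
        cursor++;
        return receivers;
    }
}
```

Note that each event produces one receiver per clientId, so a client with three subscriptionIds sees each event exactly once across its instances, while every distinct clientId sees every event.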

Example of subscribing to a Source Job’s Sink which outputs the results of a sine wave function:
curl "http://instance-address:port?clientId=myId&subscriptionId=mySubscription&criterion=select%20*%20where%20true"

data: {"x": 60.000000, "y": -3.048106}

data: {"x": 100.000000, "y": -5.063656}

data: {"x": 26.000000, "y": 7.625585}

⋮

Sinks have a pre-processor (QueryRequestPreProcessor), a post-processor (QueryRequestPostProcessor), and a router:

  • The pre-processor is an RxFunction that registers the consumer’s query, with their clientId and subscriptionId, into an in-memory cache called a QueryRefCountMap when a consumer instance connects to the Sink. This registers queries so that the Source Job can apply them to events as those events are ingested by the Source Job.

  • The post-processor is an RxFunction that de-registers subscriptions from the QueryRefCountMap when a consumer instance disconnects from the Sink. The Source Job removes the clientId entirely from the QueryRefCountMap only when all of its subscriptionIds have been removed.

  • The router routes incoming events for a clientId to its subscriptions. It does this by using a drainer called a ChunkProcessor to drain events from an internal queue on an interval and randomly distribute the events among the clientId’s subscriptions.
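The register/de-register lifecycle above amounts to reference counting per clientId. The sketch below models that behavior with assumed names; it is not Mantis's actual QueryRefCountMap API.

```java
import java.util.*;

/** Ref-counting model of a QueryRefCountMap: a client's query stays
 *  registered until all of its subscriptionIds have disconnected. */
class QueryRefCountMap {
    private final Map<String, Set<String>> subsByClient = new HashMap<>();
    private final Map<String, String> queryByClient = new HashMap<>();

    /** Called by the pre-processor when a consumer instance connects. */
    void register(String clientId, String subscriptionId, String mqlQuery) {
        subsByClient.computeIfAbsent(clientId, k -> new HashSet<>()).add(subscriptionId);
        queryByClient.putIfAbsent(clientId, mqlQuery);
    }

    /** Called by the post-processor when a consumer instance disconnects. */
    void deregister(String clientId, String subscriptionId) {
        Set<String> subs = subsByClient.get(clientId);
        if (subs == null) return;
        subs.remove(subscriptionId);
        if (subs.isEmpty()) {
            // Last subscriptionId gone: remove the clientId entirely.
            subsByClient.remove(clientId);
            queryByClient.remove(clientId);
        }
    }

    boolean isRegistered(String clientId) {
        return queryByClient.containsKey(clientId);
    }
}
```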

Note

Typically, subscriptions to a Source Job come from other Mantis Jobs. However, because subscriptions are SSE endpoints, you can subscribe to Source Jobs over that same SSE endpoint to view the Job’s output for debugging purposes.

Caveats

Source Jobs are single-stage Mantis Jobs that perform only projection and filtering operations. The groupBy, orderBy, and window clauses of an MQL query are ignored by Source Jobs; these clauses are instead interpreted into RxJava operations and run by downstream Mantis Jobs.

Mantis Publish-based Source Jobs do not autoscale. Autoscaling Mantis Publish-based Source Jobs requires future work to reshuffle connections among all Source Job instances and their upstream Mantis Publish connections.