Source Jobs
Mantis Source Jobs are Mantis Jobs that fetch data from external sources. There are two types of Source Jobs:

- Mantis Publish Source Jobs read from their sources by using the Mantis Publish client library. As such, they do not apply MQL to events themselves. Instead, they propagate the MQL queries upstream to Mantis Publish running on the external source, which applies the MQL queries to the events it produces and only then pushes those events downstream to the Mantis Publish Source Job.
- Kafka Source Jobs consume events from Kafka and apply MQL to each incoming event.
Composition of a Source Job
A Source Job is composed of three components:
- the default Source
- custom Processing Stages including a tagging operator
- the default Sink consisting of an SSE operator
For example, here is how you might declare a Kafka Source Job:
```java
MantisJob
    .source(KafkaSource)
    .stage(
        getAckableTaggingStage(),
        CustomizedAutoAckTaggingStage.config())
    .sink(new TaggedDataSourceSink(
        new QueryRequestPreProcessor(),
        new QueryRequestPostProcessor()))
    .lifecycle(...)
    .create();
```
Source (RxFunction)
The Source in this example contains code that creates and manages connections to Kafka using the 0.10 high-level consumer. It creates an Observable with backpressure semantics by leveraging the `SyncOnSubscribe` class.
Processing Stage (RxFunction)
The next stage in this Job is the Processing Stage, which enriches events with metadata. This stage transforms events in the following way:

- Applies a user-defined pre-mapping function. This is a Groovy function that takes a `Map<String, Object>` and returns a `Map<String, Object>`, referenced by a variable named `e`.
- Filters out empty events.
- Inspects its internal subscription registry and enriches each event with all matching subscriptions. Subscriptions are represented by an MQL query and are registered when a consumer (e.g., another Mantis Job) subscribes to the Source Job. Each event is enriched with the fields specified by the projections of a subscription's MQL query.
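As a rough sketch, the per-event transform described above might look like the following. All names here (`TaggingStageSketch`, `Subscription`, `process`, the `matchedSubscriptions` field) are illustrative assumptions, not Mantis's actual tagging operator, and a plain predicate stands in for MQL matching:

```java
import java.util.*;
import java.util.function.*;

// Hypothetical sketch of the tagging stage's per-event transform: pre-map,
// drop empty events, then tag the event with every matching subscription.
public class TaggingStageSketch {

    // A registered subscription: an id plus a predicate standing in for its MQL query.
    record Subscription(String subscriptionId, Predicate<Map<String, Object>> matches) {}

    // Returns the enriched event, or null if the pre-mapped event is empty.
    static Map<String, Object> process(Map<String, Object> event,
                                       UnaryOperator<Map<String, Object>> preMapper,
                                       List<Subscription> registry) {
        Map<String, Object> e = preMapper.apply(event);   // user-defined pre-mapping function
        if (e == null || e.isEmpty()) {
            return null;                                  // filter out empty events
        }
        List<String> matched = new ArrayList<>();
        for (Subscription s : registry) {                 // consult the subscription registry
            if (s.matches().test(e)) {
                matched.add(s.subscriptionId());
            }
        }
        Map<String, Object> enriched = new HashMap<>(e);
        enriched.put("matchedSubscriptions", matched);    // illustrative metadata field
        return enriched;
    }
}
```

In the real Job this logic runs per event inside the stage's Observable chain; the sketch only shows the shape of the transform.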
Sink (RxAction)
In order for a consumer to consume events from a Source Job, the consumer connects to the Job's Sink. Consumers subscribe to a Source Job by sending a subscription request over HTTP to the Source Job's Sink. When a consumer connects to a Sink, it must provide three query parameters:

- `criterion` — an MQL query string
- `clientId` — automatically generated if you use the Mantis client library; it defaults to the Mantis Job ID
- `subscriptionId` — used as a load-balancing mechanism for a `clientId`

A consumer (represented as a client through its `clientId`) may have many consumer instances (represented as subscriptions through `subscriptionId`). Source Jobs use `clientId` and `subscriptionId` to broadcast and/or load balance events to consumers.
Source Jobs broadcast an event to all `clientId`s. This means that consumer instances with different `clientId`s will each receive the same event. However, Source Jobs load balance an event within a `clientId`: consumer instances with the same `clientId` but different `subscriptionId`s are effectively grouped together, and events for that `clientId` are load balanced among its `subscriptionId`s.
```
curl "http://instance-address:port?clientId=myId&subscriptionId=mySubscription&criterion=select%20*%20where%20true"

data: {"x": 60.000000, "y": -3.048106}
data: {"x": 100.000000, "y": -5.063656}
data: {"x": 26.000000, "y": 7.625585}
⋮
```
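The broadcast and load-balancing semantics above can be sketched with a simplified, hypothetical router (`RoutingSketch` is illustrative; the real Source Job router drains an internal queue via a `ChunkProcessor` rather than routing synchronously):

```java
import java.util.*;

// Simplified, hypothetical router illustrating the delivery semantics:
// every clientId receives each event (broadcast), but only one of that
// client's subscriptionIds receives it (load balancing).
public class RoutingSketch {
    private final Map<String, List<String>> subsByClient = new LinkedHashMap<>();
    private final Random rng = new Random();

    public void subscribe(String clientId, String subscriptionId) {
        subsByClient.computeIfAbsent(clientId, k -> new ArrayList<>()).add(subscriptionId);
    }

    // Returns, for each clientId, the subscriptionId chosen to receive this event.
    public Map<String, String> route(Object event) {
        Map<String, String> deliveries = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : subsByClient.entrySet()) {
            List<String> subs = entry.getValue();
            deliveries.put(entry.getKey(), subs.get(rng.nextInt(subs.size())));
        }
        return deliveries;
    }
}
```

With two subscriptions under one `clientId` and one under another, each call to `route` delivers the event to both clients, but only one of the first client's two subscriptions.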
Sinks have a pre-processor (`QueryRequestPreProcessor`), a post-processor (`QueryRequestPostProcessor`), and a router:

- The pre-processor is an RxFunction that registers the consumer's query, with its `clientId` and `subscriptionId`, into an in-memory cache called a `QueryRefCountMap` when a consumer instance connects to the Sink. This registers queries so that the Source Job can apply them to events as those events are ingested by the Source Job.
- The post-processor is an RxFunction that deregisters subscriptions from the `QueryRefCountMap` when a consumer instance disconnects from the Sink. The Source Job removes the `clientId` entirely from the `QueryRefCountMap` only when all of its `subscriptionId`s have been removed.
- The router routes incoming events for a `clientId` to its subscriptions. It does this by using a drainer called a `ChunkProcessor` to drain events from an internal queue on an interval and randomly distribute the events to subscriptions.
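The register/deregister lifecycle handled by the pre- and post-processors can be sketched as a ref-counted registry. This is a minimal sketch in the spirit of the `QueryRefCountMap`, with hypothetical class and method names rather than Mantis's actual API:

```java
import java.util.*;

// Sketch of a ref-counted query registry: the pre-processor would call
// register() on connect; the post-processor would call deregister() on
// disconnect. A clientId's query is dropped only with its last subscription.
public class QueryRefCountSketch {
    private final Map<String, Set<String>> subsByClient = new HashMap<>();
    private final Map<String, String> queryByClient = new HashMap<>();

    public void register(String clientId, String subscriptionId, String criterion) {
        subsByClient.computeIfAbsent(clientId, k -> new HashSet<>()).add(subscriptionId);
        queryByClient.put(clientId, criterion);
    }

    public void deregister(String clientId, String subscriptionId) {
        Set<String> subs = subsByClient.get(clientId);
        if (subs == null) {
            return;
        }
        subs.remove(subscriptionId);
        if (subs.isEmpty()) {               // last subscriptionId removed:
            subsByClient.remove(clientId);  // drop the clientId entirely
            queryByClient.remove(clientId);
        }
    }

    // Queries the Source Job would currently apply to ingested events.
    public Collection<String> activeQueries() {
        return queryByClient.values();
    }
}
```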
Note
Typically, subscriptions to a Source Job come from other Mantis Jobs. However, because the Sink is an SSE endpoint, you can also subscribe to a Source Job over SSE directly to view the Job's output for debugging purposes.
Caveats
Source Jobs are single-stage Mantis Jobs that perform projection and filtering operations. Source Jobs ignore the `groupBy`, `orderBy`, and `window` clauses of an MQL query; these clauses are instead interpreted into RxJava operations and run by downstream Mantis Jobs.
Mantis Publish-based Source Jobs do not autoscale. Autoscaling them would require future work to reshuffle connections between all Source Job instances and their upstream Mantis Publish clients.