Skip to content

Source

To implement a Mantis Job Source component, you must implement the io.mantisrx.runtime.Source interface. A Source returns Observable<Observable<T>>, that is, an Observable that emits Observables. Each of the emitted Observables represents a stream of data from a single target server.

Varieties of Sources

Sources can be roughly divided into two categories:

  1. Sources that read data from the output of other Mantis Jobs
    1. this may include Source Jobs (more on this below)
    2. or ordinary Mantis Jobs
  2. Custom sources that read data directly from Amazon S3, SQS, Apache Kafka, etc.

Mantis Job Sources

You can string Mantis Jobs together by using the output of one Mantis Job as the input to another. This is useful if you want to break up your processing into multiple, reusable components and to take advantage of code and data reuse.

In such a case, you do not have access to the complete set of Mantis Query Language (MQL) capabilities that you do in the case of a Source Job, but you can use MQL in client mode.

Connecting to a Mantis Job

To connect to a Mantis Job, use a JobSource when you call MantisJob.create() — declare the following parameters when you use this class:

  1. sourceJobName (required) — the name of any valid Job Cluster (not necessarily a “Source Job”)
  2. sample (optional) — use this if you want to sample the output sample times per second, or set this to -1 to disable sampling

For example:

MantisJob.source(new JobSource())
         .stage()
         .sink()
         .parameterDefinition(new StringParameter().name("sourceJobName")
            .description("The name of the job")
            .validator(Validators.notNullOrEmpty())
            .defaultValue("MyDefaultJob")
            .build())
         .parameterDefinition(new IntParameter().name("sample")
            .description("The number of samples per second")
            .validator(Validators.range(-1, 10000))
            .defaultValue(-1)
            .build())
         .lifecycle()
         .metadata()
         .create();

Source Job Sources

Mantis has a concept of Source Jobs which are Mantis Jobs with added conveniences and efficiences that simplify accessing data from certain sources. Your job can simply connect to a source job as its data source rather than trying to retrieve the data from its native home. There are two advantages to this approach:

  1. Source Jobs handle all of the implementation details around interacting with the native data source.
  2. Source Jobs come with a simple query interface based on the Mantis Query Language (MQL), which allows you to filter the data from the source before processing it. In the case of source jobs that fetch data from application servers directly, this filter gets pushed all the way to those target servers so that no data flows unless someone is asking for it.
  3. Source Jobs reuse data so that multiple matching MQL queries are forwarded downstream instead of paying the cost to fetch and serialize/deserialize the same data multiple times from the upstream source.

Broadcast Mode

By default, Mantis will distribute the data that is output from the Source Job among the various workers in the processing stage of your Mantis Job. Each of those workers will get a subset of the complete data from the Source Job.

You can override this by instructing the Source Job to use “broadcast mode”. If you do this, Mantis will send the complete set of data from the Source Job to every worker in your Job.

Connecting to a Source Job

Since Source Jobs are fundamentally Mantis Jobs, you should use a JobSource when you call MantisJob.create() to connect to a particular Source Job. The difference is that you should pass in additional parameters:

  1. sourceJobName (required) — the name of the source Job Cluster you want to connect to
  2. sample (required) — use this if you want to sample the output sample times per second, or set this to -1 to disable sampling
  3. criterion (required) — a query expression in MQL to filter the source
  4. clientId (optional) — by default, the jobId of the client Job; the Source Job uses this to distribute data between all the subscriptions of the client Job
  5. enableMetaMessages (optional) — the source job may occasionally inject meta messages (with the prefix mantis.meta.) that indicate things like data drops on the Source Job side.

For example:

MantisJob.source(new JobSource())
         .stage()
         .sink()
         .parameterDefinition(new StringParameter().name("sourceJobName")
            .description("The name of the job")
            .validator(Validators.notNullOrEmpty())
            .defaultValue("MyDefaultSourceJob")
            .build())
         .parameterDefinition(new IntParameter().name("sample")
            .description("The number of samples per second")
            .validator(Validators.range(-1, 10000))
            .defaultValue(-1)
            .build())
         .parameterDefinition(new StringParameter().name("criterion")
            .description("Filter the source with this MQL statement")
            .validator(Validators.notNullOrEmpty())
            .defaultValue("true")
            .build())
         .parameterDefinition(new StringParameter().name("clientId")
            .description("the ID of the client job")
            .validator(Validators.alwaysPass())
            .build())
         .parameterDefinition(new BooleanParameter().name("enableMetaMessages")
            .description("Is the source allowed to inject meta messages")
            .validator(Validators.alwaysPass())
            .defaultValue("true")
            .build())
         .lifecycle()
         .metadata()
         .create();

Custom Sources

Custom sources may be implemented and used to access data sources for which Mantis does not have a Source Job. Implementers are free to implement the Source interface to fetch data from an external source. Here is an example in a source which implements the Source interface to consume data from Kafka.

Learning When Source Data is Incomplete

You may want to know whether or not the stream you are receiving from your source is complete. Streams may be incomplete for a number of reasons:

  1. A connection to one or more of the Source Job workers is lost.
  2. A connection exists but no data is flowing.
  3. Data is intentionally dropped from a source because of the backpressure strategy you are using.

You can use the following boolean method within your JobSource#call method to determine whether or not all of your client connections are complete:

DefaultSinkConnectionStatusObserver.getInstance(true).isConnectedToAllSinks()