Netty

You can stream events on-demand from your application to be ingested directly into Mantis using the Mantis Publish library (sometimes casually referred to as "MRE").

Mantis Publish takes care of filtering the events you send into Mantis, and it will only transmit them over the network if a downstream consumer that is interested in such events is currently subscribed.

Mantis Publish contains a subscription registry where each client subscription is represented by its corresponding Mantis Query Language query. Mantis Publish evaluates all MQL queries from its subscription registry against each event as the event is generated by your application. It then tags events with all of the matching MQL queries and enriches the event with a superset of fields from the matched queries. That is, rather than emitting n events for n matching MQL queries, Mantis Publish will instead emit a single event containing all of the fields requested by the matched queries. This happens directly within your application before any events are sent over the network into Mantis. Further, Mantis Publish only dispatches events if a client is subscribed with a matching query. This means that you can freely produce events without incurring the cost until an active subscription exists.
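The matching and superset behavior described above can be sketched in a few lines of Java. This is a minimal illustration, not the library's implementation: the `Subscription`, `Outgoing`, and `evaluate` names are hypothetical, a plain `Predicate` stands in for a compiled MQL query, and the event is modeled as a simple map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Illustrative only: none of these types belong to the mantis-publish API.
final class SupersetProjectionSketch {

    /** Stand-in for one registered MQL query: a filter plus the fields it selects. */
    static final class Subscription {
        final String id;
        final Predicate<Map<String, Object>> filter;
        final Set<String> fields;

        Subscription(String id, Predicate<Map<String, Object>> filter, Set<String> fields) {
            this.id = id;
            this.filter = filter;
            this.fields = fields;
        }
    }

    /** The single outgoing event: matched query ids plus the union of requested fields. */
    static final class Outgoing {
        final List<String> matchedIds;
        final Map<String, Object> payload;

        Outgoing(List<String> matchedIds, Map<String, Object> payload) {
            this.matchedIds = matchedIds;
            this.payload = payload;
        }
    }

    /** Returns null when no subscription matches, meaning nothing would be sent. */
    static Outgoing evaluate(Map<String, Object> event, List<Subscription> registry) {
        List<String> matched = new ArrayList<>();
        Set<String> wanted = new LinkedHashSet<>();
        for (Subscription s : registry) {
            if (s.filter.test(event)) {
                matched.add(s.id);        // tag the event with the matching query
                wanted.addAll(s.fields);  // accumulate the superset of fields
            }
        }
        if (matched.isEmpty()) {
            return null;
        }
        Map<String, Object> payload = new LinkedHashMap<>();
        for (String field : wanted) {
            if (event.containsKey(field)) {
                payload.put(field, event.get(field));
            }
        }
        return new Outgoing(matched, payload);
    }

    public static void main(String[] args) {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("status", 500);
        event.put("path", "/checkout");
        event.put("latencyMs", 42);
        event.put("host", "host-1");

        List<Subscription> registry = Arrays.asList(
            new Subscription("q1", e -> Integer.valueOf(500).equals(e.get("status")),
                new LinkedHashSet<>(Arrays.asList("status", "path"))),
            new Subscription("q2", e -> e.containsKey("latencyMs"),
                new LinkedHashSet<>(Arrays.asList("path", "latencyMs"))));

        // Two queries match, but only one event with the union of their fields is produced.
        Outgoing out = evaluate(event, registry);
        System.out.println(out.matchedIds + " " + out.payload);
    }
}
```

In this sketch, two matching queries yield one outgoing event carrying `status`, `path`, and `latencyMs`, and the unrequested `host` field is dropped. With an empty registry, `evaluate` returns null, which corresponds to the case where nothing is dispatched.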

Choose a Package

MRE works with both Guice-enabled and standalone injectors.

Guice-based Injector

implementation 'io.mantisrx:mantis-publish-netty-guice:1.2.+'

Enable the Publisher Client

Add the MantisRealtimeEventsPublishModule to your application's injector. You will also need the ArchaiusModule and the SpectatorModule if they are not already installed.

Injector injector = Guice.createInjector(
    new MyBasicModule(),
    new ArchaiusModule(),
    new MantisRealtimeEventsPublishModule(),
    new SpectatorModule());

Standalone Injector

implementation 'io.mantisrx:mantis-publish-netty:1.2.+'

Enable the Publisher Client

Follow the example standalone initializer to manually inject dependencies. Once you have constructed a MrePublishClientInitializer, call the MantisPublishClientInitializer#start method to initialize underlying components.

Configure Where to Send Events

You will need to configure the location of the Mantis API server for the mantis-publish library to bootstrap.

Add the following properties to your application.properties:

# Mantis API hostname or IP address
mantis.publish.discovery.api.hostname=<IP of Mantis API>

# Mantis API port
mantis.publish.discovery.api.port=<port for Mantis API>

# This application's name
mantis.publish.app.name=JavaApp

Send Events into Mantis

For each event your application wishes to send to Mantis, create an Event object with your desired event fields, and pass that Event to the MantisEventPublisher#publish method.

For example:

// Create an `Event` for Mantis Publish using your application event.
final Event event = new Event();
event.set("testKey", "testValue");

// Send your `Event` into Mantis.
// Note: This event will only be dispatched over the network
// if a subscription with a matching MQL query exists.
eventPublisher.publish(event);

Consuming a Mantis Stream

Visit the Querying page for details on how you can consume your application's event stream.

Configuration Options

There are a number of configuration options available to control the behavior of the publishing client.

Prefix:

mantis.publish

| Name | Default | Description |
| --- | --- | --- |
| enabled | true | Determines whether event processing is enabled |
| tee.enabled | false | Allows events to simultaneously be sent to an external system outside of Mantis |
| tee.stream | default_stream | Specifies which external stream name tee will write to |
| blacklist | param.password | Comma-separated list of field names whose values will be obfuscated |
| max.num.streams | 5 | Maximum number of streams this application can create |
| stream.inactive.duration.threshold.sec | 86400 (24 hours) | Duration in seconds with no events after which the stream is considered inactive |
| <stream name>.stream.queue.size | 1000 | Size of the blocking queue that holds events to be pushed for the specific stream |
| max.subscriptions.per.stream.default | 20 | Default maximum number of subscriptions per stream. After the limit is reached, further subscriptions on that stream are rejected |
| max.subscriptions.stream.<stream name> | 20 | Overrides the default maximum number of subscriptions for the specific stream |
| subs.refresh.interval.sec | 1 | Interval in seconds at which subscriptions are fetched. In the default implementation, subscriptions are fetched over HTTP from the workers returned by the Discovery API |
| drainer.interval.msec | 100 | Interval in milliseconds at which events are drained from the stream queue and handed to the underlying transmitter for sending |
| subs.expiry.interval.sec | 300 | Duration in seconds between when a subscription is last fetched and when it is removed |
| jobdiscovery.refresh.interval.sec | 10 | Interval in seconds at which workers are refreshed for a job cluster |
| jobcluster.mapping.refresh.interval.sec | 60 | Interval in seconds at which the job cluster mapping is refreshed for the current application |
| deepcopy.eventmap.enabled | true | Determines whether event processing operates on a deep copy of the event. Otherwise the event object is processed directly |
| subs.refresh.max.num.workers | 3 | Maximum number of Mantis workers to fetch subscriptions from. Workers are randomly chosen from the list returned by the Discovery API |
| subs.fetch.query.params.string | "" | Additional query parameters to pass to the API call that fetches subscriptions. It should be of the form "param1=value1&param2=value2" |
| channel.gzip.enabled | true | Netty channel configuration for pushing events. Determines whether events are gzip-encoded when sent over the channel |
| channel.idleTimeout.sec | 300 | Netty channel configuration for pushing events. Write idle timeout in seconds for the channel |
| channel.writeTimeout.sec | 1 | Netty channel configuration for pushing events. Write timeout in seconds for the channel |
| channel.httpChunkSize.bytes | 32768 (32 KiB) | Netty channel configuration for pushing events. Chunk size in bytes of the channel content. It is used by HttpObjectAggregator |
| channel.flushInterval.msec | 50 | Netty channel configuration for pushing events. Maximum duration in milliseconds between content flushes |
| channel.flushInterval.bytes | 524288 (512 KiB) | Netty channel configuration for pushing events. Content is flushed when the aggregated event size is above this threshold |
| channel.lowWriteBufferWatermark.bytes | 1572864 (1.5 MiB) | Netty channel configuration for pushing events. Sets the low write buffer watermark |
| channel.highWriteBufferWatermark.bytes | 2097152 (2 MiB) | Netty channel configuration for pushing events. Sets the high write buffer watermark |
| channel.ioThreads | 1 | Netty channel configuration for pushing events. Number of threads in the eventLoopGroup |
| channel.compressionThreads | 1 | Netty channel configuration for pushing events. Number of threads in the encoderEventLoopGroup when gzip is enabled |
| workerpool.capacity | 1000 | Size of the pool of Mantis workers to push events to |
| workerpool.refresh.internal.sec | 10 | Interval in seconds at which Mantis workers are refreshed in the pool |
| workerpool.worker.error.quota | 60 | Number of errors to receive from a Mantis worker before it is blacklisted in the pool |
| workerpool.worker.error.timeout.sec | 300 | Duration in seconds after which a blacklisted Mantis worker may be reconsidered for selection |
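
The option names in the table above are relative to the mantis.publish prefix, so the full key for drainer.interval.msec is mantis.publish.drainer.interval.msec. The sketch below shows how such keys could be composed and read with plain `java.util.Properties`, including the per-stream override falling back to the default. The `PublishConfigSketch` class, its helper methods, and the stream name `requestEventStream` are hypothetical and only illustrate the naming scheme; the library itself reads its configuration through its own mechanism.

```java
import java.util.Properties;

// Illustrative only: a hypothetical helper, not part of the mantis-publish API.
final class PublishConfigSketch {

    static final String PREFIX = "mantis.publish";

    /** Builds the full property key, e.g. "mantis.publish.drainer.interval.msec". */
    static String key(String name) {
        return PREFIX + "." + name;
    }

    /** Reads an integer option, using the given default when the key is absent or blank. */
    static int intOption(Properties props, String name, int defaultValue) {
        String raw = props.getProperty(key(name));
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        return Integer.parseInt(raw.trim());
    }

    /** Per-stream limit if set, otherwise the configured default, otherwise 20. */
    static int maxSubscriptions(Properties props, String streamName) {
        int fallback = intOption(props, "max.subscriptions.per.stream.default", 20);
        return intOption(props, "max.subscriptions.stream." + streamName, fallback);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("mantis.publish.drainer.interval.msec", "250");
        props.setProperty("mantis.publish.max.subscriptions.stream.requestEventStream", "50");

        System.out.println(intOption(props, "drainer.interval.msec", 100));
        System.out.println(maxSubscriptions(props, "requestEventStream"));
        System.out.println(maxSubscriptions(props, "someOtherStream"));
    }
}
```

In your application.properties, the same options are written with the full key, for example mantis.publish.max.subscriptions.stream.requestEventStream=50.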