Netty
You can stream events on-demand from your application to be ingested directly into Mantis using the Mantis Publish library (sometimes casually referred to as "MRE").
Mantis Publish takes care of filtering the events you send into Mantis, and it will only transmit them over the network if a downstream consumer that is interested in such events is currently subscribed.
Mantis Publish contains a subscription registry where each client subscription is represented by its corresponding Mantis Query Language query. Mantis Publish evaluates all MQL queries from its subscription registry against each event as the event is generated by your application. It then tags events with all of the matching MQL queries and enriches the event with a superset of fields from the matched queries. That is, rather than emitting n events for n matching MQL queries, Mantis Publish will instead emit a single event containing all of the fields requested by the matched queries. This happens directly within your application before any events are sent over the network into Mantis. Further, Mantis Publish only dispatches events if a client is subscribed with a matching query. This means that you can freely produce events without incurring the cost until an active subscription exists.
Choose a Package¶
MRE works with both Guice-enabled and standalone injectors.
Guice-based Injector¶
implementation 'io.mantisrx:mantis-publish-netty-guice:1.2.+'
Enable the Publisher Client¶
Inject the MantisRealtimeEventsPublishModule
into your application.
In addition to injecting MantisRealtimeEventsPublishModule
you will also need to add the
ArchaiusModule
and the SpectatorModule
if not already injected.
Injector injector = Guice.createInjector(
new MyBasicModule(),
new ArchaiusModule(),
new MantisRealtimeEventsPublishModule(),
new SpectatorModule());
Standalone Injector¶
implementation 'io.mantisrx:mantis-publish-netty:1.2.+'
Enable the Publisher Client¶
Follow the example standalone initializer to manually inject dependencies.
Once you have constructed a MrePublishClientInitializer
, call the MantisPublishClientInitializer#start
method
to initialize underlying components.
Configure Where to Send Svents¶
You will need to configure the location of the Mantis API server for the mantis-publish library to bootstrap.
Add the following properties to your application.properties
:
mantis.publish.discovery.api.hostname=<IP of Mantis API>
# mantis api port
mantis.publish.discovery.api.port=<port for Mantis API>
# This application's name
mantis.publish.app.name=JavaApp
Send Events into Mantis¶
For each event your application wishes to send to Mantis, create a Event
object with your desired event fields,
and pass that Event
to the MantisEventPublisher#publish
method.
For example:
// Create an `Event` for Mantis Publish using your application event.
final Event event = new Event();
event.set("testKey", "testValue");
// Send your `Event` into Mantis.
// Note: This event will only be dispatched over the network
// if a subscription with a matching MQL query exists.
eventPublisher.publish(event);
Consuming a Mantis Stream¶
Visit the Querying page for details on how you can consume your application's event stream.
Configuration Options¶
There are a number of configuration options available to control the behavior of the publishing client.
Prefix:
mantis.publish
Name | Default | Description |
---|---|---|
enabled |
true | Determine if event processing is enabled. |
tee.enabled |
false | Allows events to simultaneously be sent to an external system outside of Mantis |
tee.stream |
default_stream | Specifies which external stream name tee will write to |
blacklist |
param.password | Comma separated list of field names where the value will be obfuscated |
max.num.streams |
5 | Maximum number of streams this application can create |
stream.inactive.duration.threshold.sec |
86400 (24 hours) | Maximum duration in seconds for the stream to be considered inactive if there are no events |
<stream name>.stream.queue.size |
1000 | Size of the blocking queue to hold events to be pushed for the specific stream |
max.subscriptions.per.stream.default |
20 | Default maximum number of subscriptions per stream. After the limit is reached, further subscriptions on that stream are rejected |
max.subscriptions.stream.<stream name> |
20 | Overrides the default maximum number of subscriptions for the specific stream |
subs.refresh.interval.sec |
1 | Interval in seconds when subscriptions are fetched. In the default implementation, subscriptions are fetched over http from the workers returned by Discovery API |
drainer.interval.msec |
100 | Interval in milliseconds when events are drained from the stream queue and delegated to underlying transmitter for sending |
subs.expiry.interval.sec |
300 | Duration in seconds between a subscription is last fetched and when it is removed |
jobdiscovery.refresh.interval.sec |
10 | Duration in seconds between workers are refreshed for a job cluster |
jobcluster.mapping.refresh.interval.sec |
60 | Duration in seconds between job cluster mapping is refreshed for the current application |
deepcopy.eventmap.enabled |
true | Determine if event processing should operate on a deep copy of the event. Otherwise the event object is processed directly |
subs.refresh.max.num.workers |
3 | Maximum number of mantis workers to fetch subscription from. Workers are randomly chosen from the list returned by Discovery API |
subs.fetch.query.params.string |
"" | Additional query params to pass to the api call to fetch subscription. It should be of the form "param1=value1¶m2=value2" |
channel.gzip.enabled |
true | Netty channel configuration for pushing events. Determine if events should be gzip encoded when send over the channel |
channel.idleTimeout.sec |
300 | Netty channel configuration for pushing events. Write idle timeout in seconds for the channel |
channel.writeTimeout.sec |
1 | Netty channel configuration for pushing events. Write timeout in seconds for the channel |
channel.httpChunkSize.bytes |
32768 (32 KiB) | Netty channel configuration for pushing events. Chunked size in bytes of the channel content. It is used by HttpObjectAggregator |
channel.flushInterval.msec |
50 | Netty channel configuration for pushing events. Maximum duration in milliseconds between content flushes |
channel.flushInterval.bytes |
524288 | Netty channel configuration for pushing events. Content is flushed when aggregated event size is above this threshold |
channel.lowWriteBufferWatermark.bytes |
1572864 | Netty channel configuration for pushing events. Used for setting write buffer watermark |
channel.highWriteBufferWatermark.bytes |
2097152 | Netty channel configuration for pushing events. Used for setting write buffer watermark |
channel.ioThreads |
1 | Netty channel configuration for pushing events. Number of threads in the eventLoopGroup |
channel.compressionThreads |
1 | Netty channel configuration for pushing events. Number of threads in the encoderEventLoopGroup when gzip is enabled |
workerpool.capacity |
1000 | Size of the pool of Mantis workers to push events to |
workerpool.refresh.internal.sec |
10 | Duration in seconds between Mantis workers are refreshed in the pool |
workerpool.worker.error.quota |
60 | Number of errors to receive from a Mantis worker before it is blacklisted in the pool |
workerpool.worker.error.timeout.sec |
300 | Duration in seconds after which a blacklisted Mantis worker may be reconsidered for selection |