com.netflix.logging.messaging
Class MessageBatcher<T>

java.lang.Object
  extended by com.netflix.logging.messaging.MessageBatcher<T>

public class MessageBatcher<T>
extends java.lang.Object

A general-purpose batcher that combines messages into batches. Callers of process don't block. Configurable parameters control the number of messages that may be queued awaiting processing, the maximum size of a batch, the maximum time a message waits to be combined with others in a batch, and the size of the pool of threads that process batches.

The implementation aims to avoid congestion by working more efficiently as load increases. As messages arrive faster, the collector executes less code and batch sizes increase (up to the configured maximum). It should be more efficient to process a batch than to process the messages individually.

Here's how it works. Arriving messages are added to the queue. The collector thread takes messages from the queue and collects them into batches. When a batch is big enough or old enough, the collector passes it to the processor, which passes the batch to the target stream. The processor maintains a thread pool. If there's more work than threads, the collector participates in processing by default, and consequently stops collecting more batches.
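The "big enough or old enough" rule can be illustrated with a minimal sketch built only on java.util.concurrent. This is not the MessageBatcher source: the class name BatchCollectorSketch, the method collectBatch, and its parameters are hypothetical, and the real collector may be organized differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; names and structure are assumptions, not the library's code.
final class BatchCollectorSketch {

    /**
     * Collects one batch. Blocks for the first message, then keeps adding messages
     * until the batch holds maxMessages ("big enough") or maxDelayMillis has elapsed
     * since the first message was taken ("old enough").
     */
    static <T> List<T> collectBatch(BlockingQueue<T> queue, int maxMessages, long maxDelayMillis) {
        List<T> batch = new ArrayList<>();
        try {
            batch.add(queue.take()); // wait for the first message
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);
            while (batch.size() < maxMessages) {
                // Under load, drainTo moves everything already queued in one call,
                // so the loop body runs fewer times per message.
                queue.drainTo(batch, maxMessages - batch.size());
                if (batch.size() >= maxMessages) {
                    break; // big enough
                }
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    break; // old enough
                }
                T next = queue.poll(remainingNanos, TimeUnit.NANOSECONDS);
                if (next == null) {
                    break; // timed out waiting for more messages
                }
                batch.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and return what we have
        }
        return batch;
    }
}
```

With five queued messages and a maximum batch size of three, the first call returns three messages at once and the second returns the remaining two after the delay expires.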


Field Summary
protected  com.netflix.servo.monitor.Timer avgBatchSizeTracer
           
protected  com.netflix.servo.monitor.Timer batchSyncPutTracer
           
protected  com.netflix.logging.messaging.MessageBatcher.Collector collector
           
protected  java.util.concurrent.atomic.AtomicInteger concurrentBatches
          The number of batches that are currently being processed by the target stream.
protected static long maxDelay
           
protected  int maxMessages
           
protected  java.lang.String name
           
static java.lang.String POOL_KEEP_ALIVE_TIME
           
static java.lang.String POOL_MAX_THREADS
           
static java.lang.String POOL_MIN_THREADS
           
protected  java.util.concurrent.ThreadPoolExecutor processor
           
protected  com.netflix.servo.monitor.Timer processTimeTracer
           
protected  java.util.concurrent.BlockingQueue queue
           
protected  com.netflix.servo.monitor.Counter queueOverflowCounter
           
protected  com.netflix.servo.monitor.Timer queueSizeTracer
           
protected  MessageProcessor target
           
protected  com.netflix.servo.monitor.Timer threadSubmitTracer
           
 
Constructor Summary
MessageBatcher(java.lang.String name, MessageProcessor target)
           
 
Method Summary
 long getNumberAdded()
          Gets the count of messages added to this batcher.
 long getNumberDropped()
          Gets the count of messages dropped by this batcher.
 int getSize()
          The size of the queue in which messages wait to be batched.
 boolean isBlocking()
          Indicates whether the batcher is blocking or non-blocking.
 boolean isPaused()
           
 boolean isSpaceAvailable()
          Checks whether there is space available in the queue.
 void pause()
          Pause the collector.
 void process(java.util.List<T> objects)
          Processes the messages sent to the batcher.
 void process(java.util.List<T> objects, boolean sync)
          Processes the messages sent to the batcher.
 boolean process(T message)
          Processes the message sent to the batcher.
 void processSync(T message)
          Processes the message sent to the batcher.
 void resetNumberAdded()
          Resets the count of messages added to this batcher.
 void resetNumberDropped()
          Resets the count of messages dropped by this batcher.
 void resume()
          Resume the collector.
 void setBatchMaxDelay(double maxDelaySec)
          Set the maximum time a message spends waiting to complete a full batch, in seconds.
 void setBatchMaxMessages(int maxMessages)
          Set the maximum number of messages in a batch.
 void setTarget(MessageProcessor target)
          Set the stream that will process each batch of messages.
 void stop()
          Stops the batcher.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

name

protected java.lang.String name

queue

protected java.util.concurrent.BlockingQueue queue

maxMessages

protected int maxMessages

maxDelay

protected static long maxDelay

collector

protected com.netflix.logging.messaging.MessageBatcher.Collector collector

processor

protected java.util.concurrent.ThreadPoolExecutor processor

target

protected MessageProcessor target

concurrentBatches

protected final java.util.concurrent.atomic.AtomicInteger concurrentBatches
The number of batches that are currently being processed by the target stream.


queueSizeTracer

protected com.netflix.servo.monitor.Timer queueSizeTracer

batchSyncPutTracer

protected com.netflix.servo.monitor.Timer batchSyncPutTracer

threadSubmitTracer

protected com.netflix.servo.monitor.Timer threadSubmitTracer

processTimeTracer

protected com.netflix.servo.monitor.Timer processTimeTracer

avgBatchSizeTracer

protected com.netflix.servo.monitor.Timer avgBatchSizeTracer

queueOverflowCounter

protected com.netflix.servo.monitor.Counter queueOverflowCounter

POOL_MAX_THREADS

public static final java.lang.String POOL_MAX_THREADS
See Also:
Constant Field Values

POOL_MIN_THREADS

public static final java.lang.String POOL_MIN_THREADS
See Also:
Constant Field Values

POOL_KEEP_ALIVE_TIME

public static final java.lang.String POOL_KEEP_ALIVE_TIME
See Also:
Constant Field Values
Constructor Detail

MessageBatcher

public MessageBatcher(java.lang.String name,
                      MessageProcessor target)
Method Detail

setTarget

public void setTarget(MessageProcessor target)
Set the stream that will process each batch of messages.


setBatchMaxMessages

public void setBatchMaxMessages(int maxMessages)
Set the maximum number of messages in a batch. Setting this to 1 will prevent batching; that is, messages will be passed to target.processMessage one at a time.


setBatchMaxDelay

public void setBatchMaxDelay(double maxDelaySec)
Set the maximum time a message spends waiting to complete a full batch, in seconds. This doesn't limit the time spent in the queue.


isSpaceAvailable

public boolean isSpaceAvailable()
Checks whether there is space available in the queue.

Returns:
true if space is available, false otherwise

process

public boolean process(T message)
Processes the message sent to the batcher. This method writes the message to the queue and returns immediately. If the queue is full, the message is dropped immediately and the corresponding counter is incremented.

Parameters:
message - The message to be processed
Returns:
true if the message is queued for processing; false otherwise (for example, when the queue is full)
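The non-blocking contract described here (queue the message or drop it and count the drop) can be sketched with BlockingQueue.offer. The class NonBlockingEnqueueSketch and its fields are hypothetical and only mirror the documented behavior; they are not taken from the MessageBatcher source.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only; assumes a bounded queue and two plain counters.
final class NonBlockingEnqueueSketch<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong added = new AtomicLong();
    private final AtomicLong dropped = new AtomicLong();

    NonBlockingEnqueueSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Returns true if the message was queued, false if it was dropped because the queue is full. */
    boolean process(T message) {
        if (queue.offer(message)) { // offer never blocks; it returns false when the queue is full
            added.incrementAndGet();
            return true;
        }
        dropped.incrementAndGet();
        return false;
    }

    long getNumberAdded() {
        return added.get();
    }

    long getNumberDropped() {
        return dropped.get();
    }
}
```

Callers that must not lose messages should check the return value, since a false result is the only signal that the message was discarded.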

processSync

public void processSync(T message)
Processes the message sent to the batcher. This method tries to write the message to the queue. If the queue is full, the call blocks until space becomes available.

Parameters:
message - The message to be processed
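For contrast with the non-blocking path, a blocking enqueue can be sketched with BlockingQueue.put, which waits until a consumer frees a slot. The helper BlockingEnqueueSketch.putBlocking is hypothetical; how processSync handles interruption is not stated in this documentation, so the interrupt handling below is an assumption.

```java
import java.util.concurrent.BlockingQueue;

// Illustrative sketch only; not the MessageBatcher implementation.
final class BlockingEnqueueSketch {

    /**
     * Blocks until the message is accepted by the queue.
     * Returns false only if the waiting thread is interrupted (an assumed policy).
     */
    static <T> boolean putBlocking(BlockingQueue<T> queue, T message) {
        try {
            queue.put(message); // waits while the queue is full
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
            return false;
        }
    }
}
```

The trade-off is back-pressure: the producer slows to the consumer's pace instead of losing messages.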

process

public void process(java.util.List<T> objects)
Processes the messages sent to the batcher. This method writes the messages to the queue and returns immediately. If the queue is full, the messages are dropped immediately and the corresponding counter is incremented.

Parameters:
objects - The messages to be processed

process

public void process(java.util.List<T> objects,
                    boolean sync)
Processes the messages sent to the batcher. The messages are first queued and are then processed by the MessageProcessor.

Parameters:
objects - The messages to be processed
sync - If true, waits for space to become available in the queue; if false, returns immediately, dropping the messages if the queue is full

pause

public void pause()
Pause the collector. The collector stops picking up messages from the queue.


isPaused

public boolean isPaused()

resume

public void resume()
Resume the collector. The collector resumes picking up messages from the queue and calling the processors.


stop

public void stop()
Stops the batcher. The batcher waits for its other components, such as the collector and the executor, to complete, and returns once it is notified that they have finished gracefully. The collector waits until there are no more messages in the queue (it tries 3 times, waiting 0.5 seconds each time) and then shuts down gracefully.
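The bounded drain wait described for the collector (a fixed number of attempts with a fixed pause) could look roughly like the following. DrainOnStopSketch and awaitDrained are hypothetical names; the actual shutdown code is not shown in this documentation, so this is a sketch of the stated policy under that assumption.

```java
import java.util.concurrent.BlockingQueue;

// Illustrative sketch only; mirrors the documented "3 tries, 0.5 seconds each" policy.
final class DrainOnStopSketch {

    /**
     * Checks up to 'attempts' times whether the queue is empty, sleeping
     * waitMillis between checks. Returns true if the queue was seen empty.
     */
    static boolean awaitDrained(BlockingQueue<?> queue, int attempts, long waitMillis) {
        for (int i = 0; i < attempts; i++) {
            if (queue.isEmpty()) {
                return true;
            }
            try {
                Thread.sleep(waitMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop waiting but keep the interrupt visible
                break;
            }
        }
        return queue.isEmpty();
    }
}
```

Under the documented policy the call would be awaitDrained(queue, 3, 500). A false result means messages were still queued when the wait gave up.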


getSize

@Monitor(name="batcherQueueSize",
         type=GAUGE)
public int getSize()
The size of the queue in which messages wait to be batched.


resetNumberAdded

public void resetNumberAdded()
Resets the count of messages added to this batcher.


resetNumberDropped

public void resetNumberDropped()
Resets the count of messages dropped by this batcher.


getNumberAdded

@Monitor(name="numberAdded",
         type=GAUGE)
public long getNumberAdded()
Gets the count of messages added to this batcher.


getNumberDropped

@Monitor(name="numberDropped",
         type=GAUGE)
public long getNumberDropped()
Gets the count of messages dropped by this batcher.


isBlocking

@Monitor(name="blocking",
         type=INFORMATIONAL)
public boolean isBlocking()
Indicates whether the batcher is blocking or non-blocking. By default, the batcher is non-blocking and messages are dropped if the queue is full. If the batcher is made blocking, sends block and wait indefinitely until space becomes available in the batcher.

Returns:
true if blocking, false otherwise