java.lang.Object
  com.netflix.logging.messaging.MessageBatcher<T>
public class MessageBatcher<T>
A general-purpose batcher that combines messages into batches. Callers of process don't block. Configurable parameters control the number of messages that may be queued awaiting processing, the maximum size of a batch, the maximum time a message waits to be combined with others in a batch, and the size of the pool of threads that process batches.

The implementation aims to avoid congestion by working more efficiently as load increases. As messages arrive faster, the collector executes less code and batch sizes increase (up to the configured maximum). This relies on processing a batch being more efficient than processing its messages individually.

Here's how it works. Arriving messages are added to the queue. The collector thread takes messages from the queue and collects them into batches. When a batch is big enough or old enough, the collector passes it to the processor, which passes the batch to the target stream. The processor maintains a thread pool. If there's more work than threads, the collector participates in processing by default and consequently stops collecting more batches until that work is done.
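The collection step described above can be sketched roughly as follows. This is not the MessageBatcher source: the class BatchCollectorSketch and the method collectBatch are hypothetical names, and the sketch assumes the delay clock starts when the first message of a batch is taken from the queue. It only illustrates the "big enough or old enough" rule using standard java.util.concurrent calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not part of com.netflix.logging.messaging.
class BatchCollectorSketch {

    /**
     * Collects one batch. Returns when the batch holds maxMessages entries
     * or when maxDelayMillis has elapsed since the first message was taken.
     */
    static <T> List<T> collectBatch(BlockingQueue<T> queue, int maxMessages, long maxDelayMillis)
            throws InterruptedException {
        List<T> batch = new ArrayList<>(maxMessages);
        // Block until the first message arrives; the delay clock starts here (assumption).
        batch.add(queue.take());
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxDelayMillis);

        while (batch.size() < maxMessages) {
            // Under load the queue is rarely empty, so most messages are picked up
            // by this single bulk call rather than one blocking poll each.
            queue.drainTo(batch, maxMessages - batch.size());
            if (batch.size() >= maxMessages) {
                break; // big enough
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // old enough
            }
            T next = queue.poll(remaining, TimeUnit.NANOSECONDS);
            if (next == null) {
                break; // timed out while waiting: old enough
            }
            batch.add(next);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
        for (int i = 0; i < 5; i++) {
            queue.offer("msg-" + i);
        }
        System.out.println(collectBatch(queue, 3, 50)); // a full batch of three
        System.out.println(collectBatch(queue, 3, 50)); // the remaining two, once the delay expires
    }
}
```

The bulk drainTo call is one plausible reading of "the collector executes less code" as load rises: the fuller the queue, the fewer blocking polls are needed per batch.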
Field Summary | |
---|---|
protected com.netflix.servo.monitor.Timer | avgBatchSizeTracer |
protected com.netflix.servo.monitor.Timer | batchSyncPutTracer |
protected com.netflix.logging.messaging.MessageBatcher.Collector | collector |
protected java.util.concurrent.atomic.AtomicInteger | concurrentBatches - The number of batches that are currently being processed by the target stream. |
protected static long | maxDelay |
protected int | maxMessages |
protected java.lang.String | name |
static java.lang.String | POOL_KEEP_ALIVE_TIME |
static java.lang.String | POOL_MAX_THREADS |
static java.lang.String | POOL_MIN_THREADS |
protected java.util.concurrent.ThreadPoolExecutor | processor |
protected com.netflix.servo.monitor.Timer | processTimeTracer |
protected java.util.concurrent.BlockingQueue | queue |
protected com.netflix.servo.monitor.Counter | queueOverflowCounter |
protected com.netflix.servo.monitor.Timer | queueSizeTracer |
protected MessageProcessor | target |
protected com.netflix.servo.monitor.Timer | threadSubmitTracer |
Constructor Summary | |
---|---|
MessageBatcher(java.lang.String name, MessageProcessor target) | |
Method Summary | |
---|---|
long | getNumberAdded() - Gets the count of messages added to this batcher. |
long | getNumberDropped() - Gets the count of messages dropped by this batcher. |
int | getSize() - The size of the queue in which the messages are batched. |
boolean | isBlocking() - Indicates whether the batcher is blocking or non-blocking. |
boolean | isPaused() |
boolean | isSpaceAvailable() - Checks whether there is space available in the queue. |
void | pause() - Pauses the collector. |
void | process(java.util.List<T> objects) - Processes the messages sent to the batcher. |
void | process(java.util.List<T> objects, boolean sync) - Processes the messages sent to the batcher. |
boolean | process(T message) - Processes the message sent to the batcher. |
void | processSync(T message) - Processes the message sent to the batcher. |
void | resetNumberAdded() - Resets the count of messages added to this batcher. |
void | resetNumberDropped() - Resets the count of messages dropped by this batcher. |
void | resume() - Resumes the collector. |
void | setBatchMaxDelay(double maxDelaySec) - Sets the maximum time, in seconds, that a message waits for its batch to fill. |
void | setBatchMaxMessages(int maxMessages) - Sets the maximum number of messages in a batch. |
void | setTarget(MessageProcessor target) - Sets the stream that will process each batch of messages. |
void | stop() - Stops the batcher. |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Field Detail |
---|
protected java.lang.String name
protected java.util.concurrent.BlockingQueue queue
protected int maxMessages
protected static long maxDelay
protected com.netflix.logging.messaging.MessageBatcher.Collector collector
protected java.util.concurrent.ThreadPoolExecutor processor
protected MessageProcessor target
protected final java.util.concurrent.atomic.AtomicInteger concurrentBatches
protected com.netflix.servo.monitor.Timer queueSizeTracer
protected com.netflix.servo.monitor.Timer batchSyncPutTracer
protected com.netflix.servo.monitor.Timer threadSubmitTracer
protected com.netflix.servo.monitor.Timer processTimeTracer
protected com.netflix.servo.monitor.Timer avgBatchSizeTracer
protected com.netflix.servo.monitor.Counter queueOverflowCounter
public static final java.lang.String POOL_MAX_THREADS
public static final java.lang.String POOL_MIN_THREADS
public static final java.lang.String POOL_KEEP_ALIVE_TIME
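The processor field is a ThreadPoolExecutor, and the class description says the collector takes part in processing when there is more work than threads. One standard way to get that behaviour from a ThreadPoolExecutor is ThreadPoolExecutor.CallerRunsPolicy; whether MessageBatcher is configured this way is an assumption, not something this page states. The class name CallerRunsSketch, the method runSaturated, and the pool sizes below are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not part of com.netflix.logging.messaging.
class CallerRunsSketch {

    /**
     * Submits two "batches" to a pool with a single thread and no task queue.
     * Returns the name of the thread that ran the second batch.
     */
    static String runSaturated() throws InterruptedException {
        ThreadPoolExecutor processor = new ThreadPoolExecutor(
                1, 1,                      // min and max threads (assumed values)
                10, TimeUnit.SECONDS,      // keep-alive time (assumed value)
                new SynchronousQueue<>(),  // direct hand-off, nothing is buffered
                new ThreadPoolExecutor.CallerRunsPolicy());

        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[1];
        try {
            // First batch occupies the only pool thread until released.
            processor.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // The pool is saturated, so the rejection policy runs this task
            // on the submitting thread, which cannot collect in the meantime.
            processor.execute(() -> ranOn[0] = Thread.currentThread().getName());
        } finally {
            release.countDown();
            processor.shutdown();
        }
        processor.awaitTermination(1, TimeUnit.SECONDS);
        return ranOn[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("second batch ran on: " + runSaturated());
    }
}
```

With this policy the submitting thread acts as a natural brake: while it processes a batch it is not pulling more messages off the queue, which matches the "stops collecting more batches" behaviour in the class description.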
Constructor Detail |
---|
public MessageBatcher(java.lang.String name, MessageProcessor target)
Method Detail |
---|
public void setTarget(MessageProcessor target)
public void setBatchMaxMessages(int maxMessages)
public void setBatchMaxDelay(double maxDelaySec)
public boolean isSpaceAvailable()
public boolean process(T message)
message - The message to be processed
public void processSync(T message)
message - The message to be processed
public void process(java.util.List<T> objects)
message - The messages to be processed
public void process(java.util.List<T> objects, boolean sync)
MessageProcessor
message - The messages to be processed
sync - if true, waits for the queue to make space; if false, returns immediately after dropping the message
public void pause()
public boolean isPaused()
public void resume()
public void stop()
@Monitor(name="batcherQueueSize", type=GAUGE) public int getSize()
public void resetNumberAdded()
public void resetNumberDropped()
@Monitor(name="numberAdded", type=GAUGE) public long getNumberAdded()
@Monitor(name="numberDropped", type=GAUGE) public long getNumberDropped()
@Monitor(name="blocking", type=INFORMATIONAL) public boolean isBlocking()
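The difference between the non-blocking and sync variants of process can be illustrated with a bounded queue. This is a sketch under assumptions, not the MessageBatcher source: EnqueueSketch is a hypothetical class, the boolean returned by process is assumed to mean "the message was queued", and the counters only mirror the names of getNumberAdded() and getNumberDropped().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration; not part of com.netflix.logging.messaging.
class EnqueueSketch<T> {
    private final BlockingQueue<T> queue;
    private final AtomicLong numberAdded = new AtomicLong();
    private final AtomicLong numberDropped = new AtomicLong();

    EnqueueSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Non-blocking: queues the message if there is room, otherwise drops it. */
    boolean process(T message) {
        if (queue.offer(message)) {
            numberAdded.incrementAndGet();
            return true;
        }
        numberDropped.incrementAndGet();
        return false;
    }

    /** Blocking: waits for the queue to make space, so nothing is dropped. */
    void processSync(T message) throws InterruptedException {
        queue.put(message);
        numberAdded.incrementAndGet();
    }

    boolean isSpaceAvailable() {
        return queue.remainingCapacity() > 0;
    }

    long getNumberAdded() {
        return numberAdded.get();
    }

    long getNumberDropped() {
        return numberDropped.get();
    }

    public static void main(String[] args) {
        EnqueueSketch<String> batcher = new EnqueueSketch<>(2);
        batcher.process("a");
        batcher.process("b");
        boolean queued = batcher.process("c"); // queue is full, so this one is dropped
        System.out.println(queued + " added=" + batcher.getNumberAdded()
                + " dropped=" + batcher.getNumberDropped());
    }
}
```

The trade-off is the usual one for logging pipelines: the non-blocking path protects caller latency at the cost of dropped messages, while the sync path preserves every message but lets a slow target stall its callers.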