-
Bug
-
Resolution: Done
-
High
-
Frankfurt Release
-
None
emphasized textEither the sendBatch or the sendBatchWithResponse has to be explicitly called by the publisher for pushing the data into the topic. As per the documentation the messages have to be published either after the max aging is reached or the max batch size has reached. For ex: the publisher class should be only calling the "send" method as and when there is message to be published and the "sendBatchWithResponse" is not required to be explicitly invoked.
Suggested fix :
Existing code - MRSimplerBatchPublisher.java
private synchronized boolean shouldSendNow() {private synchronized boolean shouldSendNow() { boolean shouldSend = false; if (fPending.isEmpty())
{ final long nowMs = Clock.now(); shouldSend = (fPending.size() >= fMaxBatchSize); if (!shouldSend) \{ final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs; shouldSend = sendAtMs <= nowMs; }// however, wait after an error shouldSend = shouldSend && nowMs >= fDontSendUntilMs; } return shouldSend; }
Proposed change -
private synchronized boolean shouldSendNow() {private synchronized boolean shouldSendNow() { boolean shouldSend = false; if (!fPending.isEmpty()) { final long nowMs = Clock.now(); shouldSend = (fPending.size() >= fMaxBatchSize); if (!shouldSend) { final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs; shouldSend = sendAtMs <= nowMs; }
// however, wait after an error shouldSend = shouldSend && nowMs >= fDontSendUntilMs; } return shouldSend; }