Details
Bug
Status: Closed
Medium
Resolution: Done
Frankfurt Release
None
Description
DMaaP message router Kafka consumer throws the following exceptions:
2020-11-11 20:53:22,562 [qtp1022308509-2736905] INFO org.onap.dmaap.dmf.mr.service.impl.EventsServiceImpl - [AAI-EVENT/klondike-service-resolver/service-resolver-6c9ff868df-bf8mr] Time taken in getEvents Authorization 2 ms for AAI-EVENT klondike-service-resolver service-resolver-6c9ff868df-bf8mr
2020-11-11 20:53:22,562 [qtp1022308509-2736905] INFO org.onap.dmaap.dmf.mr.service.impl.EventsServiceImpl - [AAI-EVENT/klondike-service-resolver/service-resolver-6c9ff868df-bf8mr] fetch: timeout=60000, limit=5, filter= from Remote host 10.42.34.100
2020-11-11 20:53:22,562 [qtp1022308509-2736905] INFO org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter - AAI-EVENT::klondike-service-resolver::service-resolver-6c9ff868df-bf8mr: 0.4 empty replies/minute.
2020-11-11 20:53:22,562 [qtp1022308509-2736905] INFO org.onap.dmaap.dmf.mr.service.impl.EventsServiceImpl - [AAI-EVENT/klondike-service-resolver/service-resolver-6c9ff868df-bf8mr] Time taken in getEvents getConsumerFor 2 ms for AAI-EVENT klondike-service-resolver service-resolver-6c9ff868df-bf8mr
2020-11-11 20:53:22,601 [qtp1022308509-2735594] INFO org.onap.dmaap.util.DMaaPAuthFilter - inside servlet filter Cambria Auth Headers checking before doing other Authentication
2020-11-11 20:53:22,605 [qtp1022308509-2735594] INFO org.onap.dmaap.DMaaPCambriaExceptionMapper - Reached Cambria Exception Mapper..
2020-11-11 20:53:22,661 [qtp1022308509-2736055] INFO org.onap.dmaap.dmf.mr.service.impl.EventsServiceImpl - [AAI-EVENT/klondike-service-resolver/service-resolver-6c9ff868df-bf8mr] Sent 0 msgs in 60127 ms; committed to offset 0 for AAI-EVENT klondike-service-resolver service-resolver-6c9ff868df-bf8mr on to the server 10.42.38.173
2020-11-11 20:53:22,661 [qtp1022308509-2736905] ERROR org.onap.dmaap.dmf.mr.backends.kafka.Kafka011Consumer - Exception in in Kafka consumer
java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:206)
    at org.onap.dmaap.dmf.mr.backends.kafka.Kafka011Consumer.nextMessage(Kafka011Consumer.java:156)
    at org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream.forEachMessage(CambriaOutboundEventStream.java:405)
    at org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream.write(CambriaOutboundEventStream.java:271)
    at org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.respondOkWithStream(DMaaPResponseBuilder.java:134)
    at org.onap.dmaap.dmf.mr.service.impl.EventsServiceImpl.respondOkWithStream(EventsServiceImpl.java:259)
    at org.onap.dmaap.dmf.mr.service.impl.EventsServiceImpl.getEvents(EventsServiceImpl.java:172)
    at org.onap.dmaap.service.EventsRestService.getEvents(EventsRestService.java:132)
    at sun.reflect.GeneratedMethodAccessor134.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.cxf.service.invoker.AbstractInvoker.performInvocation(AbstractInvoker.java:179)
    at org.apache.cxf.service.invoker.AbstractInvoker.invoke(AbstractInvoker.java:96)
    at org.apache.cxf.jaxrs.JAXRSInvoker.invoke(JAXRSInvoker.java:192)
    at org.apache.cxf.jaxrs.JAXRSInvoker.invoke(JAXRSInvoker.java:103)
    at org.apache.cxf.interceptor.ServiceInvokerInterceptor$1.run(ServiceInvokerInterceptor.java:59)
    at org.apache.cxf.interceptor.ServiceInvokerInterceptor.handleMessage(ServiceInvokerInterceptor.java:96)
    at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
    at org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:121)
    at org.apache.camel.component.cxf.cxfbean.CxfBeanDestination.process(CxfBeanDestination.java:83)
    at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:103)
    at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71)
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
    at org.apache.camel.http.common.CamelServlet.doService(CamelServlet.java:208)
    at org.apache.camel.http.common.CamelServlet.service(CamelServlet.java:78)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:865)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1655)
    at ajsc.filter.PassthruFilter.doFilter(PassthruFilter.java:26)
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:347)
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:263)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
    at com.att.ajsc.csi.writeablerequestfilter.WriteableRequestFilter.doFilter(WriteableRequestFilter.java:41)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
    at org.onap.dmaap.util.DMaaPAuthFilter.doFilter(DMaaPAuthFilter.java:69)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1634)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:533)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:146)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:257)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1595)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1317)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1564)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1219)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:531)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:352)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:281)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:102)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1674)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1031)
    at org.onap.dmaap.dmf.mr.backends.kafka.Kafka011Consumer$2.call(Kafka011Consumer.java:131)
    at org.onap.dmaap.dmf.mr.backends.kafka.Kafka011Consumer$2.call(Kafka011Consumer.java:125)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 common frames omitted
How to reproduce:
Send multiple concurrent requests to the getEvents endpoint: /events/my-topic/my-group/my-consumer-id
I sent ~10 requests to reproduce it, using the attached script: test.sh
You should get the above exception. Because this is a race condition, the exception may not be raised on the first try; you might also need to increase the number of concurrent requests.
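For readers without the attached script, the reproduction steps can be approximated in Java. This is only a sketch and not the contents of test.sh: the class name ConcurrentGetEvents, the fire() helper, and the default host and port (localhost:3904) are assumptions; only the path comes from this report.

```java
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ConcurrentGetEvents {

    /** Sends `requests` GETs to `url` at nearly the same instant and returns the HTTP status codes. */
    static List<Integer> fire(String url, int requests) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(requests);
        CountDownLatch start = new CountDownLatch(1);
        List<Future<Integer>> pending = new ArrayList<>();
        try {
            for (int i = 0; i < requests; i++) {
                pending.add(pool.submit(() -> {
                    start.await(); // hold every worker until all of them are queued
                    HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
                    conn.setRequestMethod("GET");
                    try {
                        return conn.getResponseCode();
                    } finally {
                        conn.disconnect();
                    }
                }));
            }
            start.countDown(); // release all requests together
            List<Integer> codes = new ArrayList<>();
            for (Future<Integer> f : pending) {
                codes.add(f.get());
            }
            return codes;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumed host and port; the path is the one given in this report.
        String url = args.length > 0 ? args[0]
                : "http://localhost:3904/events/my-topic/my-group/my-consumer-id";
        System.out.println(fire(url, 10));
    }
}
```

All requests use the same topic, group and consumer id on purpose, since that is the condition under which the server shares one consumer instance. The exception appears in the server log, not necessarily in the HTTP status codes.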
Diagnostic:
KafkaConsumer does not support multi-threaded access, see: https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
The Javadoc states: "The consumer is not thread-safe"
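The stack trace above shows the exception being thrown from KafkaConsumer.acquire(), which poll() calls on entry. A simplified model of that kind of ownership check is sketched below. It is not Kafka's source code: SingleThreadGuard and GuardDemo are hypothetical names, and the model is deliberately non-reentrant to stay short.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified model of a "one thread at a time" ownership check (not Kafka's actual code). */
final class SingleThreadGuard {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);

    /** Claims the guard for the calling thread, or fails fast if another caller holds it. */
    void acquire() {
        long me = Thread.currentThread().getId();
        if (!owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
    }

    void release() {
        owner.set(NO_OWNER);
    }
}

final class GuardDemo {
    /** Returns true if a second thread is rejected while the first one still holds the guard. */
    static boolean secondThreadRejected() throws InterruptedException {
        SingleThreadGuard guard = new SingleThreadGuard();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        Thread first = new Thread(() -> {
            guard.acquire();
            held.countDown();
            try {
                done.await(); // stay "inside the consumer" until the check has run
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                guard.release();
            }
        });
        first.start();
        held.await();
        try {
            guard.acquire(); // the calling thread plays the second request
            guard.release();
            return false;
        } catch (ConcurrentModificationException expected) {
            return true;
        } finally {
            done.countDown();
            first.join();
        }
    }
}
```

The point of the model is that the check does not block the second caller; it fails immediately, which is why overlapping requests surface as exceptions and not as delays.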
In the getEvents() method:
consumer.commitOffsets();
Which leads us to: https://github.com/onap/dmaap-messagerouter-msgrtr/blob/8333cde6f59d9a3b341c7e762cf68df8f316b962/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java#L388
This call to the Kafka consumer is unsynchronized, which can result in a race condition when several clients try to consume messages using the same topic name, group id and consumer id.
For the same topic/group/consumer-id, the DMaaP Kafka consumer factory instantiates a consumer, re-uses that instance for a short period of time, and then cleans it up. If several clients consume at the same time, they share that instance across multiple threads (one thread per request), so every access to the consumer needs to be synchronized.
There are several other unsynchronized accesses to the Kafka consumer, see:
- https://github.com/onap/dmaap-messagerouter-msgrtr/blob/8333cde6f59d9a3b341c7e762cf68df8f316b962/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java#L288
- https://github.com/onap/dmaap-messagerouter-msgrtr/blob/8333cde6f59d9a3b341c7e762cf68df8f316b962/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java#L209
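One possible way to serialize these accesses is to route every operation on the shared instance through a single lock. The sketch below illustrates that idea only and is not the change applied in the repository: SharedConsumer, LockedConsumer, OverlapDetector and LockedConsumerDemo are hypothetical names, and the interface models just the two calls mentioned in this report (nextMessage and commitOffsets).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the operations request threads perform on the shared consumer. */
interface SharedConsumer {
    String nextMessage();
    void commitOffsets();
}

/** Serializes every call on the wrapped consumer behind one lock. */
final class LockedConsumer implements SharedConsumer {
    private final SharedConsumer delegate;
    private final ReentrantLock lock = new ReentrantLock();

    LockedConsumer(SharedConsumer delegate) {
        this.delegate = delegate;
    }

    @Override
    public String nextMessage() {
        lock.lock();
        try {
            return delegate.nextMessage();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void commitOffsets() {
        lock.lock();
        try {
            delegate.commitOffsets();
        } finally {
            lock.unlock();
        }
    }
}

/** Test double that records whether two threads were ever inside it at the same time. */
final class OverlapDetector implements SharedConsumer {
    private final AtomicInteger inside = new AtomicInteger();
    final AtomicBoolean overlapped = new AtomicBoolean(false);

    private void enter() {
        if (inside.incrementAndGet() > 1) {
            overlapped.set(true);
        }
        Thread.yield(); // widen the race window
        inside.decrementAndGet();
    }

    @Override
    public String nextMessage() {
        enter();
        return null;
    }

    @Override
    public void commitOffsets() {
        enter();
    }
}

final class LockedConsumerDemo {
    /** Hammers one locked consumer from several threads; returns true if calls ever overlapped. */
    static boolean overlapsBehindLock(int threads, int rounds) throws InterruptedException {
        OverlapDetector raw = new OverlapDetector();
        SharedConsumer shared = new LockedConsumer(raw);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < rounds; i++) {
                    shared.nextMessage();
                    shared.commitOffsets();
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return raw.overlapped.get();
    }
}
```

Two caveats apply if this idea is carried over to Kafka011Consumer. According to the stack trace, poll() runs on a separate executor thread, so the lock would have to be held by the caller for as long as it waits on that future. And with a fetch timeout of 60000 ms, as in the log above, concurrent requests for the same consumer would queue behind each other instead of failing.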
Issue Links
- relates to DMAAP-896 [MR] Error: KafkaConsumer is not safe for multi-threaded access (Closed)