  1. Data Movement as a Platform
  2. DMAAP-1513

Message router getEvents race condition on Kafka consumer


    • Type: Bug
    • Resolution: Done
    • Priority: Medium
    • None
    • Frankfurt Release

      DMaaP message router Kafka consumer throws the following exceptions:

      2020-11-11 20:53:22,562 [qtp1022308509-2736905] INFO  org.onap.dmaap.dmf.mr.service.impl.EventsServiceImpl - [AAI-EVENT/klondike-service-resolver/service-resolver-6c9ff868df-bf8mr] Time taken in getEvents Authorization 2 ms for AAI-EVENT klondike-service-resolver service-resolver-6c9ff868df-bf8mr
      2020-11-11 20:53:22,562 [qtp1022308509-2736905] INFO  org.onap.dmaap.dmf.mr.service.impl.EventsServiceImpl - [AAI-EVENT/klondike-service-resolver/service-resolver-6c9ff868df-bf8mr] fetch: timeout=60000, limit=5, filter= from Remote host 10.42.34.100
      2020-11-11 20:53:22,562 [qtp1022308509-2736905] INFO  org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter - AAI-EVENT::klondike-service-resolver::service-resolver-6c9ff868df-bf8mr: 0.4 empty replies/minute.
      2020-11-11 20:53:22,562 [qtp1022308509-2736905] INFO  org.onap.dmaap.dmf.mr.service.impl.EventsServiceImpl - [AAI-EVENT/klondike-service-resolver/service-resolver-6c9ff868df-bf8mr] Time taken in getEvents getConsumerFor 2 ms for AAI-EVENT klondike-service-resolver service-resolver-6c9ff868df-bf8mr
      2020-11-11 20:53:22,601 [qtp1022308509-2735594] INFO  org.onap.dmaap.util.DMaaPAuthFilter - inside servlet filter Cambria Auth Headers checking before doing other Authentication
      2020-11-11 20:53:22,605 [qtp1022308509-2735594] INFO  org.onap.dmaap.DMaaPCambriaExceptionMapper - Reached Cambria Exception Mapper..
      2020-11-11 20:53:22,661 [qtp1022308509-2736055] INFO  org.onap.dmaap.dmf.mr.service.impl.EventsServiceImpl - [AAI-EVENT/klondike-service-resolver/service-resolver-6c9ff868df-bf8mr] Sent 0 msgs in 60127 ms; committed to offset 0 for AAI-EVENT klondike-service-resolver service-resolver-6c9ff868df-bf8mr on to the server 10.42.38.173
      2020-11-11 20:53:22,661 [qtp1022308509-2736905] ERROR org.onap.dmaap.dmf.mr.backends.kafka.Kafka011Consumer - Exception in in Kafka consumer
      java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
          at java.util.concurrent.FutureTask.report(FutureTask.java:122)
          at java.util.concurrent.FutureTask.get(FutureTask.java:206)
          at org.onap.dmaap.dmf.mr.backends.kafka.Kafka011Consumer.nextMessage(Kafka011Consumer.java:156)
          at org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream.forEachMessage(CambriaOutboundEventStream.java:405)
          at org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream.write(CambriaOutboundEventStream.java:271)
          at org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.respondOkWithStream(DMaaPResponseBuilder.java:134)
          at org.onap.dmaap.dmf.mr.service.impl.EventsServiceImpl.respondOkWithStream(EventsServiceImpl.java:259)
          at org.onap.dmaap.dmf.mr.service.impl.EventsServiceImpl.getEvents(EventsServiceImpl.java:172)
          at org.onap.dmaap.service.EventsRestService.getEvents(EventsRestService.java:132)
          at sun.reflect.GeneratedMethodAccessor134.invoke(Unknown Source)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:498)
          at org.apache.cxf.service.invoker.AbstractInvoker.performInvocation(AbstractInvoker.java:179)
          at org.apache.cxf.service.invoker.AbstractInvoker.invoke(AbstractInvoker.java:96)
          at org.apache.cxf.jaxrs.JAXRSInvoker.invoke(JAXRSInvoker.java:192)
          at org.apache.cxf.jaxrs.JAXRSInvoker.invoke(JAXRSInvoker.java:103)
          at org.apache.cxf.interceptor.ServiceInvokerInterceptor$1.run(ServiceInvokerInterceptor.java:59)
          at org.apache.cxf.interceptor.ServiceInvokerInterceptor.handleMessage(ServiceInvokerInterceptor.java:96)
          at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:308)
          at org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:121)
          at org.apache.camel.component.cxf.cxfbean.CxfBeanDestination.process(CxfBeanDestination.java:83)
          at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:103)
          at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71)
          at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
          at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
          at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
          at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
          at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
          at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
          at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
          at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
          at org.apache.camel.http.common.CamelServlet.doService(CamelServlet.java:208)
          at org.apache.camel.http.common.CamelServlet.service(CamelServlet.java:78)
          at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
          at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:865)
          at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1655)
          at ajsc.filter.PassthruFilter.doFilter(PassthruFilter.java:26)
          at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:347)
          at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:263)
          at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
          at com.att.ajsc.csi.writeablerequestfilter.WriteableRequestFilter.doFilter(WriteableRequestFilter.java:41)
          at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
          at org.onap.dmaap.util.DMaaPAuthFilter.doFilter(DMaaPAuthFilter.java:69)
          at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1634)
          at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:533)
          at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:146)
          at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:548)
          at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
          at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:257)
          at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1595)
          at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
          at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1317)
          at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
          at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
          at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1564)
          at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
          at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1219)
          at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
          at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)
          at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
          at org.eclipse.jetty.server.Server.handle(Server.java:531)
          at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:352)
          at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
          at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:281)
          at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:102)
          at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
          at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)
          at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
          at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1674)
          at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1031)
          at org.onap.dmaap.dmf.mr.backends.kafka.Kafka011Consumer$2.call(Kafka011Consumer.java:131)
          at org.onap.dmaap.dmf.mr.backends.kafka.Kafka011Consumer$2.call(Kafka011Consumer.java:125)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          ... 1 common frames omitted
      

       

      How to reproduce:

      Send multiple concurrent requests to the getEvents endpoint: /events/my-topic/my-group/my-consumer-id

      I sent ~10 requests to reproduce the issue, using the attached script test.sh.

      You should get the above exception. Because this is a race condition, the exception may not be raised on the first try, and you may need to increase the number of concurrent requests.
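
      The attached test.sh is not reproduced here. As a rough equivalent, the sketch below fires several GET requests at the getEvents endpoint without waiting between them, using the JDK 11+ HttpClient. The base URL http://localhost:3904 and the class and method names are assumptions for illustration; adjust them to your deployment.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical reproduction client; not the attached test.sh.
final class ConcurrentGetEvents {

    // Builds the getEvents URI. The base URL is an assumption about the deployment.
    static URI eventsUri(String baseUrl, String topic, String group, String consumerId) {
        return URI.create(baseUrl + "/events/" + topic + "/" + group + "/" + consumerId);
    }

    // Sends `count` GET requests without waiting for earlier ones to finish,
    // then collects the HTTP status codes (-1 when a request fails outright).
    static List<Integer> fire(URI uri, int count) {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        List<CompletableFuture<Integer>> pending = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            pending.add(client.sendAsync(request, HttpResponse.BodyHandlers.discarding())
                    .thenApply(HttpResponse::statusCode)
                    .exceptionally(e -> -1));
        }
        List<Integer> statuses = new ArrayList<>();
        for (CompletableFuture<Integer> f : pending) {
            statuses.add(f.join());
        }
        return statuses;
    }

    public static void main(String[] args) {
        // Same topic, group and consumer id for every request, as in the report.
        URI uri = eventsUri("http://localhost:3904", "my-topic", "my-group", "my-consumer-id");
        System.out.println(fire(uri, 10));
    }
}
```

      Because getEvents long-polls, each request may block until the server-side timeout expires when the topic is empty. The exception appears in the message router log, not necessarily in the HTTP responses.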

       

      Diagnosis:

      KafkaConsumer does not support multi-threaded access; see: https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

      The consumer is not thread-safe
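
      The stack trace shows the exception being raised from KafkaConsumer.acquire(). The sketch below is a simplified illustration of that kind of fail-fast ownership check, not the Kafka source: a thread claims the object with a compare-and-set, and any other thread that arrives before release() is rejected. The class name is hypothetical, and the sketch omits the re-entrancy counting a real client would need.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

// Simplified illustration of a single-thread ownership guard (hypothetical class).
final class SingleThreadGuardDemo {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);

    // Claims the guard for the calling thread, or fails fast if another thread holds it.
    void acquire() {
        long me = Thread.currentThread().getId();
        if (me != owner.get() && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException(
                    "consumer is not safe for multi-threaded access");
        }
    }

    // Gives the guard back so that any thread may claim it next.
    void release() {
        owner.set(NO_OWNER);
    }

    // Returns true if a second thread is rejected while the first still holds the guard.
    static boolean secondThreadRejected() {
        SingleThreadGuardDemo guard = new SingleThreadGuardDemo();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        Thread first = new Thread(() -> {
            guard.acquire();
            held.countDown();
            try {
                done.await(); // keep holding the guard until the caller has tried
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                guard.release();
            }
        });
        first.start();
        boolean rejected = false;
        try {
            held.await();
            try {
                guard.acquire();
                guard.release();
            } catch (ConcurrentModificationException e) {
                rejected = true;
            }
            done.countDown();
            first.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            done.countDown();
        }
        return rejected;
    }
}
```

      Note that such a check rejects overlapping calls, not calls from different threads as such: once the first thread releases, another thread may use the object.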

       

      See : https://github.com/onap/dmaap-messagerouter-msgrtr/blob/master/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java#L175

      In the getEvents() method:

      consumer.commitOffsets();
      

      Which leads us to: https://github.com/onap/dmaap-messagerouter-msgrtr/blob/8333cde6f59d9a3b341c7e762cf68df8f316b962/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java#L388

      This call to the Kafka consumer is not synchronized, so a race condition can occur when several clients consume messages using the same topic name, group ID and consumer ID.
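
      One possible direction, sketched under assumptions and not taken from the actual fix, is to route every call to the shared consumer through a single lock so that poll and commit can never overlap. FakeConsumer below is a hypothetical stand-in for the non-thread-safe client; it only counts overlapping calls so the effect of the lock can be observed.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

final class GuardedConsumerDemo {

    // Hypothetical stand-in for a non-thread-safe client; it records overlapping calls.
    static final class FakeConsumer {
        private final AtomicInteger inFlight = new AtomicInteger();
        final AtomicInteger overlaps = new AtomicInteger();

        void poll() { enter(); }
        void commit() { enter(); }

        private void enter() {
            if (inFlight.incrementAndGet() > 1) {
                overlaps.incrementAndGet(); // two threads were inside at once
            }
            Thread.yield(); // widen the window in which an overlap could occur
            inFlight.decrementAndGet();
        }
    }

    // Serializes all access to the delegate behind one lock.
    static final class GuardedConsumer {
        private final FakeConsumer delegate;
        private final ReentrantLock lock = new ReentrantLock();

        GuardedConsumer(FakeConsumer delegate) { this.delegate = delegate; }

        void poll() {
            lock.lock();
            try { delegate.poll(); } finally { lock.unlock(); }
        }

        void commit() {
            lock.lock();
            try { delegate.commit(); } finally { lock.unlock(); }
        }
    }

    // Runs poll/commit from several threads through the guard and returns the overlap count.
    static int overlapsWithGuard(int threads, int iterations) {
        FakeConsumer raw = new FakeConsumer();
        GuardedConsumer guarded = new GuardedConsumer(raw);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < iterations; i++) {
                    guarded.poll();
                    guarded.commit();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return raw.overlaps.get();
    }
}
```

      The lock has to cover every method that touches the consumer, including the poll that the stack trace shows running on a separate executor thread; guarding only commitOffsets() would leave the other paths racing.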

       

      For a given topic/group/consumer-id, the DMaaP Kafka consumer factory instantiates a consumer, reuses that instance for a short period of time and then cleans it up. If several clients consume at the same time, they share the same instance across multiple threads (one thread per request), so every access to the consumer needs to be synchronized.
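
      To make the sharing concrete, here is a minimal sketch of a cache keyed by topic, group and consumer id. It is an assumption-laden simplification, not the DMaaP factory: the class names and key format are hypothetical, and the expiry/cleanup step is left out. The point is only that two requests with the same triple receive the very same object.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified consumer cache; the real factory also expires entries.
final class ConsumerCacheDemo {

    // Placeholder for the cached consumer wrapper.
    static final class CachedConsumer {
        final String key;

        CachedConsumer(String key) { this.key = key; }
    }

    private final Map<String, CachedConsumer> cache = new ConcurrentHashMap<>();

    // Returns the cached consumer for this triple, creating it on first use.
    CachedConsumer getConsumerFor(String topic, String group, String consumerId) {
        String key = topic + "::" + group + "::" + consumerId;
        return cache.computeIfAbsent(key, CachedConsumer::new);
    }

    public static void main(String[] args) {
        ConsumerCacheDemo factory = new ConsumerCacheDemo();
        CachedConsumer a = factory.getConsumerFor("my-topic", "my-group", "my-consumer-id");
        CachedConsumer b = factory.getConsumerFor("my-topic", "my-group", "my-consumer-id");
        // Both request threads would end up driving the same underlying consumer.
        System.out.println(a == b);
    }
}
```

      Clients that use distinct consumer ids get distinct instances and therefore do not hit this particular race.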

       

      There are several other unsynchronized accesses to the Kafka consumer; see:

       

            jfontaine49 jfontaine49