Spring: Publish-Subscribe Channels Both Going to Kafka Result in Duplicate KafkaProducerContexts
I am attempting to use Spring Integration to send data from one channel to two different Kafka queues, after the same data goes through different transformations on the way to the respective queues. The problem is that I apparently have duplicate producer contexts, and I don't know why.
Here is my flow configuration:
    flow -> flow
        .channel("firstChannel")
        .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
            .subscribe(f -> f
                .transform(firstTransformer::transform)
                .channel(MessageChannels.queue(50))
                .handle(Kafka.outboundChannelAdapter(kafkaConfig)
                        .addProducer(firstMetadata(), brokerAddress),
                    e -> e.id("firstKafkaOutboundChannelAdapter")
                        .autoStartup(true)
                        .poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
                        .get())
            )
            .subscribe(f -> f
                .transform(secondTransformer::transform)
                .channel(MessageChannels.queue(50))
                .handle(Kafka.outboundChannelAdapter(kafkaConfig)
                        .addProducer(secondMetadata(), brokerAddress),
                    e -> e.id("secondKafkaOutboundChannelAdapter")
                        .autoStartup(true)
                        .poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS).receiveTimeout(0).taskExecutor(taskExecutor))
                        .get())
            ));

The exception is this:
    Could not register object [org.springframework.integration.kafka.support.KafkaProducerContext@3163987e] under bean name 'not_specified': there is already object [org.springframework.integration.kafka.support.KafkaProducerContext@15f193b8] bound
I have tried using different kafkaConfig objects, but that hasn't helped. Meanwhile, the ProducerMetadata instances are distinct, as you can see from the different first parameters to addProducer. They provide the names of the respective destination queues, among other metadata.
It sounds like there are implicit bean definitions being created that conflict with each other.
How can I resolve this exception with the two KafkaProducerContexts?
You should not use .get() on KafkaProducerMessageHandlerSpec; let the framework work out the environment for you.
The issue arises because KafkaProducerMessageHandlerSpec implements ComponentsRegistration, and nobody cares about this method after a manual .get() invocation:

    public Collection<Object> getComponentsToRegister() {
        this.kafkaProducerContext.setProducerConfigurations(this.producerConfigurations);
        return Collections.<Object>singleton(this.kafkaProducerContext);
    }

I agree that this is an inconvenience, and we should find a better solution for the end application, but for now there is no other choice unless you follow the Spec style for the framework components, as with Kafka.outboundChannelAdapter().
Hope that is clear.
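To illustrate the kind of conflict the exception message describes, here is a minimal sketch in plain Java. It does not use Spring at all: ToyProducerContext and ToySingletonRegistry are hypothetical stand-ins, and the assumption is only that every context starts with the same default name ('not_specified', as seen in the exception) and that the registry refuses a second object under an already-bound name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a context object that carries a default bean name.
class ToyProducerContext {
    private String beanName = "not_specified"; // default name, as in the exception message

    String getBeanName() { return beanName; }

    void setBeanName(String beanName) { this.beanName = beanName; }
}

// Hypothetical stand-in for a singleton registry that rejects duplicate names.
class ToySingletonRegistry {
    private final Map<String, Object> singletons = new HashMap<>();

    void registerSingleton(String name, Object bean) {
        if (singletons.containsKey(name)) {
            throw new IllegalStateException("Could not register object under bean name '"
                    + name + "': there is already object bound");
        }
        singletons.put(name, bean);
    }
}

class CollisionDemo {
    // Registers two contexts under their own (identical) default names.
    // Returns true if the second registration is rejected.
    static boolean secondRegistrationFails() {
        ToySingletonRegistry registry = new ToySingletonRegistry();
        ToyProducerContext first = new ToyProducerContext();
        ToyProducerContext second = new ToyProducerContext();
        registry.registerSingleton(first.getBeanName(), first);
        try {
            registry.registerSingleton(second.getBeanName(), second);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second registration rejected: " + secondRegistrationFails());
    }
}
```

In this toy model the two contexts are distinct objects, yet they collide because they share one default name, which matches the two different object hashes shown in the exception.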
UPDATE
OK, it's an issue on our side, and we will fix it soon: https://jira.spring.io/browse/intext-216 https://jira.spring.io/browse/intext-217
Meanwhile, the workaround is this:

    KafkaProducerContext kafkaProducerContext =
            (KafkaProducerContext) kafkaProducerMessageHandlerSpec.getComponentsToRegister().iterator().next();
    kafkaProducerContext.setBeanName(null);

where you should move

    Kafka.outboundChannelAdapter(kafkaConfig)
            .addProducer(firstMetadata(), brokerAddress)

to a separate private method to get access to the kafkaProducerContext.
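The shape of that workaround can be sketched in plain Java without any Spring dependency. Everything here is hypothetical (NamedContext, ToySpec, ToyRegistrar), and the sketch rests on an assumption that is not confirmed above: that a component whose bean name is null gets a generated, unique name when it is registered. The helper method plays the role of the "separate private method".

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical context with a shared default name.
class NamedContext {
    private String beanName = "not_specified";

    String getBeanName() { return beanName; }

    void setBeanName(String beanName) { this.beanName = beanName; }
}

// Hypothetical spec that exposes its context for registration.
class ToySpec {
    private final NamedContext context = new NamedContext();

    Collection<Object> getComponentsToRegister() {
        return Collections.<Object>singleton(context);
    }
}

// Hypothetical registrar: uses the component's own name, or generates one if it is null (assumption).
class ToyRegistrar {
    private final Map<String, Object> beans = new LinkedHashMap<>();
    private int counter;

    void registerAll(ToySpec spec) {
        for (Object component : spec.getComponentsToRegister()) {
            String name = ((NamedContext) component).getBeanName();
            if (name == null) {
                name = "generated#" + counter++;
            }
            if (beans.containsKey(name)) {
                throw new IllegalStateException("name already bound: " + name);
            }
            beans.put(name, component);
        }
    }

    int size() { return beans.size(); }
}

class WorkaroundDemo {
    // Builds the spec in one place, so the context is reachable and its name can be cleared.
    private static ToySpec specWithClearedName() {
        ToySpec spec = new ToySpec();
        NamedContext context = (NamedContext) spec.getComponentsToRegister().iterator().next();
        context.setBeanName(null);
        return spec;
    }

    // Registers two specs built by the helper and returns how many beans were bound.
    static int registerTwoSpecs() {
        ToyRegistrar registrar = new ToyRegistrar();
        registrar.registerAll(specWithClearedName());
        registrar.registerAll(specWithClearedName());
        return registrar.size();
    }

    public static void main(String[] args) {
        System.out.println("beans registered: " + registerTwoSpecs());
    }
}
```

Building the spec in a helper method matters because the fluent chain inside .handle(...) never gives you a variable to call getComponentsToRegister() on; the helper returns the same spec after adjusting its context.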