spring - Publish-Subscribe Channels Both Going to Kafka Result in Duplicate KafkaProducerContexts


I am attempting to use Spring Integration to send data from one channel to two different Kafka queues, after the same data goes through different transformations on the way to its respective queue. The problem is that I apparently have duplicate producer contexts, and I don't know why.

Here is my flow configuration:

flow -> flow
        .channel("firstChannel")
        .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                .subscribe(f -> f
                        .transform(firstTransformer::transform)
                        .channel(MessageChannels.queue(50))
                        .handle(Kafka.outboundChannelAdapter(kafkaConfig)
                                        .addProducer(firstMetadata(), brokerAddress),
                                e -> e.id("firstKafkaOutboundChannelAdapter")
                                        .autoStartup(true)
                                        .poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS)
                                                .receiveTimeout(0)
                                                .taskExecutor(taskExecutor))
                                        .get()))
                .subscribe(f -> f
                        .transform(secondTransformer::transform)
                        .channel(MessageChannels.queue(50))
                        .handle(Kafka.outboundChannelAdapter(kafkaConfig)
                                        .addProducer(secondMetadata(), brokerAddress),
                                e -> e.id("secondKafkaOutboundChannelAdapter")
                                        .autoStartup(true)
                                        .poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS)
                                                .receiveTimeout(0)
                                                .taskExecutor(taskExecutor))
                                        .get())));

The exception is this:

Could not register object [org.springframework.integration.kafka.support.KafkaProducerContext@3163987e] under bean name 'not_specified': there is already object [org.springframework.integration.kafka.support.KafkaProducerContext@15f193b8] bound

I have tried using two different kafkaConfig objects, but that hasn't helped. Meanwhile, the ProducerMetadata instances are distinct, as you can see from the different first parameters to addProducer; they provide the names of the respective destination queues, among other metadata.

It sounds like there are implicit bean definitions being created that conflict with each other.

How can I resolve the exception between the two KafkaProducerContexts?

You should not use .get() on the KafkaProducerMessageHandlerSpec; let the framework work out the environment for you.

The issue is because KafkaProducerMessageHandlerSpec implements ComponentsRegistration, and nobody takes care of this:

public Collection<Object> getComponentsToRegister() {
    this.kafkaProducerContext.setProducerConfigurations(this.producerConfigurations);
    return Collections.<Object>singleton(this.kafkaProducerContext);
}

after a manual .get() invocation.

I agree it is an inconvenience, and we should find a better solution for the end application, but there is no other choice yet, unless you follow the spec style for framework components, like Kafka.outboundChannelAdapter().
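As a sketch of what "spec style" means here, the first subscriber's .handle() could pass the spec directly, without .get(), so that the framework itself processes the ComponentsRegistration callback and registers the KafkaProducerContext. This assumes the same kafkaConfig, firstMetadata(), brokerAddress, and taskExecutor from the question:

```java
// Hypothetical fragment of the first subscriber: the handler spec is passed
// as-is (no manual .get()), so the framework registers its
// KafkaProducerContext and the bean-name collision does not occur.
.handle(Kafka.outboundChannelAdapter(kafkaConfig)
                .addProducer(firstMetadata(), brokerAddress),
        e -> e.id("firstKafkaOutboundChannelAdapter")
                .autoStartup(true)
                .poller(p -> p.fixedDelay(1000, TimeUnit.MILLISECONDS)
                        .receiveTimeout(0)
                        .taskExecutor(taskExecutor)))
```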

Hope this is clear.

UPDATE

OK, it's an issue on our side, and we will fix it soon: https://jira.spring.io/browse/INTEXT-216 https://jira.spring.io/browse/INTEXT-217

Meanwhile, a workaround for you is this:

KafkaProducerContext kafkaProducerContext =
        (KafkaProducerContext) kafkaProducerMessageHandlerSpec.getComponentsToRegister().iterator().next();
kafkaProducerContext.setBeanName(null);

where you should move

Kafka.outboundChannelAdapter(kafkaConfig)
        .addProducer(firstMetadata(), brokerAddress)

to a separate private method to get access to the KafkaProducerContext.
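One way that extraction might look (a sketch only; the method name firstKafkaAdapter and the raw types are assumptions, and the field names come from the question):

```java
// Hypothetical helper: build the spec once so the workaround can reach its
// KafkaProducerContext before the spec is handed to .handle() in the flow.
private KafkaProducerMessageHandlerSpec firstKafkaAdapter() {
    KafkaProducerMessageHandlerSpec spec = Kafka.outboundChannelAdapter(kafkaConfig)
            .addProducer(firstMetadata(), brokerAddress);
    // Workaround from above: clearing the bean name prevents two adapters
    // from colliding under the same implicit name at registration time.
    KafkaProducerContext kafkaProducerContext =
            (KafkaProducerContext) spec.getComponentsToRegister().iterator().next();
    kafkaProducerContext.setBeanName(null);
    return spec;
}
```

The flow then calls .handle(firstKafkaAdapter(), e -> ...) for the first subscriber, and a similar method would be used for the second.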

