当前位置 : 首页 » 互动问答 » 正文

Kafka Spring和多个kafkaListenerContainerFactory

分类 : 互动问答 | 发布时间 : 2018-04-27 15:40:30 | 评论 : 1 | 浏览 : 252 | 喜欢 : 0

I have the following Kafka Spring consumer configuration:

@Configuration
public class KafkaConsumerTestConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;

    @Bean
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        return props;
    }

    @Bean
    public ConsumerFactory<String, ImportDecisionMessage> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(ImportDecisionMessage.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ImportDecisionMessage> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, ImportDecisionMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

}

So far everything works well.

Now, I'd like another type of message, let's say Review. I change my configuration to the following one:

@Configuration
public class KafkaConsumerTestConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;

    @Bean
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        return props;
    }

    @Bean
    public ConsumerFactory<String, ImportDecisionMessage> importDecisionMessageConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(ImportDecisionMessage.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ImportDecisionMessage> importDecisionMessageKafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, ImportDecisionMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(importDecisionMessageConsumerFactory());

        return factory;
    }

    @Bean
    public ConsumerFactory<String, Review> reviewConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Review.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Review> reviewKafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, Review> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(reviewConsumerFactory());

        return factory;
    }

}

and my Spring Boot application fails with the following exception:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans of type'org.springframework.kafka.core.ConsumerFactory'importDecisionMessageConsumerFactory


Action:

Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.

2017-11-23 18:19:50.578 ERROR 14264 --- [           main] o.s.test.context.TestContextManager      : Caught exception while allowing TestExecutionListener [org.springframework.test.context.web.ServletTestExecutionListener@6989da5e] to prepare test instance [com.decisionwanted.domain.DecisionCharacteristicIT@535a6697]

java.lang.IllegalStateException: Failed to load ApplicationContext
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:125) ~[spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:107) ~[spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.test.context.web.ServletTestExecutionListener.setUpRequestContextIfNecessary(ServletTestExecutionListener.java:190) ~[spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.test.context.web.ServletTestExecutionListener.prepareTestInstance(ServletTestExecutionListener.java:132) ~[spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:242) ~[spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:227) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) [junit-4.12.jar:4.12]
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:246) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) [junit-4.12.jar:4.12]
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) [junit-4.12.jar:4.12]
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) [junit-4.12.jar:4.12]
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) [junit-4.12.jar:4.12]
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) [junit-4.12.jar:4.12]
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363) [junit-4.12.jar:4.12]
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190) [spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) [.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) [.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) [.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678) [.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) [.cp/:na]
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) [.cp/:na]
Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaListenerContainerFactory' defined in class path resource [org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.class]: Unsatisfied dependency expressed through method 'kafkaListenerContainerFactory' parameter 1; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ConsumerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:723) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:458) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1249) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1098) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:545) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:502) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:312) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:228) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:310) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:756) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:868) ~[spring-context-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:751) ~[spring-boot-2.0.0.M6.jar:2.0.0.M6]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:387) ~[spring-boot-2.0.0.M6.jar:2.0.0.M6]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.0.M6.jar:2.0.0.M6]
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:138) ~[spring-boot-test-2.0.0.M6.jar:2.0.0.M6]
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99) ~[spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117) ~[spring-test-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    ... 25 common frames omitted
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ConsumerFactory<java.lang.Object, java.lang.Object>' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {}
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1502) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1099) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1060) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:809) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:715) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    ... 43 common frames omitted

What am I doing wrong and how to fix it ? Why do I need to configure this default kafkaListenerContainerFactory ? Is it possible to avoid it?

回答(1)

  • 1楼
  • 您可以从您的Spring Boot配置中排除 KafkaAutoConfiguration 。或者使用适当的bean名称调用其中一个 KafkaListenerContainerFactory 以满足条件:

     @ConditionalOnMissingBean(name =“kafkaListenerContainerFactory”)

相关阅读: