Consumer Config와 @KafkaListener

@Bean
    fun consumerFactory(): ConsumerFactory<String, SlackReport> {
        val config = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ErrorHandlingDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ErrorHandlingDeserializer::class.java,
            ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS to StringDeserializer::class.java,
            ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS to JsonDeserializer::class.java,
            JsonDeserializer.TRUSTED_PACKAGES to "*"
        )
        return DefaultKafkaConsumerFactory(
            config,
            StringDeserializer(),
            JsonDeserializer(SlackReport::class.java, false)
        )
    }

    @Bean
    fun kafkaListener(): ConcurrentKafkaListenerContainerFactory<String, SlackReport> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, SlackReport>()
        factory.consumerFactory = consumerFactory()
        return factory
    }

위와 같은 방식으로 하나의 Consumer당 하나의 Config를 설정해줘야 한다.

이때 factory 변수를 return하는 메소드의 이름 == @KafkaListner에서의 containerFactory 이름

@KafkaListener(
        topics = [SlackReportProducerService.TOPIC_NAME],
        groupId = "testGroup",
        containerFactory = "kafkaListener"
    )