@Bean
fun consumerFactory(): ConsumerFactory<String, SlackReport> {
val config = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ErrorHandlingDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ErrorHandlingDeserializer::class.java,
ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS to StringDeserializer::class.java,
ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS to JsonDeserializer::class.java,
JsonDeserializer.TRUSTED_PACKAGES to "*"
)
return DefaultKafkaConsumerFactory(
config,
StringDeserializer(),
JsonDeserializer(SlackReport::class.java, false)
)
}
@Bean
fun kafkaListener(): ConcurrentKafkaListenerContainerFactory<String, SlackReport> {
val factory = ConcurrentKafkaListenerContainerFactory<String, SlackReport>()
factory.consumerFactory = consumerFactory()
return factory
}
위와 같은 방식으로 하나의 Consumer당 하나의 Config를 설정해줘야 한다.
이때 factory 변수를 return하는 메소드의 이름 == @KafkaListner에서의 containerFactory 이름
@KafkaListener(
topics = [SlackReportProducerService.TOPIC_NAME],
groupId = "testGroup",
containerFactory = "kafkaListener"
)