서론

통합테스트를 진행하던 도중 Docker로 띄워진 Kafka를 이용하게 되면

외부 상황에 따라 테스트가 깨질 수 있기 때문에 올바른 통합테스트라고 볼 수 없게 된다.

그렇기 때문에 Spring에서 제공해주는 내장 Kafka 사용해보는걸 정리하려고 한다.

Set Up

// kafka test
testImplementation 'org.springframework.kafka:spring-kafka-test'

위의 의존성을 추가한다. (버전 n부터는 default 추가라는것 같기도 하다)

@DirtiesContext
@EmbeddedKafka(
        topics = "TestTopic" // 이 topics를 반드시 명시해야함
        , partitions = 1,
        brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}
)
@SpringBootTest
public class KafkaIntegrationTest {}

if use Schema Registry

Schema Registry를 따로 띄워주는것 같진 않아서 mocking 할 수 있는 방법을 찾았다.

@Configuration
public class TestSchemaRegistry {

    @Bean
    public SchemaRegistryClient schemaRegistryClient() {
        SchemaProvider schemaProvider = new ProtobufSchemaProvider();
        return new MockSchemaRegistryClient(List.of(schemaProvider));
    }
}

위와 같이 test 패키지 안에서 MockSchemaRegistry를 반환하도록 Bean 등록을 해준다.

Config setting

configs.put("schema.registry.url", "mock://not-used");

producer, consumer 모두 config를 등록할 때 schema.registry.url → “mock://not-used” 넣어야 함

Config는 따로 클래스 파서 Producer(KafkaTemplate), Consumer(KafkaListener) 잘 등록해주면 된다.

Test

	// 코드 생략...    
		private static final String TEST_TOPIC = "TestTopic";
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final KafkaConsumer kafkaConsumer;
    @MockBean
    private Acknowledgment acknowledgment;

    @Autowired
    public KafkaIntegrationTest(
            KafkaTemplate<String, String> kafkaTemplate,
            KafkaConsumer kafkaConsumer
    ) {
        this.kafkaTemplate = kafkaTemplate;
        this.kafkaConsumer = kafkaConsumer;
    }

    @Test
    public void 토픽_발행_테스트() {
        log.error("Try to produce topic : {}", TEST_TOPIC);
        kafkaTemplate.send(
                TEST_TOPIC,
                "string"
        );
}

이렇게 평소에 하던대로 topic을 발행하면