λ³Έλ¬Έ λ°”λ‘œκ°€κΈ°

Kafka μ• ν”Œλ¦¬μΌ€μ΄μ…˜ ν…ŒμŠ€νŠΈν•˜κΈ°: λ‹¨μœ„ ν…ŒμŠ€νŠΈλΆ€ν„° 톡합 ν…ŒμŠ€νŠΈκΉŒμ§€

okrestart 2024. 10. 23.

 

Apache Kafka λŠ” λŒ€κ·œλͺ¨ 데이터 슀트리밍 μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ—μ„œ 널리 μ‚¬μš©λ˜λ©°, μ΄λŸ¬ν•œ μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ˜ ν…ŒμŠ€νŠΈ λŠ” 맀우 μ€‘μš”ν•©λ‹ˆλ‹€. 이번 ν¬μŠ€νŒ…μ—μ„œλŠ” ν”„λ‘œλ“€μ„œμ™€ 컨슈머의 λ‹¨μœ„ ν…ŒμŠ€νŠΈ , Embedded Kafkaλ₯Ό ν™œμš©ν•œ 톡합 ν…ŒμŠ€νŠΈ , Kafka λͺ¨μ˜ 객체(Mock) λ₯Ό μ‚¬μš©ν•œ ν…ŒμŠ€νŠΈ 방법, 그리고 비동기 μ½”λ“œ ν…ŒμŠ€νŠΈλ₯Ό μœ„ν•œ λͺ¨λ²” 사둀에 λŒ€ν•΄ λ‹€λ£Ήλ‹ˆλ‹€.

 


 

 

1. ν”„λ‘œλ“€μ„œμ™€ 컨슈머의 λ‹¨μœ„ ν…ŒμŠ€νŠΈ

Kafka μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ—μ„œ ν”„λ‘œλ“€μ„œ 와 컨슈머 의 λ™μž‘μ„ λ‹¨μœ„ ν…ŒμŠ€νŠΈ 둜 ν™•μΈν•˜λŠ” 것은 맀우 μ€‘μš”ν•©λ‹ˆλ‹€. Mockito 와 같은 λͺ¨μ˜ 객체(Mock)λ₯Ό ν™œμš©ν•˜μ—¬ Kafka λΈŒλ‘œμ»€μ™€μ˜ μ‹€μ œ μ—°κ²° 없이도 λ‹¨μœ„ ν…ŒμŠ€νŠΈλ₯Ό μˆ˜ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

ν”„λ‘œλ“€μ„œ λ‹¨μœ„ ν…ŒμŠ€νŠΈ μ˜ˆμ‹œ

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @MockBean
    private KafkaProducer producer;

    @Test
    public void testSendMessage() {
        String message = "Test message";
        kafkaTemplate.send("test_topic", message);

        // λ©”μ„œλ“œ 호좜 확인
        verify(producer).send(eq("test_topic"), eq(message));
    }
}

컨슈머 λ‹¨μœ„ ν…ŒμŠ€νŠΈ μ˜ˆμ‹œ

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaConsumerTest {

    @MockBean
    private KafkaConsumer consumer;

    @Test
    public void testConsumeMessage() {
        String message = "Test message";

        // λ©”μ‹œμ§€ 처리 λ©”μ„œλ“œ 호좜 확인
        consumer.consume(message);
        verify(consumer).consume(eq(message));
    }
}

μ„€λͺ…:

  • @MockBean : Kafka λΈŒλ‘œμ»€μ™€μ˜ μ‹€μ œ 톡신 없이, ν”„λ‘œλ“€μ„œ 와 컨슈머 λ₯Ό λͺ¨μ˜ 객체 둜 ν…ŒμŠ€νŠΈν•  수 μžˆμŠ΅λ‹ˆλ‹€.
  • verify() : Mockito λ₯Ό μ‚¬μš©ν•˜μ—¬ νŠΉμ • λ©”μ„œλ“œκ°€ ν˜ΈμΆœλ˜μ—ˆλŠ”μ§€ ν™•μΈν•©λ‹ˆλ‹€.

 


 

2. Embedded Kafkaλ₯Ό ν™œμš©ν•œ 톡합 ν…ŒμŠ€νŠΈ

Embedded Kafka λŠ” Kafka 브둜컀 λ₯Ό ν…ŒμŠ€νŠΈ ν™˜κ²½μ—μ„œ μ‹€μ œλ‘œ μ‹€ν–‰ν•˜μ—¬ 톡합 ν…ŒμŠ€νŠΈ λ₯Ό μˆ˜ν–‰ν•  수 있게 ν•©λ‹ˆλ‹€. 이λ₯Ό 톡해 Kafka λΈŒλ‘œμ»€μ™€ μ‹€μ œ 데이터λ₯Ό μ£Όκ³ λ°›μœΌλ©° ν…ŒμŠ€νŠΈν•  수 μžˆμŠ΅λ‹ˆλ‹€.

Embedded Kafka μ„€μ • 및 톡합 ν…ŒμŠ€νŠΈ μ˜ˆμ‹œ

@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"test_topic"})
public class KafkaIntegrationTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaConsumer consumer;

    @Test
    public void testSendAndConsumeMessage() throws Exception {
        String message = "Test message";

        kafkaTemplate.send("test_topic", message);

        // μ»¨μŠˆλ¨Έκ°€ λ©”μ‹œμ§€λ₯Ό λ°›μ•˜λŠ”μ§€ 확인
        Awaitility.await()
                  .atMost(10, TimeUnit.SECONDS)
                  .untilAsserted(() -> verify(consumer).consume(eq(message)));
    }
}

μ„€λͺ…:

  • @EmbeddedKafka : 톡합 ν…ŒμŠ€νŠΈ μ‹œ Kafka 브둜컀 λ₯Ό ν…ŒμŠ€νŠΈ ν™˜κ²½μ—μ„œ μ‹€ν–‰ν•©λ‹ˆλ‹€.
  • Awaitility.await() : 비동기 λ©”μ‹œμ§€ 처리 κ°€ μ™„λ£Œλ  λ•ŒκΉŒμ§€ λŒ€κΈ°ν•˜λŠ” λ©”μ„œλ“œμž…λ‹ˆλ‹€.

 


 

3. ν…ŒμŠ€νŠΈμ—μ„œ Kafka λͺ¨μ˜ 객체 μ‚¬μš©

Mockito 와 같은 ν”„λ ˆμž„μ›Œν¬λ₯Ό ν™œμš©ν•˜μ—¬ Kafka λΈŒλ‘œμ»€μ™€μ˜ μ‹€μ œ μ—°κ²° 없이 λͺ¨μ˜ 객체(Mock) λ₯Ό μ‚¬μš©ν•΄ Kafka κΈ°λŠ₯을 ν…ŒμŠ€νŠΈν•  수 μžˆμŠ΅λ‹ˆλ‹€. 이λ₯Ό 톡해 λΉ λ₯΄κ²Œ λ‹¨μœ„ ν…ŒμŠ€νŠΈ λ₯Ό μˆ˜ν–‰ν•  수 있으며, μ™ΈλΆ€ μ˜μ‘΄μ„±μ— ꡬ애받지 μ•Šκ³  λ…λ¦½μ μœΌλ‘œ ν…ŒμŠ€νŠΈν•  수 μžˆμŠ΅λ‹ˆλ‹€.

KafkaTemplate λͺ¨μ˜ 객체 ν…ŒμŠ€νŠΈ μ˜ˆμ‹œ

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerMockTest {

    @MockBean
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    public void testSendMessage() {
        String message = "Mock message";

        // KafkaTemplate을 μ‚¬μš©ν•œ λ©”μ‹œμ§€ 전솑 ν…ŒμŠ€νŠΈ
        kafkaTemplate.send("mock_topic", message);

        // λ©”μ„œλ“œ 호좜 확인
        verify(kafkaTemplate).send(eq("mock_topic"), eq(message));
    }
}

μ„€λͺ…:

  • KafkaTemplate 을 Mock 으둜 μ²˜λ¦¬ν•˜μ—¬ μ‹€μ œ Kafka 브둜컀 없이 ν…ŒμŠ€νŠΈν•  수 μžˆμŠ΅λ‹ˆλ‹€.
  • verify() λ©”μ„œλ“œλ₯Ό 톡해 λ©”μ‹œμ§€κ°€ μ„±κ³΅μ μœΌλ‘œ μ „μ†‘λ˜μ—ˆλŠ”μ§€ ν™•μΈν•©λ‹ˆλ‹€.

 


 

4. 비동기 μ½”λ“œ ν…ŒμŠ€νŠΈλ₯Ό μœ„ν•œ λͺ¨λ²” 사둀

Kafka μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ€ 비동기 처리 κ°€ 일반적이기 λ•Œλ¬Έμ—, 비동기 μ½”λ“œλ₯Ό 효과적으둜 ν…ŒμŠ€νŠΈ ν•˜λŠ” 것이 μ€‘μš”ν•©λ‹ˆλ‹€. 비동기 μ½”λ“œ ν…ŒμŠ€νŠΈ μ‹œ ν…ŒμŠ€νŠΈ μ‹€νŒ¨ λ₯Ό λ°©μ§€ν•˜κΈ° μœ„ν•΄ λŒ€κΈ° μ‹œκ°„ 을 적절히 μ„€μ •ν•˜κ±°λ‚˜ Promise , Future 와 같은 비동기 κ°œλ…μ„ λ‹€λ£¨λŠ” 방법이 ν•„μš”ν•©λ‹ˆλ‹€.

비동기 ν…ŒμŠ€νŠΈ μ˜ˆμ‹œ

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaAsyncTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    public void testAsyncSendMessage() throws Exception {
        CompletableFuture<SendResult<String, String>> future =
                kafkaTemplate.send("async_topic", "Async message");

        // 비동기 κ²°κ³Ό 확인
        SendResult<String, String> result = future.get(5, TimeUnit.SECONDS);

        assertNotNull(result.getRecordMetadata());
        assertEquals("async_topic", result.getRecordMetadata().topic());
    }
}

μ„€λͺ…:

  • CompletableFuture : 비동기 μž‘μ—… 이 μ™„λ£Œλ  λ•ŒκΉŒμ§€ 기닀리며, λ©”μ‹œμ§€κ°€ μ •μƒμ μœΌλ‘œ μ „μ†‘λ˜μ—ˆλŠ”μ§€ ν™•μΈν•©λ‹ˆλ‹€.
  • future.get(5, TimeUnit.SECONDS) : μ΅œλŒ€ 5초 λ™μ•ˆ κ²°κ³Όλ₯Ό κΈ°λ‹€λ¦½λ‹ˆλ‹€.

 


 

Kafka ν…ŒμŠ€νŠΈ νŠΈλ Œλ“œ 및 톡계

졜근 데이터에 λ”°λ₯΄λ©΄, λŒ€κ·œλͺ¨ Kafka μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ„ μ‚¬μš©ν•˜λŠ” 기업듀은 λ‹¨μœ„ ν…ŒμŠ€νŠΈ 와 톡합 ν…ŒμŠ€νŠΈ λ₯Ό 적극적으둜 λ„μž…ν•˜μ—¬ ν…ŒμŠ€νŠΈ 컀버리지 λ₯Ό 70% 이상 λ‹¬μ„±ν•˜κ³  μžˆμŠ΅λ‹ˆλ‹€. 특히 Embedded Kafka λ₯Ό ν™œμš©ν•œ 톡합 ν…ŒμŠ€νŠΈλŠ” μ‹€μ œ 브둜컀 ν™˜κ²½ μ—μ„œμ˜ ν…ŒμŠ€νŠΈλ₯Ό κ°€λŠ₯ν•˜κ²Œ ν•˜μ—¬ μ‹ λ’°μ„± 을 크게 λ†’μ—¬μ€λ‹ˆλ‹€.

 


 

자주 λ¬»λŠ” 질문 (FAQ)

Q1. Kafka ν”„λ‘œλ“€μ„œμ™€ 컨슈머의 λ‹¨μœ„ ν…ŒμŠ€νŠΈλŠ” μ–΄λ–»κ²Œ μ§„ν–‰ν•˜λ‚˜μš”?
A1. Mockito 와 같은 λͺ¨μ˜ 객체λ₯Ό μ‚¬μš©ν•˜μ—¬ μ‹€μ œ Kafka λΈŒλ‘œμ»€μ™€ μ—°κ²°ν•˜μ§€ μ•Šκ³ , KafkaTemplate κ³Ό KafkaListener λ₯Ό Mock으둜 μ²˜λ¦¬ν•˜μ—¬ λ‹¨μœ„ ν…ŒμŠ€νŠΈλ₯Ό 진행할 수 μžˆμŠ΅λ‹ˆλ‹€.

Q2. Embedded Kafkaλ₯Ό ν™œμš©ν•œ 톡합 ν…ŒμŠ€νŠΈμ˜ μž₯점은 λ¬΄μ—‡μΈκ°€μš”?
A2. Embedded Kafka λŠ” ν…ŒμŠ€νŠΈ ν™˜κ²½μ—μ„œ μ‹€μ œ Kafka 브둜컀λ₯Ό μ‹€ν–‰ν•˜μ—¬ 톡합 ν…ŒμŠ€νŠΈ λ₯Ό μˆ˜ν–‰ν•  수 있게 ν•©λ‹ˆλ‹€. 이λ₯Ό 톡해 ν”„λ‘œλ“€μ„œμ™€ 컨슈머 κ°„μ˜ μ‹€μ œ 데이터 전솑 을 ν…ŒμŠ€νŠΈν•  수 μžˆμŠ΅λ‹ˆλ‹€.

Q3. Kafka μ• ν”Œλ¦¬μΌ€μ΄μ…˜μ—μ„œ 비동기 μ½”λ“œλ₯Ό μ–΄λ–»κ²Œ ν…ŒμŠ€νŠΈν•˜λ‚˜μš”?
A3. Awaitility λ‚˜ CompletableFuture 와 같은 비동기 처리 λ©”μ»€λ‹ˆμ¦˜μ„ μ‚¬μš©ν•˜μ—¬ 비동기 μ½”λ“œ κ°€ μ™„λ£Œλ  λ•ŒκΉŒμ§€ 기닀리고, κ²°κ³Όλ₯Ό ν™•μΈν•˜λŠ” λ°©μ‹μœΌλ‘œ ν…ŒμŠ€νŠΈν•  수 μžˆμŠ΅λ‹ˆλ‹€.

Q4. Kafka λͺ¨μ˜ 객체(Mock)λ₯Ό μ‚¬μš©ν•˜λ©΄ μ–΄λ–€ 이점이 μžˆλ‚˜μš”?
A4. Kafka λͺ¨μ˜ 객체 λ₯Ό μ‚¬μš©ν•˜λ©΄ μ‹€μ œ Kafka λΈŒλ‘œμ»€μ™€ μ—°κ²°ν•  ν•„μš” 없이, ν”„λ‘œλ“€μ„œ 와 컨슈머 의 λ™μž‘μ„ λ…λ¦½μ μœΌλ‘œ ν…ŒμŠ€νŠΈν•  수 μžˆμ–΄ λΉ λ₯΄κ³  κ°„λ‹¨ν•˜κ²Œ ν…ŒμŠ€νŠΈ λ₯Ό 진행할 수 μžˆμŠ΅λ‹ˆλ‹€.

Q5. ν…ŒμŠ€νŠΈμ—μ„œ Kafka 브둜컀 없이 Mock을 μ‚¬μš©ν•˜λŠ” 것이 μ•ˆμ „ν•œκ°€μš”?
A5. λ‹¨μœ„ ν…ŒμŠ€νŠΈ μ—μ„œλŠ” Kafka 브둜컀 없이 λͺ¨μ˜ 객체 λ₯Ό μ‚¬μš©ν•΄λ„ μ•ˆμ „ν•©λ‹ˆλ‹€. κ·ΈλŸ¬λ‚˜ 톡합 ν…ŒμŠ€νŠΈ λ‚˜ μ‹œμŠ€ν…œ ν…ŒμŠ€νŠΈ μ—μ„œλŠ” Embedded Kafka λ₯Ό μ‚¬μš©ν•˜μ—¬ μ‹€μ œ λΈŒλ‘œμ»€μ™€μ˜ μƒν˜Έμž‘μš©μ„ ν…ŒμŠ€νŠΈν•˜λŠ” 것이 μ’‹μŠ΅λ‹ˆλ‹€.

λŒ“κΈ€