๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

Kafka Streams์™€ ์Šคํ”„๋ง ๋ถ€ํŠธ ํ†ตํ•ฉ: ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ๊ตฌ์ถ•ํ•˜๊ธฐ

okrestart 2024. 10. 23.

 

Apache Kafka Streams ๋Š” ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆฌ๋ฐ์„ ์ฒ˜๋ฆฌํ•˜๋Š” ๊ฐ•๋ ฅํ•œ API๋กœ, Kafka์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ณ  ๋ณ€ํ™˜ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ๋Š” Kafka Streams API ์™€ ์Šคํ”„๋ง ๋ถ€ํŠธ ๋ฅผ ํ†ตํ•ฉํ•˜์—ฌ ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์„ ๊ตฌ์ถ•ํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์„ค๋ช…ํ•ฉ๋‹ˆ๋‹ค. ๋˜ํ•œ ์ƒํƒœ ์ €์žฅ ๋ณ€ํ™˜ ๊ณผ ์ธํ„ฐ๋ž™ํ‹ฐ๋ธŒ ์ฟผ๋ฆฌ ๋ฅผ ํ†ตํ•ด ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ ์ƒํƒœ ์ €์žฅ์†Œ ๋ฅผ ์–ด๋–ป๊ฒŒ ํ™œ์šฉํ•˜๋Š”์ง€๋„ ์‚ดํŽด๋ด…๋‹ˆ๋‹ค.

 


 

 

1. Kafka Streams API ์†Œ๊ฐœ

Kafka Streams ๋Š” Apache Kafka ์œ„์—์„œ ๋™์ž‘ํ•˜๋Š” ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ๋กœ, ๋ฐ์ดํ„ฐ๋ฅผ ์‹ค์‹œ๊ฐ„์œผ๋กœ ๋ณ€ํ™˜, ํ•„ํ„ฐ๋ง, ์ง‘๊ณ„ ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ํŠนํžˆ ๋ถ„์‚ฐ ํ™˜๊ฒฝ ์—์„œ ๋™์ž‘ํ•  ์ˆ˜ ์žˆ์–ด ๋Œ€๊ทœ๋ชจ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆฌ๋ฐ ์ฒ˜๋ฆฌ์— ์ ํ•ฉํ•ฉ๋‹ˆ๋‹ค.

Kafka Streams์˜ ์ฃผ์š” ๊ธฐ๋Šฅ

  1. ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ : ๋ฉ”์‹œ์ง€๋ฅผ ์‹ค์‹œ๊ฐ„์œผ๋กœ ๋ณ€ํ™˜ํ•˜๊ณ  ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
  2. ๋ถ„์‚ฐ ์‹œ์Šคํ…œ : Kafka ํด๋Ÿฌ์Šคํ„ฐ ๋‚ด์—์„œ ํ™•์žฅ ๊ฐ€๋Šฅํ•˜๊ฒŒ ๋™์ž‘ํ•ฉ๋‹ˆ๋‹ค.
  3. ์ƒํƒœ ์ €์žฅ : ์ƒํƒœ ์ €์žฅ์†Œ ๋ฅผ ํ†ตํ•ด ๋ฐ์ดํ„ฐ์˜ ์ง‘๊ณ„ ๋‚˜ ๋ณ€ํ™˜ ๊ฒฐ๊ณผ ๋ฅผ ๋ฉ”๋ชจ๋ฆฌ๋‚˜ ๋””์Šคํฌ์— ์ €์žฅํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์˜ˆ์‹œ: Kafka Streams๋กœ ๋‹จ์ˆœ ๋ฉ”์‹œ์ง€ ํ•„ํ„ฐ๋ง

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input_topic");

KStream<String, String> filteredStream = stream.filter((key, value) -> value.contains("Kafka"));

filteredStream.to("output_topic");

 


 

2. Kafka Streams ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์„ค์ •

Kafka Streams๋ฅผ ์Šคํ”„๋ง ๋ถ€ํŠธ์™€ ํ†ตํ•ฉํ•˜๋ ค๋ฉด Spring Kafka ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์„ค์ •ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์Šคํ”„๋ง ๋ถ€ํŠธ์—์„œ๋Š” Kafka Streams ๋ฅผ ์†์‰ฝ๊ฒŒ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋„๋ก ๋‹ค์–‘ํ•œ ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.

์Šคํ”„๋ง ๋ถ€ํŠธ ์„ค์ • ์˜ˆ์‹œ ( application.yml )

spring:
  kafka:
    streams:
      application-id: my-kafka-streams-app
      bootstrap-servers: localhost:9092
      default-key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default-value-serde: org.apache.kafka.common.serialization.Serdes$StringSerde

Kafka Streams ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์˜ˆ์‹œ

@EnableKafkaStreams
@SpringBootApplication
public class KafkaStreamsApp {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsApp.class, args);
    }

    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, String> stream = streamsBuilder.stream("input_topic");

        KStream<String, String> uppercasedStream = stream.mapValues(value -> value.toUpperCase());
        uppercasedStream.to("output_topic");

        return uppercasedStream;
    }
}

์„ค๋ช…:

  • @EnableKafkaStreams : Kafka Streams ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ํ™œ์„ฑํ™”ํ•ฉ๋‹ˆ๋‹ค.
  • StreamsBuilder : ์ŠคํŠธ๋ฆผ์„ ์ƒ์„ฑํ•˜๊ณ  ์ฒ˜๋ฆฌํ•˜๋Š” API.
  • mapValues() : ๊ฐ ๋ฉ”์‹œ์ง€์˜ ๊ฐ’์„ ๋Œ€๋ฌธ์ž๋กœ ๋ณ€ํ™˜ํ•˜๋Š” ๊ฐ„๋‹จํ•œ ์˜ˆ์‹œ์ž…๋‹ˆ๋‹ค.

 


 

3. ์ƒํƒœ ์ €์žฅ ๋ณ€ํ™˜ ์ˆ˜ํ–‰ํ•˜๊ธฐ

Kafka Streams๋Š” ์ƒํƒœ ์ €์žฅ ๋ณ€ํ™˜ ์„ ํ†ตํ•ด ๋ฐ์ดํ„ฐ๋ฅผ ์ง‘๊ณ„ ํ•˜๊ฑฐ๋‚˜ ๋ณ€ํ™˜ ํ•œ ๊ฒฐ๊ณผ๋ฅผ ๋กœ์ปฌ ์ƒํƒœ ์ €์žฅ์†Œ ์— ์ €์žฅํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋Ÿฌํ•œ ์ƒํƒœ ์ €์žฅ์€ ์„ธ์…˜ , ์ง‘๊ณ„ ์™€ ๊ฐ™์€ ๊ณ ๊ธ‰ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ๋ฅผ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•ฉ๋‹ˆ๋‹ค.

์ƒํƒœ ์ €์žฅ ์˜ˆ์‹œ: ๋ฉ”์‹œ์ง€ ์ง‘๊ณ„

KGroupedStream<String, String> groupedStream = stream.groupByKey();
KTable<String, Long> aggregatedTable = groupedStream.count(Materialized.as("counts-store"));

aggregatedTable.toStream().to("aggregated_output_topic");

์„ค๋ช…:

  • groupByKey() : ํ‚ค๋ฅผ ๊ธฐ์ค€์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๊ทธ๋ฃนํ™”ํ•ฉ๋‹ˆ๋‹ค.
  • count() : ๊ฐ ํ‚ค์˜ ๋ฐœ์ƒ ํšŸ์ˆ˜๋ฅผ ์ง‘๊ณ„ํ•ฉ๋‹ˆ๋‹ค.
  • Materialized.as() : ์ง‘๊ณ„ ๊ฒฐ๊ณผ๋ฅผ ์ƒํƒœ ์ €์žฅ์†Œ ์— ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.

 


 

4. ์ธํ„ฐ๋ž™ํ‹ฐ๋ธŒ ์ฟผ๋ฆฌ์™€ ์ƒํƒœ ์ €์žฅ์†Œ ํ™œ์šฉ

Kafka Streams ์˜ ๊ฐ•๋ ฅํ•œ ๊ธฐ๋Šฅ ์ค‘ ํ•˜๋‚˜๋Š” ์ƒํƒœ ์ €์žฅ์†Œ ์— ์ €์žฅ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์ธํ„ฐ๋ž™ํ‹ฐ๋ธŒ ์ฟผ๋ฆฌ ๋ฅผ ํ†ตํ•ด ์‹ค์‹œ๊ฐ„์œผ๋กœ ์กฐํšŒํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ์ ์ž…๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด, ํŠน์ • ํ‚ค์˜ ์ง‘๊ณ„ ๊ฒฐ๊ณผ๋ฅผ API๋กœ ์ œ๊ณตํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ƒํƒœ ์ €์žฅ์†Œ์—์„œ ์ฟผ๋ฆฌ ์ˆ˜ํ–‰ ์˜ˆ์‹œ

@RestController
public class QueryController {

    @Autowired
    private QueryableStoreRegistry queryableStoreRegistry;

    @GetMapping("/count/{key}")
    public Long getCount(@PathVariable String key) {
        ReadOnlyKeyValueStore<String, Long> keyValueStore =
                queryableStoreRegistry.getQueryableStoreType("counts-store", QueryableStoreTypes.keyValueStore());

        return keyValueStore.get(key);
    }
}

์„ค๋ช…:

  • QueryableStoreRegistry : Kafka Streams์—์„œ ์ƒํƒœ ์ €์žฅ์†Œ์— ์ฟผ๋ฆฌํ•  ์ˆ˜ ์žˆ๋Š” API์ž…๋‹ˆ๋‹ค.
  • ReadOnlyKeyValueStore : ์ƒํƒœ ์ €์žฅ์†Œ์—์„œ ํŠน์ • ํ‚ค์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์–ด์˜ค๋Š” API์ž…๋‹ˆ๋‹ค.

 


 

Kafka Streams์™€ ์Šคํ”„๋ง ๋ถ€ํŠธ ํ†ตํ•ฉ ํŠธ๋ Œ๋“œ ๋ฐ ํ†ต๊ณ„

์ตœ๊ทผ ๋ฐ์ดํ„ฐ์— ๋”ฐ๋ฅด๋ฉด, Kafka Streams ๋ฅผ ์‚ฌ์šฉํ•œ ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์€ ๊ธฐ์—…์—์„œ ๋ฐ์ดํ„ฐ ๋ถ„์„ , ๋ชจ๋‹ˆํ„ฐ๋ง , ์‹ค์‹œ๊ฐ„ ๋Œ€์‹œ๋ณด๋“œ ๊ตฌ์ถ• ๋“ฑ์— ๋„๋ฆฌ ์‚ฌ์šฉ๋˜๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค. ์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆฌ๋ฐ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ๋ฅผ ๋„์ž…ํ•œ ๊ธฐ์—…์˜ ๋น„์ฆˆ๋‹ˆ์Šค ์˜์‚ฌ๊ฒฐ์ • ์†๋„ ๋Š” ํ‰๊ท ์ ์œผ๋กœ 30% ์ด์ƒ ํ–ฅ์ƒ ๋˜์—ˆ๋‹ค๊ณ  ๋ณด๊ณ ๋ฉ๋‹ˆ๋‹ค.

 


 

์ž์ฃผ ๋ฌป๋Š” ์งˆ๋ฌธ (FAQ)

Q1. Kafka Streams์™€ ์Šคํ”„๋ง ๋ถ€ํŠธ๋ฅผ ํ†ตํ•ฉํ•˜๋ ค๋ฉด ์–ด๋–ค ์„ค์ •์ด ํ•„์š”ํ•œ๊ฐ€์š”?
A1. Spring Kafka ์„ค์ •์—์„œ Kafka Streams ๊ด€๋ จ ์˜ต์…˜์„ ์„ค์ •ํ•˜๊ณ , @EnableKafkaStreams ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ Kafka Streams ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ํ™œ์„ฑํ™”ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Q2. Kafka Streams์—์„œ ์ƒํƒœ ์ €์žฅ ๋ณ€ํ™˜์€ ์–ด๋–ป๊ฒŒ ์ˆ˜ํ–‰๋˜๋‚˜์š”?
A2. Materialized.as() ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ณ€ํ™˜ ๊ฒฐ๊ณผ๋ฅผ ์ƒํƒœ ์ €์žฅ์†Œ ์— ์ €์žฅํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ƒํƒœ ์ €์žฅ ๋ณ€ํ™˜์€ ์ง‘๊ณ„, ์œˆ๋„์šฐ ์ฒ˜๋ฆฌ ๋“ฑ์˜ ๋ณต์žกํ•œ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ๋ฅผ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•ฉ๋‹ˆ๋‹ค.

Q3. ์ธํ„ฐ๋ž™ํ‹ฐ๋ธŒ ์ฟผ๋ฆฌ๋Š” ๋ฌด์—‡์ธ๊ฐ€์š”?
A3. ์ธํ„ฐ๋ž™ํ‹ฐ๋ธŒ ์ฟผ๋ฆฌ ๋Š” Kafka Streams์˜ ์ƒํƒœ ์ €์žฅ์†Œ ์— ์ €์žฅ๋œ ๋ฐ์ดํ„ฐ๋ฅผ API๋ฅผ ํ†ตํ•ด ์‹ค์‹œ๊ฐ„์œผ๋กœ ์กฐํšŒํ•  ์ˆ˜ ์žˆ๋Š” ๊ธฐ๋Šฅ์ž…๋‹ˆ๋‹ค. ์ด๋ฅผ ํ†ตํ•ด ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์™ธ๋ถ€์—์„œ ์ƒํƒœ ์ €์žฅ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ํ™œ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Q4. Kafka Streams ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ ์ƒํƒœ ์ €์žฅ์†Œ๋ฅผ ์–ด๋–ป๊ฒŒ ํ™œ์šฉํ•  ์ˆ˜ ์žˆ๋‚˜์š”?
A4. ์ƒํƒœ ์ €์žฅ์†Œ๋Š” ์„ธ์…˜ ์œ ์ง€ , ๋ฐ์ดํ„ฐ ์ง‘๊ณ„ , ๋ณ€ํ™˜ ๊ฒฐ๊ณผ ์ €์žฅ ๋“ฑ ๋‹ค์–‘ํ•œ ์šฉ๋„๋กœ ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค. ์ฟผ๋ฆฌ ๋ฅผ ํ†ตํ•ด ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ €์žฅ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์กฐํšŒํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

Q5. Kafka Streams์™€ ์Šคํ”„๋ง ๋ถ€ํŠธ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ์ฃผ๋œ ์ด์œ ๋Š” ๋ฌด์—‡์ธ๊ฐ€์š”?
A5. Kafka Streams์™€ ์Šคํ”„๋ง ๋ถ€ํŠธ์˜ ํ†ตํ•ฉ์€ ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ๋ฅผ ๊ฐ„ํŽธํ•˜๊ฒŒ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ๋Š” ๊ฐ•๋ ฅํ•œ ์†”๋ฃจ์…˜์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค. ์Šคํ”„๋ง ๋ถ€ํŠธ์˜ ์œ ์—ฐํ•œ ์„ค์ •๊ณผ Kafka Streams์˜ ๊ฐ•๋ ฅํ•œ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ ๊ธฐ๋Šฅ์ด ๊ฒฐํ•ฉ๋˜์–ด ๋Œ€๊ทœ๋ชจ ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ์‰ฝ๊ฒŒ ๊ตฌ์ถ•ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋Œ“๊ธ€