Kafka Streams์ ์คํ๋ง ๋ถํธ ํตํฉ: ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์ ํ๋ฆฌ์ผ์ด์ ๊ตฌ์ถํ๊ธฐ
Apache Kafka Streams ๋ ์ค์๊ฐ ๋ฐ์ดํฐ ์คํธ๋ฆฌ๋ฐ์ ์ฒ๋ฆฌํ๋ ๊ฐ๋ ฅํ API๋ก, Kafka์ ๋ฐ์ดํฐ๋ฅผ ์ค์๊ฐ์ผ๋ก ์ฒ๋ฆฌํ๊ณ ๋ณํํ ์ ์์ต๋๋ค. ์ด๋ฒ ํฌ์คํ ์์๋ Kafka Streams API ์ ์คํ๋ง ๋ถํธ ๋ฅผ ํตํฉํ์ฌ ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์ ํ๋ฆฌ์ผ์ด์ ์ ๊ตฌ์ถํ๋ ๋ฐฉ๋ฒ์ ์ค๋ช ํฉ๋๋ค. ๋ํ ์ํ ์ ์ฅ ๋ณํ ๊ณผ ์ธํฐ๋ํฐ๋ธ ์ฟผ๋ฆฌ ๋ฅผ ํตํด ์ ํ๋ฆฌ์ผ์ด์ ์์ ์ํ ์ ์ฅ์ ๋ฅผ ์ด๋ป๊ฒ ํ์ฉํ๋์ง๋ ์ดํด๋ด ๋๋ค.
1. Kafka Streams API ์๊ฐ
Kafka Streams ๋ Apache Kafka ์์์ ๋์ํ๋ ์คํธ๋ฆผ ์ฒ๋ฆฌ ๋ผ์ด๋ธ๋ฌ๋ฆฌ ๋ก, ๋ฐ์ดํฐ๋ฅผ ์ค์๊ฐ์ผ๋ก ๋ณํ, ํํฐ๋ง, ์ง๊ณ ํ ์ ์์ต๋๋ค. ํนํ ๋ถ์ฐ ํ๊ฒฝ ์์ ๋์ํ ์ ์์ด ๋๊ท๋ชจ ๋ฐ์ดํฐ ์คํธ๋ฆฌ๋ฐ ์ฒ๋ฆฌ์ ์ ํฉํฉ๋๋ค.
Kafka Streams์ ์ฃผ์ ๊ธฐ๋ฅ
- ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ : ๋ฉ์์ง๋ฅผ ์ค์๊ฐ์ผ๋ก ๋ณํํ๊ณ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค.
- ๋ถ์ฐ ์์คํ : Kafka ํด๋ฌ์คํฐ ๋ด์์ ํ์ฅ ๊ฐ๋ฅํ๊ฒ ๋์ํฉ๋๋ค.
- ์ํ ์ ์ฅ : ์ํ ์ ์ฅ์ ๋ฅผ ํตํด ๋ฐ์ดํฐ์ ์ง๊ณ ๋ ๋ณํ ๊ฒฐ๊ณผ ๋ฅผ ๋ฉ๋ชจ๋ฆฌ๋ ๋์คํฌ์ ์ ์ฅํ ์ ์์ต๋๋ค.
์์: Kafka Streams๋ก ๋จ์ ๋ฉ์์ง ํํฐ๋ง
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input_topic");
KStream<String, String> filteredStream = stream.filter((key, value) -> value.contains("Kafka"));
filteredStream.to("output_topic");
2. Kafka Streams ์ ํ๋ฆฌ์ผ์ด์ ์ค์
Kafka Streams๋ฅผ ์คํ๋ง ๋ถํธ์ ํตํฉํ๋ ค๋ฉด Spring Kafka ๋ฅผ ์ฌ์ฉํ์ฌ ์ค์ ํ ์ ์์ต๋๋ค. ์คํ๋ง ๋ถํธ์์๋ Kafka Streams ๋ฅผ ์์ฝ๊ฒ ์ค์ ํ ์ ์๋๋ก ๋ค์ํ ๊ธฐ๋ฅ์ ์ ๊ณตํฉ๋๋ค.
์คํ๋ง ๋ถํธ ์ค์ ์์ (
application.yml
)
spring:
kafka:
streams:
application-id: my-kafka-streams-app
bootstrap-servers: localhost:9092
default-key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default-value-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
Kafka Streams ์ ํ๋ฆฌ์ผ์ด์ ์์
@EnableKafkaStreams
@SpringBootApplication
public class KafkaStreamsApp {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsApp.class, args);
}
@Bean
public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
KStream<String, String> stream = streamsBuilder.stream("input_topic");
KStream<String, String> uppercasedStream = stream.mapValues(value -> value.toUpperCase());
uppercasedStream.to("output_topic");
return uppercasedStream;
}
}
์ค๋ช :
-
@EnableKafkaStreams
: Kafka Streams ์ ํ๋ฆฌ์ผ์ด์ ์ ํ์ฑํํฉ๋๋ค. - StreamsBuilder : ์คํธ๋ฆผ์ ์์ฑํ๊ณ ์ฒ๋ฆฌํ๋ API.
- mapValues() : ๊ฐ ๋ฉ์์ง์ ๊ฐ์ ๋๋ฌธ์๋ก ๋ณํํ๋ ๊ฐ๋จํ ์์์ ๋๋ค.
3. ์ํ ์ ์ฅ ๋ณํ ์ํํ๊ธฐ
Kafka Streams๋ ์ํ ์ ์ฅ ๋ณํ ์ ํตํด ๋ฐ์ดํฐ๋ฅผ ์ง๊ณ ํ๊ฑฐ๋ ๋ณํ ํ ๊ฒฐ๊ณผ๋ฅผ ๋ก์ปฌ ์ํ ์ ์ฅ์ ์ ์ ์ฅํ ์ ์์ต๋๋ค. ์ด๋ฌํ ์ํ ์ ์ฅ์ ์ธ์ , ์ง๊ณ ์ ๊ฐ์ ๊ณ ๊ธ ์คํธ๋ฆผ ์ฒ๋ฆฌ๋ฅผ ๊ฐ๋ฅํ๊ฒ ํฉ๋๋ค.
์ํ ์ ์ฅ ์์: ๋ฉ์์ง ์ง๊ณ
KGroupedStream<String, String> groupedStream = stream.groupByKey();
KTable<String, Long> aggregatedTable = groupedStream.count(Materialized.as("counts-store"));
aggregatedTable.toStream().to("aggregated_output_topic");
์ค๋ช :
- groupByKey() : ํค๋ฅผ ๊ธฐ์ค์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๊ทธ๋ฃนํํฉ๋๋ค.
- count() : ๊ฐ ํค์ ๋ฐ์ ํ์๋ฅผ ์ง๊ณํฉ๋๋ค.
- Materialized.as() : ์ง๊ณ ๊ฒฐ๊ณผ๋ฅผ ์ํ ์ ์ฅ์ ์ ์ ์ฅํฉ๋๋ค.
4. ์ธํฐ๋ํฐ๋ธ ์ฟผ๋ฆฌ์ ์ํ ์ ์ฅ์ ํ์ฉ
Kafka Streams ์ ๊ฐ๋ ฅํ ๊ธฐ๋ฅ ์ค ํ๋๋ ์ํ ์ ์ฅ์ ์ ์ ์ฅ๋ ๋ฐ์ดํฐ๋ฅผ ์ธํฐ๋ํฐ๋ธ ์ฟผ๋ฆฌ ๋ฅผ ํตํด ์ค์๊ฐ์ผ๋ก ์กฐํํ ์ ์๋ค๋ ์ ์ ๋๋ค. ์๋ฅผ ๋ค์ด, ํน์ ํค์ ์ง๊ณ ๊ฒฐ๊ณผ๋ฅผ API๋ก ์ ๊ณตํ ์ ์์ต๋๋ค.
์ํ ์ ์ฅ์์์ ์ฟผ๋ฆฌ ์ํ ์์
@RestController
public class QueryController {
@Autowired
private QueryableStoreRegistry queryableStoreRegistry;
@GetMapping("/count/{key}")
public Long getCount(@PathVariable String key) {
ReadOnlyKeyValueStore<String, Long> keyValueStore =
queryableStoreRegistry.getQueryableStoreType("counts-store", QueryableStoreTypes.keyValueStore());
return keyValueStore.get(key);
}
}
์ค๋ช :
- QueryableStoreRegistry : Kafka Streams์์ ์ํ ์ ์ฅ์์ ์ฟผ๋ฆฌํ ์ ์๋ API์ ๋๋ค.
- ReadOnlyKeyValueStore : ์ํ ์ ์ฅ์์์ ํน์ ํค์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์ค๋ API์ ๋๋ค.
Kafka Streams์ ์คํ๋ง ๋ถํธ ํตํฉ ํธ๋ ๋ ๋ฐ ํต๊ณ
์ต๊ทผ ๋ฐ์ดํฐ์ ๋ฐ๋ฅด๋ฉด, Kafka Streams ๋ฅผ ์ฌ์ฉํ ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์ ํ๋ฆฌ์ผ์ด์ ์ ๊ธฐ์ ์์ ๋ฐ์ดํฐ ๋ถ์ , ๋ชจ๋ํฐ๋ง , ์ค์๊ฐ ๋์๋ณด๋ ๊ตฌ์ถ ๋ฑ์ ๋๋ฆฌ ์ฌ์ฉ๋๊ณ ์์ต๋๋ค. ์ค์๊ฐ ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ๋ฅผ ๋์ ํ ๊ธฐ์ ์ ๋น์ฆ๋์ค ์์ฌ๊ฒฐ์ ์๋ ๋ ํ๊ท ์ ์ผ๋ก 30% ์ด์ ํฅ์ ๋์๋ค๊ณ ๋ณด๊ณ ๋ฉ๋๋ค.
์์ฃผ ๋ฌป๋ ์ง๋ฌธ (FAQ)
Q1. Kafka Streams์ ์คํ๋ง ๋ถํธ๋ฅผ ํตํฉํ๋ ค๋ฉด ์ด๋ค ์ค์ ์ด ํ์ํ๊ฐ์?
A1. Spring Kafka ์ค์ ์์ Kafka Streams ๊ด๋ จ ์ต์
์ ์ค์ ํ๊ณ ,
@EnableKafkaStreams
๋ฅผ ์ฌ์ฉํ์ฌ Kafka Streams ์ ํ๋ฆฌ์ผ์ด์
์ ํ์ฑํํ ์ ์์ต๋๋ค.
Q2. Kafka Streams์์ ์ํ ์ ์ฅ ๋ณํ์ ์ด๋ป๊ฒ ์ํ๋๋์?
A2. Materialized.as() ๋ฅผ ์ฌ์ฉํ์ฌ ๋ณํ ๊ฒฐ๊ณผ๋ฅผ ์ํ ์ ์ฅ์ ์ ์ ์ฅํ ์ ์์ต๋๋ค. ์ํ ์ ์ฅ ๋ณํ์ ์ง๊ณ, ์๋์ฐ ์ฒ๋ฆฌ ๋ฑ์ ๋ณต์กํ ์คํธ๋ฆผ ์ฒ๋ฆฌ๋ฅผ ๊ฐ๋ฅํ๊ฒ ํฉ๋๋ค.
Q3. ์ธํฐ๋ํฐ๋ธ ์ฟผ๋ฆฌ๋ ๋ฌด์์ธ๊ฐ์?
A3. ์ธํฐ๋ํฐ๋ธ ์ฟผ๋ฆฌ ๋ Kafka Streams์ ์ํ ์ ์ฅ์ ์ ์ ์ฅ๋ ๋ฐ์ดํฐ๋ฅผ API๋ฅผ ํตํด ์ค์๊ฐ์ผ๋ก ์กฐํํ ์ ์๋ ๊ธฐ๋ฅ์
๋๋ค. ์ด๋ฅผ ํตํด ์ ํ๋ฆฌ์ผ์ด์
์ธ๋ถ์์ ์ํ ์ ์ฅ๋ ๋ฐ์ดํฐ๋ฅผ ํ์ฉํ ์ ์์ต๋๋ค.
Q4. Kafka Streams ์ ํ๋ฆฌ์ผ์ด์
์์ ์ํ ์ ์ฅ์๋ฅผ ์ด๋ป๊ฒ ํ์ฉํ ์ ์๋์?
A4. ์ํ ์ ์ฅ์๋ ์ธ์
์ ์ง , ๋ฐ์ดํฐ ์ง๊ณ , ๋ณํ ๊ฒฐ๊ณผ ์ ์ฅ ๋ฑ ๋ค์ํ ์ฉ๋๋ก ์ฌ์ฉ๋ฉ๋๋ค. ์ฟผ๋ฆฌ ๋ฅผ ํตํด ์ค์๊ฐ์ผ๋ก ์ ์ฅ๋ ๋ฐ์ดํฐ๋ฅผ ์กฐํํ ์๋ ์์ต๋๋ค.
Q5. Kafka Streams์ ์คํ๋ง ๋ถํธ๋ฅผ ์ฌ์ฉํ๋ ์ฃผ๋ ์ด์ ๋ ๋ฌด์์ธ๊ฐ์?
A5. Kafka Streams์ ์คํ๋ง ๋ถํธ์ ํตํฉ์ ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ๋ฅผ ๊ฐํธํ๊ฒ ๊ตฌํํ ์ ์๋ ๊ฐ๋ ฅํ ์๋ฃจ์
์ ์ ๊ณตํฉ๋๋ค. ์คํ๋ง ๋ถํธ์ ์ ์ฐํ ์ค์ ๊ณผ Kafka Streams์ ๊ฐ๋ ฅํ ์คํธ๋ฆผ ์ฒ๋ฆฌ ๊ธฐ๋ฅ์ด ๊ฒฐํฉ๋์ด ๋๊ท๋ชจ ์ค์๊ฐ ๋ฐ์ดํฐ ์ ํ๋ฆฌ์ผ์ด์
์ ์ฝ๊ฒ ๊ตฌ์ถํ ์ ์์ต๋๋ค.
๋๊ธ