์คํ๋ง ๋ถํธ์์ Kafka ์ค์ ํ๊ธฐ: ๊ฐ๋จํ๊ณ ๋น ๋ฅด๊ฒ ๋ฐฐ์ฐ๋ ๋ฐฉ๋ฒ
์คํ๋ง ๋ถํธ ์ Apache Kafka ๋ฅผ ํตํฉํ์ฌ ์ค์๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ ๋ฐฉ๋ฒ์ ๋ง์ ๊ธฐ์ ๊ณผ ๊ฐ๋ฐ์๋ค์ด ํ์ฉํ๋ ์ค์ํ ๊ธฐ์ ์ ๋๋ค. ์ด๋ฒ ํฌ์คํ ์์๋ application.yml ๋๋ application.properties ํ์ผ์ ์ฌ์ฉํ Kafka ์ค์ ๋ฐฉ๋ฒ, ํ๋ก๋์์ ์ปจ์๋จธ ์ค์ , ๊ธฐ๋ณธ ์ง๋ ฌํ๊ธฐ/์ญ์ง๋ ฌํ๊ธฐ ์ค์ , ๊ทธ๋ฆฌ๊ณ ์ฌ์ฉ์ ์ ์ Kafka ์์ฑ์ ์ ์ฉํ๋ ๋ฐฉ๋ฒ์ ์ค๋ช ํฉ๋๋ค. ๋ชจ๋ ๊ณผ์ ์ ์ฝ๋ ์์ ์ ํจ๊ป ์ ๊ณต๋์ด ์ฝ๊ฒ ๋ฐ๋ผ ํ ์ ์์ต๋๋ค.
1.
application.properties
๋๋
application.yml
์ค์ ํ์ผ ์ดํดํ๊ธฐ
application.yml
์ ํตํ Kafka ์ค์
์คํ๋ง ๋ถํธ์์ Kafka๋ฅผ ์ค์ ํ๋ ค๋ฉด application.yml ๋๋ application.properties ํ์ผ์ ์ค์ ์ ์ถ๊ฐํด์ผ ํฉ๋๋ค. ์ฌ๊ธฐ์๋ YAML ํ์์ ์ฌ์ฉํ ์ค์ ์์ ๋ฅผ ๋ณด์ฌ๋๋ฆฝ๋๋ค.
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
auto-offset-reset: earliest
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
์ฃผ์ ์ค์ ํญ๋ชฉ:
- bootstrap-servers : Kafka ๋ธ๋ก์ปค์ ์ฃผ์๋ฅผ ์ง์ ํฉ๋๋ค.
- group-id : ์ปจ์๋จธ ๊ทธ๋ฃน์ ID๋ฅผ ์ค์ ํฉ๋๋ค.
- auto-offset-reset : ์๋ก์ด ์ปจ์๋จธ๊ฐ ์์ํ ๋ ์คํ์ ์ ์ด๋์๋ถํฐ ์ฝ์์ง ์ง์ ํฉ๋๋ค. earliest ๋ก ์ค์ ํ๋ฉด ๊ฐ์ฅ ์ฒ์๋ถํฐ ์ฝ๊ธฐ ์์ํฉ๋๋ค.
- key-serializer / value-serializer : Kafka ๋ฉ์์ง์ ํค์ ๊ฐ์ ์ง๋ ฌํํ๋ ๋ฐฉ๋ฒ์ ์ง์ ํฉ๋๋ค. ์ฌ๊ธฐ์๋ ๊ธฐ๋ณธ์ ์ผ๋ก String ํ์ ์ผ๋ก ์ค์ ํ์ต๋๋ค.
application.properties
์ค์ ์์
application.properties
ํ์์ ์ ํธํ๋ ๊ฒฝ์ฐ, ๋์ผํ ์ค์ ์ ๋ค์๊ณผ ๊ฐ์ด ์์ฑํ ์ ์์ต๋๋ค:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
2. Kafka ํ๋ก๋์์ ์ปจ์๋จธ ์ค์ ๋ฐฉ๋ฒ
Kafka ํ๋ก๋์ ์ค์
Kafka ํ๋ก๋์๋ ๋ฉ์์ง๋ฅผ ์์ฑํ๊ณ ๋ธ๋ก์ปค์ ์ ์ก ํ๋ ์ญํ ์ ํฉ๋๋ค. ์คํ๋ง ๋ถํธ ์์๋ KafkaTemplate์ ์ฌ์ฉํ์ฌ ์ฝ๊ฒ ๋ฉ์์ง๋ฅผ ์ ์กํ ์ ์์ต๋๋ค.
@Service
public class KafkaProducer {
private static final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
logger.info(String.format("#### -> Producing message -> %s", message));
this.kafkaTemplate.send("my_topic", message);
}
}
์ค๋ช :
- KafkaTemplate ์ ํตํด ๋ฉ์์ง๋ฅผ ์ ์กํฉ๋๋ค.
- send ๋ฉ์๋๋ ์ง์ ํ ํ ํฝ (์ฌ๊ธฐ์๋
my_topic
)์ ๋ฉ์์ง๋ฅผ ์ ์กํฉ๋๋ค.
Kafka ์ปจ์๋จธ ์ค์
Kafka ์ปจ์๋จธ๋ ๋ฉ์์ง๋ฅผ ์๋น ํ๋ ์ญํ ์ ํ๋ฉฐ, ์ฃผ๋ก ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ ์ค์๊ฐ ๋ถ์์ ์ฌ์ฉ๋ฉ๋๋ค. ์คํ๋ง ๋ถํธ์์๋ @KafkaListener ๋ฅผ ์ฌ์ฉํ์ฌ ๋ฉ์์ง๋ฅผ ์์ ํ ์ ์์ต๋๋ค.
@Service
public class KafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = "my_topic", groupId = "my-group")
public void consume(String message) {
logger.info(String.format("#### -> Consumed message -> %s", message));
}
}
์ค๋ช :
- @KafkaListener ๋ฅผ ํตํด ์ง์ ๋ ํ ํฝ(
my_topic
)์์ ๋ฉ์์ง๋ฅผ ์์ ํฉ๋๋ค. - ์์ ๋ ๋ฉ์์ง๋ consume ๋ฉ์๋์์ ์ฒ๋ฆฌ๋ฉ๋๋ค.
3. ๊ธฐ๋ณธ ์ง๋ ฌํ๊ธฐ์ ์ญ์ง๋ ฌํ๊ธฐ ์ค์
Kafka๋ ์ง๋ ฌํ ์ ์ญ์ง๋ ฌํ ๋ฅผ ํตํด ๋ฉ์์ง๋ฅผ ๋ฐ์ดํธ ์คํธ๋ฆผ ์ผ๋ก ๋ณํํ๊ณ , ๋ค์ ๊ฐ์ฒด๋ก ๋ณํํ๋ ๊ณผ์ ์ ๊ฑฐ์นฉ๋๋ค. ๊ธฐ๋ณธ์ ์ผ๋ก๋ String ์ ์ง๋ ฌํํ์ง๋ง, ํ์์ ๋ฐ๋ผ ์ปค์คํ ์ง๋ ฌํ๊ธฐ๋ฅผ ์ ์ํ ์ ์์ต๋๋ค.
๊ธฐ๋ณธ ์ง๋ ฌํ๊ธฐ ์ค์
Kafka์์ ๊ธฐ๋ณธ์ ์ผ๋ก ์ ๊ณตํ๋ StringSerializer ์ StringDeserializer ๋ฅผ ์ฌ์ฉํ๋ฉด ๋ฌธ์์ด ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ฒ ์ ์กํ ์ ์์ต๋๋ค. ์ด๋ ์์ ์ดํด๋ณธ application.yml ์ค์ ์์ ์ด๋ฏธ ์ ์ฉ๋์์ต๋๋ค:
spring:
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
JSON ์ง๋ ฌํ/์ญ์ง๋ ฌํ ์ค์
๋ง์ฝ JSON ํ์ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ๊ณ ์ถ๋ค๋ฉด, Jackson ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํ์ฌ ์ง๋ ฌํ๊ธฐ์ ์ญ์ง๋ ฌํ๊ธฐ๋ฅผ ์ค์ ํ ์ ์์ต๋๋ค.
spring:
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
consumer-properties:
spring.json.trusted.packages: "*"
์ด ์ค์ ์ ํตํด ํ๋ก๋์๊ฐ JSON ๋ฐ์ดํฐ ๋ฅผ ์ง๋ ฌํํ๊ณ , ์ปจ์๋จธ๊ฐ ์ด๋ฅผ ์ญ์ง๋ ฌํํ์ฌ Java ๊ฐ์ฒด ๋ก ๋ณํํ ์ ์์ต๋๋ค.
4. ์ฌ์ฉ์ ์ ์ Kafka ์์ฑ ์ ์ฉ
Kafka์ ๊ธฐ๋ณธ ์ค์ ์ธ์๋ ํ์์ ๋ฐ๋ผ ์ฌ์ฉ์ ์ ์ ์์ฑ ์ ์ ์ฉํ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด, ACK(ํ์ธ ์๋ต) ๋ฉ์ปค๋์ฆ์ ์ฌ์ฉ์ ์ ์ํ์ฌ ๋ฐ์ดํฐ ์์ค์ ์ต์ํํ ์ ์์ต๋๋ค.
ACK ๋ฉ์ปค๋์ฆ ์ค์
ACK ์ค์ ์ ํ๋ก๋์๊ฐ ๋ฉ์์ง๋ฅผ ์ ์กํ ํ Kafka ๋ธ๋ก์ปค๋ก๋ถํฐ ํ์ธ ์๋ต ์ ๋ฐ์ ๋๊น์ง ๊ธฐ๋ค๋ฆฌ๋ ๋ฐฉ์์ ์ ์ํฉ๋๋ค. ์ด๋ฅผ ํตํด ๋ฉ์์ง ์ ๋ฌ์ ์์ ์ฑ์ ๋ณด์ฅํ ์ ์์ต๋๋ค.
spring:
kafka:
producer:
acks: all # ๋ชจ๋ ๋ธ๋ก์ปค์์ ์๋ต์ ๋ฐ์์ ๋๋ง ์ฑ๊ณต์ผ๋ก ์ฒ๋ฆฌ
retries: 3 # ๋ฉ์์ง ์ ์ก ์คํจ ์ ์ฌ์๋ ํ์
- acks: all : ๋ชจ๋ ๋ฆฌํ๋ฆฌ์นด์์ ๋ฉ์์ง๊ฐ ๊ธฐ๋ก๋์์ ๋๋ง ์ฑ๊ณต์ผ๋ก ๊ฐ์ฃผํฉ๋๋ค.
- retries: 3 : ์ ์ก ์คํจ ์ ์ต๋ 3๋ฒ๊น์ง ์ฌ์๋ํฉ๋๋ค.
๋ฐฐ์น ์ฒ๋ฆฌ ์ค์
Kafka์์ ๋ฐฐ์น ์ฒ๋ฆฌ ๋ฅผ ํ์ฑํํ๋ฉด, ์ฌ๋ฌ ๊ฐ์ ๋ฉ์์ง๋ฅผ ๋ชจ์์ ํ ๋ฒ์ ์ฒ๋ฆฌํ ์ ์์ด ์ฑ๋ฅ์ด ํฅ์๋ฉ๋๋ค.
spring:
kafka:
consumer:
enable-auto-commit: false # ์๋ ์ปค๋ฐ ๋นํ์ฑํ
max-poll-records: 10 # ํ ๋ฒ์ ์ต๋ 10๊ฐ์ ๋ฉ์์ง ์ฒ๋ฆฌ
์ด ์ค์ ์ ํตํด ํ ๋ฒ์ ์ฌ๋ฌ ๋ฉ์์ง๋ฅผ ์ฝ๊ณ ์ฒ๋ฆฌํ๋ ๋ฐฐ์น ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํด์ง๋๋ค. ์ด๋ฅผ ํตํด ์ฒ๋ฆฌ ์๋๋ฅผ ๋์ด๊ณ , ๋ฆฌ์์ค ์ฌ์ฉ์ ์ต์ ํํ ์ ์์ต๋๋ค.
์คํ๋ง ๋ถํธ์์ Kafka ์ค์ ํธ๋ ๋ ๋ฐ ํต๊ณ
Kafka๋ ์ ์ธ๊ณ์ ์ผ๋ก ๋๊ท๋ชจ ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ๋ฅผ ์ํ ํ์ ๋๊ตฌ๋ก ์๋ฆฌ ์ก๊ณ ์์ต๋๋ค. ์ต๊ทผ 2023๋ Stack Overflow ๊ฐ๋ฐ์ ์ค๋ฌธ์กฐ์ฌ ์ ๋ฐ๋ฅด๋ฉด, ๋ง์ ๊ฐ๋ฐ์๋ค์ด Kafka๋ฅผ ๋ง์ดํฌ๋ก์๋น์ค ์ํคํ ์ฒ์์ ์ฌ์ฉํ๊ณ ์์ผ๋ฉฐ, Kafka๋ฅผ ์ฌ์ฉํ ๊ธฐ์ ์ค 80% ์ด์์ด ๋ฐ์ดํฐ ์์ค ์์ด ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ๋ฅผ ์ฑ๊ณต์ ์ผ๋ก ๊ตฌํํ๋ค๊ณ ๋ณด๊ณ ๋์์ต๋๋ค.
์์ฃผ ๋ฌป๋ ์ง๋ฌธ (FAQ)
Q1. Kafka ์ค์ ํ์ผ์์ ๊ฐ์ฅ ์ค์ํ ํญ๋ชฉ์ ๋ฌด์์ธ๊ฐ์?
A1. bootstrap-servers , group-id , key/value ์ง๋ ฌํ๊ธฐ ์ค์ ์ด ๊ฐ์ฅ ์ค์ํฉ๋๋ค. ์ด ์ค์ ๋ค์ด Kafka์ ๋ธ๋ก์ปค๋ฅผ ์ฐ๊ฒฐํ๊ณ , ๋ฐ์ดํฐ๋ฅผ ์ง๋ ฌํ/์ญ์ง๋ ฌํํ๋ ๊ณผ์ ์ ์ ์ดํฉ๋๋ค.
Q2. Kafka ํ๋ก๋์์ ์ปจ์๋จธ๋ฅผ ๋์์ ์ฌ์ฉํ ์ ์๋์?
A2. ๋ค, ๊ฐ๋ฅํฉ๋๋ค. ์คํ๋ง ๋ถํธ์์๋ ๋์ผํ ์ ํ๋ฆฌ์ผ์ด์
์์ Kafka ํ๋ก๋์์ ์ปจ์๋จธ๋ฅผ ๋์์ ์ค์ ํ์ฌ ์ฌ์ฉํ ์ ์์ต๋๋ค.
Q3. ๋ฐฐ์น ์ฒ๋ฆฌ๋ ์ด๋ค ์ํฉ์์ ์ ๋ฆฌํ๊ฐ์?
A3. ๋ฐฐ์น ์ฒ๋ฆฌ ๋ ๋ง์ ์์ ๋ฉ์์ง๋ฅผ ํ ๋ฒ์ ์ฒ๋ฆฌํด์ผ ํ ๋ ์ ๋ฆฌํฉ๋๋ค. ๋ฐฐ์น ์ฒ๋ฆฌ๋ก ์ฑ๋ฅ์ ์ต์ ํํ ์ ์์ผ๋ฉฐ, ์์คํ
๋ฆฌ์์ค๋ฅผ ์ ์ฝํ๊ณ ์ฒ๋ฆฌ ์๋๋ฅผ ๋์ผ ์ ์์ต๋๋ค. ์ฃผ๋ก ๋ก๊ทธ ์์ง , ํธ๋์ญ์
์ฒ๋ฆฌ ์ ๊ฐ์ ๋๋ ๋ฐ์ดํฐ ์ฒ๋ฆฌ์์ ์ฌ์ฉ๋ฉ๋๋ค.
Q4. Kafka ์ง๋ ฌํ๊ธฐ๋ฅผ ์ปค์คํฐ๋ง์ด์งํ ์ ์๋์?
A4. ๋ค, ๊ฐ๋ฅํฉ๋๋ค. Kafka๋ ๊ธฐ๋ณธ์ ์ผ๋ก StringSerializer ๋ฅผ ์ ๊ณตํ์ง๋ง, ํ์์ ๋ฐ๋ผ JsonSerializer ๋๋ ์ง์ ๋ง๋ ์ปค์คํ
์ง๋ ฌํ๊ธฐ ๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค. ์ด๋ฅผ ํตํด ๋ฐ์ดํฐ๋ฅผ ์ํ๋ ํ์์ผ๋ก ์ง๋ ฌํํ ์ ์์ต๋๋ค.
Q5. Kafka์ ACK ๋ฉ์ปค๋์ฆ์ด ์ค์ํ ์ด์ ๋ ๋ฌด์์ธ๊ฐ์?
A5. ACK ๋ฉ์ปค๋์ฆ ์ ๋ฉ์์ง๊ฐ ์์ ํ๊ฒ ์ ๋ฌ๋์๋์ง๋ฅผ ํ์ธํ๋ ๊ณผ์ ์
๋๋ค. ๋ฉ์์ง ์์ค์ ๋ฐฉ์งํ๊ณ , ์์ ์ ์ธ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ํด ํ๋ก๋์๊ฐ ๋ธ๋ก์ปค๋ก๋ถํฐ ํ์ธ ์๋ต์ ๋ฐ๋ ๊ฒ์ด ์ค์ํฉ๋๋ค. ACK๋ฅผ
all
๋ก ์ค์ ํ๋ฉด ๋ชจ๋ ๋ธ๋ก์ปค์์ ๋ฉ์์ง๊ฐ ์ฑ๊ณต์ ์ผ๋ก ์ ์ฅ๋์์ ๋๋ง ์ฑ๊ณต์ผ๋ก ์ฒ๋ฆฌํ๋ฏ๋ก ์์ ์ฑ์ ๋์ผ ์ ์์ต๋๋ค.
๋๊ธ