Apache Kafka๋?
- ์ค์๊ฐ ๋ฐ์ดํฐ ์คํธ๋ฆฌ๋ฐ ํ๋ซํผ
- ๋ฐ์ดํฐ๋ฅผ ๋ฐํ(Publish)ํ๋ ์ชฝ๊ณผ ๊ตฌ๋ (Subscribe)ํ๋ ์ชฝ์ ๋น๋๊ธฐ์ ์ผ๋ก ์ฐ๊ฒฐํด์ฃผ๋ ๋ฉ์์ง ํ ์์คํ
-> ๋์ฉ๋ ๋ฐ์ดํฐ๋ฅผ ๋น ๋ฅด๊ณ ์์ ์ ์ผ๋ก ์ฃผ๊ณ ๋ฐ๋ ์ค์๊ฐ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
๊ธฐ๋ณธ ๊ฐ๋ ๊ตฌ์กฐ
[Producer] → [Kafka Broker] → [Consumer]
- Producer : ๋ฐ์ดํฐ๋ฅผ ๋ฐํ(์ ์ก)ํ๋ ์ฃผ์ฒด (ex : ์ ํ๋ฆฌ์ผ์ด์ , ์ผ์, ์๋ฒ ๋ก๊ทธ ๋ฑ)
- Consumer : ๋ฐ์ดํฐ๋ฅผ ๊ตฌ๋ (์์ )ํ๋ ์ฃผ์ฒด (ex : Spark, ELK, DB ๋ฑ)
- Broker : Kafka ์๋ฒ๋ก ์ค์ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๊ณ ์ ๋ฌ
- Topic : ๋ฉ์์ง๋ฅผ ๋ถ๋ฅํ๋ ๋ ผ๋ฆฌ์ ์ฑ๋
- Partition : Topic์ ๋ถํ ํ ๋จ์
- Offset : ๊ฐ ๋ฉ์์ง๊ฐ Partition ๋ด์์ ์ฐจ์งํ๋ ๊ณ ์ ํ ์์น
- Consumer Group : ์ฌ๋ฌ Consumer๊ฐ ํ๋์ Topic์ ๋ณ๋ ฌ๋ก ์ฒ๋ฆฌํ๊ธฐ ์ํ ๊ทธ๋ฃน ๋จ์
๋์ ํ๋ฆ ์์
- ์์๋ก ์ผํ๋ชฐ ๋ก๊ทธ๋ฅผ ์ค์๊ฐ ๋ถ์ํ๋ค๊ณ ๊ฐ์
[์น ์๋ฒ ๋ก๊ทธ] → [Kafka Topic: web_log] → [Spark Streaming or Elasticsearch]
- Producer (์น์๋ฒ)
- ์ฌ์ฉ์์ ํด๋ฆญ, ๊ฒฐ์ ๋ฑ์ ๋ก๊ทธ๋ฅผ JSON ํํ๋ก Kafka์ ์ ์ก
- Kafka Broker
- ๋ก๊ทธ๋ฅผ 'web_log' ๋ผ๋ Topic์ ์ ์ฅ
- ์ฌ๋ฌ Broker๊ฐ ์์ผ๋ฉด ๋ฐ์ดํฐ๋ฅผ Partition์ผ๋ก ๋๋ ๋ถ์ฐ ์ ์ฅ
- Consumer (Spark, Flink, ELK ๋ฑ)
- 'web_log' Topic์ ๊ตฌ๋ ํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ฝ์
- ์ค์๊ฐ์ผ๋ก ์ง๊ณ, ํํฐ๋ง, ์๋ฆผ ๋ฑ ์ฒ๋ฆฌ ์ํ
Kafka ์ฃผ์ ํน์ง
- ์ด๊ณ ์ ์ฒ๋ฆฌ : ์ด๋น ์๋ฐฑ๋ง ๊ฑด์ ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌ ๊ฐ๋ฅ
- ๋ถ์ฐ ๊ตฌ์กฐ : ์ฌ๋ฌ Broker๋ก ๊ตฌ์ฑ๋์ด ์ฅ์ ์ ๊ฐํจ
- ํ์ฅ์ฑ : Topic/Partiiton์ ์ถ๊ฐํ์ฌ ์ ํ ํ์ฅ
- ๋ด๊ตฌ์ฑ : ๋ฐ์ดํฐ๋ฅผ ๋์คํฌ ํํ๋ก ์ ์ฅ -> ์ฅ์ ํ ๋ณต๊ตฌ ๊ฐ๋ฅ
- ์ค์๊ฐ ์คํธ๋ฆฌ๋ฐ
- Exactly-once ๋ณด์ฅ : ๋์ผ ๋ฉ์์ง๊ฐ ์ค๋ณต ์ฒ๋ฆฌ๋์ง ์๋๋ก ์ ์ด ๊ฐ๋ฅ
๋ฉ์์ง ์ฒ๋ฆฌ ๋ชจ๋ธ
- Kafka์ ๊ธฐ๋ณธ ์์น์ Pub/Sub ๋ชจ๋ธ
- ๊ฐ ๋ฉ์์ง๋ ์์๊ฐ ์๊ณ Consumer๋ ๊ฐ Partition์์ Offset์ ๊ธฐ์ตํ์ฌ ์ด๋๊น์ง ์ฝ์๋์ง ๊ด๋ฆฌ
- ์ฆ Consumer๊ฐ ์ฃฝ์ด๋ ๋ณต๊ตฌ ํ ๋ค์ ๊ทธ Offset๋ถํฐ ์ด์ด์ ์ฝ์ ์ ์์
Java ์์ ์ฝ๋
// Producer ์์
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("test-topic", "key" + i, "message-" + i));
}
producer.close();
}
}
// Consumer ์์
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SimpleConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group1");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset=%d, key=%s, value=%s%n",
record.offset(), record.key(), record.value());
}
}
}
}
Kafka์ ํ์ฉ
- ์ค์๊ฐ ๋ก๊ทธ ์์ง
- ๊ธ์ต ๊ฑฐ๋ ์ฒ๋ฆฌ (ex : ์ค์๊ฐ ๊ฒฐ์ , ์ด์ ๊ฑฐ๋ ํ์ง)
- IoT ๋ฐ์ดํฐ ์คํธ๋ฆผ (ex : ์ผ์, ์ค๋งํธ ๋๋ฐ์ด์ค ๋ฐ์ดํฐ ์์ง)
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ ํ๋ธ (๋ค์ํ ์์คํ ๊ฐ ์ค์๊ฐ ๋ฐ์ดํฐ ์ ์ก)
- ๋จธ์ ๋ฌ๋ ํผ๋๋ฐฑ ๋ฃจํ (๋ชจ๋ธ ์์ธก -> ์ฌ์ฉ์ ๋ฐ์ -> ๋ชจ๋ธ ์ฌํ์ต)
'Infra > Apache' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
| [Apache] Apache Airflow์ ๊ฐ๋ (0) | 2025.02.20 |
|---|---|
| [Apache] Apache Spark์ ๊ฐ๋ (0) | 2025.02.20 |