[Apache] Apache Kafka Concepts

carsumin 2025. 2. 25. 21:46
What is Apache Kafka?
  • A real-time data streaming platform
  • A message queue system that asynchronously connects the side that publishes data with the side that subscribes to it

-> A real-time data pipeline for exchanging large volumes of data quickly and reliably

 

 

Basic Concepts and Structure

 

[Producer] → [Kafka Broker] → [Consumer]

 

  • Producer: the entity that publishes (sends) data (e.g., applications, sensors, server logs)
  • Consumer: the entity that subscribes to (receives) data (e.g., Spark, ELK, databases)
  • Broker: a Kafka server that actually stores and delivers the data
  • Topic: a logical channel used to categorize messages
  • Partition: a unit into which a Topic is divided
  • Offset: the unique position each message occupies within a Partition
  • Consumer Group: a group of Consumers that process a single Topic in parallel
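To make the Topic / Partition / Offset relationship above concrete, here is a toy in-memory model in Java. It is only an illustration under simplifying assumptions (no Brokers, no persistence, no replication), and `ToyTopic` and its methods are hypothetical names, not part of the Kafka client API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not Kafka's API: a Topic is a list of append-only
// Partition logs, and a message's Offset is simply its index in its Partition.
class ToyTopic {
    private final List<List<String>> partitions = new ArrayList<>();

    ToyTopic(int numPartitions) {
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Appends a message to the given Partition and returns the Offset it received.
    long append(int partition, String message) {
        List<String> log = partitions.get(partition);
        log.add(message);
        return log.size() - 1;
    }

    // Reads the message at (partition, offset); the message stays in the log.
    String read(int partition, long offset) {
        return partitions.get(partition).get((int) offset);
    }

    int partitionCount() {
        return partitions.size();
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic(2);
        long a = topic.append(0, "message-0");
        long b = topic.append(0, "message-1");
        long c = topic.append(1, "message-2"); // Offsets restart at 0 in every Partition
        System.out.println(a + " " + b + " " + c); // prints "0 1 0"
        System.out.println(topic.read(0, 1));      // prints "message-1"
    }
}
```

Note that `read` leaves the message in place: unlike a traditional queue, consuming a record in Kafka does not delete it, which is what lets several Consumer Groups read the same Topic independently.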

 

Example Data Flow
  • Suppose we want to analyze a shopping mall's logs in real time
[Web server logs] → [Kafka Topic: web_log] → [Spark Streaming or Elasticsearch]

 

  1. Producer (web server)
    • Sends logs of user actions such as clicks and payments to Kafka in JSON format
  2. Kafka Broker
    • Stores the logs in a Topic named 'web_log'
    • The Topic is divided into Partitions; with multiple Brokers, these Partitions are spread across them for distributed storage
  3. Consumer (Spark, Flink, ELK, etc.)
    • Subscribes to the 'web_log' Topic and reads the data
    • Performs real-time processing such as aggregation, filtering, and alerting
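The Producer step in this flow can be sketched as follows. This is a hypothetical illustration: `WebLogRouting`, the JSON field names, and the hash function are assumptions. Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch uses `Arrays.hashCode` as a simplified stand-in, so the partition numbers will not match a real cluster; only the property "same key, same Partition (while the Partition count is unchanged)" carries over.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of the web-server Producer step. Field names are assumptions,
// and Arrays.hashCode is a simplified stand-in for Kafka's murmur2-based default partitioner.
class WebLogRouting {
    // Builds a minimal JSON log line by hand (no escaping: assumes plain IDs and action names).
    static String toJson(String userId, String action) {
        return String.format("{\"userId\":\"%s\",\"action\":\"%s\"}", userId, action);
    }

    // Same key -> same hash -> same Partition, so one user's events stay in order.
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions; // clear the sign bit so the index is non-negative
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        String[][] events = {{"user-1", "click"}, {"user-2", "click"}, {"user-1", "payment"}};
        for (String[] e : events) {
            // In a real Producer this pair would become new ProducerRecord<>("web_log", key, json)
            System.out.println("partition=" + partitionFor(e[0], numPartitions) + " " + toJson(e[0], e[1]));
        }
    }
}
```

Keying by user ID is one design choice among several; it keeps each user's click and payment events in a single Partition, where Kafka preserves their order.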

 

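Key Features of Kafka
  • High throughput: can process millions of messages per second (cluster-wide, depending on hardware and configuration)
  • Distributed architecture: runs as a cluster of Brokers, and with Partitions replicated across Brokers it tolerates the failure of individual Brokers
  • Scalability: scales out roughly linearly by adding Topics/Partitions (and Brokers to host them)
  • Durability: data is persisted to disk, so it can be recovered after a failure
  • Real-time streaming: low-latency delivery of messages from Producers to Consumers
  • Exactly-once semantics: can be configured so that the same message is not processed more than once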
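Kafka's exactly-once guarantee builds on the idempotent producer (`enable.idempotence=true`), extended by transactions (`transactional.id`, with Consumers reading under `isolation.level=read_committed`). The core deduplication idea can be sketched as a simplified in-memory model: the broker side remembers the last sequence number it accepted per producer ID and drops a retried send it has already written. `IdempotentLog` is a hypothetical name; the real protocol tracks sequences per Partition on record batches and rejects sequence gaps, which this sketch does not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of the idea behind Kafka's idempotent producer:
// remember the last sequence number per producer ID and ignore duplicates.
// Unlike the real protocol, gaps in the sequence are accepted here.
class IdempotentLog {
    private final List<String> log = new ArrayList<>();
    private final Map<Long, Integer> lastSeq = new HashMap<>();

    // Returns true if the record was appended, false if it was a duplicate retry.
    boolean append(long producerId, int sequence, String value) {
        int last = lastSeq.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // already written, e.g. a retry after a lost acknowledgement
        }
        log.add(value);
        lastSeq.put(producerId, sequence);
        return true;
    }

    int size() {
        return log.size();
    }

    public static void main(String[] args) {
        IdempotentLog partition = new IdempotentLog();
        partition.append(42L, 0, "payment-A");
        partition.append(42L, 0, "payment-A"); // network retry of the same send
        partition.append(42L, 1, "payment-B");
        System.out.println(partition.size()); // prints 2
    }
}
```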

 

Message Processing Model
  • Kafka is fundamentally based on the Pub/Sub model
  • Messages are ordered within each Partition (not across Partitions), and a Consumer tracks how far it has read by recording its Offset in each Partition
    • So even if a Consumer dies, it can resume reading from that Offset after it recovers
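Resuming from the recorded Offset can be illustrated with a toy model. In Kafka the committed offsets are stored by the Brokers in the internal `__consumer_offsets` Topic; in this sketch a static map plays that role. `OffsetResume` and `pollAndCommit` are hypothetical names, not the client API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of offset tracking: the group's committed offset for a
// Partition outlives a Consumer crash, so a restarted Consumer resumes from there.
class OffsetResume {
    // Committed offsets keyed by "group:partition" (stands in for __consumer_offsets).
    static final Map<String, Long> committed = new HashMap<>();

    // Reads up to maxRecords from the committed offset, then commits the next offset to read.
    static List<String> pollAndCommit(List<String> partitionLog, String group, int partition, int maxRecords) {
        String key = group + ":" + partition;
        long start = committed.getOrDefault(key, 0L);
        long end = Math.min(start + maxRecords, partitionLog.size());
        List<String> batch = new ArrayList<>(partitionLog.subList((int) start, (int) end));
        committed.put(key, end); // committed offset = offset of the next record to read
        return batch;
    }

    public static void main(String[] args) {
        List<String> log = List.of("m0", "m1", "m2", "m3", "m4");
        System.out.println(pollAndCommit(log, "group1", 0, 2)); // prints [m0, m1]
        // ... the Consumer process dies here; a new instance in group1 starts up ...
        System.out.println(pollAndCommit(log, "group1", 0, 2)); // prints [m2, m3]
    }
}
```

Because the offset is keyed by group, a different Consumer Group reading the same Partition starts from its own position, independent of `group1`.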

 

Java Example Code
// Producer example
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // At least one Broker address, used to discover the rest of the cluster
        props.put("bootstrap.servers", "localhost:9092");
        // Keys and values are sent as plain strings
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer, which flushes any buffered records
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // send() is asynchronous; records with the same key go to the same Partition
                producer.send(new ProducerRecord<>("test-topic", "key" + i, "message-" + i));
            }
        }
    }
}
// Consumer example
import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Consumers sharing this group.id split the Topic's Partitions among themselves
        props.put("group.id", "group1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));

            // Poll loop: fetch new records, waiting at most 100 ms per call.
            // With the default enable.auto.commit=true, offsets are committed periodically in the background.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d, key=%s, value=%s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
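Consumers that share the same `group.id` (such as `group1` in the Consumer example) split the Topic's Partitions among themselves, and each Partition is read by only one Consumer of the group at a time. The split can be sketched with a range-style assignment similar in spirit to Kafka's `RangeAssignor`, simplified to a single Topic. `RangeAssignmentSketch` is a hypothetical name; the real assignor is selected through `partition.assignment.strategy`, which also offers other strategies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical range-style assignment for one Topic: Partitions are handed out in
// contiguous ranges, and each Partition goes to exactly one Consumer in the group.
class RangeAssignmentSketch {
    // Returns, for each consumer index, the list of Partition numbers it owns.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        int perConsumer = numPartitions / numConsumers;
        int withExtra = numPartitions % numConsumers; // the first 'withExtra' consumers get one more
        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = perConsumer + (c < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(5, 2)); // prints [[0, 1, 2], [3, 4]]
        System.out.println(assign(2, 3)); // prints [[0], [1], []]
    }
}
```

The second call shows why adding Consumers beyond the Partition count does not increase parallelism within a group: the extra Consumer receives no Partitions and sits idle.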

 

 

Kafka Use Cases
  • Real-time log collection
  • Financial transaction processing (e.g., real-time payments, fraud detection)
  • IoT data streams (e.g., collecting sensor and smart-device data)
  • Data pipeline hub (real-time data transfer between various systems)
  • Machine learning feedback loops (model prediction -> user response -> model retraining)