3장 - KSQL을 활용한 실전 튜토리얼: 스트리밍 데이터 처리
Data Analyst

빅데이터 관련 자료/[SQL] Basic

3장 - KSQL을 활용한 실전 튜토리얼: 스트리밍 데이터 처리

carpe08 2024. 1. 10. 12:42
320x100
320x100

KSQL을 사용하여 스트리밍 데이터를 처리하는 것을 알아보겠습니다. 예를 들어, 실시간으로 들어오는 거래 데이터를 다루고 분석하는 과정을 살펴보겠습니다.

1. 데이터 스트림 생성

먼저, KSQL을 사용하여 데이터를 스트리밍하는 토픽을 생성합니다. 예를 들어, 거래 데이터를 다루는 토픽을 생성할 수 있습니다.

-- 거래 데이터를 다루는 토픽 생성
CREATE STREAM transaction_data (id INT, amount DOUBLE, timestamp BIGINT) WITH (KAFKA_TOPIC='raw_transactions', VALUE_FORMAT='JSON');
 

위 코드는 raw_transactions라는 카프카 토픽으로부터 JSON 형식의 거래 데이터를 읽어와 transaction_data 스트림을 생성하는 예시입니다.

2. 데이터 쿼리하기

다음으로, 생성한 스트림을 쿼리하여 데이터를 확인해봅시다.

 
-- 거래 내역 쿼리
SELECT * FROM transaction_data EMIT CHANGES;

위 쿼리는 transaction_data 스트림으로부터 실시간으로 들어오는 거래 데이터를 출력합니다.

3. 데이터 필터링 및 집계

특정 조건에 맞는 데이터를 필터링하거나 집계해보겠습니다.

-- 특정 금액 이상의 거래 필터링
SELECT * FROM transaction_data WHERE amount > 100 EMIT CHANGES;

-- 시간 윈도우링을 이용한 거래 집계
SELECT COUNT(*) AS transaction_count, TUMBLE_END(timestamp, INTERVAL 1 HOUR) AS window_end
FROM transaction_data
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY TUMBLE(timestamp, INTERVAL 1 HOUR)
EMIT CHANGES;
 

위 코드는 거래 금액이 100 이상인 거래를 실시간으로 필터링하고, 시간별로 거래를 집계하는 예시입니다.

4. 결과 출력

쿼리 결과를 확인합니다.

 
+----+-----------+-------------+
| ID | AMOUNT    | TIMESTAMP   |
+----+-----------+-------------+
| 1  | 120.5     | 1641256800  |
| 2  | 150.2     | 1641258000  |
| ...| ...       | ...         |
+----+-----------+-------------+

+-----------------+---------------------+
| TRANSACTION_COUNT | WINDOW_END          |
+-----------------+---------------------+
| 50                | 1641256800          |
| 75                | 1641258000          |
| ...               | ...                 |
+-----------------+---------------------+

위 결과는 거래 금액이 100 이상인 데이터를 실시간으로 필터링한 결과와 시간별 거래를 집계한 예시 결과입니다.

320x100
320x100