1. 데이터 스트림 생성
CREATE STREAM user_events (id INT, event_name VARCHAR, timestamp BIGINT) WITH (KAFKA_TOPIC='user_events', VALUE_FORMAT='JSON');
이 예제는 user_events 스트림을 생성하고, 이벤트의 ID, 이름, 타임스탬프를 포함하는 JSON 데이터를 받아들이도록 정의합니다.
2. 데이터 필터링
CREATE STREAM high_value_events AS
SELECT *
FROM user_events
WHERE id > 100;
여기서는 user_events 스트림에서 ID가 100보다 큰 이벤트들을 high_value_events 스트림으로 필터링합니다.
3. 데이터 조인
CREATE STREAM enriched_events AS
SELECT ue.*, u.name
FROM user_events ue
LEFT JOIN user_data u ON ue.id = u.id;
이 쿼리는 user_events 스트림과 user_data 스트림을 조인하여 이벤트에 사용자 이름을 추가하여 enriched_events 스트림을 생성합니다.
4. 윈도우 함수를 이용한 집계
SELECT COUNT(*) AS event_count, TUMBLE_END(timestamp, INTERVAL 1 MINUTE) AS window_end
FROM user_events
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY TUMBLE(timestamp, INTERVAL 1 MINUTE);
이 쿼리는 1분 간격으로 윈도우링하여 이벤트를 집계하고, 각 윈도우의 이벤트 수를 반환합니다.
5. 조건부 데이터 처리
CREATE STREAM important_events AS
SELECT *
FROM user_events
WHERE event_name = 'important';
여기서는 이벤트 이름이 'important'인 이벤트들을 important_events 스트림으로 선택합니다.
6. 데이터 변환
CREATE STREAM modified_events AS
SELECT id, event_name, timestamp, 'modified' AS new_event
FROM user_events;
user_events 스트림의 데이터를 가져와 새로운 컬럼인 new_event를 추가하여 modified_events 스트림으로 변환합니다.
7. 특정 윈도우 내 데이터 집계
SELECT COUNT(*) AS event_count, HOP_END(timestamp, INTERVAL 5 MINUTE, INTERVAL 1 MINUTE) AS window_end
FROM user_events
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY HOP(timestamp, INTERVAL 5 MINUTE, INTERVAL 1 MINUTE);
user_events 스트림을 5분 간격으로 윈도우링하고, 1분씩 이동하며 이벤트를 집계합니다.
8. 데이터 스트림에 조건 추가
CREATE STREAM error_events AS
SELECT *
FROM user_events
WHERE event_name = 'error';
여기서는 이벤트 이름이 'error'인 이벤트들을 error_events 스트림으로 선택합니다.
9. 스트림 데이터 분할
CREATE STREAM split_events AS
SELECT *
FROM user_events
EMIT CHANGES
PARTITION BY id;
user_events 스트림을 ID별로 분할하여 split_events 스트림을 생성합니다.
10. 데이터 스트림 결합
CREATE STREAM combined_events AS
SELECT *
FROM user_events
UNION ALL
SELECT *
FROM important_events;
user_events 스트림과 important_events 스트림을 결합하여 combined_events 스트림을 생성합니다.
11. DISTINCT를 사용한 중복 제거
CREATE STREAM unique_events AS
SELECT DISTINCT id, event_name
FROM user_events;
여기서는 user_events 스트림에서 중복된 ID와 이벤트 이름을 제거하여 unique_events 스트림을 생성합니다.
12. 조인 후 필터링
CREATE STREAM filtered_enriched_events AS
SELECT *
FROM enriched_events
WHERE name IS NOT NULL;
enriched_events 스트림에서 조인한 후, 이름이 존재하는 데이터만 선택하여 filtered_enriched_events 스트림을 생성합니다.
13. MAP을 사용한 컬럼 추가
CREATE STREAM events_with_category AS
SELECT *, MAP('category', 'general') AS properties
FROM user_events;
user_events 스트림에 'properties'라는 맵 타입의 컬럼을 추가하여 events_with_category 스트림을 생성합니다.
14. 조건부 데이터 변환
CREATE STREAM converted_events AS
SELECT id, event_name,
CASE
WHEN event_name = 'error' THEN 'warning'
ELSE event_name
END AS converted_event
FROM user_events;
user_events 스트림에서 'error'인 이벤트를 'warning'으로 변환하여 converted_events 스트림을 생성합니다.
15. JOIN 후 윈도우 함수를 이용한 집계
SELECT COUNT(*) AS event_count, TUMBLE_END(timestamp, INTERVAL 5 MINUTE) AS window_end
FROM (
SELECT ue.*, u.name
FROM user_events ue
LEFT JOIN user_data u ON ue.id = u.id
)
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY TUMBLE(timestamp, INTERVAL 5 MINUTE);
조인한 결과에 대해 5분 간격으로 윈도우링하여 이벤트를 집계합니다.
16. 데이터 윈도우링 후 JOIN
SELECT COUNT(*) AS event_count, HOP_END(ue.timestamp, INTERVAL 10 MINUTE, INTERVAL 1 MINUTE) AS window_end
FROM user_events ue
LEFT JOIN user_data u WITHIN 10 MINUTES ON ue.id = u.id
GROUP BY HOP(ue.timestamp, INTERVAL 10 MINUTE, INTERVAL 1 MINUTE);
10분 간격으로 윈도우링한 데이터에 1분씩 이동하며 10분 내에 조인되는 이벤트를 집계합니다.
17. 조건부 데이터 통합
CREATE STREAM merged_events AS
SELECT *
FROM user_events
UNION
SELECT *
FROM important_events;
user_events 스트림과 important_events 스트림을 통합하여 중복을 제거한 merged_events 스트림을 생성합니다.
18. 데이터 변환 후 필터링
CREATE STREAM modified_filtered_events AS
SELECT *
FROM (
SELECT id, event_name, timestamp,
CASE
WHEN event_name = 'error' THEN 'warning'
ELSE event_name
END AS converted_event
FROM user_events
)
WHERE converted_event != 'warning';
user_events 스트림을 변환한 후 'warning'인 이벤트를 필터링하여 modified_filtered_events 스트림을 생성합니다.
19. JOIN 후 데이터 윈도우링
SELECT COUNT(*) AS event_count, TUMBLE_END(timestamp, INTERVAL 1 HOUR) AS window_end
FROM (
SELECT ue.*, u.name
FROM user_events ue
LEFT JOIN user_data u ON ue.id = u.id
)
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY TUMBLE(timestamp, INTERVAL 1 HOUR);
조인한 결과에 대해 1시간 간격으로 윈도우링하여 이벤트를 집계합니다.
20. 데이터 스트림 분할 후 조건부 필터링
CREATE STREAM split_filtered_events AS
SELECT *
FROM (
SELECT *
FROM user_events
EMIT CHANGES
PARTITION BY id
)
WHERE event_name = 'error';
ID별로 스트림을 분할한 후 'error'인 이벤트를 필터링하여 split_filtered_events 스트림을 생성합니다.
'빅데이터 관련 자료 > [SQL] Basic' 카테고리의 다른 글
4장 - KSQL 고급 기능과 최적화 (0) | 2024.01.11 |
---|---|
3장 - KSQL을 활용한 실전 튜토리얼: 스트리밍 데이터 처리 (0) | 2024.01.10 |
1장 - KSQL: 카프카 스트리밍을 위한 SQL (0) | 2024.01.06 |
PostgreSQL과 MySQL 차이 (0) | 2023.12.11 |
NoSQL 도대체 뭘까? (0) | 2023.08.28 |