2장 - KSQL: 카프카 스트리밍을 위한 SQL Basic
Data Analyst

빅데이터 관련 자료/[SQL] Basic

2장 - KSQL: 카프카 스트리밍을 위한 SQL Basic

carpe08 2024. 1. 8. 12:39
320x100
320x100

1. 데이터 스트림 생성

 
CREATE STREAM user_events (id INT, event_name VARCHAR, timestamp BIGINT) WITH (KAFKA_TOPIC='user_events', VALUE_FORMAT='JSON');

이 예제는 user_events 스트림을 생성하고, 이벤트의 ID, 이름, 타임스탬프를 포함하는 JSON 데이터를 받아들이도록 정의합니다.

2. 데이터 필터링

CREATE STREAM high_value_events AS
SELECT *
FROM user_events
WHERE id > 100;
 

여기서는 user_events 스트림에서 ID가 100보다 큰 이벤트들을 high_value_events 스트림으로 필터링합니다.

3. 데이터 조인

CREATE STREAM enriched_events AS
SELECT ue.*, u.name
FROM user_events ue
LEFT JOIN user_data u ON ue.id = u.id;
 

이 쿼리는 user_events 스트림과 user_data 스트림을 조인하여 이벤트에 사용자 이름을 추가하여 enriched_events 스트림을 생성합니다.

4. 윈도우 함수를 이용한 집계

SELECT COUNT(*) AS event_count, TUMBLE_END(timestamp, INTERVAL 1 MINUTE) AS window_end
FROM user_events
WINDOW TUMBLING (SIZE 1 MINUTE)
GROUP BY TUMBLE(timestamp, INTERVAL 1 MINUTE);
 

이 쿼리는 1분 간격으로 윈도우링하여 이벤트를 집계하고, 각 윈도우의 이벤트 수를 반환합니다.

5. 조건부 데이터 처리

CREATE STREAM important_events AS
SELECT *
FROM user_events
WHERE event_name = 'important';
 

여기서는 이벤트 이름이 'important'인 이벤트들을 important_events 스트림으로 선택합니다.

6. 데이터 변환

 
CREATE STREAM modified_events AS
SELECT id, event_name, timestamp, 'modified' AS new_event
FROM user_events;

user_events 스트림의 데이터를 가져와 새로운 컬럼인 new_event를 추가하여 modified_events 스트림으로 변환합니다.

7. 특정 윈도우 내 데이터 집계

 
SELECT COUNT(*) AS event_count, HOP_END(timestamp, INTERVAL 5 MINUTE, INTERVAL 1 MINUTE) AS window_end
FROM user_events
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY HOP(timestamp, INTERVAL 5 MINUTE, INTERVAL 1 MINUTE);

user_events 스트림을 5분 간격으로 윈도우링하고, 1분씩 이동하며 이벤트를 집계합니다.

8. 데이터 스트림에 조건 추가

 
CREATE STREAM error_events AS
SELECT *
FROM user_events
WHERE event_name = 'error';

여기서는 이벤트 이름이 'error'인 이벤트들을 error_events 스트림으로 선택합니다.

9. 스트림 데이터 분할

 
CREATE STREAM split_events AS
SELECT *
FROM user_events
EMIT CHANGES
PARTITION BY id;

user_events 스트림을 ID별로 분할하여 split_events 스트림을 생성합니다.

10. 데이터 스트림 결합

 
CREATE STREAM combined_events AS
SELECT *
FROM user_events
UNION ALL
SELECT *
FROM important_events;

user_events 스트림과 important_events 스트림을 결합하여 combined_events 스트림을 생성합니다.

11. DISTINCT를 사용한 중복 제거

 
CREATE STREAM unique_events AS
SELECT DISTINCT id, event_name
FROM user_events;

여기서는 user_events 스트림에서 중복된 ID와 이벤트 이름을 제거하여 unique_events 스트림을 생성합니다.

12. 조인 후 필터링

 
CREATE STREAM filtered_enriched_events AS
SELECT *
FROM enriched_events
WHERE name IS NOT NULL;

enriched_events 스트림에서 조인한 후, 이름이 존재하는 데이터만 선택하여 filtered_enriched_events 스트림을 생성합니다.

13. MAP을 사용한 컬럼 추가

 
CREATE STREAM events_with_category AS
SELECT *, MAP('category', 'general') AS properties
FROM user_events;

user_events 스트림에 'properties'라는 맵 타입의 컬럼을 추가하여 events_with_category 스트림을 생성합니다.

14. 조건부 데이터 변환

 
CREATE STREAM converted_events AS
SELECT id, event_name,
    CASE
        WHEN event_name = 'error' THEN 'warning'
        ELSE event_name
    END AS converted_event
FROM user_events;

user_events 스트림에서 'error'인 이벤트를 'warning'으로 변환하여 converted_events 스트림을 생성합니다.

15. JOIN 후 윈도우 함수를 이용한 집계

 
SELECT COUNT(*) AS event_count, TUMBLE_END(timestamp, INTERVAL 5 MINUTE) AS window_end
FROM (
    SELECT ue.*, u.name
    FROM user_events ue
    LEFT JOIN user_data u ON ue.id = u.id
)
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY TUMBLE(timestamp, INTERVAL 5 MINUTE);

조인한 결과에 대해 5분 간격으로 윈도우링하여 이벤트를 집계합니다.

16. 데이터 윈도우링 후 JOIN

 
SELECT COUNT(*) AS event_count, HOP_END(ue.timestamp, INTERVAL 10 MINUTE, INTERVAL 1 MINUTE) AS window_end
FROM user_events ue
LEFT JOIN user_data u WITHIN 10 MINUTES ON ue.id = u.id
GROUP BY HOP(ue.timestamp, INTERVAL 10 MINUTE, INTERVAL 1 MINUTE);

10분 간격으로 윈도우링한 데이터에 1분씩 이동하며 10분 내에 조인되는 이벤트를 집계합니다.

17. 조건부 데이터 통합

 
CREATE STREAM merged_events AS
SELECT *
FROM user_events
UNION
SELECT *
FROM important_events;

user_events 스트림과 important_events 스트림을 통합하여 중복을 제거한 merged_events 스트림을 생성합니다.

18. 데이터 변환 후 필터링

 
CREATE STREAM modified_filtered_events AS
SELECT *
FROM (
    SELECT id, event_name, timestamp,
        CASE
            WHEN event_name = 'error' THEN 'warning'
            ELSE event_name
        END AS converted_event
    FROM user_events
)
WHERE converted_event != 'warning';

user_events 스트림을 변환한 후 'warning'인 이벤트를 필터링하여 modified_filtered_events 스트림을 생성합니다.

19. JOIN 후 데이터 윈도우링

 
SELECT COUNT(*) AS event_count, TUMBLE_END(timestamp, INTERVAL 1 HOUR) AS window_end
FROM (
    SELECT ue.*, u.name
    FROM user_events ue
    LEFT JOIN user_data u ON ue.id = u.id
)
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY TUMBLE(timestamp, INTERVAL 1 HOUR);

조인한 결과에 대해 1시간 간격으로 윈도우링하여 이벤트를 집계합니다.

20. 데이터 스트림 분할 후 조건부 필터링

 
CREATE STREAM split_filtered_events AS
SELECT *
FROM (
    SELECT *
    FROM user_events
    EMIT CHANGES
    PARTITION BY id
)
WHERE event_name = 'error';

ID별로 스트림을 분할한 후 'error'인 이벤트를 필터링하여 split_filtered_events 스트림을 생성합니다.

320x100
320x100