PySpark로 데이터의 결측치 처리와 전처리 과정
Data Analyst

빅데이터 관련 자료/Machine Learning

PySpark로 데이터의 결측치 처리와 전처리 과정

carpe08 2023. 11. 27. 17:53
320x100
320x100

데이터 분석에서 결측치 처리와 전처리는 매우 중요한 단계입니다.

이번 글에서는 PySpark를 사용하여 데이터의 결측치를 처리하고 전처리하는 과정을 알아보겠습니다.

1. 데이터셋 불러오기

우선, PySpark를 활용하여 데이터셋을 불러오겠습니다. 예를 들어, CSV 파일을 읽어들여 PySpark DataFrame으로 변환하는 과정을 보여줄 것입니다.

from pyspark.sql import SparkSession

# SparkSession 생성
spark = SparkSession.builder.appName('data_preprocessing').getOrCreate()

# CSV 파일을 읽어 DataFrame으로 변환
df = spark.read.csv('파일경로/data.csv', header=True, inferSchema=True)

2. 결측치 처리

다음으로는 데이터셋에 있는 결측치를 처리할 차례입니다. PySpark에서는 na 모듈을 사용하여 결측치를 다룰 수 있습니다. 예를 들어, 결측치를 삭제하거나 다른 값으로 대체하는 방법을 살펴볼 것입니다.

# 결측치 제거
df_no_missing = df.dropna()

# 특정 컬럼의 결측치 대체
df_filled = df.fillna({'column_name': 'replacement_value'})

3. 전처리 과정

마지막으로, 데이터를 분석하기 전에 전처리 단계를 진행합니다. 여기서는 데이터 형식 변환, 이상치 처리, 데이터 스케일링 등의 전처리 작업을 다뤄볼 것입니다.

# 데이터 형식 변환
df = df.withColumn('numeric_column', df['numeric_column'].cast('float'))

# 이상치 처리 (예시)
q1 = df.approxQuantile('numeric_column', [0.25], 0.01)[0]
q3 = df.approxQuantile('numeric_column', [0.75], 0.01)[0]
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
df_no_outliers = df.filter((df['numeric_column'] >= lower_bound) & (df['numeric_column'] <= upper_bound))

# 데이터 스케일링 (예시)
from pyspark.ml.feature import MinMaxScaler
scaler = MinMaxScaler(inputCol='numeric_column', outputCol='scaled_column')
scaler_model = scaler.fit(df)
df_scaled = scaler_model.transform(df)

4. 범주형 데이터 다루기

일반적으로 데이터셋에는 범주형 데이터가 포함되어 있습니다. PySpark에서는 이를 처리하기 위해 원핫인코딩(One-Hot Encoding)과 같은 방법을 사용할 수 있습니다.

from pyspark.ml.feature import StringIndexer, OneHotEncoder

# 범주형 데이터를 숫자형으로 변환
indexer = StringIndexer(inputCol='categorical_column', outputCol='indexed')
indexed_df = indexer.fit(df).transform(df)

# One-Hot Encoding 적용
encoder = OneHotEncoder(inputCol='indexed', outputCol='encoded')
encoded_df = encoder.transform(indexed_df)

5. 데이터 조작 (Join 등)

때로는 여러 데이터셋을 조합하거나 결합해야 할 때가 있습니다. PySpark를 사용하면 다양한 데이터 조작을 수행할 수 있습니다.

 
# 두 개의 데이터프레임을 조인
joined_df = df1.join(df2, df1['key'] == df2['key'], 'inner')

# 다른 데이터프레임과 결합
union_df = df1.union(df2)

6. Feature Scaling (특성 스케일링)

일부 머신 러닝 알고리즘은 데이터의 스케일에 민감할 수 있습니다. 이때 특성 스케일링을 통해 데이터의 범위를 일정하게 조정하는 것이 중요합니다.

 
from pyspark.ml.feature import StandardScaler

# StandardScaler를 사용한 특성 스케일링
scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withStd=True, withMean=True)
scaler_model = scaler.fit(df)
scaled_df = scaler_model.transform(df)

7. Feature Engineering (특성 공학)

데이터에서 유용한 정보를 추출하거나 새로운 특성을 생성하는 것이 중요합니다. PySpark를 사용하여 기존 특성을 활용해 새로운 특성을 생성하거나 변환할 수 있습니다.

from pyspark.sql.functions import col

# 새로운 특성 생성
df = df.withColumn('new_feature', col('feature1') * col('feature2'))

# 날짜 데이터를 기반으로 한 새로운 특성 생성
from pyspark.sql.functions import year, month, dayofweek
df = df.withColumn('year', year('date_column'))
df = df.withColumn('month', month('date_column'))
df = df.withColumn('dayofweek', dayofweek('date_column'))

이러한 추가적인 내용들을 다룬다면, 데이터 분석과 머신 러닝 모델링에 있어서 데이터의 품질을 향상시키고 모델의 성능을 높일 수 있는 다양한 전략을 소개할 수 있을 것입니다.

이렇게 PySpark를 사용하여 데이터의 결측치를 처리하고 전처리하는 과정을 살펴봤습니다.

이러한 전처리 작업은 데이터 분석 및 머신 러닝 모델링의 핵심 단계 중 하나로, 데이터 품질을 향상시키고 정확도를 높이는 데 중요한 역할을 합니다.

320x100
320x100