1119
[13주차 - Day5] Spark 실습 과제
ML Pipeline 기반 머신러닝 모델 만들기
# 실습
필요한 패키지 다운로드
!pip install pyspark==3.0.1 py4j==0.10.9
Spark 생성
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Titanic Classification via ML Pipeline and Model Selection") \
.getOrCreate()
데이터셋 다운로드
!wget https://s3-geospatial.s3-us-west-2.amazonaws.com/titanic.csv
data = spark.read.csv('./titanic.csv', header=True, inferSchema=True)
data.printSchema() # 데이터셋 스키마 확인
data.show() # 데이터셋 형태 확인
변형해야 할 데이터 존재 확인
data.select(['*']).describe().show()
데이터 클린업
# PassengerID, Name, Ticket, Embarked는 사용하지 않을 예정 (아무 의미 없음)
# Cabin도 비어 있는 값이 너무 많아서 사용하지 않을 예정
# Age는 중요한 정보인데 비어 있는 레코드가 많아서 디폴트값을 채워 줄 예정
# Gender의 경우 카에고리 정보이기에 숫자로 인코딩 필요
final_data = data.select(['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare'])
# Age는 평균값으로 채운다
from pyspark.ml.feature import Imputer
imputer = Imputer(strategy='mean', inputCols=['Age'], outputCols=['AgeImputed'])
imputer_model = imputer.fit(final_data)
final_data = imputer_model.transform(final_data)
# 성별 정보 인코딩: male -> 0, female -> 1
from pyspark.ml.feature import StringIndexer
gender_indexer = StringIndexer(inputCols=['Gender'], outputCols=['GenderIndexed'])
gender_indexer_model = gender_indexer.fit(final_data)
final_data = gender_indexer_model.transform(final_data)
피쳐 벡터 만들기
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Survived', 'Pclass', 'SibSp', 'Parch', 'Fare', 'AgeImputed', 'GenderIndexed'], outputCol="features")
data_vec = assembler.transform(final_data)
data_vec.show() # 제일 마지막에 추가된 feature 행 확인
Age와 Fare 값 스케일하는 것이 주요 목표
from pyspark.ml.feature import MinMaxScaler
age_scaler = MinMaxScaler(inputCol="features", outputCol="features_scaled")
age_scaler_model = age_scaler.fit(data_vec)
data_vec = age_scaler_model.transform(data_vec)
data_vec.select("features", "features_scaled").show()
훈련용과 테스트용 데이터를 나누고 Binary classification 모델 만들기
train, test = data_2.randomSplit([0.7, 0.3])
from pyspark.ml.classification import LogisticRegression
algo = LogisticRegression(featuresCol="features", labelCol="Survived")
model = algo.fit(train)
모델 성능 측정
predictions = model.transform(test)
predictions.groupby(['Survived']).count().collect()
predictions.groupby(['prediction']).count().collect()
predictions.groupby(['Survived', 'prediction', 'probability']).count().collect()
predictions.select(['Survived', 'prediction', 'probability']).show()
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='Survived', metricName='areaUnderROC')
evaluator.evaluate(predictions)
그래프로 그려서 확인하기
import matplotlib.pyplot as plt
plt.figure(figsize=(5,5))
plt.plot([0, 1], [0, 1], 'r--')
plt.plot(model.summary.roc.select('FPR').collect(),
model.summary.roc.select('TPR').collect())
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.show()
ML Pipeline 만들기
from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler, MinMaxScaler
# Gender
stringIndexer = StringIndexer(inputCol = "Gender", outputCol = 'GenderIndexed')
# Age
imputer = Imputer(strategy='mean', inputCols=['Age'], outputCols=['AgeImputed'])
# Vectorize
inputCols = ['Pclass', 'SibSp', 'Parch', 'Fare', 'AgeImputed', 'GenderIndexed']
assembler = VectorAssembler(inputCols=inputCols, outputCol="features")
# MinMaxScaler
minmax_scaler = MinMaxScaler(inputCol="features", outputCol="features_scaled")
stages = [stringIndexer, imputer, assembler, minmax_scaler]
from pyspark.ml.classification import LogisticRegression
algo = LogisticRegression(featuresCol="features_scaled", labelCol="Survived")
lr_stages = stages + [algo]
lr_stages # 담긴 값 확인
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = lr_stages)
df = data.select(['Survived', 'Pclass', 'Gender', 'Age', 'SibSp', 'Parch', 'Fare'])
df.show()
훈련/테스트 나누기
train, test = df.randomSplit([0.7, 0.3])
lr_model = pipeline.fit(train)
lr_cv_predictions = lr_model.transform(test)
evaluator.evaluate(lr_cv_predictions)
ML Tuning
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='Survived', metricName='areaUnderROC')
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
.addGrid(algo.maxIter, [1, 5, 10])
.build())
cv = CrossValidator(
estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
numFolds=5
)
cvModel = cv.fit(train) # 모델 학습
lr_cv_predictions = cvModel.transform(test)
evaluator.evaluate(lr_cv_predictions)
lr_cv_predictions.select("prediction", "survived").show()
import pandas as pd
params = [{p.name: v for p, v in m.items()} for m in cvModel.getEstimatorParamMaps()]
pd.DataFrame.from_dict([
{cvModel.getEvaluator().getMetricName(): metric, **ps}
for ps, metric in zip(params, cvModel.avgMetrics)
])
'프로그래머스 데브 코스 > TIL' 카테고리의 다른 글
[6기] 프로그래머스 인공지능 데브코스 82일차 TIL (0) | 2023.11.21 |
---|---|
[6기] 프로그래머스 인공지능 데브코스 81일차 TIL (0) | 2023.11.20 |
[6기] 프로그래머스 인공지능 데브코스 79일차 TIL (0) | 2023.11.18 |
[6기] 프로그래머스 인공지능 데브코스 78일차 TIL (0) | 2023.11.17 |
[6기] 프로그래머스 인공지능 데브코스 77일차 TIL (0) | 2023.11.16 |