from pyspark.sql import SparkSessionSpark로 머신러닝 모델 샘플 만들기
SparkSession 생성
spark = (
SparkSession
.builder
.appName('ml')
.getOrCreate()
)df = spark.read.parquet('./src/sf-airbnb-clean.parquet')
len(df.columns)34
칼럼이 너무나 많아 정신이 없습니다. 테스트를 위한 모델 설계이니 bedrooms와 bathrooms만 가지고 price를 예측하는 모델을 만들어 보겠습니다.
(df
.select('bedrooms', 'bathrooms', 'price')
.show(5)
)[Stage 1:> (0 + 1) / 1]
+--------+---------+-----+
|bedrooms|bathrooms|price|
+--------+---------+-----+
| 1.0| 1.0|170.0|
| 2.0| 1.0|235.0|
| 1.0| 4.0| 65.0|
| 1.0| 4.0| 65.0|
| 2.0| 1.5|785.0|
+--------+---------+-----+
only showing top 5 rows
학습 및 테스트 데이터셋 생성
학습 및 테스트 데이터셋을 위한 분리는 DataFrame.randomSplit을 사용합니다. 재현성 유지를 위해 seed를 지정합니다.
train, test = df.randomSplit([0.8,0.2], seed=42)변환기: 알맞은 데이터 형태 갖추기
scikit-learn와는 달리, spark에서는 VecAssembler를 통해 모델 구축에 사용할 피쳐를 한데 모아 nested list의 모양으로 만듭니다.
from pyspark.ml.feature import VectorAssemblervecAssembler = VectorAssembler(inputCols=['bedrooms', 'bathrooms'], outputCol='features')vecTrain = vecAssembler.transform(train)(vecTrain
.select('bedrooms', 'bathrooms', 'features', 'price')
.show(5)
)22/11/13 20:40:01 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 2:> (0 + 1) / 1]
+--------+---------+---------+-----+
|bedrooms|bathrooms| features|price|
+--------+---------+---------+-----+
| 1.0| 1.0|[1.0,1.0]|200.0|
| 1.0| 1.0|[1.0,1.0]|130.0|
| 1.0| 1.0|[1.0,1.0]| 95.0|
| 1.0| 1.0|[1.0,1.0]|250.0|
| 3.0| 3.0|[3.0,3.0]|250.0|
+--------+---------+---------+-----+
only showing top 5 rows
위의 모습처럼 bedrooms와 bathrooms가 각각 1.0, 1.0일 때, features가 [1.0, 1.0]으로 모인 것을 볼 수 있습니다.
추정기: Linear Regression
from pyspark.ml.regression import LinearRegressionlr = LinearRegression(featuresCol='features', labelCol='price')%time
lrModel = lr.fit(vecTrain)CPU times: user 3 µs, sys: 1 µs, total: 4 µs
Wall time: 6.91 µs
22/11/13 20:40:06 WARN Instrumentation: [2c60d946] regParam is zero, which might cause numerical instability and overfitting.
22/11/13 20:40:07 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/11/13 20:40:07 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/11/13 20:40:07 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
Pipeline으로 test에도 동일하게, 더 쉽게
vecAssembler로 feature 칼럼을 확정해주고, lr(추정기)에 적용하는 과정을 Pipeline으로 연결해 줍니다.
from pyspark.ml import Pipelinepipeline = Pipeline(stages=[vecAssembler, lr])pipelineModel = pipeline.fit(train)testDF = pipelineModel.transform(test)(testDF
.select('bedrooms', 'bathrooms', 'features', 'price', 'prediction')
.show(10)
)+--------+---------+---------+------+------------------+
|bedrooms|bathrooms| features| price| prediction|
+--------+---------+---------+------+------------------+
| 1.0| 1.0|[1.0,1.0]| 85.0|167.92639278679684|
| 1.0| 1.0|[1.0,1.0]| 45.0|167.92639278679684|
| 1.0| 1.0|[1.0,1.0]| 70.0|167.92639278679684|
| 1.0| 1.0|[1.0,1.0]| 128.0|167.92639278679684|
| 1.0| 1.0|[1.0,1.0]| 159.0|167.92639278679684|
| 2.0| 1.0|[2.0,1.0]| 250.0|286.22970084273163|
| 1.0| 1.0|[1.0,1.0]| 99.0|167.92639278679684|
| 1.0| 1.0|[1.0,1.0]| 95.0|167.92639278679684|
| 1.0| 1.0|[1.0,1.0]| 100.0|167.92639278679684|
| 1.0| 1.0|[1.0,1.0]|2010.0|167.92639278679684|
+--------+---------+---------+------+------------------+
only showing top 10 rows
예측값 열이 생성된 것(prediction)을 볼 수 있습니다. 많은 feature를 사용하지 않아 실제값과의 차이는 크지만 추가적인 Feature를 넣어준다면 차이가 별로 나지 않을 것입니다.