from pyspark.sql import SparkSession
SparkSession 생성
= (
spark
SparkSession
.builder'ml')
.appName(
.getOrCreate() )
= spark.read.parquet('./src/sf-airbnb-clean.parquet') df
len(df.columns)
34
칼럼이 너무나 많아 정신이 없습니다. 테스트를 위한 모델 설계이니 bedrooms
와 bathrooms
만 가지고 price
를 예측하는 모델을 만들어 보겠습니다.
(df'bedrooms', 'bathrooms', 'price')
.select(5)
.show( )
[Stage 1:> (0 + 1) / 1]
+--------+---------+-----+
|bedrooms|bathrooms|price|
+--------+---------+-----+
| 1.0| 1.0|170.0|
| 2.0| 1.0|235.0|
| 1.0| 4.0| 65.0|
| 1.0| 4.0| 65.0|
| 2.0| 1.5|785.0|
+--------+---------+-----+
only showing top 5 rows
학습 및 테스트 데이터셋 생성
학습 및 테스트 데이터셋을 위한 분리는 DataFrame.randomSplit
을 사용합니다. 재현성 유지를 위해 seed
를 지정합니다.
= df.randomSplit([0.8,0.2], seed=42) train, test
변환기: 알맞은 데이터 형태 갖추기
scikit-learn
와는 달리, spark에서는 VecAssembler
를 통해 모델 구축에 사용할 피쳐를 한데 모아 nested list
의 모양으로 만듭니다.
from pyspark.ml.feature import VectorAssembler
= VectorAssembler(inputCols=['bedrooms', 'bathrooms'], outputCol='features') vecAssembler
= vecAssembler.transform(train) vecTrain
(vecTrain'bedrooms', 'bathrooms', 'features', 'price')
.select(5)
.show( )
22/11/13 20:40:01 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+--------+---------+---------+-----+
|bedrooms|bathrooms| features|price|
+--------+---------+---------+-----+
| 1.0| 1.0|[1.0,1.0]|200.0|
| 1.0| 1.0|[1.0,1.0]|130.0|
| 1.0| 1.0|[1.0,1.0]| 95.0|
| 1.0| 1.0|[1.0,1.0]|250.0|
| 3.0| 3.0|[3.0,3.0]|250.0|
+--------+---------+---------+-----+
only showing top 5 rows
[Stage 2:> (0 + 1) / 1]
위의 모습처럼 bedrooms와 bathrooms가 각각 1.0
, 1.0
일 때, features가 [1.0, 1.0]
으로 모인 것을 볼 수 있습니다.
추정기: Linear Regression
from pyspark.ml.regression import LinearRegression
= LinearRegression(featuresCol='features', labelCol='price') lr
%time
= lr.fit(vecTrain) lrModel
CPU times: user 3 µs, sys: 1 µs, total: 4 µs
Wall time: 6.91 µs
22/11/13 20:40:06 WARN Instrumentation: [2c60d946] regParam is zero, which might cause numerical instability and overfitting.
22/11/13 20:40:07 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/11/13 20:40:07 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/11/13 20:40:07 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
Pipeline으로 test에도 동일하게, 더 쉽게
vecAssembler
로 feature
칼럼을 확정해주고, lr(추정기)
에 적용하는 과정을 Pipeline
으로 연결해 줍니다.
from pyspark.ml import Pipeline
= Pipeline(stages=[vecAssembler, lr]) pipeline
= pipeline.fit(train) pipelineModel
= pipelineModel.transform(test) testDF
(testDF'bedrooms', 'bathrooms', 'features', 'price', 'prediction')
.select(10)
.show( )
+--------+---------+---------+------+------------------+
|bedrooms|bathrooms| features| price| prediction|
+--------+---------+---------+------+------------------+
| 1.0| 1.0|[1.0,1.0]| 85.0|167.92639278679684|
| 1.0| 1.0|[1.0,1.0]| 45.0|167.92639278679684|
| 1.0| 1.0|[1.0,1.0]| 70.0|167.92639278679684|
| 1.0| 1.0|[1.0,1.0]| 128.0|167.92639278679684|
| 1.0| 1.0|[1.0,1.0]| 159.0|167.92639278679684|
| 2.0| 1.0|[2.0,1.0]| 250.0|286.22970084273163|
| 1.0| 1.0|[1.0,1.0]| 99.0|167.92639278679684|
| 1.0| 1.0|[1.0,1.0]| 95.0|167.92639278679684|
| 1.0| 1.0|[1.0,1.0]| 100.0|167.92639278679684|
| 1.0| 1.0|[1.0,1.0]|2010.0|167.92639278679684|
+--------+---------+---------+------+------------------+
only showing top 10 rows
예측값 열이 생성된 것(prediction)을 볼 수 있습니다. 많은 feature를 사용하지 않아 실제값과의 차이는 크지만 추가적인 Feature를 넣어준다면 차이가 별로 나지 않을 것입니다.