Prepare data¶
In [1]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import Row, functions as fn
import pyspark.ml.feature as ft
import pandas as pd
import numpy as np
import time
import warnings
# Setting configuration.
warnings.filterwarnings('ignore')
SEED = 42
In [2]:
# SynapseML (needed for the LightGBM section below): use version 0.11.4-spark3.3 for Spark 3.3 and 1.0.2 for Spark 3.4; see the sketch after this cell
spark = SparkSession.builder.appName("MyApp") \
.config("spark.driver.memory", "5g") \
.config("spark.driver.cores", "2") \
.config("spark.executor.memory", "5g") \
.config("spark.executor.cores", "2") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')
24/04/26 21:44:36 WARN Utils: Your hostname, MacBook-Air resolves to a loopback address: 127.0.0.1; using 192.168.1.4 instead (on interface en0)
24/04/26 21:44:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/26 21:44:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
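The SynapseML versions mentioned in the comment above can be pulled in through the spark.jars.packages config when the session is built. A minimal sketch (the Maven coordinates are an assumption based on the standard SynapseML release artifacts; adjust the version to your Spark release):

# Sketch: include the SynapseML package when building the session so synapse.ml.lightgbm resolves later
spark = SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:1.0.2") \
    .config("spark.driver.memory", "5g") \
    .config("spark.executor.memory", "5g") \
    .enableHiveSupport() \
    .getOrCreate()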
In [3]:
# Load dataset
path = '/Users/***/Documents/Project/datasets/Home-Credit-Default-Risk/prepared_data.csv'
data = spark.read.format("csv").option("header", True).load(f"file://{path}")
In [4]:
# print dataset size
print("records read: " + str(data.count()))
records read: 307511
In [5]:
# Check if the data is unbalanced
data.groupBy("TARGET").count().show()
+------+------+
|TARGET| count|
+------+------+
|     0|282686|
|     1| 24825|
+------+------+
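The dataset is heavily skewed: 24,825 positives against 282,686 negatives, a negative-to-positive ratio of roughly 11.4. This ratio is what the scale_pos_weight=11 and isUnbalance=True settings used below are based on; a minimal sketch of computing it directly (variable names are illustrative):

# Negative/positive ratio, used later as scale_pos_weight for XGBoost
counts = {str(r["TARGET"]): r["count"] for r in data.groupBy("TARGET").count().collect()}
imbalance_ratio = counts["0"] / counts["1"]   # ~ 282686 / 24825 ~ 11.4
print(f"negative/positive ratio: {imbalance_ratio:.1f}")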
In [6]:
# The CSV was read without schema inference, so cast every column (including TARGET) to float
for colname in data.columns:
    data = data.withColumn(colname, data[colname].cast('float'))
In [7]:
# Add featurizer to convert features to vector
# Feature columns: everything except the first column (an ID) and the final TARGET column
feature_name = data.columns[1:-1]
assembler = VectorAssembler(
inputCols=feature_name,
outputCol="features"
)
data = assembler.transform(data)['features', 'TARGET']
In [8]:
# Split the data into train and test.
train, test = data.randomSplit([0.75, 0.25], seed=SEED)
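Because the same splits are fed to three different learners below, caching them keeps Spark from re-reading and re-assembling the CSV on every fit. A small optional sketch:

# Optional: cache the splits so each model reuses the assembled vectors instead of recomputing them
train = train.cache()
test = test.cache()
print(f"train: {train.count()} rows, test: {test.count()} rows")   # actions that materialize the cache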
Spark MLlib¶
In [9]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(
    labelCol="TARGET",
    featuresCol="features",
    maxDepth=8,
    numTrees=500,
    subsamplingRate=1.0,
    featureSubsetStrategy='auto',
    seed=SEED
)
# Train a random forest model.
model = rf.fit(train)
# Select (prediction, true label) and compute areaUnderROC
evaluator = BinaryClassificationEvaluator(
labelCol="TARGET",
metricName='areaUnderROC'
)
train_auc = evaluator.evaluate(model.transform(train))
test_auc = evaluator.evaluate(model.transform(test))
print(f"Train auc: {train_auc:.4f}")
print(f"Test auc: {test_auc:.4f}")
Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
CodeCache: size=131072Kb used=38814Kb max_used=39023Kb free=92257Kb bounds [0x000000010464c000, 0x0000000106c9c000, 0x000000010c64c000] total_blobs=13345 nmethods=12309 adapters=949 compilation: disabled (not enough contiguous free space left)
Train auc: 0.7526
Test auc: 0.7235
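Since the classes are so unbalanced, the area under the precision-recall curve is a useful complement to ROC AUC. A small sketch reusing the same evaluator class with only the metric name changed:

# Optional: also report PR-AUC, which is more sensitive to performance on the rare positive class
pr_evaluator = BinaryClassificationEvaluator(labelCol="TARGET", metricName='areaUnderPR')
print(f"Test PR-AUC: {pr_evaluator.evaluate(model.transform(test)):.4f}")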
In [10]:
feature_imp = pd.Series(
model.featureImportances.toArray(),
index=assembler.getInputCols()
).sort_values(ascending=False)
print(feature_imp.head(20))
EXT_SOURCE_2                   0.183568
EXT_SOURCE_3                   0.175979
EXT_SOURCE_1                   0.094980
DAYS_EMPLOYED                  0.050050
OCCUPATION_TYPE                0.032153
DAYS_BIRTH                     0.032032
NAME_EDUCATION_TYPE            0.025601
DAYS_LAST_PHONE_CHANGE         0.022394
AMT_GOODS_PRICE                0.019779
REGION_RATING_CLIENT_W_CITY    0.014936
CODE_GENDER_M                  0.014736
REGION_RATING_CLIENT           0.012078
ORGANIZATION_TYPE              0.011209
AMT_CREDIT                     0.010922
NAME_INCOME_TYPE_Working       0.010745
DAYS_ID_PUBLISH                0.010505
FLAG_DOCUMENT_3                0.009315
OWN_CAR_AGE                    0.009004
AMT_ANNUITY                    0.007916
TOTALAREA_MODE                 0.007510
dtype: float64
XGBoost with Spark¶
In [11]:
from xgboost.spark import SparkXGBClassifier
import xgboost as xgb
# Flag ~20% of the training rows as a validation split (only used if validation_indicator_col is set)
train = train.withColumn('isVal', fn.rand() < 0.2)
xgb_clf = SparkXGBClassifier(
features_col='features',
label_col='TARGET',
# validation_indicator_col='isVal',
eval_metric='auc',
scale_pos_weight=11,
learning_rate=0.015,
max_depth=8,
subsample=1.0,
colsample_bytree=0.35,
reg_alpha=65,
reg_lambda=15,
# early_stopping_rounds=20,
n_estimators=1200,
verbosity=0
)
xgb_model = xgb_clf.fit(train)
train_auc = evaluator.evaluate(xgb_model.transform(train))
test_auc = evaluator.evaluate(xgb_model.transform(test))
print(f"Train auc: {train_auc:.4f}")
print(f"Test auc: {test_auc:.4f}")
2024-04-26 21:48:31,541 INFO XGBoost-PySpark: _fit Running xgboost-2.0.3 on 1 workers with booster params: {'objective': 'binary:logistic', 'colsample_bytree': 0.35, 'device': 'cpu', 'learning_rate': 0.015, 'max_depth': 8, 'reg_alpha': 65, 'reg_lambda': 15, 'scale_pos_weight': 11, 'subsample': 1.0, 'verbosity': 0, 'eval_metric': 'auc', 'nthread': 1}
train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 1200} dmatrix_kwargs: {'nthread': 1, 'missing': nan}
[21:48:37] task 0 got new rank 0
2024-04-26 21:50:12,299 INFO XGBoost-PySpark: _fit Finished xgboost training!
2024-04-26 21:52:34,899 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
(repeated "Do the inference on the CPUs" log lines from scoring the train and test sets omitted)
Train auc: 0.8524
Test auc: 0.7626
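The isVal column created above is unused because validation_indicator_col and early_stopping_rounds are commented out. A sketch of the same classifier with both re-enabled, so training can stop once the validation AUC stalls (all other parameters unchanged):

# Sketch: same classifier, but with the validation split wired in for early stopping
xgb_clf_es = SparkXGBClassifier(
    features_col='features',
    label_col='TARGET',
    validation_indicator_col='isVal',   # rows where isVal is True are held out for evaluation
    early_stopping_rounds=20,           # stop if validation AUC does not improve for 20 rounds
    eval_metric='auc',
    scale_pos_weight=11,
    learning_rate=0.015,
    max_depth=8,
    subsample=1.0,
    colsample_bytree=0.35,
    reg_alpha=65,
    reg_lambda=15,
    n_estimators=1200,
    verbosity=0
)
xgb_model_es = xgb_clf_es.fit(train)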
In [12]:
feature_imp = xgb_model.get_feature_importances()
indices = [int(name[1:]) for name in feature_imp.keys()]
feature_imp = pd.Series(
feature_imp.values(),
index=np.array(feature_name)[indices]
).sort_values(ascending=False)
print(feature_imp.head(20))
DAYS_BIRTH                    5801.0
AMT_ANNUITY                   5501.0
DAYS_REGISTRATION             5303.0
EXT_SOURCE_2                  5275.0
DAYS_ID_PUBLISH               5231.0
EXT_SOURCE_3                  4985.0
AMT_CREDIT                    4893.0
DAYS_LAST_PHONE_CHANGE        4798.0
DAYS_EMPLOYED                 4634.0
EXT_SOURCE_1                  4610.0
anomaly_score                 4529.0
AMT_GOODS_PRICE               4387.0
AMT_INCOME_TOTAL              3886.0
REGION_POPULATION_RELATIVE    3813.0
ORGANIZATION_TYPE             3130.0
OWN_CAR_AGE                   3017.0
HOUR_APPR_PROCESS_START       2845.0
OCCUPATION_TYPE               2504.0
AMT_REQ_CREDIT_BUREAU_YEAR    1975.0
TOTALAREA_MODE                1655.0
dtype: float64
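get_feature_importances returns the default 'weight' importance (the number of splits per feature), which is why the values are large counts. A hedged alternative, assuming the installed xgboost.spark version exposes the importance_type argument (it mirrors Booster.get_score):

# Sketch: rank features by total gain instead of split count
gain_imp = xgb_model.get_feature_importances(importance_type='gain')
indices = [int(name[1:]) for name in gain_imp.keys()]
gain_imp = pd.Series(
    gain_imp.values(),
    index=np.array(feature_name)[indices]
).sort_values(ascending=False)
print(gain_imp.head(20))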
LightGBM with Spark¶
In [ ]:
from synapse.ml.lightgbm import LightGBMClassifier
import lightgbm as lgb
lgb_clf = LightGBMClassifier(
featuresCol="features",
labelCol="TARGET",
boostingType='gbdt',
objective='binary',
metric='auc',
isUnbalance=True,
learningRate=0.015,
numIterations=1200,
maxDepth=8,
featureFraction=0.35,
baggingFraction=1.0,
lambdaL1=65,
lambdaL2=15,
# subsampleFreq=5,
earlyStoppingRound=20,
dataRandomSeed=SEED,
verbosity=-1
)
lgb_model = lgb_clf.fit(train)
train_auc = evaluator.evaluate(lgb_model.transform(train))
test_auc = evaluator.evaluate(lgb_model.transform(test))
print(f"Train auc: {train_auc:.4f}")
print(f"Test auc: {test_auc:.4f}")
LightGBM exposes many more parameters than SynapseML does. To pass additional ones, use the passThroughArgs string parameter. You can mix passThroughArgs with explicit arguments; SynapseML merges them into a single parameter string that is sent to LightGBM. If a parameter is set in both places, passThroughArgs takes precedence.
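For example, a minimal sketch of mixing explicit arguments with passThroughArgs (the extra LightGBM parameters here are illustrative, not tuned values):

# Sketch: forward native LightGBM parameters that SynapseML does not expose as explicit arguments
lgb_clf_extra = LightGBMClassifier(
    featuresCol="features",
    labelCol="TARGET",
    objective='binary',
    isUnbalance=True,
    learningRate=0.015,
    numIterations=1200,
    passThroughArgs="min_sum_hessian_in_leaf=0.002 force_row_wise=true"
)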
In [ ]:
feature_imp = pd.Series(
lgb_model.getFeatureImportances(),
index=assembler.getInputCols()
).sort_values(ascending=False)
print(feature_imp.head(20))
In [13]:
spark.stop()