Step 1: Import required libraries and initialize SparkSession
from pyspark.conf import SparkConf from pyspark.sql import SparkSession from pyspark.ml import Pipeline from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder import pyspark.sql.functions as fn from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator from pyspark.ml.linalg import Vectors from pyspark.sql import Row from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit from xgboost.spark import SparkXGBClassifier import xgboost as xgb
import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns
# Use every column except SK_ID_CURR and label as a model input, and pack
# those columns into the single vector column named "features".
feature_names = data.drop("SK_ID_CURR", "label").columns
assembler = VectorAssembler(inputCols=feature_names, outputCol="features")

# Split the data into train (weight 0.75) and test (weight 0.25); the seed
# makes the random split repeatable.
train, test = data.randomSplit([0.75, 0.25], seed=SEED)
Step 4: Hyperparameter Tuning & Building a model (optional)
# Assemble all the steps into a pipeline so that feature assembly and the
# classifier are fitted together.
pipeline = Pipeline(stages=[assembler, classifier])

# We use a ParamGridBuilder to construct a grid of parameters to search over.
# CrossValidator will try all combinations of values
# (1 * 3 * 1 * 2 * 3 * 1 * 3 = 54 parameter maps) and determine the best
# model using the evaluator.
paramGrid = (
    ParamGridBuilder()
    .addGrid(classifier.learning_rate, [0.01])
    .addGrid(classifier.max_depth, [4, 6, 8])
    .addGrid(classifier.subsample, [1.0])
    .addGrid(classifier.colsample_bytree, [0.33, 0.66])
    .addGrid(classifier.reg_alpha, [0.5, 5.0, 50])
    .addGrid(classifier.reg_lambda, [15])
    .addGrid(classifier.n_estimators, [500, 1000, 1500])
    .build()
)

# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and
# an Evaluator. Here: 3 folds, with up to 4 models fitted in parallel.
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=4,
)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)
# Load the model # from pyspark.ml.classification import SparkXGBClassifierModel # loaded_model = SparkXGBClassifierModel.load("xgbModel")
Step 10: Predict
# Make predictions on test data.
predictions = model.transform(test)

# Reduce each prediction row with extractProbability and keep three columns.
# NOTE(review): this select needs extractProbability to emit SK_ID_CURR and
# prediction fields, but the extractProbability defined later in this file
# returns only label and probability -- confirm which version is intended.
selected = (
    predictions.rdd
    .map(extractProbability)
    .toDF()
    .select("SK_ID_CURR", "probability", "prediction")
)

# Select example rows to display.
selected.show(5)
2024-05-17 21:41:02,957 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs
INFO:XGBoost-PySpark:Do the inference on the CPUs (0 + 1) / 1]
+----------+-------------------+----------+
|SK_ID_CURR| probability|prediction|
+----------+-------------------+----------+
| 100004|0.18962906301021576| 0.0|
| 100009|0.08838339149951935| 0.0|
| 100011| 0.4154616594314575| 0.0|
| 100012| 0.1840885430574417| 0.0|
| 100017|0.23958267271518707| 0.0|
+----------+-------------------+----------+
only showing top 5 rows
# save predictions # selected.write.mode('overwrite').saveAsTable('predictions')
// Instantiate metrics object
val metrics = new BinaryClassificationMetrics(predictionAndLabels)

// Precision-Recall Curve
val PRC = metrics.pr

// AUPRC
val auPRC = metrics.areaUnderPR
println(s"Area under precision-recall curve = $auPRC")

// ROC Curve
val roc = metrics.roc

// AUROC
val auROC = metrics.areaUnderROC
println(s"Area under ROC = $auROC")
在 PySpark 中定义一个子类，借用 Scala 接口：
from pyspark.mllib.evaluation import BinaryClassificationMetrics
class CurveMetrics(BinaryClassificationMetrics):
    """BinaryClassificationMetrics that can also return curve points.

    The curve is obtained by calling a method on the wrapped JVM metrics
    object (``self._java_model``) by name and collecting the result.
    """

    def __init__(self, *args):
        super().__init__(*args)

    def _to_list(self, rdd):
        """Collect a Java RDD of ``scala.Tuple2`` into a list of float pairs.

        Args:
            rdd: Java RDD whose elements expose ``_1()`` and ``_2()``.

        Returns:
            A list of ``(float, float)`` tuples, one per RDD element.
        """
        # Note this collect could be inefficient for large datasets
        # considering there may be one probability per datapoint (at most).
        # The Scala version takes a numBins parameter,
        # but it doesn't seem possible to pass this from Python to Java.
        #
        # Results are returned as type scala.Tuple2, which doesn't appear
        # to have a py4j mapping, hence the explicit _1()/_2() accessors.
        return [(float(row._1()), float(row._2())) for row in rdd.collect()]

    def get_curve(self, method):
        """Return the points of the curve produced by the JVM method *method*.

        Args:
            method: Name of a no-argument method on the JVM metrics object
                that returns an RDD of pairs (presumably ``"roc"`` or
                ``"pr"`` -- confirm against the Scala API).

        Returns:
            A ``zip`` over the transposed points: the first item is the
            tuple of x values, the second the tuple of y values. For a ROC
            curve the original comment describes these as ``(fpr, tpr)``.
        """
        rdd = getattr(self._java_model, method)().toJavaRDD()
        points = self._to_list(rdd)
        return zip(*points)
定义子类后具体使用如下:
from pyspark.sql import Row


def extractProbability(row, labelCol='label', probabilityCol='probability'):
    """Reduce a prediction row to its label and one scalar probability.

    Args:
        row: Row-like object indexable by column name.
        labelCol: Name of the column holding the label.
        probabilityCol: Name of the column holding the probability vector.

    Returns:
        A ``Row`` with fields ``label`` (float) and ``probability`` (float),
        where ``probability`` is element 1 of the probability vector
        (presumably the positive-class probability of a binary classifier).
    """
    # Honour probabilityCol; the column name used to be hard-coded, so the
    # argument was silently ignored.
    return Row(
        label=float(row[labelCol]),
        probability=float(row[probabilityCol][1]),
    )
from pyspark.sql import Window, functions as fn from pyspark.sql import feature as ft
def roc_curve_on_spark(predictions, labelCol='label', probabilityCol='probability'):
    """
    Returns the receiver operating characteristic (ROC) curve, which is a
    DataFrame having two fields (fpr, tpr) with (0.0, 0.0) prepended to it.
    The row for the lowest score is (1.0, 1.0) by construction, because the
    cumulative counts there equal the totals.

    Args:
        predictions: DataFrame holding at least the label and score columns.
        labelCol: Name of the label column; assumed to hold numeric 0/1
            values, since it is summed to count positives.
        probabilityCol: Name of the score column; it is used as a sort key,
            so it is assumed to be a scalar column rather than a vector.

    Returns:
        DataFrame with columns ``fpr`` and ``tpr``, one row per distinct
        score plus the leading (0.0, 0.0) row, ordered along the curve.

    Raises:
        IndexError: if ``predictions`` has no rows.

    Relies on the module-level names ``spark``, ``Window`` and ``fn``.
    """
    roc = predictions.select(labelCol, probabilityCol)

    # An un-partitioned window sorted by decreasing score: each row then
    # sees every row scoring at least as high as itself.
    window = Window.orderBy(fn.desc(probabilityCol))

    # accumulate the true positives with decreasing threshold
    roc = roc.withColumn('tps', fn.sum(roc[labelCol]).over(window))
    # accumulate the false positives with decreasing threshold
    roc = roc.withColumn('fps', fn.sum(fn.lit(1) - roc[labelCol]).over(window))

    # The totals of positive and negative samples are the cumulative counts
    # on the last row. Fetch that row once instead of once per total.
    # NOTE(review): this relies on rows arriving in window order -- confirm.
    totals = roc.tail(1)[0]
    numPositive = totals['tps']
    numNegative = totals['fps']

    # Turn the cumulative counts into rates and keep one point per distinct
    # score. Without this step the frame still has four columns and cannot
    # be unioned with the two-value start row below.
    roc = roc.withColumn('tpr', roc['tps'] / fn.lit(numPositive))
    roc = roc.withColumn('fpr', roc['fps'] / fn.lit(numNegative))
    roc = roc.dropDuplicates(subset=[probabilityCol]).select('fpr', 'tpr')

    # Add an extra threshold position
    # to make sure that the curve starts at (0, 0)
    start_row = spark.createDataFrame([(0.0, 0.0)], schema=roc.schema)

    # Neither dropDuplicates nor union promises row order. Both rates are
    # non-decreasing along the curve, so sorting on them restores it.
    return start_row.union(roc).orderBy('fpr', 'tpr')