from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml import Estimator, Transformer
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
import pyspark.sql.functions as fn
import pyspark.ml.feature as ft
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.sql import Observation
from pyspark.sql import Window
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from xgboost.spark import SparkXGBClassifier
import xgboost as xgb
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# Use version 0.11.4-spark3.3 for Spark 3.3 and version 1.0.2 for Spark 3.4
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("XGBoost with PySpark") \
    .config("spark.driver.memory", "10g") \
    .config("spark.driver.cores", "2") \
    .config("spark.executor.memory", "10g") \
    .config("spark.executor.cores", "2") \
    .enableHiveSupport() \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('ERROR')
24/06/03 21:40:26 WARN Utils: Your hostname, MacBook-Air resolves to a loopback address: 127.0.0.1; using 192.168.1.5 instead (on interface en0)
24/06/03 21:40:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/03 21:40:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
df = spark.sql("select * from home_credit_default_risk.created_data")
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
# Persist the DataFrame with an explicit storage level (memory, spilling to disk).
from pyspark.storagelevel import StorageLevel

_ = df.persist(StorageLevel.MEMORY_AND_DISK)
features = df.drop('SK_ID_CURR', 'label').columns
feature_importances = score_dataset(df, inputCols=features)
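The helper score_dataset is defined earlier in the notebook. For readers starting from this section, here is a minimal, hypothetical sketch of what it might look like, assuming it trains a SparkXGBClassifier on the assembled features and returns a pandas DataFrame of per-feature fscore importances indexed by feature name (the shape the later cells rely on):

# Hypothetical sketch of score_dataset (the actual helper is defined earlier in
# the notebook): assemble the input columns, train a SparkXGBClassifier, and
# return a pandas DataFrame of per-feature importances indexed by feature name.
def score_dataset(df, inputCols, labelCol="label"):
    assembler = VectorAssembler(inputCols=inputCols, outputCol="features", handleInvalid="keep")
    train = assembler.transform(df)
    clf = SparkXGBClassifier(features_col="features", label_col=labelCol)
    model = clf.fit(train)
    # The booster names features f0, f1, ... in the order of inputCols.
    scores = model.get_booster().get_score(importance_type="weight")
    fscore = [scores.get(f"f{i}", 0.0) for i in range(len(inputCols))]
    return pd.DataFrame({"fscore": fscore}, index=inputCols)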
from pyspark.ml.stat import Correlation

def drop_correlated_features(df, threshold=0.9):
    inputCols = [col for col, dtype in df.dtypes if dtype not in ['string', 'vector']]
    # Assemble the feature columns into a single vector column
    assembler = VectorAssembler(
        inputCols=inputCols,
        outputCol="numericFeatures"
    )
    df = assembler.transform(df)
    # Compute the correlation matrix with the specified method using the dataset.
    corrmat = Correlation.corr(df, 'numericFeatures', 'pearson').collect()[0][0]
    corrmat = pd.DataFrame(corrmat.toArray(), index=inputCols, columns=inputCols)
    # Upper triangle of correlations
    upper = corrmat.where(np.triu(np.ones(corrmat.shape), k=1).astype('bool'))
    # Absolute value correlation
    corr = upper.unstack().dropna().abs()
    to_drop = corr[corr.gt(threshold)].reset_index()['level_1'].unique()
    return to_drop.tolist()
correlated = drop_correlated_features(df.select(features))
selected_features = [col for col in features if col not in correlated]
print(f'Dropped {len(correlated)} correlated features.')
Dropped 127 correlated features.
Chi-Square Test

The chi-square test is a statistical method for measuring the association between two categorical variables.
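For reference, the test statistic compares the observed contingency table of a feature and the label with the counts expected under independence:

\chi^2 = \sum_{i}\sum_{j} \frac{(O_{ij} - E_{ij})^2}{E_{ij}}

where O_{ij} is the observed count in cell (i, j) and E_{ij} is the count expected if the feature and the label were independent; a large statistic (small p-value) indicates the two are associated.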
# Find categorical features
int_features = [k for k, v in df.select(selected_features).dtypes if v == 'int']
vector_features = [k for k, v in df.select(selected_features).dtypes if v == 'vector']
nunique = df.select([fn.countDistinct(var).alias(var) for var in int_features]).first().asDict()
categorical_cols = [f for f, n in nunique.items() if n <= 50]
continuous_cols = list(set(selected_features) - set(categorical_cols + vector_features))
from pyspark.ml.feature import UnivariateFeatureSelector
def chi2_test_selector(df, categoricalFeatures, outputCol):
    # Select categorical features against a categorical label, controlling the false discovery rate.
    selector = UnivariateFeatureSelector(
        featuresCol="categoricalFeatures",
        labelCol="label",
        outputCol=outputCol,
        selectionMode="fdr"
    )
    selector.setFeatureType("categorical").setLabelType("categorical").setSelectionThreshold(0.05)

    # Assemble the feature columns into a single vector column
    assembler = VectorAssembler(
        inputCols=categoricalFeatures,
        outputCol="categoricalFeatures"
    )
    df = assembler.transform(df)

    model = selector.fit(df)
    df = model.transform(df)

    n = df.first()["categoricalFeatures"].size
    print("The number of dropped features:", n - len(model.selectedFeatures))
    return df
# Sort features according to importance
feature_importances = feature_importances.sort_values('fscore', ascending=False)
feature_importances['fscore'].head(15)
# Find the features with zero importance
zero_importance = feature_importances.query("fscore == 0.0").index.tolist()
print(f'\nThere are {len(zero_importance)} features with 0.0 importance')
There are 7 features with 0.0 importance
selected_features = [col for col in selected_features if col not in zero_importance]
print("The number of selected features:", len(selected_features))
print("Dropped {} features with zero importance.".format(len(zero_importance)))
The number of selected features: 232
Dropped 7 features with zero importance.
original_df = spark.sql("select * from home_credit_default_risk.prepared_data").limit(1).toPandas()
original_features = [f for f in selected_features if f in original_df.columns]
derived_features = [f for f in selected_features if f not in original_features]
print(f"Selected features: {len(original)} original features, {len(derived)} derived features.")
Selected features: 79 original features, 153 derived features.