from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml import Estimator, Transformer
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
import pyspark.sql.functions as fn
import pyspark.ml.feature as ft
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.sql import Observation
from pyspark.sql import Window
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from xgboost.spark import SparkXGBClassifier
import xgboost as xgb
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# Use the 0.11.4-spark3.3 version for Spark 3.3 and the 1.0.2 version for Spark 3.4
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("XGBoost with PySpark") \
    .config("spark.driver.memory", "10g") \
    .config("spark.driver.cores", "2") \
    .config("spark.executor.memory", "10g") \
    .config("spark.executor.cores", "2") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')
24/06/01 11:20:13 WARN Utils: Your hostname, MacBook-Air resolves to a loopback address: 127.0.0.1; using 192.168.1.5 instead (on interface en0)
24/06/01 11:20:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/01 11:20:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
df = spark.sql("select * from home_credit_default_risk.application_train")
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
# Number of each type of column
dtypes = dict(df.dtypes)
pd.Series(dtypes).value_counts()
double 65
int 41
string 16
Name: count, dtype: int64
Next, let's look at the summary statistics of the dataset.
df.summary().toPandas()
                              count        mean       stddev          min        25%        50%        75%              max
SK_ID_CURR                   307511   278180.52    102790.18       100002     189124     278173     367118           456255
TARGET                       307511      0.0807       0.2724            0          0          0          0                1
NAME_CONTRACT_TYPE           307511        None         None   Cash loans       None       None       None  Revolving loans
CODE_GENDER                  307511        None         None            F       None       None       None              XNA
FLAG_OWN_CAR                 307511        None         None            N       None       None       None                Y
FLAG_OWN_REALTY              307511        None         None            N       None       None       None                Y
CNT_CHILDREN                 307511      0.4171       0.7221            0          0          0          1               19
AMT_INCOME_TOTAL             307511   168797.92    237123.15      25650.0   112500.0   146250.0   202500.0           1.17E8
AMT_CREDIT                   307511   599026.00    402490.78      45000.0   270000.0   513531.0   808650.0        4050000.0
...                             ...         ...          ...          ...        ...        ...        ...              ...
FLAG_DOCUMENT_18             307511      0.0081       0.0898            0          0          0          0                1
FLAG_DOCUMENT_19             307511      0.0006       0.0244            0          0          0          0                1
FLAG_DOCUMENT_20             307511      0.0005       0.0225            0          0          0          0                1
FLAG_DOCUMENT_21             307511      0.0003       0.0183            0          0          0          0                1
AMT_REQ_CREDIT_BUREAU_HOUR   265992      0.0064       0.0838          0.0        0.0        0.0        0.0              4.0
AMT_REQ_CREDIT_BUREAU_DAY    265992      0.0070       0.1108          0.0        0.0        0.0        0.0              9.0
AMT_REQ_CREDIT_BUREAU_WEEK   265992      0.0344       0.2047          0.0        0.0        0.0        0.0              8.0
AMT_REQ_CREDIT_BUREAU_MON    265992      0.2674       0.9160          0.0        0.0        0.0        0.0             27.0
AMT_REQ_CREDIT_BUREAU_QRT    265992      0.2655       0.7941          0.0        0.0        0.0        0.0            261.0
AMT_REQ_CREDIT_BUREAU_YEAR   265992      1.9000       1.8693          0.0        0.0        1.0        3.0             25.0

8 rows × 123 columns (shown transposed, values rounded)
Take a look at the distribution of the target variable.
# `TARGET` is the target variable we are trying to predict (0 or 1):
#   1 = Not Repaid
#   0 = Repaid
# Check if the data is unbalanced
row = df.select(fn.mean('TARGET').alias('rate')).first()
print(f"percentage of default : {row['rate']:.2%}")
df.groupBy("TARGET").count().show()
# Replace the anomalous values with nan
df_emp = df.select(
    fn.when(df['DAYS_EMPLOYED'] >= 365243, None)
      .otherwise(df['DAYS_EMPLOYED'])
      .alias('DAYS_EMPLOYED')
)
dtypes = df.drop("SK_ID_CURR", "TARGET").dtypes
categorical_cols = [k for k, v in dtypes if v == 'string']
numerical_cols = [k for k, v in dtypes if v != 'string']
# Data cleaning
def clean(df):
    # Remove duplicates.
    df = df.dropDuplicates(subset=["SK_ID_CURR"])

    # Transform Y/N flags into 0/1 integers
    cols_to_transform = ['FLAG_OWN_CAR', 'FLAG_OWN_REALTY', 'EMERGENCYSTATE_MODE']
    df = df.replace(
        ['Y', 'N', 'Yes', 'No'],
        ['1', '0', '1', '0'],
        subset=cols_to_transform
    )
    df = df.withColumns({c: df[c].cast('int') for c in cols_to_transform})

    # Replace the anomalous values with nan
    df = df.withColumn(
        'DAYS_EMPLOYED',
        fn.when(df['DAYS_EMPLOYED'] >= 365243, None).otherwise(df['DAYS_EMPLOYED'])
    )
    df = df.replace('XNA', None)
    df = df.withColumnRenamed("TARGET", "label")
    return df
In most cases, categorical features only need to be encoded with OneHotEncoder or OrdinalEncoder; this kind of simple preprocessing is sufficient for most data-mining algorithms. However, when a categorical feature has a very large number of possible values (high cardinality), one-hot encoding leads to an explosion in dimensionality. Mean encoding (target encoding) is an efficient alternative that, in practice, can significantly improve model performance.
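Before the Spark implementation below, here is a minimal pandas sketch of the idea; the toy column names (`city`, `label`) and the fixed smoothing value are made up purely for illustration and are not part of the original pipeline:

import pandas as pd

# Toy data: one categorical column and a binary target
toy = pd.DataFrame({
    "city":  ["A", "A", "A", "B", "B", "C"],
    "label": [1, 0, 1, 0, 0, 1],
})

prior = toy["label"].mean()                            # global mean of the target
stats = toy.groupby("city")["label"].agg(["mean", "count"])

smoothing = 2.0                                        # fixed smoothing, for illustration only
lam = stats["count"] / (stats["count"] + smoothing)    # weight given to the category mean
code = lam * stats["mean"] + (1 - lam) * prior         # blend of category mean and global mean

toy["city_encoded"] = toy["city"].map(code)
print(toy)

Categories with few observations are pulled toward the global mean, which is what the smoothing term in the MeanEncoder below controls.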
class MeanEncoder(Estimator, Transformer):
    def __init__(self, smoothing=0.0, inputCols=None, labelCol="label"):
        """
        The MeanEncoder() replaces categories by the mean value of the
        target for each category.

        .. math::
            mapping = w_i * posterior + (1 - w_i) * prior
            where w_i = n_i * t / (s + n_i * t)

        In the previous equation, t is the target variance in the entire dataset,
        s is the target variance within the category, and n_i is the number of
        observations for the category.

        Parameters
        ----------
        smoothing: int, float or 'auto', default=0.0
        """
        super().__init__()
        self.smoothing = smoothing
        self.inputCols = inputCols
        self.labelCol = labelCol

    def _fit(self, df):
        """
        Learn the mean value of the target for each category of the variable.
        """
        self.encoder_dict = {}
        inputCols = self.inputCols
        labelCol = self.labelCol
        y_prior = df.select(fn.mean(labelCol).alias("mean")).first()["mean"]

        for var in inputCols:
            if self.smoothing == "auto":
                y_var = df.cov(labelCol, labelCol)
                damping = fn.variance(labelCol) / y_var
            else:
                damping = fn.lit(self.smoothing)
            groups = df.groupBy(var).agg(
                fn.mean(labelCol).alias("posterior"),
                fn.count("*").alias("counts"),
                damping.alias("damping")
            ).toPandas().dropna()
            groups["lambda"] = groups["counts"] / (groups["counts"] + groups["damping"])
            groups["code"] = (
                groups["lambda"] * groups["posterior"] +
                (1.0 - groups["lambda"]) * y_prior
            )
            self.encoder_dict[var] = dict(zip(groups[var], groups["code"]))
        return self

    def _transform(self, df):
        for var in self.encoder_dict:
            mapping = {k: str(v) for k, v in self.encoder_dict[var].items()}
            df = df.replace(mapping, subset=[var])
            df = df.withColumn(var, df[var].cast('float'))
        print(f'{len(self.encoder_dict):d} columns were mean encoded')
        return df
# Replace categories by the mean value of the target for each category.
inputCols = ['OCCUPATION_TYPE', 'ORGANIZATION_TYPE']
mean_encoder = MeanEncoder(
    inputCols=inputCols,
    labelCol='label',
    smoothing='auto'
)
mean_encoder.fit(df).transform(df).select(inputCols).show(5)
2 columns were mean encoded
+---------------+-----------------+
|OCCUPATION_TYPE|ORGANIZATION_TYPE|
+---------------+-----------------+
| 0.062140968| 0.09299603|
| 0.09631742| 0.09449421|
| 0.113258936| 0.10173836|
| NULL| NULL|
| NULL| NULL|
+---------------+-----------------+
only showing top 5 rows
Dummy variable (one-hot) encoding
Nominal (unordered) categorical features can be used directly by tree-ensemble models (like XGBoost), but linear models (like Lasso or Ridge) require one-hot re-encoding. Next we encode the nominal categorical features that were indexed in the previous section.
# The nominative (unordered) categorical features
encoded_cols = ['NAME_EDUCATION_TYPE', 'OCCUPATION_TYPE', 'ORGANIZATION_TYPE']
nominal_categories = [col for col in categorical_cols if col not in encoded_cols]
indexedCols = [f"indexed_{col}" for col in nominal_categories]
vectorCols = [f"encoded_{col}" for col in nominal_categories]
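The indexing and one-hot encoding themselves can be done with Spark ML's StringIndexer and OneHotEncoder. The following is a minimal sketch, assuming df is the cleaned DataFrame and the column lists above are in scope; it is illustrative rather than the exact code that produced the output shown later:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Index each nominal column; keep nulls/unseen categories in an extra bucket
indexer = StringIndexer(
    inputCols=nominal_categories,
    outputCols=indexedCols,
    handleInvalid="keep"
)
# One-hot encode the indexed columns into sparse vectors
encoder = OneHotEncoder(inputCols=indexedCols, outputCols=vectorCols)

onehot_pipeline = Pipeline(stages=[indexer, encoder])
df_onehot = onehot_pipeline.fit(df).transform(df)
df_onehot.select("CODE_GENDER", "encoded_CODE_GENDER").show(5)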
# Bin the age data
age_binned = pd.cut(sample['age'], bins=np.linspace(20, 70, num=11))
age_groups = sample['label'].groupby(age_binned).mean()
plt.figure(figsize=(8, 3))

# Graph the age bins and the average of the target as a bar plot
sns.barplot(x=age_groups.index, y=age_groups * 100)

# Plot labeling
plt.xticks(rotation=30)
plt.xlabel('Age Group (years)')
plt.ylabel('Failure to Repay (%)')
plt.title('Failure to Repay by Age Group')
dtypes = df.drop("SK_ID_CURR", "TARGET").dtypes
categorical_cols = [k for k, v in dtypes if v == 'string']
numerical_cols = [k for k, v in dtypes if v != 'string']
def encode(df):
    # The ordinal (ordered) categorical features
    # Pandas calls the categories "levels"
    ordered_levels = {
        "NAME_EDUCATION_TYPE": ["Lower secondary",
                                "Secondary / secondary special",
                                "Incomplete higher",
                                "Higher education"]
    }
    df = ordinal_encode(df, ordered_levels)

    # Replace categories by the mean value of the target for each category.
    mean_encoder = MeanEncoder(
        inputCols=['OCCUPATION_TYPE', 'ORGANIZATION_TYPE'],
        labelCol='label',
        smoothing='auto'
    )
    df = mean_encoder.fit(df).transform(df)

    # The nominative (unordered) categorical features
    nominal_categories = [col for col in categorical_cols if col not in ordered_levels]
    features_onehot = [col for col in nominal_categories
                       if col not in ['OCCUPATION_TYPE', 'ORGANIZATION_TYPE']]
    indexedCols = [f"indexed_{col}" for col in features_onehot]
    encodedCols = [f"encoded_{col}" for col in features_onehot]
# Function to calculate missing values by column
def display_missing(df, threshold=None, verbose=1):
    n = df.count()
    exprs = [fn.sum(df[col].isNull().cast('int')).alias(col) for col in df.columns]
    missing_number = df.select(*exprs).first().asDict()
    missing_df = pd.DataFrame({
        "missing_number": missing_number.values(),                       # Total missing values
        "missing_rate": [value / n for value in missing_number.values()] # Proportion of missing values
    }, index=missing_number.keys())
    missing_df = missing_df.query("missing_rate > 0").sort_values("missing_rate", ascending=False)
    threshold = 0.25 if threshold is None else threshold
    high_missing = missing_df.query(f"missing_rate > {threshold}")

    # Print some summary information
    if verbose:
        print(f"Your selected dataframe has {missing_df.shape[0]} out of {len(df.columns)} columns that have missing values.")

    # Return the dataframe with missing information
    if threshold is None:
        return missing_df
    else:
        if verbose:
            print(f"There are {high_missing.shape[0]} columns with more than {threshold:.1%} missing values.")
        return high_missing
Your selected dataframe has 66 out of 122 columns that have missing values.
There are 47 columns with more than 25.0% missing values.
missing_number missing_rate
COMMONAREA_MEDI 214865 0.698723
COMMONAREA_MODE 214865 0.698723
COMMONAREA_AVG 214865 0.698723
NONLIVINGAPARTMENTS_MODE 213514 0.694330
NONLIVINGAPARTMENTS_MEDI 213514 0.694330
NONLIVINGAPARTMENTS_AVG 213514 0.694330
LIVINGAPARTMENTS_MODE 210199 0.683550
LIVINGAPARTMENTS_MEDI 210199 0.683550
LIVINGAPARTMENTS_AVG 210199 0.683550
FLOORSMIN_MODE 208642 0.678486
def drop_missing_data(df, threshold=0.8):
    # Remove variables whose missing rate is greater than (1 - threshold)
    thresh = int(df.count() * (1 - threshold))
    exprs = [fn.sum(df[col].isNull().cast('int')).alias(col) for col in df.columns]
    missing_number = df.select(*exprs).first().asDict()
    cols_to_drop = [k for k, v in missing_number.items() if v > thresh]
    print(f"Removed {len(cols_to_drop)} variables with missing more than {1 - threshold:.1%}")
    return df.drop(*cols_to_drop)
# Adds a binary variable to flag missing observations.
from pyspark.ml.stat import Correlation, ChiSquareTest
def flag_missing(df, inputCols=None, labelCol='label', alpha=0.05):
    """
    Adds a binary variable to flag missing observations (one indicator per variable).
    The added variables (missing indicators) are named with the original variable
    name plus '_missing'.

    Parameters
    ----------
    alpha: float, default=0.05
        Indicators with correlation to the target greater than alpha are kept.
    """
    if inputCols is None:
        inputCols = df.drop(labelCol).columns

    for var in inputCols:
        df = df.withColumn(var + "_missing", df[var].isNull().cast('int'))
    indicators = [var + "_missing" for var in inputCols]

    # The correlations
    corr = df.select([fn.corr(labelCol, c2).alias(c2) for c2 in indicators])
    corr = corr.fillna(0).first().asDict()

    # Find variables for which the indicator should be added.
    selected_cols = [var for var, r in corr.items() if abs(r) > alpha]
    drop_cols = [var for var in indicators if var not in selected_cols]
    df = df.drop(*drop_cols)
    print(f"Added {len(selected_cols)} missing indicators")
    return df
print('The number of features:', len(flag_missing(df_encoded).columns))
Added 0 missing indicators
The number of features: 122
+-----------+-------------+
|CODE_GENDER| encodedCol|
+-----------+-------------+
| M|(4,[1],[1.0])|
| F|(4,[0],[1.0])|
| M|(4,[1],[1.0])|
| F|(4,[0],[1.0])|
| F|(4,[0],[1.0])|
+-----------+-------------+
only showing top 5 rows
Missing values in the categorical features were already handled during indexing, so no further special treatment is needed.
If a variable is Boolean, its missing values can, depending on the situation, be uniformly filled with zero.
nunique = df_encoded.select([fn.countDistinct(var).alias(var) for var in df_encoded.columns]).first().asDict()
binary = df_encoded.select([fn.collect_set(var).alias(var) for var, n in nunique.items() if n == 2])
print([k for k, v in binary.first().asDict().items() if set(v) == {0, 1}])
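If those 0/1 indicator columns should indeed default to zero, a minimal sketch of the fill step could look like this; the helper list `boolean_cols` is introduced here only for illustration:

# Hypothetical helper list: the 0/1 columns identified above
boolean_cols = [k for k, v in binary.first().asDict().items() if set(v) == {0, 1}]

# Fill missing values in these Boolean indicator columns with 0
df_encoded = df_encoded.fillna(0, subset=boolean_cols)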
# Univariate imputer for completing missing values with simple strategies.
dtypes = df_encoded.drop("SK_ID_CURR", "TARGET").dtypes
numerical_cols = [k for k, v in dtypes if v not in ('string', 'vector')]
imputed_cols = [f"imputed_{col}" for col in numerical_cols]
imputer = ft.Imputer(
    inputCols=numerical_cols,
    outputCols=imputed_cols,
    strategy="median"
)
Your selected dataframe has 0 out of 111 columns that have missing values.
There are 0 columns with more than 25.0% missing values.
Wrapping it all into a function
Finally, to summarize our missing-value handling strategy:
Drop features with a missing rate above 80%.
Add missing-value indicators.
Manually impute features that have a business meaning.
Impute the rest with simple statistics.
# Function for missing value imputation
def handle_missing(df):
    # Remove variables with a high missing rate
    df = drop_missing_data(df, threshold=0.2)

    # Find variables for which an indicator should be added.
    df = flag_missing(df)

    # Replace missing values by an arbitrary, business-meaningful value
    df = impute_manually(df)

    # Univariate imputer for completing missing values with simple strategies.
    dtypes = df.drop("SK_ID_CURR", "TARGET").dtypes
    numerical_cols = [k for k, v in dtypes if v not in ('string', 'vector')]
    imputed_cols = [f"imputed_{col}" for col in numerical_cols]
    imputer = ft.Imputer(
        inputCols=numerical_cols,
        outputCols=imputed_cols,
        strategy="median"
    )
    df = imputer.fit(df).transform(df)
    colsMap = dict(zip(imputed_cols, numerical_cols))
    df = df.drop(*numerical_cols).withColumnsRenamed(colsMap)
    return df
df_imputed = handle_missing(df_encoded)
Removed 0 variables with missing more than 80.0%
Added 0 missing indicators
Confirm that all missing values have been handled:
_ = display_missing(df_imputed)
Your selected dataframe has 0 out of 122 columns that have missing values.
There are 0 columns with more than 25.0% missing values.
class OutlierCapper(Estimator, Transformer):
    """
    Caps maximum and/or minimum values of a variable at automatically
    determined values. Works only with numerical variables. A list of
    variables can be indicated.

    Parameters
    ----------
    method: str, 'gaussian' or 'iqr', default='iqr'
        If method='gaussian':
            - upper limit: mean + 3 * std
            - lower limit: mean - 3 * std
        If method='iqr':
            - upper limit: 75th quantile + 3 * IQR
            - lower limit: 25th quantile - 3 * IQR
        where IQR is the inter-quartile range: 75th quantile - 25th quantile.
    fold: int, default=3
        How far out to cap the maximum or minimum values.
    """

    def __init__(self, inputCols=None, method='iqr', fold=3):
        # Store the configuration described in the docstring
        super().__init__()
        self.inputCols = inputCols
        self.method = method
        self.fold = fold
    def _fit(self, df):
        """
        Learn the values that should be used to replace outliers.
        """
        if self.method == "gaussian":
            mean = df.select([fn.mean(var).alias(var) for var in self.inputCols])
            mean = pd.Series(mean.first().asDict())
            bias = [mean, mean]
            scale = df.select([fn.std(var).alias(var) for var in self.inputCols])
            scale = pd.Series(scale.first().asDict())
        elif self.method == "iqr":
            Q1 = df.select([fn.percentile(var, 0.25).alias(var) for var in self.inputCols])
            Q1 = pd.Series(Q1.first().asDict())
            Q3 = df.select([fn.percentile(var, 0.75).alias(var) for var in self.inputCols])
            Q3 = pd.Series(Q3.first().asDict())
            bias = [Q1, Q3]
            scale = Q3 - Q1

        # Estimate the end values
        if (scale == 0).any():
            raise ValueError(
                f"Input columns {scale[scale == 0].index.tolist()!r}"
                f" have low variation for method {self.method!r}."
                f" Try other capping methods or drop these columns."
            )
        else:
            self.upper_limit = bias[1] + self.fold * scale
            self.lower_limit = bias[0] - self.fold * scale

        return self
    def _transform(self, df):
        """
        Cap the variable values.
        """
        maximum = df.select([fn.max(var).alias(var) for var in self.inputCols])
        maximum = pd.Series(maximum.first().asDict())
        minimum = df.select([fn.min(var).alias(var) for var in self.inputCols])
        minimum = pd.Series(minimum.first().asDict())
        outliers = maximum.gt(self.upper_limit) | minimum.lt(self.lower_limit)
        n = outliers.sum()
        print(f"Your selected dataframe has {n} out of {len(self.inputCols)} columns that have outliers.")

        # Replace outliers
        for var in self.inputCols:
            upper_limit = self.upper_limit[var]
            lower_limit = self.lower_limit[var]
            df = df.withColumn(
                var,
                fn.when(df[var] > upper_limit, upper_limit)
                  .when(df[var] < lower_limit, lower_limit)
                  .otherwise(df[var])
            )
        return df
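Like MeanEncoder above, OutlierCapper follows the Estimator/Transformer pattern, so a usage sketch (assuming df_imputed and numerical_cols from the previous steps) could look like this:

# Cap outliers in the numerical columns using the IQR rule (3 * IQR beyond the quartiles)
capper = OutlierCapper(inputCols=numerical_cols, method="iqr", fold=3)
df_capped = capper.fit(df_imputed).transform(df_imputed)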
# Check the skew of all numerical features
skewness = df_imputed.select([fn.skewness(var).alias(var) for var in numerical_cols])
skewness = pd.Series(skewness.first().asDict()).sort_values()
print(skewness.head(10))
print(skewness.tail(10))
new_colnames = {c: c.replace('/', 'or').replace(' ', '_').replace(',', '_or') for c in df_prepared.columns}
df_prepared = df_prepared.withColumnsRenamed(new_colnames)