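There is a saying widely circulated in the industry: data and features determine the upper limit of machine learning, while models and algorithms merely approximate that limit. Feature engineering therefore plays a very important role in machine learning, and in practice it is arguably the key to a successful project.

Feature engineering is the most time-consuming and labor-intensive part of data analysis. Unlike algorithms and models, it does not follow a fixed procedure; it relies more on engineering experience and trade-offs, so there is no single unified method. This post only summarizes some commonly used approaches.

Feature engineering covers several sub-problems: Data PreProcessing, Feature Extraction, Feature Selection, and Feature Construction.

Jupyter Notebook code link: feature_engineering_on_spark_p2_feature_construction

Feature Construction

Feature construction is the process of creating new features from existing data. The goal is to build useful features that help the model learn the relationship between the information in the dataset and the given target.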

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml import Estimator, Transformer
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
import pyspark.sql.functions as fn
import pyspark.ml.feature as ft
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.sql import Observation
from pyspark.sql import Window
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from xgboost.spark import SparkXGBClassifier
import xgboost as xgb

import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import time
import warnings
import gc

# Setting configuration.
warnings.filterwarnings('ignore')
SEED = 42

# Use 0.11.4-spark3.3 version for Spark3.3 and 1.0.2 version for Spark3.4
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("XGBoost with PySpark") \
    .config("spark.driver.memory", "10g") \
    .config("spark.driver.cores", "2") \
    .config("spark.executor.memory", "10g") \
    .config("spark.executor.cores", "2") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')
df = spark.sql("select * from home_credit_default_risk.prepared_data")
# Number of each type of column
dtypes = dict(df.dtypes)
pd.Series(dtypes).value_counts()
double    65
int       45
vector    10
float      2
Name: count, dtype: int64

To measure the performance of each new feature set, we first define a function for evaluating a dataset.

def timer(func):
    import time
    import functools

    def strfdelta(tdelta, fmt):
        hours, remainder = divmod(tdelta, 3600)
        minutes, seconds = divmod(remainder, 60)
        return fmt.format(hours, minutes, seconds)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        click = time.time()
        print("Starting time\t", time.strftime("%H:%M:%S", time.localtime()))
        result = func(*args, **kwargs)
        delta = strfdelta(time.time() - click, "{:.0f} hours {:.0f} minutes {:.0f} seconds")
        print(f"{func.__name__} cost {delta}")
        return result
    return wrapper

def cross_val_score(df, estimator, evaluator, features, numFolds=3, seed=SEED):
    # Randomly assign every row to one of numFolds folds
    df = df.withColumn('fold', (fn.rand(seed) * numFolds).cast('int'))
    eval_result = []
    # Initialize an empty dataframe to hold feature importances
    feature_importances = pd.DataFrame(index=features)
    for i in range(numFolds):
        # Hold out fold i for validation and train on the remaining folds
        train = df.filter(df['fold'] != i)
        valid = df.filter(df['fold'] == i)
        model = estimator.fit(train)
        train_pred = model.transform(train)
        valid_pred = model.transform(valid)
        train_score = evaluator.evaluate(train_pred)
        valid_score = evaluator.evaluate(valid_pred)
        metric = evaluator.getMetricName()
        print(f"[{i}] train's {metric}: {train_score}, valid's {metric}: {valid_score}")
        eval_result.append(valid_score)

        # Map XGBoost's positional names (f0, f1, ...) back to the column names
        fscore = model.get_feature_importances()
        fscore = {name: fscore.get(f'f{k}', 0) for k, name in enumerate(features)}
        feature_importances[f'cv_{i}'] = pd.Series(fscore)
    feature_importances['fscore'] = feature_importances.mean(axis=1)
    return eval_result, feature_importances.sort_values('fscore', ascending=False)

@timer
def score_dataset(df, inputCols=None, featuresCol=None, labelCol='label', nfold=3):
    assert inputCols is not None or featuresCol is not None
    if featuresCol is None:
        # Assemble the feature columns into a single vector column
        featuresCol = "features"
        assembler = VectorAssembler(
            inputCols=inputCols,
            outputCol=featuresCol
        )
        df = assembler.transform(df)
    # Create an Estimator.
    classifier = SparkXGBClassifier(
        features_col=featuresCol,
        label_col=labelCol,
        eval_metric='auc',
        scale_pos_weight=11,
        learning_rate=0.015,
        max_depth=8,
        subsample=1.0,
        colsample_bytree=0.35,
        reg_alpha=65,
        reg_lambda=15,
        n_estimators=500,
        verbosity=0
    )
    evaluator = BinaryClassificationEvaluator(labelCol=labelCol, metricName='areaUnderROC')
    # Training with 3-fold CV:
    scores, feature_importances = cross_val_score(
        df=df,
        estimator=classifier,
        evaluator=evaluator,
        features=inputCols,
        numFolds=nfold
    )
    print(f"cv_agg's valid auc: {np.mean(scores):.4f} +/- {np.std(scores):.5f}")
    return feature_importances

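Simple Mathematical Transformations

Based on business meaning, we can create supplementary features that have an obvious practical interpretation, for example: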

math_features = df

# Ratio of the credit amount to income
math_features = math_features.withColumn('CREDIT_INCOME_PERCENT', df['AMT_CREDIT'] / df['AMT_INCOME_TOTAL'])

# Ratio of the loan annuity to total income
math_features = math_features.withColumn('ANNUITY_INCOME_PERCENT', df['AMT_ANNUITY'] / df['AMT_INCOME_TOTAL'])

# Annuity relative to the credit amount (roughly the inverse of the payment term in months)
math_features = math_features.withColumn('CREDIT_TERM', df['AMT_ANNUITY'] / df['AMT_CREDIT'])

# Ratio of days employed to age
math_features = math_features.withColumn('DAYS_EMPLOYED_PERCENT', df['DAYS_EMPLOYED'] / df['DAYS_BIRTH'])

# Per-capita income of the applicant's household
math_features = math_features.withColumn('INCOME_PER_PERSON', df['AMT_INCOME_TOTAL'] / df['CNT_FAM_MEMBERS'])

We can explore these new variables visually in a plot:

new_cols = ['CREDIT_INCOME_PERCENT', 'ANNUITY_INCOME_PERCENT', 'CREDIT_TERM', 'DAYS_EMPLOYED_PERCENT']
math_features = math_features.select(*new_cols, 'label').sample(0.1).toPandas()

plt.figure(figsize = (10, 6))
# iterate through the new features
for i, feature in enumerate(new_cols):
    # create a new subplot for each feature
    plt.subplot(2, 2, i + 1)
    sns.kdeplot(data=math_features, x=feature, hue='label', common_norm=False)

del math_features
gc.collect()

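Of course, we cannot work out every meaningful mathematical feature by hand. Instead, we write a transformer that applies addition, subtraction, multiplication and division to as many feature pairs as possible. It also supports single-variable operations such as logarithm, exponential, reciprocal, square root and trigonometric functions.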

from itertools import product, permutations, combinations

class mathFeatureCreator(Estimator, Transformer):
    """
    Apply math operators to create new features.

    Parameters
    ----------
    inputCols: List[string]
        The list of input variables.
    operators: List[string]
        List of Transform Feature functions to apply.
        - binary: 'add', 'subtract', 'multiply', 'divide'
        - unary: 'sin', 'cos', 'tan', 'sqrt', 'ln'
    """
    def __init__(self, inputCols, operators):
        self.inputCols = inputCols
        self.operators = operators

    def _fit(self, df):
        return self

    def _transform(self, df):
        funcs = {
            'add': (fn.try_add, '+'),
            'subtract': (fn.try_subtract, '-'),
            'multiply': (fn.try_multiply, '*'),
            'divide': (fn.try_divide, '/'),
            'sin': fn.sin,
            'cos': fn.cos,
            'tan': fn.tan,
            'sqrt': fn.sqrt,
            'ln': fn.ln
        }

        # Operators applied once per unordered pair
        commutation = [o for o in self.operators if o in ['add', 'subtract', 'multiply']]
        # Operators applied to every ordered pair
        binary = [o for o in self.operators if o in ['divide']]
        unary = [o for o in self.operators if o in ['sin', 'cos', 'tan', 'sqrt', 'ln']]

        comb = list(combinations(self.inputCols, 2))
        perm = list(permutations(self.inputCols, 2))

        feature_number = (len(comb) * len(commutation) +
                          len(perm) * len(binary) +
                          len(unary) * len(self.inputCols))
        print(f"Built {feature_number} features.")

        df_new = df.select("SK_ID_CURR", *self.inputCols)
        for operator, (left, right) in product(commutation, comb):
            func = funcs[operator][0]
            colname = left + funcs[operator][1] + right
            df_new = df_new.withColumn(colname, func(left, right))

        for operator, (left, right) in product(binary, perm):
            func = funcs[operator][0]
            colname = left + funcs[operator][1] + right
            df_new = df_new.withColumn(colname, func(left, right))

        for operator, col in product(unary, self.inputCols):
            func = funcs[operator]
            colname = f"{operator}({col})"
            df_new = df_new.withColumn(colname, func(col))

        return df_new.drop(*self.inputCols)

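Note: although binary operations are simple, the number of generated features grows quadratically with the number of input columns (n(n-1) ordered pairs for division alone), so the dimensionality quickly explodes. We therefore pick only a small set of features for these simple operations.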
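As a rough check on how quickly the feature count grows, the sketch below reproduces the counting logic of mathFeatureCreator in plain Python: add, subtract and multiply use unordered pairs, divide uses ordered pairs, and unary operators apply once per column. The helper name `count_math_features` is hypothetical and exists only for illustration.

```python
from itertools import combinations, permutations

def count_math_features(n_cols, operators):
    """Count the columns produced for n_cols inputs (illustrative helper)."""
    cols = range(n_cols)
    n_comb = len(list(combinations(cols, 2)))   # unordered pairs: n(n-1)/2
    n_perm = len(list(permutations(cols, 2)))   # ordered pairs: n(n-1)
    pairwise = [o for o in operators if o in ('add', 'subtract', 'multiply')]
    ordered = [o for o in operators if o == 'divide']
    unary = [o for o in operators if o in ('sin', 'cos', 'tan', 'sqrt', 'ln')]
    return n_comb * len(pairwise) + n_perm * len(ordered) + n_cols * len(unary)

# 16 columns with divide, sqrt and ln: 16*15 + 2*16 = 272
print(count_math_features(16, ['divide', 'sqrt', 'ln']))
# Doubling the number of columns roughly quadruples the count: 32*31 + 2*32 = 1056
print(count_math_features(32, ['divide', 'sqrt', 'ln']))
```

With 16 input columns the count matches the 272 features reported by the transformer in this post.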

features_to_trans = [
    'AMT_CREDIT', 'AMT_ANNUITY', 'DAYS_BIRTH', 'AMT_GOODS_PRICE',
    'DAYS_ID_PUBLISH', 'DAYS_EMPLOYED', 'DAYS_LAST_PHONE_CHANGE',
    'DAYS_REGISTRATION', 'AMT_INCOME_TOTAL', 'REGION_POPULATION_RELATIVE',
    'ORGANIZATION_TYPE', 'OWN_CAR_AGE', 'OCCUPATION_TYPE',
    'HOUR_APPR_PROCESS_START', 'NAME_EDUCATION_TYPE', 'NONLIVINGAPARTMENTS_MEDI',
    'EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3'
]
nunique = df.select([fn.countDistinct(col).alias(col) for col in features_to_trans]).first()

discrete_to_trans = [f for f in features_to_trans if nunique[f] < 50]
continuous_to_trans = [f for f in features_to_trans if nunique[f] >= 50]

print("Selected discrete features:", discrete_to_trans, sep='\n')
print("Selected continuous features:", continuous_to_trans, sep='\n')
Selected discrete features:
['OCCUPATION_TYPE', 'HOUR_APPR_PROCESS_START', 'NAME_EDUCATION_TYPE']
Selected continuous features:
['AMT_CREDIT', 'AMT_ANNUITY', 'DAYS_BIRTH', 'AMT_GOODS_PRICE', 'DAYS_ID_PUBLISH', 'DAYS_EMPLOYED', 'DAYS_LAST_PHONE_CHANGE', 'DAYS_REGISTRATION', 'AMT_INCOME_TOTAL', 'REGION_POPULATION_RELATIVE', 'ORGANIZATION_TYPE', 'OWN_CAR_AGE', 'NONLIVINGAPARTMENTS_MEDI', 'EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3']            
math_feature_creator = mathFeatureCreator(
    inputCols=continuous_to_trans,
    operators=['divide', 'sqrt', 'ln']
)
math_features = math_feature_creator.transform(df)
Built 272 features.

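Group-by Statistical Features

Group-by statistical features are, as the name suggests, statistics of a continuous feature computed within the groups of a categorical feature. This interaction can yield many more meaningful features, for example: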

# Group AMT_INCOME_TOTAL by OCCUPATION_TYPE and calculate the mean, max and min income
df.groupBy('OCCUPATION_TYPE').agg(
    fn.mean('AMT_INCOME_TOTAL').alias('mean'),
    fn.max('AMT_INCOME_TOTAL').alias('max'),
    fn.min('AMT_INCOME_TOTAL').alias('min')
).show(5)
+---------------+------------------+---------+-------+
|OCCUPATION_TYPE|              mean|      max|    min|
+---------------+------------------+---------+-------+
|     0.07856473|195003.99467376832| 675000.0|67500.0|
|     0.06396721|188916.28241563056| 699750.0|30600.0|
|     0.06599255|182334.81278280544|3150000.0|36000.0|
|     0.10578712|166357.48252518754|   1.17E8|27000.0|
|    0.061600607|182842.04568321616|1890000.0|27000.0|
+---------------+------------------+---------+-------+
only showing top 5 rows

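Commonly used statistics

var/std variance, standard deviation
mean/median mean, median
max/min maximum, minimum
skew skewness
mode mode
nunique number of distinct categories
frequency frequency
count count
quantile quantiles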

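Note: the grouping feature must be discrete, preferably a discrete variable with many distinct values, so that the new feature does not end up with a large number of repeated values. A continuous feature generally has to be discretized before it is used for grouping.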
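To make the mechanics concrete, here is a minimal pure-Python sketch of a group-by mean that is broadcast back to every row, once with a discrete key and once with a continuous key that is binned first. The toy rows, the fixed cut at age 40 and the helper `group_mean` are all made up for illustration; the Spark transformer in this post uses quantile binning instead.

```python
from collections import defaultdict
from statistics import mean

# Toy rows: (occupation, age, income); values are invented for illustration
rows = [
    ("driver", 25, 100.0),
    ("driver", 45, 140.0),
    ("teacher", 30, 120.0),
    ("teacher", 50, 160.0),
]

def group_mean(rows, key_fn, value_fn):
    """Return, for every row, the mean of value_fn over that row's group."""
    groups = defaultdict(list)
    for r in rows:
        groups[key_fn(r)].append(value_fn(r))
    means = {k: mean(v) for k, v in groups.items()}
    return [means[key_fn(r)] for r in rows]

# Discrete grouping key: use the category directly
mean_income_by_occupation = group_mean(rows, lambda r: r[0], lambda r: r[2])

# Continuous grouping key: discretize first (here a single cut at age 40)
mean_income_by_age_bin = group_mean(rows, lambda r: r[1] >= 40, lambda r: r[2])

print(mean_income_by_occupation)  # [120.0, 120.0, 140.0, 140.0]
print(mean_income_by_age_bin)     # [110.0, 150.0, 110.0, 150.0]
```

Every row in the same group receives the same value, which is why a grouping key with very few levels produces a feature with many repeated values.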

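Next we define a custom transformer that derives group-by features for both numerical and categorical variables.

Define the progress bar (the timer decorator defined earlier is reused)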

def progress(percent=0, width=50, desc="Processing"):
    import math
    tags = math.ceil(width * percent) * "#"
    print(f"\r{desc}: [{tags:-<{width}}]{percent:.1%}", end="", flush=True)

class AggFeatures(Estimator, Transformer):
    """
    Transformer to aggregate features in a dataframe. This can
    be used to create features for each instance of the grouping variable.

    Parameters
    ----------
    inputCols: List[string]
        The list of input variables.
    funcs: List[string]
        List of Aggregation Feature types to apply.
        - Numeric: ['mean', 'median', 'max', 'min', 'skew', 'std', 'kurt']
        - Category: ['mode', 'num_unique']
    groupByDiscrete: List[string], default=None
        The discrete variables to group by.
    groupByContinuous: List[string], default=None
        The continuous variables to group by; they are binned first.
    numBins: int, default=10
        The number of bins to produce.
    """

    def __init__(self, inputCols, funcs, groupByDiscrete=None, groupByContinuous=None, numBins=10):
        self.inputCols = inputCols
        self.funcs = funcs
        self.groupByDiscrete = groupByDiscrete
        self.groupByContinuous = groupByContinuous
        self.numBins = numBins

    def _fit(self, df):
        return self

    @timer
    def _transform(self, df):
        groupBy = []
        if self.groupByContinuous is not None:
            # Discretize continuous grouping variables into quantile bins
            outputCols = [f"{col}_binned" for col in self.groupByContinuous]
            bucketizer = ft.QuantileDiscretizer(
                numBuckets=self.numBins,
                handleInvalid='keep',
                inputCols=self.groupByContinuous,
                outputCols=outputCols
            ).fit(df)
            df = bucketizer.transform(df)
            groupBy.extend(outputCols)
        if self.groupByDiscrete is not None:
            groupBy.extend(self.groupByDiscrete)
        if len(groupBy) == 0:
            raise ValueError("groupBy is None.")

        # Group by the specified variable and calculate the statistics
        mapping = {
            'mean': fn.mean,
            'median': fn.median,
            'max': fn.max,
            'min': fn.min,
            'skew': fn.skewness,
            'kurt': fn.kurtosis,
            'std': fn.std,
            'mode': fn.mode,
            'num_unique': fn.countDistinct
        }
        new_cols = []
        i = 0
        for by in groupBy:
            # Skip the grouping variable
            other_vars = [var for var in self.inputCols if by not in [var, f"{var}_binned"]]
            for f in self.funcs:
                colnames = {col: f"{f}({col})_by({by})" for col in other_vars}
                new_cols.extend(colnames.values())

                agg_func = mapping[f]
                grouped = df.groupBy(by).agg(*[agg_func(df[var]).alias(name) for var, name in colnames.items()])
                # Join the group statistics back onto every row
                df = df.join(grouped.dropna(), on=by, how='left')

                i += 1
                progress(i / len(groupBy) / len(self.funcs))
        print(f"Created {len(new_cols)} new features.")
        return df.select("SK_ID_CURR", *new_cols)

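For numerical features we compute the statistics directly; for categorical features we compute the mode and the number of distinct categories.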

agg_continuous_transformer = AggFeatures(
    inputCols=continuous_to_trans,
    funcs=['mean'],
    groupByDiscrete=discrete_to_trans,
    groupByContinuous=continuous_to_trans
)
agg_discrete_transformer = AggFeatures(
    inputCols=discrete_to_trans,
    funcs=['mode'],
    groupByDiscrete=discrete_to_trans,
    groupByContinuous=continuous_to_trans
)
agg_transformer = Pipeline(stages=[agg_continuous_transformer, agg_discrete_transformer])
# agg_features = agg_transformer.fit(df).transform(df)
# print(agg_features.columns)

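Feature Interaction

Combined features are built by taking the Cartesian product of two or more individual features. The resulting predictive power can far exceed that of any single feature on its own. Cartesian-product combinations are generally applied between discrete features.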
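As a small illustration of the idea, the sketch below crosses two categorical fields by concatenating their values with `&`, the same separator used in this post. The field names and values are invented for the example.

```python
from itertools import product

# Toy records with two categorical fields (values invented for illustration)
rows = [
    {"education": "higher", "housing": "rent"},
    {"education": "secondary", "housing": "own"},
    {"education": "higher", "housing": "own"},
]

# Cross feature: one new categorical value per (education, housing) pair
crossed = [f"{r['education']}&{r['housing']}" for r in rows]
print(crossed)  # ['higher&rent', 'secondary&own', 'higher&own']

# The crossed feature can take up to |education| * |housing| distinct values
levels_a = sorted({r["education"] for r in rows})
levels_b = sorted({r["housing"] for r in rows})
all_pairs = [f"{a}&{b}" for a, b in product(levels_a, levels_b)]
print(len(all_pairs))  # 4
```

Because the number of possible values is the product of the two cardinalities, crossing high-cardinality features can produce very sparse categories.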

@timer
def feature_interaction(df, left, right):
    """
    Parameters
    ----------
    df: pyspark dataframe.
    left, right: The lists of variables to interact.
    """
    print(f"Built {len(left) * len(right)} features.")
    # Make a new dataframe to hold interaction features
    inputCols = set(left + right)
    df_new = df.select("SK_ID_CURR", *inputCols)
    i = 0
    for rvar in right:
        # Concatenate every left variable with rvar, skipping a variable paired with itself
        df_new = df_new.withColumns({f"{lvar}&{rvar}": fn.concat_ws('&', df[lvar], df[rvar]) for lvar in left if lvar != rvar})
        i += 1
        progress(i / len(right))
    return df_new.drop(*inputCols)

feature_interaction(df.limit(5), discrete_to_trans, discrete_to_trans).toPandas()
Starting time	 20:13:41
Built 9 features.
Processing: [##################################################]100.0%feature_interaction cost 0 hours 0 minutes 0 seconds
SK_ID_CURR HOUR_APPR_PROCESS_START&OCCUPATION_TYPE NAME_EDUCATION_TYPE&OCCUPATION_TYPE OCCUPATION_TYPE&HOUR_APPR_PROCESS_START NAME_EDUCATION_TYPE&HOUR_APPR_PROCESS_START OCCUPATION_TYPE&NAME_EDUCATION_TYPE HOUR_APPR_PROCESS_START&NAME_EDUCATION_TYPE
0 100180 15&0.048305318 3&0.048305318 0.048305318&15 3&15 0.048305318&3 15&3
1 100201 10&0.06304005 3&0.06304005 0.06304005&10 3&10 0.06304005&3 10&3
2 100243 13&0.10578712 3&0.10578712 0.10578712&13 3&13 0.10578712&3 13&3
3 100435 10&0.09631742 1&0.09631742 0.09631742&10 1&10 0.09631742&1 10&1
4 100441 16&0.09631742 1&0.09631742 0.09631742&16 1&16 0.09631742&1 16&1

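Polynomial Features

Polynomial features are the simplest feature construction method offered by sklearn, and Spark provides an equivalent in PolynomialExpansion. When creating polynomial features, avoid high degrees: the number of features grows rapidly (combinatorially) with the degree, and the model may overfit.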
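To get a feel for that growth, the sketch below counts the non-constant monomials of total degree at most d in n variables, which is C(n + d, d) - 1. To my understanding this is also the size of the vector that Spark's PolynomialExpansion produces, but treat that as an assumption to verify against your Spark version. The helper name `poly_feature_count` is hypothetical.

```python
from math import comb

def poly_feature_count(n_features, degree):
    """Number of monomials of total degree 1..degree in n_features variables."""
    return comb(n_features + degree, degree) - 1

# Four input features, as in the expansion used in this post
for d in range(1, 6):
    print(d, poly_feature_count(4, d))
```

For four inputs, degree 3 already yields 34 expanded features and degree 5 yields 125.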

Now let's use a degree-3 polynomial and look at the result:

from pyspark.ml.feature import PolynomialExpansion

# Make a new dataframe for polynomial features
assembler = VectorAssembler(
    inputCols=['EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3', 'DAYS_BIRTH'],
    outputCol="ext_features"
)
poly_df = assembler.transform(df)

# Create the polynomial object with specified degree
poly_transformer = PolynomialExpansion(
    degree=3,
    inputCol='ext_features',
    outputCol='poly_expanded'
)

# Apply the expansion (PolynomialExpansion is a Transformer, so no fitting is needed)
poly_transformer.transform(poly_df).select('poly_expanded').show(5)
+--------------------+
|       poly_expanded|
+--------------------+
|[0.72908990917097...|
|[0.76491718757837...|
|[0.50549856415141...|
|[0.50549856415141...|
|[0.50549856415141...|
+--------------------+
only showing top 5 rows
del poly_df
gc.collect()
13801

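Cluster Analysis

Clustering algorithms have many uses in feature construction. For example: cluster text and use the cluster label as an input feature; cluster a single numerical feature, which is equivalent to binning; or cluster R, F, M data, as in the RFM model, and use the cluster label, which represents customer value, as an input feature.

When one or more features have a multimodal distribution (two or more clear peaks), a clustering algorithm can assign each peak to a class and output the cluster label.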

age_simi = df.select((df['DAYS_BIRTH']/-365).alias('age'))
plt.figure(figsize=(5,4))
sns.histplot(x=age_simi.sample(0.1).toPandas()['age'], bins=30)

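We can see two peaks, at 40 and 55.

The cluster label is usually a number, but that number carries no ordering, so it generally needs one-hot encoding. Alternatively, we can create new features that measure the similarity (distance) between each sample and each cluster center. The similarity is usually computed with a radial basis function (RBF).

A radial basis function (RBF) is a type of function commonly used in machine learning and statistical modeling. It takes the distance from a center point as input and outputs a value that can be viewed as a "decay function" of that distance. The most common RBF is the Gaussian RBF, whose output decays exponentially as the input moves away from the fixed point. We first define the Gaussian RBF $k(x,y)=\exp(-\gamma\|x-y\|^2)$, where the hyperparameter gamma determines how quickly the similarity decays as x moves away from y.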
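As a quick numeric check of the formula, here is a minimal pure-Python version of the Gaussian RBF evaluated for a few ages against a center at 40 with gamma = 0.01. The function name `gaussian_rbf` is hypothetical and independent of the Spark helper used in this post.

```python
import math

def gaussian_rbf(x, center, gamma=1.0):
    """Gaussian RBF similarity between two points given as sequences of floats."""
    squared_distance = sum((a - b) ** 2 for a, b in zip(x, center))
    return math.exp(-gamma * squared_distance)

# Similarity to the center at age 40 decays as the age moves away from it
for age in (40, 45, 50, 60):
    print(age, round(gaussian_rbf([age], [40], gamma=0.01), 4))
```

The similarity is exactly 1 at the center and falls to exp(-1), about 0.37, at a distance of 10 years. A larger gamma makes the decay steeper.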

from pyspark.ml.linalg import Vector
def rbf_kernel(row, vector1, vectors2, outputCols, gamma=1.0):
    # vector1 is either the name of a vector column or a list of scalar column names
    if isinstance(vector1, str):
        v1 = row[vector1]
    else:
        v1 = Vectors.dense([row[i] for i in vector1])

    row = row.asDict()
    for i, v2 in enumerate(vectors2):
        v2 = Vectors.dense(v2)
        squared_distance = v1.squared_distance(v2)
        rbf = np.exp(-gamma * squared_distance)
        row[outputCols[i]] = float(rbf)
    return Row(**row)

The figure below shows the two radial basis functions of age:

age_simi = age_simi.rdd.map(lambda x: rbf_kernel(x, ['age'], [[40], [55]], ['simi_40', 'simi_55'], gamma=0.01)).toDF()

age_simi_pdf = age_simi.sample(0.1).toPandas()
fig = plt.figure(figsize=(5,4))
ax1 = fig.add_subplot(111)
sns.histplot(data=age_simi_pdf, x='age', bins=30, ax=ax1)

ax2 = ax1.twinx()
sns.lineplot(data=age_simi_pdf, x='age', y='simi_40', ci=None, ax=ax2, color='green')
sns.lineplot(data=age_simi_pdf, x='age', y='simi_55', ci=None, ax=ax2, color='orange')

del age_simi, age_simi_pdf
gc.collect()
151

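If this particular feature correlates well with the target variable, the new features have a good chance of being useful.

Next, we define a function that uses rbf_kernel() to measure how similar each sample is to each cluster center: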

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import RobustScaler

def ClusterSimilarity(df, inputCols, outputCols, n_clusters=5, gamma=1.0, seed=None):
    # Assemble the feature columns into a single vector column
    assembler = VectorAssembler(
        inputCols=inputCols,
        outputCol="features"
    )
    # Standardize
    scaler = RobustScaler(inputCol="features", outputCol="scaled")
    # Trains a k-means model.
    kmeans = KMeans(featuresCol="scaled", k=n_clusters, seed=seed)
    pipeline = Pipeline(stages=[assembler, scaler, kmeans]).fit(df)
    kmeans = pipeline.stages[2]
    # Coordinates of cluster centers.
    clusterCenters = [list(i) for i in kmeans.clusterCenters()]

    df = pipeline.transform(df).select("SK_ID_CURR", "scaled")
    df = df.rdd.map(lambda x: rbf_kernel(x, "scaled", clusterCenters, outputCols, gamma=gamma)).toDF()
    return df.drop("scaled")

Now let's use this function:

cluster_features = ClusterSimilarity(
    df=df,
    inputCols=df.drop("SK_ID_CURR", 'label').columns,
    outputCols=[f"centroid_{i}" for i in range(5)],
    n_clusters=5,
    gamma=1.0,
    seed=SEED
)

cluster_features.show(5)
+----------+--------------------+----------+----------+----------+----------+
|SK_ID_CURR|          centroid_0|centroid_1|centroid_2|centroid_3|centroid_4|
+----------+--------------------+----------+----------+----------+----------+
|    100180|3.712547879890347...|       0.0|       0.0|       0.0|       0.0|
|    100201|3.217366500721083...|       0.0|       0.0|       0.0|       0.0|
|    100243|1.598249005834992...|       0.0|       0.0|       0.0|       0.0|
|    100435|3.543010683726346...|       0.0|       0.0|       0.0|       0.0|
|    100441|1.021260362527048...|       0.0|       0.0|       0.0|       0.0|
+----------+--------------------+----------+----------+----------+----------+
only showing top 5 rows

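Principal Component Analysis

Because the newly added features are highly correlated with the original features, the principal components from PCA can be used as new features to remove the correlation.

Since PCA is mainly used for dimensionality reduction, we will cover it in detail in the later part on feature selection.

Summary

Combine the features created above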

# Combine datasets
df_created = (df.join(math_features, on='SK_ID_CURR')
    # .join(agg_features, on='SK_ID_CURR')
    .join(cluster_features, on='SK_ID_CURR')
)

Handling missing values

# Remove variables with high missing rate
def drop_missing_data(df, threshold=0.8):
    # Drop columns whose missing rate exceeds 1 - threshold (20% by default)
    thresh = int(df.count() * (1 - threshold))
    exprs = [fn.sum(df[col].isNull().cast('int')).alias(col) for col in df.columns]
    missing_number = df.select(*exprs).first().asDict()
    cols_to_drop = [k for k, v in missing_number.items() if v > thresh]
    print(f"Removed {len(cols_to_drop)} variables with missing more than {1 - threshold:.1%}")
    return df.drop(*cols_to_drop)

def handle_missing(df):
    # Remove variables with high missing rate
    df = drop_missing_data(df, threshold=0.8)

    # Univariate imputer for completing missing values with simple strategies.
    dtypes = df.drop("SK_ID_CURR", "label").dtypes
    numerical_cols = [k for k, v in dtypes if v not in ('string', 'vector')]
    imputed_cols = [f"imputed_{col}" for col in numerical_cols]
    imputer = ft.Imputer(
        inputCols=numerical_cols,
        outputCols=imputed_cols,
        strategy="median"
    )
    df = imputer.fit(df).transform(df)
    # Replace the original columns with their imputed versions
    colsMap = dict(zip(imputed_cols, numerical_cols))
    df = df.drop(*numerical_cols).withColumnsRenamed(colsMap)
    return df

df_created = handle_missing(df_created.replace(np.nan, None))
# Flag each column that still contains at least one null value (checked over all rows)
missing_num = df_created.select(
    [fn.max(df_created[var].isNull().cast('int')).alias(var) for var in df_created.columns]
)
missing_num = pd.Series(missing_num.first().asDict())
print(f"The dataframe has {missing_num.sum()} columns that have missing values.")
Removed 26 variables with missing more than 20.0%
The dataframe has 0 columns that have missing values.
print(f"dataset shape: ({df_created.count()}, {len(df_created.columns)})")          
dataset shape: (307511, 373)

We again use the XGBoost model to evaluate the newly created features.

features = df_created.drop('SK_ID_CURR', 'label').columns
feature_importances = score_dataset(df_created, inputCols=features, nfold=2)
Starting time	 20:18:16
[0] train's areaUnderROC: 0.8589685970790397,  valid's areaUnderROC: 0.7649915434701238
[1] train's areaUnderROC: 0.8599883799618516,  valid's areaUnderROC: 0.7598499769233037
cv_agg's valid auc: 0.7624 +/- 0.00257
score_dataset cost 0 hours 12 minutes 29 seconds

Feature importance

feature_importances['fscore'].head(20)
DAYS_ID_PUBLISH/AMT_CREDIT            981.0
AMT_GOODS_PRICE/AMT_CREDIT            980.0
DAYS_ID_PUBLISH/DAYS_BIRTH            680.5
YEARS_BEGINEXPLUATATION_MODE          581.0
DAYS_EMPLOYED/EXT_SOURCE_1            555.0
NONLIVINGAPARTMENTS_MEDI              542.0
DAYS_LAST_PHONE_CHANGE/AMT_ANNUITY    530.0
DAYS_LAST_PHONE_CHANGE/AMT_CREDIT     528.0
NONLIVINGAREA_MEDI                    509.0
DAYS_EMPLOYED/EXT_SOURCE_3            489.5
AMT_GOODS_PRICE/DAYS_BIRTH            464.5
DAYS_EMPLOYED/ORGANIZATION_TYPE       460.0
LIVINGAREA_MEDI                       454.0
EXT_SOURCE_1/EXT_SOURCE_2             410.0
EXT_SOURCE_1/OWN_CAR_AGE              405.0
EXT_SOURCE_1/ORGANIZATION_TYPE        392.0
DAYS_EMPLOYED/OWN_CAR_AGE             381.5
sqrt(AMT_GOODS_PRICE)                 380.5
OWN_CAR_AGE/EXT_SOURCE_2              378.0
NONLIVINGAREA_MODE                    377.0
Name: fscore, dtype: float64

Save the dataset

df_created.write.bucketBy(100, "SK_ID_CURR").mode("overwrite").saveAsTable("home_credit_default_risk.created_data")
spark.stop()