from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml import Estimator, Transformer
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
import pyspark.sql.functions as fn
import pyspark.ml.feature as ft
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.sql import Observation
from pyspark.sql import Window
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from xgboost.spark import SparkXGBClassifier
import xgboost as xgb
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# Use the 0.11.4-spark3.3 version for Spark 3.3 and the 1.0.2 version for Spark 3.4
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("XGBoost with PySpark") \
    .config("spark.driver.memory", "10g") \
    .config("spark.driver.cores", "2") \
    .config("spark.executor.memory", "10g") \
    .config("spark.executor.cores", "2") \
    .enableHiveSupport() \
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')
24/06/03 20:13:12 WARN Utils: Your hostname, MacBook-Air resolves to a loopback address: 127.0.0.1; using 192.168.1.5 instead (on interface en0)
24/06/03 20:13:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/03 20:13:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
df = spark.sql("select * from home_credit_default_risk.prepared_data")
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
# Number of columns of each data type
dtypes = dict(df.dtypes)
pd.Series(dtypes).value_counts()
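The plotting cell below refers to math_features and new_cols, which come from a feature-construction step not shown in this section. A minimal, hypothetical sketch of what that step might look like, assuming simple math transforms of a numeric column (the derived column names here are placeholders, not the notebook's actual choices):

# Hypothetical: build a few math-transformed features to visualize alongside the label.
# 'AMT_CREDIT' and 'label' are assumed to exist in df; the derived names are illustrative.
new_cols = ['log_AMT_CREDIT', 'sqrt_AMT_CREDIT', 'sq_AMT_CREDIT', 'AMT_CREDIT']
math_features = df.select(
    'label',
    fn.log1p('AMT_CREDIT').alias('log_AMT_CREDIT'),
    fn.sqrt('AMT_CREDIT').alias('sqrt_AMT_CREDIT'),
    (fn.col('AMT_CREDIT') ** 2).alias('sq_AMT_CREDIT'),
    'AMT_CREDIT'
).toPandas()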
plt.figure(figsize=(10, 6))

# Iterate through the new features
for i, feature in enumerate(new_cols):
    # Create a new subplot for each feature
    plt.subplot(2, 2, i + 1)
    sns.kdeplot(data=math_features, x=feature, hue='label', common_norm=False)
# Group AMT_INCOME_TOTAL by OCCUPATION_TYPE and calculate the mean, max, min income
df.groupBy('OCCUPATION_TYPE').agg(
    fn.mean('AMT_INCOME_TOTAL').alias('mean'),
    fn.max('AMT_INCOME_TOTAL').alias('max'),
    fn.min('AMT_INCOME_TOTAL').alias('min')
).show(5)
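The custom transformer below is decorated with @timer and calls progress(...), neither of which is defined in this section. A minimal sketch of what these helpers might look like, purely as an assumption so the class is self-contained:

import sys
import time
from functools import wraps

def timer(func):
    # Assumed helper: report how long a function call takes.
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        print(f"{func.__name__} took {time.time() - start:.1f}s")
        return result
    return wrapper

def progress(percent):
    # Assumed helper: render a simple in-place progress bar.
    bar = '#' * int(50 * percent)
    sys.stdout.write(f"\r[{bar:<50}] {percent:.1%}")
    sys.stdout.flush()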
class AggFeatures(Estimator, Transformer):
    """
    Transformer to aggregate features in a dataframe. This can be used to
    create features for each instance of the grouping variable.

    Parameters
    ----------
    inputCols: List[str]
        The list of input variables to aggregate.
    funcs: List[str]
        List of aggregation feature types to apply.
        - Numeric: ['mean', 'median', 'max', 'min', 'skew', 'std', 'kurt']
        - Category: ['mode', 'num_unique']
    groupByDiscrete: List[str], default=None
        Discrete variables to group by. At least one of `groupByDiscrete`
        and `groupByContinuous` must be given.
    groupByContinuous: List[str], default=None
        Continuous variables to group by; they are binned with
        QuantileDiscretizer before grouping.
    numBins: int, default=10
        The number of bins to produce for continuous grouping variables.
    """

    def __init__(self, inputCols, funcs, groupByDiscrete=None, groupByContinuous=None, numBins=10):
        self.inputCols = inputCols
        self.funcs = funcs
        self.groupByDiscrete = groupByDiscrete
        self.groupByContinuous = groupByContinuous
        self.numBins = numBins

    def _fit(self, df):
        return self
    @timer
    def _transform(self, df):
        groupBy = []
        if self.groupByContinuous is not None:
            outputCols = [f"{col}_binned" for col in self.groupByContinuous]
            bucketizer = ft.QuantileDiscretizer(
                numBuckets=self.numBins,
                handleInvalid='keep',
                inputCols=self.groupByContinuous,
                outputCols=outputCols
            ).fit(df)
            df = bucketizer.transform(df)
            groupBy.extend(outputCols)
        if self.groupByDiscrete is not None:
            groupBy.extend(self.groupByDiscrete)
        if len(groupBy) == 0:
            raise ValueError("groupBy is None.")

        # Group by the specified variable and calculate the statistics
        mapping = {
            'mean': fn.mean,
            'median': fn.median,
            'max': fn.max,
            'min': fn.min,
            'skew': fn.skewness,
            'kurt': fn.kurtosis,
            'std': fn.std,
            'mode': fn.mode,
            'num_unique': fn.countDistinct
        }

        new_cols = []
        i = 0
        for by in groupBy:
            # Skip the grouping variable
            other_vars = [var for var in self.inputCols if by not in [var, f"{var}_binned"]]
            for f in self.funcs:
                colnames = {col: f"{f}({col})_by({by})" for col in other_vars}
                new_cols.extend(colnames.values())
                f = mapping[f]
                grouped = df.groupBy(by).agg(*[f(df[var]).alias(name) for var, name in colnames.items()])
                df = df.join(grouped.dropna(), on=by, how='left')
                i += 1
                progress(i / len(groupBy) / len(self.funcs))
        print(f"Created {len(new_cols)} new features.")
        return df.select("SK_ID_CURR", *new_cols)
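A sketch of how the transformer above might be applied; the chosen columns and functions are plausible examples for this dataset, not the notebook's actual configuration:

agg = AggFeatures(
    inputCols=['AMT_INCOME_TOTAL', 'AMT_CREDIT'],
    funcs=['mean', 'max'],
    groupByDiscrete=['OCCUPATION_TYPE'],
    groupByContinuous=['DAYS_BIRTH'],
    numBins=10
)
# Returns SK_ID_CURR plus the new aggregated columns
agg_features = agg.transform(df)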
@timer
def feature_interaction(df, left, right):
    """
    Create pairwise interaction features by concatenating category values.

    Parameters
    ----------
    df: pyspark dataframe.
    left, right: List[str]
        The lists of variables to interact.
    """
    print(f"Built {len(left) * len(right)} features.")
    # Make a new dataframe to hold interaction features
    inputCols = set(left + right)
    df_new = df.select("SK_ID_CURR", *inputCols)
    i = 0
    for rvar in right:
        df_new = df_new.withColumns({
            f"{lvar}&{rvar}": fn.concat_ws('&', df[lvar], df[rvar])
            for lvar in left if lvar != rvar
        })
        i += 1
        progress(i / len(right))
    return df_new.drop(*inputCols)
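As an illustration (the column choices are assumptions, not the notebook's), the function might be called on a couple of categorical variables like this:

interactions = feature_interaction(
    df,
    left=['NAME_INCOME_TYPE', 'OCCUPATION_TYPE'],
    right=['NAME_EDUCATION_TYPE'],
)
interactions.show(5)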
from pyspark.ml.feature import PolynomialExpansion
# Make a new dataframe for polynomial features
assembler = VectorAssembler(
    inputCols=['EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3', 'DAYS_BIRTH'],
    outputCol="ext_features"
)
poly_df = assembler.transform(df)
# Create the polynomial expansion transformer with the specified degree
poly_transformer = PolynomialExpansion(
    degree=3,
    inputCol='ext_features',
    outputCol='poly_expanded'
)
# Transform the assembled features into polynomial features
poly_transformer.transform(poly_df).select('poly_expanded').show(5)
+--------------------+
| poly_expanded|
+--------------------+
|[0.72908990917097...|
|[0.76491718757837...|
|[0.50549856415141...|
|[0.50549856415141...|
|[0.50549856415141...|
+--------------------+
only showing top 5 rows
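For reference, PolynomialExpansion excludes the constant term, so a degree-3 expansion of 4 inputs yields C(4+3, 3) - 1 = 34 features. A quick check of that count (an illustrative snippet, not from the notebook):

expanded = poly_transformer.transform(poly_df)
# Each 'poly_expanded' vector should have 34 entries for degree=3 on 4 inputs
print(len(expanded.select('poly_expanded').first()[0]))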
# Remove variables with high missing rate
def drop_missing_data(df, threshold=0.8):
    # Drop variables whose missing rate exceeds 1 - threshold (20% by default)
    thresh = int(df.count() * (1 - threshold))
    exprs = [fn.sum(df[col].isNull().cast('int')).alias(col) for col in df.columns]
    missing_number = df.select(*exprs).first().asDict()
    cols_to_drop = [k for k, v in missing_number.items() if v > thresh]
    print(f"Removed {len(cols_to_drop)} variables with missing more than {1 - threshold:.1%}")
    return df.drop(*cols_to_drop)
def handle_missing(df):
    # Remove variables with high missing rate
    df = drop_missing_data(df, threshold=0.8)

    # Univariate imputer for completing missing values with simple strategies
    dtypes = df.drop("SK_ID_CURR", "label").dtypes
    numerical_cols = [k for k, v in dtypes if v not in ('string', 'vector')]
    imputed_cols = [f"imputed_{col}" for col in numerical_cols]
    imputer = ft.Imputer(
        inputCols=numerical_cols,
        outputCols=imputed_cols,
        strategy="median"
    )
    df = imputer.fit(df).transform(df)
    colsMap = dict(zip(imputed_cols, numerical_cols))
    df = df.drop(*numerical_cols).withColumnsRenamed(colsMap)
    return df
df_created = handle_missing(df_created.replace(np.nan, None))

# Count nulls per column (rather than inspecting a single row)
missing_num = df_created.select([
    fn.sum(df_created[var].isNull().cast('int')).alias(var) for var in df_created.columns
])
missing_num = pd.Series(missing_num.first().asDict())
print(f"The dataframe has {(missing_num > 0).sum()} columns that have missing values.")
Removed 26 variables with missing more than 20.0%
The dataframe has 0 columns that have missing values.