# Convert the categorical labels in the target column to numerical values
indexer = StringIndexer(
    inputCol="Species",
    outputCol="label"
)
Create the feature vector: combine all the features into a single column (required by the estimator).
# Assemble the feature columns into a single vector column
assembler = VectorAssembler(
    inputCols=["SepalLength(cm)", "SepalWidth(cm)", "PetalLength(cm)", "PetalWidth(cm)"],
    outputCol="features"
)
Split the data into training and test sets.
# Split data into training and testing sets
train, test = iris.randomSplit([0.8, 0.2], seed=42)
Step 3: Create the estimator
from pyspark.ml.classification import LogisticRegression
# Create a LogisticRegression instance. This instance is an Estimator.
classifier = LogisticRegression(
    maxIter=10,
    regParam=0.01,
    featuresCol="features",
    labelCol="label"
)
Step 4: Create the pipeline and fit the model
from pyspark.ml import Pipeline
# Assemble all the steps (indexing, assembling, and model building) into a pipeline.
pipeline = Pipeline(stages=[indexer, assembler, classifier])
model = pipeline.fit(train)
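As a follow-up sketch (not part of the original steps), the fitted PipelineModel can be applied to the held-out test split and scored with MulticlassClassificationEvaluator:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# transform() adds a "prediction" column (plus probabilities) to the test DataFrame.
predictions = model.transform(test)

# "accuracy" is one of the metric names supported by the evaluator.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
print("Test accuracy =", evaluator.evaluate(predictions))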
>>> from pyspark.ml.feature import Word2Vec
>>>
>>> # Input data: Each row is a bag of words from a sentence or document.
>>> documentDF = spark.createDataFrame([
...     ("Hi I heard about Spark".split(" "), ),
...     ("I wish Java could use case classes".split(" "), ),
...     ("Logistic regression models are neat".split(" "), )
... ], ["text"])
>>>
>>> # Learn a mapping from words to Vectors.
>>> word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
>>> model = word2Vec.fit(documentDF)
24/05/01 16:18:36 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
>>>
>>> result = model.transform(documentDF)
>>> for row in result.collect():
...     text, vector = row
...     print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
...
Text: [Hi, I, heard, about, Spark] =>
Vector: [0.012264367192983627,-0.06442034244537354,-0.007622340321540833]
>>> from pyspark.ml import Pipeline
>>> from pyspark.ml.classification import DecisionTreeClassifier
>>> from pyspark.ml.feature import StringIndexer, VectorAssembler
>>> from pyspark.ml.evaluation import MulticlassClassificationEvaluator
>>> # Load the dataset.
>>> data = spark.read.csv("file:///iris.csv", inferSchema="true", header=True)
>>> # Index labels, adding metadata to the label column.
>>> # Fit on whole dataset to include all labels in index.
>>> labelIndexer = StringIndexer(inputCol="Species", outputCol="label")
>>> # Assemble the feature columns into a single vector column
>>> assembler = VectorAssembler(
...     inputCols=["SepalLength(cm)", "SepalWidth(cm)", "PetalLength(cm)", "PetalWidth(cm)"],
...     outputCol="features"
... )
>>> # Split the data into training and test sets (30% held out for testing)
>>> trainingData, testData = data.randomSplit([0.7, 0.3])
>>> # Train a DecisionTree model.
>>> dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
>>> # Chain indexers and tree in a Pipeline
>>> pipeline = Pipeline(stages=[labelIndexer, assembler, dt])
>>> # Train model. This also runs the indexers.
>>> model = pipeline.fit(trainingData)
>>> # Make predictions.
>>> predictions = model.transform(testData)
>>> # Select example rows to display.
>>> predictions.select("prediction", "label").show(5)
+----------+-----+
|prediction|label|
+----------+-----+
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
+----------+-----+
only showing top 5 rows
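The MulticlassClassificationEvaluator imported above is never used in the transcript; a minimal sketch of closing that loop by computing the test error from the predictions:

evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))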
>>> from pyspark.ml.clustering import KMeans
>>> from pyspark.ml.evaluation import ClusteringEvaluator
>>> # Loads data.
>>> data = spark.read.csv("file:///iris.txt", inferSchema="true", header=True)
>>> # Assemble the feature columns into a single vector column
>>> data = VectorAssembler(
...     inputCols=["SepalLength(cm)", "SepalWidth(cm)", "PetalLength(cm)", "PetalWidth(cm)"],
...     outputCol="features"
... ).transform(data)
>>> # Trains a k-means model.
>>> kmeans = KMeans().setK(3).setSeed(1)
>>> model = kmeans.fit(data)
>>> # Make predictions
>>> predictions = model.transform(data)
>>> evaluator = ClusteringEvaluator()
>>> silhouette = evaluator.evaluate(predictions)
>>> print("Silhouette with squared euclidean distance = " + str(silhouette))
Silhouette with squared euclidean distance = 0.7342113066202739
>>> centers = model.clusterCenters()
>>> print("Cluster Centers: ")
Cluster Centers: 
>>> for center in centers:
...     print(center)
...
[6.85384615 3.07692308 5.71538462 2.05384615]
[5.006 3.418 1.464 0.244]
[5.88360656 2.74098361 4.38852459 1.43442623]
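The choice of k=3 matches the three iris species. As a sketch (an assumption, not shown in the original), the same ClusteringEvaluator can be used to compare candidate values of k before settling on one:

# Compare silhouette scores for a few candidate values of k.
for k in [2, 3, 4, 5]:
    candidate = KMeans(k=k, seed=1).fit(data)
    score = ClusteringEvaluator().evaluate(candidate.transform(data))
    print("k = %d, silhouette = %f" % (k, score))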
Collaborative Filtering
Collaborative filtering is commonly used in recommender systems. The pyspark.ml.recommendation module provides ALS, an implementation of alternating least squares (ALS) matrix factorization.
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
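The ratings, training, and test DataFrames referenced below are not built in this excerpt. A preparation sketch following the layout of the official Spark ALS example, using the MovieLens sample file that ships with Spark (the path and the userId::movieId::rating::timestamp layout are assumptions here):

# Parse the MovieLens sample ratings into a DataFrame and split it.
lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])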
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
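recommendForAllUsers returns one row per user with an array column of (movieId, rating) structs. The sketch below (not in the original) flattens it into one row per recommendation using explode:

from pyspark.sql import functions as F

# One row per (userId, recommended movieId, predicted rating).
flatRecs = (userRecs
            .select("userId", F.explode("recommendations").alias("rec"))
            .select("userId", "rec.movieId", "rec.rating"))
flatRecs.show(5)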
>>> from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
>>> from pyspark.ml.evaluation import MulticlassClassificationEvaluator
>>> from pyspark.ml.classification import LogisticRegression
>>> from pyspark.ml.feature import StringIndexer, VectorAssembler
>>> from pyspark.ml import Pipeline
>>> # Prepare training and test data.
>>> iris = spark.read.csv("file:///iris.csv", inferSchema="true", header=True)
>>> # Convert the categorical labels in the target column to numerical values
>>> indexer = StringIndexer(
...     inputCol="Species",
...     outputCol="label"
... )
>>> # Assemble the feature columns into a single vector column
>>> assembler = VectorAssembler(
...     inputCols=["SepalLength(cm)", "SepalWidth(cm)", "PetalLength(cm)", "PetalWidth(cm)"],
...     outputCol="features"
... )
>>> train, test = iris.randomSplit([0.9, 0.1], seed=42)
>>> lr = LogisticRegression(maxIter=100)
>>> # Assemble all the steps (indexing, assembling, and model building) into a pipeline.
>>> pipeline = Pipeline(stages=[indexer, assembler, lr])
>>> # We use a ParamGridBuilder to construct a grid of parameters to search over.
>>> # CrossValidator will try all combinations of values and determine best model using
>>> # the evaluator.
>>> paramGrid = ParamGridBuilder()\
...     .addGrid(lr.regParam, [0.1, 0.01]) \
...     .addGrid(lr.fitIntercept, [False, True])\
...     .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
...     .build()
>>> # In this case the estimator is the pipeline built above (indexer, assembler, logistic regression).
>>> # A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
>>> crossval = CrossValidator(estimator=pipeline,
...                           estimatorParamMaps=paramGrid,
...                           evaluator=MulticlassClassificationEvaluator(),
...                           numFolds=3)
>>> # Run cross-validation, and choose the best set of parameters.
>>> cvModel = crossval.fit(train)
>>> # Make predictions on test data. cvModel is the model with the combination of parameters
>>> # that performed best.
>>> cvModel.transform(test)\
...     .select("features", "label", "prediction")\
...     .show(5)
+-----------------+-----+----------+
|         features|label|prediction|
+-----------------+-----+----------+
|[4.8,3.4,1.6,0.2]|  1.0|       1.0|
|[4.9,3.1,1.5,0.1]|  1.0|       1.0|
|[5.4,3.4,1.5,0.4]|  1.0|       1.0|
|[5.1,3.4,1.5,0.2]|  1.0|       1.0|
|[5.1,3.8,1.6,0.2]|  1.0|       1.0|
+-----------------+-----+----------+
only showing top 5 rows
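A brief sketch (not in the original transcript) of inspecting what the cross-validation chose; the best model is a PipelineModel whose last stage is the fitted LogisticRegressionModel:

bestLr = cvModel.bestModel.stages[-1]
print("regParam:", bestLr.getRegParam())
print("fitIntercept:", bestLr.getFitIntercept())
print("elasticNetParam:", bestLr.getElasticNetParam())

# Average cross-validation metric for each ParamMap, in the same order as the grid.
print(cvModel.avgMetrics)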
// Instantiate metrics object
val metrics = new BinaryClassificationMetrics(predictionAndLabels)

// Precision-Recall Curve
val PRC = metrics.pr

// AUPRC
val auPRC = metrics.areaUnderPR
println(s"Area under precision-recall curve = $auPRC")

// ROC Curve
val roc = metrics.roc

// AUROC
val auROC = metrics.areaUnderROC
println(s"Area under ROC = $auROC")
In PySpark, define a subclass that borrows this Scala interface:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
class CurveMetrics(BinaryClassificationMetrics):
    def __init__(self, *args):
        super(CurveMetrics, self).__init__(*args)

    def _to_list(self, rdd):
        points = []
        # Note this collect could be inefficient for large datasets
        # considering there may be one probability per datapoint (at most)
        # The Scala version takes a numBins parameter,
        # but it doesn't seem possible to pass this from Python to Java
        for row in rdd.collect():
            # Results are returned as type scala.Tuple2,
            # which doesn't appear to have a py4j mapping
            points += [(float(row._1()), float(row._2()))]
        return points

    def get_curve(self, method):
        rdd = getattr(self._java_model, method)().toJavaRDD()
        points = self._to_list(rdd)
        return zip(*points)  # return tuple(fpr, tpr)
After defining the subclass, it is used as follows:
from pyspark.sql import Row

def extractProbability(row, labelCol='label', probabilityCol='probability'):
    # Keep the label and the probability of the positive class
    return Row(label=float(row[labelCol]), probability=float(row[probabilityCol][1]))
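One way to exercise the CurveMetrics subclass defined above (a sketch; `predictions` is assumed to be the output of a fitted binary classifier's transform() and is not defined in the original text):

# BinaryClassificationMetrics expects an RDD of (score, label) pairs.
preds = predictions.rdd.map(
    lambda row: (float(row['probability'][1]), float(row['label'])))
fpr, tpr = CurveMetrics(preds).get_curve('roc')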
from pyspark.sql import Window, functions as fn
def roc_curve_on_spark(dataset, labelCol='label', probabilityCol='probability'):
    """
    Returns the receiver operating characteristic (ROC) curve,
    which is a DataFrame having two fields (FPR, TPR)
    with (0.0, 0.0) prepended; the last row is (1.0, 1.0) by construction.
    """
    roc = dataset.select(labelCol, probabilityCol)

    # Window ordered by decreasing probability (the threshold sweeps from high to low)
    window = Window.orderBy(fn.desc(probabilityCol))
    # Accumulate the true positives with decreasing threshold
    roc = roc.withColumn('tps', fn.sum(roc[labelCol]).over(window))
    # Accumulate the false positives with decreasing threshold
    roc = roc.withColumn('fps', fn.sum(fn.lit(1) - roc[labelCol]).over(window))

    # The total number of positive and negative samples
    numPositive = roc.tail(1)[0]['tps']
    numNegative = roc.tail(1)[0]['fps']

    # Normalize the cumulative counts into rates
    roc = roc.select(
        (roc['fps'] / numNegative).alias('FPR'),
        (roc['tps'] / numPositive).alias('TPR'),
    )

    # Add an extra threshold position
    # to make sure that the curve starts at (0, 0)
    start_row = spark.createDataFrame([(0.0, 0.0)], schema=roc.schema)
    roc = start_row.unionAll(roc)
    return roc
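A usage sketch tying the two helpers together (`predictions` is again an assumed DataFrame produced by a fitted binary classifier; it is not defined in the original text):

probs = predictions.rdd.map(extractProbability).toDF()
roc = roc_curve_on_spark(probs, labelCol='label', probabilityCol='probability')
roc.show(5)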
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a DecisionTree model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                     impurity='gini', maxDepth=5, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
    lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")