MLlib - Naive Bayes
Naive Bayes is a simple multiclass classification algorithm with the assumption of independence between every pair of features. Naive Bayes can be trained very efficiently. Within a single pass to the training data, it computes the conditional probability distribution of each feature given label, and then it applies Bayes’ theorem to compute the conditional probability distribution of label given an observation and use it for prediction. For more details, please visit the Wikipedia page Naive Bayes classifier.
In MLlib, we implemented multinomial naive Bayes, which is typically used for document classification. Within that context, each observation is a document, each feature represents a term, whose value is the frequency of the term. For its formulation, please visit the Wikipedia page Multinomial Naive Bayes or the section Naive Bayes text classification from the book Introduction to Information Retrieval. Additive smoothing can be used by setting the parameter $\lambda$ (default to $1.0$). For document classification, the input feature vectors are usually sparse. Please supply sparse vectors as input to take advantage of sparsity. Since the training data is only used once, it is not necessary to cache it.
Examples
NaiveBayes implements
multinomial naive Bayes. It takes an RDD of
LabeledPoint and an optional
smoothing parameter lambda as input, and output a
NaiveBayesModel, which
can be used for evaluation and prediction.
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val data = sc.textFile("mllib/data/sample_naive_bayes_data.txt")
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}
// Split data into training (60%) and test (40%).
val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0)
val test = splits(1)
val model = NaiveBayes.train(training, lambda = 1.0)
val prediction = model.predict(test.map(_.features))
val predictionAndLabel = prediction.zip(test.map(_.label))
val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
NaiveBayes implements
multinomial naive Bayes. It takes a Scala RDD of
LabeledPoint and an
optionally smoothing parameter lambda as input, and output a
NaiveBayesModel, which
can be used for evaluation and prediction.
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.NaiveBayes;
import org.apache.spark.mllib.classification.NaiveBayesModel;
import org.apache.spark.mllib.regression.LabeledPoint;
import scala.Tuple2;
JavaRDD<LabeledPoint> training = ... // training set
JavaRDD<LabeledPoint> test = ... // test set
final NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0);
JavaRDD<Double> prediction =
  test.map(new Function<LabeledPoint, Double>() {
    @Override public Double call(LabeledPoint p) {
      return model.predict(p.features());
    }
  });
JavaPairRDD<Double, Double> predictionAndLabel = 
  prediction.zip(test.map(new Function<LabeledPoint, Double>() {
    @Override public Double call(LabeledPoint p) {
      return p.label();
    }
  }));
double accuracy = 1.0 * predictionAndLabel.filter(new Function<Tuple2<Double, Double>, Boolean>() {
    @Override public Boolean call(Tuple2<Double, Double> pl) {
      return pl._1() == pl._2();
    }
  }).count() / test.count();
NaiveBayes implements multinomial
naive Bayes. It takes an RDD of
LabeledPoint and an optionally
smoothing parameter lambda as input, and output a
NaiveBayesModel, which can be
used for evaluation and prediction.
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes
# an RDD of LabeledPoint
data = sc.parallelize([
  LabeledPoint(0.0, [0.0, 0.0])
  ... # more labeled points
])
# Train a naive Bayes model.
model = NaiveBayes.train(data, 1.0)
# Make prediction.
prediction = model.predict([0.0, 0.0])
