#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy

from numpy import array, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
    _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
    _serialize_double_matrix, _deserialize_double_matrix, \
    _serialize_double_vector, _deserialize_double_vector, \
    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
    _linear_predictor_typecheck, _get_unmangled_labeled_point_rdd
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint, LinearModel
from math import exp, log


class LogisticRegressionModel(LinearModel):

    """A linear binary classification model derived from logistic regression.

    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(1.0, [2.0]),
    ...     LabeledPoint(1.0, [3.0])
    ... ]
    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
    >>> lrm.predict(array([1.0])) > 0
    True
    >>> lrm.predict(array([0.0])) <= 0
    True
    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
    ... ]
    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
    >>> lrm.predict(array([0.0, 1.0])) > 0
    True
    >>> lrm.predict(array([0.0, 0.0])) <= 0
    True
    >>> lrm.predict(SparseVector(2, {1: 1.0})) > 0
    True
    >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0
    True
    """

    def predict(self, x):
        _linear_predictor_typecheck(x, self._coeff)
        margin = _dot(x, self._coeff) + self._intercept
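        # Evaluate the logistic function of the margin in a numerically
        # stable way: both branches below are algebraically equivalent, but
        # each only ever calls exp() on a non-positive argument, so the
        # computation cannot overflow for large |margin|.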
        if margin > 0:
            prob = 1 / (1 + exp(-margin))
        else:
            exp_margin = exp(margin)
            prob = exp_margin / (1 + exp_margin)
        return 1 if prob > 0.5 else 0


class LogisticRegressionWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
              initialWeights=None, regParam=1.0, regType=None, intercept=False):
        """
        Train a logistic regression model on the given data.

        @param data:              The training data.
        @param iterations:        The number of iterations (default: 100).
        @param step:              The step parameter used in SGD
                                  (default: 1.0).
        @param miniBatchFraction: Fraction of data to be used for each SGD
                                  iteration (default: 1.0).
        @param initialWeights:    The initial weights (default: None).
        @param regParam:          The regularizer parameter (default: 1.0).
        @param regType:           The type of regularizer used for training
                                  our model.
                                  Allowed values: "l1" for using L1Updater,
                                                  "l2" for using SquaredL2Updater,
                                                  "none" for no regularizer.
                                  (default: "none")
        @param intercept:         Boolean parameter indicating whether to use
                                  an augmented representation of the training
                                  data, i.e. whether bias features are
                                  activated (default: False).
        """
        sc = data.context
        if regType is None:
            regType = "none"
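        # _regression_train_wrapper serializes the training data and initial
        # weights, invokes this callable against the JVM-side PythonMLLibAPI
        # entry point, and wraps the returned weights and intercept in a
        # LogisticRegressionModel.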
        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(
            d._jrdd, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
        return _regression_train_wrapper(sc, train_func, LogisticRegressionModel, data,
                                         initialWeights)


class SVMModel(LinearModel):

    """A support vector machine.

    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(1.0, [2.0]),
    ...     LabeledPoint(1.0, [3.0])
    ... ]
    >>> svm = SVMWithSGD.train(sc.parallelize(data))
    >>> svm.predict(array([1.0])) > 0
    True
    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {0: -1.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
    ... ]
    >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
    >>> svm.predict(SparseVector(2, {1: 1.0})) > 0
    True
    >>> svm.predict(SparseVector(2, {0: -1.0})) <= 0
    True
    """

    def predict(self, x):
        _linear_predictor_typecheck(x, self._coeff)
        margin = _dot(x, self._coeff) + self._intercept
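        # Classify by the sign of the decision margin; unlike logistic
        # regression, the margin is not converted into a probability.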
        return 1 if margin >= 0 else 0


class SVMWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None, regType=None, intercept=False):
        """
        Train a support vector machine on the given data.

        @param data:              The training data.
        @param iterations:        The number of iterations (default: 100).
        @param step:              The step parameter used in SGD
                                  (default: 1.0).
        @param regParam:          The regularizer parameter (default: 1.0).
        @param miniBatchFraction: Fraction of data to be used for each SGD
                                  iteration (default: 1.0).
        @param initialWeights:    The initial weights (default: None).
        @param regType:           The type of regularizer used for training
                                  our model.
                                  Allowed values: "l1" for using L1Updater,
                                                  "l2" for using SquaredL2Updater,
                                                  "none" for no regularizer.
                                  (default: "none")
        @param intercept:         Boolean parameter indicating whether to use
                                  an augmented representation of the training
                                  data, i.e. whether bias features are
                                  activated (default: False).
        """
        sc = data.context
        if regType is None:
            regType = "none"
        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(
            d._jrdd, iterations, step, regParam, miniBatchFraction, i, regType, intercept)
        return _regression_train_wrapper(sc, train_func, SVMModel, data, initialWeights)


class NaiveBayesModel(object):

    """
    Model for Naive Bayes classifiers.

    Contains two parameters:
    - pi: vector of logs of class priors (dimension C)
    - theta: matrix of logs of class conditional probabilities (C x D)

    >>> data = [
    ...     LabeledPoint(0.0, [0.0, 0.0]),
    ...     LabeledPoint(0.0, [0.0, 1.0]),
    ...     LabeledPoint(1.0, [1.0, 0.0]),
    ... ]
    >>> model = NaiveBayes.train(sc.parallelize(data))
    >>> model.predict(array([0.0, 1.0]))
    0.0
    >>> model.predict(array([1.0, 0.0]))
    1.0
    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
    ... ]
    >>> model = NaiveBayes.train(sc.parallelize(sparse_data))
    >>> model.predict(SparseVector(2, {1: 1.0}))
    0.0
    >>> model.predict(SparseVector(2, {0: 1.0}))
    1.0
    """

    def __init__(self, labels, pi, theta):
        self.labels = labels
        self.pi = pi
        self.theta = theta

    def predict(self, x):
        """Return the most likely class for a data vector x"""
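        # Work in log space: pi[c] + theta[c] . x is the unnormalized log
        # posterior of class c, so argmax picks the most likely label.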
        return self.labels[numpy.argmax(self.pi + _dot(x, self.theta.transpose()))]


class NaiveBayes(object):

    @classmethod
    def train(cls, data, lambda_=1.0):
        """
        Train a Naive Bayes model given an RDD of (label, features) vectors.

        This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which can
        handle all kinds of discrete data.  For example, by converting
        documents into TF-IDF vectors, it can be used for document
        classification.  By making every vector a 0-1 vector, it can also be
        used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).

        @param data: RDD of LabeledPoint, where the label is a class index
               and the features form a discrete feature vector (e.g. a count
               vector).
        @param lambda_: The smoothing parameter (default: 1.0).
        """
        sc = data.context
        dataBytes = _get_unmangled_labeled_point_rdd(data)
        ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
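        # The JVM side returns a triple of serialized arrays: the class
        # labels, the log class priors (pi), and the log conditional
        # probabilities (theta).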
        return NaiveBayesModel(
            _deserialize_double_vector(ans[0]),
            _deserialize_double_vector(ans[1]),
            _deserialize_double_matrix(ans[2]))


def _test():
    import doctest
    globs = globals().copy()
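    # A small batch size forces multiple serialization batches even on the
    # tiny datasets used in the doctests above.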
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

if __name__ == "__main__":
    _test()