  18  from base64 import standard_b64encode as b64enc 
  19  import copy 
  20  from collections import defaultdict 
  21  from collections import namedtuple 
  22  from itertools import chain, ifilter, imap 
  23  import operator 
  24  import os 
  25  import sys 
  26  import shlex 
  27  import traceback 
  28  from subprocess import Popen, PIPE 
  29  from tempfile import NamedTemporaryFile 
  30  from threading import Thread 
  31  import warnings 
  32  import heapq 
  33  from random import Random 
  34   
  35  from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ 
  36      BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long 
  37  from pyspark.join import python_join, python_left_outer_join, \ 
  38      python_right_outer_join, python_cogroup 
  39  from pyspark.statcounter import StatCounter 
  40  from pyspark.rddsampler import RDDSampler 
  41  from pyspark.storagelevel import StorageLevel 
  42  from pyspark.resultiterable import ResultIterable 
  43   
  44  from py4j.java_collections import ListConverter, MapConverter 
  45   
  46  __all__ = ["RDD"] 


def _extract_concise_traceback():
    """
    This function returns the traceback info for a callsite as a namedtuple
    of (function, file, linenum): the Spark API function that was invoked,
    plus the file name and line number of the user code that invoked it.
    """
  54      tb = traceback.extract_stack() 
  55      callsite = namedtuple("Callsite", "function file linenum") 
  56      if len(tb) == 0: 
  57          return None 
  58      file, line, module, what = tb[len(tb) - 1] 
  59      sparkpath = os.path.dirname(file) 
  60      first_spark_frame = len(tb) - 1 
  61      for i in range(0, len(tb)): 
  62          file, line, fun, what = tb[i] 
  63          if file.startswith(sparkpath): 
  64              first_spark_frame = i 
  65              break 
  66      if first_spark_frame == 0: 
  67          file, line, fun, what = tb[0] 
  68          return callsite(function=fun, file=file, linenum=line) 
  69      sfile, sline, sfun, swhat = tb[first_spark_frame] 
  70      ufile, uline, ufun, uwhat = tb[first_spark_frame-1] 
  71      return callsite(function=sfun, file=ufile, linenum=uline) 
   72   
_spark_stack_depth = 0


class _JavaStackTrace(object):
    def __init__(self, sc):
  77          tb = _extract_concise_traceback() 
  78          if tb is not None: 
  79              self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum) 
  80          else: 
  81              self._traceback = "Error! Could not extract traceback info" 
  82          self._context = sc 

    def __enter__(self):
        global _spark_stack_depth
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(self._traceback)
        _spark_stack_depth += 1

    def __exit__(self, type, value, tb):
        global _spark_stack_depth
        _spark_stack_depth -= 1
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(None)


class MaxHeapQ(object):
    """
    An implementation of a size-bounded max-heap: it keeps at most C{maxsize}
    elements, retaining the smallest values inserted into it.
  99      >>> import pyspark.rdd 
 100      >>> heap = pyspark.rdd.MaxHeapQ(5) 
 101      >>> [heap.insert(i) for i in range(10)] 
 102      [None, None, None, None, None, None, None, None, None, None] 
 103      >>> sorted(heap.getElements()) 
 104      [0, 1, 2, 3, 4] 
 105      >>> heap = pyspark.rdd.MaxHeapQ(5) 
 106      >>> [heap.insert(i) for i in range(9, -1, -1)] 
 107      [None, None, None, None, None, None, None, None, None, None] 
 108      >>> sorted(heap.getElements()) 
 109      [0, 1, 2, 3, 4] 
 110      >>> heap = pyspark.rdd.MaxHeapQ(1) 
 111      >>> [heap.insert(i) for i in range(9, -1, -1)] 
 112      [None, None, None, None, None, None, None, None, None, None] 
 113      >>> heap.getElements() 
 114      [0] 
 115      """ 
 116   
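    # Note on the layout (a summary of the code below): the heap lives in
    # self.q starting at index 1, so the children of q[k] are q[2 * k] and
    # q[2 * k + 1], and the root q[1] is always the largest retained value.
    # Once the heap holds `maxsize` items, insert() evicts that root whenever
    # a smaller value arrives, which is how takeOrdered() keeps only the
    # smallest elements seen so far.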
    def __init__(self, maxsize):
 119          self.q = [0] 
 120          self.maxsize = maxsize 

    def _swim(self, k):
 123          while (k > 1) and (self.q[k/2] < self.q[k]): 
 124              self._swap(k, k/2) 
 125              k = k/2 

    def _swap(self, i, j):
 128          t = self.q[i] 
 129          self.q[i] = self.q[j] 
 130          self.q[j] = t 

    def _sink(self, k):
 133          N = self.size() 
 134          while 2 * k <= N: 
 135              j = 2 * k 
 136               
 137               
 138              if j < N and self.q[j] < self.q[j + 1]: 
 139                  j = j + 1 
 140              if(self.q[k] > self.q[j]): 
 141                  break 
 142              self._swap(k, j) 
 143              k = j 

    def size(self):
 146          return len(self.q) - 1 

    def insert(self, value):
 149          if (self.size()) < self.maxsize: 
 150              self.q.append(value) 
 151              self._swim(self.size()) 
 152          else: 
 153              self._replaceRoot(value) 

    def getElements(self):
        return self.q[1:]

    def _replaceRoot(self, value):
 159          if(self.q[1] > value): 
 160              self.q[1] = value 
 161              self._sink(1) 
   162   

class RDD(object):
    """
 165      A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. 
 166      Represents an immutable, partitioned collection of elements that can be 
 167      operated on in parallel. 
 168      """ 
 169   
    def __init__(self, jrdd, ctx, jrdd_deserializer):
  171          self._jrdd = jrdd 
 172          self.is_cached = False 
 173          self.is_checkpointed = False 
 174          self.ctx = ctx 
 175          self._jrdd_deserializer = jrdd_deserializer 
 176          self._id = jrdd.id() 

    def id(self):
 179          """ 
 180          A unique ID for this RDD (within its SparkContext). 
 181          """ 
 182          return self._id 

    def __repr__(self):
 185          return self._jrdd.toString() 
  186   
 187      @property 
    def context(self):
        """
 190          The L{SparkContext} that this RDD was created on. 
 191          """ 
 192          return self.ctx 

    def cache(self):
 195          """ 
 196          Persist this RDD with the default storage level (C{MEMORY_ONLY}). 
 197          """ 
 198          self.is_cached = True 
 199          self._jrdd.cache() 
 200          return self 

    def persist(self, storageLevel):
 203          """ 
 204          Set this RDD's storage level to persist its values across operations after the first time 
 205          it is computed. This can only be used to assign a new storage level if the RDD does not 
 206          have a storage level set yet. 
 207          """ 
 208          self.is_cached = True 
 209          javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel) 
 210          self._jrdd.persist(javaStorageLevel) 
 211          return self 

    def unpersist(self):
 214          """ 
 215          Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. 
 216          """ 
 217          self.is_cached = False 
 218          self._jrdd.unpersist() 
 219          return self 

    def checkpoint(self):
 222          """ 
 223          Mark this RDD for checkpointing. It will be saved to a file inside the 
 224          checkpoint directory set with L{SparkContext.setCheckpointDir()} and 
 225          all references to its parent RDDs will be removed. This function must 
 226          be called before any job has been executed on this RDD. It is strongly 
 227          recommended that this RDD is persisted in memory, otherwise saving it 
 228          on a file will require recomputation. 
 229          """ 
 230          self.is_checkpointed = True 
 231          self._jrdd.rdd().checkpoint() 

    def isCheckpointed(self):
 234          """ 
 235          Return whether this RDD has been checkpointed or not 
 236          """ 
 237          return self._jrdd.rdd().isCheckpointed() 

    def getCheckpointFile(self):
 240          """ 
 241          Gets the name of the file to which this RDD was checkpointed 
 242          """ 
 243          checkpointFile = self._jrdd.rdd().getCheckpointFile() 
 244          if checkpointFile.isDefined(): 
 245              return checkpointFile.get() 
 246          else: 
 247              return None 
  248   
    def map(self, f, preservesPartitioning=False):
  250          """ 
 251          Return a new RDD by applying a function to each element of this RDD. 
 252           
 253          >>> rdd = sc.parallelize(["b", "a", "c"]) 
 254          >>> sorted(rdd.map(lambda x: (x, 1)).collect()) 
 255          [('a', 1), ('b', 1), ('c', 1)] 
 256          """ 
 257          def func(split, iterator): return imap(f, iterator) 
 258          return PipelinedRDD(self, func, preservesPartitioning) 
  259   
    def flatMap(self, f, preservesPartitioning=False):
  261          """ 
 262          Return a new RDD by first applying a function to all elements of this 
 263          RDD, and then flattening the results. 
 264   
 265          >>> rdd = sc.parallelize([2, 3, 4]) 
 266          >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect()) 
 267          [1, 1, 1, 2, 2, 3] 
 268          >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()) 
 269          [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)] 
 270          """ 
 271          def func(s, iterator): return chain.from_iterable(imap(f, iterator)) 
 272          return self.mapPartitionsWithIndex(func, preservesPartitioning) 

    def mapPartitions(self, f, preservesPartitioning=False):
 275          """ 
 276          Return a new RDD by applying a function to each partition of this RDD. 
 277   
 278          >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 
 279          >>> def f(iterator): yield sum(iterator) 
 280          >>> rdd.mapPartitions(f).collect() 
 281          [3, 7] 
 282          """ 
        def func(s, iterator): return f(iterator)
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
 287          """ 
 288          Return a new RDD by applying a function to each partition of this RDD, 
 289          while tracking the index of the original partition. 
 290   
 291          >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 
 292          >>> def f(splitIndex, iterator): yield splitIndex 
 293          >>> rdd.mapPartitionsWithIndex(f).sum() 
 294          6 
 295          """ 
 296          return PipelinedRDD(self, f, preservesPartitioning) 

    def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
 299          """ 
 300          Deprecated: use mapPartitionsWithIndex instead. 
 301   
 302          Return a new RDD by applying a function to each partition of this RDD, 
 303          while tracking the index of the original partition. 
 304   
 305          >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 
 306          >>> def f(splitIndex, iterator): yield splitIndex 
 307          >>> rdd.mapPartitionsWithSplit(f).sum() 
 308          6 
 309          """ 
 310          warnings.warn("mapPartitionsWithSplit is deprecated; " 
 311              "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2) 
 312          return self.mapPartitionsWithIndex(f, preservesPartitioning) 

    def filter(self, f):
 315          """ 
 316          Return a new RDD containing only the elements that satisfy a predicate. 
 317   
 318          >>> rdd = sc.parallelize([1, 2, 3, 4, 5]) 
 319          >>> rdd.filter(lambda x: x % 2 == 0).collect() 
 320          [2, 4] 
 321          """ 
 322          def func(iterator): return ifilter(f, iterator) 
 323          return self.mapPartitions(func) 

    def distinct(self):
 326          """ 
 327          Return a new RDD containing the distinct elements in this RDD. 
 328   
 329          >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) 
 330          [1, 2, 3] 
 331          """ 
 332          return self.map(lambda x: (x, None)) \ 
 333                     .reduceByKey(lambda x, _: x) \ 
 334                     .map(lambda (x, _): x) 
  335   
    def sample(self, withReplacement, fraction, seed=None):
  337          """ 
 338          Return a sampled subset of this RDD (relies on numpy and falls back 
 339          on default random generator if numpy is unavailable). 
 340   
 341          >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP 
 342          [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98] 
 343          """ 
 344          assert fraction >= 0.0, "Invalid fraction value: %s" % fraction 
 345          return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) 
  346   
 347       
    def takeSample(self, withReplacement, num, seed=None):
  349          """ 
 350          Return a fixed-size sampled subset of this RDD (currently requires numpy). 
 351   
 352          >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP 
 353          [4, 2, 1, 8, 2, 7, 0, 4, 1, 4] 
 354          """ 
 355   
 356          fraction = 0.0 
 357          total = 0 
 358          multiplier = 3.0 
 359          initialCount = self.count() 
 360          maxSelected = 0 
 361   
 362          if (num < 0): 
 363              raise ValueError 
 364   
 365          if (initialCount == 0): 
 366              return list() 
 367   
 368          if initialCount > sys.maxint - 1: 
 369              maxSelected = sys.maxint - 1 
 370          else: 
 371              maxSelected = initialCount 
 372   
 373          if num > initialCount and not withReplacement: 
 374              total = maxSelected 
 375              fraction = multiplier * (maxSelected + 1) / initialCount 
 376          else: 
 377              fraction = multiplier * (num + 1) / initialCount 
 378              total = num 
 379   
 380          samples = self.sample(withReplacement, fraction, seed).collect() 
 381   
 382           
 383           
 384           
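        # If the first sample did not turn out large enough, keep re-sampling
        # with fresh seeds; this should be rare, since the fraction above is
        # already padded by the multiplier.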
 385          rand = Random(seed) 
 386          while len(samples) < total: 
 387              samples = self.sample(withReplacement, fraction, rand.randint(0, sys.maxint)).collect() 
 388   
 389          sampler = RDDSampler(withReplacement, fraction, rand.randint(0, sys.maxint)) 
 390          sampler.shuffle(samples) 
 391          return samples[0:total] 

    def union(self, other):
 394          """ 
 395          Return the union of this RDD and another one. 
 396   
 397          >>> rdd = sc.parallelize([1, 1, 2, 3]) 
 398          >>> rdd.union(rdd).collect() 
 399          [1, 1, 2, 3, 1, 1, 2, 3] 
 400          """ 
 401          if self._jrdd_deserializer == other._jrdd_deserializer: 
 402              rdd = RDD(self._jrdd.union(other._jrdd), self.ctx, 
 403                        self._jrdd_deserializer) 
 404              return rdd 
 405          else: 
 406               
 407               
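            # The two RDDs use different serializers, so normalize both to the
            # default serializer before handing them to the Java union.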
 408              self_copy = self._reserialize() 
 409              other_copy = other._reserialize() 
 410              return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx, 
 411                         self.ctx.serializer) 

    def intersection(self, other):
 414          """ 
 415          Return the intersection of this RDD and another one. The output will not  
 416          contain any duplicate elements, even if the input RDDs did. 
 417           
 418          Note that this method performs a shuffle internally. 
 419   
 420          >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5]) 
 421          >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8]) 
 422          >>> rdd1.intersection(rdd2).collect() 
 423          [1, 2, 3] 
 424          """ 
 425          return self.map(lambda v: (v, None)) \ 
 426              .cogroup(other.map(lambda v: (v, None))) \ 
 427              .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \ 
 428              .keys() 

    def _reserialize(self):
 431          if self._jrdd_deserializer == self.ctx.serializer: 
 432              return self 
 433          else: 
 434              return self.map(lambda x: x, preservesPartitioning=True) 

    def __add__(self, other):
 437          """ 
 438          Return the union of this RDD and another one. 
 439   
 440          >>> rdd = sc.parallelize([1, 1, 2, 3]) 
 441          >>> (rdd + rdd).collect() 
 442          [1, 1, 2, 3, 1, 1, 2, 3] 
 443          """ 
 444          if not isinstance(other, RDD): 
 445              raise TypeError 
 446          return self.union(other) 
  447   
    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
  449          """ 
 450          Sorts this RDD, which is assumed to consist of (key, value) pairs. 
 451   
 452          >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] 
 453          >>> sc.parallelize(tmp).sortByKey(True, 2).collect() 
 454          [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] 
 455          >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)] 
 456          >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]) 
 457          >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect() 
 458          [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)] 
 459          """ 
 460          if numPartitions is None: 
 461              numPartitions = self.ctx.defaultParallelism 
 462   
 463          bounds = list() 
 464   
 465           
 466           
 467           
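        # First compute approximate range boundaries for the partitions from a
        # small sample of the keys (about 20 samples per partition, in the
        # spirit of Spark's RangePartitioner), then range-partition and sort
        # each partition locally.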
 468          if numPartitions > 1: 
 469              rddSize = self.count() 
 470              maxSampleSize = numPartitions * 20.0  
 471              fraction = min(maxSampleSize / max(rddSize, 1), 1.0) 
 472   
 473              samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect() 
 474              samples = sorted(samples, reverse=(not ascending), key=keyfunc) 
 475   
 476               
 477               
 478              for i in range(0, numPartitions - 1): 
 479                  index = (len(samples) - 1) * (i + 1) / numPartitions 
 480                  bounds.append(samples[index]) 
 481   
 482          def rangePartitionFunc(k): 
 483              p = 0 
 484              while p < len(bounds) and keyfunc(k) > bounds[p]: 
 485                  p += 1 
 486              if ascending: 
 487                  return p 
 488              else: 
 489                  return numPartitions-1-p 
  490   
 491          def mapFunc(iterator): 
 492              yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k)) 
  493   
 494          return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc) 
 495                      .mapPartitions(mapFunc,preservesPartitioning=True) 
 496                      .flatMap(lambda x: x, preservesPartitioning=True)) 

    def glom(self):
 499          """ 
 500          Return an RDD created by coalescing all elements within each partition 
 501          into a list. 
 502   
 503          >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 
 504          >>> sorted(rdd.glom().collect()) 
 505          [[1, 2], [3, 4]] 
 506          """ 
 507          def func(iterator): yield list(iterator) 
 508          return self.mapPartitions(func) 

    def cartesian(self, other):
 511          """ 
 512          Return the Cartesian product of this RDD and another one, that is, the 
 513          RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and 
 514          C{b} is in C{other}. 
 515   
 516          >>> rdd = sc.parallelize([1, 2]) 
 517          >>> sorted(rdd.cartesian(rdd).collect()) 
 518          [(1, 1), (1, 2), (2, 1), (2, 2)] 
 519          """ 
 520           
 521          deserializer = CartesianDeserializer(self._jrdd_deserializer, 
 522                                               other._jrdd_deserializer) 
 523          return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer) 
  524   
    def groupBy(self, f, numPartitions=None):
  526          """ 
 527          Return an RDD of grouped items. 
 528   
 529          >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8]) 
 530          >>> result = rdd.groupBy(lambda x: x % 2).collect() 
 531          >>> sorted([(x, sorted(y)) for (x, y) in result]) 
 532          [(0, [2, 8]), (1, [1, 1, 3, 5])] 
 533          """ 
 534          return self.map(lambda x: (f(x), x)).groupByKey(numPartitions) 
  535   
    def pipe(self, command, env={}):
  537          """ 
 538          Return an RDD created by piping elements to a forked external process. 
 539   
 540          >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect() 
 541          ['1', '2', '', '3'] 
 542          """ 
 543          def func(iterator): 
 544              pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE) 
 545              def pipe_objs(out): 
 546                  for obj in iterator: 
 547                      out.write(str(obj).rstrip('\n') + '\n') 
 548                  out.close() 
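            # Feed the child's stdin from a separate thread so that reading
            # its stdout below cannot deadlock on a full pipe buffer.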
  549              Thread(target=pipe_objs, args=[pipe.stdin]).start() 
 550              return (x.rstrip('\n') for x in iter(pipe.stdout.readline, '')) 
 551          return self.mapPartitions(func) 

    def foreach(self, f):
 554          """ 
 555          Applies a function to all elements of this RDD. 
 556   
 557          >>> def f(x): print x 
 558          >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f) 
 559          """ 
 560          def processPartition(iterator): 
 561              for x in iterator: 
 562                  f(x) 
 563              yield None 
  564          self.mapPartitions(processPartition).collect()   

    def foreachPartition(self, f):
 567          """ 
 568          Applies a function to each partition of this RDD. 
 569   
 570          >>> def f(iterator):  
 571          ...      for x in iterator:  
 572          ...           print x  
 573          ...      yield None 
 574          >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f) 
 575          """ 
 576          self.mapPartitions(f).collect()   

    def collect(self):
 579          """ 
 580          Return a list that contains all of the elements in this RDD. 
 581          """ 
 582          with _JavaStackTrace(self.context) as st: 
 583            bytesInJava = self._jrdd.collect().iterator() 
 584          return list(self._collect_iterator_through_file(bytesInJava)) 

    def _collect_iterator_through_file(self, iterator):
 587           
 588           
 589           
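        # The serialized elements are spilled to a local temporary file (via
        # the JVM helper behind ctx._writeToFile) and deserialized from there,
        # rather than being pulled through the Py4J socket one at a time.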
 590          tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir) 
 591          tempFile.close() 
 592          self.ctx._writeToFile(iterator, tempFile.name) 
 593           
 594          with open(tempFile.name, 'rb') as tempFile: 
 595              for item in self._jrdd_deserializer.load_stream(tempFile): 
 596                  yield item 
 597          os.unlink(tempFile.name) 

    def reduce(self, f):
 600          """ 
 601          Reduces the elements of this RDD using the specified commutative and 
 602          associative binary operator. Currently reduces partitions locally. 
 603   
 604          >>> from operator import add 
 605          >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add) 
 606          15 
 607          >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add) 
 608          10 
 609          """ 
 610          def func(iterator): 
 611              acc = None 
 612              for obj in iterator: 
 613                  if acc is None: 
 614                      acc = obj 
 615                  else: 
 616                      acc = f(obj, acc) 
 617              if acc is not None: 
 618                  yield acc 
  619          vals = self.mapPartitions(func).collect() 
 620          return reduce(f, vals) 
 621   
    def fold(self, zeroValue, op):
  623          """ 
 624          Aggregate the elements of each partition, and then the results for all 
 625          the partitions, using a given associative function and a neutral "zero 
 626          value." 
 627   
 628          The function C{op(t1, t2)} is allowed to modify C{t1} and return it 
 629          as its result value to avoid object allocation; however, it should not 
 630          modify C{t2}. 
 631   
 632          >>> from operator import add 
 633          >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) 
 634          15 
 635          """ 
 636          def func(iterator): 
 637              acc = zeroValue 
 638              for obj in iterator: 
 639                  acc = op(obj, acc) 
 640              yield acc 
  641          vals = self.mapPartitions(func).collect() 
 642          return reduce(op, vals, zeroValue) 
 643   
    def aggregate(self, zeroValue, seqOp, combOp):
  645          """ 
        Aggregate the elements of each partition, and then the results for all
        the partitions, using given combine functions and a neutral "zero
        value."

        The functions C{seqOp} and C{combOp} are allowed to modify and return
        their first argument to avoid object allocation; however, they should
        not modify their second argument.
 653   
 654          The first function (seqOp) can return a different result type, U, than 
 655          the type of this RDD. Thus, we need one operation for merging a T into an U 
 656          and one operation for merging two U 
 657   
 658          >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1)) 
 659          >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) 
 660          >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp) 
 661          (10, 4) 
 662          >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp) 
 663          (0, 0) 
 664          """ 
 665          def func(iterator): 
 666              acc = zeroValue 
 667              for obj in iterator: 
 668                  acc = seqOp(acc, obj) 
 669              yield acc 
  670   
 671          return self.mapPartitions(func).fold(zeroValue, combOp) 
 672           

    def max(self):
 675          """ 
 676          Find the maximum item in this RDD. 
 677   
 678          >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max() 
 679          43.0 
 680          """ 
 681          return self.reduce(max) 

    def min(self):
 684          """ 
 685          Find the maximum item in this RDD. 
 686   
 687          >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min() 
 688          1.0 
 689          """ 
 690          return self.reduce(min) 

    def sum(self):
 693          """ 
 694          Add up the elements in this RDD. 
 695   
 696          >>> sc.parallelize([1.0, 2.0, 3.0]).sum() 
 697          6.0 
 698          """ 
 699          return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) 

    def count(self):
 702          """ 
 703          Return the number of elements in this RDD. 
 704   
 705          >>> sc.parallelize([2, 3, 4]).count() 
 706          3 
 707          """ 
 708          return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 

    def stats(self):
 711          """ 
 712          Return a L{StatCounter} object that captures the mean, variance 
 713          and count of the RDD's elements in one operation. 
 714          """ 
 715          def redFunc(left_counter, right_counter): 
 716              return left_counter.mergeStats(right_counter) 
  717   
 718          return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc) 

    def mean(self):
 721          """ 
 722          Compute the mean of this RDD's elements. 
 723   
 724          >>> sc.parallelize([1, 2, 3]).mean() 
 725          2.0 
 726          """ 
 727          return self.stats().mean() 

    def variance(self):
 730          """ 
 731          Compute the variance of this RDD's elements. 
 732   
 733          >>> sc.parallelize([1, 2, 3]).variance() 
 734          0.666... 
 735          """ 
 736          return self.stats().variance() 

    def stdev(self):
 739          """ 
 740          Compute the standard deviation of this RDD's elements. 
 741   
 742          >>> sc.parallelize([1, 2, 3]).stdev() 
 743          0.816... 
 744          """ 
 745          return self.stats().stdev() 

    def sampleStdev(self):
 748          """ 
 749          Compute the sample standard deviation of this RDD's elements (which corrects for bias in 
 750          estimating the standard deviation by dividing by N-1 instead of N). 
 751   
 752          >>> sc.parallelize([1, 2, 3]).sampleStdev() 
 753          1.0 
 754          """ 
 755          return self.stats().sampleStdev() 

    def sampleVariance(self):
 758          """ 
 759          Compute the sample variance of this RDD's elements (which corrects for bias in 
 760          estimating the variance by dividing by N-1 instead of N). 
 761   
 762          >>> sc.parallelize([1, 2, 3]).sampleVariance() 
 763          1.0 
 764          """ 
 765          return self.stats().sampleVariance() 

    def countByValue(self):
 768          """ 
 769          Return the count of each unique value in this RDD as a dictionary of 
 770          (value, count) pairs. 
 771   
 772          >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items()) 
 773          [(1, 2), (2, 3)] 
 774          """ 
 775          def countPartition(iterator): 
 776              counts = defaultdict(int) 
 777              for obj in iterator: 
 778                  counts[obj] += 1 
 779              yield counts 
  780          def mergeMaps(m1, m2): 
 781              for (k, v) in m2.iteritems(): 
 782                  m1[k] += v 
 783              return m1 
 784          return self.mapPartitions(countPartition).reduce(mergeMaps) 
 785       
    def top(self, num):
  787          """ 
 788          Get the top N elements from a RDD. 
 789   
 790          Note: It returns the list sorted in descending order. 
 791          >>> sc.parallelize([10, 4, 2, 12, 3]).top(1) 
 792          [12] 
 793          >>> sc.parallelize([2, 3, 4, 5, 6], 2).cache().top(2) 
 794          [6, 5] 
 795          """ 
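        # Each partition keeps at most `num` elements in a heapq min-heap, so
        # the smallest retained element is evicted first and only the largest
        # `num` survive; merging two partitions re-applies the same bounded
        # heap to their combined contents.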
 796          def topIterator(iterator): 
 797              q = [] 
 798              for k in iterator: 
 799                  if len(q) < num: 
 800                      heapq.heappush(q, k) 
 801                  else: 
 802                      heapq.heappushpop(q, k) 
 803              yield q 
  804   
 805          def merge(a, b): 
 806              return next(topIterator(a + b)) 
 807   
 808          return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True) 

    def takeOrdered(self, num, key=None):
 811          """ 
 812          Get the N elements from a RDD ordered in ascending order or as specified 
 813          by the optional key function.  
 814   
 815          >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6) 
 816          [1, 2, 3, 4, 5, 6] 
 817          >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x) 
 818          [10, 9, 7, 6, 5, 4] 
 819          """ 
 820   
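        # When a key function is supplied, each element is wrapped as
        # (key(element), element) so MaxHeapQ orders by the key; unKey()
        # strips that wrapper off again before the final sort.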
 821          def topNKeyedElems(iterator, key_=None): 
 822              q = MaxHeapQ(num) 
 823              for k in iterator: 
 824                  if key_ != None: 
 825                      k = (key_(k), k) 
 826                  q.insert(k) 
 827              yield q.getElements() 
  828   
 829          def unKey(x, key_=None): 
 830              if key_ != None: 
 831                  x = [i[1] for i in x] 
 832              return x 
 833           
 834          def merge(a, b): 
 835              return next(topNKeyedElems(a + b)) 
 836          result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge) 
 837          return sorted(unKey(result, key), key=key) 
 838   
 839   
    def take(self, num):
  841          """ 
 842          Take the first num elements of the RDD. 
 843   
 844          This currently scans the partitions *one by one*, so it will be slow if 
 845          a lot of partitions are required. In that case, use L{collect} to get 
 846          the whole RDD instead. 
 847   
 848          >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2) 
 849          [2, 3] 
 850          >>> sc.parallelize([2, 3, 4, 5, 6]).take(10) 
 851          [2, 3, 4, 5, 6] 
 852          """ 
 853          def takeUpToNum(iterator): 
 854              taken = 0 
 855              while taken < num: 
 856                  yield next(iterator) 
 857                  taken += 1 
  858           
 859          mapped = self.mapPartitions(takeUpToNum) 
 860          items = [] 
 861           
 862           
 863           
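        # Fetch one partition at a time via collectPartitions() and stop as
        # soon as enough elements have been gathered.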
 864          with _JavaStackTrace(self.context) as st: 
 865              for partition in range(mapped._jrdd.splits().size()): 
 866                  partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1) 
 867                  partitionsToTake[0] = partition 
 868                  iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator() 
 869                  items.extend(mapped._collect_iterator_through_file(iterator)) 
 870                  if len(items) >= num: 
 871                      break 
 872          return items[:num] 

    def first(self):
 875          """ 
 876          Return the first element in this RDD. 
 877   
 878          >>> sc.parallelize([2, 3, 4]).first() 
 879          2 
 880          """ 
 881          return self.take(1)[0] 
  882   
    def saveAsTextFile(self, path):
  884          """ 
 885          Save this RDD as a text file, using string representations of elements. 
 886   
 887          >>> tempFile = NamedTemporaryFile(delete=True) 
 888          >>> tempFile.close() 
 889          >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name) 
 890          >>> from fileinput import input 
 891          >>> from glob import glob 
 892          >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) 
 893          '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' 
 894   
 895          Empty lines are tolerated when saving to text files. 
 896   
 897          >>> tempFile2 = NamedTemporaryFile(delete=True) 
 898          >>> tempFile2.close() 
 899          >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) 
 900          >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) 
 901          '\\n\\n\\nbar\\nfoo\\n' 
 902          """ 
 903          def func(split, iterator): 
 904              for x in iterator: 
 905                  if not isinstance(x, basestring): 
 906                      x = unicode(x) 
 907                  yield x.encode("utf-8") 
  908          keyed = PipelinedRDD(self, func) 
 909          keyed._bypass_serializer = True 
 910          keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) 
 911   
 912       

    def collectAsMap(self):
 915          """ 
 916          Return the key-value pairs in this RDD to the master as a dictionary. 
 917   
 918          >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap() 
 919          >>> m[1] 
 920          2 
 921          >>> m[3] 
 922          4 
 923          """ 
 924          return dict(self.collect()) 

    def keys(self):
 927          """ 
 928          Return an RDD with the keys of each tuple. 
 929          >>> m = sc.parallelize([(1, 2), (3, 4)]).keys() 
 930          >>> m.collect() 
 931          [1, 3] 
 932          """ 
 933          return self.map(lambda (k, v): k) 

    def values(self):
 936          """ 
 937          Return an RDD with the values of each tuple. 
 938          >>> m = sc.parallelize([(1, 2), (3, 4)]).values() 
 939          >>> m.collect() 
 940          [2, 4] 
 941          """ 
 942          return self.map(lambda (k, v): v) 

    def reduceByKey(self, func, numPartitions=None):
 945          """ 
 946          Merge the values for each key using an associative reduce function. 
 947   
 948          This will also perform the merging locally on each mapper before 
 949          sending results to a reducer, similarly to a "combiner" in MapReduce. 
 950   
 951          Output will be hash-partitioned with C{numPartitions} partitions, or 
 952          the default parallelism level if C{numPartitions} is not specified. 
 953   
 954          >>> from operator import add 
 955          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
 956          >>> sorted(rdd.reduceByKey(add).collect()) 
 957          [('a', 2), ('b', 1)] 
 958          """ 
 959          return self.combineByKey(lambda x: x, func, func, numPartitions) 

    def reduceByKeyLocally(self, func):
 962          """ 
 963          Merge the values for each key using an associative reduce function, but 
 964          return the results immediately to the master as a dictionary. 
 965   
 966          This will also perform the merging locally on each mapper before 
 967          sending results to a reducer, similarly to a "combiner" in MapReduce. 
 968   
 969          >>> from operator import add 
 970          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
 971          >>> sorted(rdd.reduceByKeyLocally(add).items()) 
 972          [('a', 2), ('b', 1)] 
 973          """ 
 974          def reducePartition(iterator): 
 975              m = {} 
 976              for (k, v) in iterator: 
 977                  m[k] = v if k not in m else func(m[k], v) 
 978              yield m 
  979          def mergeMaps(m1, m2): 
 980              for (k, v) in m2.iteritems(): 
 981                  m1[k] = v if k not in m1 else func(m1[k], v) 
 982              return m1 
 983          return self.mapPartitions(reducePartition).reduce(mergeMaps) 

    def countByKey(self):
 986          """ 
 987          Count the number of elements for each key, and return the result to the 
 988          master as a dictionary. 
 989   
 990          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
 991          >>> sorted(rdd.countByKey().items()) 
 992          [('a', 2), ('b', 1)] 
 993          """ 
 994          return self.map(lambda x: x[0]).countByValue() 
  995   
    def join(self, other, numPartitions=None):
  997          """ 
 998          Return an RDD containing all pairs of elements with matching keys in 
 999          C{self} and C{other}. 
1000   
1001          Each pair of elements will be returned as a (k, (v1, v2)) tuple, where 
1002          (k, v1) is in C{self} and (k, v2) is in C{other}. 
1003   
1004          Performs a hash join across the cluster. 
1005   
1006          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1007          >>> y = sc.parallelize([("a", 2), ("a", 3)]) 
1008          >>> sorted(x.join(y).collect()) 
1009          [('a', (1, 2)), ('a', (1, 3))] 
1010          """ 
1011          return python_join(self, other, numPartitions) 

    def leftOuterJoin(self, other, numPartitions=None):
1014          """ 
1015          Perform a left outer join of C{self} and C{other}. 
1016   
1017          For each element (k, v) in C{self}, the resulting RDD will either 
1018          contain all pairs (k, (v, w)) for w in C{other}, or the pair 
1019          (k, (v, None)) if no elements in other have key k. 
1020   
1021          Hash-partitions the resulting RDD into the given number of partitions. 
1022   
1023          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1024          >>> y = sc.parallelize([("a", 2)]) 
1025          >>> sorted(x.leftOuterJoin(y).collect()) 
1026          [('a', (1, 2)), ('b', (4, None))] 
1027          """ 
1028          return python_left_outer_join(self, other, numPartitions) 

    def rightOuterJoin(self, other, numPartitions=None):
1031          """ 
1032          Perform a right outer join of C{self} and C{other}. 
1033   
1034          For each element (k, w) in C{other}, the resulting RDD will either 
1035          contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w)) 
1036          if no elements in C{self} have key k. 
1037   
1038          Hash-partitions the resulting RDD into the given number of partitions. 
1039   
1040          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1041          >>> y = sc.parallelize([("a", 2)]) 
1042          >>> sorted(y.rightOuterJoin(x).collect()) 
1043          [('a', (2, 1)), ('b', (None, 4))] 
1044          """ 
1045          return python_right_outer_join(self, other, numPartitions) 
 1046   
1047       
    def partitionBy(self, numPartitions, partitionFunc=hash):
 1049          """ 
1050          Return a copy of the RDD partitioned using the specified partitioner. 
1051   
1052          >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x)) 
1053          >>> sets = pairs.partitionBy(2).glom().collect() 
1054          >>> set(sets[0]).intersection(set(sets[1])) 
1055          set([]) 
1056          """ 
1057          if numPartitions is None: 
1058              numPartitions = self.ctx.defaultParallelism 
1059           
1060           
1061           
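        # Group the elements into per-partition buckets on the Python side and
        # emit one (partition id, serialized items) pair per bucket, so the
        # Java PairwiseRDD below only has to move whole buckets around.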
1062          outputSerializer = self.ctx._unbatched_serializer 
1063          def add_shuffle_key(split, iterator): 
1064   
1065              buckets = defaultdict(list) 
1066   
1067              for (k, v) in iterator: 
1068                  buckets[partitionFunc(k) % numPartitions].append((k, v)) 
1069              for (split, items) in buckets.iteritems(): 
1070                  yield pack_long(split) 
1071                  yield outputSerializer.dumps(items) 
 1072          keyed = PipelinedRDD(self, add_shuffle_key) 
1073          keyed._bypass_serializer = True 
1074          with _JavaStackTrace(self.context) as st: 
1075              pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD() 
1076              partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, 
1077                                                            id(partitionFunc)) 
1078          jrdd = pairRDD.partitionBy(partitioner).values() 
1079          rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer)) 
1080           
1081           
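        # Keep a reference to the partition function: the Java partitioner
        # above is identified by id(partitionFunc), so the function (and its
        # id) must stay alive for as long as this RDD does.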
1082          rdd._partitionFunc = partitionFunc 
1083          return rdd 
1084   
1085       
    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None):
 1088          """ 
1089          Generic function to combine the elements for each key using a custom 
1090          set of aggregation functions. 
1091   
1092          Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined 
1093          type" C.  Note that V and C can be different -- for example, one might 
1094          group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]). 
1095   
1096          Users provide three functions: 
1097   
1098              - C{createCombiner}, which turns a V into a C (e.g., creates 
1099                a one-element list) 
1100              - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of 
1101                a list) 
1102              - C{mergeCombiners}, to combine two C's into a single one. 
1103   
1104          In addition, users can control the partitioning of the output RDD. 
1105   
1106          >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1107          >>> def f(x): return x 
1108          >>> def add(a, b): return a + str(b) 
1109          >>> sorted(x.combineByKey(str, add, add).collect()) 
1110          [('a', '11'), ('b', '1')] 
1111          """ 
1112          if numPartitions is None: 
1113              numPartitions = self.ctx.defaultParallelism 
1114          def combineLocally(iterator): 
1115              combiners = {} 
1116              for x in iterator: 
1117                  (k, v) = x 
1118                  if k not in combiners: 
1119                      combiners[k] = createCombiner(v) 
1120                  else: 
1121                      combiners[k] = mergeValue(combiners[k], v) 
1122              return combiners.iteritems() 
 1123          locally_combined = self.mapPartitions(combineLocally) 
1124          shuffled = locally_combined.partitionBy(numPartitions) 
1125          def _mergeCombiners(iterator): 
1126              combiners = {} 
1127              for (k, v) in iterator: 
1128                  if not k in combiners: 
1129                      combiners[k] = v 
1130                  else: 
1131                      combiners[k] = mergeCombiners(combiners[k], v) 
1132              return combiners.iteritems() 
1133          return shuffled.mapPartitions(_mergeCombiners) 
1134       
    def foldByKey(self, zeroValue, func, numPartitions=None):
 1136          """ 
1137          Merge the values for each key using an associative function "func" and a neutral "zeroValue" 
1138          which may be added to the result an arbitrary number of times, and must not change  
1139          the result (e.g., 0 for addition, or 1 for multiplication.).                 
1140   
1141          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1142          >>> from operator import add 
1143          >>> rdd.foldByKey(0, add).collect() 
1144          [('a', 2), ('b', 1)] 
1145          """ 
1146          return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions) 
 1147       
1148       
1149       
    def groupByKey(self, numPartitions=None):
        """
        Group the values for each key in the RDD into a single sequence.
        Hash-partitions the resulting RDD into numPartitions partitions.
1154   
1155          Note: If you are grouping in order to perform an aggregation (such as a 
1156          sum or average) over each key, using reduceByKey will provide much better 
1157          performance. 
1158   
1159          >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1160          >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect())) 
1161          [('a', [1, 1]), ('b', [1])] 
1162          """ 
1163   
1164          def createCombiner(x): 
1165              return [x] 
 1166   
1167          def mergeValue(xs, x): 
1168              xs.append(x) 
1169              return xs 
1170   
1171          def mergeCombiners(a, b): 
1172              return a + b 
1173   
1174          return self.combineByKey(createCombiner, mergeValue, mergeCombiners, 
1175                  numPartitions).mapValues(lambda x: ResultIterable(x)) 
1176   

    def flatMapValues(self, f):
1179          """ 
1180          Pass each value in the key-value pair RDD through a flatMap function 
1181          without changing the keys; this also retains the original RDD's 
1182          partitioning. 
1183   
1184          >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])]) 
1185          >>> def f(x): return x 
1186          >>> x.flatMapValues(f).collect() 
1187          [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')] 
1188          """ 
1189          flat_map_fn = lambda (k, v): ((k, x) for x in f(v)) 
1190          return self.flatMap(flat_map_fn, preservesPartitioning=True) 

    def mapValues(self, f):
1193          """ 
1194          Pass each value in the key-value pair RDD through a map function 
1195          without changing the keys; this also retains the original RDD's 
1196          partitioning. 
1197   
1198          >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])]) 
1199          >>> def f(x): return len(x) 
1200          >>> x.mapValues(f).collect() 
1201          [('a', 3), ('b', 1)] 
1202          """ 
1203          map_values_fn = lambda (k, v): (k, f(v)) 
1204          return self.map(map_values_fn, preservesPartitioning=True) 
 1205   

    def groupWith(self, other):
1208          """ 
1209          Alias for cogroup. 
1210          """ 
1211          return self.cogroup(other) 
 1212   
1213       
    def cogroup(self, other, numPartitions=None):
 1215          """ 
1216          For each key k in C{self} or C{other}, return a resulting RDD that 
1217          contains a tuple with the list of values for that key in C{self} as well 
1218          as C{other}. 
1219   
1220          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1221          >>> y = sc.parallelize([("a", 2)]) 
1222          >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect()))) 
1223          [('a', ([1], [2])), ('b', ([4], []))] 
1224          """ 
1225          return python_cogroup(self, other, numPartitions) 

    def subtractByKey(self, other, numPartitions=None):
1228          """ 
1229          Return each (key, value) pair in C{self} that has no pair with matching key 
1230          in C{other}. 
1231   
1232          >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) 
1233          >>> y = sc.parallelize([("a", 3), ("c", None)]) 
1234          >>> sorted(x.subtractByKey(y).collect()) 
1235          [('b', 4), ('b', 5)] 
1236          """ 
1237          filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0 
1238          map_func = lambda (key, vals): [(key, val) for val in vals[0]] 
1239          return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func) 
 1240   
    def subtract(self, other, numPartitions=None):
 1242          """ 
1243          Return each value in C{self} that is not contained in C{other}. 
1244   
1245          >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) 
1246          >>> y = sc.parallelize([("a", 3), ("c", None)]) 
1247          >>> sorted(x.subtract(y).collect()) 
1248          [('a', 1), ('b', 4), ('b', 5)] 
1249          """ 
1250          rdd = other.map(lambda x: (x, True))  
1251          return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])  

    def keyBy(self, f):
1254          """ 
1255          Creates tuples of the elements in this RDD by applying C{f}. 
1256   
1257          >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) 
1258          >>> y = sc.parallelize(zip(range(0,5), range(0,5))) 
1259          >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect())) 
1260          [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))] 
1261          """ 
1262          return self.map(lambda x: (f(x), x)) 

    def repartition(self, numPartitions):
1265          """ 
1266           Return a new RDD that has exactly numPartitions partitions. 
1267             
1268           Can increase or decrease the level of parallelism in this RDD. Internally, this uses 
1269           a shuffle to redistribute data. 
1270           If you are decreasing the number of partitions in this RDD, consider using `coalesce`, 
1271           which can avoid performing a shuffle. 
1272           >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4) 
1273           >>> sorted(rdd.glom().collect()) 
1274           [[1], [2, 3], [4, 5], [6, 7]] 
1275           >>> len(rdd.repartition(2).glom().collect()) 
1276           2 
1277           >>> len(rdd.repartition(10).glom().collect()) 
1278           10 
1279          """ 
1280          jrdd = self._jrdd.repartition(numPartitions) 
1281          return RDD(jrdd, self.ctx, self._jrdd_deserializer) 
 1282   
    def coalesce(self, numPartitions, shuffle=False):
 1284          """ 
1285          Return a new RDD that is reduced into `numPartitions` partitions. 
1286          >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect() 
1287          [[1], [2, 3], [4, 5]] 
1288          >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() 
1289          [[1, 2, 3, 4, 5]] 
1290          """ 
        jrdd = self._jrdd.coalesce(numPartitions, shuffle)
1292          return RDD(jrdd, self.ctx, self._jrdd_deserializer) 
 1293   
    def zip(self, other):
 1295          """ 
        Zips this RDD with another one, returning key-value pairs consisting
        of the first element in each RDD, the second element in each RDD, and
        so on. Assumes that the two RDDs have the same number of partitions
        and the same number of elements in each partition (e.g. one was made
        through a map on the other).
1300   
1301          >>> x = sc.parallelize(range(0,5)) 
1302          >>> y = sc.parallelize(range(1000, 1005)) 
1303          >>> x.zip(y).collect() 
1304          [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)] 
1305          """ 
1306          pairRDD = self._jrdd.zip(other._jrdd) 
1307          deserializer = PairDeserializer(self._jrdd_deserializer, 
1308                                               other._jrdd_deserializer) 
1309          return RDD(pairRDD, self.ctx, deserializer) 

    def name(self):
1312          """ 
1313          Return the name of this RDD. 
1314          """ 
1315          name_ = self._jrdd.name() 
1316          if not name_: 
1317              return None 
1318          return name_.encode('utf-8') 

    def setName(self, name):
1321          """ 
1322          Assign a name to this RDD. 
1323          >>> rdd1 = sc.parallelize([1,2]) 
1324          >>> rdd1.setName('RDD1') 
1325          >>> rdd1.name() 
1326          'RDD1' 
1327          """ 
1328          self._jrdd.setName(name) 

    def toDebugString(self):
1331          """ 
1332          A description of this RDD and its recursive dependencies for debugging. 
1333          """ 
1334          debug_string = self._jrdd.toDebugString() 
1335          if not debug_string: 
1336              return None 
1337          return debug_string.encode('utf-8') 

    def getStorageLevel(self):
1340          """ 
1341          Get the RDD's current storage level. 
1342          >>> rdd1 = sc.parallelize([1,2]) 
1343          >>> rdd1.getStorageLevel() 
1344          StorageLevel(False, False, False, False, 1) 
1345          """ 
1346          java_storage_level = self._jrdd.getStorageLevel() 
1347          storage_level = StorageLevel(java_storage_level.useDisk(), 
1348                                       java_storage_level.useMemory(), 
1349                                       java_storage_level.useOffHeap(), 
1350                                       java_storage_level.deserialized(), 
1351                                       java_storage_level.replication()) 
1352          return storage_level 
 1353   
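
# Note on pipelining (a reading aid for the class below): chained Python
# transformations such as rdd.map(f).filter(g) are fused into a single
# PipelinedRDD, so the composed function runs in one pass over each partition
# inside one PythonRDD instead of building a separate JVM RDD per step.
# Pipelining stops as soon as an RDD is cached or checkpointed (see
# _is_pipelinable()).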
class PipelinedRDD(RDD):
    """
1361      Pipelined maps: 
1362      >>> rdd = sc.parallelize([1, 2, 3, 4]) 
1363      >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect() 
1364      [4, 8, 12, 16] 
1365      >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect() 
1366      [4, 8, 12, 16] 
1367   
1368      Pipelined reduces: 
1369      >>> from operator import add 
1370      >>> rdd.map(lambda x: 2 * x).reduce(add) 
1371      20 
1372      >>> rdd.flatMap(lambda x: [x, x]).reduce(add) 
1373      20 
1374      """ 
    def __init__(self, prev, func, preservesPartitioning=False):
 1376          if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable(): 
1377               
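            # This transformation is the first in its pipeline: wrap the
            # function directly over the parent's JavaRDD.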
1378              self.func = func 
1379              self.preservesPartitioning = preservesPartitioning 
1380              self._prev_jrdd = prev._jrdd 
1381              self._prev_jrdd_deserializer = prev._jrdd_deserializer 
1382          else: 
1383              prev_func = prev.func 
1384              def pipeline_func(split, iterator): 
1385                  return func(split, prev_func(split, iterator)) 
 1386              self.func = pipeline_func 
1387              self.preservesPartitioning = \ 
1388                  prev.preservesPartitioning and preservesPartitioning 
1389              self._prev_jrdd = prev._prev_jrdd   
1390              self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer 
1391          self.is_cached = False 
1392          self.is_checkpointed = False 
1393          self.ctx = prev.ctx 
1394          self.prev = prev 
1395          self._jrdd_val = None 
1396          self._jrdd_deserializer = self.ctx.serializer 
1397          self._bypass_serializer = False 
 1398   
1399      @property 
    def _jrdd(self):
        if self._jrdd_val:
1402              return self._jrdd_val 
1403          if self._bypass_serializer: 
1404              serializer = NoOpSerializer() 
1405          else: 
1406              serializer = self.ctx.serializer 
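        # The composed Python function and its input/output serializers are
        # pickled with cloudpickle and handed to the JVM PythonRDD below,
        # which runs them in Python worker processes.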
1407          command = (self.func, self._prev_jrdd_deserializer, serializer) 
1408          pickled_command = CloudPickleSerializer().dumps(command) 
1409          broadcast_vars = ListConverter().convert( 
1410              [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], 
1411              self.ctx._gateway._gateway_client) 
1412          self.ctx._pickled_broadcast_vars.clear() 
1413          class_tag = self._prev_jrdd.classTag() 
1414          env = MapConverter().convert(self.ctx.environment, 
1415                                       self.ctx._gateway._gateway_client) 
1416          includes = ListConverter().convert(self.ctx._python_includes, 
1417                                       self.ctx._gateway._gateway_client) 
1418          python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), 
1419              bytearray(pickled_command), env, includes, self.preservesPartitioning, 
1420              self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator, 
1421              class_tag) 
1422          self._jrdd_val = python_rdd.asJavaRDD() 
1423          return self._jrdd_val 

    def _is_pipelinable(self):
1426          return not (self.is_cached or self.is_checkpointed) 


def _test():
1430      import doctest 
1431      from pyspark.context import SparkContext 
1432      globs = globals().copy() 
1433       
1434       
1435      globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) 
1436      (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS) 
1437      globs['sc'].stop() 
1438      if failure_count: 
1439          exit(-1) 
 1440   
1441   
1442  if __name__ == "__main__": 
1443      _test() 
1444