cartesian
# cartesian: pair every element of x with every element of y
x = sc.parallelize(['A', 'B'])
y = sc.parallelize(['C', 'D'])
z = x.cartesian(y)
# show both inputs first, then the resulting (x, y) pairs
for rdd in (x, y, z):
    print(rdd.collect())
['A', 'B']
['C', 'D']
[('A', 'C'), ('A', 'D'), ('B', 'C'), ('B', 'D')]
groupBy
# groupBy: bucket the elements of x under the key returned by the function
x = sc.parallelize([1, 2, 3])
y = x.groupBy(lambda n: 'A' if n % 2 == 1 else 'B')  # odd -> 'A', even -> 'B'
print(x.collect())
# each collected item is a (key, iterable-of-members) pair; turn the members
# into a plain list so they print readably
print([(key, list(members)) for key, members in y.collect()])
[1, 2, 3]
[('A', [1, 3]), ('B', [2])]
pipe
# pipe: run the RDD's elements through an external command
x = sc.parallelize(['A', 'Ba', 'C', 'AD'])
grep_cmd = 'grep -i "A"'  # calls out to grep, may fail under Windows
y = x.pipe(grep_cmd)
# show the input first, then what the command let through
for rdd in (x, y):
    print(rdd.collect())
['A', 'Ba', 'C', 'AD']
['A', 'Ba', 'AD']
foreach
# foreach
# print_function lets the print(el, file=...) call in f work on Python 2 as well.
# NOTE(review): a __future__ import must be the first statement of a module, so
# this only works if the snippet is run as its own cell/script - confirm.
from __future__ import print_function
# three-element RDD whose elements are handed to f one at a time
x = sc.parallelize([1,2,3])
def f(el):
    """Side effect: append the current RDD element to ./foreachExample.txt.

    Args:
        el: the element to record. It is written with print(), i.e. as
            str(el) followed by a newline.

    Returns:
        None. The function is used only for its side effect.
    """
    # Open in a with-block so the handle is flushed and closed on every call;
    # the original left it open and relied on garbage collection to close it.
    with open("./foreachExample.txt", 'a+') as f1:
        print(el, file=f1)
# first clear the file contents
open('./foreachExample.txt', 'w').close()
y = x.foreach(f)  # runs f on every element; f appends to foreachExample.txt
print(x.collect())
print(y)  # foreach returns 'None'
# print the contents of foreachExample.txt
# The elements are appended by separate tasks, so the line order in the file
# is not guaranteed to match the order of x.collect().
# NOTE(review): reading the file back here presumably requires the executors
# to share the driver's working directory (e.g. local mode) - confirm.
with open("./foreachExample.txt", "r") as foreachExample:
    print(foreachExample.read())
[1, 2, 3]
None
3
1
2
foreachPartition
# foreachPartition
# print_function lets the print(..., file=...) call in f work on Python 2 as well.
# NOTE(review): a __future__ import must be the first statement of a module, so
# this only works if the snippet is run as its own cell/script - confirm.
from __future__ import print_function
# ask for 5 partitions for only 3 elements, so some partitions end up empty
x = sc.parallelize([1,2,3],5)
def f(parition):
    """Side effect: append one RDD partition's contents to a file.

    Args:
        parition: iterable over the elements of a single partition (the
            parameter name keeps the original spelling so existing callers
            are unaffected). It is consumed entirely by this call.

    Returns:
        None. The function is used only for its side effect: one line holding
        the list of elements is appended to ./foreachPartitionExample.txt
        (an empty partition is written as "[]").
    """
    # Open in a with-block so the handle is flushed and closed on every call;
    # the original left it open and relied on garbage collection to close it.
    with open("./foreachPartitionExample.txt", 'a+') as f1:
        print(list(parition), file=f1)
# first clear the file contents
open('./foreachPartitionExample.txt', 'w').close()
y = x.foreachPartition(f)  # runs f once per partition; f appends to foreachPartitionExample.txt
print(x.glom().collect())  # glom() groups the elements by partition, exposing the layout
print(y)  # foreachPartition returns 'None'
# print the contents of foreachPartitionExample.txt
# One line is written per partition by separate tasks, so the line order in
# the file is not guaranteed to match the partition order shown by glom().
# NOTE(review): reading the file back here presumably requires the executors
# to share the driver's working directory (e.g. local mode) - confirm.
with open("./foreachPartitionExample.txt", "r") as foreachExample:
    print(foreachExample.read())
[[], [1], [], [2], [3]]
None
[]
[]
[1]
[2]
[3]
讲解得很细致