Sunday, April 3, 2016

[ Learn Spark ] Ch4. Working with Key/Value Pairs - Part1

Introduction
This chapter covers how to work with RDDs of key/value pairs, which are a common data type required for many operations in Spark. Key/value RDDs are commonly used to perform aggregations, and often we will do some initial ETL (extract, transform, and load) to get our data into a key/value format. Key/value RDDs expose new operations (e.g., counting up reviews for each product, grouping together data with the same key, and grouping together two different RDDs).

We also discuss an advanced feature that lets users control the layout of pair RDDs across nodes: partitioning. Using controllable partitioning, applications can sometimes greatly reduce communication costs by ensuring that data will be accessed together and will be on the same node. This can provide significant speedups. We illustrate partitioning using the PageRank algorithm as an example. Choosing the right partitioning for a distributed dataset is similar to choosing the right data structure for a local one - in both cases, data layout can greatly affect performance.

Motivation
Spark provides special operations on RDDs containing key/value pairs. These RDDs are called pair RDDs. Pair RDDs are a useful building block in many programs, as they expose operations that allow you to act on each key in parallel or regroup data across the network. For example, pair RDDs have a reduceByKey() method that can aggregate data separately for each key, and a join() method that can merge two RDDs together by grouping elements with the same key. It is common to extract fields from an RDD (representing, for instance, an event time, customer ID, or other identifier) and use those fields as keys in pair RDD operations.

Creating Pair RDDs
There are a number of ways to get pair RDDs in Spark. Many of the formats we explore loading from in Chapter 5 will directly return pair RDDs for their key/value data. In other cases we have a regular RDD that we want to turn into a pair RDD. We can do this by running a map() function that returns key/value pairs. To illustrate, we show code that starts with an RDD of lines of text and keys the data by the first word in each line.

The way to build key/value RDDs differs by language. In Python, for the functions on keyed data to work we need to return an RDD composed of tuples (see Example 4-1).
- Example 4-1. Creating a pair RDD using the first word as the key in Python
>>> lines = sc.textFile('README.md')
>>> pairs = lines.map(lambda x: (x.split(" ")[0], x))
>>> pairs
PythonRDD[2] at RDD at PythonRDD.scala:43
>>> pairs.first()
(u'#', u'# Apache Spark')

Java doesn't have a built-in tuple type, so Spark's Java API has users create tuples using the scala.Tuple2 class. This class is very simple: Java users can construct a new tuple by writing new Tuple2(elem1, elem2) and can then access its elements with the ._1() and ._2() methods. Java users also need to call special versions of Spark's functions when creating pair RDDs. For instance, the mapToPair() function should be used in place of the basic map() function. This is discussed in more detail in the "Java" topic; for now, let's look at a simple case in Example 4-3:
- Example 4-3. Creating a pair RDD using the first word as the key in Java
PairFunction<String, String, String> keyData = new PairFunction<String, String, String>() {
    public Tuple2<String, String> call(String x) {
        return new Tuple2<String, String>(x.split(" ")[0], x);
    }
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);
When creating a pair RDD from an in-memory collection in Python/Scala, we only need to call SparkContext.parallelize() on a collection of pairs. To create a pair RDD in Java from an in-memory collection, we instead use JavaSparkContext.parallelizePairs().
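For instance, a minimal sketch in Python (with a small made-up collection) looks like this:
>>> pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
>>> pairs.collect()
[('a', 1), ('b', 2), ('a', 3)]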

Transformations on Pair RDDs
Pair RDDs are allowed to use all the transformations available to standard RDDs. The same rules apply from "Passing Functions to Spark". Since pair RDDs contain tuples, we need to pass functions that operate on tuples rather than on individual elements. Tables 4-1 and 4-2 summarize the transformations on pair RDDs, and we will dive into these transformations in detail later.

reduceByKey(func): Combine values with the same key.
>>> rdd = sc.parallelize([(1,2), (3,4), (3,6)])
>>> nrdd = rdd.reduceByKey(lambda x, y: x+y )
>>> nrdd.collect()
[(1, 2), (3, 10)]


From now on, we will assume that the rdd variable contains the RDD data [(1, 2), (3, 4), (3, 6)].
groupByKey(): Group values with the same key.
>>> from __future__ import print_function
>>> nrdd = rdd.groupByKey()
>>> nrdd.collect()
[(1, <pyspark.resultiterable.ResultIterable object at ...>), (3, <pyspark.resultiterable.ResultIterable object at ...>)]
>>> colt = nrdd.collect()
>>> for k, v in colt:
... print("Iterate key=%d:" % (k), end='')
... for i in v:
... print(" %d" % (i), end='')
... print("")
...
Iterate key=1: 2
Iterate key=3: 4 6
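If we only want to inspect the grouped values without writing a loop, a handy (standard PySpark) shortcut is to materialize each iterable as a list with mapValues(list); the key ordering in the output may vary:
>>> rdd.groupByKey().mapValues(list).collect()
[(1, [2]), (3, [4, 6])]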


combineByKey(createCombiner, mergeValue, mergeCombiners): Combine values with the same key using a different result type.
createCombiner: called the first time a key is found in each partition, to create the initial accumulator for that key
mergeValue: merges a new value into the existing accumulator for a key already seen in that partition
mergeCombiners: merges accumulators for the same key that come from different partitions
>>> x = sc.parallelize([("coffee", 1), ("coffee", 2), ("coffee", 2), ("panda", 3), ("panda", 1)])
>>> def createCombiner(value):
...     return (value, 1)
...
>>> def mergeValue(acc, value):
...     return (acc[0]+value, acc[1]+1)
...
>>> def mergeCombiners(acc1, acc2):
...     return (acc1[0]+acc2[0], acc1[1]+acc2[1])
...
>>> sorted(x.combineByKey(createCombiner, mergeValue, mergeCombiners).collect())
[('coffee', (5, 3)), ('panda', (4, 2))]


mapValues(func): Apply a function to each value of a pair RDD without changing the key.
>>> rdd.mapValues(lambda x: x+1).collect()
[(1, 3), (3, 5), (3, 7)]


flatMapValues(func): Apply a function that returns an iterator to each value of a pair RDD, and for each element returned, produce a key/value entry with the old key. Often used for tokenization.
>>> rdd.flatMapValues(lambda x: range(x, 6)).collect()
[(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)]


keys(): Return an RDD of just the keys.
>>> rdd.keys().collect()
[1, 3, 3]


values(): Return an RDD of just the values
>>> rdd.values().collect()
[2, 4, 6]


sortByKey(): Return an RDD sorted by the key.
>>> rdd.sortByKey().collect()
[(1, 2), (3, 4), (3, 6)]


The APIs below perform transformations involving two RDDs.
subtractByKey(): Remove elements with a key present in the other RDD.
>>> other = sc.parallelize([(3,9)])
>>> rdd.subtractByKey(other).collect()
[(1, 2)]


join(): Perform an inner join between two RDDs
>>> rdd.join(other).collect()
[(3, (4, 9)), (3, (6, 9))]


rightOuterJoin(): Perform a join between two RDDs where the key must be present in the other RDD.
>>> rdd.rightOuterJoin(other).collect()
[(3, (4, 9)), (3, (6, 9))]


leftOuterJoin(): Perform a join between two RDDs where the key must be present in the first RDD.
>>> rdd.leftOuterJoin(other).collect()
[(1, (2, None)), (3, (4, 9)), (3, (6, 9))]


cogroup(): Group data from both RDDs sharing the same key.
>>> grdd = rdd.cogroup(other)
>>> cdata = grdd.collect()  # cdata will be [(1, ([2], [])), (3, ([4, 6], [9]))], with each side returned as an iterable
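Because cogroup() returns an iterable for each side, a small sketch like the one below (converting the iterables to lists with mapValues()) makes the result easier to display; the key ordering may vary:
>>> rdd.cogroup(other).mapValues(lambda vs: (list(vs[0]), list(vs[1]))).collect()
[(1, ([2], [])), (3, ([4, 6], [9]))]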


We will discuss each of these families of pair RDD functions in more detail in the upcoming sections. Pair RDDs are also still RDDs (of Tuple2 objects in Java/Scala or of Python tuples), and thus support the same functions as RDDs. For instance, we can take our pair RDD from the previous section and filter out lines longer than 20 characters (the example also drops lines of one character or less), as shown in the example below:
- Example 4-4. Simple filter on second element in Python
>>> lines = sc.textFile("README.md")
>>> pairs = lines.map(lambda x: (x.split(" ")[0], x))
>>> result = pairs.filter(lambda keyValue: len(keyValue[1]) < 20 and len(keyValue[1]) > 1)
>>> result.collect()
[(u'#', u'# Apache Spark'), ..., (u'can', u'can be run using:'), (u'', u' ./dev/run-tests'), (u'##', u'## Configuration')]


Sometimes working with pairs can be awkward if we want to access only the value part of our pair RDD. Since this is a common pattern, Spark provides the mapValues(func) function, which is the same as map{case (x, y): (x, func(y))}. We will use this function in many of our examples. Below is a simple usage example of this function:
>>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
>>> def f(x): return len(x)
>>> x.mapValues(f).collect()
[('a', 3), ('b', 1)]

We now discuss each of the families of pair RDD functions, starting with aggregations.

Aggregations
When datasets are described in terms of key/value pairs, it is common to want to aggregate statistics across all elements with the same key. We have looked at the fold(), aggregate(), and reduce() actions on basic RDDs, and similar per-key transformations exist on pair RDDs. Spark has a similar set of operations that combine values that have the same key. These operations return RDDs and thus are transformations rather than actions.

reduceByKey() is quite similar to reduce(); both take a function and use it to combine values. reduceByKey() runs several parallel reduce operations, one for each key in the dataset, where each operation combines values that have the same key. Because datasets can have very large numbers of keys, reduceByKey() is not implemented as an action that returns a value to the user program. Instead, it returns a new RDD consisting of each key and the reduced value for that key:
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]

foldByKey() is quite similar to fold(); both use a zero value of the same type as the data in our RDD and a combination function. As with fold(), the provided zero value for foldByKey() should have no impact when combined with another element using your combination function.
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> from operator import add
>>> sorted(rdd.foldByKey(0, add).collect())
[('a', 2), ('b', 1)]

As the example below demonstrates, we can use reduceByKey() along with mapValues() to compute the per-key average in a very similar manner to how fold() and map() can be used to compute the entire RDD average (see Figure 4-2). As with average, we can achieve the same result using a more specialized function, which we will cover next.
- Example 4-7. Per-key average with reduceByKey() and mapValues() in Python
>>> rdd.collect()
[('a', 1), ('b', 1), ('a', 1), ('a', 3), ('b', 3)]
>>> rdd.mapValues(lambda x: (x, 1)).collect()
[('a', (1, 1)), ('b', (1, 1)), ('a', (1, 1)), ('a', (3, 1)), ('b', (3, 1))]
>>> rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()
[('a', (5, 3)), ('b', (4, 2))]
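To finish the per-key average, we simply divide the sum by the count with one more mapValues(); a short sketch continuing from the (sum, count) pairs above:
>>> sumCount = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
>>> sumCount.mapValues(lambda v: float(v[0]) / v[1]).collect()
[('a', 1.6666666666666667), ('b', 2.0)]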




We can use a similar approach in the example below to implement the classic distributed word count problem. We will use flatMap() from the previous chapter to produce a pair RDD of words and the number 1, and then sum up the counts for each word using reduceByKey() as in the previous example:
- Example 4-9. Word count in Python
>>> lines = sc.textFile('README.md')
>>> words = lines.flatMap(lambda x: x.split(" "))
>>> result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
>>> sorted(result.collect(), key=lambda t: t[1], reverse=True)[0:3]  # Sort the results in descending order and take the top 3
[(u'', 67), (u'the', 21), (u'to', 14)]
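As a side note, when we only need the counts back in the driver as a plain dictionary rather than an RDD, we could also use the countByValue() action as a shortcut; a sketch assuming the same README.md as above:
>>> wordCounts = lines.flatMap(lambda x: x.split(" ")).countByValue()
>>> wordCounts[u'the']
21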

combineByKey() is the most general of the per-key aggregation functions. Most of the other per-key combiners are implemented using it. Like aggregate(), it allows the user to return values that are not the same type as the input data. To understand combineByKey(), it's useful to think of how it handles each element it processes. As it goes through the elements in a partition, each element either has a key it hasn't seen before or has the same key as a previous element.

If it's a new element, combineByKey() uses a function we provide, called createCombiner(), to create the initial value for the accumulator on that key. It's important to note that this happens the first time a key is found in each partition, rather than only the first time the key is found in the RDD. If it is a value we have seen before while processing that partition, it will instead use the provided function, mergeValue(), with the current value of the accumulator for that key and the new value. Since each partition is processed independently, we can have multiple accumulators for the same key. When we are merging the results from each partition, if two or more partitions have an accumulator for the same key we merge the accumulators using the user-supplied mergeCombiners() function.

Since combineByKey() has a lot of different parameters, it is a great candidate for an explanatory example. To better illustrate how this function works, we will look at computing the average value for each key, as shown in the example below and illustrated in Figure 4-3.
- Example 4-12. Per-key average using combineByKey() in Python
# Signature: combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<default hash function>)
>>> rdd.collect()
[('a', 1), ('b', 1), ('a', 1), ('a', 3), ('b', 3)]
>>> createCombiner = lambda x: (x,1)
>>> mergeValue = lambda x, y: (x[0] + y, x[1] + 1)
>>> mergeCombiners = lambda x, y: (x[0] + y[0], x[1] + y[1])
>>> sumCount = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
>>> sumCount.collect()
[('a', (5, 3)), ('b', (4, 2))]
>>> sumCount.map(lambda x: (x[0], float(x[1][0])/x[1][1])).collect()
[('a', 1.6666666666666667), ('b', 2.0)]

There are many options for combining our data by key. Most of them are implemented on top of combineByKey() but provide a simpler interface. In any case, using one of the specialized aggregation functions in Spark can be much faster than the naive approach of grouping our data and then reducing it.

Tuning the level of parallelism
So far we have talked about how all of our transformations are distributed, but we have not really looked at how Spark decides how to split up the work. Every RDD has a fixed number of partitions that determine the degree of parallelism to use when executing operations on the RDD. When performing aggregation or grouping operations, we can ask Spark to use a specific number of partitions. Spark will always try to infer a sensible default value based on the size of your cluster, but in some cases you will want to tune the level of parallelism for better performance.

Most of the operations discussed in this chapter accept a second parameter giving the number of partitions to use when creating the grouped or aggregated RDD, as shown in the example below:
- Example 4-15. reduceByKey() with custom parallelism in Python
>>> data = [("a", 3), ("b", 4), ("a", 1)]
>>> sc.parallelize(data).reduceByKey(lambda x, y: x + y).collect()  # Default parallelism
[('a', 4), ('b', 4)]
>>> sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10).collect()  # Custom parallelism
[('b', 4), ('a', 4)]
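To verify that the custom parallelism actually took effect, we can check the partition count of the resulting RDD; a quick sketch using getNumPartitions(), which also appears later in this section:
>>> sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10).getNumPartitions()
10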

Sometimes, we want to change the partitioning of an RDD outside the context of grouping and aggregation operations. For those cases, Spark provides the repartition() function, which shuffles the data across the network to create a new set of partitions. Below is a simple example:
# glom(): Return an RDD created by coalescing all elements within each partition into a list.
>>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)  # Split the data into 4 partitions
>>> sorted(rdd.glom().collect())
[[1], [2, 3], [4, 5], [6, 7]]
>>> len(rdd.repartition(2).glom().collect())
2  # The data is now distributed across 2 partitions
>>> len(rdd.repartition(10).glom().collect())
10

Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of repartition() called coalesce() that allows avoiding data movement, but only if you are decreasing the number of RDD partitions. To know whether you can safely call this function, you can check the size of the RDD using rdd.partitions.size() in Java/Scala and rdd.getNumPartitions() in Python, and make sure that you are coalescing it to fewer partitions than it currently has.
>>> rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
>>> rdd.glom().collect()
[[1], [2, 3], [4, 5]]
>>> rdd.getNumPartitions()
3
>>> rdd.coalesce(1).glom().collect()
[[1, 2, 3, 4, 5]]
