An in-memory collection in the driver program can be converted to a distributed dataset using parallelize:
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc.
JavaRDD<String> distFile = sc.textFile("data.txt");
RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.
For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).
Because RDDs are distributed data structures, updating a driver-side variable from inside a closure is not a reliable way to count or sum their elements: each executor works on its own copy of the variable.
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
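Why the snippet above fails on a cluster can be illustrated without Spark. The following is only a plain-Java simulation under stated assumptions: CounterTask, shipToExecutor and the two hard-coded partitions are hypothetical names invented for this sketch, and Java serialization stands in for the way a closure is shipped from the driver to an executor. It is not Spark's actual code path. (As written, the lambda above would also be rejected by the Java compiler, because a captured local variable must be effectively final.)

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

class ClosureCopyDemo {

    // Hypothetical stand-in for a closure that captures a mutable counter.
    static class CounterTask implements Serializable {
        private static final long serialVersionUID = 1L;
        int counter = 0;

        void apply(int x) {
            counter += x;
        }
    }

    // Round-trips the task through Java serialization, which yields an
    // independent copy, much like a closure sent to a remote executor.
    static CounterTask shipToExecutor(CounterTask task) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(task);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CounterTask) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    // Runs the simulated job over two partitions and returns the counter
    // as the driver sees it afterwards.
    static int driverCounterAfterJob() {
        CounterTask driverTask = new CounterTask();
        List<List<Integer>> partitions =
            Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5));
        for (List<Integer> partition : partitions) {
            CounterTask executorCopy = shipToExecutor(driverTask); // each task gets its own copy
            for (int x : partition) {
                executorCopy.apply(x); // mutates the copy only
            }
        }
        return driverTask.counter; // the driver's object was never touched
    }

    public static void main(String[] args) {
        System.out.println("driver counter = " + driverCounterAfterJob()); // prints "driver counter = 0"
    }
}
```

The copies accumulate 6 and 9 respectively, but nothing sends those values back, so the driver still sees 0. Returning a result through an action such as reduce, or using an accumulator, avoids the problem.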
rdd.collect().forEach(System.out::println); // can run out of memory, since all data is brought to the driver node
rdd.take(100).forEach(System.out::println); // safer: brings only the first 100 elements to the driver
Total Character Count Example
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
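The function passed to reduce should be commutative and associative, because each partition is reduced on its own and the partial results are then merged. The sketch below is a plain-Java illustration of that idea, not Spark's implementation: the class PartitionedReduceDemo, its reduce helper, and the two partition layouts are all made up for this example, with the numbers 5, 3 and 6 standing in for line lengths.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class PartitionedReduceDemo {

    // The same three "line lengths" split across partitions in two different ways.
    static final List<List<Integer>> LAYOUT_A =
        Arrays.asList(Arrays.asList(5, 3), Arrays.asList(6));
    static final List<List<Integer>> LAYOUT_B =
        Arrays.asList(Arrays.asList(5), Arrays.asList(3, 6));

    // Reduces every partition independently, then merges the partial results
    // with the same function.
    static int reduce(List<List<Integer>> partitions, BinaryOperator<Integer> f) {
        Integer result = null;
        for (List<Integer> partition : partitions) {
            Integer partial = null;
            for (Integer x : partition) {
                partial = (partial == null) ? x : f.apply(partial, x);
            }
            if (partial == null) {
                continue; // an empty partition contributes nothing
            }
            result = (result == null) ? partial : f.apply(result, partial);
        }
        if (result == null) {
            throw new UnsupportedOperationException("empty collection");
        }
        return result;
    }

    public static void main(String[] args) {
        // Addition is associative and commutative: the layout does not matter.
        System.out.println(reduce(LAYOUT_A, (a, b) -> a + b)); // prints 14
        System.out.println(reduce(LAYOUT_B, (a, b) -> a + b)); // prints 14

        // Subtraction is neither: the result depends on how the data is split.
        System.out.println(reduce(LAYOUT_A, (a, b) -> a - b)); // prints -4
        System.out.println(reduce(LAYOUT_B, (a, b) -> a - b)); // prints 8
    }
}
```

This is why a sum such as (a, b) -> a + b is a safe choice for the total length, while an order-sensitive function would give results that change with the partitioning.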
The following code uses the reduceByKey operation on key-value pairs to count how many times each line of text occurs in a file:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
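To see what the (line, 1) pairs and the merge function (a, b) -> a + b produce, here is a small local analogy in plain Java. It is only a sketch of the per-key semantics, assuming a hypothetical class ReduceByKeyDemo and a made-up three-line input; it runs in a single JVM and involves no Spark or shuffling.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReduceByKeyDemo {

    // For each line, contributes the value 1 under the line's key and merges
    // values that share a key with (a, b) -> a + b, mirroring reduceByKey.
    static Map<String, Integer> countLines(List<String> lines) {
        Map<String, Integer> counts = new LinkedHashMap<>(); // keeps first-seen order
        for (String line : lines) {
            counts.merge(line, 1, (a, b) -> a + b);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("to be", "or not", "to be");
        System.out.println(countLines(lines)); // prints {to be=2, or not=1}
    }
}
```

In Spark the result stays distributed as a JavaPairRDD until an action brings it back to the driver.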
Filter the lines of a file for occurrences of the string "ganesh":
JavaRDD<String> logData = sc.textFile(logFile).cache();
JavaRDD<String> retData = logData.filter(s -> s.contains("ganesh"));
For the different types of transformations and actions, see the Spark programming guide:
https://spark.apache.org/docs/2.1.0/programming-guide.html
Word Count
JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
JavaPairRDD<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
Shared Variables
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]
LongAccumulator accum = sc.sc().longAccumulator();
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
accum.value();
// returns 10