Question

To read a file into memory I use:

val lines = sc.textFile("myLogFile*")

which is of type:

org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

The Scala programming guide at http://spark.apache.org/docs/0.9.1/scala-programming-guide.html#parallelized-collections says: "Parallelized collections are created by calling SparkContext’s parallelize method on an existing Scala collection (a Seq object)."

This does not seem to apply to an RDD? Can parallelized processing occur on an RDD? Do I need to convert the RDD to a Seq object?


Solution

Resilient Distributed Datasets (RDDs), as the name suggests, are distributed, fault-tolerant, and parallel.

"RDDs are fault-tolerant, parallel data structures that let users explicitly persist intermediate results in memory, control their partitioning to optimize data placement, and manipulate them using a rich set of operators." Please see this paper.

No, you don't need to convert the RDD to a Seq object. All processing on an RDD is done in parallel (depending on how many cores and executors your Spark installation provides).
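
For example, a minimal sketch (run in the spark-shell, where sc is the provided SparkContext, the file pattern is taken from the question, and the "ERROR" filter is purely illustrative) showing that transformations run directly on the RDD in parallel, and that you can persist it and control its partitioning, as the quoted paper describes:

    import org.apache.spark.storage.StorageLevel

    // Read the files into an RDD[String]; no conversion to a Seq is needed
    val lines = sc.textFile("myLogFile*")

    // Transformations such as filter/map are applied to each partition in parallel
    val errors = lines.filter(_.contains("ERROR")).map(_.toUpperCase)

    // Optionally persist the intermediate result in memory and control the partitioning
    val cached = errors.repartition(8).persist(StorageLevel.MEMORY_ONLY)

    // Actions such as count trigger the actual parallel computation
    println(cached.count())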

Other tips

Extending the previous answer: it is possible to loop through an RDD in a parallelized way using its partitions.

rdd.foreachPartition { partition =>
  // ----- per-partition setup code goes here (runs once per partition) -----
  partition.foreach { item => // item is an element of your RDD's type; in our case, Row
    // ----- per-item processing goes here -----
  }
}
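
To make the pattern concrete, here is a small self-contained sketch (using sc.parallelize on a toy range purely for illustration; in the HBase case below the items are row keys). The point of foreachPartition is that expensive setup, such as opening a connection, happens once per partition instead of once per item:

    // Create a toy RDD with 4 partitions
    val data = sc.parallelize(1 to 100, numSlices = 4)

    data.foreachPartition { partition =>
      // This block runs once per partition, on the executor that owns it.
      // Do one-time setup here (e.g. open a connection) instead of per item.
      val buffered = scala.collection.mutable.ArrayBuffer[Int]()

      partition.foreach { item =>
        // item is one element of the partition
        buffered += item * 2
      }

      // Flush or close the per-partition resource here
      println(s"processed ${buffered.size} items in this partition")
    }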

An RDD is like a (highly efficient and Spark-native) data structure holding data of type T.
We did some work with RDDs where Row is org.apache.hadoop.hbase.client.Row.

Approach:
1. First create a DataFrame.
2. Convert it to an RDD in some way; in our case, we select only the row key of the DataFrame.
3. Set the number of partitions and create an RDD that will be parallelized across the required partitions. Otherwise the default partitioning of the SparkContext will be used (in spark-shell mode it is 1); see the partition-count sketch after this list.
4. Use the loop structure of rdd.foreachPartition and partition.foreach.
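
As an illustration of step 3, a minimal sketch (assuming the spark-shell SparkContext sc and the log files from the question; the partition count 80 is arbitrary) for checking and adjusting the partition count before processing:

    val rdd = sc.textFile("myLogFile*")

    // How many partitions Spark chose by default
    println(s"default parallelism: ${sc.defaultParallelism}")
    println(s"current partitions:  ${rdd.partitions.length}")

    // Increase (or decrease) the partition count explicitly
    val partitioned = rdd.repartition(80)
    println(s"after repartition:   ${partitioned.partitions.length}")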

Sample code (in Scala; the same can be done in Java):

    // Assume your DF (the intended DataFrame) has been created in some way.
    // In our case the df for the HBase table was created using the catalog approach of the
    // spark-hbase-connector from the com.hortonworks package:
    // learn.microsoft.com/en-us/azure/hdinsight/hdinsight-using-spark-query-hbase
    // Note: DataFrame.rdd yields an RDD[org.apache.spark.sql.Row]; here each Row holds an HBase row key.
    val rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = df.select("rowkey").rdd

    // Pick a partition count; note that rdd.count triggers a full pass over the data
    var numberOfPartitions = 80
    if (rdd.count > 1000000 && numberOfPartitions < 100)
      numberOfPartitions = 300

    // Optional; repartitioning via distinct lets you take advantage of the partitions during processing
    val partitionReadyRDD = rdd.distinct(numberOfPartitions)

    partitionReadyRDD.foreachPartition { partition =>
      partition.foreach { item => // item: each row key (a Row)
        // ......... some per-item code .........
      }
    }
Licensed under: CC-BY-SA with attribution