Question

I need to process multiple files scattered across various directories. I would like to load all these up in a single RDD and then perform map/reduce on it. I see that SparkContext is able to load multiple files from a single directory using wildcards. I am not sure how to load up files from multiple folders.

The following code snippet fails:

for fileEntry in files:
    fileName = basePath + "/" + fileEntry
    lines = sc.textFile(fileName)
    if retval == None:
        retval = lines
    else:
        retval = sc.union(retval, lines)

This fails on the third loop with the following error message:

retval = sc.union(retval, lines)
TypeError: union() takes exactly 2 arguments (3 given)

Which is bizarre given I am providing only 2 arguments. Any pointers appreciated.

Was it helpful?

Solution

How about this phrasing instead?

sc.union([sc.textFile(basepath + "/" + f) for f in files])

In Scala SparkContext.union() has two variants, one that takes vararg arguments, and one that takes a list. Only the second one exists in Python (since Python does not have polymorphism).

UPDATE

You can use a single textFile call to read multiple files.

sc.textFile(','.join(files))

OTHER TIPS

I solve similar problems by using wildcard.

e.g. I found some traits in the files I want to load in spark,

dir

subdir1/folder1/x.txt

subdir2/folder2/y.txt

you can use the following sentence

sc.textFile("dir/*/*/*.txt")

to load all relative files.

The wildcard '*' only works in single level directory, which is not recursive.

You can use the following function of SparkContext:

wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext

You can use this

First You can get a Buffer/List of S3 Paths :

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket
    listObjectsRequest.setBucketName(s3_bucket)

    //Your Folder path or Prefix
    listObjectsRequest.setPrefix(base_prefix)

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
      }
      listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name
    files.remove(0)

    //Creating a Scala List for same
    files.asScala
  }

Now Pass this List object to the following piece of code, note : sc is an object of SQLContext

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf
    }
  }

Now you got a final Unified RDD i.e. df

Optional, And You can also repartition it in a single BigRDD

val files = sc.textFile(filename, 1).repartition(1)

Repartitioning always works :D

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top