Question

I am trying to analyze 2 billion rows of text files in HDFS. Each line contains an array of sorted integers:

[1,2,3,4]

The integer values range from 0 to 100,000. Within each array of integers, I want to generate all possible pairwise combinations (one-way, i.e. (1,2) and (2,1) count as the same pair), then reduce and sum the counts of those pairs. For example:

File:

[1,2,3,4]
[2,3,4]

Final Output:

(1,2) - 1
(1,3) - 1
(1,4) - 1
(2,3) - 2
(2,4) - 2
(3,4) - 2

The approach I have tried is a simple Apache Spark job that parallelizes the processing and reducing of blocks of data. However, I am running into memory issues: a hash map of ((100,000)^2)/2 possible keys does not fit in memory, so I am having to resort to running a traditional MapReduce pipeline (map, sort, shuffle, reduce locally, sort, shuffle, reduce globally). I know generating the combinations is a double for loop, so O(n^2), but what is the most efficient way to do this programmatically while writing as little as possible to disk? I am trying to complete this task in under 2 hours on a cluster of 100 nodes (64 GB RAM / 2 cores each). Any recommended technologies or frameworks would also be appreciated. Below is what I have been using in Apache Spark and Pydoop. I tried more memory-optimized hash maps, but they still used too much memory.

import collection.mutable.HashMap
import collection.mutable.ListBuffer

// Parse one line: fields are separated by \x01, array elements by \x02,
// and each element's id is terminated by \x03.
def getArray(line: String): List[Int] = {
    val a = line.split("\\x01")(1).split("\\x02")
    val ids = new ListBuffer[Int]
    for (i <- a.indices) {
        ids += Integer.parseInt(a(i).split("\\x03")(0))
    }
    ids.toList
}

val textFile = sc.textFile("hdfs://data/")
val counts = textFile.mapPartitions(lines => {
    // Pre-aggregate pair counts within each partition, then merge globally.
    val hashmap = new HashMap[(Int, Int), Int]()
    lines.foreach(line => {
        val array = getArray(line)
        // The ids are sorted, so emitting only (i, j) with i < j yields each
        // unordered pair exactly once.
        for ((x, i) <- array.view.zipWithIndex) {
            for (j <- (i + 1) to array.length - 1) {
                hashmap((x, array(j))) = hashmap.getOrElse((x, array(j)), 0) + 1
            }
        }
    })
    hashmap.toIterator
}).reduceByKey(_ + _)

I also tried Pydoop:

def mapper(_, text, writer):
    # Fields are separated by \x01, array elements by \x02, and each
    # element's id is terminated by \x03.
    columns = text.split("\x01")
    slices = columns[1].split("\x02")
    slice_array = []
    for slice_obj in slices:
        slice_id = slice_obj.split("\x03")[0]
        slice_array.append(int(slice_id))
    # The ids are sorted, so emitting only (i, j) with i < j yields each
    # unordered pair exactly once.
    for i, x in enumerate(slice_array):
        for j in range(i + 1, len(slice_array)):
            writer.emit((x, slice_array[j]), 1)

def reducer(key, vals, writer):
    writer.emit(key, sum(map(int, vals)))

def combiner(key, vals, writer):
    writer.count('combiner calls', 1)
    reducer(key, vals, writer)

Solution

I think your problem can be reduced to word count where the corpus contains at most 5 billion distinct words: one "word" per unordered pair of integers, i.e. about 100,000^2 / 2 keys.

In both of your code examples, you're trying to pre-count all of the items appearing in each partition and sum the per-partition counts during the reduce phase.

Consider the worst-case memory requirements for this, which occur when every partition contains all of the 5 billion keys. The hashtable requires at least 8 bytes to represent each key (as two 32-bit integers) and 8 bytes for the count if we represent it as a 64-bit integer. Ignoring the additional overheads of Java/Scala hashtables (which aren't insignificant), you may need at least 74 gigabytes of RAM to hold the map-side hashtable:

num_keys = 100000**2 / 2
bytes_per_key = 4 + 4 + 8
bytes_per_gigabyte = 1024 **3
hashtable_size_gb = (num_keys * bytes_per_key) / (1.0 * bytes_per_gigabyte)
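# hashtable_size_gb evaluates to roughly 74.5, i.e. about 74 GB before any hashtable overhead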

The problem here is that the keyspace at any particular mapper is huge. Things are better at the reducers, though: assuming a good hash partitioning, each reducer processes an even share of the keyspace, so the reducers only require roughly (74 gigabytes / 100 machines) ~= 740 MB per machine to hold their hashtables.

Performing a full shuffle of the dataset with no pre-aggregation is probably a bad idea, since the 2 billion row dataset probably becomes much bigger once you expand it into pairs.
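
To get a rough feel for that blow-up, here is a back-of-the-envelope estimate of the raw pair-record volume with no map-side aggregation at all; the average array length of 50 is purely an assumed, illustrative value (the question doesn't state it), as is the 16-byte record size:

// Rough shuffle volume if every pair is emitted with no map-side aggregation.
// avgArrayLen = 50 is an assumed value for illustration only.
val rows         = 2e9
val avgArrayLen  = 50.0
val pairsPerRow  = avgArrayLen * (avgArrayLen - 1) / 2    // n(n-1)/2 = 1225 pairs per row
val bytesPerPair = 4 + 4 + 8                              // two ints + a long count
val shuffleGb    = rows * pairsPerRow * bytesPerPair / math.pow(1024, 3)
// ≈ 36,500 GB of raw pair records before any combining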

I'd explore partial pre-aggregation, where you pick a fixed size for your map-side hashtable and spill records to reducers once the hashtable becomes full. You can employ different policies, such as LRU or randomized eviction, to pick elements to evict from the hashtable. The best technique might depend on the distribution of keys in your dataset (if the distribution exhibits significant skew, you may see larger benefits from partial pre-aggregation).

This gives you the benefit of reducing the amount of data transfer for frequent keys while using a fixed amount of memory.
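
As a minimal sketch of the idea (not a drop-in implementation), assuming the getArray parser from the question and the simplest possible policy of spilling the whole map whenever it fills up; maxEntries is an illustrative, untuned value, and an LRU or randomized eviction policy could be substituted:

import scala.collection.mutable

// Bounded map-side aggregation: count pairs in a fixed-size hashmap and spill
// its contents downstream whenever it fills up; reduceByKey then merges the
// partial counts globally.
def boundedPreAggregate(lines: Iterator[String],
                        maxEntries: Int = 1000000): Iterator[((Int, Int), Int)] = {
    val counts = mutable.HashMap.empty[(Int, Int), Int]

    val spilled = lines.flatMap { line =>
        val ids = getArray(line)                 // parser from the question
        val out = mutable.ArrayBuffer.empty[((Int, Int), Int)]
        for (i <- ids.indices; j <- (i + 1) until ids.length) {
            val key = (ids(i), ids(j))
            counts(key) = counts.getOrElse(key, 0) + 1
            if (counts.size >= maxEntries) {     // cap memory: spill and reset
                out ++= counts
                counts.clear()
            }
        }
        out
    }

    // Iterator concatenation is lazy, so the final flush happens only after
    // every line in the partition has been consumed.
    spilled ++ counts.iterator
}

val pairCounts = sc.textFile("hdfs://data/")
    .mapPartitions(boundedPreAggregate(_))
    .reduceByKey(_ + _)

The spill-everything policy just keeps the sketch short; whichever eviction policy you pick, the important property is that memory is bounded by the map's fixed size rather than by the keyspace.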

You could also consider using a disk-backed hashtable that can spill blocks to disk in order to limit its memory requirements.

Licensed under: CC-BY-SA with attribution