Question

I have a data type:

counted: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = MapPartitionsRDD[24] at groupByKey at <console>:28

And I'm trying to apply the following operation to this type:

def func = 2

counted.flatMap { x => counted.map { y => ((x._1+","+y._1),func) } }

So each sequence is compared to every other sequence and a function is applied to the pair. For simplicity, the function just returns 2. When I attempt the above, I receive this error:

scala> counted.flatMap { x => counted.map { y => ((x._1+","+y._1),func) } }
<console>:33: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[(String, Int)]
 required: TraversableOnce[?]
              counted.flatMap { x => counted.map { y => ((x._1+","+y._1),func) } }

How can this function be applied using Spark?

I have tried

val dataArray = counted.collect
dataArray.flatMap { x => dataArray.map { y => ((x._1+","+y._1),func) } }

which converts the collection to an Array and applies the same function, but I run out of memory when I try this method. I assume using an RDD is more efficient than using an Array? The maximum amount of memory I can allocate is 7 GB. Is there a mechanism in Spark that lets me use hard drive space to augment the available RAM?

The collection I'm running this function on contains 20,000 entries, so that's 20,000² = 400,000,000 comparisons, but in Spark terms this is quite small?


Solution 2

@RexKerr pointed out to me that I was somewhat incorrect in the comment section, so I deleted my comments. But while doing that, I had the chance to read the post again and came up with an idea that might be of some use to you.

Since what you are trying to implement is actually an operation over a Cartesian product, you might want to try just calling RDD#cartesian. Here is a dumb example, but if you can give some real code, maybe I can do something like this in that case as well:

// build a collection with a type matching the one in question:
val v1 = sc.parallelize(List("q" -> (".", 0), "s" -> (".", 1), "f" -> (".", 2))).groupByKey
// pair every key with every other key and apply the function:
v1.cartesian(v1).map { x => (x._1._1 + "," + x._2._1, 2) }.foreach(println)

OTHER TIPS

Short answer:

counted.cartesian(counted).map {
  case ((x, _), (y, _)) => (x + "," + y, func)
}

Please use pattern matching to extract the elements of nested tuples; this avoids the unreadable chained underscore notation. Using _ for the second elements shows the reader that those values are being ignored.

If func doesn't use the second elements, it would be even more readable (and possibly more efficient) to project them away first:

val projected = counted.map(_._1)
projected.cartesian(projected).map(x => (x._1 + "," + x._2, func))

Note that you do not need curly braces when your lambda fits on a single semantic line; using them anyway is a very common mistake in Scala.
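
Since the keys alone are small (20,000 short strings), another option worth sketching is to collect and broadcast them, then pair each key against the broadcast copy locally; this avoids the shuffle that cartesian performs. A rough sketch under that assumption (localKeys, broadcastKeys and result are names introduced here for illustration):

val localKeys = projected.collect()          // ~20,000 strings fit easily in driver memory
val broadcastKeys = sc.broadcast(localKeys)  // shipped once to each executor
val result = projected.flatMap { x =>
  broadcastKeys.value.map(y => (x + "," + y, func))
}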

I would also like to know why you want this Cartesian product; there are often ways to avoid it that are significantly more scalable. Please say what you're going to do with the Cartesian product and I will try to find a scalable way of doing what you want.
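
For instance (an illustrative sketch only, since the real func isn't shown): if your real function is symmetric in its two arguments, you can skip roughly half of the 400 million comparisons by keeping each unordered pair only once:

projected.cartesian(projected)
  .filter { case (x, y) => x < y }             // keep each unordered pair once
  .map { case (x, y) => (x + "," + y, func) }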

One final point: please put spaces around your operators.
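
On the memory question in the post: keeping the data as an RDD rather than collecting it to an Array on the driver is indeed the right move, and Spark can additionally spill cached partitions to local disk if you persist with a storage level that allows it. A minimal sketch, assuming the standard Spark shell setup:

import org.apache.spark.storage.StorageLevel

// cached partitions that do not fit in RAM spill to local disk
// rather than having to be held entirely in memory:
counted.persist(StorageLevel.MEMORY_AND_DISK)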

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow