Question

Below is a data structure, a list of tuples of type List[(String, String, Int)]:

   val data3 = List(("id1", "a", 1), ("id1", "a", 1), ("id1", "a", 1), ("id2", "a", 1))
                                                  //> data3  : List[(String, String, Int)] = List((id1,a,1), (id1,a,1), (id1,a,1),
                                                  //|  (id2,a,1))

I'm attempting to count the occurrences of each Int value associated with each id, so the above data structure should be converted to List((id1,a,3), (id2,a,1)).

This is what I have come up with, but I'm unsure how to group similar items within a tuple:

data3.map { case (id, name, num) => (id, name, num + 1) }
                                              //> res0: List[(String, String, Int)] = List((id1,a,2), (id1,a,2), (id1,a,2), (i
                                              //| d2,a,2))

In practice data3 is a Spark RDD; I'm using a List here for local testing, but the same solution should be compatible with an RDD.

Update: based on the following code provided by maasg:

val byKey = rdd.map({case (id1,id2,v) => (id1,id2)->v})
val byKeyGrouped = byKey.groupByKey
val result = byKeyGrouped.map{case ((id1,id2),values) => (id1,id2,values.sum)}

I needed to amend it slightly to get it into the format I expect, which is of type RDD[(String, Seq[(String, Int)])], corresponding to RDD[(id, Seq[(name, count-of-names)])]:

val byKey = rdd.map({case (id1,id2,v) => (id1,id2)->v})
val byKeyGrouped = byKey.groupByKey
val result = byKeyGrouped.map{case ((id1,id2),values) => ((id1),(id2,values.sum))}
val counted = result.groupByKey
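
For local testing, the same two-step shape can be checked with plain collections (a sketch; groupBy on a List stands in for the RDD's groupByKey):

val data3 = List(("id1", "a", 1), ("id1", "a", 1), ("id1", "a", 1), ("id2", "a", 1))

// step 1: sum the Ints per (id, name) key, keeping the id as the outer key
val summed = data3
  .groupBy { case (id, name, _) => (id, name) }
  .toList
  .map { case ((id, name), vs) => (id, (name, vs.map(_._3).sum)) }
                                       // List((id1,(a,3)), (id2,(a,1)))

// step 2: collect the (name, count) pairs under each id
val counted = summed
  .groupBy(_._1)
  .map { case (id, vs) => (id, vs.map(_._2)) }
                                       // Map(id1 -> List((a,3)), id2 -> List((a,1)))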

Solution

In Spark, you would do something like this (using the Spark shell to illustrate):

val l = List( ("id1" , "a", 1), ("id1" , "a", 1), ("id1" , "a", 1) , ("id2" , "a", 1))
val rdd = sc.parallelize(l)
val grouped = rdd.groupBy{case (id1,id2,v) => (id1,id2)}
val result = grouped.map{case ((id1,id2),values) => (id1,id2,values.foldLeft(0){case (cumm, tuple) => cumm + tuple._3})}
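
Collecting confirms the expected result (a quick check, assuming the shell session above):

result.collect()                                  // Array((id1,a,3), (id2,a,1))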

Another option would be to map the rdd into a PairRDD and use groupByKey:

val byKey = rdd.map({case (id1,id2,v) => (id1,id2)->v})
val byKeyGrouped = byKey.groupByKey
val result = byKeyGrouped.map{case ((id1,id2),values) => (id1,id2,values.sum)}

Option 2 is slightly better when handling large sets, as it does not replicate the ids in the accumulated value.
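
To see why, compare what each approach carries as the grouped value (the type ascriptions are added here for illustration; they assume the Spark shell session above):

import org.apache.spark.rdd.RDD

// Option 1: groupBy keeps the whole tuples, so the ids travel inside every grouped value
val grouped1: RDD[((String, String), Iterable[(String, String, Int)])] =
  rdd.groupBy { case (id1, id2, _) => (id1, id2) }

// Option 2: only the Int travels as the value; the ids appear once, in the key
val grouped2: RDD[((String, String), Iterable[Int])] =
  rdd.map { case (id1, id2, v) => (id1, id2) -> v }.groupByKey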

Other Tips

This seems to work when I use the Scala IDE:

data3
  .groupBy(tupl => (tupl._1, tupl._2))
  .mapValues(v => (v.head._1, v.head._2, v.map(_._3).sum))
  .values.toList

And the result is the same as required by the question:

res0: List[(String, String, Int)] = List((id1,a,3), (id2,a,1))

You should look into List.groupBy.

You can use the id as the key, and then use the length of your values in the map (i.e. all the items sharing the same id) to know the count.
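
A sketch of that idea on the sample data (it relies on the question's data having 1 in every count field, so a group's length equals its sum):

data3
  .groupBy(_._1)                       // Map(id1 -> three tuples, id2 -> one tuple)
  .map { case (id, items) => (id, items.head._2, items.length) }
  .toList                              // List((id1,a,3), (id2,a,1))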

@vptheron has the right idea. As can be seen in the docs:

def groupBy[K](f: (A) ⇒ K): Map[K, List[A]]

Partitions this list into a map of lists according to some discriminator function.

Note: this method is not re-implemented by views. This means when applied to a view it will always force the view and return a new list.

K: the type of keys returned by the discriminator function.
f: the discriminator function.
Returns: a map from keys to lists such that the following invariant holds: (xs groupBy f)(k) = xs filter (x => f(x) == k). That is, every key k is bound to a list of those elements x for which f(x) equals k.

So something like the below function, when used with groupBy, will give you a map with the ids as keys. (Sorry, I don't have access to a Scala compiler, so I can't test.)

def f(tuple: (String, String, Int)): String = tuple._1

Then you will have to iterate through the List for each id in the Map and sum up the number of integer occurrences. That is straightforward, but if you still need help, ask in the comments.
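
A possible sketch of those two steps together (hypothetical, since the answer leaves the summing as an exercise):

val grouped = data3.groupBy(f)               // Map[String, List[(String, String, Int)]]
val counts = grouped.map { case (id, items) =>
  (id, items.head._2, items.map(_._3).sum)   // sum the Int field within each id's group
}.toList                                     // List((id1,a,3), (id2,a,1))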

The following is the most readable, efficient, and scalable approach:

data.map {
  case (key1, key2, value) => ((key1, key2), value)
}.reduceByKey(_ + _)

which will give an RDD[((String, String), Int)]. By using reduceByKey the summation will parallelize, i.e. for very large groups it will be distributed and the summation will happen on the map side. Think about the case where there are only 10 groups but billions of records: using .sum won't scale, as it will only be able to distribute to 10 cores.
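
Run against the question's data in the shell, with a final map added to get back to the flat triple (a sketch):

val data = sc.parallelize(List(("id1", "a", 1), ("id1", "a", 1), ("id1", "a", 1), ("id2", "a", 1)))
val summed = data
  .map { case (key1, key2, value) => ((key1, key2), value) }
  .reduceByKey(_ + _)                                       // partial sums happen map-side
  .map { case ((key1, key2), sum) => (key1, key2, sum) }    // back to (id, name, count)
summed.collect()                                            // Array((id1,a,3), (id2,a,1))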

A few more notes about the other answers:

Using head here is unnecessary: instead of .mapValues(v => (v.head._1, v.head._2, v.map(_._3).sum)), map over the whole entry, where the key already carries both fields: .map { case ((k1, k2), v) => (k1, k2, v.map(_._3).sum) }

Using a foldLeft here is really horrible when, as shown above, .map(_._3).sum will do: val result = grouped.map{case ((id1,id2),values) => (id1,id2,values.foldLeft(0){case (cumm, tuple) => cumm + tuple._3})}

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow