val emp = sc.
  textFile("emp.txt").
  map { line =>
    val parts = line.split("\t")
    // Output ((key1, key2), value) so that the pair
    // (parts(0), parts(2)) acts as a composite join key.
    ((parts(0), parts(2)), parts(1))
  }
val emp_new = sc.
  textFile("emp_new.txt").
  map { line =>
    val parts = line.split("\t")
    // Same shape as above: ((key1, key2), value).
    ((parts(0), parts(2)), parts(1))
  }
val finalemp =
  emp_new.join(emp).
  map { case ((nk1, nk2), (parts1, val1)) => (nk1, parts1, val1) }
Spark join operation based on two columns
06-07-2023
Question
I'm trying to join two datasets based on two columns. It works when I join on one column, but with two columns it fails with the error below:
:29: error: value join is not a member of org.apache.spark.rdd.RDD[(String, String, (String, String, String, String, Double))]
val finalFact = fact.join(dimensionWithSK).map { case(nk1,nk2, ((parts1,parts2,parts3,parts4,amount), (sk, prop1,prop2,prop3,prop4))) => (sk,amount) }
Code:
import org.apache.spark.rdd.RDD
def zipWithIndex[T](rdd: RDD[T]) = {
  val partitionSizes = rdd.mapPartitions(p => Iterator(p.length)).collect
  val ranges = partitionSizes.foldLeft(List((0, 0))) { case (accList, count) =>
    val start = accList.head._2
    val end = start + count
    (start, end) :: accList
  }.reverse.tail.toArray
  rdd.mapPartitionsWithIndex( (index, partition) => {
    val start = ranges(index)._1
    val end = ranges(index)._2
    val indexes = Iterator.range(start, end)
    partition.zip(indexes)
  })
}
val dimension = sc.
  textFile("dimension.txt").
  map { line =>
    val parts = line.split("\t")
    (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
  }
val dimensionWithSK =
  zipWithIndex(dimension).map { case ((nk1, nk2, prop3, prop4, prop5, prop6), idx) =>
    (nk1, nk2, (prop3, prop4, prop5, prop6, idx + nextSurrogateKey))
  }
val fact = sc.
  textFile("fact.txt").
  map { line =>
    val parts = line.split("\t")
    // we need to output (Naturalkey, (FactId, Amount)) in
    // order to be able to join with the dimension data.
    (parts(0), parts(1), (parts(2), parts(3), parts(4), parts(5), parts(6).toDouble))
  }
val finalFact = fact.join(dimensionWithSK).map { case(nk1,nk2, ((parts1,parts2,parts3,parts4,amount), (sk, prop1,prop2,prop3,prop4))) => (sk,amount) }
Could someone please help here? Thanks, Sridhar
Solution 3
Other tips
If you look at the signature of join, you will see that it works only on an RDD of pairs:
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
You have a triple. I guess you're trying to join on the first two elements of the tuple, so you need to map your triple to a pair whose first element is itself a pair containing the first two elements of the triple. For example, for any types V1 and V2:
val left: RDD[(String, String, V1)] = ??? // some rdd
val right: RDD[(String, String, V2)] = ??? // some rdd
left.map {
  case (key1, key2, value) => ((key1, key2), value)
}
.join(
  right.map {
    case (key1, key2, value) => ((key1, key2), value)
  })
This will give you an RDD of the form RDD[((String, String), (V1, V2))].
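To make the re-keying concrete, here is a minimal sketch that runs the same idea against a few made-up rows. It assumes Spark is on the classpath and uses a local master. The object name CompositeKeyJoin, the helper keyByFirstTwo and the sample data are all hypothetical, and the value types are simplified compared with the question's fact and dimension RDDs.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CompositeKeyJoin {
  // Hypothetical helper: turn a triple (k1, k2, value) into the pair
  // ((k1, k2), value), so that the pair-RDD join becomes available.
  def keyByFirstTwo[V](rdd: RDD[(String, String, V)]): RDD[((String, String), V)] =
    rdd.map { case (k1, k2, value) => ((k1, k2), value) }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("composite-key-join").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      // Made-up sample rows: (naturalKey1, naturalKey2, amount)
      val fact = sc.parallelize(Seq(("a", "x", 10.0), ("b", "y", 20.0), ("c", "z", 30.0)))
      // Made-up sample rows: (naturalKey1, naturalKey2, surrogateKey)
      val dimension = sc.parallelize(Seq(("a", "x", 1L), ("b", "y", 2L)))

      // Both sides are now keyed by (naturalKey1, naturalKey2).
      val joined: RDD[((String, String), (Double, Long))] =
        keyByFirstTwo(fact).join(keyByFirstTwo(dimension))

      // Keep only (surrogateKey, amount), as the question's final map intends.
      val skAmount = joined.map { case (_, (amount, sk)) => (sk, amount) }
      skAmount.collect().sortBy(_._1).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Because join is an inner join, the ("c", "z") fact row has no matching dimension row and does not appear in the result.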
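If the data is held in DataFrames rather than RDDs (despite the rdd1/rdd2 names, the join that takes a list of column names and a join type is the DataFrame one), you can join on several columns by name:
rdd1 schema: field1, field2, field3, fieldX, ...
rdd2 schema: field1, field2, field3, fieldY, ...
val joinResult = rdd1.join(rdd2, Seq("field1", "field2", "field3"), "outer")
joinResult schema: field1, field2, field3, fieldX, fieldY, ...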
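The join shown above takes a sequence of column names and a join type, which is the DataFrame form of join. A minimal sketch of it, assuming Spark SQL is on the classpath, could look like the following. The object name MultiColumnDataFrameJoin, the helper joinOnKeys and the sample rows are hypothetical; only the column names come from the answer.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object MultiColumnDataFrameJoin {
  // Hypothetical helper: full outer join on the three shared columns.
  // Each join column appears only once in the result.
  def joinOnKeys(df1: DataFrame, df2: DataFrame): DataFrame =
    df1.join(df2, Seq("field1", "field2", "field3"), "outer")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("multi-column-join")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    try {
      // Made-up sample rows for illustration only.
      val df1 = Seq(("a", "x", "p", 1), ("b", "y", "q", 2))
        .toDF("field1", "field2", "field3", "fieldX")
      val df2 = Seq(("a", "x", "p", 10.0), ("c", "z", "r", 30.0))
        .toDF("field1", "field2", "field3", "fieldY")

      val joinResult = joinOnKeys(df1, df2)
      joinResult.printSchema()
      joinResult.show()
    } finally {
      spark.stop()
    }
  }
}
```

With an "outer" join, rows that exist on only one side are kept, with null in the columns coming from the other side.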