Question

I'm trying to join two datasets on two columns. It works when I join on a single column, but it fails with the error below when I use both:

    :29: error: value join is not a member of org.apache.spark.rdd.RDD[(String, String, (String, String, String, String, Double))]
        val finalFact = fact.join(dimensionWithSK).map { case(nk1,nk2, ((parts1,parts2,parts3,parts4,amount), (sk, prop1,prop2,prop3,prop4))) => (sk,amount) }

Code :

    import org.apache.spark.rdd.RDD

    // Assigns a contiguous global index to every element of the RDD
    // without changing its partitioning.
    def zipWithIndex[T](rdd: RDD[T]) = {
      // Count the elements in each partition.
      val partitionSizes = rdd.mapPartitions(p => Iterator(p.length)).collect

      // Turn the per-partition counts into (start, end) index ranges,
      // one per partition.
      val ranges = partitionSizes.foldLeft(List((0, 0))) { case (accList, count) =>
        val start = accList.head._2
        val end = start + count
        (start, end) :: accList
      }.reverse.tail.toArray

      // Zip each partition with its own slice of the global index range.
      rdd.mapPartitionsWithIndex { (index, partition) =>
        val start = ranges(index)._1
        val end = ranges(index)._2
        val indexes = Iterator.range(start, end)
        partition.zip(indexes)
      }
    }

    val dimension = sc.
      textFile("dimension.txt").
      map{ line => 
        val parts = line.split("\t")
        (parts(0),parts(1),parts(2),parts(3),parts(4),parts(5))
      }

    val dimensionWithSK =
      zipWithIndex(dimension).map { case ((nk1, nk2, prop3, prop4, prop5, prop6), idx) =>
        (nk1, nk2, (prop3, prop4, prop5, prop6, idx + nextSurrogateKey))
      }

    val fact = sc.
      textFile("fact.txt").
      map { line =>
        val parts = line.split("\t")
        // output the two natural keys followed by the measures so we
        // can join with the dimension data
        (parts(0), parts(1), (parts(2), parts(3), parts(4), parts(5), parts(6).toDouble))
      }  

    val finalFact = fact.join(dimensionWithSK).map {
      case (nk1, nk2, ((parts1, parts2, parts3, parts4, amount), (sk, prop1, prop2, prop3, prop4))) =>
        (sk, amount)
    }

Could someone help me here? Thanks, Sridhar


Solution 3

    val emp = sc.
      textFile("emp.txt").
      map { line =>
        val parts = line.split("\t")
        // key on the two join columns so this becomes a pair RDD
        ((parts(0), parts(2)), parts(1))
      }

    val emp_new = sc.
      textFile("emp_new.txt").
      map { line =>
        val parts = line.split("\t")
        // same composite key for the second dataset
        ((parts(0), parts(2)), parts(1))
      }

    val finalemp =
      emp_new.join(emp).
      map { case ((nk1, nk2), (parts1, val1)) => (nk1, parts1, val1) }
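
The same composite-key trick fixes the original fact/dimension join from the question. A minimal sketch, assuming the tuple shapes shown there (with the surrogate key as the last element of the dimension payload):

    // Key both RDDs on the (nk1, nk2) pair so that join is available.
    val dimensionPairs = dimensionWithSK.map {
      case (nk1, nk2, props) => ((nk1, nk2), props)
    }

    val factPairs = fact.map {
      case (nk1, nk2, measures) => ((nk1, nk2), measures)
    }

    // Destructure the payloads; sk is assumed to be the last element of
    // the dimension tuple, amount the last element of the fact tuple.
    val finalFact = factPairs.join(dimensionPairs).map {
      case ((nk1, nk2), ((f1, f2, f3, f4, amount), (d1, d2, d3, d4, sk))) =>
        (sk, amount)
    }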

OTHER TIPS

If you look at the signature of join, you'll see it works on an RDD of pairs:

    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

You have a triple. I'm guessing you're trying to join on the first two elements of the tuple, so you need to map your triple to a pair whose first element is itself a pair holding the triple's first two elements, e.g. for any types V1 and V2:

    val left: RDD[(String, String, V1)] = ???  // some RDD
    val right: RDD[(String, String, V2)] = ??? // some RDD

    left.map {
      case (key1, key2, value) => ((key1, key2), value)
    }.join(
      right.map {
        case (key1, key2, value) => ((key1, key2), value)
      })

This will give you an RDD of the form RDD[((String, String), (V1, V2))].
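
For example, with some toy data (hypothetical values, purely to illustrate the shapes):

    val left = sc.parallelize(Seq(("a", "x", 1), ("b", "y", 2)))
    val right = sc.parallelize(Seq(("a", "x", "foo"), ("c", "z", "bar")))

    val joined = left
      .map { case (k1, k2, v) => ((k1, k2), v) }
      .join(right.map { case (k1, k2, v) => ((k1, k2), v) })

    // joined: RDD[((String, String), (Int, String))]
    // joined.collect contains (("a","x"), (1, "foo"))

Only the ("a", "x") key appears in both datasets, so it is the only row in the result.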

rdd1 schema: field1, field2, field3, fieldX, ...

rdd2 schema: field1, field2, field3, fieldY, ...

Note that joining on a list of named columns like this is DataFrame API syntax rather than plain RDD syntax, so rdd1 and rdd2 need to be DataFrames:

    val joinResult = rdd1.join(rdd2, Seq("field1", "field2", "field3"), "outer")

joinResult schema: field1, field2, field3, fieldX, fieldY, ...
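
A minimal runnable sketch of that approach (the column names and SparkSession setup are assumptions, not from the original answer):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("multi-column-join").getOrCreate()
    import spark.implicits._

    // Hypothetical data matching the schemas sketched above.
    val df1 = Seq(("a", "b", "c", "x1")).toDF("field1", "field2", "field3", "fieldX")
    val df2 = Seq(("a", "b", "c", "y1")).toDF("field1", "field2", "field3", "fieldY")

    // Outer join on the three shared columns; each join column appears
    // once in the result, followed by fieldX and fieldY.
    val joinResult = df1.join(df2, Seq("field1", "field2", "field3"), "outer")
    joinResult.show()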

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow