Question

Does anyone know how to compare consecutive records in Scalding when creating a schema? I am looking at Tutorial 6, and suppose I want to print the age of a person whenever the age in a record is greater than the age in the previous record (for all records).

for example:

R1: John 30
R2: Kim 55
R3: Mark 20 

If Rn.age > R(n-1).age, then output Rn, which for the data above results in R2: Kim 55

EDIT: Looking at the code, I just realized the schema is a Scala Enumeration, so my question is how to compare records when the fields are defined by a Scala Enumeration.

class Tutorial6(args : Args) extends Job(args) {
  /** When a data set has a large number of fields, and we want to specify those fields conveniently
    in code, we can use, for example, a Tuple of Symbols (as most of the other tutorials show), or a List of Symbols.
    Note that Tuples can only be used if the number of fields is at most 22, since Scala Tuples cannot have more
    than 22 elements. Another alternative is to use Enumerations, which we show here **/

  object Schema extends Enumeration {
    val first, last, phone, age, country = Value // arbitrary number of fields
  }

  import Schema._

  Csv("tutorial/data/phones.txt", separator = " ", fields = Schema)
    .read
    .project(first,age)
    .write(Tsv("tutorial/data/output6.tsv"))
}

Solution

It seems the implicit conversion from Enumeration#Value to Fields is missing, so you could define it yourself:

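import cascading.tuple.Fields
// View from an enumeration value to a single-field Fields, built explicitly from the value's name
implicit def valueToFields(v: Enumeration#Value): Fields = new Fields(v.toString)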

object Schema extends Enumeration {
  val first, last, phone, age, country = Value // arbitrary number of fields
}

import Schema._

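// Age seen in the previous record; starts at Int.MaxValue so the first record never passes the filter
var current = Int.MaxValue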

Csv("tutorial/data/phones.txt", separator = " ", fields = Schema)
  .read
  .map(age -> ('current, 'previous)) { a: String =>
    val previous = current
    current = a.toInt
    current -> previous
  }
  .filter('current, 'previous) { ages: (Int, Int) => ages._1 > ages._2 }
  .project(first, age)
  .write(Tsv("tutorial/data/output6.tsv"))

In the end, we expect the result to be the same as that of the following version, which spells out each Fields explicitly:

Csv("tutorial/data/phones.txt", separator = " ", fields = Schema)
  .read
  .map(new Fields("age") -> new Fields("current", "previous")) { a: String =>
    val previous = current
    current = a.toInt
    current -> previous
  }
  .filter(new Fields("current", "previous")) { age: (Int, Int) =>
    age._1 > age._2
  }
  .project(new Fields("first", "age"))
  .write(Tsv("tutorial/data/output6.tsv"))

The implicit conversions provided by Scalding let you write these new Fields(...) expressions more concisely.

An implicit conversion is just a view, which the compiler uses when you pass an argument that is not of the expected type but can be converted to it by that view. For example, because map() expects a pair of Fields while you're passing it a pair of Symbols, Scala will search for an implicit conversion from Symbol -> Symbol to Fields -> Fields. A short explanation of views can be found here.
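To make the idea of a view concrete, here is a minimal sketch in plain Scala with no Scalding or Cascading dependency. FieldNames and describe are hypothetical stand-ins (for Fields and for a method such as project() that expects it); only the mechanism is meant to carry over.

```scala
import scala.language.implicitConversions

object ImplicitViewSketch {
  // Hypothetical stand-in for cascading.tuple.Fields, so the sketch has no dependencies.
  final case class FieldNames(names: List[String])

  object Schema extends Enumeration {
    val first, last, phone, age, country = Value
  }

  // The view: tells the compiler how to turn an enumeration value into FieldNames.
  implicit def valueToFieldNames(v: Enumeration#Value): FieldNames =
    FieldNames(List(v.toString))

  // Hypothetical method that expects the converted type.
  def describe(fields: FieldNames): String = fields.names.mkString(",")

  // Schema.age is not a FieldNames, so the compiler inserts valueToFieldNames here.
  val viaView: String = describe(Schema.age)

  // What the call above is effectively rewritten to.
  val explicit: String = describe(valueToFieldNames(Schema.age))
}
```

Both viaView and explicit hold the same string, which is the point: the view only saves you from writing the conversion by hand.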

Scalding 0.8.5 introduced conversions from a product of Enumeration#Value to Fields, but was missing conversions from a pair of values. The develop branch now also provides the latter.
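If you want to check the expected output without running a job, the comparison itself can be sketched in plain Scala on an in-memory list. This is not Scalding code: Person and risingAges are hypothetical names, and the sample data is taken from the question. Note also that the var-based pipeline above relies on records being processed one after another in file order by a single task, which I would only assume holds in local mode or with a single mapper.

```scala
object ConsecutiveSketch {
  // Hypothetical record type mirroring the (name, age) rows in the question.
  final case class Person(name: String, age: Int)

  // Keep each record whose age is greater than that of the record before it.
  // The first record has no predecessor, so it is never emitted.
  def risingAges(records: Seq[Person]): Seq[Person] =
    records.zip(records.drop(1)).collect {
      case (previous, current) if current.age > previous.age => current
    }

  val sample: Seq[Person] =
    Seq(Person("John", 30), Person("Kim", 55), Person("Mark", 20))

  // Only Kim (55 > 30) is kept; Mark (20 < 55) is dropped.
  val result: Seq[Person] = risingAges(sample)
}
```

Pairing each record with its successor makes the dependency on ordering explicit instead of hiding it in mutable state.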

Licensed under: CC-BY-SA with attribution