Question

I want to apply an operation to all fields of my Pipe. The Fields-based API reference at https://github.com/twitter/scalding/wiki/Fields-based-API-Reference says "You can use '* (here and elsewhere) to mean all fields." but I have not managed to make it work. Would someone be kind enough to show me an example?

Initially I have something like

mySource.map('field1 -> 'field1){ number: String => number.trim }

which I would now like to apply to all fields, with something like this:

mySource.map('* -> '*){ numbers: List[String] => numbers.map(_.trim) }


Solution

In the Scalding Fields API, the best approach I can think of for mapping from '* to '* is to work with Cascading's TupleEntry (cascading.tuple.TupleEntry):

import com.twitter.scalding._
import cascading.tuple.TupleEntry

// Notice that I do not specify the scheme when reading.
// I only know that the first column is 'user_id'; the rest are values
// that I want to double. You can use 'map' or 'mapTo'.
Tsv(args("input"))
  .read
  .map('* -> '*) { fields: TupleEntry =>
    // Position 0 is user_id, so start at 1 and double every remaining column in place.
    val sz: Int = fields.size()
    for (i <- 1 until sz) fields.setDouble(i, fields.getDouble(i) * 2.0)
    fields.getTuple()
  }
  .write(Tsv(args("output")))

OTHER TIPS

'* only seems to work with mapTo and a full type annotation, which means you need to know the number of fields in advance:

mySource
  .mapTo[(String,String,String),(String,String,String)]('* -> '*) { case (a: String, b: String, c: String) =>
    (a.trim, b.trim, c.trim)
  }

For example, this works with Scalding 0.11.0 (neither of the other answers worked for me as written):

  mySource
    .mapTo('* -> '*) {
      entry: TupleEntry =>
        for (i <- 0 until entry.size) {
          if (entry.getObject(i) == null) entry.setRaw(i, "\\N")
        }
        entry.getTuple
    }

So essentially: use mapTo('* -> '*) with a TupleEntry argument, and return entry.getTuple.
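
Applied to the trim from the question, the same pattern might look like the sketch below. It is untested and uses only the TupleEntry calls already shown above (size, getObject, setRaw, getTuple). The names TrimFields, trimIfString and TrimAllFieldsJob are made up for illustration, and it assumes the job is started with --input and --output arguments.

```scala
import com.twitter.scalding._
import cascading.tuple.TupleEntry

// Hypothetical helper (not part of Scalding): trims strings and passes
// any other value, including null, through unchanged.
object TrimFields {
  def trimIfString(value: AnyRef): AnyRef = value match {
    case s: String => s.trim
    case other     => other
  }
}

// Hypothetical job; "input" and "output" are assumed command-line arguments.
class TrimAllFieldsJob(args: Args) extends Job(args) {
  Tsv(args("input"))
    .read
    .mapTo('* -> '*) { entry: TupleEntry =>
      // Walk every position so the number of fields need not be known up front.
      for (i <- 0 until entry.size) {
        entry.setRaw(i, TrimFields.trimIfString(entry.getObject(i)))
      }
      entry.getTuple
    }
    .write(Tsv(args("output")))
}
```

The type match is there because a column may not hold a String (or may be null), in which case the value is written back as it was.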

Licensed under: CC-BY-SA with attribution