Pregunta

I have a Java UDF that takes tuples and returns a bag of tuples. When I operate on that bag (see code below) I get the error message

2013-12-18 14:32:33,943 [main] ERROR org.apache.pig.tools.pigstats.PigStats - ERROR: java.lang.Long cannot be cast to org.apache.pig.data.Tuple

I cannot recreate this error just by reading in data, grouping and flattening, it only happens with the bag-of-tuples returned by the UDF, even when the DESCRIBE-ed data looks identical to the result of group/flatten/etc.

UPDATE: Here is actual code that reproduces the error. (A thousand thanks to anyone who takes the time to read through it.)

REGISTER test.jar;
A = LOAD 'test-input.txt' using PigStorage(',')
         AS (id:long, time:long, lat:double, lon:double, alt:double);
A_grouped = GROUP A BY (id);
U_out = FOREACH A_grouped
        GENERATE FLATTEN(
                test.Test(A)
        );
DESCRIBE U_out;
V = FOREACH U_out GENERATE output_tuple.id, output_tuple.time;
DESCRIBE V;
rmf test.out
STORE V INTO 'test.out' using PigStorage(',');

file 'test-input.txt':

0,1000,33,-100,5000
0,1010,33,-101,6000
0,1020,33,-102,7000
0,1030,33,-103,8000
1,1100,34,-100,15000
1,1110,34,-101,16000
1,1120,34,-102,17000
1,1130,34,-103,18000

The output:

$ pig -x local test.pig
    2013-12-18 16:47:50,467 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/jsnider/pig_1387403270431.log
    2013-12-18 16:47:50,751 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
    U_out: {bag_of_tuples::output_tuple: (id: long,time: long,lat: double,lon: double,alt: double)}
    V: {id: long,time: long}
    2013-12-18 16:47:51,532 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: GROUP_BY
    2013-12-18 16:47:51,532 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - pig.usenewlogicalplan is set to true. New logical plan will be used.
    2013-12-18 16:47:51,907 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - (Name: V: Store(file:///home/jsnider/test.out:PigStorage(',')) - scope-32 Operator Key: scope-32)
    2013-12-18 16:47:51,929 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
    2013-12-18 16:47:51,988 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
    2013-12-18 16:47:51,988 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
    2013-12-18 16:47:51,996 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.AccumulatorOptimizer - Reducer is to run in accumulative mode.
    2013-12-18 16:47:52,139 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Initializing JVM Metrics with processName=JobTracker, sessionId=
    2013-12-18 16:47:52,158 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
    2013-12-18 16:47:52,199 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
    2013-12-18 16:47:54,225 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
    2013-12-18 16:47:54,249 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - BytesPerReducer=1000000000 maxReducers=999 totalInputFileSize=164
    2013-12-18 16:47:54,249 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Neither PARALLEL nor default parallelism is set for this job. Setting number of reducers to 1
    2013-12-18 16:47:54,299 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
    2013-12-18 16:47:54,299 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
    2013-12-18 16:47:54,308 [Thread-1] INFO  org.apache.hadoop.util.NativeCodeLoader - Loaded the native-hadoop library
    2013-12-18 16:47:54,601 [Thread-1] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
    2013-12-18 16:47:54,601 [Thread-1] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
    2013-12-18 16:47:54,627 [Thread-1] WARN  org.apache.hadoop.io.compress.snappy.LoadSnappy - Snappy native library is available
    2013-12-18 16:47:54,627 [Thread-1] INFO  org.apache.hadoop.io.compress.snappy.LoadSnappy - Snappy native library loaded
    2013-12-18 16:47:54,633 [Thread-1] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
    2013-12-18 16:47:54,801 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
    2013-12-18 16:47:54,965 [Thread-1] WARN  org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.system.dir;  Ignoring.
    2013-12-18 16:47:54,966 [Thread-1] WARN  org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: fs.trash.interval;  Ignoring.
    2013-12-18 16:47:54,966 [Thread-1] WARN  org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.userlog.retain.hours;  Ignoring.
    2013-12-18 16:47:54,968 [Thread-1] WARN  org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.userlog.limit.kb;  Ignoring.
    2013-12-18 16:47:54,970 [Thread-1] WARN  org.apache.hadoop.conf.Configuration - file:/tmp/hadoop-jsnider/mapred/local/localRunner/job_local_0001.xml:a attempt to override final parameter: mapred.temp.dir;  Ignoring.
    2013-12-18 16:47:54,991 [Thread-2] INFO  org.apache.hadoop.mapred.LocalJobRunner - Waiting for map tasks
    2013-12-18 16:47:54,994 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.LocalJobRunner - Starting task: attempt_local_0001_m_000000_0
    2013-12-18 16:47:55,047 [pool-1-thread-1] INFO  org.apache.hadoop.util.ProcessTree - setsid exited with exit code 0
    2013-12-18 16:47:55,053 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.Task -  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@ffeef1
    2013-12-18 16:47:55,058 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.MapTask - Processing split: Number of splits :1
    Total Length = 164
    Input split[0]:
       Length = 164
      Locations:

    -----------------------

    2013-12-18 16:47:55,068 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.MapTask - io.sort.mb = 100
    2013-12-18 16:47:55,118 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.MapTask - data buffer = 79691776/99614720
    2013-12-18 16:47:55,118 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.MapTask - record buffer = 262144/327680
    2013-12-18 16:47:55,152 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.MapTask - Starting flush of map output
    2013-12-18 16:47:55,164 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.MapTask - Finished spill 0
    2013-12-18 16:47:55,167 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.Task - Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
    2013-12-18 16:47:55,170 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.LocalJobRunner -
    2013-12-18 16:47:55,171 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.Task - Task 'attempt_local_0001_m_000000_0' done.
    2013-12-18 16:47:55,171 [pool-1-thread-1] INFO  org.apache.hadoop.mapred.LocalJobRunner - Finishing task: attempt_local_0001_m_000000_0
    2013-12-18 16:47:55,172 [Thread-2] INFO  org.apache.hadoop.mapred.LocalJobRunner - Map task executor complete.
    2013-12-18 16:47:55,192 [Thread-2] INFO  org.apache.hadoop.mapred.Task -  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@38650646
    2013-12-18 16:47:55,192 [Thread-2] INFO  org.apache.hadoop.mapred.LocalJobRunner -
    2013-12-18 16:47:55,196 [Thread-2] INFO  org.apache.hadoop.mapred.Merger - Merging 1 sorted segments
    2013-12-18 16:47:55,201 [Thread-2] INFO  org.apache.hadoop.mapred.Merger - Down to the last merge-pass, with 1 segments left of total size: 418 bytes
    2013-12-18 16:47:55,201 [Thread-2] INFO  org.apache.hadoop.mapred.LocalJobRunner -
    2013-12-18 16:47:55,257 [Thread-2] WARN  org.apache.hadoop.mapred.LocalJobRunner - job_local_0001
    java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.pig.data.Tuple
            at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:408)
            at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:276)
            at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:138)
            at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.getNext(POProject.java:312)
            at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:360)
            at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:290)
            at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.runPipeline(PigMapReduce.java:434)
            at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.processOnePackageOutput(PigMapReduce.java:402)
            at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:382)
            at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:251)
            at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
            at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:572)
            at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:414)
            at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:392)
    2013-12-18 16:47:55,477 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_local_0001
    2013-12-18 16:47:59,995 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - job job_local_0001 has failed! Stop running all dependent jobs
    2013-12-18 16:48:00,008 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
    2013-12-18 16:48:00,010 [main] ERROR org.apache.pig.tools.pigstats.PigStatsUtil - 1 map reduce job(s) failed!
    2013-12-18 16:48:00,011 [main] INFO  org.apache.pig.tools.pigstats.PigStats - Detected Local mode. Stats reported below may be incomplete
    2013-12-18 16:48:00,015 [main] INFO  org.apache.pig.tools.pigstats.PigStats - Script Statistics:

    HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt      Features
    0.20.2-cdh3u6   0.8.1-cdh3u6    jsnider 2013-12-18 16:47:52     2013-12-18 16:48:00     GROUP_BY

    Failed!

    Failed Jobs:
    JobId   Alias   Feature Message Outputs
    job_local_0001  A,A_grouped,U_out,V     GROUP_BY        Message: Job failed! Error - NA file:///home/jsnider/test.out,

    Input(s):
    Failed to read data from "file:///home/jsnider/test-input.txt"

    Output(s):
    Failed to produce result in "file:///home/jsnider/test.out"

    Job DAG:
    job_local_0001


    2013-12-18 16:48:00,015 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Failed!
    2013-12-18 16:48:00,040 [main] ERROR org.apache.pig.tools.grunt.GruntParser - ERROR 2244: Job failed, hadoop does not return any error message
    Details at logfile: /home/jsnider/pig_1387403270431.log

And the three java files:

Test.java

package test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;

import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class Test extends EvalFunc<DataBag> implements Accumulator<DataBag>
{
    public static ArrayList<Point> points = null;

    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        accumulate(input);
        DataBag output = getValue();
        cleanup();
        return output;
    }

    public void accumulate(DataBag b) throws IOException {
        try {
            if (b == null)
                return;
            Iterator<Tuple> fit = b.iterator();
            while (fit.hasNext()) {
                Tuple f = fit.next();
                storePt(f);
            }
        } catch (Exception e) {
            int errCode = 2106;
            String msg = "Error while computing in " + this.getClass().getSimpleName();
            throw new ExecException(msg, errCode, PigException.BUG, e);           
        }
    }

    public void accumulate(Tuple b) throws IOException {
        try {
            if (b == null || b.size() == 0)
                return;
            for (Object f : b.getAll()) {
                if (f instanceof Tuple) {
                    storePt((Tuple)f);
                } else if (f instanceof DataBag) {
                    accumulate((DataBag)f);
                } else {
                    throw new IOException("tuple input is not a tuple or a databag... x__x");
                }
            }
        } catch (Exception e) {
            int errCode = 2106;
            String msg = "Error while computing in " + this.getClass().getSimpleName();
            throw new ExecException(msg, errCode, PigException.BUG, e);           
        }
    }

    @Override
    public DataBag getValue() {

        if (points == null)
            points = new ArrayList<Point>();
        Collections.sort(points);

        DataBag myBag = BagFactory.getInstance().newDefaultBag();

        for (Point pt : points) {
            Measure sm = new Measure(pt);
            myBag.add(sm.asTuple());
        }
        return myBag;
    }

    public void cleanup() {
        points = null;
    }

    public Schema outputSchema(Schema input) {
        try {
            Schema.FieldSchema tupleFs 
                = new Schema.FieldSchema("output_tuple", Measure.smSchema(), DataType.TUPLE);
            Schema bagSchema = new Schema(tupleFs);
            Schema.FieldSchema bagFs = new Schema.FieldSchema("bag_of_tuples", bagSchema, DataType.BAG);
            return new Schema(bagFs);
        } catch (Exception e){
                return null;
        }
    }

    public static void storePt(Tuple f) {
        Object[] field = f.getAll().toArray();

        Point pt = new Point(
                field[0] == null ? 0 : (Long)field[0],
                field[1] == null ? 0 : (Long)field[1],
                field[2] == null ? 0 : (Double)field[2],
                field[3] == null ? 0 : (Double)field[3],
                field[4] == null ? Double.MIN_VALUE : (Double)field[4]
            );

        if (points == null)
            points = new ArrayList<Point>();

        points.add(pt);
    }
}

Point.java:

package test;

public class Point implements Comparable<Point> {
    long id;
    long time;
    double lat;
    double lon;
    double alt;

    public Point(Point c) {
        this.id = c.id;
        this.time = c.time;
        this.lat = c.lat;
        this.lon = c.lon;
        this.alt = c.alt;
    }

    public Point(long l, long m, double d, double e, double f) {
        id = l;
        time = m;
        lat = d;
        lon = e;
        alt = f;
    }

    @Override
    public int compareTo(Point other) {
        final int BEFORE = -1;
        final int EQUAL = 0;
        final int AFTER = 1;

        if (this == other) return EQUAL;
        if (this.id < other.id) return BEFORE;
        if (this.id > other.id) return AFTER;
        if (this.time < other.time) return BEFORE;
        if (this.time > other.time) return AFTER;
        if (this.lat > other.lat) return BEFORE;
        if (this.lat < other.lat) return AFTER;
        if (this.lon > other.lon) return BEFORE;
        if (this.lon < other.lon) return AFTER;
        if (this.alt > other.alt) return BEFORE;
        if (this.alt < other.alt) return AFTER;
        return EQUAL;
    }

    public String toString() {
        return id + " " + time; 
    }
}

Measure.java:

package test;

import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class Measure {
    private long id;
    private long time;
    private double lat;
    private double lon;
    private double alt;

    public Measure(Point pt) {
        id = pt.id;
        time = pt.time;
        lat = pt.lat;
        lon = pt.lon;
        alt = pt.alt;
    }

    public Tuple asTuple() {
        Tuple myTuple = TupleFactory.getInstance().newTuple();
        myTuple.append(id);
        myTuple.append(time);
        myTuple.append(lat);
        myTuple.append(lon);
        myTuple.append(alt);
        return myTuple;
    }

    public static Schema smSchema() {
        Schema tupleSchema = new Schema();
        tupleSchema.add(new Schema.FieldSchema("id", DataType.LONG));
        tupleSchema.add(new Schema.FieldSchema("time", DataType.LONG));
        tupleSchema.add(new Schema.FieldSchema("lat", DataType.DOUBLE));
        tupleSchema.add(new Schema.FieldSchema("lon", DataType.DOUBLE));
        tupleSchema.add(new Schema.FieldSchema("alt", DataType.DOUBLE));
        return tupleSchema;
    }
}
¿Fue útil?

Solución

The solution is to cast the return of the UDF to the appropriate bag:

U_out = FOREACH A_grouped
    GENERATE FLATTEN(
        (bag{tuple(long,long,double,double,double)})(test.Test(A))
    ) AS (id:long, time:long, lat:double, lon:double, alt:double);

Even though the schema returned by the UDF is correct, the output still needs to be cast, in order to work correctly.

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top