I'm looking to implement a custom Hadoop Writable class where one of the fields is a timestamp. I can't seem to find a class in the Hadoop libraries (e.g. a Writable for Date or Calendar) that would make this easy. I'm thinking of creating a custom Writable using getTimeInMillis/setTimeInMillis on Calendar, but I'm wondering if there is a better/built-in solution to this problem.

有帮助吗?

解决方案

There is no Writable for a Calendar/Date in Hadoop. Since you can get the timeInMillis as a long from the Calendar object, you can use a LongWritable to serialize a Calendar object if and only if your application always uses the default UTC time zone (i.e. it is "agnostic" to time zones and always assumes that timeInMillis represents a UTC time).

If you use another time zone, or if your application needs to be able to interpret a timeInMillis with respect to various time zones, you'll have to write a custom Writable implementation from scratch.

其他提示

Here's a custom Writable that I generated for you to illustrate a Writable with three properties, one of which is a date. You can see that the date value is persisted as a long and that it's easy to convert a long to and from a Date. If having three properties is too much, I can just generate a Writable with only a date for you.

package com.lmx.writable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import com.eaio.uuid.UUID;
import org.apache.hadoop.io.*;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

public class MyCustomWritable implements Writable {

  public static int PROPERTY_DATE = 0;
  public static int PROPERTY_COUNT = 1;
  public static int PROPERTY_NAME = 2;

  private boolean[] changeFlag = new boolean[3];

  private Date  _date;
  private int   _count;
  private String    _name;

  public MyCustomWritable() {
    resetChangeFlags();
  }

  public MyCustomWritable(Date _date, int _count, String _name) {
    resetChangeFlags();
    setDate(_date);  
    setCount(_count);  
    setName(_name);  
  }

  public MyCustomWritable(byte[] bytes) {
    ByteArrayInputStream is = new ByteArrayInputStream(bytes);
    DataInput in = new DataInputStream(is);
    try { readFields(in); } catch (IOException e) { }
    resetChangeFlags();
  }



  public Date getDate() {
    return _date;
  }

  public void setDate(Date value) {
    _date = value;
    changeFlag[PROPERTY_DATE] = true;
  }  

  public int getCount() {
    return _count;
  }

  public void setCount(int value) {
    _count = value;
    changeFlag[PROPERTY_COUNT] = true;
  }  

  public String getName() {
    return _name;
  }

  public void setName(String value) {
    _name = value;
    changeFlag[PROPERTY_NAME] = true;
  }  

  public void readFields(DataInput in) throws IOException {

            // Read Date _date

        if (in.readBoolean()) {
            _date = new Date(in.readLong());
            changeFlag[PROPERTY_DATE] = true;
        } else {
            _date = null;
            changeFlag[PROPERTY_DATE] = false;
        }       
            // Read int _count

        _count = in.readInt();
        changeFlag[PROPERTY_COUNT] = true;

            // Read String _name

        if (in.readBoolean()) {
            _name = Text.readString(in);
            changeFlag[PROPERTY_NAME] = true;
        } else {
            _name = null;
            changeFlag[PROPERTY_NAME] = false;
        }
  }

  public void write(DataOutput out) throws IOException {

            // Write Date _date

      if (_date == null) {
            out.writeBoolean(false);
      } else {
            out.writeBoolean(true);
            out.writeLong(_date.getTime());
      }

            // Write int _count

      out.writeInt(_count);

            // Write String _name

      if (_name == null) {
            out.writeBoolean(false);
      } else {
            out.writeBoolean(true);
            Text.writeString(out,_name);
      }
  }

  public byte[] getBytes() throws IOException {
      ByteArrayOutputStream os = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(os);
      write(out);
      out.flush();
      out.close();
      return os.toByteArray();
  }

  public void resetChangeFlags() {
    changeFlag[PROPERTY_DATE] = false;
    changeFlag[PROPERTY_COUNT] = false;
    changeFlag[PROPERTY_NAME] = false;
  }

  public boolean getChangeFlag(int i) {
    return changeFlag[i];
  }


   public byte[] getDateAsBytes() throws IOException {
      ByteArrayOutputStream os = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(os);

            // Write Date _date

      if (_date == null) {
            out.writeBoolean(false);
      } else {
            out.writeBoolean(true);
            out.writeLong(_date.getTime());
      }

      out.flush();
      out.close();
      return os.toByteArray();
   }

   public byte[] getCountAsBytes() throws IOException {
      ByteArrayOutputStream os = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(os);

            // Write int _count

      out.writeInt(_count);

      out.flush();
      out.close();
      return os.toByteArray();
   }

   public byte[] getNameAsBytes() throws IOException {
      ByteArrayOutputStream os = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(os);

            // Write String _name

      if (_name == null) {
            out.writeBoolean(false);
      } else {
            out.writeBoolean(true);
            Text.writeString(out,_name);
      }

      out.flush();
      out.close();
      return os.toByteArray();
   }


   public void setDateFromBytes(byte[] b) throws IOException {
      ByteArrayInputStream is = new ByteArrayInputStream(b);
      DataInput in = new DataInputStream(is);
      int len;

            // Read Date _date

        if (in.readBoolean()) {
            _date = new Date(in.readLong());
            changeFlag[PROPERTY_DATE] = true;
        } else {
            _date = null;
            changeFlag[PROPERTY_DATE] = false;
        }
   }

   public void setCountFromBytes(byte[] b) throws IOException {
      ByteArrayInputStream is = new ByteArrayInputStream(b);
      DataInput in = new DataInputStream(is);
      int len;

            // Read int _count

        _count = in.readInt();
        changeFlag[PROPERTY_COUNT] = true;

   }

   public void setNameFromBytes(byte[] b) throws IOException {
      ByteArrayInputStream is = new ByteArrayInputStream(b);
      DataInput in = new DataInputStream(is);
      int len;

            // Read String _name

        if (in.readBoolean()) {
            _name = Text.readString(in);
            changeFlag[PROPERTY_NAME] = true;
        } else {
            _name = null;
            changeFlag[PROPERTY_NAME] = false;
        }

   }

    public Tuple asTuple() throws ExecException {

        Tuple tuple = TupleFactory.getInstance().newTuple(3);

        if (getDate() == null) {
            tuple.set(0, (Long) null);
        } else {
            tuple.set(0, new Long(getDate().getTime()));
        }
        tuple.set(1, new Integer(getCount()));
        if (getName() == null) {
            tuple.set(2, (String) null);
        } else {
            tuple.set(2, getName());
        }

        return tuple;
    }

    public static ResourceSchema getPigSchema() throws IOException {

        ResourceSchema schema = new ResourceSchema();
        ResourceFieldSchema fieldSchema[] = new ResourceFieldSchema[3];
        ResourceSchema bagSchema;
        ResourceFieldSchema bagField[];

        fieldSchema[0] = new ResourceFieldSchema();
        fieldSchema[0].setName("date");
        fieldSchema[0].setType(DataType.LONG);

        fieldSchema[1] = new ResourceFieldSchema();
        fieldSchema[1].setName("count");
        fieldSchema[1].setType(DataType.INTEGER);

        fieldSchema[2] = new ResourceFieldSchema();
        fieldSchema[2].setName("name");
        fieldSchema[2].setType(DataType.CHARARRAY);

        schema.setFields(fieldSchema);
        return schema;

    }

    public static MyCustomWritable fromJson(String source) {

        MyCustomWritable obj = null;

        try {
            JSONObject jsonObj = new JSONObject(source);
            obj = fromJson(jsonObj);
        } catch (JSONException e) {
            System.out.println(e.toString());
        }

        return obj; 
    }

    public static MyCustomWritable fromJson(JSONObject jsonObj) {

        MyCustomWritable obj = new MyCustomWritable();

        try {

            if (jsonObj.has("date")) {
                obj.setDate(new Date(jsonObj.getLong("date")));
            }

            if (jsonObj.has("count")) {
                obj.setCount(jsonObj.getInt("count"));
            }

            if (jsonObj.has("name")) {
                obj.setName(jsonObj.getString("name"));
            }

        } catch (JSONException e) {
            System.out.println(e.toString());
            obj = null;
        }

        return obj; 
    }

    public JSONObject toJson() {

        try {
            JSONObject jsonObj = new JSONObject();
            JSONArray jsonArray;

            if (getDate() != null) {
                jsonObj.put("date", getDate().getTime());
            }
            jsonObj.put("count", getCount());

            if (getName() != null) {
                jsonObj.put("name", getName());
            }
            return jsonObj; 
        } catch (JSONException e) { }

        return null;    
    }

    public String toJsonString() {

        return toJson().toString(); 

    }
}
许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top