Question

Problem description

We have a Hadoop cluster on which we store data serialized to bytes with Kryo (a serialization framework). The Kryo version we used for this was forked from the official release 2.21 so we could apply our own patches for issues we had experienced with Kryo. The current Kryo version 2.22 also fixes these issues, but with different solutions. As a result, we cannot simply change the Kryo version we use, because we would then no longer be able to read the data already stored on our Hadoop cluster. To address this problem, we want to run a Hadoop job which

  1. reads the stored data
  2. deserializes the data stored with the old version of Kryo
  3. serializes the restored objects with the new version of Kryo
  4. writes the new serialized representation back to our data store

The problem is that it is not trivial to use two different versions of the same class in one Java program (more precisely, in a Hadoop job's mapper class).

Question in a nutshell

How is it possible to deserialize and serialize an object with two different versions of the same serialization framework in one Hadoop job?

Relevant facts overview

  • We have data stored on a Hadoop CDH4 cluster, serialized with Kryo version 2.21.2-ourpatchbranch
  • We want to have the data serialized with Kryo version 2.22, which is incompatible with our version
  • We build our Hadoop job JARs with Apache Maven

Possible (and impossible) approaches

(1) Renaming packages

The first approach that came to mind was to rename the packages in our own Kryo branch using the relocation functionality of the Maven Shade plugin and to release it under a different artifact ID, so we could depend on both artifacts in our conversion job project. We would then instantiate one Kryo object of each version and use the old one for deserialization and the new one for serializing the objects again.
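For illustration, here is a minimal sketch of what the conversion code could look like if this approach were carried through; the "oldkryo" package prefix and the shaded artifact are assumptions, not something we have actually released:

// Hypothetical sketch: assumes our patched Kryo 2.21.2 has been shaded with the
// package prefix "oldkryo." and published under a separate artifact ID, so that
// both versions can live on the same classpath.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class ReserializeSketch {

    public static byte[] convert(byte[] oldBytes) {
        // Deserialize with the relocated (old, patched) Kryo ...
        oldkryo.com.esotericsoftware.kryo.Kryo oldKryo =
                new oldkryo.com.esotericsoftware.kryo.Kryo();
        oldkryo.com.esotericsoftware.kryo.io.Input input =
                new oldkryo.com.esotericsoftware.kryo.io.Input(new ByteArrayInputStream(oldBytes));
        Object graph = oldKryo.readClassAndObject(input);
        input.close();

        // ... and serialize it again with the official Kryo 2.22.
        com.esotericsoftware.kryo.Kryo newKryo = new com.esotericsoftware.kryo.Kryo();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        com.esotericsoftware.kryo.io.Output output = new com.esotericsoftware.kryo.io.Output(baos);
        newKryo.writeClassAndObject(output, graph);
        output.close();
        return baos.toByteArray();
    }
}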

Problems
We don't use Kryo explicitly in Hadoop jobs, but rather access it through multiple layers of our own libraries. For each of these libraries, it would be necessary to

  1. rename involved packages and
  2. create a release with a different group or artifact ID

To make things even messier, we also use Kryo serializers provided by other third-party libraries, for which we would have to do the same thing.


(2) Using multiple class loaders

The second approach we came up with was not to depend on Kryo at all in the Maven project containing the conversion job, but to load the required classes at runtime from a separate JAR per version, stored in Hadoop's distributed cache. Serializing an object would then look something like this:

// Serializes foo with whichever Kryo version the given class loader provides,
// using reflection only, so this class never links against Kryo directly.
public byte[] serialize(Object foo, JarClassLoader cl) throws Exception {
    final Class<?> kryoClass = cl.loadClass("com.esotericsoftware.kryo.Kryo");
    Object k = kryoClass.getConstructor().newInstance();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final Class<?> outputClass = cl.loadClass("com.esotericsoftware.kryo.io.Output");

    Object output = outputClass.getConstructor(OutputStream.class).newInstance(baos);
    Method writeObject = kryoClass.getMethod("writeObject", outputClass, Object.class);
    writeObject.invoke(k, output, foo);
    outputClass.getMethod("close").invoke(output);
    baos.close();
    return baos.toByteArray();
}

Problems
Though this approach might work for instantiating an unconfigured Kryo object and serializing / restoring some object, we use a much more complex Kryo configuration, including several custom serializers, registered class IDs, et cetera. We were, for example, unable to figure out a way to set custom serializers for classes without getting a NoClassDefFoundError; the following code does not work:

Class<?> kryoClass = this.loadClass("com.esotericsoftware.kryo.Kryo");
Object kryo = kryoClass.getConstructor().newInstance();
Method addDefaultSerializer = kryoClass.getMethod("addDefaultSerializer", Class.class, Class.class);
addDefaultSerializer.invoke(kryo, URI.class, URISerializer.class); // throws NoClassDefFoundError

The last line throws a

java.lang.NoClassDefFoundError: com/esotericsoftware/kryo/Serializer

because the URISerializer class references Kryo's Serializer class and tries to load it through its own class loader (the system class loader), which does not know that class.
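In principle, the error only goes away if URISerializer itself is resolved by the same child class loader that provides Kryo, for example by bundling the serializer classes into the JAR that the class loader reads. A sketch of what that would look like (the fully qualified serializer name is an assumption):

// Hypothetical sketch: URISerializer is bundled into the same JAR that the child
// class loader reads, so its reference to com.esotericsoftware.kryo.Serializer is
// resolved by that loader instead of the system class loader.
Class<?> kryoClass = cl.loadClass("com.esotericsoftware.kryo.Kryo");
Object kryo = kryoClass.getConstructor().newInstance();

Class<?> uriSerializerClass = cl.loadClass("com.example.serialization.URISerializer"); // assumed FQN
Method addDefaultSerializer = kryoClass.getMethod("addDefaultSerializer", Class.class, Class.class);
addDefaultSerializer.invoke(kryo, URI.class, uriSerializerClass);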


(3) Using an intermediate serialization

Currently the most promising approach seems to be using an independent intermediate serialization, e.g. JSON via Gson or similar, and then running two separate jobs:

  1. kryo:2.21.2-ourpatchbranch in our regular store -> JSON in a temporary store
  2. JSON in the temporary store -> kryo:2.22 in our regular store

Problems
The biggest problem with this solution is that it roughly doubles the space consumed by the processed data. Moreover, we would need another serialization method that works reliably on all of our data, which we would have to investigate first.
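For reference, the intermediate step could look roughly like the sketch below, assuming Gson can map our domain objects without custom type adapters (which is exactly what we would still have to verify); MyDomainObject and serializeWithNewKryo are placeholders:

// Sketch of the intermediate representation using Gson.
Gson gson = new Gson();

// Job 1: old Kryo -> JSON (written to a temporary store)
String json = gson.toJson(domainObject);

// Job 2: JSON -> new Kryo (written back to the regular store)
MyDomainObject restored = gson.fromJson(json, MyDomainObject.class); // placeholder type
byte[] newBytes = serializeWithNewKryo(restored);                    // hypothetical helper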

Was it helpful?

Solution

I would use the multiple classloaders approach.

(Package renaming will also work. It does seem ugly, but this is a one-off hack, so beauty and correctness can take a back seat. Intermediate serialization seems risky: there was a reason you chose Kryo, and that reason will be negated by using a different intermediate form.)

The overall design would be:

child classloaders:      Old Kryo     New Kryo   <-- both with simple wrappers
                                \       /
                                 \     /
                                  \   /
                                   \ /
                                    |
default classloader:    domain model; controller for the re-serialization

  1. Load the domain object classes in the default classloader
  2. Load a Jar with the modified Kryo version and wrapper code. The wrapper has a static 'main' method with one argument: the name of the file to deserialize. Call the main method via reflection from the default classloader (a sketch of both wrappers follows this list):

        Class<?> deserializer = deserializerClassLoader.loadClass("com.example.deserializer.Main");
        Method mainIn = deserializer.getMethod("main", String.class);
        Object graph = mainIn.invoke(null, "/path/to/input/file");
    
    1. This method:
      1. Deserializes the file as one object graph
      2. Places the object into a shared space. A ThreadLocal is a simple way, or it can simply be returned to the caller (as in the snippet above).
  3. When the call returns, load a second Jar with the new serialization framework and a simple wrapper. This wrapper has a static 'main' method taking the object graph and the name of the file to serialize to. Call the main method via reflection from the default classloader:

        Class<?> serializer = serializerClassLoader.loadClass("com.example.serializer.Main");
        Method mainOut = serializer.getMethod("main", Object.class, String.class);
        mainOut.invoke(null, graph, "/path/to/output/file");
    
    1. This method:
      1. Retrieves the object graph (here it is passed in as an argument; the ThreadLocal approach works as well)
      2. Serializes the object and writes it to the file
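A minimal sketch of the two wrappers (class names, file handling and the use of readClassAndObject / writeClassAndObject are assumptions; adapt them to your actual Kryo configuration):

// Packaged into the JAR loaded by the "old Kryo" child classloader.
package com.example.deserializer;

import java.io.FileInputStream;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;

public class Main {
    // Deserializes one object graph from the given file and returns it to the caller.
    public static Object main(String inputFile) throws Exception {
        Kryo kryo = new Kryo();
        // ... register the old custom serializers / class IDs here ...
        try (Input input = new Input(new FileInputStream(inputFile))) {
            return kryo.readClassAndObject(input);
        }
    }
}

// Packaged into the JAR loaded by the "new Kryo" child classloader.
package com.example.serializer;

import java.io.FileOutputStream;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;

public class Main {
    // Serializes the object graph handed over by the controller to the given file.
    public static void main(Object graph, String outputFile) throws Exception {
        Kryo kryo = new Kryo();
        // ... register the new custom serializers / class IDs here ...
        try (Output output = new Output(new FileOutputStream(outputFile))) {
            kryo.writeClassAndObject(output, graph);
        }
    }
}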

Considerations

In the code fragments, one classloader is created for each object serialization and deserialization. You probably want to create the classloaders only once, discover the main methods, and then loop over the files, something like:

for (String file: files) {
    Object graph = mainIn.invoke(null, file + ".in");
    mainOut.invoke(null, graph, file + ".out");
}

Do the domain objects have any reference to any Kryo class? If so, you have difficulties:

  1. If the reference is just a class reference, e.g. to call a method, then the first use of the class will load one of the two Kryo versions into the default classloader. This will probably cause problems, as part of the serialization or deserialization might be performed by the wrong version of Kryo.
  2. If the reference is used to instantiate any Kryo objects and store the reference in the domain model (class or instance members), then Kryo will actually be serializing part of itself in the model. This may be a deal-breaker for this approach.

In either case, your first approach should be to examine these references and eliminate them. One way to ensure that you have done this is to make sure the default classloader does not have access to any Kryo version. If the domain objects reference Kryo in any way, the reference will fail (with a NoClassDefFoundError if the class is referenced directly, or a ClassNotFoundException if reflection is used).
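A quick sanity check (a sketch; run it once from the glue code after removing Kryo from the job's main classpath):

// With no Kryo on the default classpath, this lookup must fail. If it succeeds,
// some dependency still drags a Kryo version into the default classloader.
try {
    Class.forName("com.esotericsoftware.kryo.Kryo");
    throw new IllegalStateException("Kryo is visible to the default class loader");
} catch (ClassNotFoundException expected) {
    // good: only the child classloaders can see Kryo
}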

Other tips

For approach (2), you can create two JAR files that contain the serializer and all its dependencies, one for the old and one for the new version of your serializer, as shown here. Then create a MapReduce job that loads each version of your code in a separate class loader, and add some glue code in the middle which deserializes with the old code and then serializes with the new code.

You will have to be careful that your domain objects are loaded by the same class loader as your glue code, and that the serialization/deserialization code resolves them through that same class loader, so that both sides see the same domain object classes.
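Creating the two class loaders could look roughly like this (a sketch; the JAR names and the ConversionMapper class are placeholders):

// One child classloader per bundled serializer JAR. The parent is the default
// (application) classloader, which holds the domain model and the glue code.
ClassLoader parent = ConversionMapper.class.getClassLoader();
URLClassLoader oldLoader = new URLClassLoader(
        new URL[] { new File("old-serializer-bundle.jar").toURI().toURL() }, parent);
URLClassLoader newLoader = new URLClassLoader(
        new URL[] { new File("new-serializer-bundle.jar").toURI().toURL() }, parent);

Class<?> deserializer = oldLoader.loadClass("com.example.deserializer.Main");
Class<?> serializer = newLoader.loadClass("com.example.serializer.Main");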

The easiest way I can think of off the top of my head is to use an additional Java application that does the transformation for you. You send the binary data to this secondary Java application (simple local sockets would do the trick nicely), so you do not have to fiddle with class loaders or packages.

The only thing to think about is the intermediate representation. You might want to use another serialization mechanism or, if time is no issue, Java's built-in serialization.

Using a second Java application saves you from dealing with temporary storage and lets you do everything in memory.
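A sketch of the wire format between the two processes (length-prefixed byte arrays over a local socket; the port and variable names are assumptions):

// Conversion side: send the old bytes, read back the re-serialized bytes.
// Assumes the second application listens on localhost:9099 and answers each
// length-prefixed frame with a length-prefixed frame of its own.
try (Socket socket = new Socket("localhost", 9099);
     DataOutputStream out = new DataOutputStream(socket.getOutputStream());
     DataInputStream in = new DataInputStream(socket.getInputStream())) {

    out.writeInt(oldBytes.length);      // length prefix
    out.write(oldBytes);                // payload serialized with the old Kryo
    out.flush();

    byte[] newBytes = new byte[in.readInt()];
    in.readFully(newBytes);             // payload re-serialized with the new Kryo
    // write newBytes back to the store
}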

And once you have the socket plus second-application code in place, you will find plenty of situations where it comes in handy.

You could also build a local cluster using jGroups and save yourself the hassle of raw sockets altogether. jGroups is the simplest communication API I know of: just form a logical channel and check who joins. Best of all, it even works within the same JVM, which makes testing easy, and when run remotely it can bind different physical servers together in exactly the same way it works for local applications.

Another viable alternative is ZeroMQ with its ipc (inter-process communication) transport.
