Question

I'm trying to create an indexing strategy that requires the index blocks to be on the same datanode as the data blocks, to reduce latency at data retrieval time. I've managed to write the code for reading the data blocks of a specific file. For writing, I open a socket connection to a specific datanode, write my data, and then close the socket. Unfortunately, I'm not sure where or how the data is being written with this method: when I query HDFS using hadoop fs -ls I can't see my data anywhere (in some stray file, maybe?!), yet my program executes without any errors.

Here's my code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.FileChannel;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.util.Random;
import java.nio.ByteBuffer;

import javax.net.SocketFactory;

public class CopyOfChunkedIndexes {

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Usage: ChunkedIndexes <input path>");
            System.exit(-1);
        }


        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");                  //for defaulting to HDFS rather than local filesystem ("fs.default.name" is the deprecated name of this key)
        conf.set("hadoop.security.authentication", "simple");               //disable authentication
        conf.set("hadoop.security.authorization", "false");                 //disable authorization

        Job job = Job.getInstance(conf, "Chunked Indexes");
        job.setJarByClass(CopyOfChunkedIndexes.class);


        Path inputPath = new Path("/user/hadoop-user/sample.txt");
        FileInputFormat.setInputPaths(job, inputPath);

        try{
            FileSystem fs = FileSystem.get(conf);
            DistributedFileSystem dfs = (DistributedFileSystem) fs;
            DFSClient dfsclient = dfs.getClient();



            System.out.println("Proceeding for file: " + inputPath.toString());

            FileStatus fileStatus = fs.getFileStatus(inputPath);
            BlockLocation[] bLocations = fs.getFileBlockLocations(inputPath, 0, fileStatus.getLen());


            for(int i = 0; i < bLocations.length; i++)
            {
                System.out.println("Block[" + i + "]::");
                System.out.println("\nHost(s): ");

                String[] temp = bLocations[i].getHosts();
                for(int j = 0; j < temp.length; j++)
                {
                    System.out.println(temp[j] + "\t");
                }

                System.out.println("\nBlock length: " + bLocations[i].getLength() +
                                    "\n\nDataNode(s) hosting this block: ");

                temp = bLocations[i].getNames();
                for(int j = 0; j < temp.length; j++)
                {
                    System.out.println(temp[j] + "\t");
                }

                System.out.println("\nOffset: " + bLocations[i].getOffset());

                //READING A BLOCK
                FSDataInputStream in = fs.open(inputPath);
                in.seek(bLocations[i].getOffset());                                 //position the stream at the start of this block

                byte[] buf = new byte[(int)bLocations[i].getLength()];
                in.readFully(buf, 0, buf.length);                                   //second argument is the offset into buf, not into the file
                in.close();

                System.out.println(new String(buf, "UTF-8"));
                System.out.println("--------------------------------------------------------------------------------------------");
            }


            //WRITE A FILE TO A SPECIFIC DATANODE
            for(int i = 0; i < bLocations.length; i++)
            {
                System.out.println("Block[" + i + "]::");
                String[] temp;

                System.out.println("\n\nDataNode(s) hosting this block: ");                        //Name(s) = datanode addresses

                temp = bLocations[i].getNames();
                for(int j = 0; j < temp.length; j++)
                {
                    System.out.println(temp[j].split(":")[0] + "\t" + temp[j].split(":")[1]);      //host vs. port
                }

                Socket sock = SocketFactory.getDefault().createSocket();
                InetSocketAddress targetAddr = new InetSocketAddress(temp[0].split(":")[0], Integer.parseInt(temp[0].split(":")[1]));
                NetUtils.connect(sock, targetAddr, 10000);
                sock.setSoTimeout(10000);

                OutputStream baseStream = NetUtils.getOutputStream(sock, 10000);
                DataOutputStream oStream = new DataOutputStream(new BufferedOutputStream(baseStream, 10000));
                oStream.writeBytes("-----------------------------------------Sample text-----------------------------------------------");


                sock.close();
                System.out.println("Data written, socket closed!");
            }

        }catch(Exception ex){
            ex.printStackTrace();
        }

    }
}

Any help as to where I'm going wrong would be deeply appreciated! Thanks!

[PS: I'm using Hadoop 2.2.0 on a Linux VM. I've disabled authorization/authentication in my code above because I want to access a datanode directly (without the 'overhead' of authentication) as this is for testing purposes.]


Solution 2

There's a class called BlockPlacementPolicy which (in theory) can be extended to customize how HDFS chooses a datanode. Though hacky, this approach may work for anyone who wants to do something similar in the future and stumbles upon this question.

Other tips

All of your writes are discarded by the cluster because you didn't go through the namenode. Any modification made that way is treated as file corruption.
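If the goal is still to steer a write towards the datanode that holds a given block, one option is to keep the write on the namenode's books and pass the datanode addresses only as a placement hint. As far as I know, Hadoop 2.1 and later have a DistributedFileSystem.create overload that accepts an InetSocketAddress[] of favored nodes; treat that as an assumption and check it against your version. The sketch below covers only the plain-Java part: turning the host:port strings from BlockLocation.getNames() into such an array. The class name FavoredNodes and the address 127.0.0.1:50010 are made up for illustration.

```java
import java.net.InetSocketAddress;

class FavoredNodes {

    // Converts "host:port" strings (the shape returned by BlockLocation.getNames())
    // into socket addresses that can be handed to the HDFS client as a placement hint.
    static InetSocketAddress[] parse(String[] names) {
        InetSocketAddress[] nodes = new InetSocketAddress[names.length];
        for (int i = 0; i < names.length; i++) {
            int colon = names[i].lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Expected host:port, got: " + names[i]);
            }
            String host = names[i].substring(0, colon);
            int port = Integer.parseInt(names[i].substring(colon + 1));
            nodes[i] = new InetSocketAddress(host, port);
        }
        return nodes;
    }

    public static void main(String[] args) {
        // Hypothetical datanode address; in the question's program this would be
        // bLocations[i].getNames().
        InetSocketAddress[] nodes = parse(new String[] {"127.0.0.1:50010"});
        for (InetSocketAddress node : nodes) {
            System.out.println(node.getHostString() + ":" + node.getPort());
        }
        // With Hadoop on the classpath, "nodes" would then be passed as the
        // favoredNodes argument when creating the index file (assumed overload,
        // see the note above), so the write still goes through the namenode.
    }
}
```

The favored nodes are only a hint: the namenode may still place the block elsewhere, for example if the preferred datanode is full or unavailable, so the index code should not assume co-location without checking the block locations afterwards.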

Hadoop already does the job for you: when you run a distributed task on a Hadoop cluster, each task is given the data nearest to it. For instance, if you have an Elasticsearch cluster and a Hadoop cluster sharing the same hardware, you just have to create a MapReduce job that uses the local Elasticsearch node, and that's all: no network dance for your data. Each task loads a partial set of the data and pushes it to the local Elasticsearch instance.

Enjoy!

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow