Question

I want to attach different files to different reducers. Is it possible using distributed cache technology in hadoop?

I able to attach the same file(files) to all the reducers. But due to memory constraints, I want to know if I can attach different files to different reducers.

Forgive me if its an ignorant question.

Pls help!

Thanks in advance!

Was it helpful?

Solution

It is a strange desire since any reducer is not bound to a particular node and during the execution a reducer can be run on any node or even nodes (if there is a failure or speculative execution). Therefore all reducers should be homogeneous, the only thing that differs them is data they process.

So I suppose when you say that you want to put different files on different reducers you actually want to put different files on reducer and those files should correspond to the data (keys) those reducers will be processing.

The only one way I know to do it is put your data on HDFS and read it from reducer when it start processing data.

OTHER TIPS

Also it may be worth trying to use an in-memory compute/data grid technology like GridGain, Infinispan, etc... This way you can load your data in memory and you would not have any limits on how to map your computational jobs (map/reduce) to any data using data affinity.

package com.a;

import javax.security.auth.login.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class PrefixNew4Reduce4 extends MapReduceBase implements Reducer<Text, Text,   Text, Text>{
//  @SuppressWarnings("unchecked")


 ArrayList<String> al = new ArrayList<String>();
public void configure(JobConf conf4)
{

    String from = "home/users/mlakshm/haship"; 

    OutputStream dst = null;
    try {
        dst = new BufferedOutputStream(new FileOutputStream(to, false));
    } catch (FileNotFoundException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } /* src (hdfs file) something like hdfs://127.0.0.1:8020/home/rystsov/hi                                         */


    FileSystem fs = null;
    try {
        fs = FileSystem.get(new URI(from), conf4);
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (URISyntaxException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    FSDataInputStream src;
    try {
        src = fs.open(new Path(from));

        String val = src.readLine();
        StringTokenizer st = new StringTokenizer(val);

        al.add(val);


        System.out.println("val:----------------->"+val);

    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }



}



    public void reduce (Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {


        StringTokenizer stk = new StringTokenizer(key.toString());
        String t = stk.nextToken();
        String i = stk.nextToken();
        String j = stk.nextToken();

    ArrayList<String> al1 = new ArrayList<String>();

           for(int i = 0; i<al.size(); i++)
            {

                     boolean a = (al.get(i).equals(i)) || (al.get(i).equals(j));

                     if(a==true)
                     {

                         output.collect(key, new Text(al.get(i));                              

                     }


      while(values.hasNext())
          {

             String val = values.next().toString();
             al1.add(val);

      }

for(int i = 0; i<al1.size(); i++)
{
output.collect(key, new Text(al1.get(i));
}
Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top