Question

I'm implementing a custom MapReduce (for school, so please do not suggest Hadoop or other existing solutions). The problem I have is "storing" intermediate data between the Map and Reduce phases.

I was thinking of using a ConcurrentHashMap (CHM) whose key is the Map output key and whose value is an ArrayList containing all the Map values associated with that key, so that Reduce can simply aggregate these values.

However, I can't think of a way to mutate the ArrayList instance stored in the CHM atomically without locking the whole collection.

I understand that this collection implements putIfAbsent and replace methods. putIfAbsent is useful here, because if the key does not exist I just put a new ArrayList and I'm done.

However, replacing the ArrayList is not that straightforward, because I have to obtain it, add the new value, and replace it, which as far as I can tell can't be done atomically without locking the whole collection.
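For illustration, the obtain/add/replace sequence described above can be written as a retry loop around the three-argument replace(key, oldValue, newValue) of ConcurrentMap, which only swaps the value if the key still maps to the expected one. This is a minimal sketch, not the approach used in the post: the class name CasAppend and the helper addValue are hypothetical, and it assumes the stored lists are treated as immutable, so every update copies the list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper for illustration; not part of the original post.
final class CasAppend {

    // Appends value to the list stored under key without locking the map.
    // Stored lists are never mutated after they are published; each update
    // installs a fresh copy, so a failed replace can simply be retried.
    static void addValue(ConcurrentMap<String, List<Integer>> map, String key, int value) {
        while (true) {
            List<Integer> current = map.get(key);
            if (current == null) {
                // No entry yet: try to install a one-element list.
                if (map.putIfAbsent(key, Collections.singletonList(value)) == null) {
                    return;
                }
                continue; // another thread inserted first, so retry
            }
            List<Integer> updated = new ArrayList<>(current);
            updated.add(value);
            // Succeeds only if key still maps to a value equal to current.
            if (map.replace(key, current, Collections.unmodifiableList(updated))) {
                return;
            }
        }
    }
}
```

The trade-off is that every append copies the whole list, so this only suits keys with few values or low contention.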


Solution

If anyone is interested, I found the solution. It is actually very simple in this case: you only have to lock the value!

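import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;

public class MapReduceDictionary extends ConcurrentHashMap<String, ArrayList<Integer>> {

    private static final long serialVersionUID = 1L;

    // Appends value to the list stored under key, creating the list if needed.
    public void addValue(String key, int value) {
        ArrayList<Integer> list = get(key);
        if (list == null) {
            // putIfAbsent returns the list another thread installed first,
            // or null if our fresh list was the one stored.
            ArrayList<Integer> fresh = new ArrayList<Integer>();
            list = putIfAbsent(key, fresh);
            if (list == null) list = fresh;
        }
        // Lock only this key's list; readers must synchronize on the same list.
        synchronized (list) {
            list.add(value);
        }
    }
}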

I debugged this and it seems to work, but if you have any comments on how this can fail, please tell me!
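As a possible alternative, assuming Java 8 or later is available, ConcurrentHashMap.compute applies its remapping function atomically for a single key, so the append needs no explicit synchronized block. The following is only a sketch under that assumption: the class name IntermediateStore and the method valuesFor are hypothetical, and the map is held by composition rather than by extending ConcurrentHashMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical class for illustration; wraps the map instead of extending it.
final class IntermediateStore {

    private final ConcurrentHashMap<String, List<Integer>> data = new ConcurrentHashMap<>();

    // compute() runs the lambda atomically for this key, so creating the
    // list and appending to it happen as one step per key.
    void addValue(String key, int value) {
        data.compute(key, (k, list) -> {
            if (list == null) {
                list = new ArrayList<>();
            }
            list.add(value);
            return list;
        });
    }

    // Returns a snapshot copy of the values for key (empty if the key is absent).
    // The copy is made inside computeIfPresent so it cannot interleave with addValue.
    List<Integer> valuesFor(String key) {
        List<Integer> copy = new ArrayList<>();
        data.computeIfPresent(key, (k, list) -> {
            copy.addAll(list);
            return list;
        });
        return copy;
    }
}
```

A Map task would call addValue(key, value) for each emitted pair, and the Reduce phase would read each key with valuesFor(key) once all Map tasks have finished.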

Licensed under: CC-BY-SA with attribution