Question

I have 32 machine threads and one ConcurrentHashMap<Key,Value> map, which contains a lot of keys. Key has defined a public method visit(). I want to visit() every element of map exactly once using the processing power I have available and possibly some sort of thread pooling.

Things I could try:

  • I could use the method map.keys(). The resulting Enumeration<Key> could be iterated over using nextElement(), but since a call to key.visit() is very brief I won't manage to keep threads busy. The Enumeration is inherently single-threaded.
  • I could use a synchronised HashSet<Key> instead, invoke a method toArray() and split the work on the array into all 32 threads. I seriously doubt in this solution, since the method toArray() will likely be a single-thread bottle-neck.
  • I could try to inherit from ConcurrentHashMap, get my hands on the instances of its inner Segment<K,V>, try to group them into 32 groups and work on each group separately. This sounds like a hardcore approach though.
  • or similar magic with Enumeration<Key>.

Ideally:

  • Ideally a ConcurrentHashMap<Key, Value> would define a method keysEnumerator(int approximatePosition), which could drop me an enumerator missing approximately first 1/32 elements, i.e. map.keysEnumerator(map.size()/32)

Am I missing anything obvious? Has anybody run into similar problem before?

EDIT

I've had a go at profiling to see whether this problem is actually going to affect the performance in practice. As I don't have access to the cluster at the moment I used my laptop and tried to extrapolate the results to a bigger dataset. On my machine I can create a 2 million keys ConcurrentHashMap and it takes about 1 second to iterate over it invoking the visit() method on every key. The program is supposed to scale to 85 million keys (and over). The cluster's processor is slightly faster, but it still should take about 40 seconds to iterate over entire map. Now a few words about the logic flow of the program. The logic presented is sequential, i.e. it is not allowed for any thread to proceed to the next step until all the threads in the previous step are finished:

  1. Create the hash map, create the keys and populate the hash map
  2. Iterate over entire hash map visiting all the keys.
  3. Do some data shuffling which is parallel insertions and deletions.
  4. Repeat step 2 and 3 a few hundred times.

That logic flow means that a 40 second iteration is going to be repeated a few hundred times, say 100. Which gives us a bit over an hour spent just in visiting the nodes. With a set of 32 parallel iterators it could go down to just a few minutes, which is a significant performance improvement.

Now a few words on how ConcurrentHashMap works (Or how I believe it works). Every ConcurrentHashMap consists of segments (by default 16). Every write to a hash map is synchronised on a relevant segment. So say we're trying to write two new keys k1 and k2 to the hash map and that they would be resolved to belong to the same segment, say s1. If they are attempted to be written simultaneously, one of them is going to acquire the lock first and be added earlier then the other. What is the chance of two elements to be resolved to belong to the same segment? In case we have got a good hash function and 16 segements it is 1/16.

I belive that ConcurrentHashMap should have a method concurrentKeys(), which would return an array of Enumerations, one per each segment. I have got a few ideas how to add it to ConcurrentHashMap through inheritance and I'll let you know if I succeed. As for now the solution seems to be to create an array of ConcurrentHashMaps and pre-hashing every key to resolve to one member of such array. I'll share that code as well, once it's ready.

EDIT

This is the same problem in a different language:

Parallel Iterators

Was it helpful?

Solution 3

The solution I will eventually go for is an array of ConcurrentHashMaps instead of one ConcurrentHashMap. This is ad hoc, but seems to be relevant for my usecase. I don't care about the second step being slow as it doesn't affect my code's performance. The solution is:

Object Creation:

  1. Create an array of size t of ConcurrentHashMaps, where t is a number of threads.
  2. Create an array of Runnables, also of size t.

Array Population (single threaded, not an issue):

  1. Create the keys and apply pre-hash function, which will return an int in the range 0 ... t-1. In my case simply modulo t.
  2. Put the key in the hashmap, by accessing appropriate entry in the array. E.g. if the pre-hash has resulted in index 4, then go for hashArray[4].put(key)

Array Iteration (nicely multithreaded, performance gain):

  1. Assign every thread from Runnables array a job of iterating over the hashmap with a corresponding index. This should give give a t times shorter iteration as opposed to single threaded.

To see the proof of concept code (as it's got some dependencies from the project I can't post it here) head towards my project on github

EDIT

Actually, implementing the above proof of concept for my system has proven to be time-consuming, bug-prone and grossly disappointing. Additionally I've discovered I would have missed many features of the standard library ConcurrentHashMap. The solution I have been exploring recently, which looks much less ad-hoc and much more promising is to use Scala, which produces bytecode that is fully interoperable with Java. The proof of concept relies on stunning library described in this paper and AFAIK it is currently IMPOSSIBLE to achieve a corresponding solution in vanilla Java without writing thousands lines of code, given the current state of the standard library and corresponding third-party libraries.

import scala.collection.parallel.mutable.ParHashMap

class Node(value: Int, id: Int){
    var v = value
    var i = id
    override def toString(): String = v toString
}

object testParHashMap{
    def visit(entry: Tuple2[Int, Node]){
        entry._2.v += 1
    }
    def main(args: Array[String]){
        val hm = new ParHashMap[Int, Node]()
        for (i <- 1 to 10){
            var node = new Node(0, i)
            hm.put(node.i, node)
        }

        println("========== BEFORE ==========")
        hm.foreach{println}

        hm.foreach{visit}

        println("========== AFTER ==========")
        hm.foreach{println}

    }
}

OTHER TIPS

I could try to inherit from ConcurrentHashMap, get my hands on the instances of its inner Segment, try to group them into 32 groups and work on each group separately. This sounds like a hardcore approach though.

Hardcore indeed but about the only thing I would see that would work. toArray() builds the array by doing an enumeration so no win there. I can't believe that a synchronized HashSet would be better unless the ratio of visit() runs to other map operations is pretty high.

The problem with the working with the Segments is that you are going to have to be extremely careful that your code is resilient because I assume other threads may be altering the table at the same time you are visiting the nodes and you need to avoid the inevitable race conditions. Delicate for sure.

The big question in my mind is if this is necessary? Has a profiler or timing runs shown to you that this is taking too long to visit() each of the keys in one thread? Have you tried to do a thread-pool for each visit() call and have one thread doing the enumeration and the pool threads doing the visit()?

If I were you I'd just try iterating the key set of ConcurrentHashMap first. You could try passing the processing of keys off to a thread pool (in bundles, if the task is too light weight), or even to a ForkJoin task but you should do that only if it's really necessary.

Having said that you could use a ConcurrentSkipListMap, in which you can get a NavigableSet of keys. You can then take out partitions from this by using the subSet method. However, ConcurrentHashMap would have better performance for put, get operations (note also it would use compareTo rather than hashCode). Situations where this is better seems pretty unlikely.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top