Question

I am trying to write a program in Java using ExecutorService and its invokeAll method. My question is: does invokeAll run the tasks simultaneously? That is, if I have two processors, will two workers run at the same time? I can't make it scale correctly: the problem takes the same time to complete whether I use newFixedThreadPool(2) or newFixedThreadPool(1).

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
    tasks.add(new Map(ps, keyWords));
}
list = executor.invokeAll(tasks);

Map is a class that implements Callable, and wp is a vector of PartialSolution objects. PartialSolution is a class that holds some information at different times.

Why doesn't it scale? What could be the problem?

This is the code for PartialSolution:

import java.util.HashMap;
import java.util.Vector;

public class PartialSolution 
{
    public String fileName;//the name of a file
    public int b, e;//the index of begin and end of the fragment from the file
    public String info;//the fragment
    public HashMap<String, Word> hm;//here i retain the informations
    public HashMap<String, Vector<Word>> hmt;//this i use for the final reduce

    public PartialSolution(String name, int b, int e, String i, boolean ok)
    {
        this.fileName = name;
        this.b = b;
        this.e = e;
        this.info = i;
        hm = new HashMap<String, Word>();
        if(ok == true)
        {
            hmt = new HashMap<String, Vector<Word>>();
        }
        else
        {
             hmt = null;
        }    
    }
}

And this is the code for Map:

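import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Map implements Callable<PartialSolution>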
{
    private PartialSolution ps;
    private Vector<String> keyWords;

    public Map(PartialSolution p, Vector<String> kw)
    {
        this.ps = p;
        this.keyWords = kw;
    }

    @Override
    public PartialSolution call() throws Exception 
    {
        String[] st = this.ps.info.split("\\n");
        for(int j = 0 ; j < st.length ; j++)
        {
            for(int i = 0 ; i < keyWords.size() ; i++)
            {
                if(keyWords.elementAt(i).charAt(0) != '\'')
                {
                    int k = 0;
                    int index = 0;
                    int count = 0;

                    while((index = st[j].indexOf(keyWords.elementAt(i), k)) != -1)
                    {
                        k = index + keyWords.elementAt(i).length();
                        count++;
                    }
                    if(count != 0)
                    {
                        Word wr = this.ps.hm.get(keyWords.elementAt(i));
                        if(wr != null)
                        {
                            Word nw = new Word(ps.fileName);
                            nw.nrap = wr.nrap + count;
                            nw.lines = wr.lines;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                        else
                        {
                            Word nw = new Word(ps.fileName);
                            nw.nrap = count;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                    }
                } 
                else
                {
                    String regex = keyWords.elementAt(i).substring(1, keyWords.elementAt(i).length() - 1);
                    Pattern pt = Pattern.compile(regex);
                    Matcher m = pt.matcher(st[j]);
                    int count = 0;
                    while(m.find())
                    {
                        count++;
                    }
                    if(count != 0)
                    {
                        Word wr = this.ps.hm.get(keyWords.elementAt(i));
                        if(wr != null)
                        {
                            Word nw = new Word(this.ps.fileName);
                            nw.nrap = wr.nrap + count;
                            nw.lines = wr.lines;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                        else
                        {
                            Word nw = new Word(this.ps.fileName);
                            nw.nrap = count;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                    }
                }
            }
        }
        this.ps.info = null;
        return this.ps;
    }
}

So in Map I take every line of the fragment, count the occurrences of every expression in it, and also record the line number. After processing the whole fragment, I save the results in the hash map of the same PartialSolution and return it. In the next step I combine the PartialSolutions that have the same fileName and pass them to a Callable class, Reduce, which works like Map but performs different operations; it also returns a PartialSolution.

This is the code to run the Map tasks:

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
   tasks.add(new Map(ps, keyWords));
}    
list = executor.invokeAll(tasks);

In tasks I create the tasks of type Map, and in list I get their results. I don't know how to read a JVM thread dump. I hope the information I gave is good enough. I work in NetBeans 7.0.1, if that helps.

Thank you, Alex

Solution

What I want to know is: if I created the ExecutorService with 10 threads, will invokeAll solve 10 tasks at the same time, or one at a time?

If you submit ten tasks to an ExecutorService with ten threads, it will run them all concurrently. Whether they can proceed completely in parallel and independently of each other depends on what they are doing. But they will each have their own thread.

And another question: if I call list.get(i).get(), will this return the PartialSolution after it has been solved?

Yes, it will block until the computation is done (if not done already) and return its result.
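A minimal sketch of the two answers above, using stand-in tasks rather than the question's Map class (the names InvokeAllDemo and squares are hypothetical). One detail worth knowing: invokeAll itself blocks until every submitted task has completed, so the get() calls that follow it return immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllDemo {

    // Runs n stand-in tasks on a fixed pool and collects the results in task order.
    static List<Integer> squares(int n, int threads) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int value = i;
                tasks.add(() -> value * value); // hypothetical task standing in for Map
            }
            // Blocks until all tasks are complete; futures come back in task order.
            List<Future<Integer>> futures = executor.invokeAll(tasks);
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get()); // already done, so no further waiting
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(squares(5, 2)); // prints [0, 1, 4, 9, 16]
    }
}
```

If a task throws, the corresponding get() rethrows the failure wrapped in an ExecutionException, so it is worth handling that case in real code.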

I really don't understand why the time doesn't improve if I use 2 threads instead of 1.

We need to see more code. Do they synchronize on some shared data? How long do these tasks take? If they are very short, you may not notice any difference. If they take longer, look at the JVM thread dump to verify that all of them are running.

OTHER TIPS

If you create the thread pool with two threads, then two tasks will be run at the same time.

There are two things I see that could cause two threads to take the same amount of time as one thread.

If one of your Map tasks takes the majority of the time, extra threads will not make that one task run faster. The whole batch cannot finish faster than its slowest task.
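To make the slowest-task limit concrete, here is a rough sketch that computes a lower bound on wall-clock time for independent tasks: the batch can finish no sooner than the longest single task, and no sooner than the total work divided evenly across the threads. The class name, method name, and task durations are made up for illustration; they are not measurements from the question.

```java
import java.util.Arrays;

public class MakespanBound {

    // Lower bound on wall-clock time for independent tasks run on `threads` workers.
    static long lowerBoundMillis(long[] taskMillis, int threads) {
        long longest = Arrays.stream(taskMillis).max().orElse(0L);
        long total = Arrays.stream(taskMillis).sum();
        long evenSplit = (total + threads - 1) / threads; // ceiling division
        return Math.max(longest, evenSplit);
    }

    public static void main(String[] args) {
        long[] durations = {900, 50, 50}; // hypothetical: one fragment dominates
        System.out.println(lowerBoundMillis(durations, 1)); // prints 1000
        System.out.println(lowerBoundMillis(durations, 2)); // prints 900
    }
}
```

With these made-up numbers a second thread saves at most 10%, which would be easy to mistake for no speedup at all. Splitting the input into fragments of similar size avoids this.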

The other possibility is that your Map tasks read from a shared Vector often. The accessor methods of Vector, including elementAt and size, are synchronized, so this could be causing enough contention to cancel out the gain from having two threads.
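One way to rule out contention on the shared keyword Vector is to copy it once into an unsynchronized list and read only from the copy inside the hot loop. The sketch below assumes the keywords do not change while the tasks run; KeywordSnapshot, snapshot, and count are hypothetical names, and whether this is the actual bottleneck in the question would still need profiling.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;

public class KeywordSnapshot {

    // Copies the shared Vector once, so later reads do not take the Vector's lock.
    static List<String> snapshot(Vector<String> shared) {
        return Collections.unmodifiableList(new ArrayList<>(shared));
    }

    // Counts non-overlapping occurrences of word in line, like the loop in the question.
    static int count(String line, String word) {
        if (word.isEmpty()) {
            return 0; // guard: an empty word would never advance the search position
        }
        int count = 0;
        int from = 0;
        int index;
        while ((index = line.indexOf(word, from)) != -1) {
            from = index + word.length();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        Vector<String> keyWords = new Vector<>();
        keyWords.add("ab");
        keyWords.add("zz");
        List<String> local = snapshot(keyWords); // one synchronized copy
        for (String word : local) {              // unsynchronized reads from here on
            System.out.println(word + " -> " + count("abcabab", word));
        }
    }
}
```

The copy could be made once before the tasks are created and passed to each task's constructor in place of the Vector.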

You should bring this up in jvisualvm to see what each thread is doing.

Java 8 introduced one more API in Executors, newWorkStealingPool, which creates a work-stealing pool. You don't have to write a RecursiveTask or RecursiveAction, but you still get a ForkJoinPool.

public static ExecutorService newWorkStealingPool()

Creates a work-stealing thread pool using all available processors as its target parallelism level.

By default, it uses the number of available processors as the parallelism level. If you have 8 CPU cores, you can have 8 threads processing the task queue.

Work stealing, where idle worker threads take tasks from busy worker threads, improves overall performance. Since the task queue is unbounded, this ForkJoinPool is recommended for tasks that execute in short time intervals.
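As a rough illustration, a work-stealing pool can be used through the same ExecutorService interface as a fixed pool, including invokeAll. The class name, method name, and the string-length tasks below are hypothetical stand-ins for the question's Map tasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WorkStealingDemo {

    // Sums the lengths of all fragments, one task per fragment.
    static int totalLength(List<String> fragments) throws Exception {
        // Parallelism defaults to the number of available processors.
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (String fragment : fragments) {
                tasks.add(() -> fragment.length()); // hypothetical stand-in task
            }
            int total = 0;
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                total += future.get();
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(totalLength(Arrays.asList("alpha", "beta", "gamma"))); // prints 14
    }
}
```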

ExecutorService, ForkJoinPool, or ThreadPoolExecutor will each perform well if you don't have shared data, shared locking (synchronization), or inter-thread communication. If all tasks in the task queue are independent of each other, performance will improve.

The ThreadPoolExecutor constructor lets you customize and control the workflow of tasks:

ThreadPoolExecutor(int corePoolSize,
                   int maximumPoolSize,
                   long keepAliveTime,
                   TimeUnit unit,
                   BlockingQueue<Runnable> workQueue,
                   ThreadFactory threadFactory,
                   RejectedExecutionHandler handler)
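For instance, the constructor above could be called as follows. This is only a sketch: the pool size, queue capacity, keep-alive time, and rejection policy are arbitrary choices for illustration, and CustomPoolDemo and newBoundedPool are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomPoolDemo {

    // Builds a fixed-size pool with a bounded queue; the values are illustrative only.
    static ThreadPoolExecutor newBoundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads,                                     // corePoolSize
                threads,                                     // maximumPoolSize
                30L, TimeUnit.SECONDS,                       // keepAliveTime for threads above core size
                new ArrayBlockingQueue<Runnable>(queueCapacity), // bounded work queue
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());  // when full, the submitting thread runs the task
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = newBoundedPool(2, 4);
        try {
            Future<String> result = pool.submit(() -> "done");
            System.out.println(result.get()); // prints done
        } finally {
            pool.shutdown();
        }
    }
}
```

A bounded queue with CallerRunsPolicy slows the submitting thread down when the pool is saturated instead of letting the queue grow without limit.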

Have a look at related SE questions:

How to properly use Java Executor?

Java's Fork/Join vs ExecutorService - when to use which?

Licensed under: CC-BY-SA with attribution