Question

I have a problem with some threads.

My script

1 - loads over 10 million lines into an array from a text file

2 - creates an executor pool with 5 fixed threads

3 - then iterates over that list and adds tasks to the queue

executor.submit(new MyCustomThread(line,threadTimeout,"[THREAD "+Integer.toString(increment)+"]"));

Now the number of active threads never exceeds 5, which is good, but I observed that my processor goes to 100% load. I debugged a little and saw that the MyCustomThread constructor is being called, which means that even though I declare 5 fixed threads, the ExecutorService will still try to create 10 million objects.

The main question is: How do I prevent this? I just want tasks to be rejected when there is no room for them, rather than creating 10 million objects and running them one by one.

Second question: How do I get the number of currently active threads? I tried threadGroup.activeCount(), but it always gives me 5 5 5 5 ....

THE CALLER CLASS :

System.out.println("Starting threads ...");
final ThreadGroup threadGroup = new ThreadGroup("workers");
//ExecutorService executor = Executors.newFixedThreadPool(howManyThreads);

ExecutorService executor = Executors.newFixedThreadPool(5,new ThreadFactory() {
    public Thread newThread(Runnable r) {
        return new Thread(threadGroup, r);
    }
});

int increment = 0;              
for(String line : arrayOfLines)
{
    if(increment > 10000)
    {
        //System.out.println("TOO MANY!!");
        //System.exit(0);
    }

    System.out.println(line);
    System.out.println(threadGroup.activeCount());

    if(threadGroup.activeCount() >= 5)
    {
        for(int i = 0; i < 10; i++)
        {
            System.out.println(threadGroup.activeCount());
            System.out.println(threadGroup.activeGroupCount());
            Thread.sleep(1000);
        }
    }


    try
    {
        executor.submit(new MyCustomThread(line,threadTimeout,"[THREAD "+Integer.toString(increment)+"]"));
    }
    catch(Exception ex)
    {
        continue;
        //System.exit(0);
    }

    increment++;
}

executor.awaitTermination(10, TimeUnit.MILLISECONDS);
executor.shutdown();

THREAD CLASS :

public class MyCustomThread extends Thread
{
    private String ip;
    private String threadName;
    private int threadTimeout = 10;

    public MyCustomThread(String ip)
    {
        this.ip = ip;
    }

    public MyCustomThread(String ip,int threadTimeout,String threadName)
    {

        this.ip = ip;
        this.threadTimeout = threadTimeout;
        this.threadName = threadName;

        System.out.println("MyCustomThread constructor has been called!");
    }

    @Override
    public void run()
    {
        // do some stuff that takes time ....
    }
}

Thank you.


Solution

You are going about this slightly wrong. The philosophy with executors is that you implement the unit of work as a Runnable or a Callable (instead of a Thread). Each Runnable or Callable should do one self-contained piece of work that is independent of the other Runnables or Callables.

Executor services use a pool of threads internally, so creating your own ThreadGroup and Thread subclass does not do any good.
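On the second question: the pooled threads stay alive while they are idle, which is why threadGroup.activeCount() keeps printing 5. A rough sketch of asking the pool itself is below. It assumes you build the pool as a ThreadPoolExecutor directly (with the same arguments I believe Executors.newFixedThreadPool(5) uses), so that getActiveCount() is available. ActiveCountDemo and measure are made-up names for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original code.
class ActiveCountDemo {

    // Returns {busy threads while two tasks run, busy threads after the pool has terminated}.
    static int[] measure() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        CountDownLatch started = new CountDownLatch(2);
        CountDownLatch release = new CountDownLatch(1);

        // Two tasks that signal they have started and then wait until released.
        for (int i = 0; i < 2; i++) {
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        started.await();
        // getActiveCount() is documented as approximate: threads actively executing tasks.
        int busy = pool.getActiveCount();

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        int afterwards = pool.getActiveCount();

        return new int[] { busy, afterwards };
    }
}
```

Unlike ThreadGroup.activeCount(), which counts live threads, getActiveCount() only counts threads that are currently running a task, so it drops back down once the work is done.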

Try this simple piece:

// 'line' stands for one entry of your list of lines
ExecutorService executor = Executors.newFixedThreadPool(5);
executor.execute(new MyRunnableWorker(line));

public class MyRunnableWorker implements Runnable{
    private String ip;
    private String threadName;
    private int threadTimeout = 10;

    public MyRunnableWorker(String ip){
        this.ip = ip;
    }

    public MyRunnableWorker(String ip,int threadTimeout,String threadName){
        this.ip = ip;
        this.threadTimeout = threadTimeout;
        this.threadName = threadName;

        System.out.println("MyRunnableWorker constructor has been called!");
    }

    @Override
    public void run(){
        // do some stuff that takes time ....
    }
}

This would give you what you want. Also try profiling your threaded code with VisualVM to see how the threads are running and how the load is distributed.
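If you really want submissions to be rejected when there is no room, one option is a ThreadPoolExecutor with a bounded queue and the AbortPolicy handler, which throws RejectedExecutionException once all threads are busy and the queue is full. This is only a sketch: RejectWhenFullDemo, countRejected and the sizes used are assumptions for illustration. Note that the task object is still constructed before the submit is attempted; a rejected task is simply not kept around.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original code.
class RejectWhenFullDemo {

    // Submits 'total' long-running tasks and returns how many were rejected.
    static int countRejected(int threads, int queueSize, int total) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize),      // bounded work queue
                new ThreadPoolExecutor.AbortPolicy());    // throw when saturated

        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;

        for (int i = 0; i < total; i++) {
            try {
                // Each task waits on the latch, standing in for slow work.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                // No free thread and no free queue slot: the task is dropped.
                rejected++;
            }
        }

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return rejected;
    }
}
```

With 5 threads and a queue of 10, at most 15 tasks are held at any time; anything submitted beyond that while the pool is saturated is rejected and can be skipped, logged or retried by the caller.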

OTHER TIPS

I think your biggest problem here is that MyCustomThread should implement Runnable, not extend Thread. When you use an ExecutorService you let it handle the Thread management (i.e. you don't need to create them.)

Here's an approximation of what I think you're trying to do. Hope this helps.

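import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FileProcessor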
{

    public static void main(String[] args)
    {

        List<String> lines = readFile();
        System.out.println("Starting threads ...");
        ExecutorService executor = Executors.newFixedThreadPool(5);

        for(String line : lines)
        {
            try
            {
                executor.submit(new MyCustomThread(line));
            }
            catch(Exception ex)
            {
                ex.printStackTrace();
            }
        }

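        // Stop accepting new tasks, then wait for the queued ones to finish
        executor.shutdown();
        try
        {
            // awaitTermination returns false if the timeout elapsed first
            if (!executor.awaitTermination(10, TimeUnit.SECONDS))
            {
                System.out.println("A processor took longer than the await time to complete.");
                executor.shutdownNow();
            }
        }
        catch (InterruptedException e)
        {
            // The waiting thread itself was interrupted
            Thread.currentThread().interrupt();
            executor.shutdownNow();
        }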

    }

    protected static List<String> readFile()
    {
        List<String> lines = new ArrayList<String>();
        String filename = "/temp/data.dat";
        // try-with-resources closes the reader even if reading fails
        try (BufferedReader bufferedReader = new BufferedReader(new FileReader(filename)))
        {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                lines.add(line);
            }
        }
        catch (java.io.IOException e)
        {
            e.printStackTrace();
        }
        return lines;
    }
}

public class MyCustomThread implements Runnable
{

    String line;

    MyCustomThread(String line)
    {
        this.line = line;
    }

    @Override
    public void run()
    {
        System.out.println(Thread.currentThread().getName() + " processed line:" + line);

    }

}

EDIT: This implementation does NOT block on the ExecutorService submit. What I mean by this is that a new instance of MyCustomThread is created for every line in the file regardless of whether any previously submitted MyCustomThreads have completed. You could add a blocking / limiting worker queue to prevent this.

ExecutorService executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LimitedQueue<Runnable>(10));

An example of a blocking / limiting queue implementation can be found here:
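Independently of the implementation referred to above, one common way to build such a queue is to make offer() block instead of returning false when the queue is full. The LimitedQueue below is a hypothetical sketch along those lines, not necessarily the code this answer had in mind, and LimitedQueueDemo and runAll are made-up names. It relies on ThreadPoolExecutor enqueueing tasks through offer(), and it assumes the core and maximum pool sizes are equal, as in the line above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical bounded queue whose offer() waits for space instead of failing.
class LimitedQueue<E> extends LinkedBlockingQueue<E> {

    LimitedQueue(int maxSize) {
        super(maxSize);
    }

    @Override
    public boolean offer(E e) {
        try {
            // put() blocks while the queue is full, so the submitting thread waits here.
            put(e);
            return true;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

// Hypothetical demo class, not part of the original code.
class LimitedQueueDemo {

    // Submits 'tasks' small jobs through the limited queue and returns how many completed.
    static int runAll(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 5, 0L, TimeUnit.MILLISECONDS, new LimitedQueue<Runnable>(10));
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            // Blocks whenever 5 tasks are running and 10 more are already queued.
            pool.execute(completed::incrementAndGet);
        }

        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return completed.get();
    }
}
```

With this in place the loop over the lines can only run a little ahead of the workers, so at most a handful of task objects exist at once instead of one per line of the file.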

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow