Question

I have a string parser (parsing large text blobs) that needs to be run in a java fork join pool. The pool is faster than other threading and has reduced my parsing time by over 30 minutes when using both regular expressions and xpath. However, the number of threads being created climbs dramatically and I need to be able to terminate them since the thread pool is called multiple times. How can I reduce the increase in threads without limiting the pool to just 1 core on a 4 core system?

My thread count is exceeding 40000 and I need it to be closer to 5000 since the program is running 10 times with a stone cold execution limit of 50000 threads for my user.

This issue is happening on both Windows and Linux.

I am:

  • setting the max processors to the number of available processors*configurable number which is currently 1
  • cancelling tasks after get() is called
  • desperately setting the forkjoin pool to null before reinstantiating because I am desperate

Any Help would be appreciated. Thanks.

Here is the code I am using to stop, get and restart the pool. I should probably also note that I am submitting each task with fjp.submit(TASK) and then invoking them all at shutdown.

while(pages.size()>0) { log.info("Currently Active Threads: "+Thread.activeCount()); log.info("Pages Found in the Iteration "+j+": "+pages.size());

        if(fjp.isShutdown())
        {
            fjp=new ForkJoinPool(Runtime.getRuntime().availableProcessors()*procnum);
        }

        i=0;
        //if asked to generate a hash, due this first
        if(getHash==true){
            log.info("Generating Hash");
            int s=pages.size();
            while(i<s){
                String withhash=null;
                String str=pages.get(0);

                if(str != null){
                    jmap=Json.read(str).asJsonMap();
                    jmap.put("offenderhash",Json.read(genHash(jmap.get("offenderhash").asString()+i)));

                    for(String k:jmap.keySet()){
                        withhash=(withhash==null)?"{\""+k+"\":\""+jmap.get(k).asString()+"\"":withhash+",\""+k+"\":\""+jmap.get(k).asString()+"\"";
                    }

                    if(withhash != null){
                        withhash+=",}";
                    }

                    pages.remove(0);
                    pages.add((pages.size()-1), withhash);
                    i++;
                }
            }
            i=0;
        }

        if(singlepats != null)
        {

        log.info("Found Singlepats");
        for(String row:pages)
        {   

            String str=row;
            str=str.replaceAll("\t|\r|\r\n|\n","");
            jmap=Json.read(str).asJsonMap();

            if(singlepats.containsKey("table"))
            {
                if(fjp.isShutdown())
                {
                    fjp=new ForkJoinPool((Runtime.getRuntime().availableProcessors()*procnum));
                }

                fjp=new ForkJoinPool((Runtime.getRuntime().availableProcessors()*procnum));

                if(jmap.get(column)!=null)
                {

                    if(test){
                        System.out.println("//////////////////////HTML////////////////////////\n"+jmap.get(column).asString()+"\n///////////////////////////////END///////////////////////////\n\n");
                    }

                    if(mustcontain != null)
                    {
                        if(jmap.get(column).asString().contains(mustcontain))
                        {
                            if(cannotcontain != null)
                            {
                                if(jmap.get(column).asString().contains(cannotcontain)==false)
                                results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\\s\\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
                            }
                            else
                            {
                                results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\\s\\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
                            }
                        }
                    }
                    else if(cannotcontain != null)
                    {
                        if(jmap.get(column).asString().contains(cannotcontain)==false)
                        {
                            results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\\s\\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
                        }
                    }
                    else
                    {
                        results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\\s\\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
                    }
                }
            }

            i++;

            if(((i%commit_size)==0 & i != 0) | i==pages.size() |pages.size()==1 & singlepats != null)
            {
                log.info("Getting Regex Results");

                log.info("Shutdown");

                try {
                    fjp.awaitTermination(termtime, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }

                fjp.shutdown();
                while(fjp.isTerminated()==false)
                {
                    try{
                        Thread.sleep(5);
                    }catch(InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                }


                for(Future<String> r:results)
                {
                    try {
                        add=r.get();
                        if(add.contains("No Data")==false)
                        {
                            parsedrows.add(add);
                        }

                        add=null;
                        if(r.isDone()==false)
                        {
                            r.cancel(true);
                        }

                        if(fjp.getActiveThreadCount()>0 && fjp.getRunningThreadCount()>0)
                        {
                            fjp.shutdownNow();
                        }

                        fjp=new ForkJoinPool(Runtime.getRuntime().availableProcessors()*procnum);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

                results=new ArrayList<ForkJoinTask<String>>();

                if(parsedrows.size()>=commit_size)
                {

                    if(parsedrows.size()>=SPLITSIZE)
                    {
                        sendToDb(parsedrows,true);
                    }
                    else
                    {
                        sendToDb(parsedrows,false);
                    }

                    parsedrows=new ArrayList<String>();
                }


                //hint to the gc in case it actually pays off (think if i were a gambling man)
                System.gc();
                Runtime.getRuntime().gc();
            }


        }
        }
        log.info("REMAINING ROWS TO COMMIT "+parsedrows.size());
        log.info("Rows Left"+parsedrows.size());
        if(parsedrows.size()>0)
        {


            if(parsedrows.size()>=SPLITSIZE)
            {
                sendToDb(parsedrows,true);
            }
            else
            {
                sendToDb(parsedrows,false);
            }


            parsedrows=new ArrayList<String>();
        }

        records+=i;
        i=0;

//Query for more records to parse
Was it helpful?

Solution

It looks like you're making a new ForkJoinPool for every result. What you really want to do is make a single ForkJoinPool that all your tasks will share. Extra pools won't make extra parallelism available, so one should be fine. When you get a task that is ready to run take your fjp and call fjp.execute(ForkJoinTask) or ForkJoinTask.fork() if you're in a task already.

Making multiple pools seems like a bookkeeping nightmare. Try to get away with just one that's shared.

OTHER TIPS

You are probably using join() in Java7. Join doesn't work. It requires a context switch and Java programs can't do a context switch so the framework creates "continuation threads" to keep moving. I detailed that problem several years ago in this article: ForkJoin Clamamity

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top