Pergunta

I cannot figure it out, how can I wake up RecursiveTasks, invoked by ForkJoinPool, when these tasks are on hold by wait method. Here is my simple example with the method MainRecursionClass.resume which is incorrect (does not wakes up RecursiveTasks).

public class Program {
   public static void main(String[] args) {
      Program p = new Program();
      final MainRecursionClass mrc = p.new MainRecursionClass();

      //Thread outputs integers to simulate work
      new Thread() {
         public void run() {
            mrc.doJob();
         }
      }.start();

      //Thread performs wait and notify on MainRecursionClass object
      p.new PauseResume(mrc).start();
   }

/**
 * 
 * This class performs suspend and resume operations to the MainRecursionClass class object
 *
 */
   private class PauseResume extends Thread {
      private MainRecursionClass rv;

      public PauseResume(MainRecursionClass rv) {
         this.rv = rv;
      }

      @Override
      public void run() {
         while(!isInterrupted()) {
            try {
               sleep(4000);

               rv.suspend();
               sleep(8000);

               rv.resume();
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
      }
   }

   private class MainRecursionClass {
      private boolean pause = false;
      private MyRecursive rv;

      public void doJob() {
         rv = new MyRecursive(0, 100000);
         ForkJoinPool pool = new ForkJoinPool();      
         pool.invoke(rv);
      }

      public void suspend() {
         pause = true;

         System.out.println("Suspended");
      }

      /**
       * This method is incorrect. It should wake up all MyRecursive instances to continue their work.
       */
      public synchronized void resume() {
         pause = false;
         notifyAll();

         System.out.println("Resumed");
      }

      private class MyRecursive extends RecursiveTask<Object> {
         private static final long serialVersionUID = 1L;
         private int start;
         private int length;
         private int threshold = 15;

         public MyRecursive(int start, int length) {
            super();
            this.start = start;
            this.length = length;
         }

         protected void computeDirectly() throws Exception {
            for (int index = start; index < start + length; index++) {
               //PAUSE
               synchronized (this) {
                  try {
                     while(pause) {
                        wait();
                     }
                  } catch (InterruptedException e) {
                     e.printStackTrace();
                  }
               }
               //PAUSE

               //some output to simulate work...
               System.out.println(index);

               Thread.sleep(1000);
            }
         }

         /**
          * Recursion
          */
         @Override
         protected Object compute() {
            if (length <= threshold) {
               try {
                  computeDirectly();
               } catch (Exception e) {
                  return e;
               }

               return null;
            }

            int split = length / 2;

            invokeAll(new MyRecursive(start, split),
                      new MyRecursive(start + split, length - split));

            return null;
         }
      }
   }
}
Foi útil?

Solução 2

Finally I came to this solution: I created List<MyRecursive> list = new ArrayList<>(); object in MainRecursionClass and added every MyRecursive instance, created recursively, in the list. The class MyRecursive has new method:

public synchronized void resume() {
   notify();
}

When, the method MainRecursionClass.resume(), which wakes up the threads, looks like this:

public void resume() {
   System.out.println("Resumed");

   pause = false;
   for(MyRecursive mr : list) {
      if(mr != null)
         mr.resume();
      }
   }
}

Outras dicas

You should not use wait/notify in the tasks running on a thread pool. If your pool is bounded, then it may cause thread starvation (a form of deadlock). If it is unbounded, too many threads can be created and main memory exhausted.

Instead, you should split your task in 2 (or more) and start subtasks according to their starting conditions. When you want a task to wait(), then refactor it so that current subtask exits, and next subtask is prepared to run.

Licenciado em: CC-BY-SA com atribuição
Não afiliado a StackOverflow
scroll top