Question

In the Observer Design Pattern, the subject notifies all observers by calling the update() operation of each observer. One way of doing this is

void notify() {
   for (observer: observers) {
      observer.update(this);
   }
}

But the problem here is each observer is updated in a sequence and update operation for an observer might not be called till all the observers before it is updated. If there is an observer that has an infinite loop for update then all the observer after it will never be notified.

Question:

  1. Is there a way to get around this problem?
  2. If so what would be a good example?
Was it helpful?

Solution

Classic design patterns do not involve parallelism and threading. You'd have to spawn N threads for the N observers. Be careful though since their interaction to this will have to be done in a thread safe manner.

OTHER TIPS

The problem is the infinite loop, not the one-after-the-other notifications.

If you wanted things to update concurrently, you'd need to fire things off on different threads - in which case, each listener would need to synchronize with the others in order to access the object that fired the event.

Complaining about one infinite loop stopping other updates from happening is like complaining that taking a lock and then going into an infinite loop stops others from accessing the locked object - the problem is the infinite loop, not the lock manager.

You could make use of the java.utils.concurrent.Executors.newFixedThreadPool(int nThreads) method, then call the invokeAll method (could make use of the one with the timout too to avoid the infinite loop).

You would change your loop to add a class that is Callable that takes the "observer" and the "this" and then call the update method in the "call" method.

Take a look at this package for more info.

This is a quick and dirty implementation of what I was talking about:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main
{
    private Main()
    {
    }

    public static void main(final String[] argv)
    {
        final Watched       watched;
        final List<Watcher> watchers;

        watched = new Watched();
        watchers = makeWatchers(watched, 10);
        watched.notifyWatchers(9);
    }

    private static List<Watcher> makeWatchers(final Watched watched,
                                              final int     count)
    {
        final List<Watcher> watchers;

        watchers = new ArrayList<Watcher>(count);

        for(int i = 0; i < count; i++)
        {
            final Watcher watcher;

            watcher = new Watcher(i + 1);
            watched.addWatcher(watcher);
            watchers.add(watcher);
        }

        return (watchers);
    }
}

class Watched
{
    private final List<Watcher> watchers;

    {
        watchers = new ArrayList<Watcher>();
    }

    public void addWatcher(final Watcher watcher)
    {
        watchers.add(watcher);
    }

    public void notifyWatchers(final int seconds)
    {
        final List<Watcher>         currentWatchers;
        final List<WatcherCallable> callables;
        final ExecutorService       service;

        currentWatchers = new CopyOnWriteArrayList<Watcher>(watchers);
        callables       = new ArrayList<WatcherCallable>(currentWatchers.size());

        for(final Watcher watcher : currentWatchers)
        {
            final WatcherCallable callable;

            callable = new WatcherCallable(watcher);
            callables.add(callable);
        }

        service = Executors.newFixedThreadPool(callables.size());

        try
        {
            final boolean value;

            service.invokeAll(callables, seconds, TimeUnit.SECONDS);
            value = service.awaitTermination(seconds, TimeUnit.SECONDS);
            System.out.println("done: " + value);
        }
        catch (InterruptedException ex)
        {
        }

        service.shutdown();
        System.out.println("leaving");
    }

    private class WatcherCallable
        implements Callable<Void>
    {
        private final Watcher watcher;

        WatcherCallable(final Watcher w)
        {
            watcher = w;
        }

        public Void call()
        {
            watcher.update(Watched.this);
            return (null);
        }
    }
}

class Watcher
{
    private final int value;

    Watcher(final int val)
    {
        value = val;
    }

    public void update(final Watched watched)
    {
        try
        {
            Thread.sleep(value * 1000);
        }
        catch (InterruptedException ex)
        {
            System.out.println(value + "interupted");
        }

        System.out.println(value + " done");
    }
}

I'd be more concerned about the observer throwing an exception than about it looping indefinitely. Your current implementation would not notify the remaining observers in such an event.

1. Is there a way to get around this problem?

Yes, make sure the observer work fine and return in a timely fashion.

2. Can someone please explain it with an example.

Sure:

class ObserverImpl implements Observer {
     public void update( Object state ) {
            // remove the infinite loop.
            //while( true ) {
            //   doSomething();
            //}

            // and use some kind of control:
            int iterationControl = 100;
            int currentIteration = 0;
            while( curentIteration++ < iterationControl ) {
                 doSomething();
            }
     }
     private void doSomething(){}
}

This one prevent from a given loop to go infinite ( if it makes sense, it should run at most 100 times )

Other mechanism is to start the new task in a second thread, but if it goes into an infinite loop it will eventually consume all the system memory:

class ObserverImpl implements Observer {
     public void update( Object state ) {
         new Thread( new Runnable(){ 
             public void run() {
                 while( true ) {
                     doSomething();
                 }
             }
          }).start();
     }
     private void doSomething(){}
}

That will make the that observer instance to return immediately, but it will be only an illusion, what you have to actually do is to avoid the infinite loop.

Finally, if your observers work fine but you just want to notify them all sooner, you can take a look at this related question: Invoke a code after all mouse event listeners are executed..

All observers get notified, that's all the guarantee you get.

If you want to implement some fancy ordering, you can do that:

  • Connect just a single Observer;
  • have this primary Observer notify his friends in an order you define in code or by some other means.

That takes you away from the classic Observer pattern in that your listeners are hardwired, but if it's what you need... do it!

If you have an observer with an "infinite loop", it's no longer really the observer pattern.

You could fire a different thread to each observer, but the observers MUST be prohibited from changing the state on the observed object.

The simplest (and stupidest) method would simply be to take your example and make it threaded.

void notify() {
   for (observer: observers) {
      new Thread(){
          public static void run() {
              observer.update(this);
          } 
      }.start();
   }
}

(this was coded by hand, is untested and probably has a bug or five--and it's a bad idea anyway)

The problem with this is that it will make your machine chunky since it has to allocate a bunch of new threads at once.

So to fix the problem with all the treads starting at once, use a ThreadPoolExecutor because it will A) recycle threads, and B) can limit the max number of threads running.

This is not deterministic in your case of "Loop forever" since each forever loop will permanently eat one of the threads from your pool.

Your best bet is to not allow them to loop forever, or if they must, have them create their own thread.

If you have to support classes that can't change, but you can identify which will run quickly and which will run "Forever" (in computer terms I think that equates to more than a second or two) then you COULD use a loop like this:

void notify() {
   for (observer: observers) {
      if(willUpdateQuickly(observer))
          observer.update(this);
      else
          new Thread(){
              public static void run() {
                  observer.update(this);
              } 
          }.start();
   }
}

Hey, if it actually "Loops forever", will it consume a thread for every notification? It really sounds like you may have to spend some more time on your design.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top