Question

I'm trying to write a multithreaded web crawler.

My main entry class has the following code:

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null)
         return;
    exec.execute(new URLCrawler(this, url));
}

The URLCrawler fetches the specified URL, parses the HTML, extracts links from it, and schedules unseen links back to the frontier.

The frontier is a queue of uncrawled URLs. The problem is how to write the get() method. If the queue is empty, it should wait until some URLCrawler finishes and then try again. It should return null only when the queue is empty and there are no currently active URLCrawlers.

My first idea was to use an AtomicInteger for counting the current number of working URLCrawlers and an auxiliary object for notifyAll()/wait() calls. Each crawler increments the counter on start, decrements it on exit, and notifies on the object that it has completed.

But I read that notify()/notifyAll() and wait() are somewhat outdated ways to do thread communication.

What should I use for this work pattern? It is similar to M producers and N consumers; the question is how to deal with the exhaustion of producers.


Solution

I think the use of wait/notify is justified in this case. I can't think of any straightforward way to do this using java.util.concurrent alone.
In a class, let's call it Coordinator:

private final int numOfCrawlers;
private int waiting;

public boolean shouldTryAgain(){
    synchronized(this){
        waiting++;
        if(waiting>=numOfCrawlers){
            //Everybody is waiting, terminate
            return false;
        }else{
            try{
                wait();//spurious wake-up is okay
            }catch(InterruptedException e){
                Thread.currentThread().interrupt();//preserve the interrupt status
            }
            //woken up for whatever reason. Try again
            waiting--;
            return true;
        }
    }
}

public void hasEnqueued(){
    synchronized(this){
        notifyAll();
    }
}

Then, in the main loop (each URLCrawler is expected to call coordinator.hasEnqueued() after it adds a new URL to the frontier):

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null){
        if(!coordinator.shouldTryAgain()){
            //all threads are waiting. No possibility of new jobs.
            return;
        }else{
            //Possible that there are other jobs. Try again
            continue;
        }
    }
    exec.execute(new URLCrawler(this, url));
}//while(true)
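
A small sketch (not part of the original answer) of the crawler's side of the handshake mentioned above: after pushing a newly discovered link into the frontier, wake any thread blocked in shouldTryAgain(). The extractedLinks variable and the frontier.add() call are assumptions about the surrounding crawler code:

// inside URLCrawler.run(), after fetching and parsing the page (sketch)
for (URL link : extractedLinks) {       // extractedLinks: links parsed from the page (assumed)
    if (frontier.add(link)) {           // assumed to enqueue only unseen URLs and report success
        coordinator.hasEnqueued();      // wake threads blocked in coordinator.shouldTryAgain()
    }
}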

OTHER TIPS

I am not sure I understand your design, but this may be a job for a Semaphore.
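
The tip above is terse, so here is one hedged reading of it (my sketch, not the answerer's code): use the Semaphore purely as a counter of in-flight crawlers and poll the frontier with a timeout, so the dispatch loop can periodically re-check the termination condition. Here frontier is assumed to be a BlockingQueue<URL>, and crawl() is a placeholder for the fetch-parse-enqueue work:

// sketch: dispatch loop that exits once no crawler is running and nothing is queued
void dispatch(BlockingQueue<URL> frontier, int numberOfCrawlers) throws InterruptedException {
    Semaphore inFlight = new Semaphore(0);
    ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
    while (true) {
        URL url = frontier.poll(1, TimeUnit.SECONDS);
        if (url == null) {
            if (inFlight.availablePermits() == 0 && frontier.isEmpty()) {
                break;          // nobody is crawling and nothing is queued: we are done
            }
            continue;           // a running crawler may still enqueue new URLs, so retry
        }
        inFlight.release();     // count one more crawler in flight before submitting it
        exec.execute(() -> {
            try {
                crawl(url, frontier);                  // placeholder: fetch, parse, enqueue links
            } finally {
                inFlight.acquireUninterruptibly();     // this crawler is finished
            }
        });
    }
    exec.shutdown();
}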

One option is to make the frontier a blocking queue, so any thread trying to get from it will block. As soon as another URLCrawler puts objects into that queue, any waiting threads will be automatically notified (with the object dequeued).
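
A minimal sketch of that idea, assuming the frontier is backed by a LinkedBlockingQueue (this removes the manual wait/notify while waiting for work, but by itself does not answer the termination question above):

BlockingQueue<URL> frontier = new LinkedBlockingQueue<URL>();

// producer side, inside a crawler (InterruptedException handling omitted):
frontier.put(discoveredUrl);    // discoveredUrl: a link parsed from the fetched page (assumed)

// consumer side, in the dispatch loop:
URL url = frontier.take();      // blocks until some crawler enqueues a URL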

I think a basic building block for your use case is a "latch", similar to CountDownLatch, but unlike CountDownLatch, one that permits incrementing the count as well.

An interface for such a latch might be

public interface Latch {
    public void countDown();
    public void countUp();
    public void await() throws InterruptedException;
    public int getCount();
}

Legal values for counts would be 0 and up. The await() method would let you block until the count goes down to zero.

If you have such a latch, your use case can be described fairly easily. I also suspect the queue (frontier) can be eliminated in this solution (executor provides one anyway so it's somewhat redundant). I would rewrite your main routine as

ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers);
Latch latch = ...; // instantiate a latch
URL[] initialUrls = ...;
for (URL url: initialUrls) {
    executor.execute(new URLCrawler(this, url, latch));
}
// now wait for all crawling tasks to finish
latch.await();

Your URLCrawler would use the latch in this manner:

public class URLCrawler implements Runnable {
    private final Latch latch;

    public URLCrawler(..., Latch l) {
        ...
        latch = l;
        latch.countUp(); // increment the count as early as possible
    }

    public void run() {
        try {
            List<URL> secondaryUrls = crawl();
            for (URL url: secondaryUrls) {
                // submit new tasks directly ('executor' is assumed to be reachable
                // from the task, e.g. passed in via the elided constructor arguments)
                executor.execute(new URLCrawler(..., latch));
            }
        } finally {
            // as a last step, decrement the count
            latch.countDown();
        }
    }
}

As for the latch implementations, there can be a number of possible implementations, ranging from one that's based on wait() and notifyAll(), one that uses Lock and Condition, to an implementation that uses the AbstractQueuedSynchronizer. All of these implementations I think would be pretty straightforward. Note that the wait()-notifyAll() version and the Lock-Condition version would be based on mutual exclusion, whereas the AQS version would utilize CAS (compare-and-swap), and thus might scale better under certain situations.
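
As an illustration only (my sketch, not the answerer's code), a minimal wait()/notifyAll()-based implementation of the Latch interface above could look like this; the class name SimpleLatch is made up:

public class SimpleLatch implements Latch {
    private int count;

    public synchronized void countUp() {
        count++;
    }

    public synchronized void countDown() {
        if (count > 0 && --count == 0) {
            notifyAll();                 // last task finished: wake threads blocked in await()
        }
    }

    public synchronized void await() throws InterruptedException {
        while (count > 0) {              // loop guards against spurious wake-ups
            wait();
        }
    }

    public synchronized int getCount() {
        return count;
    }
}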

The question is a bit old, but I think I have found a simple, working solution:

Extend the ThreadPoolExecutor class as shown below. The new functionality is keeping an active task count (unfortunately, the provided getActiveCount() is unreliable). If taskCount.get() == 0 and there are no more queued tasks, it means there is nothing to be done and the executor shuts itself down. That gives you your exit criterion. Also, if you create the executor but fail to submit any tasks, it won't block:

public class CrawlingThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger taskCount = new AtomicInteger();

    public CrawlingThreadPoolExecutor() {
        super(8, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {

        super.beforeExecute(t, r);
        taskCount.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        super.afterExecute(r, t);
        taskCount.decrementAndGet();
        if (getQueue().isEmpty() && taskCount.get() == 0) {
            shutdown();
        }
    }
}

One more thing you have to do is implement your Runnable in a way that keeps a reference to the Executor you are using, so it can submit new tasks. Here is a mock:

public class MockFetcher implements Runnable {

    private final String url;
    private final Executor e;

    public MockFetcher(final Executor e, final String url) {
        this.e = e;
        this.url = url;
    }

    @Override
    public void run() {
        final List<String> newUrls = new ArrayList<>();
        // Parse doc and build url list, and then:
        for (final String newUrl : newUrls) {
            e.execute(new MockFetcher(this.e, newUrl));
        }
    }
}
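
A hedged usage sketch (not part of the original answer), tying the two classes together: seed the executor with one URL and simply wait for it to terminate, since afterExecute() calls shutdown() once the queue is empty and the task count reaches zero. The seed URL is a placeholder:

public static void main(String[] args) throws InterruptedException {
    CrawlingThreadPoolExecutor executor = new CrawlingThreadPoolExecutor();
    executor.execute(new MockFetcher(executor, "http://example.com"));   // placeholder seed URL
    executor.awaitTermination(1, TimeUnit.HOURS);   // returns once the crawl has shut the pool down
}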

I'd like to suggest an AdaptiveExecutor. Based on a characteristic value, you can choose to serialize or parallelize a task for execution. In the sample below, PUID is a string/object that I wanted to use to make that decision. You can alter the logic to suit your code. Some portions of the code are commented out to allow further experiments.

class AdaptiveExecutor implements Executor {

final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
Runnable active;
//ExecutorService threadExecutor=Executors.newCachedThreadPool();
static ExecutorService threadExecutor=Executors.newFixedThreadPool(4);

AdaptiveExecutor() {
    System.out.println("Initial Queue Size=" + tasks.size());
}

public void execute(final Runnable r) {
    /* if immediate start is needed do either of below two
    new Thread(r).start();

    try {
        threadExecutor.execute(r);
    } catch(RejectedExecutionException rEE ) {
        System.out.println("Thread Rejected " + new Thread(r).getName());
    }

    */


    tasks.offer(r); // otherwise, queue them up
    scheduleNext(new Thread(r)); // and kick next thread either serial or parallel.
    /*
    tasks.offer(new Runnable() {
        public void run() {
            try {
                r.run();
            } finally {
                scheduleNext();
            }
        }
    });
    */
    if ((active == null)&& !tasks.isEmpty()) {
        active = tasks.poll();
        try {
            threadExecutor.submit(active);
        } catch (RejectedExecutionException rEE) {
            System.out.println("Thread Rejected " + new Thread(r).getName());
        }
    }

    /*
    if ((active == null)&& !tasks.isEmpty()) {
        scheduleNext();
    } else tasks.offer(r);
    */
    //tasks.offer(r);

    //System.out.println("Queue Size=" + tasks.size());

}

private void serialize(Thread th) {
    try {
        Thread.sleep(200);      // short pause so the currently active task gets a head start
        threadExecutor.submit(th);
    } catch (InterruptedException iEx) {
        Thread.currentThread().interrupt();
    }
    /*
    active=tasks.poll();
    System.out.println("active thread is " +  active.toString() );
    threadExecutor.execute(active);
    */
}

private void parallalize() {
    if(null!=active)
        threadExecutor.submit(active);
}

protected void scheduleNext(Thread r) {
    //System.out.println("scheduleNext called") ;
    if(false==compareKeys(r,new Thread(active)))
        parallalize();
    else serialize(r);
}

private boolean compareKeys(Thread r, Thread active) {
    // TODO: obtain names of threads. If they contain same PUID, serialize them.
    if(null==active)
        return true; // first thread should be serialized
    else return false;  //rest all go parallel, unless logic controlls it
}

}

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow