Domanda

Sto cercando di scrivere un web crawler multithread.

La mia classe di ingresso principale ha il seguente codice:

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null)
         return;
exec.execute(new URLCrawler(this, url));
}

L'URLCrawler recupera l'URL specificato, analizza i collegamenti estratti HTML da esso, e gli orari collegamenti invisibili Torna alla frontiera.

Una frontiera è una coda di URL non scansionati. Il problema è come scrivere il metodo get (). Se la coda è vuota, si deve attendere eventuali URLCrawlers finiscono e poi riprovare. Dovrebbe ritorno nullo solo quando la coda è vuota e non c'è attualmente URLCrawler attivo.

La mia prima idea era quella di utilizzare un AtomicInteger per il conteggio attuale numero di URLCrawlers di lavoro e un oggetto ausiliario per notifyAll () / wait () chiamate. Ogni crawler sul incrementi inizio del numero di URLCrawlers di lavoro attuali, e in uscita decrementa di esso, e notificare l'oggetto che ha completato.

Ma ho letto che notify () / notifyAll () e wait () sono un po 'i metodi per fare comunicazione filo deprecato.

Che cosa devo usare in questo modello di lavoro? E 'simile a produttori M e N consumatori, la questione è come affrontare exaustion dei produttori.

È stato utile?

Soluzione

Credo uso di wait / notify è giustificata in questo caso. Non riesco a pensare a un modo dritto in avanti per fare questo usando j.u.c.
In una classe, di lasciare chiamata Coordinatore:

private final int numOfCrawlers;
private int waiting;

public boolean shouldTryAgain(){
    synchronized(this){
        waiting++;
        if(waiting>=numOfCrawlers){
            //Everybody is waiting, terminate
            return false;
        }else{
            wait();//spurious wake up is okay
            //waked up for whatever reason. Try again
            waiting--;
            return true;
        }
    }

public void hasEnqueued(){
    synchronized(this){
        notifyAll();
    }
} 

poi,

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null){
        if(!coordinator.shouldTryAgain()){
            //all threads are waiting. No possibility of new jobs.
            return;
        }else{
            //Possible that there are other jobs. Try again
            continue;
        }
    }
    exec.execute(new URLCrawler(this, url));
}//while(true)

Altri suggerimenti

Io non sono sicuro di aver capito il vostro disegno, ma questo può essere un lavoro per un Semaphore

Una possibilità è quella di rendere "frontiera" una coda di blocco, quindi ogni discussione cercando di "ottenere" da esso bloccherà. Non appena tutti gli altri mette URLCrawler gli oggetti in quella coda, eventuali altri thread saranno informati automaticamente (con l'oggetto rimosse dalla coda)

Penso che un blocco di base per il vostro caso d'uso è un "latch", simile a CountDownLatch, ma a differenza di CountDownLatch, quella che permette di incremento il conteggio pure.

Un interfaccia per un tale dispositivo di chiusura potrebbe essere

public interface Latch {
    public void countDown();
    public void countUp();
    public void await() throws InterruptedException;
    public int getCount();
}

I valori validi per i conteggi sarebbero 0 e fino. Il metodo await () avrebbe permesso di bloccare fino a quando il conteggio scende a zero.

Se si dispone di un tale dispositivo di chiusura, il tuo caso d'uso può essere descritto abbastanza facilmente. Sospetto anche la coda (frontiera) può essere eliminato in questa soluzione (esecutore fornisce uno comunque quindi è un po 'ridondante). Vorrei riscrivere la routine principale come

ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers);
Latch latch = ...; // instantiate a latch
URL[] initialUrls = ...;
for (URL url: initialUrls) {
    executor.execute(new URLCrawler(this, url, latch));
}
// now wait for all crawling tasks to finish
latch.await();

Il tuo URLCrawler avrebbe usato il fermo in questo modo:

public class URLCrawler implements Runnable {
    private final Latch latch;

    public URLCrawler(..., Latch l) {
        ...
        latch = l;
        latch.countUp(); // increment the count as early as possible
    }

    public void run() {
        try {
            List<URL> secondaryUrls = crawl();
            for (URL url: secondaryUrls) {
                // submit new tasks directly
                executor.execute(new URLCrawler(..., latch));
            }
        } finally {
            // as a last step, decrement the count
            latch.countDown();
        }
    }
}

Per quanto riguarda le implementazioni fermo, non ci può essere una serie di possibili implementazioni, che vanno da uno che si basa su wait () e notifyAll (), uno che utilizza Blocco e Condizioni, a un'implementazione che utilizza l'AbstractQueuedSynchronizer. Tutte queste implementazioni penso che sarebbe piuttosto semplice. Si noti che l'attesa () - notifyAll () la versione e la versione di Lock-Condizione sarebbero basati su mutua esclusione, mentre la versione AQS utilizzerebbe CAS (confrontare-e-swap), e, quindi, potrebbe scalare meglio in determinate situazioni

La questione è un po 'vecchio, ma penso che ho trovato un po' semplice, soluzione di lavoro:

Estendere la classe ThreadPoolExecutor come qui di seguito. La nuova funzionalità è mantenere il conteggio attività attiva (purtroppo, a condizione getActiveCount() è inaffidabile). Se le attività taskCount.get() == 0 e non ci sono più in coda, vuol dire che non c'è niente da fare e chiude executor verso il basso. Avete i vostri criteri di uscita. Inoltre, se si crea vostro esecutore, ma non riescono a presentare eventuali compiti, esso non bloccare:

public class CrawlingThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger taskCount = new AtomicInteger();

    public CrawlingThreadPoolExecutor() {
        super(8, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {

        super.beforeExecute(t, r);
        taskCount.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        super.afterExecute(r, t);
        taskCount.decrementAndGet();
        if (getQueue().isEmpty() && taskCount.get() == 0) {
            shutdown();
        }
    }
}

Una cosa che dovete fare è implementare il Runnable in modo mantiene riferimento alla Executor che si sta utilizzando in modo da essere in grado di presentare nuovi compiti. Ecco un finto:

public class MockFetcher implements Runnable {

    private final String url;
    private final Executor e;

    public MockFetcher(final Executor e, final String url) {
        this.e = e;
        this.url = url;
    }

    @Override
    public void run() {
        final List<String> newUrls = new ArrayList<>();
        // Parse doc and build url list, and then:
        for (final String newUrl : newUrls) {
            e.execute(new MockFetcher(this.e, newUrl));
        }
    }
}

Mi piacerebbe suggerire un AdaptiveExecuter. Sulla base di un valore caratteristico, è possibile scegliere di serializzare o parallalize un thread per l'esecuzione. Nell'esempio qui sotto, PUID è una stringa / oggetto che ho voluto usare per prendere questa decisione. È possibile modificare la logica per soddisfare il vostro codice. Alcune porzioni di codice sono commentati per consentire ulteriori esperimenti.

attrezzi class AdaptiveExecutor Esecutore {     operazioni finali coda = nuovo LinkedBlockingQueue ();     Runnable attivo;     // ExecutorService threadExecutor = Executors.newCachedThreadPool ();     static ExecutorService threadExecutor = Executors.newFixedThreadPool (4);

AdaptiveExecutor() {
    System.out.println("Initial Queue Size=" + tasks.size());
}

public void execute(final Runnable r) {
    /* if immediate start is needed do either of below two
    new Thread(r).start();

    try {
        threadExecutor.execute(r);
    } catch(RejectedExecutionException rEE ) {
        System.out.println("Thread Rejected " + new Thread(r).getName());
    }

    */


    tasks.offer(r); // otherwise, queue them up
    scheduleNext(new Thread(r)); // and kick next thread either serial or parallel.
    /*
    tasks.offer(new Runnable() {
        public void run() {
            try {
                r.run();
            } finally {
                scheduleNext();
            }
        }
    });
    */
    if ((active == null)&& !tasks.isEmpty()) {
        active = tasks.poll();
        try {
            threadExecutor.submit(active);
        } catch (RejectedExecutionException rEE) {
            System.out.println("Thread Rejected " + new Thread(r).getName());
        }
    }

    /*
    if ((active == null)&& !tasks.isEmpty()) {
        scheduleNext();
    } else tasks.offer(r);
    */
    //tasks.offer(r);

    //System.out.println("Queue Size=" + tasks.size());

}

private void serialize(Thread th) {
    try {
        Thread activeThread = new Thread(active);

        th.wait(200);
        threadExecutor.submit(th);
    } catch (InterruptedException iEx) {

    }
    /*
    active=tasks.poll();
    System.out.println("active thread is " +  active.toString() );
    threadExecutor.execute(active);
    */
}

private void parallalize() {
    if(null!=active)
        threadExecutor.submit(active);
}

protected void scheduleNext(Thread r) {
    //System.out.println("scheduleNext called") ;
    if(false==compareKeys(r,new Thread(active)))
        parallalize();
    else serialize(r);
}

private boolean compareKeys(Thread r, Thread active) {
    // TODO: obtain names of threads. If they contain same PUID, serialize them.
    if(null==active)
        return true; // first thread should be serialized
    else return false;  //rest all go parallel, unless logic controlls it
}

}

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top