Pregunta

Estoy intentando escribir un rastreador web multiproceso.

Mi clase principal de entrada tiene el siguiente código:

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null)
         return;
exec.execute(new URLCrawler(this, url));
}

El URLCrawler obtiene la URL especificada, analiza los enlaces HTML extractos de ella, y los horarios de enlaces que no se ven de nuevo a la frontera.

Una frontera es una cola de URL no rastreadas. El problema es cómo escribir el método get (). Si la cola está vacía, se debe esperar a que terminen las URLCrawlers y vuelva a intentarlo. Debe devolver nulo sólo cuando la cola está vacía y no hay actualmente URLCrawler activo.

Mi primera idea era utilizar un AtomicInteger para contar el número actual de URLCrawlers de trabajo y un objeto auxiliar para notifyAll () / espera () llamadas. Cada rastreador en incrementos de inicio del número de URLCrawlers de trabajo actuales, y al salir lo decrementa, y notificar el objeto que ha completado.

Pero leí que notificar () / notify () y wait () son métodos para hacer la comunicación hilo obsoleto algo.

¿Qué debo usar en este modelo de trabajo? Es similar a los productores M y N los consumidores, la pregunta es cómo hacer frente a exaustion de los productores.

¿Fue útil?

Solución

Creo uso de espera / Notify se está justificada en este caso. No se puede pensar en ninguna manera directa de hacer esto utilizando j.u.c.
En una clase, vamos a Coordinador de llamada:

private final int numOfCrawlers;
private int waiting;

public boolean shouldTryAgain(){
    synchronized(this){
        waiting++;
        if(waiting>=numOfCrawlers){
            //Everybody is waiting, terminate
            return false;
        }else{
            wait();//spurious wake up is okay
            //waked up for whatever reason. Try again
            waiting--;
            return true;
        }
    }

public void hasEnqueued(){
    synchronized(this){
        notifyAll();
    }
} 

entonces,

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null){
        if(!coordinator.shouldTryAgain()){
            //all threads are waiting. No possibility of new jobs.
            return;
        }else{
            //Possible that there are other jobs. Try again
            continue;
        }
    }
    exec.execute(new URLCrawler(this, url));
}//while(true)

Otros consejos

No estoy seguro de entender su diseño, pero esto puede ser un trabajo para un Semaphore

Una opción es hacer "frontera" una cola de bloqueo, así que cualquier hilo de tratar de "llegar" de que bloqueará. Tan pronto como cualquier otro pone URLCrawler objetos en esa cola, otros temas serán notificados automáticamente (con el objeto quita de la cola)

creo que un bloque de construcción básico para su caso de uso es un "enganche", similar a CountDownLatch, pero a diferencia de CountDownLatch, uno que permite incrementación el conteo también.

Una interfaz para tal un pestillo podría ser

public interface Latch {
    public void countDown();
    public void countUp();
    public void await() throws InterruptedException;
    public int getCount();
}

Los valores válidos para el recuento sería 0 en adelante. El método Await () dejaría a bloquear hasta que el recuento se pone a cero.

Si usted tiene tal un pestillo, su caso de uso puede ser descrito con bastante facilidad. También sospecho la cola (frontera) se pueden eliminar en esta solución (ejecutor ofrece uno de todos modos por lo que es algo redundante). Me gustaría reescribir su rutina principal como

ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers);
Latch latch = ...; // instantiate a latch
URL[] initialUrls = ...;
for (URL url: initialUrls) {
    executor.execute(new URLCrawler(this, url, latch));
}
// now wait for all crawling tasks to finish
latch.await();

Su URLCrawler usaría el pestillo de esta manera:

public class URLCrawler implements Runnable {
    private final Latch latch;

    public URLCrawler(..., Latch l) {
        ...
        latch = l;
        latch.countUp(); // increment the count as early as possible
    }

    public void run() {
        try {
            List<URL> secondaryUrls = crawl();
            for (URL url: secondaryUrls) {
                // submit new tasks directly
                executor.execute(new URLCrawler(..., latch));
            }
        } finally {
            // as a last step, decrement the count
            latch.countDown();
        }
    }
}

En cuanto a las implementaciones de enganche, no puede haber una serie de implementaciones posibles, que van desde uno que se basa en espera () y notify (), que utiliza bloqueo y Condiciones, a una aplicación que utiliza el AbstractQueuedSynchronizer. Todas estas implementaciones, creo que sería bastante sencillo. Tenga en cuenta que la espera () - notifyAll () versión y la versión Lock-Condición estarían basadas en la exclusión mutua, mientras que la versión AQS utilizaría CAS (de comparación y de intercambio), por lo que podrían escalar mejor en ciertas situaciones

La pregunta es un poco viejo, pero creo que he encontrado algo simple, solución de trabajo:

Ampliación de la clase ThreadPoolExecutor como a continuación. La nueva funcionalidad es mantener el recuento tarea activa (por desgracia, siempre getActiveCount() no es fiable). Si las tareas taskCount.get() == 0 y no hay más en cola, significa que no hay nada que hacer y cierra ejecutor hacia abajo. Usted tiene sus criterios de salida. Además, si crea su albacea, pero no presenta ninguna tarea, no va a bloquear:

public class CrawlingThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger taskCount = new AtomicInteger();

    public CrawlingThreadPoolExecutor() {
        super(8, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {

        super.beforeExecute(t, r);
        taskCount.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        super.afterExecute(r, t);
        taskCount.decrementAndGet();
        if (getQueue().isEmpty() && taskCount.get() == 0) {
            shutdown();
        }
    }
}

Una cosa más que tienes que hacer es poner en práctica su Runnable de una manera que mantiene referencia a Executor está utilizando con el fin de poder presentar nuevas tareas. Aquí está una maqueta:

public class MockFetcher implements Runnable {

    private final String url;
    private final Executor e;

    public MockFetcher(final Executor e, final String url) {
        this.e = e;
        this.url = url;
    }

    @Override
    public void run() {
        final List<String> newUrls = new ArrayList<>();
        // Parse doc and build url list, and then:
        for (final String newUrl : newUrls) {
            e.execute(new MockFetcher(this.e, newUrl));
        }
    }
}

Me gustaría sugerir un AdaptiveExecuter. Sobre la base de un valor característico, se puede elegir para serializar o parallalize un hilo de ejecución. En el ejemplo siguiente, PUID es una cadena / objeto que quería utilizar para tomar esa decisión. Puede alterar la lógica según su código. Algunas porciones de código se comentan para permitir experimentos adicionales.

implementos AdaptiveExecutor clase Ejecutor {     tareas finales de cola = nuevo LinkedBlockingQueue ();     Ejecutable activo;     // ExecutorService threadExecutor = Executors.newCachedThreadPool ();     estática ExecutorService threadExecutor = Executors.newFixedThreadPool (4);

AdaptiveExecutor() {
    System.out.println("Initial Queue Size=" + tasks.size());
}

public void execute(final Runnable r) {
    /* if immediate start is needed do either of below two
    new Thread(r).start();

    try {
        threadExecutor.execute(r);
    } catch(RejectedExecutionException rEE ) {
        System.out.println("Thread Rejected " + new Thread(r).getName());
    }

    */


    tasks.offer(r); // otherwise, queue them up
    scheduleNext(new Thread(r)); // and kick next thread either serial or parallel.
    /*
    tasks.offer(new Runnable() {
        public void run() {
            try {
                r.run();
            } finally {
                scheduleNext();
            }
        }
    });
    */
    if ((active == null)&& !tasks.isEmpty()) {
        active = tasks.poll();
        try {
            threadExecutor.submit(active);
        } catch (RejectedExecutionException rEE) {
            System.out.println("Thread Rejected " + new Thread(r).getName());
        }
    }

    /*
    if ((active == null)&& !tasks.isEmpty()) {
        scheduleNext();
    } else tasks.offer(r);
    */
    //tasks.offer(r);

    //System.out.println("Queue Size=" + tasks.size());

}

private void serialize(Thread th) {
    try {
        Thread activeThread = new Thread(active);

        th.wait(200);
        threadExecutor.submit(th);
    } catch (InterruptedException iEx) {

    }
    /*
    active=tasks.poll();
    System.out.println("active thread is " +  active.toString() );
    threadExecutor.execute(active);
    */
}

private void parallalize() {
    if(null!=active)
        threadExecutor.submit(active);
}

protected void scheduleNext(Thread r) {
    //System.out.println("scheduleNext called") ;
    if(false==compareKeys(r,new Thread(active)))
        parallalize();
    else serialize(r);
}

private boolean compareKeys(Thread r, Thread active) {
    // TODO: obtain names of threads. If they contain same PUID, serialize them.
    if(null==active)
        return true; // first thread should be serialized
    else return false;  //rest all go parallel, unless logic controlls it
}

}

Licenciado bajo: CC-BY-SA con atribución
No afiliado a StackOverflow
scroll top