Как реализовать простую потоковую обработку с фиксированным количеством рабочих потоков

StackOverflow https://stackoverflow.com/questions/125333

Вопрос

Я ищу самый простой и понятный способ реализовать следующее:

  • Основная программа создает работник темы для выполнения задачи.
  • Только n задачи могут выполняться одновременно.
  • Когда n достигнут, рабочие потоки больше не запускаются до тех пор, пока количество запущенных потоков не упадет ниже n.
Это было полезно?

Решение

Я думаю, что Исполнители.newFixedThreadPool соответствует вашим требованиям.Существует несколько различных способов использования результирующего ExecutorService, в зависимости от того, хотите ли вы, чтобы результат возвращался в основной поток, или задача является полностью автономной, и есть ли у вас набор задач для выполнения заранее, или задачи ставятся в очередь в ответ на какое-то событие.

  Collection<YourTask> tasks = new ArrayList<YourTask>();
  YourTask yt1 = new YourTask();
  ...
  tasks.add(yt1);
  ...
  ExecutorService exec = Executors.newFixedThreadPool(5);
  List<Future<YourResultType>> results = exec.invokeAll(tasks);

В качестве альтернативы, если у вас есть новая асинхронная задача для выполнения в ответ на какое-либо событие, вы, вероятно, просто захотите использовать простой ExecutorService execute(Runnable) способ.

Другие советы

/* Get an executor service that will run a maximum of 5 threads at a time: */
ExecutorService exec = Executors.newFixedThreadPool(5);
/* For all the 100 tasks to be done altogether... */
for (int i = 0; i < 100; i++) {
    /* ...execute the task to run concurrently as a runnable: */
    exec.execute(new Runnable() {
        public void run() {
            /* do the work to be done in its own thread */
            System.out.println("Running in: " + Thread.currentThread());
        }
    });
}
/* Tell the executor that after these 100 steps above, we will be done: */
exec.shutdown();
try {
    /* The tasks are now running concurrently. We wait until all work is done, 
     * with a timeout of 50 seconds: */
    boolean b = exec.awaitTermination(50, TimeUnit.SECONDS);
    /* If the execution timed out, false is returned: */
    System.out.println("All done: " + b);
} catch (InterruptedException e) { e.printStackTrace(); }

Исполнители.newFixedThreadPool(int)

Executor executor = Executors.newFixedThreadPool(n);

Runnable runnable = new Runnable() {
 public void run() {
  // do your thing here
 }
}

executor.execute(runnable);

Используйте фреймворк Executor;а именно newFixedThreadPool(N) Новый пул потоков (N)

  1. Если ваша очередь задач не будет неограниченной и задачи могут выполняться за более короткие промежутки времени, вы можете использовать Executors.newFixedThreadPool(n);как предполагают эксперты.

    Единственным недостатком этого решения является неограниченный размер очереди задач.Ты не можешь это контролировать.Огромное скопление в очереди задач приведет к снижению производительности приложения и может привести к нехватке памяти в некоторых сценариях.

  2. Если вы хотите использовать ExecutorService и включить work stealing механизм, при котором простаивающие рабочие потоки разделяют рабочую нагрузку с занятыми рабочими потоками, крадя задачи из очереди задач.Это вернет ForkJoinPool тип службы-исполнителя.

    публичный статический ExecutorService newWorkStealingPool(параллелизм значений int)

    Создает пул потоков, который поддерживает достаточное количество потоков для поддержки заданного уровня параллелизма и может использовать несколько очередей для уменьшения конкуренции.Уровень параллелизма соответствует максимальному количеству потоков, активно задействованных в обработке задач или доступных для участия.Фактическое количество потоков может динамически увеличиваться и уменьшаться.Пул, похищающий работу, не дает никаких гарантий относительно порядка выполнения отправленных задач.

  3. Я предпочитаю ThreadPoolExecutor благодаря гибкости API можно управлять многими парамет-рами, что контролирует выполнение потоковой задачи.

    ThreadPoolExecutor(int corePoolSize, 
                           int maximumPoolSize, 
                           long keepAliveTime, 
                           TimeUnit unit, 
                           BlockingQueue<Runnable> workQueue, 
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler)
    

в вашем случае установите оба corePoolSize and maximumPoolSize as N.Здесь вы можете управлять размером очереди задач, определять свою собственную фабрику потоков и политику обработки отклонений.

Взгляните на соответствующий вопрос SE, чтобы динамически управлять размером пула:

Динамический пул потоков

Если вы хотите сделать свой собственный:

private static final int MAX_WORKERS = n;
private List<Worker> workers = new ArrayList<Worker>(MAX_WORKERS);

private boolean roomLeft() {
    synchronized (workers) {
        return (workers.size() < MAX_WORKERS);
    }
}

private void addWorker() {
    synchronized (workers) {
        workers.add(new Worker(this));
    }
}

public void removeWorker(Worker worker) {
    synchronized (workers) {
        workers.remove(worker);
    }
}

public Example() {
    while (true) {
        if (roomLeft()) {
            addWorker();
        } 
    }
}

Где Worker - это ваш класс, который расширяет поток.Каждый рабочий вызовет метод removeWorker этого класса, передавая себя в качестве параметра, когда закончит выполнять свою работу.

С учетом сказанного фреймворк Executor выглядит намного лучше.

Редактировать:Кто-нибудь потрудится объяснить, почему это так плохо, вместо того чтобы просто приуменьшать это?

Как уже упоминали другие здесь, лучше всего создать пул потоков с помощью Исполнители класс:

Однако, если вы хотите создать свой собственный, этот код должен дать вам представление о том, как действовать дальше.По сути, просто добавляйте каждый новый поток в группу потоков и убедитесь, что у вас никогда не бывает более N активных потоков в группе:

Task[] tasks = getTasks(); // array of tasks to complete
ThreadGroup group = new ThreadGroup();
int i=0;
while( i<tasks.length || group.activeCount()>0 ) {
    if( group.activeCount()<N && i<tasks.length ) {
        new TaskThread(group, tasks[i]).start();
        i++;
    } else {
        Thread.sleep(100);
    }
}
Лицензировано под: CC-BY-SA с атрибуция
Не связан с StackOverflow
scroll top