Как реализовать простую потоковую обработку с фиксированным количеством рабочих потоков
-
02-07-2019 - |
Вопрос
Я ищу самый простой и понятный способ реализовать следующее:
- Основная программа создает работник темы для выполнения задачи.
- Только
n
задачи могут выполняться одновременно. - Когда
n
достигнут, рабочие потоки больше не запускаются до тех пор, пока количество запущенных потоков не упадет нижеn
.
Решение
Я думаю, что Исполнители.newFixedThreadPool соответствует вашим требованиям.Существует несколько различных способов использования результирующего ExecutorService, в зависимости от того, хотите ли вы, чтобы результат возвращался в основной поток, или задача является полностью автономной, и есть ли у вас набор задач для выполнения заранее, или задачи ставятся в очередь в ответ на какое-то событие.
Collection<YourTask> tasks = new ArrayList<YourTask>();
YourTask yt1 = new YourTask();
...
tasks.add(yt1);
...
ExecutorService exec = Executors.newFixedThreadPool(5);
List<Future<YourResultType>> results = exec.invokeAll(tasks);
В качестве альтернативы, если у вас есть новая асинхронная задача для выполнения в ответ на какое-либо событие, вы, вероятно, просто захотите использовать простой ExecutorService execute(Runnable)
способ.
Другие советы
/* Get an executor service that will run a maximum of 5 threads at a time: */
ExecutorService exec = Executors.newFixedThreadPool(5);
/* For all the 100 tasks to be done altogether... */
for (int i = 0; i < 100; i++) {
/* ...execute the task to run concurrently as a runnable: */
exec.execute(new Runnable() {
public void run() {
/* do the work to be done in its own thread */
System.out.println("Running in: " + Thread.currentThread());
}
});
}
/* Tell the executor that after these 100 steps above, we will be done: */
exec.shutdown();
try {
/* The tasks are now running concurrently. We wait until all work is done,
* with a timeout of 50 seconds: */
boolean b = exec.awaitTermination(50, TimeUnit.SECONDS);
/* If the execution timed out, false is returned: */
System.out.println("All done: " + b);
} catch (InterruptedException e) { e.printStackTrace(); }
Исполнители.newFixedThreadPool(int)
Executor executor = Executors.newFixedThreadPool(n);
Runnable runnable = new Runnable() {
public void run() {
// do your thing here
}
}
executor.execute(runnable);
Используйте фреймворк Executor;а именно newFixedThreadPool(N) Новый пул потоков (N)
Если ваша очередь задач не будет неограниченной и задачи могут выполняться за более короткие промежутки времени, вы можете использовать
Executors.newFixedThreadPool(n)
;как предполагают эксперты.Единственным недостатком этого решения является неограниченный размер очереди задач.Ты не можешь это контролировать.Огромное скопление в очереди задач приведет к снижению производительности приложения и может привести к нехватке памяти в некоторых сценариях.
Если вы хотите использовать
ExecutorService
и включитьwork stealing
механизм, при котором простаивающие рабочие потоки разделяют рабочую нагрузку с занятыми рабочими потоками, крадя задачи из очереди задач.Это вернет ForkJoinPool тип службы-исполнителя.публичный статический
ExecutorService newWorkStealingPool
(параллелизм значений int)Создает пул потоков, который поддерживает достаточное количество потоков для поддержки заданного уровня параллелизма и может использовать несколько очередей для уменьшения конкуренции.Уровень параллелизма соответствует максимальному количеству потоков, активно задействованных в обработке задач или доступных для участия.Фактическое количество потоков может динамически увеличиваться и уменьшаться.Пул, похищающий работу, не дает никаких гарантий относительно порядка выполнения отправленных задач.
Я предпочитаю
ThreadPoolExecutor
благодаря гибкости API можно управлять многими парамет-рами, что контролирует выполнение потоковой задачи.ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
в вашем случае установите оба corePoolSize and maximumPoolSize as N
.Здесь вы можете управлять размером очереди задач, определять свою собственную фабрику потоков и политику обработки отклонений.
Взгляните на соответствующий вопрос SE, чтобы динамически управлять размером пула:
Если вы хотите сделать свой собственный:
private static final int MAX_WORKERS = n;
private List<Worker> workers = new ArrayList<Worker>(MAX_WORKERS);
private boolean roomLeft() {
synchronized (workers) {
return (workers.size() < MAX_WORKERS);
}
}
private void addWorker() {
synchronized (workers) {
workers.add(new Worker(this));
}
}
public void removeWorker(Worker worker) {
synchronized (workers) {
workers.remove(worker);
}
}
public Example() {
while (true) {
if (roomLeft()) {
addWorker();
}
}
}
Где Worker - это ваш класс, который расширяет поток.Каждый рабочий вызовет метод removeWorker этого класса, передавая себя в качестве параметра, когда закончит выполнять свою работу.
С учетом сказанного фреймворк Executor выглядит намного лучше.
Редактировать:Кто-нибудь потрудится объяснить, почему это так плохо, вместо того чтобы просто приуменьшать это?
Как уже упоминали другие здесь, лучше всего создать пул потоков с помощью Исполнители класс:
Однако, если вы хотите создать свой собственный, этот код должен дать вам представление о том, как действовать дальше.По сути, просто добавляйте каждый новый поток в группу потоков и убедитесь, что у вас никогда не бывает более N активных потоков в группе:
Task[] tasks = getTasks(); // array of tasks to complete
ThreadGroup group = new ThreadGroup();
int i=0;
while( i<tasks.length || group.activeCount()>0 ) {
if( group.activeCount()<N && i<tasks.length ) {
new TaskThread(group, tasks[i]).start();
i++;
} else {
Thread.sleep(100);
}
}