如何使用固定数量的工作线程实现简单线程
-
02-07-2019 - |
题
我正在寻找最简单、最直接的方法来实现以下内容:
- 主要程序实例化工作线程来执行任务。
- 仅有的
n
任务可以同时运行。 - 什么时候
n
已达到到达,直到运行线程的计数下降到下方,才有更多的工人n
.
解决方案
我觉得 Executors.newFixedThreadPool 符合您的要求。有多种不同的方法可以使用生成的 ExecutorService,具体取决于您是否希望将结果返回到主线程,或者任务是否完全独立,以及您是否有一组要预先执行的任务,或者任务是否排队以响应某些事件。
Collection<YourTask> tasks = new ArrayList<YourTask>();
YourTask yt1 = new YourTask();
...
tasks.add(yt1);
...
ExecutorService exec = Executors.newFixedThreadPool(5);
List<Future<YourResultType>> results = exec.invokeAll(tasks);
或者,如果您有一个新的异步任务要执行以响应某些事件,您可能只想使用 ExecutorService 的简单方法 execute(Runnable)
方法。
其他提示
/* Get an executor service that will run a maximum of 5 threads at a time: */
ExecutorService exec = Executors.newFixedThreadPool(5);
/* For all the 100 tasks to be done altogether... */
for (int i = 0; i < 100; i++) {
/* ...execute the task to run concurrently as a runnable: */
exec.execute(new Runnable() {
public void run() {
/* do the work to be done in its own thread */
System.out.println("Running in: " + Thread.currentThread());
}
});
}
/* Tell the executor that after these 100 steps above, we will be done: */
exec.shutdown();
try {
/* The tasks are now running concurrently. We wait until all work is done,
* with a timeout of 50 seconds: */
boolean b = exec.awaitTermination(50, TimeUnit.SECONDS);
/* If the execution timed out, false is returned: */
System.out.println("All done: " + b);
} catch (InterruptedException e) { e.printStackTrace(); }
Executors.newFixedThreadPool(int)
Executor executor = Executors.newFixedThreadPool(n);
Runnable runnable = new Runnable() {
public void run() {
// do your thing here
}
}
executor.execute(runnable);
使用Executor框架;即 new固定线程池(N)
如果您的任务队列不是无限的并且任务可以在更短的时间间隔内完成,您可以使用
Executors.newFixedThreadPool(n)
;正如专家建议的那样。该解决方案的唯一缺点是任务队列大小不受限制。你无法控制它。任务队列中的巨大堆积会降低应用程序的性能,并且在某些情况下可能会导致内存不足。
如果你想使用
ExecutorService
并启用work stealing
空闲工作线程通过窃取任务队列中的任务来共享繁忙工作线程的工作负载的机制。它将返回 ForkJoinPool 类型的 Executor Service。公共静态
ExecutorService newWorkStealingPool
(int并行性)创建一个线程池,该线程池维护足够的线程来支持给定的并行级别,并且可以使用多个队列来减少争用。并行级别对应于主动参与或可用于参与任务处理的线程的最大数量。实际线程数可能会动态增长和收缩。工作窃取池不保证提交任务的执行顺序。
我更喜欢
ThreadPoolExecutor
由于 API 可以灵活地控制许多参数,从而控制流任务的执行。ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
在你的情况下,设置两者 corePoolSize and maximumPoolSize as N
. 。在这里您可以控制任务队列大小,定义您自己的自定义线程工厂和拒绝处理程序策略。
查看相关的 SE 问题来动态控制池大小:
如果你想自己推出:
private static final int MAX_WORKERS = n;
private List<Worker> workers = new ArrayList<Worker>(MAX_WORKERS);
private boolean roomLeft() {
synchronized (workers) {
return (workers.size() < MAX_WORKERS);
}
}
private void addWorker() {
synchronized (workers) {
workers.add(new Worker(this));
}
}
public void removeWorker(Worker worker) {
synchronized (workers) {
workers.remove(worker);
}
}
public Example() {
while (true) {
if (roomLeft()) {
addWorker();
}
}
}
其中 Worker 是扩展 Thread 的类。当每个工作人员完成其工作时,都会调用此类的removeWorker 方法,将其自身作为参数传递。
话虽如此,Executor 框架看起来好多了。
编辑:有人愿意解释为什么这如此糟糕,而不是仅仅降低它吗?
正如这里其他人提到的,最好的选择是创建一个线程池 执行者 班级:
但是,如果您想自己动手,此代码应该可以让您了解如何继续。基本上,只需将每个新线程添加到线程组中,并确保该组中的活动线程永远不会超过 N 个:
Task[] tasks = getTasks(); // array of tasks to complete
ThreadGroup group = new ThreadGroup();
int i=0;
while( i<tasks.length || group.activeCount()>0 ) {
if( group.activeCount()<N && i<tasks.length ) {
new TaskThread(group, tasks[i]).start();
i++;
} else {
Thread.sleep(100);
}
}