最小化“任务队列”,使用现有的 Linux 工具来利用多核 CPU
-
06-07-2019 - |
题
使用 bash 和常用工具为 Linux 构建最小任务队列系统的最佳/最简单方法是什么?
我有一个有9'000行的文件,每行都有一个bash命令行,命令是完全独立的。
command 1 > Logs/1.log
command 2 > Logs/2.log
command 3 > Logs/3.log
...
我的机器有多个核心,我想同时执行 X 个任务。我在网上搜索了一个好方法来做到这一点。显然,很多人都遇到这个问题,但到目前为止还没有一个好的解决方案。
如果该解决方案具有以下功能,那就太好了:
- 可以解释多个命令(例如
command; command
) - 可以解释线路上的流重定向(例如
ls > /tmp/ls.txt
) - 只使用常用的Linux工具
如果它可以在其他 Unix 克隆上运行而没有太奇特的要求,那就加分了。
解决方案
您可以将命令列表转换为 Makefile 吗?如果是这样,你可以运行“make -j X”。
其他提示
GNU 并行 http://www.gnu.org/software/parallel/ 是比 PPSS 更通用的并行化工具。
如果运行文件包含:
command 1 > Logs/1.log
command 2 > Logs/2.log
command 3 > Logs/3.log
你可以做:
cat runfile | parallel -j+0
它将为每个 CPU 核心运行一个命令。
如果您的命令像上面一样简单,您甚至不需要 runfile,但可以执行以下操作:
seq 1 3 | parallel -j+0 'command {} > Logs/{}.log'
如果您有更多计算机可用于执行处理,您可能需要查看 GNU Parallel 的 --sshlogin 和 --trc 选项。
好的,在这里发布问题后,我发现以下项目看起来很有前途: 聚苯硫醚.
编辑:不完全是我想要的,PPSS 专注于处理“目录 A 中的所有文件”。
嗯,无论如何,这是一个有趣的问题。
这就是我要做的,假设 重击(1) 当然。
- 弄清楚有多少个命令可以有效地同时运行。这不仅仅是核心数量的问题;还包括核心数量的问题。许多命令将因 I/O 之类的事情而暂停。拨打该号码 N。
N=15
例如。 - 为 SIGCHLD 信号设置陷阱信号处理程序,该信号在子进程终止时发生。
trap signalHandler SIGCHLD
- 将命令列表放入管道中
- 编写一个循环来读取 stdin 并逐一执行命令,从而递减计数器。当计数器为0时,
wait
s。 - 你的信号处理程序,在 SIGCHLD 上运行, 增量 那个柜台。
所以现在,它运行第一个 N
命令,然后等待。当第一个子进程终止时,等待返回,它读取另一行,运行新命令,然后再次等待。
现在,这种情况涉及到许多工作同时终止。我 怀疑 你可以使用一个更简单的版本:
N=15
COUNT=N
cat mycommands.sh |
while read cmd
do
eval $cmd &
if $((count-- == 0))
then
wait
fi
od
现在,该命令将启动前 15 个命令,然后在某个命令终止时一次运行其余命令。
您可以使用 参数 命令,其 --最大进程 做你想做的事。例如 Charlie Martin 的解决方案就变成了 xargs:
tr '\012' '\000' <mycommands.sh |xargs --null --max-procs=$X bash -c
细节:
- X 是最大进程数。例如:X=15。--max-procs 正在发挥魔力
- 第一个 tr 在这里以 xargs --null 选项的空字节终止行,以便引号重定向等不会错误地扩展
- bash -c 运行命令
例如,我使用 mycommands.sh 文件对其进行了测试:
date
date "+%Y-%m-%d" >"The Date".txt
wc -c <'The Date'.txt >'The Count'.txt
这是一种特定情况,但如果您尝试处理一组文件并生成另一组输出文件,则可以启动 #cores 个进程,并在处理之前检查输出文件是否存在。下面的示例将 .m4b 文件目录转换为 .mp3 文件:
只要有核心就运行此命令多次:
ls *m4b|同时读取 f;做测试-f $ {f%M4B} mp3 || mencoder -of rawaudio“ $ f” -oac mp3lame -ovc复制-o $ $ {f%m4b} mp3;完毕 &
你可以看到我的任务队列写在 bash 上: https://github.com/pavelpat/yastq
任务队列+并行+动态添加
该脚本使用 FIFO 分叉自身来处理队列。这样,您可以动态地将命令添加到队列中(当队列已经启动时)。
用法:./queue 命令 [子级数] [队列名称]
例如,有 1 个线程:
./queue "sleep 5; echo ONE" ./queue "echo TWO"
输出:
ONE TWO
例如,有 2 个线程:
./queue "sleep 5; echo ONE" 2 ./queue "echo TWO"
输出:
TWO ONE
例如,有 2 个队列:
./queue "sleep 5; echo ONE queue1" 1 queue1 ./queue "sleep 3; echo ONE queue2" 1 queue2
输出:
ONE queue2 ONE queue1
脚本(将其另存为“队列”并 chmod +x 队列):
#!/bin/bash #Print usage [[ $# -eq 0 ]] && echo Usage: $0 Command [# of children] [Queue name] && exit #Param 1 - Command to execute COMMAND="$1" #Param 2 - Number of childs in parallel MAXCHILD=1 [[ $# -gt 1 ]] && MAXCHILD="$2" #Param 3 - File to be used as FIFO FIFO="/tmp/defaultqueue" [[ $# -gt 2 ]] && FIFO="$3" #Number of seconds to keep the runner active when unused TIMEOUT=5 runner(){ #Associate file descriptor 3 to the FIFO exec 3"$FIFO" while read -u 3 -t $TIMEOUT line; do #max child check while [ `jobs | grep Running | wc -l` -ge "$MAXCHILD" ]; do sleep 1 done #exec in backgroud (eval "$line")& done rm $FIFO } writer(){ #fork if the runner is not running lsof $FIFO >/dev/null || ($0 "QueueRunner" "$MAXCHILD" "$FIFO" &) #send the command to the runner echo "$COMMAND" > $FIFO } #Create the FIFO file [[ -e "$FIFO" ]] || mkfifo "$FIFO" #Start the runner if in the runner fork, else put the command in the queue [[ "$COMMAND" == "QueueRunner" ]] && runner || writer