使用 bash 和常用工具为 Linux 构建最小任务队列系统的最佳/最简单方法是什么?

我有一个有9'000行的文件,每行都有一个bash命令行,命令是完全独立的。

command 1 > Logs/1.log
command 2 > Logs/2.log
command 3 > Logs/3.log
...

我的机器有多个核心,我想同时执行 X 个任务。我在网上搜索了一个好方法来做到这一点。显然,很多人都遇到这个问题,但到目前为止还没有一个好的解决方案。

如果该解决方案具有以下功能,那就太好了:

  • 可以解释多个命令(例如 command; command)
  • 可以解释线路上的流重定向(例如 ls > /tmp/ls.txt)
  • 只使用常用的Linux工具

如果它可以在其他 Unix 克隆上运行而没有太奇特的要求,那就加分了。

有帮助吗?

解决方案

您可以将命令列表转换为 Makefile 吗?如果是这样,你可以运行“make -j X”。

其他提示

GNU 并行 http://www.gnu.org/software/parallel/ 是比 PPSS 更通用的并行化工具。

如果运行文件包含:

command 1 > Logs/1.log
command 2 > Logs/2.log
command 3 > Logs/3.log

你可以做:

cat runfile | parallel -j+0

它将为每个 CPU 核心运行一个命令。

如果您的命令像上面一样简单,您甚至不需要 runfile,但可以执行以下操作:

seq 1 3 | parallel -j+0 'command {} > Logs/{}.log'

如果您有更多计算机可用于执行处理,您可能需要查看 GNU Parallel 的 --sshlogin 和 --trc 选项。

好的,在这里发布问题后,我发现以下项目看起来很有前途: 聚苯硫醚.

编辑:不完全是我想要的,PPSS 专注于处理“目录 A 中的所有文件”。

嗯,无论如何,这是一个有趣的问题。

这就是我要做的,假设 重击(1) 当然。

  • 弄清楚有多少个命令可以有效地同时运行。这不仅仅是核心数量的问题;还包括核心数量的问题。许多命令将因 I/O 之类的事情而暂停。拨打该号码 N。 N=15 例如。
  • 为 SIGCHLD 信号设置陷阱信号处理程序,该信号在子进程终止时发生。 trap signalHandler SIGCHLD
  • 将命令列表放入管道中
  • 编写一个循环来读取 stdin 并逐一执行命令,从而递减计数器。当计数器为0时, waits。
  • 你的信号处理程序,在 SIGCHLD 上运行, 增量 那个柜台。

所以现在,它运行第一个 N 命令,然后等待。当第一个子进程终止时,等待返回,它读取另一行,运行新命令,然后再次等待。

现在,这种情况涉及到许多工作同时终止。我 怀疑 你可以使用一个更简单的版本:

 N=15
 COUNT=N
 cat mycommands.sh | 
 while read cmd 
 do
   eval $cmd &
   if $((count-- == 0))
   then
       wait
   fi
 od

现在,该命令将启动前 15 个命令,然后在某个命令终止时一次运行其余命令。

类似的分布式计算乐趣是 Mapreduce Bash 脚本:

http://blog.last.fm/2009/04/06/mapreduce-bash-script

感谢您指出 pps!

您可以使用 参数 命令,其 --最大进程 做你想做的事。例如 Charlie Martin 的解决方案就变成了 xargs:

tr '\012' '\000' <mycommands.sh |xargs --null --max-procs=$X bash -c

细节:

  • X 是最大进程数。例如:X=15。--max-procs 正在发挥魔力
  • 第一个 tr 在这里以 xargs --null 选项的空字节终止行,以便引号重定向等不会错误地扩展
  • bash -c 运行命令

例如,我使用 mycommands.sh 文件对其进行了测试:

date
date "+%Y-%m-%d" >"The Date".txt
wc -c <'The Date'.txt >'The Count'.txt

这是一种特定情况,但如果您尝试处理一组文件并生成另一组输出文件,则可以启动 #cores 个进程,并在处理之前检查输出文件是否存在。下面的示例将 .m4b 文件目录转换为 .mp3 文件:

只要有核心就运行此命令多次:

ls *m4b|同时读取 f;做测试-f $ {f%M4B} mp3 || mencoder -of rawaudio“ $ f” -oac mp3lame -ovc复制-o $ $ {f%m4b} mp3;完毕 &

你可以看到我的任务队列写在 bash 上: https://github.com/pavelpat/yastq

任务队列+并行+动态添加

该脚本使用 FIFO 分叉自身来处理队列。这样,您可以动态地将命令添加到队列中(当队列已经启动时)。

用法:./queue 命令 [子级数] [队列名称]

例如,有 1 个线程:

./queue "sleep 5; echo ONE"
./queue "echo TWO"

输出:

ONE
TWO

例如,有 2 个线程:

./queue "sleep 5; echo ONE" 2
./queue "echo TWO"

输出:

TWO
ONE

例如,有 2 个队列:

./queue "sleep 5; echo ONE queue1" 1 queue1
./queue "sleep 3; echo ONE queue2" 1 queue2

输出:

ONE queue2
ONE queue1

脚本(将其另存为“队列”并 chmod +x 队列):

    #!/bin/bash

    #Print usage
    [[ $# -eq 0 ]] && echo Usage: $0 Command [# of children] [Queue name] && exit

    #Param 1 - Command to execute
    COMMAND="$1"

    #Param 2 - Number of childs in parallel
    MAXCHILD=1
    [[ $# -gt 1 ]] && MAXCHILD="$2"

    #Param 3 - File to be used as FIFO
    FIFO="/tmp/defaultqueue"
    [[ $# -gt 2 ]] && FIFO="$3"

    #Number of seconds to keep the runner active when unused
    TIMEOUT=5

    runner(){
      #Associate file descriptor 3 to the FIFO
      exec 3"$FIFO"

      while read -u 3 -t $TIMEOUT line; do
        #max child check
        while [ `jobs | grep Running | wc -l` -ge "$MAXCHILD" ]; do
          sleep 1
        done

        #exec in backgroud
        (eval "$line")&
      done
      rm $FIFO
    }

    writer(){
      #fork if the runner is not running
      lsof $FIFO >/dev/null || ($0 "QueueRunner" "$MAXCHILD" "$FIFO" &)

      #send the command to the runner
      echo "$COMMAND" > $FIFO
    }

    #Create the FIFO file
    [[ -e "$FIFO" ]] || mkfifo "$FIFO"

    #Start the runner if in the runner fork, else put the command in the queue
    [[ "$COMMAND" == "QueueRunner" ]] && runner || writer

许可以下: CC-BY-SA归因
不隶属于 StackOverflow
scroll top