Question

What is the best/easiest way to build a minimal task queue system for Linux using bash and common tools?

I have a file with 9,000 lines; each line contains a bash command line, and the commands are completely independent.

command 1 > Logs/1.log
command 2 > Logs/2.log
command 3 > Logs/3.log
...

My box has more than one core and I want to execute X tasks at the same time. I searched the web for a good way to do this. Apparently, a lot of people have this problem but nobody has a good solution so far.

It would be nice if the solution had the following features:

  • can interpret more than one command (e.g. command; command)
  • can interpret stream redirects on the lines (e.g. ls > /tmp/ls.txt)
  • only uses common Linux tools

Bonus points if it works on other Unix-clones without too exotic requirements.


Solution

Can you convert your command list to a Makefile? If so, you could just run "make -j X".
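One way to do that (a sketch, not from the original answer; it assumes the command list is in a file called mycommands.sh, the name used further below) is to generate a Makefile with one phony target per line and let make do the scheduling:

    awk '{ jobs = jobs " job" NR
           cmds = cmds sprintf("job%d:\n\t%s\n", NR, $0) }
         END { print "all:" jobs
               printf "%s", cmds
               print ".PHONY: all" jobs }' mycommands.sh > Makefile
    make -j 15

Note that commands containing characters that are special to make (for example $) would need to be escaped first.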

OTHER TIPS

GNU Parallel http://www.gnu.org/software/parallel/ is a more general tool for parallelizing than PPSS.

If runfile contains:

command 1 > Logs/1.log
command 2 > Logs/2.log
command 3 > Logs/3.log

you can do:

cat runfile | parallel -j+0

which will run one command per CPU core.

If your commands are as simple as above, you do not even need runfile but can do:

seq 1 3 | parallel -j+0 'command {} > Logs/{}.log'

If you have more computers available to do the processing you may want to look at the --sshlogin and --trc options for GNU Parallel.
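A sketch of the remote case (the host names are placeholders; since the inputs here are plain numbers rather than files, only the "return" and "cleanup" parts of --trc apply, so --return and --cleanup are used directly): spread the jobs over the local machine (:) and two remote hosts, create the Logs directory remotely, and copy each log file back:

    seq 1 9000 | parallel -j+0 --sshlogin :,server1,server2 \
        --return Logs/{}.log --cleanup 'mkdir -p Logs; command {} > Logs/{}.log'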

Okay, after posting the question here, I found the following project which looks promising: ppss.

Edit: Not quite what I want, PPSS is focused on processing "all files in directory A".

Well, this is a kind of fun question anyway.

Here's what I'd do, assuming bash(1) of course.

  • figure out how many of these commands can usefully run concurrently. It's not going to be just the number of cores; a lot of commands will be suspended for I/O and that sort of thing. Call that number N. N=15 for example.
  • set up a trap signal handler for the SIGCHLD signal, which occurs when a child process terminates. trap signalHandler SIGCHLD
  • cat your list of commands into a pipe
  • write a loop that reads stdin and executes the commands one by one, decrementing a counter. When the counter is 0, it waits.
  • your signal handler, which runs on SIGCHLD, increments that counter.

So now, it runs the first N commands, then waits. When the first child terminates, the wait returns, it reads another line, runs a new command, and waits again.
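A rough sketch of that trap-based loop (not verbatim; it assumes bash and that the command list is in mycommands.sh, as in the simpler version below):

    #!/bin/bash
    N=15                       #how many commands may run at once
    running=0

    #SIGCHLD handler: a child finished, so there is room for one more
    childDone(){
      (( running-- ))
    }
    trap childDone SIGCHLD

    while read -r cmd; do
      eval "$cmd" &
      (( running++ ))
      #block while at capacity; SIGCHLD interrupts 'wait', so the count is rechecked
      while (( running >= N )); do
        wait
      done
    done < mycommands.sh

    #wait for the stragglers
    while (( running > 0 )); do
      wait
    done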

Now, this is a case that takes care of many jobs terminating close together. I suspect you can get away with a simpler version:

N=15
COUNT=$N
cat mycommands.sh |
while read -r cmd
do
  eval "$cmd" &
  if (( --COUNT <= 0 ))
  then
      wait -n       # bash >= 4.3: wait for any one child; plain 'wait' would wait for all of them
      (( COUNT++ ))
  fi
done

Now, this one will start up the first 15 commands, and then run the rest one at a time as some command terminates.

Similar distributed-computing fun is the Mapreduce Bash Script:

http://blog.last.fm/2009/04/06/mapreduce-bash-script

And thanks for pointing out ppss!

You can use the xargs command; its --max-procs option does what you want. For instance, Charlie Martin's solution becomes, with xargs:

tr '\012' '\000' < mycommands.sh | xargs --null --max-args=1 --max-procs=$X bash -c

details:

  • X is the maximum number of processes, e.g. X=15; --max-procs is doing the magic
  • the first tr is there to terminate the lines with null bytes for the xargs --null option, so that quotes, redirects, etc. are not expanded wrongly
  • --max-args=1 makes xargs pass exactly one line to each bash -c invocation (a more portable short-option spelling is shown below)
  • bash -c runs the command
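For completeness, an equivalent short-option form (a hedged variant; -0, -n and -P are understood by both GNU and BSD xargs, although -P is not strictly POSIX):

tr '\012' '\000' < mycommands.sh | xargs -0 -n 1 -P "$X" bash -c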

I tested it with this mycommands.sh file for instance:

date
date "+%Y-%m-%d" >"The Date".txt
wc -c <'The Date'.txt >'The Count'.txt

This is a specific case, but if you are trying to process a set of files and produce another set of output files, you can start as many processes as you have cores and have each one check whether an output file already exists before processing the corresponding input. The example below converts a directory of .m4b files to .mp3 files:

Just run this command as many times as you have cores:

ls *m4b | while read -r f; do test -f "${f%m4b}mp3" || mencoder -of rawaudio "$f" -oac mp3lame -ovc copy -o "${f%m4b}mp3"; done &
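If you would rather not start it by hand, a small sketch (assuming GNU coreutils' nproc; the sleep merely reduces the chance of two workers grabbing the same file at the same moment, since the existence check is inherently racy):

    for i in $(seq "$(nproc)"); do
        ls *m4b | while read -r f; do
            test -f "${f%m4b}mp3" || \
                mencoder -of rawaudio "$f" -oac mp3lame -ovc copy -o "${f%m4b}mp3"
        done &
        sleep 2
    done
    wait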

You can have a look at my task queue written in bash: https://github.com/pavelpat/yastq

Task Queue + Parallelized + Dynamic addition

Using a FIFO, this script forks itself to process the queue. This way, you can add commands to the queue on the fly (while the queue is already running).

Usage: ./queue Command [# of children] [Queue name]

Example, with 1 thread:

./queue "sleep 5; echo ONE"
./queue "echo TWO"

Output:

ONE
TWO

Example, with 2 threads:

./queue "sleep 5; echo ONE" 2
./queue "echo TWO"

Output:

TWO
ONE

Example, with 2 queues:

./queue "sleep 5; echo ONE queue1" 1 queue1
./queue "sleep 3; echo ONE queue2" 1 queue2

Output:

ONE queue2
ONE queue1

The script (save it as "queue" and chmod +x queue):

    #!/bin/bash

    #Print usage
    [[ $# -eq 0 ]] && echo "Usage: $0 Command [# of children] [Queue name]" && exit

    #Param 1 - Command to execute
    COMMAND="$1"

    #Param 2 - Number of children to run in parallel
    MAXCHILD=1
    [[ $# -gt 1 ]] && MAXCHILD="$2"

    #Param 3 - File to be used as FIFO
    FIFO="/tmp/defaultqueue"
    [[ $# -gt 2 ]] && FIFO="$3"

    #Number of seconds to keep the runner active when unused
    TIMEOUT=5

    runner(){
      #Associate file descriptor 3 with the FIFO (opened read-write so reads
      #do not hit end-of-file as soon as a writer closes its end)
      exec 3<>"$FIFO"

      while read -u 3 -t $TIMEOUT line; do
        #max child check
        while [ `jobs | grep Running | wc -l` -ge "$MAXCHILD" ]; do
          sleep 1
        done

        #exec in background
        (eval "$line")&
      done
      rm "$FIFO"
    }

    writer(){
      #fork a runner if one is not already reading the FIFO
      lsof "$FIFO" >/dev/null || ("$0" "QueueRunner" "$MAXCHILD" "$FIFO" &)

      #send the command to the runner
      echo "$COMMAND" > "$FIFO"
    }

    #Create the FIFO file
    [[ -e "$FIFO" ]] || mkfifo "$FIFO"

    #Start the runner if in the runner fork, else put the command in the queue
    [[ "$COMMAND" == "QueueRunner" ]] && runner || writer

Licensed under: CC-BY-SA with attribution