Minimal “Task Queue” with stock Linux tools to leverage Multicore CPU
06-07-2019
Question
What is the best/easiest way to build a minimal task queue system for Linux using bash and common tools?
I have a file with 9'000 lines, each line has a bash command line, the commands are completely independent.
command 1 > Logs/1.log
command 2 > Logs/2.log
command 3 > Logs/3.log
...
My box has more than one core and I want to execute X tasks at the same time. I searched the web for a good way to do this. Apparently, a lot of people have this problem but nobody has a good solution so far.
It would be nice if the solution had the following features:
- can interpret more than one command per line (e.g. command; command)
- can interpret stream redirects on the lines (e.g. ls > /tmp/ls.txt)
- only uses common Linux tools
Bonus points if it works on other Unix-clones without too exotic requirements.
Solution
Can you convert your command list to a Makefile? If so, you could just run "make -j X".
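One way the conversion could look, as a minimal sketch: every line of the command file becomes its own phony target, and a single "all" target depends on all of them. The helper name gen_makefile and the taskN target names are assumptions made for illustration, not part of make.

```shell
#!/bin/bash
# gen_makefile FILE: print a Makefile in which every non-empty line of FILE
# becomes its own phony target (task1, task2, ...), all required by "all".
# The helper name and the taskN target names are illustrative assumptions.
gen_makefile() {
    local runfile=$1 n=0 line escaped targets="" rules=""
    while IFS= read -r line || [ -n "$line" ]; do
        [ -z "$line" ] && continue
        n=$((n + 1))
        # make expands "$" itself, so double it to hand a literal "$" to the shell
        escaped=${line//\$/\$\$}
        targets="$targets task$n"
        # Each recipe line must start with a tab character
        rules="$rules"$'\n'"task$n:"$'\n\t'"$escaped"$'\n'
    done < "$runfile"
    printf '.PHONY: all%s\nall:%s\n%s' "$targets" "$targets" "$rules"
}
```

It could then be used as gen_makefile runfile > Makefile.tasks followed by make -f Makefile.tasks -j X. Redirects and "command; command" lines are passed through unchanged, but note that make runs recipes with /bin/sh by default, so bash-only syntax in the commands may need SHELL set in the Makefile.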
OTHER TIPS
GNU Parallel http://www.gnu.org/software/parallel/ is a more general tool for parallelizing than PPSS.
If runfile contains:
command 1 > Logs/1.log
command 2 > Logs/2.log
command 3 > Logs/3.log
you can do:
cat runfile | parallel -j+0
which will run one command per CPU core.
If your commands are as simple as above you do not even need runfile but can do:
seq 1 3 | parallel -j+0 'command {} > Logs/{}.log'
If you have more computers available to do the processing you may want to look at the --sshlogin and --trc options for GNU Parallel.
Okay, after posting the question here, I found the following project which looks promising: ppss.
Edit: Not quite what I want, PPSS is focused on processing "all files in directory A".
Well, this is a kind of fun question anyway.
Here's what I'd do, assuming bash(1) of course.
- Figure out how many of these commands can usefully run concurrently. It's not going to be just the number of cores; a lot of commands will be suspended for I/O and that sort of thing. Call that number N, for example:
N=15
- Set up a trap signal handler for the SIGCHLD signal, which occurs when a child process terminates.
trap signalHandler SIGCHLD
- cat your list of commands into a pipe.
- Write a loop that reads stdin and executes the commands one by one, decrementing a counter. When the counter is 0, it waits.
- Your signal handler, which runs on SIGCHLD, increments that counter.
So now, it runs the first N commands, then waits. When the first child terminates, the wait returns, it reads another line, runs a new command, and waits again.
Now, this is a case that takes care of many jobs terminating close together. I suspect you can get away with a simpler version:
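N=15
count=$N
while read -r cmd
do
    eval "$cmd" &
    # Once N jobs have been started, wait for one to finish before starting the next.
    if (( --count <= 0 ))
    then
        wait -n    # bash 4.3+; a plain "wait" would block until ALL jobs have finished
    fi
done < mycommands.sh
wait    # let the last jobs finish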
Now, this one will start up the first 15 commands, and then run the rest one at a time as some command terminates.
Similar distributed-computing fun is the Mapreduce Bash Script:
http://blog.last.fm/2009/04/06/mapreduce-bash-script
And thanks for pointing out ppss!
You can use the xargs command; its --max-procs option does what you want. For instance, Charlie Martin's solution becomes, with xargs:
tr '\012' '\000' <mycommands.sh |xargs --null --max-args=1 --max-procs=$X bash -c
Details:
- X is the maximum number of processes, e.g. X=15. --max-procs is doing the magic.
- The tr is there to terminate lines with null bytes for the xargs --null option, so that quotes, redirections etc. are not expanded wrongly.
- --max-args=1 hands each line to its own bash -c; without it, xargs may pack several lines into a single invocation, and bash -c would execute only the first one.
- bash -c runs the command.
I tested it with this mycommands.sh file for instance:
date
date "+%Y-%m-%d" >"The Date".txt
wc -c <'The Date'.txt >'The Count'.txt
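Since the question asks for something that also works on other Unix-like systems, here is a sketch of the same idea using only short options, which both GNU and BSD xargs accept. The function name run_commands is made up for illustration, and whether -0 and -P exist on a given system's xargs is an assumption worth checking first.

```shell
#!/bin/bash
# run_commands FILE JOBS: run every line of FILE as its own shell command,
# with at most JOBS commands running at the same time.
# (run_commands is a hypothetical helper name.)
run_commands() {
    # tr turns newlines into NUL bytes so xargs -0 passes each line through
    # verbatim; -n 1 gives each line its own "sh -c"; -P caps the parallelism.
    tr '\n' '\0' < "$1" | xargs -0 -n 1 -P "$2" sh -c
}
```

For example, run_commands mycommands.sh 15 would behave like the command above. It uses sh instead of bash, so lines that rely on bash-only syntax would need bash -c instead.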
This is a specific case, but if you are trying to process a set of files and produce another set of output files, you can start as many processes as you have cores and have each one check whether an output file already exists before processing its input. The example below converts a directory of .m4b files to .mp3 files.
Just run this command as many times as you have cores:
ls *m4b|while read -r f; do test -f "${f%m4b}mp3" || mencoder -of rawaudio "$f" -oac mp3lame -ovc copy -o "${f%m4b}mp3"; done &
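One caveat with checking for the output file is that two workers can look at the same input at almost the same moment and both start on it. A possible refinement, sketched below under the assumption that an atomic mkdir is acceptable as a claim marker: each worker creates a lock directory before touching a file. The names worker and process_one are hypothetical, and process_one is only a stand-in for the real conversion command.

```shell
#!/bin/bash
# process_one INPUT OUTPUT: placeholder for the real work (e.g. the mencoder call).
process_one() {
    cp -- "$1" "$2"
}

# worker: walk over all .m4b files in the current directory and convert the
# ones that nobody else has claimed or finished yet.
worker() {
    local f out
    for f in *.m4b; do
        [ -e "$f" ] || continue                      # glob matched nothing
        out="${f%m4b}mp3"
        # mkdir is atomic: only one worker can create the lock directory
        mkdir -- "$f.lock" 2>/dev/null || continue   # someone else is on it
        [ -f "$out" ] || process_one "$f" "$out"     # skip finished files
        rmdir -- "$f.lock"
    done
}
```

Starting it once per core could look like: for i in 1 2 3 4; do worker & done; wait. A worker that is killed halfway leaves its lock directory behind, which then has to be removed by hand.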
You can take a look at my task queue written in bash: https://github.com/pavelpat/yastq
Task Queue + Parallelized + Dynamic addition
Using a FIFO, this script forks itself to process the queue. This way, you can add commands to the queue on the fly (while the queue is already running).
Usage: ./queue Command [# of children] [Queue name]
Example, with 1 thread:
./queue "sleep 5; echo ONE"
./queue "echo TWO"
Output:
ONE
TWO
Example, with 2 threads:
./queue "sleep 5; echo ONE" 2
./queue "echo TWO"
Output:
TWO
ONE
Example, with 2 queues:
./queue "sleep 5; echo ONE queue1" 1 queue1
./queue "sleep 3; echo ONE queue2" 1 queue2
Output:
ONE queue2
ONE queue1
The script (save it as "queue" and chmod +x queue):
#!/bin/bash

#Print usage
[[ $# -eq 0 ]] && echo "Usage: $0 Command [# of children] [Queue name]" && exit

#Param 1 - Command to execute
COMMAND="$1"

#Param 2 - Number of children in parallel
MAXCHILD=1
[[ $# -gt 1 ]] && MAXCHILD="$2"

#Param 3 - File to be used as FIFO
FIFO="/tmp/defaultqueue"
[[ $# -gt 2 ]] && FIFO="$3"

#Number of seconds to keep the runner active when unused
TIMEOUT=5

runner(){
  #Associate file descriptor 3 to the FIFO (opened read-write so the open does not block)
  exec 3<>"$FIFO"

  while read -r -u 3 -t $TIMEOUT line; do
    #max child check
    while [ "$(jobs | grep -c Running)" -ge "$MAXCHILD" ]; do
      sleep 1
    done

    #exec in background
    (eval "$line")&
  done
  rm "$FIFO"
}

writer(){
  #fork if the runner is not running
  lsof "$FIFO" >/dev/null || ("$0" "QueueRunner" "$MAXCHILD" "$FIFO" &)

  #send the command to the runner
  echo "$COMMAND" > "$FIFO"
}

#Create the FIFO file
[[ -e "$FIFO" ]] || mkfifo "$FIFO"

#Start the runner if in the runner fork, else put the command in the queue
[[ "$COMMAND" == "QueueRunner" ]] && runner || writer