Question

I am new to using StarCluster/qsub/Grid Engine to run parallel jobs, and I have read a couple of other posts on the subject. I am still not sure how to build a scalable solution for my specific requirement, so I would like to gather a few more suggestions before I proceed.

Here are my requirements:

  1. I have a huge tar file [~40-50 GB, and it can go up to 100 GB] -----> There is not much I can do here; I have to accept a single huge tar file as input.

  2. I have to untar and uncompress it -----> I run tar xvf tarfilename.tar | parallel pbzip -d to untar and uncompress it.

  3. The output of this decompression is a few hundred thousand files, approximately 500,000 in total.

  4. These uncompressed files have to be processed. I have modular code that can take in a single file, process it, and output 5 different files.

Tar File -----Parallel Uncompression---> Uncompressed Files -----Parallel Processing ---> 5 output files per file processed

  1. I currently have a parallel Python script which runs on a 16-core, 16 GB machine, taking in this list of uncompressed files and processing them in parallel.

  2. The problem is how to scale seamlessly. For example, if my code has been running for 10 hours and I would like to add one more 8-core machine, I cannot do it with parallel Python because I would have to know the number of processors in advance.

  3. At the same time, when I dynamically add more nodes to the current cluster, what happens to data accessibility and read/write operations?

So I went about reading up on and doing basic experiments with StarCluster and qsub. While I see that I can submit multiple jobs via qsub, how do I make it take its input files from the folder of uncompressed files?

For example, can I write a script.sh that loops over the file names, picks them one by one, and submits each to the qsub command? Is there a more efficient solution?

Say I have 3 machines with 16 CPUs each, and I submit 48 jobs to the queue: will qsub automatically launch them on the different CPUs of the cluster, or will I have to use parallel environment parameters such as -np with the orte environment to set the number of CPUs on each node? Is it necessary to make my Python script MPI-executable?

To summarize: I have a few hundred thousand files as input, and I would like to submit them as jobs to a queue served by multi-core machines. If I dynamically add more machines, the jobs should automatically be distributed across them.

Another major challenge is that I need the output of all 500,000-odd operations to be aggregated at the end. Are there any suggestions on how to aggregate the output of parallel jobs as and when it is written out?

I am test-running a few scenarios, but I would like to know whether there are people who have experimented with similar setups.

Any suggestions on using the Hadoop plugin? http://star.mit.edu/cluster/docs/0.93.3/plugins/hadoop.html

Thanks in advance, Karthick

No correct solution

OTHER TIPS

  1. I/O and data sharing: if your I/O is low, you can possibly leave your data on your master node and use NFS to share it among your nodes. If you have a lot of I/O, I would recommend using an S3 bucket.

  2. Distribution: your bash script that launches multiple qsub jobs is the right thing to do. It's up to you whether to call it on a single file or on a few files at once.

  3. Scaling: think of your parallel jobs running on the cluster as separate tasks. It's up to you to run one or more instances of your application on each node. E.g., if you use cr1.8xlarge nodes, you have 32 cores: you can launch 1 instance of your app there using all 32 cores, or 4 instances of your app using 8 cores each. See the "slots" configuration for each node within Open Grid Engine. (If you would rather run one big instance of your app that combines the cores of multiple nodes, I have never done that, so I can't help you with it.) Then, to add a node, you can use the "addnode" command from StarCluster. Once the node is up, OGS will automatically distribute jobs to it too. You could also use the StarCluster load balancer to add/remove nodes automatically.

So, here is my suggestion:

  1. Extract your files to S3.
  2. Launch StarCluster.
  3. Using your bash script, qsub a job for every few files (it might be more efficient for a job to work on, say, 10 files than to have one job per file).
  4. Your application must do its I/O to S3.
  5. When the queue is empty, have a script look at the results to make sure all jobs ran well. You may reschedule jobs whose output is missing.
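For step 3, here is a rough sketch of what the submission script could look like, written in Python for illustration (a plain bash for-loop works just as well). The input directory, the process_files.py worker script, the batch size, and the logs/ directory are placeholders for your own setup:

    import glob
    import subprocess

    INPUT_DIR = "/data/uncompressed"   # placeholder: folder of extracted files
    BATCH_SIZE = 10                    # files per job; tune for your workload

    files = sorted(glob.glob(INPUT_DIR + "/*"))

    # One qsub job per batch of files; the scheduler (OGS) decides which
    # node and slot each job runs on, so adding nodes later simply means
    # more jobs get picked up in parallel.
    for i in range(0, len(files), BATCH_SIZE):
        batch = files[i:i + BATCH_SIZE]
        subprocess.check_call(
            ["qsub",
             "-b", "y",                       # submit a command, not a job script
             "-cwd",                          # run the job from the current directory
             "-o", "logs/", "-e", "logs/",    # logs/ must already exist
             "python", "process_files.py"] + batch)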

  • I don't know how your aggregation is done, so I can't comment on that.
  • I have never used Hadoop, so I can't help there either.
  • There is no need to make your Python script MPI-executable.
  • If you use a heterogeneous cluster, then you know from the start how many cores will be available on each node.
  • If you define a node with 32 cores to have 4 slots, then you should make your jobs use at most 8 cores each.

After spending some time researching the various options available for dynamic scaling, I decided to use a queue mechanism to distribute jobs to multiple workers.

  • Job_Manager - reads the input, constructs the jobs, and adds them to the queue.
  • SQS queue - the queue service.
  • Worker processes - listen to the queue and process the jobs, writing out the output.
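Roughly, the two sides fit together as sketched below (an illustration only; boto3, the queue name, and the process_file hook are placeholders rather than my actual code):

    import boto3

    sqs = boto3.resource("sqs")
    queue = sqs.get_queue_by_name(QueueName="file-jobs")   # placeholder queue name

    def enqueue_jobs(file_paths):
        """Job_Manager side: one message per input file (or per small batch)."""
        for path in file_paths:
            queue.send_message(MessageBody=path)

    def worker_loop(process_file):
        """Worker side: poll the queue and process whatever arrives."""
        while True:
            for msg in queue.receive_messages(WaitTimeSeconds=20,
                                              MaxNumberOfMessages=10):
                process_file(msg.body)   # existing per-file processing code
                msg.delete()             # delete the job only after it succeeds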

The input/output drives are NFS mounts and are available to all the servers/clients.

To scale dynamically, add the new NFS client's information to the exports file (/etc/exports) and restart the server. The active clients use an rw,hard,intr configuration in their respective fstab entries. By launching n worker processes on the new client, more workers are added to the pool.
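For reference, the NFS side of this amounts to roughly the following (the export path, subnet, and host name are placeholders; only the rw,hard,intr options come from my actual setup):

    # On the NFS server: add the new client (or its subnet) to /etc/exports,
    # then re-export (exportfs -ra) or restart the NFS server.
    /data/jobs    10.0.0.0/24(rw,sync,no_subtree_check)

    # On the new client: /etc/fstab entry using the rw,hard,intr options.
    nfs-server:/data/jobs    /data/jobs    nfs    rw,hard,intr    0    0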

So far, this has been reliable and is scaling well. I was able to launch close to 90 workers across 3 machines and process 200,000 files in less than 5 hours. Earlier, the same run took close to 24 hours because I couldn't distribute the data and run workers across multiple nodes.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow