Question

My spark cluster hangs when I try to cache() or persist(MEMORY_ONLY_SER()) my RDDs. It works great and computes results in about 7min. if I don't use cache().

I've got 6 c3.xlarge EC2 instances (4 cores, 7.5 GB RAM each), which gives in total 24 cores and 37.7 GB.

I run my application with the following command on master:

SPARK_MEM=5g MEMORY_FRACTION="0.6" SPARK_HOME="/root/spark" java -cp ./uber-offline.jar:/root/spark/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar pl.instream.dsp.offline.OfflineAnalysis

The data set is about 50GB of data partitioned into 24 files. I compressed it and stored in S3 bucket in 24 files (where each of it has size of 7MB to 300MB).

I absolutely can't find a reason for such behaviour of my cluster, but it seems, like spark consumed all available memory and got into GC collecting loop. When I look into gc verbose, I can find a cycles like below:

[GC 5208198K(5208832K), 0,2403780 secs]
[Full GC 5208831K->5208212K(5208832K), 9,8765730 secs]
[Full GC 5208829K->5208238K(5208832K), 9,7567820 secs]
[Full GC 5208829K->5208295K(5208832K), 9,7629460 secs]
[GC 5208301K(5208832K), 0,2403480 secs]
[Full GC 5208831K->5208344K(5208832K), 9,7497710 secs]
[Full GC 5208829K->5208366K(5208832K), 9,7542880 secs]
[Full GC 5208831K->5208415K(5208832K), 9,7574860 secs]

This finally leads to the messages like:

WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, ip-xx-xx-xxx-xxx.eu-west-1.compute.internal, 60048, 0) with no recent heart beats: 64828ms exceeds 45000ms

...and stops any progress in computing. This looks like the memory was consumed in 100%, but I tried to use machines with more RAM (like 30GB each), and the effect is the same.

What might be the reason of such behaviour?? Could anybody help??

Was it helpful?

Solution

Try using more partitions, you should have 2 - 4 per CPU. IME increasing the number of partitions is often the easiest way to make a program more stable (and often faster).

By default I think your code will use 24 partitions, but for 50 GB of data that is far too little. I'd try a few 100 partitions at least.

Next you use SPARK_MEM=5g but say each node has 7.5 GB, so you might as well have SPARK_MEM=7500m.

You could also try increasing the memory fraction, but I think the above is more likely to help.

General points: use HDFS for you files not s3, it's hugely faster. Ensure you munge your data properly before caching it - e.g. if you have say TSV data with 100 columns, but you only use 10 of the fields, then make sure you've extracted those fields before you try to cache.

OTHER TIPS

There is a big difference between 'raw' caching and 'serialized' caching

  1. Raw Caching : (rdd.cache() or rdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY) )

    This will consume 2x-3x the memory. For example a 100MB rdd can consume 350MB in memory

  2. serialized caching (rdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER))

    This consumes pretty much the same amount of memory plus some small overhead. For example 100MB data will consume 100MB + few KB in memory.

How ever raw caching is faster during operations. Serialized cache takes longer (because the object has to be de-serialized before compute)

Here is a interesting result from my experiment.

enter image description here enter image description here

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow
scroll top