Question

I'm relatively new to Spark and have tried running the SparkPi example on a standalone three-machine cluster with 12 cores. What I'm failing to understand is that running this example with a single slice gives better performance than using 12 slices. The same was true when I used the parallelize function directly: the runtime scales almost linearly as slices are added. Please let me know if I'm doing anything wrong. The code snippet is given below:

import scala.math.random
import org.apache.spark.SparkContext

val spark = new SparkContext("spark://telecom:7077", "SparkPi",
  System.getenv("SPARK_HOME"), List("target/scala-2.10/sparkpii_2.10-1.0.jar"))
val slices = 1
val n = 10000000 * slices

// Monte Carlo estimate: sample points in the unit square and count the
// fraction that falls inside the unit circle.
val count = spark.parallelize(1 to n, slices).map { i =>
  val x = random * 2 - 1
  val y = random * 2 - 1
  if (x * x + y * y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / n)
spark.stop()

Update: The problem was with the random function: since it is a synchronized method, it couldn't scale to multiple cores.


Solution

The random function used in the SparkPi example is a synchronized method and can't scale to multiple cores. It's an easy enough example to deploy on your cluster, but don't use it to check Spark's performance and scalability.
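A common way around this is to give each partition its own generator via mapPartitions, so tasks never touch shared, synchronized state. Below is a minimal sketch of my own (not from the original answer); the object name PiPerPartitionRng is made up for illustration, and it assumes a master URL is supplied via the usual Spark configuration:

import org.apache.spark.{SparkConf, SparkContext}

object PiPerPartitionRng {
  def main(args: Array[String]): Unit = {
    val spark = new SparkContext(new SparkConf().setAppName("PiPerPartitionRng"))
    val slices = if (args.length > 0) args(0).toInt else 12
    val n = 10000000 * slices

    val count = spark.parallelize(1 to n, slices).mapPartitions { iter =>
      // One generator per partition: no lock contention across cores.
      val rand = new java.util.Random()
      iter.map { _ =>
        val x = rand.nextDouble() * 2 - 1
        val y = rand.nextDouble() * 2 - 1
        if (x * x + y * y < 1) 1 else 0
      }
    }.reduce(_ + _)

    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}

Since every partition constructs its own java.util.Random, there is no generator instance shared between cores, which is exactly the contention the synchronized random call introduces.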

OTHER TIPS

As Ahsan mentioned in his answer, the problem was with 'scala.math.random'. I replaced it with 'org.apache.spark.util.random.XORShiftRandom', and now using multiple processors makes the Pi calculation run much faster. Below is my code, a modified version of the SparkPi example from the Spark distribution:

// scalastyle:off println
package org.apache.spark.examples

// XORShiftRandom is marked private[spark] in the Spark source, which is why
// this file stays under an org.apache.spark subpackage.
import org.apache.spark.util.random.XORShiftRandom

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi").setMaster(args(0))
    val spark = new SparkContext(conf)
    val slices = if (args.length > 1) args(1).toInt else 2
    val n = math.min(100000000L * slices, Int.MaxValue).toInt // avoid overflow
    // XORShiftRandom extends java.util.Random and is serializable, so each
    // task deserializes its own copy of `rand` rather than sharing one
    // synchronized instance across cores.
    val rand = new XORShiftRandom()

    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = rand.nextDouble * 2 - 1
      val y = rand.nextDouble * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)

    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
// scalastyle:on println
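If you'd rather not depend on Spark's package-private XORShiftRandom, a plain-JDK alternative is java.util.concurrent.ThreadLocalRandom, which hands each executor thread its own unsynchronized generator. This is a sketch of my own, not part of the original answer; the object name SparkPiThreadLocal is made up:

import java.util.concurrent.ThreadLocalRandom

import org.apache.spark.{SparkConf, SparkContext}

object SparkPiThreadLocal {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark Pi (ThreadLocalRandom)").setMaster(args(0))
    val spark = new SparkContext(conf)
    val slices = if (args.length > 1) args(1).toInt else 2
    val n = math.min(100000000L * slices, Int.MaxValue).toInt // avoid overflow

    val count = spark.parallelize(1 until n, slices).map { _ =>
      // current() is called on the executor thread; the returned generator
      // is local to that thread, so no locking or sharing occurs.
      val rnd = ThreadLocalRandom.current()
      val x = rnd.nextDouble() * 2 - 1
      val y = rnd.nextDouble() * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)

    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}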

When I run the program above on a single core with the parameters 'local[1] 16', it takes about 60 seconds on my laptop. The same program on 8 cores ('local[*] 16') takes about 17 seconds, a speedup of roughly 3.5x.

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow