Question

I'm relatively new to Spark and have tried running the SparkPi example on a standalone three-machine, 12-core cluster. What I'm failing to understand is that running this example with a single slice gives better performance than using 12 slices. The same was the case when I was using the parallelize function; the time scales almost linearly with each added slice. Please let me know if I'm doing anything wrong. The code snippet is given below:

import org.apache.spark.SparkContext
import scala.math.random // `random` below comes from scala.math

val spark = new SparkContext("spark://telecom:7077", "SparkPi",
  System.getenv("SPARK_HOME"), List("target/scala-2.10/sparkpii_2.10-1.0.jar"))
val slices = 1
val n = 10000000 * slices
val count = spark.parallelize(1 to n, slices).map { i =>
  val x = random * 2 - 1
  val y = random * 2 - 1
  if (x * x + y * y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / n)
spark.stop()

Update: the problem was with the random function; since it is a synchronized method, it couldn't scale to multiple cores.


Solution

The random function used in the SparkPi example is a synchronized method, so it can't scale to multiple cores. It's an easy enough example to deploy on your cluster, but don't use it to check Spark's performance and scalability.
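The contention is easy to see outside Spark. Below is a minimal, hypothetical micro-benchmark (the RandomContention object, the 8-thread and 10-million-draw parameters, and all names are mine, and exact numbers will vary by JVM and machine). scala.math.random delegates to java.lang.Math.random(), whose generator is shared by all threads (in older JDKs the method itself was synchronized), so the threads serialize on it; with one java.util.Random per thread there is nothing to contend on:

object RandomContention {
  // Run `body` n times on each of `threads` threads and return elapsed millis.
  def bench(threads: Int, n: Int)(body: () => Double): Long = {
    val start = System.nanoTime
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = {
          var i = 0; var sink = 0.0
          while (i < n) { sink += body(); i += 1 }
          if (sink < 0) println(sink) // keep the JIT from discarding the loop
        }
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    (System.nanoTime - start) / 1000000
  }

  def main(args: Array[String]): Unit = {
    val n = 10000000
    // Every call funnels through the single generator behind Math.random().
    println("shared scala.math.random: " + bench(8, n)(() => scala.math.random) + " ms")
    // One independent generator per thread: no shared state, no contention.
    val local = new ThreadLocal[java.util.Random] {
      override def initialValue() = new java.util.Random()
    }
    println("per-thread Random:        " + bench(8, n)(() => local.get().nextDouble()) + " ms")
  }
}

On a multi-core machine the shared-generator run is typically several times slower, even though both loops do the same amount of arithmetic.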

Other tips

As Ahsan mentioned in his answer, the problem was with 'scala.math.random'. I replaced it with 'org.apache.spark.util.random.XORShiftRandom', and now using multiple processors makes the Pi calculation run much faster. Below is my code, which is a modified version of the SparkPi example from the Spark distribution:

// scalastyle:off println
package org.apache.spark.examples // XORShiftRandom is private[spark], so stay inside org.apache.spark

import org.apache.spark._
import org.apache.spark.util.random.XORShiftRandom

/** Computes an approximation to pi. */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi").setMaster(args(0))
    val spark = new SparkContext(conf)
    val slices = if (args.length > 1) args(1).toInt else 2
    val n = math.min(100000000L * slices, Int.MaxValue).toInt // avoid overflow
    val rand = new XORShiftRandom() // created on the driver; serialized into the closure below

    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = rand.nextDouble() * 2 - 1
      val y = rand.nextDouble() * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)

    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
// scalastyle:on println
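One caveat about the code above: rand is created once on the driver and serialized into the map closure, so every task deserializes an identical generator state and all partitions end up sampling the same points. If that matters for the quality of your estimate, here is a minimal sketch of a per-partition variant (still assuming the org.apache.spark.examples package from above, since XORShiftRandom is Spark-internal; seeding by partition index is my choice, not the example's):

// Replacement for the count computation above: one generator per partition,
// seeded with the partition index so partitions draw different samples.
val count = spark.parallelize(1 until n, slices).mapPartitionsWithIndex { (idx, iter) =>
  val rand = new XORShiftRandom(idx)
  iter.map { _ =>
    val x = rand.nextDouble() * 2 - 1
    val y = rand.nextDouble() * 2 - 1
    if (x * x + y * y < 1) 1 else 0
  }
}.reduce(_ + _)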

When I run the program above on a single core with the parameters 'local[1] 16', it takes about 60 seconds on my laptop. The same program on 8 cores ('local[*] 16') takes about 17 seconds.
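For what it's worth, those timings are plain wall-clock measurements. A small hypothetical helper like the one below (the timed function is mine, not part of Spark; it reuses spark, n, slices, and rand from the example above) is enough to reproduce them around the reduce:

// Hypothetical helper: wall-clock time of a block, printed in seconds.
def timed[T](label: String)(block: => T): T = {
  val start = System.nanoTime
  val result = block
  println(label + " took " + (System.nanoTime - start) / 1e9 + " s")
  result
}

// Usage around the action from the example above:
val count = timed("pi count") {
  spark.parallelize(1 until n, slices).map { i =>
    val x = rand.nextDouble() * 2 - 1
    val y = rand.nextDouble() * 2 - 1
    if (x * x + y * y < 1) 1 else 0
  }.reduce(_ + _)
}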
