Question

Right now we're processing a large amount of JSON data coming from a Mixpanel API. With a small dataset it's a breeze, and the code below runs just fine. However, a large dataset takes a rather long time to process, and we're starting to see timeouts because of it.

My Scala optimization skills are rather poor, so I am hoping someone can show me a faster way to process the following with large datasets. Please do explain why, since it will help my own understanding of Scala.

val people = parse[mp.data.Segmentation](o)     // o holds the raw JSON
val list = people.data.values.map(b =>          // b: (outer key, Map[id, Map[date, count]])
  b._2.map(p =>                                 // p: (id, Map[date, count])
    Map(
      "id" -> p._1,
      "activity" -> p._2.foldLeft(0)(_ + _._2)  // sum this id's per-date counts
    )
  )
)
.flatten
.filter { behavior => behavior("activity") != 0 }
.groupBy(o => o("id"))
.map { case (k, v) => Map("id" -> k, "activity" -> v.map(o => o("activity").asInstanceOf[Int]).sum) }

And the Segmentation class:

case class Segmentation(
  legend_size: Int,
  data: Data
)

case class Data(
  series: List[String],
  values: Map[String, Map[String, Map[String, Int]]]
)

Thanks for your help!

Edit: sample data as requested

{"legend_size": 4, "data": {"series": ["2013-12-17", "2013-12-18", "2013-12-19", "2013-12-20", "2013-12-21", "2013-12-22", "2013-12-23", "2013-12-24", "2013-12-25", "2013-12-26", "2013-12-27", "2013-12-28", "2013-12-29", "2013-12-30", "2013-12-31", "2014-01-01", "2014-01-02", "2014-01-03", "2014-01-04", "2014-01-05", "2014-01-06"], "values": {"afef4ac12a21d5c4ef679c6507fe65cd": {"id:twitter.com:194436690": {"2013-12-20": 0, "2013-12-29": 0, "2013-12-28": 0, "2013-12-23": 0, "2013-12-22": 0, "2013-12-21": 1, "2013-12-25": 0, "2013-12-27": 0, "2013-12-26": 0, "2013-12-24": 0, "2013-12-31": 0, "2014-01-06": 0, "2014-01-04": 0, "2014-01-05": 0, "2014-01-02": 0, "2014-01-03": 0, "2014-01-01": 0, "2013-12-30": 0, "2013-12-17": 0, "2013-12-18": 0, "2013-12-19": 0}, "id:twitter.com:330103796": {"2013-12-20": 0, "2013-12-29": 0, "2013-12-28": 0, "2013-12-23": 0, "2013-12-22": 0, "2013-12-21": 0, "2013-12-25": 0, "2013-12-27": 0, "2013-12-26": 1, "2013-12-24": 0, "2013-12-31": 0, "2014-01-06": 0, "2014-01-04": 0, "2014-01-05": 0, "2014-01-02": 0, "2014-01-03": 0, "2014-01-01": 0, "2013-12-30": 0, "2013-12-17": 0, "2013-12-18": 0, "2013-12-19": 0}, "id:twitter.com:216664121": {"2013-12-20": 0, "2013-12-29": 0, "2013-12-28": 0, "2013-12-23": 1, "2013-12-22": 0, "2013-12-21": 0, "2013-12-25": 0, "2013-12-27": 0, "2013-12-26": 0, "2013-12-24": 0, "2013-12-31": 0, "2014-01-06": 0, "2014-01-04": 0, "2014-01-05": 0, "2014-01-02": 0, "2014-01-03": 0, "2014-01-01": 0, "2013-12-30": 0, "2013-12-17": 0, "2013-12-18": 0, "2013-12-19": 0}, "id:twitter.com:414117608": {"2013-12-20": 0, "2013-12-29": 0, "2013-12-28": 1, "2013-12-23": 0, "2013-12-22": 0, "2013-12-21": 0, "2013-12-25": 0, "2013-12-27": 0, "2013-12-26": 0, "2013-12-24": 0, "2013-12-31": 0, "2014-01-06": 0, "2014-01-04": 0, "2014-01-05": 0, "2014-01-02": 0, "2014-01-03": 0, "2014-01-01": 0, "2013-12-30": 0, "2013-12-17": 0, "2013-12-18": 0, "2013-12-19": 0}}}}}

To answer Millhouse's question: the intention is to sum the counts across all dates to produce a single number describing the total volume of "activity" for each ID. The "ID" is formatted as id:twitter.com:923842 (see the expected output below).
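For the sample above, each of the four ids has exactly one day with a count of 1, so the desired result would be something like:

// Expected output for the sample data: every id's activity sums to 1
List(
  Map("id" -> "id:twitter.com:194436690", "activity" -> 1),
  Map("id" -> "id:twitter.com:330103796", "activity" -> 1),
  Map("id" -> "id:twitter.com:216664121", "activity" -> 1),
  Map("id" -> "id:twitter.com:414117608", "activity" -> 1)
)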


Solution

I don't know the full extent of your processing, what pipelines you have going on, what stress your server is under, or what sort of threading profile you've set up to receive the information. However, assuming that you've correctly separated I/O from CPU-bound tasks, and that what you've shown us is strictly CPU-bound, try simply adding .par to the very first Map:

people.data.values.par.map(b =>

as a first pass to see if you can get some performance gains. I don't see any specific ordering required of the processing, which tells me it's ripe for parallelization.
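For example, here is the same pipeline with only that one change (a minimal sketch, assuming the Segmentation shape from the question; the trailing .seq just converts the result back to a sequential collection):

// The original pipeline, unchanged except for .par: every downstream step
// (map, flatten, filter, groupBy) then runs on parallel collections.
val list = people.data.values.par.map(b =>
  b._2.map(p =>
    Map(
      "id" -> p._1,
      "activity" -> p._2.foldLeft(0)(_ + _._2)
    )
  )
)
.flatten
.filter { behavior => behavior("activity") != 0 }
.groupBy(o => o("id"))
.map { case (k, v) => Map("id" -> k, "activity" -> v.map(o => o("activity").asInstanceOf[Int]).sum) }
.seq  // drop back to a sequential collection at the end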

Edit

After playing around with parallelization, I would add that modifying the TaskSupport is helpful in this case. You can modify a parallelized collection's tasksupport like this:

import scala.collection.parallel._
val pc = mutable.ParArray(1, 2, 3)
pc.tasksupport = new ForkJoinTaskSupport(
  new scala.concurrent.forkjoin.ForkJoinPool(2))  // cap this collection at 2 worker threads

See http://www.scala-lang.org/api/2.10.3/index.html#scala.collection.parallel.TaskSupport
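Applied to this case, that could look like the following sketch (the pool size of 2 is only an example to tune against your server's core count, and parValues is a name I've made up):

import scala.collection.parallel._

// Sketch: give the pipeline its own explicitly sized pool instead of the
// default global one; everything chained off parValues inherits it.
val parValues = people.data.values.par
parValues.tasksupport = new ForkJoinTaskSupport(
  new scala.concurrent.forkjoin.ForkJoinPool(2))
// ...then run the same map/flatten/filter/groupBy chain on parValues.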

OTHER TIPS

I have some suggestions that might help.

  1. I would try to move the filter call as early in the program as possible. Since your data contains many dates with zero activity, you would see improvements by doing this. The best solution would be to test for this while parsing the JSON data; if that is not possible, make it the first statement (the sketch after this list combines this tip with tip 2).

  2. The way I understand it, you would like to end up with a way to look up an aggregate of the sums for a given id. I would suggest you represent this with a map from the id to the aggregate. Also, the Scala List class has a sum method. I came up with this code:

    // field `values`, then Map#values, then flatten to (id, dates) pairs
    val originalList_IdToAggregate = people.data.values.values.flatten.map(p => (p._1, p._2.values.sum))

    It might not match your project directly, but I think it is almost what you need. If you need a Map out of this, just append toMap at the end.

  3. If this doesn't give you enough speed, you could create your own parser that aggregates and filters while parsing only this kind of JSON. Writing parsers is quite easy in Scala if you are using the parser combinators. Just keep in mind to throw away what you don't need as early as possible, and not to build too many deep branches; this should be a fast solution with a low memory footprint.

  4. As for going parallel, this can be a good idea. I don't know enough about your application to tell you what the best way is, but it might be possible to hide the computational cost of processing the data under the cost of transporting it. Try to balance parsing and I/O over multiple threads and see if you can achieve this.
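Putting tips 1 and 2 together, here is a minimal sketch (assuming the Segmentation and Data classes from the question, and that the same id can appear under several outer keys, as the question's groupBy implies):

// Sketch combining tips 1 and 2: aggregate each id's dates with sum and
// drop inactive ids as early as possible, before groupBy ever sees them.
val idToAggregate: Map[String, Int] =
  people.data.values                    // Map[segment, Map[id, Map[date, count]]]
    .valuesIterator
    .flatMap(_.iterator)                // (id, Map[date, count]) pairs, lazily
    .map { case (id, days) => (id, days.valuesIterator.sum) }
    .filter(_._2 != 0)                  // tip 1: filter before grouping
    .toList
    .groupBy(_._1)                      // merge ids that occur in several segments
    .map { case (id, perSegment) => id -> perSegment.map(_._2).sum }

This produces the id -> total-activity map directly, without building an intermediate Map[String, Any] per entry.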

Licensed under: CC-BY-SA with attribution
Not affiliated with StackOverflow