How do you perform accumulation on large data sets and pass the results as a response to REST API?

softwareengineering.stackexchange https://softwareengineering.stackexchange.com/questions/422487

  •  22-03-2021

Question

I have around 125 million event records on s3. The s3 bucket structure is: year/month/day/hour/*. Inside each hour directory, we have files for every minute. A typical filename looks like this: yy_mm_dd_hh_min.json.gz. Each file contains subscription events in JSON. The subscription record has the following fields:

  • Time of creation
  • Time of arrival
  • User id
  • Valid_Until
  • And other user data such as: age, gender, country, state, etc. Consider these to be filters.

I was to find and derive the following things from the data based on a given date range:

  1. Opening active subscribers: the number of open subscriptions at the beginning of the day, identical to the "closing active" of the previous day. Of course the "opening active" of the first day is 0. The first day was 01/01/2017.
  2. Acquired Subscription: the number of subscriptions that occurred on the day.
  3. Renewed Subscription: the number of subscriptions that occurred on that day by users who had subscribed before.
  4. Churned Subscription: number of subscriptions that expired on that day.
  5. Closing Active subscribers: Opening active + Acquired subscription + Renewed subscription - Churned Subscription.

This closing active will be the opening active for the next day. So you see there is a dependency here between the days. The calculation of "Closing active" requires the "Opening active".
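To make the dependency concrete, here is a minimal Python sketch of the recurrence, assuming the acquired, renewed and churned counts per day are already known. The `DayCounts` type, the `daily_metrics` function and the sample numbers are hypothetical and only serve as illustration.

```python
from dataclasses import dataclass


@dataclass
class DayCounts:
    acquired: int
    renewed: int
    churned: int


def daily_metrics(days):
    """Return (opening, acquired, renewed, churned, closing) for each day, in order."""
    opening = 0  # the "opening active" of the very first day is 0
    rows = []
    for d in days:
        closing = opening + d.acquired + d.renewed - d.churned
        rows.append((opening, d.acquired, d.renewed, d.churned, closing))
        opening = closing  # today's closing is tomorrow's opening
    return rows


# Made-up counts for three consecutive days.
rows = daily_metrics([DayCounts(10, 0, 0), DayCounts(5, 2, 3), DayCounts(1, 4, 6)])
for row in rows:
    print(row)
```

Changing the counts of the first day changes the opening and closing values of every later day, which is exactly the cascade described above.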

I was to provide a rest API, which upon receiving a date range, could provide these 5 metrics for each day in the date range, so that we could plot a graph for the same.

Approach 1:

The first approach was to run a batch process on the s3 data and calculate these results for each day and store them on a database. We used mongodb for storage (we tried Cassandra but didn't get too far with it because we lacked expertise and the client wanted the solution very quickly) and pyspark for data processing. Upon the query to the REST API, the API would simply query MongoDB with a date range and get the results. We ran the pyspark job on the entire data on s3 and once finished we would simply monitor new events and add the calculations to mongodb.

Problem with approach 1:

  1. There was a problem with backfilling. We were told to use the time of creation for the calculation, and sometimes data that had been created a long time ago would arrive late. Since late-arriving data would impact the closing active of a previous day, the opening active and closing active of every day after that would be affected as well. For this, we had a condition in the pyspark code: every record with a difference of more than a day between the time of creation and the time of arrival would be handled by a different function. That function would update the calculations for the affected day and then update the calculations for every day after it. The worst case was backfill data for the first day, because after updating the calculations for the first day we would have to update the calculations for every other day up to 02/01/2019.

This approach was painfully slow, but it was all good because things were happening in the background and did not impact the performance of the REST API. The REST API would simply yield correct results when the update would be complete.

  2. Filters. As mentioned above, we had user data such as age, gender, country, state, etc., and we were told to filter the results based on these values. The REST API would now also receive filters along with dates. At first glance this might seem like no problem at all: simply apply the filters to the results returned by MongoDB. But the problem was the opening and closing active. The closing active would change based on the filter, and with it the opening and closing active of every day after that. This meant that for every filter combination, we would have to recalculate the whole thing.

So with the introduction of filters, we could no longer store the calculated results on the database, because the calculations would change based on the filter, and that too for the whole data.

Approach 2:

Instead of storing calculations, we decided to store the entire data from s3 (125 million records) in MongoDB (we had to shard mongo). We simply could not store calculated results for each and every filter combination on MongoDB, as the filters would keep growing with more user data getting added to the JSON. So we had to query the data source itself: once the data was on mongo, we would first apply the filters and then use aggregate queries to calculate the opening and closing active.

Problem with Approach 2:

Remember that the calculation of opening and closing active has to happen from the first record. This process took around 4 to 10 minutes in total. Since a REST API cannot wait that long, the process happened in the background. The results would be stored on Redis as key-value pairs, and the front end would periodically query another REST endpoint, which would then query Redis for updates and provide the results. This process was a hack, and it wasn't accepted: the client wanted the latest data to appear first, and the latest data took the longest to calculate. This meant the client had to wait 4 to 10 minutes for the latest correct calculations to appear.

Approach 3:

This approach was to use Pyspark dataframes for MongoDB and calculate the results. We would then do the same thing we did in approach 2: upload the results asynchronously into Redis. For some reason, my boss thought it would work. Luckily, I never got to try this solution, as I left the company.

So obviously, I lack expertise in the domain of big data. I went from building REST APIs to suddenly building these huge data systems which none of us in the company had any idea of. Obviously, I made a lot of bad choices in the design of the system.

I am currently working with Pyspark and Kafka a lot but am still not an expert. I also have never encountered a scenario like this since that company. So I ask the community: what would be the correct approach to building a system to solve a problem like this?


Solution

Here is the answer I already sketched in the comments:

Let us start without the requirement of filters first: I would recommend not to precalculate and store the "Opening active" and "Closing active" values in the database, only the acquired, renewed and churned values per day, as well as the number of changed subscriptions per day (which is "acquired + renewed - churned"). I assume this is possible just by using the event records of the specific day, nothing else. This makes it possible to restrict those "backfilling calculations" to the affected days, without having to update the subscription counters of any other days.

"Opening active" and "Closing active" can be calculated "on the fly" whenever someone requests them. They are the result of a straightforward summation over all "number of changed subscriptions per day" values, which means summing up at most "365 * number of years" days (so today, over a range from 01/01/2017 to the beginning of 2021, this means fewer than 1600 partial sums), which should take milliseconds per request, not minutes. Of course, you can even cache the results of those calculations in some sequential list and update them only when a backfilling occurs.

Concerning filters, I would recommend creating the database model as a star schema, with dimension tables for age (or age range blocks), gender, country/state, as well as a "date" dimension table with one record per evaluated day. The fact table could again hold the counters for acquired, renewed and churned subscriptions (broken down to the combination of one age block, one gender, one country or state, and one day). Assuming you need to support 20 age blocks, 3 genders and ~100 countries/states, this will require around 6000 records per day, and for 1600 days this means around 10 million records, which is quite a few, but still far fewer than the 125 million records you mentioned.
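As a rough sketch of such a schema, here is a version using SQLite through Python's standard `sqlite3` module, purely for illustration. All table names, column names and sample rows are assumptions, and any relational database would do.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_date      (id INTEGER PRIMARY KEY, day TEXT UNIQUE);
CREATE TABLE dim_age_block (id INTEGER PRIMARY KEY, label TEXT);
CREATE TABLE dim_gender    (id INTEGER PRIMARY KEY, label TEXT);
CREATE TABLE dim_region    (id INTEGER PRIMARY KEY, label TEXT);
CREATE TABLE fact_subscriptions (
    date_id      INTEGER REFERENCES dim_date(id),
    age_block_id INTEGER REFERENCES dim_age_block(id),
    gender_id    INTEGER REFERENCES dim_gender(id),
    region_id    INTEGER REFERENCES dim_region(id),
    acquired     INTEGER NOT NULL DEFAULT 0,
    renewed      INTEGER NOT NULL DEFAULT 0,
    churned      INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (date_id, age_block_id, gender_id, region_id)
);
""")

# Tiny hypothetical data set: two days, two genders, one age block, one region.
conn.executemany("INSERT INTO dim_date VALUES (?, ?)",
                 [(1, "2017-01-01"), (2, "2017-01-02")])
conn.executemany("INSERT INTO dim_age_block VALUES (?, ?)", [(1, "18-24")])
conn.executemany("INSERT INTO dim_gender VALUES (?, ?)", [(1, "female"), (2, "male")])
conn.executemany("INSERT INTO dim_region VALUES (?, ?)", [(1, "DE")])
conn.executemany(
    "INSERT INTO fact_subscriptions VALUES (?, ?, ?, ?, ?, ?, ?)",
    [(1, 1, 1, 1, 5, 0, 0), (1, 1, 2, 1, 3, 0, 0),
     (2, 1, 1, 1, 2, 1, 1), (2, 1, 2, 1, 1, 0, 2)],
)


def daily_counts(conn, gender=None):
    """Per-day (day, acquired, renewed, churned), optionally filtered by gender."""
    sql = """
        SELECT d.day, SUM(f.acquired), SUM(f.renewed), SUM(f.churned)
        FROM fact_subscriptions f
        JOIN dim_date d   ON d.id = f.date_id
        JOIN dim_gender g ON g.id = f.gender_id
        WHERE (? IS NULL OR g.label = ?)
        GROUP BY d.day
        ORDER BY d.day
    """
    return conn.execute(sql, (gender, gender)).fetchall()


print(daily_counts(conn))
print(daily_counts(conn, gender="female"))
```

The opening and closing active values for a filter are then obtained by running a cumulative sum over the filtered per-day rows, in the same way as in the unfiltered case.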

Backfilling will still be efficient: updating the db will not lead to any cascading effects. A late-arriving event will affect just one age block, one gender, one country and a small number of days, so only a few of those 10 million records will actually have to be touched.
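To illustrate how local such an update is, here is a small Python sketch that keeps the fact records in a dictionary. It rests on several assumptions that are not confirmed by the question: churn is counted on the `Valid_Until` day, the renewal flag is already known when the event is processed, and "acquired" does not include renewals. The name `apply_event` and all values are hypothetical.

```python
from collections import defaultdict
from datetime import date

ACQUIRED, RENEWED, CHURNED = 0, 1, 2

# (day, age_block, gender, region) -> [acquired, renewed, churned]
facts = defaultdict(lambda: [0, 0, 0])


def apply_event(facts, created, valid_until, age_block, gender, region, is_renewal):
    """Record one subscription event and return the fact keys that were touched."""
    start_key = (created, age_block, gender, region)
    end_key = (valid_until, age_block, gender, region)
    facts[start_key][RENEWED if is_renewal else ACQUIRED] += 1
    facts[end_key][CHURNED] += 1  # assumption: the subscription churns on Valid_Until
    return {start_key, end_key}


# A regular event that arrived on time.
apply_event(facts, date(2019, 1, 1), date(2019, 2, 1), "25-34", "f", "DE", False)

# A late-arriving event created on the very first day: only two records change,
# and no record of any later day has to be rewritten.
touched = apply_event(facts, date(2017, 1, 1), date(2017, 2, 1), "18-24", "m", "US", False)
print(sorted(touched))
```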

"On-the-fly" calculation of the opening and closing active values might last longer, heavily depending on how many filters will be active in parallel, of course. You would have to measure if the current strategy is fast enough, and if not, you may decide to create more star schemas for "frequent" filter combinations. For three additional filter dimensions (besides days/date), it should be feasible to create the seven star schemas relating to all combinations where one of the filters is applied, two of the filters are applied, or all three are applied.

Licensed under: CC-BY-SA with attribution