Case Study: Improving Grid's ETL
Realizing an 80% reduction in BigQuery costs
Nate Robinson | October 9, 2023

Introduction
Grid is a financial services company founded in 2019 in California that empowers their clients with tools for modern financial freedom. They offer a suite of products including cashback credit cards, credit building, cash advances, and employment insurance. The company is largely composed of a dynamic engineering team that is steadily adding in-demand products to their suite, and an operations team that actively runs a highly data-driven business. This case study is an in-depth and technical summary of the work that Snowpack Data did with Grid to generate cost savings in BigQuery related to their ETL and empower Grid to leverage large amounts of data effectively.
Problem
The problems of running a fast-moving, data-driven business often lie in prioritization. Early at Grid, the need for near real-time data for business operations and decision making resulted in a robust internal ETL¹ system to move data from the application databases to a data warehouse. At inception, the cost of BigQuery was low and well worth the leverage that low-latency data provided. But as Grid grew, the costs associated with using BigQuery began to scale exponentially, to the point where the value of access to production data no longer matched the true cost of maintaining the data pipeline in BigQuery. Grid got in contact with Snowpack seeking to reduce their BigQuery costs, both as an absolute step-change and so that costs would scale linearly with the growing business.

¹ In a traditional context, ETL stands for Extract, Transform, Load. Modern data stacks typically follow more of an Extract, Load, Transform path, and in reality this process at Grid was only an extraction and subsequent load. For consistency we’ll continue to refer to this process as ETL, but understand that we are only referring to the extraction of data from source and loading into a BigQuery warehouse.
Solution
The root causes of cost spikes were mostly known to Grid, and were driven by early engineering decisions during the ETL build. The Grid team shared with Snowpack that the cost spikes were largely attributed to a few specific problems:
Problems
- A custom ETL service inefficiently leveraged Federated Queries to perform dozens of interactions with BigQuery target data before inserting source records into BigQuery.
- Users queried raw ETL tables via views that deduplicated data but ultimately incurred full table scans of the underlying raw datasets. These raw datasets are often roughly 10x larger than the deduplicated result due to the immutable nature of the models (a sketch of such a view follows this list).
- Non-partitioned and improperly partitioned datasets meant that unnecessary, stale data could not be filtered out by upstream queries. As a result, just about any analytical query generated a full table scan of the raw data.
- Due to the behavior of incrementing created_at fields in the immutable model, the partition of an updated record was unknown in the target table, resulting in prohibitive costs in the transformation layer and the inability to perform efficient merge inserts.
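To make the deduplication problem concrete, such a view might look roughly like the following in BigQuery (a hypothetical sketch; the dataset, table, and column names are illustrative rather than Grid's actual schema):

-- Hypothetical deduplication view over an immutable raw table.
-- `token` identifies a logical record across versions; each new version gets its own row and id.
CREATE OR REPLACE VIEW analytics.clients AS
SELECT *
FROM raw.clients
WHERE TRUE  -- BigQuery requires a WHERE, GROUP BY, or HAVING clause alongside QUALIFY
QUALIFY ROW_NUMBER() OVER (PARTITION BY token ORDER BY updated_at DESC) = 1;

Because the view carries no partition filter, BigQuery bills every query against it for a scan of the entire raw table, even when the caller only needs a handful of current records.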
These problems meant that query costs would continue to scale non-linearly with data size, and in a few short months an uncomfortable BigQuery bill would become untenable. Ultimately the data sat in a tricky middle ground between streaming and batching, both of which are thoroughly documented problems; access to low-latency data without introducing streaming is a challenge that can vary greatly between users. There were also a number of constraints that we had to consider in our solution:
Constraints
- We needed to maintain the status quo for data access: employees required access to the same data, at the same low latency, that they were accustomed to.
- To remain consistent with Grid’s approach to cloud-native DevOps, the ETL had to be responsive to frequent changes in source data and remain self-healing.
- Grid does not yet have a dedicated data engineering team, so ETL updates had to fit easily into the current workflow of the software engineers contributing to the product.
- Grid has concerns about product lock-in and, if possible, wanted to keep the ETL process within their existing ecosystem before considering third-party services or passing through one-way doors with existing Google products.
Build vs Buy
The first step toward a solution after identifying the problems was assessing the build-vs-buy equation. On the buy side, the concept of an ETL from CloudSQL to BigQuery is not new: from change data capture with Debezium, to open source options like Airbyte, to fully managed services like FiveTran, this problem is very common and has many solutions. Unfortunately our version of CloudSQL did not grant us access to the binlog/CDC, so Debezium was a nonstarter. Snowpack does not generally recommend FiveTran if you can avoid it, as its cost scales linearly with data size and can become exorbitant quickly. Airbyte offers an industry-standard open source option for hosting your own ETL, and Snowpack has set up Airbyte for several clients in the past; however, the overhead of hosting a new service that none of the Grid engineers were familiar with was a risk. Finally, Google offers managed services such as Dataflow and Datastream that solve this exact problem, but we had a few issues with them: the merge operations were prohibitively expensive, and they would require changes to the application databases, which introduced additional risk to Grid’s clients.
On the build side of the equation was the fact that a relatively robust ETL already existed at Grid. The ETL, despite its cost, moved data quickly and efficiently into BigQuery, was self-healing, was well understood by the Grid software engineers, and was flexible to source changes and updates. Only minor updates would be needed to improve the process and maintain parity with the existing system while reducing cost, with the added benefit of not introducing any new software tools or vendors.
After weighing the pros and cons, we agreed to proceed with improvements to the existing ETL service. We predicted that we could reduce the cost of the ETL service by ~95% with no changes to the user experience.
Stage I: Improvements to data extraction and loading
How did things work before?
To understand the changes to the ETL service we must first understand what it was doing in the first place. The service sat atop Grid’s existing engineering architecture: a Go application utilizing Go’s native concurrency and the asynchronous nature of pub/sub messaging to coordinate data load jobs. The process steps were:
- Each timestep, a cronjob published a pub/sub message for every table in the source databases with instructions on how to retrieve data, and a “cured” timestamp that indicated the “up to” point of data that could be ingested.
- A goroutine picked up a message and created a bulk insert job using the BigQuery API, issuing a Federated Query to the source database to retrieve data from the last update point up to the current timestamp and insert it into the corresponding BigQuery raw table.
- The goroutine waited for a confirmation message from BigQuery and acked the pub/sub message, or logged an error and exited.
The elegance of the process was that hundreds of backfill operations could run in parallel on the existing Grid infrastructure while leveraging BigQuery’s existing API. The flow of data could be monitored from the pub/sub queue, and throughput could be tuned by adjusting the concurrency and publish limit.
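As an illustration of the second step, the statement issued for a single table at a single timestep looked roughly like the sketch below (simplified; the connection ID, dataset, table, and cursor values are placeholders rather than Grid's actual configuration). EXTERNAL_QUERY pushes the inner SELECT down to CloudSQL, and BigQuery appends the result to the raw table:

-- Hypothetical federated insert for one table and one timestep.
INSERT INTO raw.clients
SELECT *, CURRENT_DATETIME() AS _etl_loaded_at
FROM EXTERNAL_QUERY(
  'my-project.us.cloudsql-connection',  -- placeholder CloudSQL connection ID
  """
  SELECT * FROM clients
  WHERE updated_at >  '2023-10-01 00:00:00'   -- last cursor value for this table
    AND updated_at <= '2023-10-01 00:05:00'   -- the "cured" / up-to timestamp from the message
  """
);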
The problems lay in a misunderstanding of BigQuery’s usage costs compared to CloudSQL’s. For design simplicity, the existing system issued every query as a BigQuery federated query, including the procedural calls necessary for operation. These procedural calls were run in interactive mode to increase speed, which used up slots otherwise available for analytical use. The procedural queries fell into three buckets:
- Schema match for the source and target tables
- Record count of the target table
- A max timestamp for all cursor² columns in the target table

² Cursor columns are our term for the columns used to infer what data has already been inserted; these are generally timestamp columns such as updated_at or archived_at.
The third call was the most troublesome. Grid didn’t want to insert the whole history of the source table at each timestep, and so needed to determine the latest record inserted by the previous job. This was done by finding the max cursor value in the target table and generating an insert statement between that latest cursor value and the up-to timestamp of the new job. Because the target table was not partitioned, retrieving the max of each cursor column generated a full table scan across the target table; some of these tables are billions of rows and terabytes of data. The max timestamp procedural calls had to scan every row in these columns, resulting in large costs, and made up ~80% of the cost of the ETL.
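For a sense of what that lookup involved, each timestep ran something along these lines against the unpartitioned target (illustrative table and column names), and BigQuery billed it as a scan of every row in the referenced columns:

-- Hypothetical cursor lookup issued before every insert.
-- With no partitioning on the target, this scans the table's full history.
SELECT
  MAX(updated_at)  AS max_updated_at,
  MAX(archived_at) AS max_archived_at
FROM raw.transactions;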
How we fixed it
The underpinning infrastructure of the ETL was sound. We sought to eliminate the unnecessary BigQuery procedural calls. To do this we first needed to establish read connections to the CloudSQL databases instead of interacting via federated queries. With connections established we could retrieve the schemas without incurring procedural calls.
The second major decision was to establish a cache for cursor values rather than retrieving them from the target table. By storing this information in a table in the existing CloudSQL database, we know what data has already been inserted by past ETL jobs and where the current insert can begin. This isn’t exactly a novel idea; if you look under the hood of Airbyte you’ll see that it does much the same thing.
The cache also opened up a variety of other avenues for improving the ETL process, specifically the ability to manage the state of an ETL job as it progressed. In the past the ETL played a continuous game of “catch-up”: if something interrupted its operation, the data would simply grow stale, the failure was silent, and it was difficult to recover what had happened. With the cache we could now manage ETL state. When a pub/sub message was received, a cursor object was created indicating the intent to insert a chunk of data into BigQuery. Once the job was created, the state transitioned to pending, the BigQuery job ID was stashed, and we could ack the message and move on to the next one. This meant we did not have to wait in-process for BigQuery to respond with a success message, an important consideration because bulk jobs have an SLA of 24 hours, and during times of high user load our ETL would often slow down. A separate cleanup job passes through periodically and retrieves the state of the BigQuery jobs via the API, marking them as succeeded if the data has been inserted (noting the completion time), or marking them as errored if the job failed (logging the error). A rough sketch of the cursor cache and its lifecycle follows.
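As a minimal sketch of what such a cache could look like, assuming a single CloudSQL (MySQL) table tracks both the cursor ranges and the job lifecycle (the schema and names here are illustrative, not Grid's actual table):

-- Hypothetical cursor cache table in CloudSQL (MySQL).
CREATE TABLE etl_cursors (
  id               BIGINT AUTO_INCREMENT PRIMARY KEY,
  source_table     VARCHAR(255) NOT NULL,             -- table being extracted
  cursor_column    VARCHAR(255) NOT NULL,             -- e.g. updated_at
  cursor_from      DATETIME NOT NULL,                 -- start of the chunk
  cursor_to        DATETIME NOT NULL,                 -- the "cured" / up-to timestamp
  state            ENUM('created','pending','success','error') NOT NULL DEFAULT 'created',
  bigquery_job_id  VARCHAR(255),                      -- stashed once the load job is created
  completed_at     DATETIME,                          -- set by the cleanup job on success
  error_message    TEXT,                              -- set by the cleanup job on failure
  created_at       DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- The next job for a table starts where the last successful one ended:
SELECT MAX(cursor_to)
FROM etl_cursors
WHERE source_table = 'clients' AND state = 'success';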
These changes unlocked valuable monitoring of the ETL. We can now proactively know if, when, and how insert jobs fail, retry them, and understand gaps in our warehouse data. We can also report on average job completion time to optimize the ETL process, and we have a continual feed of how fresh each table in BigQuery is. Finally, if we need to schedule database downtime in CloudSQL, we can continue to generate a backlog of cursors in the pub/sub queue and hold them until the source database is ready for operations to resume.
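Monitoring then becomes ordinary SQL against the cache. For example, a query along these lines (against the hypothetical etl_cursors table sketched above) reports average load duration and data freshness per table:

-- Average load duration and latest loaded timestamp per table over the last 7 days.
SELECT
  source_table,
  AVG(TIMESTAMPDIFF(SECOND, created_at, completed_at)) AS avg_completion_seconds,
  MAX(cursor_to)                                       AS freshest_data_through
FROM etl_cursors
WHERE state = 'success'
  AND created_at >= NOW() - INTERVAL 7 DAY
GROUP BY source_table;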
Stage II: Improvements to the Transformation Layer
With the extraction and loading portion of the ETL optimized, we turned to the transformation step. Snowpack’s goal was not yet to begin building normalized data models and semantic layers, but rather to lower the cost barrier for Grid staff accessing data at its most fundamental layer.
The description of the solution first relies on outlining a few problems. The Grid engineering paradigm uses an immutable model for storing records in the application database. Each record update generates a new row with a new ID (the primary key of the table), and each record contains a token that can be used to view the change history of a particular object over time. This can reduce costs in BigQuery because we only need to consider new updates when building models incrementally. However, one detail of the immutable model implementation led to difficulties in partitioning data in BigQuery. When a record is updated, the created_at and updated_at fields are incremented, meaning that to identify the original created_at timestamp of an object you would need to traverse the record history. Most analytics use cases are predicated on knowing the original timestamp of a record.
We conceptualize the problem as an optimization of the cost of modifying partitions. Data inserted into raw tables in BigQuery is partitioned by the _etl_loaded_at field. In theory, to build a smaller table of only up-to-date records for analytics, we would simply take that partition and merge it into the domain model. This is done in BigQuery with a merge insert, but because of the incrementing creation timestamp we could not know what partition a particular record exists in, and thus needed to perform a full table scan for every merge, an expensive operation that only grows in cost over time.³

³ For example, if we had only one new update today, say changing the name of a client in the clients table who signed up yesterday, we cannot know what partition the created_at timestamp of the previous version of that record exists in, because the created_at value has been incremented to today. So when we merge this record into the domain model we need to “search” every partition for the record, incurring a full table scan.
The first problem to solve was knowing the partition of the data that needs to be updated. This was relatively simple: we updated the ETL to retrieve the original timestamps of records from MySQL at insert time, so all records now carry an original_created_at timestamp, in addition to the incrementing created_at, that we can use to find the correct partition. The second part relied on our use of dbt and partition pruning. Inspired by a dbt writeup, Snowpack created a custom merge macro in dbt that first identifies which partitions in the domain table will need to be modified, and provides them as an incremental predicate to the merge operation. The nuance here is that BigQuery will not prune partitions in the target merge table unless they are provided as a static predicate, so the partitions must be identified ahead of time in a custom macro and stored as a static variable. This is by no means a novel approach and has been pretty thoroughly documented within the dbt/BigQuery community, but by creating auto-generation scripts that built incremental models for each table in the ETL using this predicate, we moved the complexity of this process away from our end users and provided them the output without having to consider the underlying engineering decisions. The code to do this varies slightly by use case, but roughly ours looked like this in dbt:
-- get_merge_sql.sql
{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
  {% set predicate_override = "" %}
  -- If we pass a custom_day incremental predicate we run the partition pruning macro
  {% if incremental_predicates[0] == "custom_day" %}
    {% set get_limits_query %}
      -- run a query to the source table to find min and max partitions
      SELECT
        coalesce(MIN(first_created_at), current_datetime()) AS min_day,
        coalesce(MAX(first_created_at), current_datetime()) AS max_day
      FROM {{ source }}
    {% endset %}
    {% set limits = run_query(get_limits_query)[0] %}
    {% set min_day, max_day = limits[0], limits[1] %}
    {% set predicate_override %}
      DBT_INTERNAL_DEST.first_created_at >= '{{ min_day }}'
      AND DBT_INTERNAL_DEST.first_created_at <= '{{ max_day }}'
    {% endset %}
  {% endif %}
  -- override the predicate behavior with our custom date range from the source table
  {% set predicates = [predicate_override] if predicate_override else incremental_predicates %}
  -- standard merge from here
  {% set merge_sql = dbt.get_merge_sql(target, source, unique_key, dest_columns, predicates) %}
  {{ return(merge_sql) }}
{% endmacro %}
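For context on how a model opts into this behavior, an auto-generated incremental model might be configured roughly as follows. This is a hedged sketch: the model name, source reference, and columns are illustrative, and it assumes dbt-bigquery's standard incremental merge materialization with the custom_day flag picked up by the macro above:

-- models/domain/clients.sql (illustrative auto-generated model)
{{
  config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key='id',
    partition_by={'field': 'first_created_at', 'data_type': 'datetime', 'granularity': 'day'},
    incremental_predicates=['custom_day']
  )
}}

SELECT *
FROM {{ source('raw', 'clients') }}
WHERE TRUE
{% if is_incremental() %}
  -- only pick up ETL partitions loaded since the last run of this model
  AND _etl_loaded_at > (SELECT MAX(_etl_loaded_at) FROM {{ this }})
{% endif %}
-- keep the latest version of each logical record within the batch
QUALIFY ROW_NUMBER() OVER (PARTITION BY token ORDER BY updated_at DESC) = 1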
This change greatly reduced the cost of dbt incremental updates: the cost of a dbt run across all domain models dropped by 90% compared to the pre-pruning approach, and it gave end users access to lower-cost tables that were properly partitioned and clustered.
The second nuance we wanted to consider was access to data with minimal delay. Previously at Grid, the methodology for retrieving data was a view built atop every raw source that deduplicated records by token, taking only the most recent value. This meant that all queries scanned the entire history of a dataset just to retrieve the most recent incarnation of each record. We set up dbt to build our domain models only daily, so each day we update a version of the domain model that is already deduplicated and stored as a table; however, this is only current as of the most recent dbt run. In the past, business users were able to retrieve data as current as the last ETL run, and they relied on this low-latency data in some business operations.
To continue to serve those users, we created what we refer to as stream views for each dataset. The idea is that we know which ETL load partitions have not yet been inserted into the domain model, so we can combine those with the domain model to generate a more current view of the data. This is more expensive and less computationally efficient than querying the domain model alone, but for certain use cases it is necessary in order to access the most current records. A sketch of such a view follows.
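A minimal sketch of what a stream view could look like, assuming the hypothetical clients tables from earlier and a domain model that retains the _etl_loaded_at column (names are illustrative):

-- Hypothetical stream view: the domain model plus raw partitions not yet merged into it.
CREATE OR REPLACE VIEW stream.clients AS
WITH recent_raw AS (
  SELECT *
  FROM raw.clients
  -- only rows loaded after anything the domain model has absorbed;
  -- a subquery predicate will not prune partitions, so in practice the
  -- cutoff can be injected as a literal by a generation script
  WHERE _etl_loaded_at > (SELECT MAX(_etl_loaded_at) FROM domain.clients)
),
combined AS (
  SELECT * FROM domain.clients
  UNION ALL
  SELECT * FROM recent_raw
)
SELECT *
FROM combined
WHERE TRUE  -- required alongside QUALIFY
-- dedupe by token so each logical record appears once, at its latest version
QUALIFY ROW_NUMBER() OVER (PARTITION BY token ORDER BY updated_at DESC) = 1;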
These stream views add some nuance. For example, if you query from one stream view and join it to a non-streaming view, there is no guarantee that you won’t drop unmatched records on an inner join. But for some of Grid’s use cases, like reviewing transaction data for the past hour, this view model provides an elegant workaround.
Results
The Snowpack team hit our target of reducing Grid’s monthly BigQuery bill by 80%. The cost of extraction and loading was reduced by 99.4%, even after accounting for the cost of the cursor cache database. We generated domain models and stream views that reduced the cost of data access for consumers, dropping On-Demand usage costs by ~50%. Finally, Snowpack set up dbt, built an auto-generation script for their base models, and began migrating the BigQuery scheduled queries that made up the transformation layer at Grid, which resulted in additional cost savings in BigQuery.
Due to these improvements, Grid realized cost savings within two months that offset the fees associated with our engagement. Furthermore, the updated system’s costs will continue to scale at a much slower rate than Grid’s data size, while enabling employees to contribute to analytics in a structured manner that will allow their analytics to scale.
Conclusion
- Engineering decisions come with costs that may not be immediately understood. At the time of the original ETL development, the associated costs were low due to the small size of the target tables, and the approach seemed like a clear win against managed data loading services like Datafusion or Airbyte, but it ultimately resulted in runaway costs later on. We should be aware of the long-term viability of short-term cost-cutting measures.
- Management of analytics data begins at the source. The inconsistent nature of the created_at column in application databases resulted in additional overhead work and costs to manage the ETL in BigQuery. Snowpack applied a “band-aid” to this problem, but we also provided Grid with a recommendation for updating their application databases to make the behavior of created_at fields consistent across tables.
- The value of dbt extends beyond the database interactivity features. One of the most difficult issues of reducing On-Demand query costs was untangling the web of scheduled queries across BigQuery projects. By pushing users into dbt, managing files via git, and enabling lineage testing we are able to improve the debugging experience for Grid users and document data sources cleanly.
This case study covers only a relatively high-level view of the problem and our solution. The details behind the scenes were enabled by our breadth of experience, ranging from a traditional background in data engineering and dbt to the Snowpack team’s flexibility in jumping into rewriting an engineering service in Golang, which turned out to be a very rewarding experience. With the infrastructure and foundation set for Grid, we were able to turn to providing much deeper and more valuable analytical insights and to accelerating their team’s contributions to their analytical stack.
Snowpack specializes in helping organizations solve analytical problems. We work with clients to build and deploy a modern data stack that will scale as they grow. For more information, follow our blog, or shoot us an email at [email protected]