This is the second post in a three-part series on our migration from Airflow to our own custom DAG orchestration system. For more context on how we use Airflow, the challenges we faced with it, and the process that led us to write our own replacement, check out this post.
As we set out to build a replacement for Airflow capable of handling 50,000 concurrent experiments running on the Eppo platform, we identified a number of requirements:
- Support for tens of thousands of concurrently running tasks
- A simple data model and low operational burden
- Static, dynamic (user-controlled), and on-demand DAG scheduling
- Retries and reconciliation of task status
- Worker capacity that can scale with demand
We envisioned a solution built on top of queues, taking inspiration from Airflow’s scheduler and workers while also taking advantage of the fact that we do not need the flexibility of computation environments that Airflow provides. We embrace the monolith at Eppo, and we push as much computation as possible down to the data warehouse, which together reduce the need to support computational environments other than our NodeJS monolith.
At the heart of every scalable orchestration system is some sort of queue for maintaining what tasks need to be executed. We decided to use Redis to store the contents of our queue as it is blazing fast and battle tested. We interface with Redis via the BullMQ library, which has a great set of APIs for using queues in Redis.
The object stored in the queue is a simple ID which workers then use to query our Postgres instance to retrieve additional metadata about the task to be run (i.e. which code to run, and which inputs to use).
We ran some tests and found that with BullMQ we could easily handle 50,000 tasks in parallel, which satisfied our concurrency requirements.
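As a rough illustration of this pattern, here is a minimal sketch using BullMQ, assuming a queue named paddle-tasks and a dag_task_runs ID as the payload (both names are ours, for illustration, not necessarily Paddle's):

```typescript
import { Queue } from "bullmq";

// Hypothetical queue name; the real names in Paddle may differ.
const taskQueue = new Queue("paddle-tasks", {
  connection: { host: process.env.REDIS_HOST ?? "localhost", port: 6379 },
});

// Enqueue only the task run's ID. Workers look everything else up in Postgres.
export async function enqueueTaskRun(dagTaskRunId: string): Promise<void> {
  await taskQueue.add("run-task", { dagTaskRunId });
}
```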
The Paddle daemon is a single long-running process which is responsible for:
- determining whether scheduled DAGs should be started
- retrying recently failed tasks that have attempts remaining
- reconciling the status of zombie tasks
- autoscaling the pool of workers
The daemon currently runs as a single instance. It could be run in parallel with minimal work if we need to in the future.
One major pain point we experienced with Airflow was the performance of its Scheduler, which has similar responsibilities to Paddle’s daemon. Because of this, we wanted to remove as much processing/responsibility from the daemon as we could. For example, in Airflow the scheduler is responsible for setting a task’s state to SUCCESS and initiating downstream tasks. This can be very computationally intensive with a large number of DAGs/tasks. Paddle delegates this responsibility to the workers.
Delegating/distributing work from the daemon to the workers helps us avoid the performance issues we experienced with Airflow’s scheduler process. This was possible in large part due to the fact that we only needed to support tasks in our NodeJS monolith, so we could reuse the code to update task status and traverse the computational graph within the workers. Redis was also a crucial piece for us, as it makes distributed atomic updates easy.
Workers are responsible for task processing, which involves:
- pulling a task ID off the queue
- loading the task’s metadata from Postgres
- executing the task’s code
- updating the task’s status upon completion
- initiating any downstream tasks whose upstream dependencies have completed
Workers are stateless, and can be horizontally scaled to respond to queue depth/task volume.
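To make those responsibilities concrete, here is a hedged sketch of a BullMQ worker following this shape; the queue name, table and column names, and the task registry are illustrative assumptions, not Paddle's actual code:

```typescript
import { Worker } from "bullmq";
import { Pool } from "pg";

const pg = new Pool(); // connection settings come from PG* environment variables

// Hypothetical task registry: maps a task name to the code it should run.
const taskRegistry: Record<string, (input: unknown) => Promise<void>> = {};

const worker = new Worker(
  "paddle-tasks",
  async (job) => {
    const { dagTaskRunId } = job.data as { dagTaskRunId: string };

    // Load task metadata (which code to run, and which inputs to use) from Postgres.
    const { rows } = await pg.query(
      "SELECT task_name, input FROM dag_task_runs WHERE id = $1",
      [dagTaskRunId]
    );
    const { task_name, input } = rows[0];

    await taskRegistry[task_name](input);

    // Workers, not the daemon, record success and then queue downstream tasks.
    await pg.query(
      "UPDATE dag_task_runs SET status = 'SUCCESS' WHERE id = $1",
      [dagTaskRunId]
    );
    // ...then enqueue any downstream tasks whose upstream dependencies are complete.
  },
  {
    connection: { host: process.env.REDIS_HOST ?? "localhost", port: 6379 },
    concurrency: 10,
  }
);
```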
Airflow is complex. That complexity is perfectly exemplified by its data model. Despite having over a dozen engineers on the team, operation of Airflow at Eppo usually fell on one of the few engineers who had enough of an understanding to operate it.
In an effort to reduce the operational burden of our new orchestration system, we wanted to boil our data model down as much as possible. We ended up with a simple schema of three tables which cleanly represents what is essentially a set of graphs:
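As a rough sketch, rows in the dag_runs and dag_task_runs tables might look something like the following TypeScript types; the column names are illustrative, pieced together only from fields mentioned elsewhere in this post:

```typescript
// Illustrative row shapes only; Paddle's actual columns may differ.
interface DagRunRow {
  id: string;
  dagName: string;          // which DAG definition (in code) this run belongs to
  scheduledFor: Date;       // execution time that triggered the run
  status: "RUNNING" | "SUCCESS" | "FAILED";
}

interface DagTaskRunRow {
  id: string;
  dagRunId: string;         // foreign key to dag_runs
  taskName: string;         // which code to run
  input: unknown;           // which inputs to use
  status: "PENDING" | "RUNNING" | "SUCCESS" | "FAILED";
  attempt: number;          // retries are configured per task
  queue: string;            // which BullMQ queue to add the task to (resource profile)
}
```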
DAGs are defined in code using TypeScript. We have a set of base classes which make defining new tasks and DAGs easier:
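The base classes themselves aren't reproduced here, but a minimal sketch of the idea might look like this (class names and signatures are illustrative, not Paddle's actual API):

```typescript
// A task: a named unit of work executed by a worker.
abstract class Task {
  abstract readonly name: string;
  abstract run(input: unknown): Promise<void>;
}

// A DAG: a named set of tasks plus the edges between them.
abstract class Dag {
  abstract readonly name: string;
  abstract readonly tasks: Task[];
  // Map of task name -> names of its upstream dependencies.
  abstract readonly dependencies: Record<string, string[]>;
}

// Example definition (hypothetical).
class RefreshExperimentResults extends Dag {
  readonly name = "refresh-experiment-results";
  readonly tasks = [/* ...Task instances... */];
  readonly dependencies = { /* "aggregate": ["extract"], ... */ };
}
```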
Due to its scalability, reliability, maturity, and availability of documentation, Kubernetes was a perfect choice for Paddle. In our usage of Airflow, we primarily used the GKEStartPodOperator, which runs tasks in a Kubernetes cluster in GCP, so we already had the operational experience necessary. We ran into some interesting challenges with versioning and deployments, which I’ll cover in more detail later in this post.
One of the joys of migrating from Airflow to your own orchestration system is reimplementing features of Airflow which you very likely took for granted. Some turned out to be easier than expected, others not so much.
We need the following properties when it comes to scheduling:
- DAGs can run on a static, hard-coded schedule
- DAGs can run on a dynamic schedule controlled by our end users
- DAGs can be run on demand, without any schedule
- Scheduled runs should not be dropped (for example, during a deployment) or started more than once
This turned out to be a bit trickier to get right than we anticipated.
One approach we considered for scheduling was to populate the dag_runs table with instances which are pending/should be run in the future. Since the set of DAGs we run can change frequently, we wanted to avoid having to perform cleanup whenever a DAG should no longer be run, or should be run at a different time. Instead, we opted to only create dag_runs instances when a DAG run is started, and not introduce additional persisted state for pending DAGs.
For determining whether or not a DAG should be started, the scheduler daemon process can be either forward or backward looking. Forward looking would involve determining the next execution time and queueing a delayed task (using something like setTimeout) to run at that time. Forward looking can prevent over scheduling, but it could lead to dropped executions if the daemon process is down when a DAG is due to run, which can happen during a deployment. We opted to use backward looking scheduling instead.
Backward looking scheduling avoids dropped executions, but runs the risk of over scheduling. To prevent over scheduling, we introduced a grace period within which a scheduled DAG run can be started. For daily schedules, we have this set to 15 minutes.
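A minimal sketch of the backward looking check, with illustrative table and column names (the real implementation may differ):

```typescript
import { Pool } from "pg";

const pg = new Pool();
const GRACE_PERIOD_MS = 15 * 60 * 1000; // 15 minutes for daily schedules

// Runs periodically in the daemon. `lastScheduled` is the most recent time at
// which the DAG's schedule says it should have run (however that is computed).
export async function maybeStartDagRun(dagName: string, lastScheduled: Date) {
  const now = new Date();

  // Backward looking with a grace period: only start runs whose scheduled
  // time passed recently.
  if (now.getTime() - lastScheduled.getTime() > GRACE_PERIOD_MS) return;

  // Avoid over scheduling: skip if a dag_runs row already exists for this
  // execution time (column names are illustrative).
  const { rowCount } = await pg.query(
    "SELECT 1 FROM dag_runs WHERE dag_name = $1 AND scheduled_for = $2",
    [dagName, lastScheduled]
  );
  if (rowCount) return;

  // Otherwise create the dag_runs row and enqueue the DAG's root tasks (not shown).
}
```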
We ended up with a fairly flexible system which allows for static and dynamic DAG scheduling, defined in code.
Since DAGs can be defined to run only on-demand/without a schedule, defining schedules for a DAG is done by extending a separate base class. Schedules are evaluated periodically by the scheduler, and can be based on a statically defined/hard-coded schedule or pulled from our database which contains values modifiable by our end users:
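As a hedged sketch of what such a schedule definition might look like (class, method, and table names are all illustrative):

```typescript
import { Pool } from "pg";

const pg = new Pool();

// Sketch only; Paddle's actual base classes may look different.
abstract class ScheduledDag {
  abstract readonly name: string;
  // Cron-style schedule(s) this DAG should run on, evaluated periodically
  // by the scheduler daemon.
  abstract getSchedules(): Promise<string[]>;
}

// Statically defined / hard-coded schedule.
class NightlyCleanup extends ScheduledDag {
  readonly name = "nightly-cleanup";
  async getSchedules() {
    return ["0 2 * * *"]; // every day at 02:00
  }
}

// Dynamic schedule pulled from the database, modifiable by end users.
class ExperimentPipeline extends ScheduledDag {
  readonly name = "experiment-pipeline";
  async getSchedules() {
    const { rows } = await pg.query(
      "SELECT cron FROM customer_pipeline_schedules" // hypothetical table
    );
    return rows.map((r: { cron: string }) => r.cron);
  }
}
```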
In Airflow, the GKEStartPodOperator creates a new Kubernetes Pod for each task execution, which runs until the task completes and is then terminated. One major benefit of this approach is that the pod executes until completion, unaffected by any deployments which occur during execution. One major downside is the overhead associated with spinning up a new Pod for each task, which can add up, especially if a DAG is composed of many distinct nodes/tasks.
We needed a solution which both allows workers to continue processing until a task is completed, and does not incur the overhead of starting a new pod. We then took a look at the various options we had for running our workers in Kubernetes.
Kubernetes has a variety of workload resources to choose from (Deployments, StatefulSets, DaemonSets, Jobs, and CronJobs), each designed for specific types of applications/workloads.
For the daemon, we opted to use a Deployment. The daemon is stateless, needing one (and only one) instance to be running at any point in time. We could run multiple daemons if we needed to in the future, but we were confident a single instance would be sufficient.
For the workers, we again wanted to avoid the overhead of creating a new pod and booting our application for each task, so we decided to run our workers as long-running processes which can process multiple jobs at once. We also need to ensure that a worker processing a task continues until the task completes, even while a new version is deployed. This left us with a few options to choose from:
Job
- On each deploy, we would create a new Job with the latest version. Old versions would continue to process until completion.
- Job instances have a parallelism as well as a completions property. We originally tried to modify these two values for autoscaling workers, but the completions property is immutable, which would prevent us from being able to scale up a set of workers, then gradually taper off instances after a new version has been deployed.

CronJob
- While a CronJob would help with regularly scheduled tasks/pipelines, we also need to be able to start DAG runs on demand, so this doesn’t add much value over using a Job.

Deployment
- On each deploy, Kubernetes replaces the existing Deployment configuration with that of the new version.
- Workers are scaled by patching the replicas property.
- We can set terminationGracePeriodSeconds to a number larger than our expected longest running task, which will ensure Kubernetes waits for terminationGracePeriodSeconds seconds after sending a SIGTERM (which it sends when a new version of a Deployment is created/released) before sending a SIGKILL, which would cause the worker to forcefully exit.

After prototyping with both Job and Deployment, the latter was a clear choice for our workers.
We did have to do a bit of work to ensure signals were being passed through to our application code so we could then stop picking up new jobs off of the queue. We also had to iterate a bit on where to handle the signals, as we had to ensure only the Redis queue connection was halted but all other connections (like Postgres) remained active.
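A minimal sketch of that signal handling, assuming the BullMQ worker and illustrative queue name from earlier; by default, worker.close() waits for currently active jobs to finish before resolving:

```typescript
import { Worker } from "bullmq";

const worker = new Worker("paddle-tasks", processTask, {
  connection: { host: process.env.REDIS_HOST ?? "localhost", port: 6379 },
});

// Kubernetes sends SIGTERM when a new version of the Deployment rolls out.
// We stop taking new jobs but keep everything else (e.g. Postgres) connected
// so in-flight tasks can finish within terminationGracePeriodSeconds.
process.on("SIGTERM", async () => {
  // worker.close() stops picking up new jobs and waits for active ones to finish.
  await worker.close();
  // Only now is it safe to tear down remaining connections and exit.
  process.exit(0);
});

async function processTask(): Promise<void> {
  /* ...task processing as sketched earlier... */
}
```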
We ended up with a solution which allows any given worker version to continue running until it finishes any job(s) it had started, while also eliminating Kubernetes Pod and application initialization costs for each task.
Handling retries is one of the responsibilities of the daemon process. The daemon looks at each of the tasks which have recently failed, and initiates a new attempt if the task has attempts remaining (a property configurable at the task level).
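A hedged sketch of one pass of that retry logic, with illustrative table and column names:

```typescript
import { Pool } from "pg";
import { Queue } from "bullmq";

const pg = new Pool();
const taskQueue = new Queue("paddle-tasks", {
  connection: { host: process.env.REDIS_HOST ?? "localhost", port: 6379 },
});

// One pass of the daemon's retry loop. Column names are illustrative;
// max_attempts stands in for the per-task retry configuration.
export async function retryFailedTasks(): Promise<void> {
  const { rows } = await pg.query(
    `SELECT id FROM dag_task_runs
     WHERE status = 'FAILED' AND attempt < max_attempts
       AND updated_at > now() - interval '1 hour'`
  );
  for (const { id } of rows) {
    await pg.query(
      "UPDATE dag_task_runs SET status = 'PENDING', attempt = attempt + 1 WHERE id = $1",
      [id]
    );
    await taskQueue.add("run-task", { dagTaskRunId: id });
  }
}
```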
Since workers are responsible for setting their own status upon completion, we do experience cases where a task’s status is not accurate. For example, if Kubernetes kills a worker pod due to excessive resource utilization, the task(s) running on that worker will remain in a RUNNING state despite having been killed.
Slaying zombie tasks, or reconciling task status, is another responsibility of the daemon. The daemon looks at all RUNNING tasks and ensures those tasks are indeed running by checking their status in BullMQ. If a task has failed in BullMQ, the task is marked as failed in Paddle’s database, and the retry mechanism will later retry the task if it has any attempts remaining.
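A hedged sketch of one reconciliation pass, assuming (for illustration) that the BullMQ job ID is stored on the task row:

```typescript
import { Pool } from "pg";
import { Queue } from "bullmq";

const pg = new Pool();
const taskQueue = new Queue("paddle-tasks", {
  connection: { host: process.env.REDIS_HOST ?? "localhost", port: 6379 },
});

// One pass of the daemon's reconciliation loop. Names are illustrative.
export async function reconcileZombieTasks(): Promise<void> {
  const { rows } = await pg.query(
    "SELECT id, bullmq_job_id FROM dag_task_runs WHERE status = 'RUNNING'"
  );
  for (const { id, bullmq_job_id } of rows) {
    const job = await taskQueue.getJob(bullmq_job_id);
    const state = job ? await job.getState() : "unknown";
    if (state === "failed" || state === "unknown") {
      // The worker died (e.g. OOM-killed) without updating status; mark the
      // task failed so the retry pass can pick it up.
      await pg.query(
        "UPDATE dag_task_runs SET status = 'FAILED' WHERE id = $1",
        [id]
      );
    }
  }
}
```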
We found that the resource (CPU and RAM) requirements of each task varied quite a bit. One solution would be to increase the resources of our workers to accommodate our most resource-hungry task, but this would be wasteful. Instead, we decided to reduce over-provisioning by running workers with different resource profiles, similar to the varying resource options of VMs in AWS or GCP.
We started with two separate worker variants, one with “default” requirements and another with higher memory. We then set up two separate BullMQ queues, added a field in dag_task_runs indicating which queue a task should be added to, and created a new worker Kubernetes Deployment which has higher resources.requests.memory and resources.limits.memory values and only processes tasks from a single queue.
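A minimal sketch of that routing, with illustrative queue names:

```typescript
import { Queue } from "bullmq";

const connection = { host: process.env.REDIS_HOST ?? "localhost", port: 6379 };

// One BullMQ queue per worker resource profile. Queue names are illustrative.
const queues: Record<string, Queue> = {
  default: new Queue("paddle-tasks", { connection }),
  "high-memory": new Queue("paddle-tasks-high-memory", { connection }),
};

// Tasks carry a queue name (stored on dag_task_runs) that routes them to the
// worker variant with the right resource profile.
export async function enqueueToProfile(dagTaskRunId: string, queueName: string) {
  await (queues[queueName] ?? queues.default).add("run-task", { dagTaskRunId });
}
```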
We also used this as an opportunity to create a Terraform module which makes it very easy to add workers for a new queue. One only needs to create a single resource, defining its queue name and CPU/memory request/limit values:
Our workloads vary significantly throughout the day, as customers are able to control the timing of (most of) their pipelines. Instead of running enough workers to accommodate the periods of highest compute demand, we wanted to adjust our worker/compute capacity dynamically to match demand in order to save costs.
With Airflow on Google Cloud Composer, we found the autoscaling to be mediocre at best. We would occasionally see dozens of Kubernetes instances which were not scaling down due to stuff like this hanging around, sometimes for weeks at a time:
Since we run our workers in a Kubernetes Deployment, and Kubernetes has a robust set of libraries/APIs, this was relatively straightforward. We use the official Kubernetes library to patch the replicas property of our Deployment with the new desired number of workers. The desired number of workers is calculated as a function of queue depth, with some smoothing applied.
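A hedged sketch of the desired-replicas calculation; the constants and smoothing factor are illustrative, and the actual patch to the Deployment (made via the official Kubernetes client) is omitted:

```typescript
import { Queue } from "bullmq";

const taskQueue = new Queue("paddle-tasks", {
  connection: { host: process.env.REDIS_HOST ?? "localhost", port: 6379 },
});

const TASKS_PER_WORKER = 10;   // illustrative: matches worker concurrency
const MIN_WORKERS = 1;
const MAX_WORKERS = 50;
const SMOOTHING = 0.5;         // blend new target with the current replica count

// Desired replicas as a function of queue depth, with simple smoothing.
export async function desiredWorkerCount(currentReplicas: number): Promise<number> {
  const depth =
    (await taskQueue.getWaitingCount()) + (await taskQueue.getActiveCount());
  const target = Math.ceil(depth / TASKS_PER_WORKER);
  const smoothed = Math.round(
    SMOOTHING * target + (1 - SMOOTHING) * currentReplicas
  );
  return Math.min(MAX_WORKERS, Math.max(MIN_WORKERS, smoothed));
}

// The daemon then patches spec.replicas on the worker Deployment using the
// official Kubernetes client (call not shown here).
```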
We certainly could have used a HorizontalPodAutoscaler, and still might in the future. But we liked having more control over autoscaling, which allows us to use Eppo’s Feature Flagging capabilities to adjust autoscaling behavior without a deploy, so we opted to keep the autoscaling logic in our daemon for now.
Our pretty naive autoscaling has worked out quite well for us, even if it is a bit overeager at times. This chart shows Kubernetes requested memory, and the number of VMs in our Kubernetes cluster:
Since most of our workloads are on known schedules defined by users, it is certainly possible for us to anticipate spikes in processing demand and scale workers accordingly.
When a task completes, our default behavior is to run any tasks downstream of that task if and only if all tasks upstream of each downstream task have completed. This covers the most common cases when defining DAGs, but we had some use cases where we want to conditionally run one set of logic or another, and do so without having to write custom application code each time. Rather, we wanted Paddle to be able to handle the conditional triggers. We named these Trigger Rules, a concept we borrowed from Airflow.
One example of this is to send an error report if any upstream jobs have failed. This logic could be in the tasks themselves, but it would likely be messy and duplicative.
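A minimal sketch of how such trigger rules might be represented and evaluated; the rule names are illustrative, inspired by Airflow's trigger rules rather than taken from Paddle:

```typescript
// Sketch only; names are illustrative.
type TriggerRule = "ALL_SUCCESS" | "ONE_FAILED";

type TaskStatus = "PENDING" | "RUNNING" | "SUCCESS" | "FAILED";

// Decide whether a downstream task should be queued, given the statuses of
// all of its upstream tasks. Evaluated by workers when a task finishes.
export function shouldTrigger(rule: TriggerRule, upstream: TaskStatus[]): boolean {
  if (rule === "ONE_FAILED") {
    // e.g. an error-report task that runs only if something upstream failed.
    return upstream.some((s) => s === "FAILED");
  }
  // Default ALL_SUCCESS behavior: every upstream task must have succeeded.
  return upstream.every((s) => s === "SUCCESS");
}

// Example: shouldTrigger("ONE_FAILED", ["SUCCESS", "FAILED"]) === true
```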
Since our Airflow setup executed tasks entirely in their own pods, and since Airflow streamed logs to its UI, viewing logs for a single task was easy. Since we now run multiple tasks during the lifecycle of a given pod, and in the future multiple tasks at once in a single pod, we needed a new way to view logs for a given task.
We were able to achieve this by using AsyncLocalStorage to store dag_task_runs metadata upon starting a new task, then creating a new Logger which adds this metadata as context to each log message. We then generate a link to GCP’s logging interface containing property filters based on this metadata, letting us view the logs of any given task run.
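A minimal sketch of that pattern using Node's AsyncLocalStorage; the logger and context shape are illustrative:

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

interface TaskLogContext {
  dagTaskRunId: string;
  dagName: string;
  taskName: string;
}

const logContext = new AsyncLocalStorage<TaskLogContext>();

// Wrap task execution so everything logged inside it carries the task metadata.
export function runWithLogContext<T>(ctx: TaskLogContext, fn: () => Promise<T>) {
  return logContext.run(ctx, fn);
}

// A minimal structured logger; GCP's logging agent picks up the JSON fields,
// which can then be used as property filters to view a single task run's logs.
export function logInfo(message: string): void {
  console.log(JSON.stringify({ message, ...logContext.getStore() }));
}
```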
Once Paddle was ready to start processing workloads in production, we used Eppo’s Feature Flagging to incrementally switch customers over from Airflow to Paddle. We rolled out over the course of about a month, fixing a few minor issues along the way. After migrating all remaining pipelines from Airflow to Paddle, we ceremoniously deleted our Airflow/Composer environment entirely.
To see how our migration from Airflow to Paddle affected our GCP costs, operational overhead, customer experience, developer experience, and support experience, see this blog post outlining all of the details.