Engineering
July 22, 2024

Building a Custom DAG Orchestration System for Experimentation

Eric Petzel
Engineer at Eppo and former engineering leader at Replica and Airbnb

This is the second post in a three-part series on our migration from Airflow to our own custom DAG orchestration system. For more context on how we used Airflow, the challenges we faced with it, and the process which led us to write our own replacement, check out this post.

Requirements

As we set out to build a replacement for Airflow, capable of handling 50,000 concurrent experiments running on the Eppo platform, we identified a number of requirements:

  • Define workflows in code as DAGs
  • Be able to run DAGs on a schedule (cron), as well as on demand
  • Dynamically scale worker capacity based on active workloads
  • Configurable retries and concurrency limits
  • Improved support for DAGs with the same structure, but different inputs
  • Ability to dynamically add, modify, or remove DAGs at runtime without requiring a system restart or significant resource utilization
  • Efficiently support 10,000 concurrent DAG runs
  • Intuitive user interface for both technical and non-technical users
  • Support NodeJS task execution

We envisioned a solution built on top of queues which takes inspiration from Airflow’s scheduler and workers, yet also takes advantage of the fact that we do not need the flexibility of computation environments that Airflow provides. We embrace the monolith at Eppo. We also push as much computation to the data warehouse as possible, which reduces the need to support computational environments other than our NodeJS monolith.

Core Components of Paddle (our system)

Queue

At the heart of every scalable orchestration system is some sort of queue for maintaining what tasks need to be executed. We decided to use Redis to store the contents of our queue as it is blazing fast and battle tested. We interface with Redis via the BullMQ library, which has a great set of APIs for using queues in Redis.

The object stored in the queue is a simple ID which workers then use to query our Postgres instance to retrieve additional metadata about the task to be run (i.e. which code to run, and which inputs to use).

We ran some tests and found that with BullMQ we could easily handle 50,000 tasks in parallel, which satisfied our concurrency requirements.
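The ID-only payload described above can be sketched as follows. This is a minimal illustration, not Paddle's code: an in-memory array and Map stand in for the Redis-backed queue and the Postgres table, and every name (TaskRunRecord, enqueueTask, dequeueTask) is hypothetical.

```typescript
// Hypothetical metadata row; in Paddle this would live in Postgres.
type TaskRunRecord = {
  id: string;
  taskName: string; // which code to run
  input: Record<string, unknown>; // which inputs to use
};

// In-memory stand-ins (assumptions) for the Postgres table and the queue.
const taskRunTable = new Map<string, TaskRunRecord>();
const queue: string[] = [];

// Producer: persist the metadata, then enqueue only the ID.
function enqueueTask(record: TaskRunRecord): void {
  taskRunTable.set(record.id, record);
  queue.push(record.id);
}

// Worker: pop an ID and resolve it to the full record before executing.
function dequeueTask(): TaskRunRecord | undefined {
  const id = queue.shift();
  return id === undefined ? undefined : taskRunTable.get(id);
}
```

Keeping the queue payload to a bare ID means the database stays the single source of truth for task metadata, and queue entries stay small.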

Daemon

The Paddle daemon is a single long-running process which is responsible for:

  • Starting new DAG runs based on defined schedules
  • Initiating task retries
  • Autoscaling the number of workers
  • Identifying and killing zombie/stuck tasks

The daemon currently runs as a single instance. It could be run in parallel with minimal work if we need to in the future.

One major pain point we experienced with Airflow was the performance of the Scheduler, which has similar responsibilities to Paddle’s daemon. Because of this, we wanted to remove as much processing/responsibility from the daemon as we could. For example, in Airflow the scheduler is responsible for setting a task’s state to SUCCESS and initiating downstream tasks. This can be very computationally intensive with a large number of DAGs/tasks. Paddle delegates this responsibility to the workers.

Delegating/distributing work from the daemon to the workers helps us avoid the performance issues we experienced with Airflow’s scheduler process. This was possible in large part due to the fact that we only needed to support tasks in our NodeJS monolith, so we could reuse the code to update task status and traverse the computational graph within the workers. Redis was also a crucial piece for us, as it makes distributed atomic updates easy.

Workers

Workers are responsible for task processing, which involves:

  • Retrieving tasks from Redis (via BullMQ)
  • Executing the task
  • Updating the status of the task (success/failure)
  • Starting any downstream tasks which are eligible to be run

Workers are stateless, and can be horizontally scaled to respond to queue depth/task volume.
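The last two worker steps (updating status, then starting eligible downstream tasks) can be sketched as below. This is an illustration under assumptions: the graph is held in memory rather than in Postgres, the type and function names are hypothetical, and the atomicity that a real multi-worker setup needs is not modeled.

```typescript
type Status = 'PENDING' | 'RUNNING' | 'SUCCESS' | 'FAILED';

// Hypothetical in-memory view of one DAG run: task statuses plus edges.
type DagRunGraph = {
  status: Map<string, Status>;
  // upstream.get(task) lists the tasks that must succeed before `task` starts
  upstream: Map<string, string[]>;
};

// Mark a task successful and return the downstream tasks that are now eligible.
function completeTask(graph: DagRunGraph, taskId: string): string[] {
  graph.status.set(taskId, 'SUCCESS');
  const eligible: string[] = [];
  graph.upstream.forEach((deps, candidate) => {
    if (!deps.includes(taskId)) return; // not downstream of this task
    if (graph.status.get(candidate) !== 'PENDING') return; // already started
    if (deps.every((dep) => graph.status.get(dep) === 'SUCCESS')) {
      eligible.push(candidate);
    }
  });
  return eligible;
}
```

In the diamond-shaped example DAG later in this post, completing either banana task alone returns nothing; only the second completion makes cucumberTask eligible.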

Data Model

Airflow is complex. That complexity is perfectly exemplified by its data model. Despite having over a dozen engineers on the team, operation of Airflow at Eppo usually fell on one of the few engineers who had enough of an understanding to operate it.

In an effort to reduce the operational burden of our new orchestration system, we wanted to boil our data model down as much as possible. We ended up with a simple schema of three tables which cleanly represents what is essentially a set of graphs:

dag_runs

A single instance of a DAG run

dag_task_runs

A node (task) in the computational graph for a DAG run

dag_task_run_dependencies

An edge (dependency) in the computational graph for a DAG task
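As a rough sketch, the three tables might map to types like the following. Only the table names come from this post; every column name here is an assumption made for illustration.

```typescript
// Column names are assumptions; the post names only the three tables.
type DagRun = { id: number; dagName: string; input: unknown; createdAt: Date };
type DagTaskRun = { id: number; dagRunId: number; taskName: string; status: string };
type DagTaskRunDependency = { upstreamTaskRunId: number; downstreamTaskRunId: number };

// Find the direct upstream task runs of a node by following its incoming edges.
function upstreamTaskRuns(
  taskRunId: number,
  tasks: DagTaskRun[],
  edges: DagTaskRunDependency[],
): DagTaskRun[] {
  const upstreamIds = new Set(
    edges
      .filter((edge) => edge.downstreamTaskRunId === taskRunId)
      .map((edge) => edge.upstreamTaskRunId),
  );
  return tasks.filter((task) => upstreamIds.has(task.id));
}
```

Because each run carries its own nodes and edges, questions like "what is this task waiting on?" reduce to a lookup over the edge table.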

Defining DAGs

DAGs are defined in code using Typescript. We have a set of base classes which make defining new tasks and DAGs easier:

// types for task input and output
type ExampleTaskInput = {
  foo?: string;
};
type ExampleTaskOutput = {
  bar?: string;
};

// classes for each task
class AppleTask extends DagTask {
  // An async method must declare a Promise return type
  async process(jobData: ExampleTaskInput, dagContext: DagContext): Promise<ExampleTaskOutput> {
    console.log('apple task');
    return { bar: `${jobData.foo}` };
  }
}
class BananaTask extends DagTask {
  async process(jobData: ExampleTaskInput, dagContext: DagContext): Promise<ExampleTaskOutput> {
    console.log('banana task');
    return { bar: `${jobData.foo} banana` };
  }
}
class CucumberTask extends DagTask {
  async process(jobData: ExampleTaskInput, dagContext: DagContext): Promise<ExampleTaskOutput> {
    console.log('cucumber task');
    return { bar: `${jobData.foo} cucumber` };
  }
}

// class for the DAG
type ExampleDagInput = {
  baz?: string;
};
class ExampleDag extends Dag {
  protected build() {
    /**
     * DAG structure:
     *
     *                - bananaTask1 -
     *              /                 \
     *   appleTask -                   - cucumberTask
     *              \                 /
     *                - bananaTask2 -
     */
    const appleTask = new DagNode('appleTask', AppleTask, { foo: 'apple' });
    const bananaTask1 = new DagNode('bananaTask1', BananaTask, { foo: 'banana' });
    const bananaTask2 = new DagNode('bananaTask2', BananaTask, { foo: 'banana' });
    const cucumberTask = new DagNode('cucumberTask', CucumberTask, { foo: 'cucumber' });

    this.addNode(appleTask);
    this.addNode(bananaTask1, [appleTask]);
    this.addNode(bananaTask2, [appleTask]);
    this.addNode(cucumberTask, [bananaTask1, bananaTask2]);
  }
}

How to run the thing

Due to its scalability, reliability, maturity and availability of documentation, Kubernetes was a perfect choice for Paddle. In our usage of Airflow, we primarily used the GKEStartPodOperator which runs tasks in a Kubernetes cluster in GCP, so we already had the operational experience necessary. We ran into some interesting challenges with versioning and deployments, which I’ll cover in more detail later in this post.

High level architecture

A diagram showing how each of the pieces fit together

Fun problems to solve

One of the joys of migrating from Airflow to your own orchestration system is reimplementing the Airflow features which you very likely took for granted. Some turned out to be easier than expected, others not so much.

Scheduling

We need the following properties when it comes to scheduling:

  • Cron-based definitions
    • Static (defined in code)
    • Dynamic (modifiable by our end users, changes frequently, stored in Postgres)
  • Respect concurrency limits
    • Some customers have limited computing capacity. One approach to manage this limitation is to distribute pipeline runs over time by setting a maximum concurrency limit. This method is somewhat crude, but it works well enough.
  • Only run within some threshold of the defined schedule
    • If a DAG is created at 7pm which should run at 1am, we should wait until the next schedule to avoid excess executions

This turned out to be a bit trickier to get right than we anticipated.

One approach we considered for scheduling was to populate the dag_runs table with instances which are pending/should be run in the future. Since the set of DAGs we run can change frequently, we wanted to avoid having to perform cleanup whenever a DAG should no longer be run, or should be run at a different time. Instead we opted to only create dag_runs instances when a DAG run is started, and not introduce additional persisted state for pending DAGs.

For determining whether or not a DAG should be started, the scheduler daemon process can be either forward or backward looking. Forward looking would involve determining the next execution time, and queueing a delayed task (using something like setTimeout) to run at that time. Forward looking can prevent over scheduling, but it could lead to dropped executions if the daemon process is down when a DAG is to be run, which can happen during a deployment. We opted to use backward looking scheduling:

const previousSchedule = getPreviousDateFromCronExpression(cronSchedule);

const dagRuns = await this.dagRunService.getDagRuns(dag.name, {
  input: dag.input,
  createdAfterDate: previousSchedule.toJSDate(),
  limit: 1,
});

if (dagRuns.length > 0) {
  return; // DAG has already executed
}

await this.dagService.run(dag);

Backward looking scheduling avoids dropped executions, but runs the risk of over scheduling. To prevent over scheduling, we introduced a grace period within which a scheduled DAG can be started; outside of it, the DAG waits for its next scheduled time. For daily schedules, we have this set to 15 minutes.
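The grace period check might look something like the sketch below. The function name and parameters are hypothetical; it assumes the previous schedule time has already been derived from the cron expression and that a lookup for existing runs has already been made.

```typescript
// Decide whether a scheduled DAG should start now (illustrative only).
// `previousSchedule` is the most recent time the cron expression fired.
function shouldStartDag(
  previousSchedule: Date,
  now: Date,
  gracePeriodMs: number,
  hasRunSincePreviousSchedule: boolean,
): boolean {
  if (hasRunSincePreviousSchedule) return false; // DAG has already executed
  const elapsedMs = now.getTime() - previousSchedule.getTime();
  // Start only if we are still inside the grace window after the schedule.
  return elapsedMs >= 0 && elapsedMs <= gracePeriodMs;
}
```

With a 15 minute grace period, a DAG scheduled for 1am and first evaluated at 7pm is skipped until the next 1am, which matches the threshold requirement listed earlier.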

We ended up with a fairly flexible system which allows for static and dynamic DAG scheduling, defined in code.

Since DAGs can be defined to run only on-demand/without a schedule, defining schedules for a DAG is done by extending a separate base class. Schedules are evaluated periodically by the scheduler, and can be based on a statically defined/hard-coded schedule or pulled from our database which contains values modifiable by our end users:

class StaticDagSchedulerExample extends DagScheduler {
  public getDagsWithSchedules(): DagWithSchedule[] {
    return [
      {
        dag: new ExampleDag({ baz: 'hello world' }),
        schedule: '10 * * * *',
      },
    ];
  }
}

class DynamicDagSchedulerExample extends DagScheduler {
  public async getDagsWithSchedules(): Promise<DagWithSchedule[]> {
    const experiments = await fetchExperimentsFromDatabase();
    return experiments.map((experiment) => ({
      dag: new ExampleDag({ baz: experiment.id }),
      schedule: experiment.cronSchedule,
    }));
  }
}

Deployments & Long-Running Tasks

In Airflow, the GKEStartPodOperator creates a new Kubernetes Pod for each task execution, which runs until the task completes, then is terminated. One major benefit to this approach is the pod executes until completion, and is unaffected by any deployments which occur during execution. One major downside to this approach is the overhead associated with spinning up a new Pod for each task, which can add up especially if a DAG is composed of many distinct nodes/tasks.

We needed a solution which both allows workers to continue processing until a task is completed, and does not incur the overhead of starting a new pod. We then took a look at the various options we had for running our workers in Kubernetes.

Kubernetes has a variety of workload resources to choose from, with each designed for specific types of applications/workloads.

For the daemon, we opted to use a Deployment. The daemon is stateless, needing one (and only one) instance to be running at any point in time. We could run multiple daemons if we needed to in the future, but we were confident a single instance would be sufficient.

For the workers, we again wanted to avoid the overhead of creating a new pod and booting our application for each task, so we decided to run our workers as long-running processes which could process multiple jobs at once. We also needed to ensure that workers processing a task continue until completion, even while a new version is deployed. This left us with a few options to choose from:

  • Job
    • During a deploy, we could create a new Job with the latest version. Old versions would continue to process until completion.
    • Kubernetes Job instances have a parallelism as well as a completions property. We originally tried to modify these two values for autoscaling workers, but the completions property is immutable, which would prevent us from being able to scale up a set of workers, then gradually taper off instances after a new version has been deployed.
  • CronJob
    • While the cron capabilities of CronJob would help with regularly scheduled tasks/pipelines, we also need to be able to start DAG runs on demand so this doesn’t add much value over using a Job.
  • Deployment
    • During a deploy, we could replace a Deployment configuration with that of the new version
    • Scaling of workers can be done by modifying the replicas property
    • We can set the value of terminationGracePeriodSeconds to a number larger than our expected longest running task. Kubernetes sends a SIGTERM when a new version of a Deployment is released, then waits terminationGracePeriodSeconds seconds before sending a SIGKILL, which forces the worker to exit.

After prototyping with both Job and Deployment, the latter was a clear choice for our workers.

We did have to do a bit of work to ensure signals were being passed through to our application code so we could then stop picking up new jobs off of the queue. We also had to iterate a bit on where to handle the signals, as we had to ensure only the Redis queue connection was halted but all other connections (like Postgres) remained active.

We ended up with a solution which allows for any given worker version to continue processing until it finishes processing any job(s) it had started, while also eliminating Kubernetes Pod and application initialization costs for each task.
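The drain-on-SIGTERM behavior can be sketched roughly as follows. This is not Paddle's implementation: DrainingWorker and installSigtermHandler are hypothetical names, and the queue consumer itself is left out so the draining logic stands alone.

```typescript
// Tracks in-flight jobs and refuses new ones once shutdown has begun.
class DrainingWorker {
  private accepting = true;
  private inFlight = 0;
  private onDrained: (() => void) | null = null;

  // Call before pulling a job off the queue; false means "do not pick it up".
  tryStartJob(): boolean {
    if (!this.accepting) return false;
    this.inFlight += 1;
    return true;
  }

  // Call when a job finishes, whether it succeeded or failed.
  finishJob(): void {
    this.inFlight -= 1;
    this.maybeDrained();
  }

  // Stop accepting jobs; invoke `onDrained` once in-flight jobs complete.
  beginShutdown(onDrained: () => void): void {
    this.accepting = false;
    this.onDrained = onDrained;
    this.maybeDrained();
  }

  private maybeDrained(): void {
    if (!this.accepting && this.inFlight === 0 && this.onDrained) {
      const done = this.onDrained;
      this.onDrained = null; // fire at most once
      done();
    }
  }
}

// Wiring: on SIGTERM, drain first, then close other connections and exit.
function installSigtermHandler(worker: DrainingWorker, closeConnections: () => void): void {
  process.on('SIGTERM', () => {
    worker.beginShutdown(() => {
      closeConnections(); // e.g. Postgres stays open until the last job ends
      process.exit(0);
    });
  });
}
```

The ordering is the point: the worker stops taking new work immediately, while the connections that running jobs depend on are closed only after the last job finishes.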

Retries

Handling retries is one of the responsibilities of the daemon process. The daemon looks at each of the tasks which have recently failed, and initiates a new attempt if the task has attempts remaining (which is a property configurable on the task level).

Zombie Tasks

Since workers are responsible for setting their own status upon completion, we do experience cases where a task’s status is not accurate. For example, if Kubernetes kills a worker pod due to excessive resource utilization, the task(s) run on that worker will remain in a RUNNING state despite being killed.

Slaying zombie tasks, or reconciling task status, is another responsibility of the daemon. The daemon looks at all RUNNING tasks, and ensures those tasks are indeed running by checking their status in BullMQ. If a task is failed in BullMQ, the task is marked as failed in Paddle’s database, and the retry mechanism will later retry the task if it has any attempts remaining.
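Taken together, the reconciliation and retry passes might be sketched like this. The types are simplified assumptions: QueueState is a reduced stand-in for the job states a queue library reports, and queueStateOf and enqueue are hypothetical callbacks rather than real BullMQ calls.

```typescript
type TaskRun = {
  id: string;
  status: 'PENDING' | 'RUNNING' | 'SUCCESS' | 'FAILED';
  attempts: number;
  maxAttempts: number; // configurable at the task level
};

// Simplified queue-side view of a job (an assumption for this sketch).
type QueueState = 'active' | 'failed' | 'completed' | 'unknown';

// Reconcile: a task the database believes is RUNNING but the queue
// reports as failed is marked FAILED. Returns the reconciled task IDs.
function reconcileZombies(
  tasks: TaskRun[],
  queueStateOf: (id: string) => QueueState,
): string[] {
  const reconciled: string[] = [];
  for (const task of tasks) {
    if (task.status === 'RUNNING' && queueStateOf(task.id) === 'failed') {
      task.status = 'FAILED';
      reconciled.push(task.id);
    }
  }
  return reconciled;
}

// Retry pass: failed tasks with attempts remaining are queued again.
function retryFailed(tasks: TaskRun[], enqueue: (id: string) => void): void {
  for (const task of tasks) {
    if (task.status === 'FAILED' && task.attempts < task.maxAttempts) {
      task.attempts += 1;
      task.status = 'PENDING';
      enqueue(task.id);
    }
  }
}
```

Running the two passes separately keeps each one simple: reconciliation only corrects status, and the retry pass treats a reconciled zombie like any other failed task.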

Tasks with different resource requirements

We found that the resource (CPU and RAM) requirements of each task varied quite a bit. One solution would be to increase the resources of our workers to accommodate our most resource hungry task. This is wasteful, so we decided to reduce the amount of over provisioning of our workers by using workers with different resource profiles, similar to the varying resource options of VMs in AWS or GCP.

We started with two separate worker variants, one with “default” requirements, and another with higher memory. We then set up two separate BullMQ queues, added a field in dag_task_runs indicating which queue a task should be added to, and created a new worker Kubernetes Deployment which has higher resources.requests.memory and resources.limits.memory values, and only processes tasks from a single queue.
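Routing by the queue field could look roughly like the sketch below. The in-memory arrays stand in for the two BullMQ queues, and the field name queueName and the function routeTask are assumptions.

```typescript
// The two worker variants described above, keyed by queue name.
type QueueName = 'default' | 'high-memory';

// Hypothetical shape: the field on the task row naming its target queue.
type RoutableTask = { id: string; queueName: QueueName };

// In-memory stand-ins (assumptions) for the two queues.
const queues: Record<QueueName, string[]> = {
  default: [],
  'high-memory': [],
};

// Add the task's ID to whichever queue its row names.
function routeTask(task: RoutableTask): void {
  queues[task.queueName].push(task.id);
}
```

Each worker Deployment then consumes from exactly one of these queues, so a task's resource profile is decided when it is enqueued rather than at execution time.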

We also used this as an opportunity to create a Terraform module which makes it very easy to add workers for a new queue. One only needs to create a single resource, defining its queue name and CPU/memory request/limit values:

module "paddle_worker_high_memory" {
  providers = {
    kubernetes = kubernetes.paddle-k8s-cluster
  }

  source       = "./modules/queue_worker"
  queue_name   = "high-memory"

  container_resources = {
    limits = {
      cpu    = "1"
      memory = "4G"
    }
    requests = {
      cpu    = "200m"
      memory = "1G"
    }
  }
}

Autoscaling Workers

Our workloads vary significantly throughout the day, as customers are able to control the timing of (most of) their pipelines. Instead of running enough workers to accommodate the periods of highest compute demand, we wanted to adjust our worker/compute capacity dynamically to match demand in order to save costs.

With Airflow on Google Cloud Composer, we found the autoscaling to be mediocre at best. We would occasionally see dozens of Kubernetes instances which were not scaling down due to stuff like this hanging around, sometimes for weeks at a time:

Since we run our workers in a Kubernetes Deployment, which has a robust set of libraries/APIs, this was relatively straightforward. We use the official Kubernetes library to patch the replicas in our Deployment with the new desired number of workers. The desired number of workers is calculated as a function of queue depth, with some smoothing applied.
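One way to turn queue depth into a replica count is sketched below. The exponential smoothing, the config fields, and the function names are all assumptions chosen for illustration; the post does not specify which smoothing Paddle applies.

```typescript
type AutoscaleConfig = {
  jobsPerWorker: number; // how many queued jobs one worker should absorb
  minWorkers: number;
  maxWorkers: number;
};

// Exponentially smooth the observed depth so brief spikes do not cause thrash.
// `weight` is between 0 and 1: the share given to the newest observation.
function smoothDepth(previous: number, observed: number, weight: number): number {
  return weight * observed + (1 - weight) * previous;
}

// Convert a smoothed depth into a replica count, clamped to the allowed range.
function desiredWorkers(smoothedDepth: number, config: AutoscaleConfig): number {
  const raw = Math.ceil(smoothedDepth / config.jobsPerWorker);
  return Math.min(config.maxWorkers, Math.max(config.minWorkers, raw));
}
```

The resulting number is what would be written to the Deployment's replicas field on each evaluation.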

We certainly could have used a HorizontalPodAutoscaler, and still might in the future. We liked the idea of having more control over autoscaling which allows us to use Eppo’s Feature Flagging capabilities to control autoscaling behavior without a deploy, so we opted to keep the autoscaling logic in our daemon for now.

Our pretty naive autoscaling has worked out quite well for us, even if it is a bit overeager at times. This chart shows Kubernetes requested memory, and the number of VMs in our Kubernetes cluster:

Since most of our workloads are on known schedules defined by users, it is certainly possible for us to anticipate spikes in processing demand and scale workers accordingly.

Trigger rules

When a task completes, our default behavior was to run any tasks downstream of that task if and only if all tasks upstream of each downstream task have completed. This covered the most common cases when defining DAGs, but we had some common use cases where we want to conditionally run one set of logic or another, and do so without having to write custom application code each time. Rather, we wanted Paddle to be able to handle the conditional triggers. We named these Trigger Rules, which is a concept we borrowed from Airflow.

One example of this is to send an error report if any upstream jobs have failed. This logic could be in the tasks themselves, but it would likely be messy and duplicative.
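A trigger rule evaluation might be sketched as below. The rule names are modeled on Airflow's all_success, one_failed, and all_done; which rules Paddle actually supports is not stated here, so treat the set as an assumption.

```typescript
type UpstreamState = 'PENDING' | 'RUNNING' | 'SUCCESS' | 'FAILED';

// Rule names modeled on Airflow's; the exact set is an assumption.
type TriggerRule = 'ALL_SUCCESS' | 'ONE_FAILED' | 'ALL_DONE';

// Decide whether a downstream task may start, given its upstream states.
function shouldTrigger(rule: TriggerRule, upstream: UpstreamState[]): boolean {
  const isDone = (state: UpstreamState) => state === 'SUCCESS' || state === 'FAILED';
  switch (rule) {
    case 'ALL_SUCCESS':
      return upstream.every((state) => state === 'SUCCESS');
    case 'ONE_FAILED':
      return upstream.some((state) => state === 'FAILED');
    case 'ALL_DONE':
      return upstream.every(isDone);
  }
}
```

An error-report task like the one described above would use something like ONE_FAILED, so it runs as soon as any upstream task fails and never runs on a clean pass.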

Logging

Since our Airflow setup executed tasks entirely in their own pods, and since Airflow streamed logs to its UI, viewing logs for a single task was easy. Since we now run multiple tasks during the lifecycle of a given pod, and in the future multiple tasks at once in a single pod, we needed a new way to view logs for a given task.

We were able to achieve this by using AsyncLocalStorage to store dag_task_runs metadata upon starting a new task, then creating a new Logger which adds this metadata as context to each log message. We then generate a link to GCP’s logging interface containing property filters based on this metadata, which lets us view the logs of any given task run.
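The AsyncLocalStorage pattern can be sketched as follows. AsyncLocalStorage is Node's built-in API; the context fields and the helper names formatLog and runWithTaskContext are hypothetical, and a real logger would emit the line rather than return it.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

// Hypothetical metadata attached to every log line emitted during a task.
type TaskLogContext = { dagRunId: number; taskRunId: number };

const taskContext = new AsyncLocalStorage<TaskLogContext>();

// Build a structured log line, adding task metadata when a task is active.
function formatLog(message: string): string {
  const context = taskContext.getStore(); // undefined outside of a task
  return JSON.stringify({ message, ...(context ?? {}) });
}

// Run a task so that every log call inside it sees the same context,
// including calls made after awaits within the task.
function runWithTaskContext<T>(context: TaskLogContext, task: () => T): T {
  return taskContext.run(context, task);
}
```

Because the context travels with the asynchronous call chain rather than with the pod, log lines stay attributable even when one process runs several tasks at once.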

Migrating

Once Paddle was ready to start processing workloads in production, we used Eppo’s Feature Flagging to incrementally switch customers over from Airflow to Paddle. We rolled out over the course of about a month, fixing a few minor issues along the way. After migrating all remaining pipelines from Airflow to Paddle, we ceremoniously deleted our Airflow/Composer environment entirely.

Results

To see how our migration from Airflow to Paddle affected our GCP costs, operational overhead, customer experience, developer experience, and support experience, see this blog post outlining all of the details.
