Automating Data Pipelines with Maestro

Author

Kylie Ainslie

Published

November 28, 2024

Introduction

I just successfully scheduled my first data pipeline to run automatically with maestro and cron, and it made me realise that I hadn’t found a good tutorial for the whole process. It was a bit of a learning curve, but once I got everything set up correctly, it actually seemed pretty straightforward (always the case in hindsight, right?).

Where I struggled on previous attempts at automation was in misunderstanding how the automation actually occurs. It wasn’t until I read the last sentence of the Maestro Quick Start Guide that I understood where I had gone wrong:

“Importantly, it isn’t maestro’s job to actually run [the pipeline] this often - it’s your job to make sure [the pipeline] runs at that frequency (e.g., deploying it via cron or some cloud environment where code can be scheduled).”

I originally thought that maestro not only orchestrated the pipelines, but also ran them on the specified schedule. This is not the case. So, for all those who are also struggling, here is the process I used to automate my pipeline. I’m sure a better tutorial exists, but because I couldn’t find it in a 30-second Google search, I decided to write one.

The break-down

Maestro is an R package designed to organize and orchestrate complex pipelines. It handles tasks like dependencies, scheduling logic, and defining pipeline steps, but maestro does not manage when the pipeline executes. This is where cron (or other schedulers) comes into play.

cron is a Unix-based utility for task scheduling. It allows you to specify when a task (like running a maestro pipeline) should execute: cron periodically triggers the execution of a script, ensuring the pipeline runs at the desired frequency. cron is not the only task scheduler; there are many others. While doing a quick search for alternatives, I discovered two R packages, cronR (for Linux/macOS) and taskscheduleR (for Windows), that are designed to make this process easier. Wish I’d known about them a few hours ago. Oh well!
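As an aside, if you’d rather stay entirely in R, cronR can write the crontab entry for you. A minimal sketch, assuming cronR is installed and that a script like run_pipeline.R exists (both names here are illustrative):

library(cronR)

# Build the shell command that cron will run (cron_rscript wraps Rscript for you)
cmd <- cron_rscript("~/Documents/my_package/run_pipeline.R")

# Schedule it to run daily at midnight; the id makes the job easy to list or remove later
cron_add(cmd, frequency = "daily", at = "00:00", id = "my_data_pipeline")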

So, essentially, maestro (configured here via a YAML file) ensures that tasks within the pipeline are executed in the correct order and retries failed tasks as needed, whereas cron periodically runs a shell script that triggers the maestro pipeline. More on shell scripts in a bit!

My set-up

I have my data pipeline and orchestration files inside an R package that I created, so the set-up might be slightly different than for standalone scripts in a directory. Normally, I would link to the Git repo so that anyone reading this could see all my code and my set-up, but in this case it’s part of some consulting work that is private.

Steps for Automating the Pipeline

1. Create a YAML file

The first step is to create a YAML file that defines the schedule, tasks, and notifications for the pipeline. Here’s an example:

name: my_data_pipeline
schedule:
  type: cron
  cron: "0 0 1,15 * *"  # Runs on the 1st and 15th of every month at midnight
start_time: "2024-12-01 00:00:00"
task:
  script: Rscript -e 'my_package::my_data_pipeline()'
  working_dir: ~/Documents/my_package
  retry_policy:
    retries: 3
    delay: 5m
notifications:
  on_success:
    - type: email
      to: my.email@gmail.com
  on_failure:
    - type: email
      to: my.email@gmail.com

The fields break down as follows:

  • name: a descriptive name for your pipeline.
  • schedule: when and how often the task should run, and using what scheduling mechanism (here, type: cron).
  • start_time: the first time the pipeline is allowed to run (here, December 1, 2024 at 12:00 AM).
  • task: the details of the task to execute. script specifies the command that runs the pipeline, working_dir the directory to execute the script from, and retry_policy how retries are handled if the task fails: retries is the maximum number of retries (here, 3) and delay is the time delay between retries (here, 5 minutes).
  • notifications: notifications for task outcomes. Here, we configure maestro to send an email when the task succeeds (on_success) and when it fails (on_failure).
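To sanity-check that the file parses the way you expect, you can read it back into R. A quick sketch, assuming the yaml package is installed and the file is saved as maestro.yaml:

# Read the config back in and inspect the cron expression
cfg <- yaml::read_yaml("maestro.yaml")
cfg$schedule$cron
#> [1] "0 0 1,15 * *"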

2. Wrap your pipeline into a function

Ensure that your pipeline is encapsulated within a function, such as my_package::my_data_pipeline(), so that it can be executed with the appropriate arguments. There’s a good explanation of how to do this in the maestro README. Below is an example taken from that README.

#' Example ETL pipeline
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-03-25 12:30:00
my_etl <- function() {
  
  # Pretend we're getting data from a source
  message("Get data")
  extracted <- mtcars
  
  # Transform
  message("Transforming")
  transformed <- extracted |> 
    dplyr::mutate(hp_deviation = hp - mean(hp))
  
  # Load - write to a location
  message("Writing")
  write.csv(transformed, file = paste0("transformed_mtcars_", Sys.Date(), ".csv"))
}

All of the pipeline tasks are wrapped in a function called my_etl(), which has some maestro-specific roxygen-style comments, namely @maestroFrequency and @maestroStartTime, which indicate how frequently the pipeline should run and when it should start, respectively. For more details, see the maestro README.

3. Automate the pipeline using cron

Next, you’ll need to create a Shell script and set up cron to run your pipeline at the specified frequencies.

Step 3.1: Create a .sh file

Create a .sh file by opening a text editor from the Mac terminal. I used the nano editor.

nano run_pipeline.sh

Add the following text to the .sh file to execute the pipeline using Rscript:

#!/bin/bash
Rscript -e "system('maestro run /path/to/maestro.yaml')"

If editing the .sh file directly in the terminal with nano:

  • Press CTRL + O to save. (the letter “O”, not zero)
  • Press Enter to confirm the file name.
  • Press CTRL + X to exit.
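One note on the Rscript line above: it shells out to a maestro command line call via system(). If your set-up doesn’t have such a wrapper, an alternative is to call maestro’s orchestrator functions directly from R instead. A sketch, assuming your pipeline scripts live in a pipelines/ directory inside ~/Documents/my_package:

#!/bin/bash

# Run the maestro orchestrator from the package directory
cd ~/Documents/my_package || exit 1
Rscript -e 'sched <- maestro::build_schedule("pipelines"); maestro::run_schedule(sched)'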

Step 3.2: Make the .sh file executable

Run the following command to make the script executable:

chmod +x /path/to/run_pipeline.sh
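You can confirm the executable bit was set with ls -l:

ls -l /path/to/run_pipeline.sh

The x’s in the permission string (e.g., -rwxr-xr-x) indicate the script is executable.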

Step 3.3: Schedule the pipeline with cron

Open your crontab configuration using crontab -e in the terminal and add a line to schedule the pipeline to run at the desired frequency. For example, to run it daily at midnight:

0 0 * * * /path/to/run_pipeline.sh

The cron schedule is defined by five fields followed by the command to execute:

* * * * * command_to_run
- - - - -
| | | | |
| | | | +--- Day of the week (0 - 7, where both 0 and 7 represent Sunday)
| | | +----- Month (1 - 12)
| | +------- Day of the month (1 - 31)
| +--------- Hour (0 - 23)
+----------- Minute (0 - 59)

The different fields are defined as:

  1. Minute: the minute of the hour (0-59).
  2. Hour: the hour of the day (0-23, 24-hour clock).
  3. Day of the Month: the day of the month (1-31).
  4. Month: the month (1-12, or names like jan, feb).
  5. Day of the Week: the day of the week (0-7, where both 0 and 7 represent Sunday).

Special characters can be used to indicate:

  • *: wildcard that means “every” (e.g., every minute, every hour).
  • ,: list separator (e.g., 1,15 means the 1st and 15th).
  • -: range (e.g., 1-5 means days 1 through 5).
  • /: step values (e.g., */5 means every 5 units, like every 5 minutes).

Here are some examples:

  • 30 14 * * 1-5 /path/to/script.sh: Runs at 2:30 PM, Monday through Friday.
  • 0 0 1,15 * * /path/to/script.sh: Runs at midnight on the 1st and 15th of the month.
  • */10 * * * * /path/to/script.sh: Runs every 10 minutes.
  • 0 8-18/2 * * 1-5 /path/to/script.sh: Runs every 2 hours between 8 AM and 6 PM, Monday to Friday.
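Because cron jobs run without a terminal attached, it’s also worth capturing the script’s output somewhere you can inspect later. A common pattern (the log path here is illustrative) is:

0 0 * * * /path/to/run_pipeline.sh >> /tmp/run_pipeline.log 2>&1

The >> appends standard output to the log file, and 2>&1 sends standard error to the same place.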

Step 3.4: Verify the cron job

Once you’ve configured your crontab, you can check that the cron job has been scheduled successfully by running:

crontab -l
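The listing should include the line you added earlier, for example:

0 0 * * * /path/to/run_pipeline.sh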

4. Run the pipeline manually

Before waiting for the first scheduled runtime, test that everything is set up correctly by manually running the .sh script in the terminal:

/path/to/run_pipeline.sh 
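If the run fails silently, checking the exit status immediately afterwards can help pinpoint the problem:

/path/to/run_pipeline.sh
echo $?    # 0 means success; any other value indicates an error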

5. Wait for the scheduled runtime

Once everything is working and verified, you can sit back and wait for the first official scheduled runtime. The pipeline will run automatically based on the cron schedule you’ve defined!

Conclusion

That’s it! After setting up the YAML file, wrapping the pipeline in a function, creating the .sh file, and scheduling the job with cron, your pipeline should run automatically at the intervals you’ve defined. Remember, maestro handles the pipeline’s internal orchestration, but cron or another scheduling system is what ensures the pipeline is triggered at the right frequency.