Automating Data Pipelines with Maestro
Introduction
Successfully scheduling my first data pipeline to run automatically with `maestro` and `cron` made me realise that I hadn’t found a good tutorial for the whole process. It was a bit of a learning curve, but once I got everything set up correctly, it actually seemed pretty straightforward (always the case in hindsight, right?).
Where I struggled on previous attempts at automation was that I misunderstood how the automation occurs. It wasn’t until I read the last sentence of the Maestro Quick Start Guide that I understood where I had gone wrong:
> “Importantly, it isn’t `maestro`’s job to actually run [the pipeline] this often - it’s your job to make sure [the pipeline] runs at that frequency (e.g., deploying it via `cron` or some cloud environment where code can be scheduled).”
I originally thought that `maestro` not only orchestrated the pipelines, but also ran them at the specified schedule. This is not the case. So, for all those who are also struggling, here is the process I used to automate my pipeline. I’m sure a better tutorial exists, but because I couldn’t find it in a 30-second Google search, I decided to write one.
The break-down
Maestro is an R package designed to organize and orchestrate complex pipelines. It handles tasks like dependencies, scheduling logic, and defining pipeline steps, but `maestro` does not manage when the pipeline executes. This is where `cron` (or another scheduler) comes into play.
`cron` is a Unix-based utility for task scheduling. It allows you to specify when a task (like running a `maestro` pipeline) should execute: `cron` periodically triggers the execution of a script, ensuring the pipeline runs at the desired frequency. `cron` is not the only task scheduler; there are many. While doing a quick search to check for other task schedulers, I discovered there are two R packages, `cronR` (for Linux/macOS) and `taskscheduleR` (for Windows), that are designed to make this process easier. Wish I’d known about them a few hours ago. Oh well!
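I haven’t used `cronR` myself, but as a rough sketch, scheduling a pipeline script with it looks something like this (the script path and job id here are made up for illustration; check the `cronR` docs for details):

```r
# Sketch only: assumes cronR is installed and a pipeline script exists
# at the hypothetical path /path/to/run_pipeline.R (Linux/macOS only)
library(cronR)

# Build the shell command that runs the R script via Rscript
cmd <- cron_rscript("/path/to/run_pipeline.R")

# Add a cron job that runs the script daily at midnight
cron_add(cmd, frequency = "daily", at = "00:00", id = "my_data_pipeline")
```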
So essentially, `maestro`, using a YAML file, ensures tasks within the pipeline are executed in the correct order and retries failed tasks as needed, whereas `cron` periodically runs a shell script that triggers the `maestro` pipeline. More on shell scripts in a bit!
My set-up
I have my data pipeline and orchestration files inside an R package that I created, so the set-up might be slightly different from standalone scripts in a directory. Normally, I would link to the git repo so that anyone looking at this could see all my code and my set-up, but in this case it’s part of some consulting work that is private.
Steps for Automating the Pipeline
1. Create a YAML file
The first step is to create a YAML file that defines the schedule, tasks, and notifications for the pipeline. Here’s an example:
```yaml
name: my_data_pipeline
schedule:
  type: cron
  cron: "0 0 1,15 * *"  # Runs on the 1st and 15th of every month at midnight
  start_time: "2024-12-01 00:00:00"
task:
  script: Rscript -e 'my_package::my_data_pipeline()'
  working_dir: ~/Documents/my_package
  retry_policy:
    retries: 3
    delay: 5m
notifications:
  on_success:
    - type: email
      to: my.email@gmail.com
  on_failure:
    - type: email
      to: my.email@gmail.com
```
The `name` field should be a descriptive name for your pipeline. The `schedule` field specifies when and how often the task should run, and using what scheduling mechanism (here, `type: cron`). `start_time` specifies the first time the pipeline is allowed to run (here, December 1, 2024 at 12:00 AM). The `task` field defines the details of the task to execute. Specifically, `script` specifies the command to run the pipeline, `working_dir` specifies the directory to execute the script from, `retry_policy` specifies how retries are handled if the task fails, `retries` denotes the maximum number of retries (here, 3), and `delay` specifies the time delay between retries (here, 5 minutes). The `notifications` field configures notifications for task outcomes. Here, we configure `maestro` to send an email when the task succeeds (`on_success`) and when the task fails (`on_failure`).
2. Wrap your pipeline into a function
Ensure that your pipeline is encapsulated within a function, such as `my_package::my_data_pipeline()`, so that it can be executed with the appropriate arguments. There’s a good explanation of how to do this in the `maestro` README. Below is an example taken from that README:
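```r
#' Example ETL pipeline
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-03-25 12:30:00
my_etl <- function() {

  # Pretend we're getting data from a source
  message("Get data")
  extracted <- mtcars

  # Transform
  message("Transforming")
  transformed <- extracted |>
    dplyr::mutate(hp_deviation = hp - mean(hp))

  # Load - write to a location
  message("Writing")
  write.csv(transformed, file = paste0("transformed_mtcars_", Sys.Date(), ".csv"))
}
```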
All of the pipeline tasks are wrapped in a function called `my_etl()`, which has some `maestro`-specific `roxygen`-style comments, namely `@maestroFrequency` and `@maestroStartTime`, which indicate how frequently the pipeline should run and when it should start, respectively. For more details, see here.
3. Automate the pipeline using `cron`
Next, you’ll need to create a shell script and set up `cron` to run your pipeline at the specified frequency.
Step 3.1: Create a `.sh` file
Create a `.sh` file by first opening a text editor in the Mac terminal. I used the `nano` editor:

```bash
nano run_pipeline.sh
```
Add the following text to the `.sh` file to execute the pipeline using `Rscript`:

```bash
#!/bin/bash
Rscript -e "system('maestro run /path/to/maestro.yaml')"
```
If editing the text in the `.sh` file directly in the terminal with `nano`:

- Press CTRL + O to save (the letter “O”, not zero).
- Press Enter to confirm the file name.
- Press CTRL + X to exit.
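As an aside, if your project uses `maestro`’s orchestrator pattern, the shell script can instead call `Rscript` on an orchestrator file. Here is a minimal sketch, assuming your pipeline functions live in a `pipelines/` directory (the file name `orchestrator.R` is just an example):

```r
# orchestrator.R - build and run the maestro schedule
library(maestro)

# Discover pipeline functions tagged with @maestroFrequency, @maestroStartTime, etc.
schedule <- build_schedule(pipeline_dir = "pipelines")

# Run whichever pipelines are due, given how often this orchestrator itself runs
run_schedule(schedule, orch_frequency = "1 day")
```

The shell script would then contain `Rscript /path/to/orchestrator.R` instead.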
Step 3.2: Make the `.sh` file executable
Run the following command to make the script executable:
```bash
chmod +x /path/to/run_pipeline.sh
```
Step 3.3: Schedule the pipeline with `cron`
Open your crontab configuration by running `crontab -e` in the terminal and add a line to schedule the pipeline to run at the desired frequency. For example, to run it daily at midnight:
```
0 0 * * * /path/to/run_pipeline.sh
```
The `cron` schedule is defined by five fields followed by the command to execute:
```
* * * * * command_to_run
- - - - -
| | | | |
| | | | +--- Day of the week (0 - 7, where both 0 and 7 represent Sunday)
| | | +----- Month (1 - 12)
| | +------- Day of the month (1 - 31)
| +--------- Hour (0 - 23)
+----------- Minute (0 - 59)
```
The different fields are defined as:

1. Minute: Specifies the minute of the hour (0-59).
2. Hour: Specifies the hour of the day (0-23, 24-hour clock).
3. Day of the Month: Specifies the day of the month (1-31).
4. Month: Specifies the month (1-12, or names like jan, feb).
5. Day of the Week: Specifies the day of the week (0-7, where both 0 and 7 represent Sunday).
Special characters can be used to indicate:

- `*`: Wildcard that means “every” (e.g., every minute, every hour).
- `,`: List separator (e.g., 1,15 means the 1st and 15th).
- `-`: Range (e.g., 1-5 means days 1 through 5).
- `/`: Step values (e.g., */5 means every 5 units, like every 5 minutes).
Here are some examples:
- `30 14 * * 1-5 /path/to/script.sh`: Runs at 2:30 PM, Monday through Friday.
- `0 0 1,15 * * /path/to/script.sh`: Runs at midnight on the 1st and 15th of the month.
- `*/10 * * * * /path/to/script.sh`: Runs every 10 minutes.
- `0 8-18/2 * * 1-5 /path/to/script.sh`: Runs every 2 hours between 8 AM and 6 PM, Monday to Friday.
Step 3.4: Verify the `cron` job
Once you’ve configured your crontab, you can check that the `cron` job has been scheduled successfully by running:
```bash
crontab -l
```
4. Run the pipeline manually
Before waiting for the first scheduled runtime, test that everything is set up correctly by manually running the `.sh` script in the terminal:

```bash
/path/to/run_pipeline.sh
```
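If the run works, a pipeline like the `my_etl()` example above should have written its dated CSV. A quick way to check from R, assuming the file was written to the current working directory:

```r
# Check that today's output file exists (assumes the my_etl() example
# wrote transformed_mtcars_<date>.csv into the working directory)
file.exists(paste0("transformed_mtcars_", Sys.Date(), ".csv"))
```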
5. Wait for the scheduled runtime
Once everything is working and verified, you can sit back and wait for the first official scheduled runtime. The pipeline will run automatically based on the `cron` schedule you’ve defined!
Conclusion
That’s it! After setting up the YAML file, wrapping the pipeline in a function, creating the `.sh` file, and scheduling the job with `cron`, your pipeline should run automatically at the intervals you’ve defined. Remember, `maestro` handles the pipeline’s internal orchestration, but `cron` (or another scheduling system) is what ensures the pipeline is triggered at the right frequency.