Week 9: Data Pipeline Orchestration

DSAN 5500: Data Structures, Objects, and Algorithms in Python

Class Sessions
Author
Affiliation

Jeff Jacobs

Published

Monday, March 18, 2024

Open slides in new window →

The Necessary Buzzwords

(Underlined words link to “Concepts” section of Prefect’s docs)

  • Flow: The “main thing” your pipeline is doing!
    • Except in simple cases, will consist of multiple Tasks
  • Flows and Tasks alone already provide much more functionality than “basic” functions…
  • Deployments: Flows + Tasks + Metadata about how and when you want them to run.
    • Prefect docs: “Deployments elevate workflows from [functions that you call manually] to [API-managed entities].”

Deployments \(\Rightarrow\) Run Flows Programmatically

The Power of Deployments

  • “Packaging” code as Deployments enables Triggers:
    • On a particular Schedules: every 4 hours, every day at noon, once per week, etc.
    • When important Events happen: pushes to GitHub, addition, removal, modification of files in Dropbox, etc.
  • Logging, Notifications (Slack, email, text messages)
  • Results as natural-language explanations (produced by Prefect) or custom summaries, called Artifacts, that you define as part of your flows

Schedules

  • Cron: Full-on scheduling language (used by computers since 1975!)
crontab.sh
# ┌───────────── minute (0–59)
# │ ┌───────────── hour (0–23)
# │ │ ┌───────────── day of the month (1–31)
# │ │ │ ┌───────────── month (1–12)
# │ │ │ │ ┌───────────── day of the week (0–6) (Sunday to Saturday)
# │ │ │ │ │
# │ │ │ │ │
# │ │ │ │ │
# * * * * * <command to execute>
my_interval.yml
schedule:
  interval: 600
  timezone: America/Chicago
my_rrule.yml
schedule:
  rrule: 'FREQ=WEEKLY;BYDAY=MO,WE,FR;UNTIL=20240730T040000Z'

Events

  • These integrations are nice, but in reality usually overkill: you can just use Webhooks

Logging

  • For most non-advanced use cases: literally just put log_prints=True as a parameters of your Flow:
flow_with_logging.py
from prefect import task, flow

@task
def my_task():
    print("we're logging print statements from a task")

@flow(log_prints=True)
def my_flow():
    print("we're logging print statements from a flow")
    my_task()

Notifications

  • Actually immensely powerful, because it uses a templating engine called Jinja which is VERY worth learning!
  • With your brain in pipeline mode, think of Jinja as the [?] in:

Jinja Example

homepage.jinja
<h3>{{ me['name'] }}'s Favorite Hobbies</h3>
<ul>
{%- for hobby in hobbies %}
  <li>{{ hobby }}</li>
{%- endfor %}
</ul>
+
render_jinja.py
from jinja2 import Template
tmpl = Template('homepage.jinja')
tmpl.render(
    me = {'name': 'Jeff'},
    hobbies = [
        "sleeping",
        "jetski",
        "getting sturdy"
    ]
)


rendered.html
<h3>Jeff's Favorite Hobbies</h3>
<ul>
  <li>sleeping</li>
  <li>jetski</li>
  <li>getting sturdy</li>
</ul>

\(\leadsto\)

Jeff's Favorite Hobbies

  • sleeping
  • jetski
  • getting sturdy

Lab Time!