Welcome again to part 3 of this article series about data orchestration. In this part, we want to implement our beloved pipeline from part 1 once again, but this time in Dagster. In part 2 we implemented this pipeline in Prefect and could see that, although Prefect has some differences from Airflow, the task implementation was quite similar. Dagster takes a completely new approach to data orchestration, so we will learn a lot of new concepts. So tune in!
Dagster
Elementl, the company behind Dagster, was also founded in 2018 and has its headquarters in the San Francisco Bay Area. What does Dagster say about itself?
“The cloud-native orchestrator for the whole development lifecycle, with integrated lineage and observability, a declarative programming model, and best-in-class testability.“
Wow, what a statement. Let’s see whether Dagster can live up to it or whether this is just some heavy marketing right there!
Prerequisites
Before diving into Dagster, we have to install dagster and a few dependencies. Open up a terminal and run:
```
pip install dagster dagit
pip install plotly pandas pytest python-dotenv psycopg2
```
Installing Dagster is as simple as installing Prefect. By the way, I use Dagster version 1.1.5, and we will only focus on the open-source version of Dagster in this experiment.
What I also like about Dagster is how easy it is to bootstrap a project structure. To get a default project skeleton, we just have to run:
```
dagster project scaffold --name franchise-blog
```
Your project structure should look similar to what is illustrated in figure 1.
We can ignore most files for now. What is interesting, though, is that the project structure resembles a setup that you usually encounter when creating your own Python libraries. The reason is that this is actually a fully functioning Python package! You can try this out by completing the setup.py file:
```
from setuptools import find_packages, setup

setup(
    name="franchise_blog",
    packages=find_packages(exclude=["franchise_blog_tests"]),
    install_requires=[
        "dagster",
        "pandas",
        "plotly",
        "psycopg2",
        "python-dotenv"
    ],
    extras_require={"dev": ["dagit", "pytest"]},
)
```
Afterward, you can install the package via pip:
```
pip install -e ".[dev]"
```
The -e flag installs the package in editable mode, so local code changes are picked up automatically without reinstalling.
This approach is quite favorable since every pipeline can be put into its own isolated package. Prefect isolated the pipeline code by using deployments. So both tools make it easy for us to do local development. Anyway, let’s start implementing our pipeline in Dagster!
Resources
The first concept we want to dig into is resources. Contrary to Prefect and Airflow, Dagster follows an asset-centric paradigm. Thus, Dagster focuses on files, tables, machine learning models, and so forth. These are our assets. This shift in paradigm is very interesting since very often we deal with technicalities when using data orchestration tools when we should actually focus more on our assets. We will see at a later point how Dagster handles these technicalities.
First of all, we need to define two resources: a path to our base directory where we will store our assets, and a Postgres API resource. Thus, we create a resources.py file in the franchise_blog directory and insert the following code:
```
import psycopg2
from dagster import resource, InitResourceContext


@resource(config_schema={"host": str, "port": str, "database": str, "user": str, "password": str})
def postgres_api(init_context: InitResourceContext):
    database_connection = {
        'host': init_context.resource_config["host"],
        'port': init_context.resource_config["port"],
        'database': init_context.resource_config["database"],
        'user': init_context.resource_config["user"],
        'password': init_context.resource_config["password"]
    }
    return psycopg2.connect(**database_connection)


@resource(config_schema={"base_dir": str})
def base_dir(init_context: InitResourceContext):
    return init_context.resource_config["base_dir"]
```
We can declare a resource with the resource decorator and define a configuration schema. This schema defines the shape of the resource. Also, a resource accepts a context object as an argument. We do not have to worry about this at the moment. Just know that these context objects are enriched with metadata and encapsulate important functionalities for configuration. Nonetheless, our Postgres resource is quite simple: we fetch the database details and return a connection object. Note that the return value actually represents our resource. We should not forget about our second resource, so we also set up the base directory resource.
So our resources are ready to go but we still have to configure them. For that purpose, create a configurations.py file inside of the franchise_blog directory and Copy & Paste the following code:
```
import os

from dagster import ResourceDefinition
from dotenv import load_dotenv

from franchise_blog.resources import postgres_api, base_dir

load_dotenv()


def get_configured_postgres_api() -> ResourceDefinition:
    return postgres_api.configured(
        {
            'host': os.environ["POSTGRES_HOST"],
            'port': os.environ["POSTGRES_PORT"],
            'database': os.environ["POSTGRES_DB"],
            'user': os.environ["POSTGRES_USER"],
            'password': os.environ["POSTGRES_PW"]
        }
    )


def get_configured_base_dir() -> ResourceDefinition:
    return base_dir.configured({'base_dir': os.environ["BASE_DIR"]})
```
Essentially, we are storing our resource-related data in an environment file. Therefore, we have to load our environment file and configure our resources. So basically, what we have done is separate the resource configuration from the resource declaration. Only when we configure our resources do we obtain a so-called ResourceDefinition. Configuring the resources is simple, we just have to use the configured method. Do not forget to create an appropriate .env file in the root of the workspace with the following content:
```
POSTGRES_HOST=<postgres host, e.g. localhost>
POSTGRES_PORT=<postgres port, e.g. 5432>
POSTGRES_DB=<postgres database>
POSTGRES_USER=<postgres user>
POSTGRES_PW=<postgres pw>

BASE_DIR=./data # you can also choose another directory if you want
```
You might suspect that we have to write a lot of boilerplate code in Dagster, and we haven’t even started writing our tasks yet! Well, this is the price to pay for the separation of concerns. Furthermore, we want to manage our resources and assets appropriately, and this will pay off in the end since we only have to define them once! Let’s define our first asset!
Asset: postgres ingestion
We will once again create 3 directories: ingestion, plotting, and transformation. Create those inside the assets directory. Afterward, we create our first asset file postgres.py in the ingestion directory. Look at the following code which defines our asset – do you recognize any differences in comparison to Prefect and Airflow?
```
from dagster import asset


@asset(required_resource_keys={"postgres_api"}, group_name="franchise", io_manager_key="local_postgres_io_manager")
def ingest_store_data_from_psql(context):
    sql_statement = """
        select id, manager, city, street, street_number, revenue, day
        from stores;
    """
    with context.resources.postgres_api.cursor() as cursor:
        cursor.execute(sql_statement)
        result = cursor.fetchall()
    return {'result': result}
```
Well, of course, we use the asset decorator instead of the task decorator. The business logic also looks very similar – but wait, where is the logic that stores our raw data in a CSV file? We will explore this in a minute. But this is the beauty behind Dagster: our assets focus only on the business logic and don’t care where our data is going or what happens to it afterward. It is much cleaner this way – but as we will see, we have to pay a price, again! When we take a closer look at our asset decorator, we can see that we pass 3 arguments to it:
- required_resource_keys: A set of resource keys that are required by the underlying op. You have to know at this point that an asset consists of the following 3 parts: an asset key, a function that computes the content of the asset, and a set of upstream assets that are provided as inputs. The function that computes the asset is basically an op, which is the core unit of computation in Dagster.
- group_name: Simply a string that groups assets that are semantically related.
- io_manager_key: An IO manager reference that the asset should use. We will learn in the next section what an IO manager does.
The concept of resource keys and IO manager keys is actually quite useful since it utilizes dependency injection. Thus, we can easily switch out resources and IO managers if we want to use different ones. We will also see that this comes in quite handy for testing. Moreover, do not confuse Dagster’s assets with the notion of tasks: the asset in the ingestion step is actually the CSV file that contains the result. That asset is what we care about.
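To give you an idea of how this pays off, here is a minimal sketch of how you could materialize the ingestion asset with swapped-out resources, for example against a test database. The test connection values and the use of Dagster’s built-in mem_io_manager (an in-memory IO manager, more on IO managers in the next section) are my own assumptions and not part of our pipeline:

```
from dagster import materialize, mem_io_manager

from franchise_blog.assets.ingestion.postgres import ingest_store_data_from_psql
from franchise_blog.resources import postgres_api

# Hypothetical test double: point the resource at a local test database
# and keep the asset's output in memory instead of writing a CSV file.
test_postgres_api = postgres_api.configured({
    'host': 'localhost',
    'port': '5432',
    'database': 'franchise_test',
    'user': 'test',
    'password': 'test'
})

result = materialize(
    [ingest_store_data_from_psql],
    resources={
        "postgres_api": test_postgres_api,
        "local_postgres_io_manager": mem_io_manager
    }
)
assert result.success
```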
Next, we will investigate the concept of IO managers. The IO manager is the reason why we can separate the business logic from the rather task-oriented logic. So let’s define our own custom IO manager!
IO Manager: LocalPostgresIOManager
Let’s create another directory called resources inside of our assets directory and create a Python file named local_postgres_io_manager.py in it. Copy & paste the following lines of code into it:
```
import os
import csv

from dagster import (
    IOManager,
    OutputContext,
    InputContext,
    InitResourceContext,
    build_init_resource_context,
    io_manager
)


class LocalPostgresIOManager(IOManager):
    def __init__(self, target_file: str) -> None:
        self.target_file = target_file

    def handle_output(self, context: OutputContext, obj: dict) -> None:
        os.makedirs(os.path.dirname(self.target_file), exist_ok=True)
        result = obj["result"]
        with open(self.target_file, 'w', newline='') as file:
            target_csv = csv.writer(file)
            target_csv.writerow(['id', 'manager', 'city', 'street', 'street_number', 'revenue', 'day'])
            for row in result:
                target_csv.writerow(row)

    def load_input(self, context: InputContext) -> str:
        return self.target_file


@io_manager(required_resource_keys={"base_dir"})
def local_postgres_io_manager(init_context: InitResourceContext) -> LocalPostgresIOManager:
    target_file = os.path.join(init_context.resources.base_dir, "storage/stores.csv")
    return LocalPostgresIOManager(target_file=target_file)


@io_manager(required_resource_keys={"base_dir"})
def postgres_io_manager(init_context: InitResourceContext) -> LocalPostgresIOManager:
    return local_postgres_io_manager(
        build_init_resource_context(
            resources={"base_dir": init_context.resources.base_dir},
        )
    )
```
Wow, this looks overwhelming, but don’t panic, it actually looks worse than it is. But this is the price we have to pay! We outsourced the logic of writing the raw data to a CSV file to a dedicated IO manager. An IO manager in Dagster is primarily responsible for handling how an asset’s output is stored and how it is loaded by downstream assets. In our case, the IO manager just has to manage the output of our Postgres asset.
In order for our LocalPostgresIOManager to work, we only have to implement two abstract base methods: handle_output and load_input.
- handle_output: Receives the output of our upstream asset and is responsible for handling our output appropriately, e.g. storing it on a local file system. In our case, we are storing the data that we fetched from our Postgres database in our local file system.
- load_input: Responsible for loading the correct object into the downstream asset. In our example, it is just passing the path to the CSV file to the next downstream asset.
This approach increases code complexity but simplifies our assets and decouples dependencies. We are decoupling our asset, the raw data that we query from the Postgres database, from the storage layer. For instance, imagine we no longer want to store our data on the local file system but on S3: we could simply switch out our IO manager and our pipeline would still be good to go.
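Just to illustrate the idea, below is a rough sketch of what such an S3-backed replacement could look like. This is not part of our pipeline: the boto3-based upload, the bucket configuration, and the s3:// path handed to downstream assets are all assumptions on my part (and the downstream assets would then need to read from S3, e.g. via pandas with s3fs):

```
import csv
import io

import boto3
from dagster import IOManager, OutputContext, InputContext, io_manager


class S3PostgresIOManager(IOManager):
    def __init__(self, bucket: str, key: str) -> None:
        self.bucket = bucket
        self.key = key
        self.s3 = boto3.client("s3")

    def handle_output(self, context: OutputContext, obj: dict) -> None:
        # Write the query result into an in-memory CSV buffer and upload it to S3.
        buffer = io.StringIO()
        writer = csv.writer(buffer)
        writer.writerow(['id', 'manager', 'city', 'street', 'street_number', 'revenue', 'day'])
        writer.writerows(obj["result"])
        self.s3.put_object(Bucket=self.bucket, Key=self.key, Body=buffer.getvalue().encode("utf-8"))

    def load_input(self, context: InputContext) -> str:
        # Hand the S3 path to the downstream asset instead of a local file path.
        return f"s3://{self.bucket}/{self.key}"


@io_manager(config_schema={"bucket": str})
def s3_postgres_io_manager(init_context) -> S3PostgresIOManager:
    return S3PostgresIOManager(
        bucket=init_context.resource_config["bucket"],
        key="storage/stores.csv"
    )
```

The asset itself would stay untouched; only the mapping behind the local_postgres_io_manager key in our repository would change.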
Note that the output of ingest_store_data_from_psql is a dictionary with one key named result, which holds the raw data as its value. The argument obj in the handle_output method will actually hold a reference to this dictionary.
Furthermore, in order to use our IO manager, we have to construct an IO manager definition that returns an instance of our IO manager class. Therefore, we define the function postgres_io_manager, which builds an init context for us. This context object holds a resource, namely the base_dir resource, and is given as an argument to the function local_postgres_io_manager. If you do not need to parameterize your IO manager, you will not need the function postgres_io_manager at all, since we can directly initialize the IO manager in local_postgres_io_manager without further ado.
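For illustration, such a simplified, non-parameterized variant could look like the following sketch (placed in the same file as LocalPostgresIOManager; the hard-coded ./data/storage/stores.csv path is just an assumption):

```
from dagster import io_manager


@io_manager
def simple_postgres_io_manager(_init_context) -> LocalPostgresIOManager:
    # No required resources and no config: the target file is simply hard-coded.
    return LocalPostgresIOManager(target_file="./data/storage/stores.csv")
```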
If this is still too complicated for you, you can leave the IO manager out and code the logic completely into your assets. This will also work fine but the separation will be gone.
Repository
Before coding the other assets, let’s have a look at how we wire up all of our resources, IO managers, and assets. Therefore, create a file repository.py inside the franchise_blog directory. The skeleton for our repository should look like this:
```
from dagster import (
    load_assets_from_package_module,
    repository
)

from franchise_blog import assets


@repository
def franchise_blog():
    return [
        load_assets_from_package_module(assets)
    ]
```
The function load_assets_from_package_module will load all assets which are inside the assets directory into this repository. This is how Dagster can determine which assets it has to include in our pipeline. We will see later how we can manage the dependencies between the assets. Moreover, we can already add our scheduling information by modifying our code:
```
from dagster import (
    load_assets_from_package_module,
    repository,
    define_asset_job,
    ScheduleDefinition
)

from franchise_blog import assets

daily_job = define_asset_job(name="daily_franchise_update", selection="*")

daily_schedule = ScheduleDefinition(
    job=daily_job,
    cron_schedule="0 7 * * *"
)


@repository
def franchise_blog():
    return [
        daily_job,
        daily_schedule,
        load_assets_from_package_module(assets)
    ]
```
Next, we should define our resources and IO managers in our repository because our asset ingest_store_data_from_psql requires the IO manager key local_postgres_io_manager. This can be done with a slight change to our code:
```
from dagster import (
    load_assets_from_package_module,
    repository,
    define_asset_job,
    with_resources,
    ScheduleDefinition
)

from franchise_blog.configurations import get_configured_base_dir, get_configured_postgres_api
from franchise_blog.assets.resources.local_postgres_io_manager import postgres_io_manager

...

@repository
def franchise_blog():
    return [
        daily_job,
        daily_schedule,
        with_resources(
            load_assets_from_package_module(assets),
            {
                "postgres_api": get_configured_postgres_api(),
                "base_dir": get_configured_base_dir(),
                "local_postgres_io_manager": postgres_io_manager
            }
        )
    ]
```
Dagster really has a rather steep learning curve, but once you are familiar with how it works, it really ramps up the experience of creating data pipelines. In the beginning, it can be quite difficult to understand what a repository represents. Think about it in the following way: a repository is a collection of assets, jobs, and whatever else we use in our package. This collection represents a unit that is later used by dagit (Dagster’s web UI), the Dagster CLI, or the dagster-daemon.
Note that the concept of repositories in Dagster is largely deprecated since version 1.1.6. When you are using version 1.1.6 or higher, you should use Dagster’s concept of Definitions instead.
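For completeness, a rough sketch of what the same wiring could look like with Definitions is shown below. Since we stick to version 1.1.5 and repositories in this article, treat it as an approximation rather than tested code:

```
from dagster import (
    Definitions,
    ScheduleDefinition,
    define_asset_job,
    load_assets_from_package_module
)

from franchise_blog import assets
from franchise_blog.configurations import get_configured_base_dir, get_configured_postgres_api
from franchise_blog.assets.resources.local_postgres_io_manager import postgres_io_manager

daily_job = define_asset_job(name="daily_franchise_update", selection="*")
daily_schedule = ScheduleDefinition(job=daily_job, cron_schedule="0 7 * * *")

# Definitions replaces the @repository decorator: resources are passed directly
# as a dictionary instead of wrapping the assets with with_resources.
defs = Definitions(
    assets=load_assets_from_package_module(assets),
    jobs=[daily_job],
    schedules=[daily_schedule],
    resources={
        "postgres_api": get_configured_postgres_api(),
        "base_dir": get_configured_base_dir(),
        "local_postgres_io_manager": postgres_io_manager
    }
)
```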
Asset: revenue per day per manager plot
Once again, create the file series.py inside the plotting directory with the following content:
```
import os

import pandas as pd
import plotly.express as px
from dagster import asset


@asset(required_resource_keys={"base_dir"}, group_name="franchise")
def plot_revenue_per_day_per_manager(context, ingest_store_data_from_psql: str):
    base_dir = context.resources.base_dir
    target_file = f"{base_dir}/plots/revenue_per_day_per_manager.html"
    os.makedirs(os.path.dirname(target_file), exist_ok=True)
    df = pd.read_csv(ingest_store_data_from_psql)
    fig = px.line(df, x='day', y='revenue', color='manager', symbol="manager")
    fig.update_layout(
        font=dict(
            size=20
        )
    )
    fig.write_html(target_file)
```
Note that we pass the argument ingest_store_data_from_psql to our asset plot_revenue_per_day_per_manager. This is how we define dependencies in Dagster! Furthermore, since we associated an IO manager with our asset ingest_store_data_from_psql, the IO manager’s load_input function will inject the value, in this case the path to the CSV file, into our plot asset.
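As a side note, if you prefer a parameter name that differs from the upstream asset’s name, the dependency could also be declared explicitly via AssetIn. A small sketch of this alternative (the parameter name stores_csv is made up):

```
from dagster import AssetIn, asset


@asset(required_resource_keys={"base_dir"}, group_name="franchise",
       ins={"stores_csv": AssetIn("ingest_store_data_from_psql")})
def plot_revenue_per_day_per_manager(context, stores_csv: str):
    # stores_csv receives the CSV path provided by the upstream IO manager's load_input.
    ...
```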
Asset: average revenue per manager aggregation
Next, we create the file aggregation.py inside the transformation directory with the following lines of code:
```
import os

import pandas as pd
from dagster import asset


@asset(group_name="franchise", io_manager_key="local_transformation_io_manager")
def aggregate_avg_revenue_per_manager(context, ingest_store_data_from_psql: str):
    df = pd.read_csv(ingest_store_data_from_psql)
    result = df[["manager", "city", "street", "street_number", "revenue"]].groupby(["manager", "city", "street", "street_number"]).aggregate('mean')
    result["average_revenue"] = result["revenue"]
    result = result.drop(columns=["revenue"]).reset_index()
    return {'result': result}
```
Once again, notice how concise our asset is. That is because we can again define a custom IO manager. By the way, you do not have to write a custom IO manager; Dagster offers several built-in ones. But this way, we learn more about them and how they work. As an exercise for the reader, try to implement the IO manager yourself and see what it feels like to handle one.
But do not worry, here is the code for the IO manager, just in case you have trouble implementing it yourself or do not want to do it:
```
import os

from dagster import (
    IOManager,
    OutputContext,
    InputContext,
    InitResourceContext,
    io_manager,
    build_init_resource_context
)


class LocalTransformationIOManager(IOManager):
    def __init__(self, target_file: str, pickle_file: str) -> None:
        self.target_file = target_file
        self.pickle_file = pickle_file

    def handle_output(self, context: OutputContext, obj: dict) -> None:
        os.makedirs(os.path.dirname(self.target_file), exist_ok=True)
        result = obj['result']
        result.to_pickle(self.pickle_file)
        result.to_json(self.target_file, orient="records")

    def load_input(self, context: InputContext) -> str:
        return self.pickle_file


@io_manager(required_resource_keys={"base_dir"})
def local_transformation_io_manager(init_context: InitResourceContext) -> LocalTransformationIOManager:
    base_dir = init_context.resources.base_dir
    target_file = os.path.join(base_dir, "storage/agg_avg_revenue_manager.json")
    pickle_file = os.path.join(base_dir, "storage/agg_avg_revenue_manager.pkl")
    return LocalTransformationIOManager(target_file=target_file, pickle_file=pickle_file)


@io_manager(required_resource_keys={"base_dir"})
def transformation_io_manager(init_context: InitResourceContext) -> LocalTransformationIOManager:
    return local_transformation_io_manager(
        build_init_resource_context(
            resources={"base_dir": init_context.resources.base_dir}
        )
    )
```
The logic is indeed very similar to the IO manager from before. You could even go a step further and try to create a single IO manager that manages both assets! This would require some more work, though, and additional abstraction layers. But for the sake of this experiment, we should not bother with overcomplicating things. Do not forget to add this IO manager to our repository!
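For reference, the resource mapping in repository.py could then be extended as follows; the import path assumes you placed the file next to the other IO manager, e.g. as franchise_blog/assets/resources/local_transformation_io_manager.py:

```
from franchise_blog.assets.resources.local_transformation_io_manager import transformation_io_manager

...

        with_resources(
            load_assets_from_package_module(assets),
            {
                "postgres_api": get_configured_postgres_api(),
                "base_dir": get_configured_base_dir(),
                "local_postgres_io_manager": postgres_io_manager,
                "local_transformation_io_manager": transformation_io_manager
            }
        )
```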
Asset: average revenue per manager plot
As a last step, we have to create the file aggregation.py inside the plotting directory. Copy & Paste the following lines of code:
```
import os

import pandas as pd
import plotly.express as px
from dagster import asset


@asset(required_resource_keys={"base_dir"}, group_name="franchise")
def plot_avg_revenue_per_manager(context, aggregate_avg_revenue_per_manager: str):
    base_dir = context.resources.base_dir
    target_file = f"{base_dir}/plots/agg_avg_revenue_manager.html"
    os.makedirs(os.path.dirname(target_file), exist_ok=True)
    df = pd.read_pickle(aggregate_avg_revenue_per_manager)
    fig = px.bar(df, x='manager', y='average_revenue',
                 hover_data=['city', 'street', 'street_number'],
                 labels={'average_revenue': "Average Revenue by Manager", 'manager': "Manager"})
    fig.update_layout(
        font=dict(
            size=20
        )
    )
    fig.write_html(target_file)
```
By now, you should probably understand what we did here. If this doesn’t seem familiar to you, you should check out parts 1 & 2 of this article series.
Dagster’s web UI & pipeline run
To see our pipeline in the UI, execute the following command in a terminal in the workspace directory:
```
dagit
```
It should display a URL that points to the UI. When you open up your browser and follow the URL, you should see something like this:
You should see your pipeline, and if you click on „Materialize all“ in the top right corner, Dagster will instantiate a run and a popup appears where we can click on „View run“. But before doing this, look around the UI; there is some useful information, such as when our pipeline is scheduled and when the last run finished.
When materializing our assets, the execution timeline will look like this:
What I really like about this UI view is that you can follow your pipeline run in real time. You can see when an asset is being processed, which assets are being processed in parallel, and how long each one takes. Furthermore, you have an event display, and if some asset fails, you are also able to partially re-run your pipeline. I invite you to further investigate the UI, especially the tabs „Assets“ and „Deployment“.
Moreover, you can even instantiate a backfill via the UI; in Airflow, you can only do this via the CLI. And if you want to run your pipeline on a schedule, don’t forget to launch a Dagster daemon. Open up a terminal in your workspace directory, where your pyproject.toml is located, and run:
```
dagster-daemon run
```
Final remarks
Phew, congratulations, we did it! We implemented the same pipeline in 3 data orchestration tools: Airflow, Prefect, and Dagster! If you are still motivated, the next and last part of this article series might interest you: we will see how to do unit testing in each tool, how each tool performs in our rating, and what kind of future trends I see emerging in data orchestration!