The following blog article describes one possible way to automate and orchestrate Snowpark scripts written in Python with the help of the generic orchestration tool Apache Airflow. I will show some ideas, concepts, and code snippets that we implemented in a project with one of our customers, dmTECH. All the examples in this article are based on a simple "Hello World" scenario.
Project context
dmTECH has been using Snowflake successfully for some time to migrate its existing data warehouse to a modern cloud-based solution. We support a team in another department with the implementation of various analytical use cases, mainly in PySpark on an on-premise Hadoop cluster. Now we wanted to migrate one of the existing analytics use cases to Snowpark as a proof of concept to get to know the technology and its potential better. This blog article is mainly about the integration of Snowflake into our existing automation tooling.
Motivation
We already showed in a previous article how you can use tasks to easily automate Snowflake scripts. So why should we use an external tool like Airflow at all?
First of all, Airflow is a specialized orchestration tool with much richer functionality for building the sequence of tasks and their dependencies (we call this a DAG, a directed acyclic graph). For example, you can use so-called trigger rules to define under which circumstances a task should be triggered, depending on its predecessor tasks. Many more powerful features allow you to build complex DAG logic; comparable functionality is currently missing in Snowflake.
Furthermore, Airflow's web frontend is better suited for operational aspects like browsing and filtering historical runs, rerunning single tasks or a chain of tasks, browsing the logs, and so on. Snowflake's web UI (a.k.a. Snowsight) offers basic filter and sorting controls, but they are not as powerful as Airflow's.
The most important argument is that we use Airflow in the current project for almost every automated process, regardless of the technology the tasks run on. Airflow is technology-agnostic and can schedule processes on almost every runtime technology you can think of, so we do not have two separate places where we automate things but one central place. Together with the fact that it is probably the scheduling tool with the largest community, this makes operating all the processes easier for first-level support, because they do not need a Snowflake account or Snowflake knowledge to restart a process from Airflow. Furthermore, at the customer company in this project we have multiple other data processing and storage systems we want to integrate with, for example an on-prem Hadoop system where we run various Spark jobs. Okay, we could migrate all of those from Hadoop to Snowflake, but that is nothing you do in a few days or weeks.
Those are the reasons why we decided to use Airflow for automation instead of Snowflake tasks.
Implementation
In the following subchapters, I want to show you which steps are necessary to integrate an Airflow installation with your Snowflake account and how to schedule stored procedures on it.
Install the Snowflake connector
The first step is to install the Snowflake provider on your Airflow host. This can be done with the following commands:
export SNOWFLAKE_PROVIDER_VERSION="5.0.0"
export AIRFLOW_VERSION="2.7.3"
export PYTHON_VERSION="3.8"

pip install "apache-airflow-providers-snowflake==${SNOWFLAKE_PROVIDER_VERSION}" \
  --constraint /tmp/airflow_${AIRFLOW_VERSION}_py${PYTHON_VERSION}_constraints.txt
You have to set the three mentioned environment variables depending on your needs and your given environment.
At the customer project, we are very lucky and have a platform team that provides the Airflow machine as a service. They put Airflow-specific pip constraint files on the filesystem of the remote machine so that you can use them to install further dependencies on your own without breaking the dependency tree. That's done with the additional --constraint parameter.
Please note that in reality we do not simply SSH to our machine and manually install dependencies on the CLI. In fact, we have an automated process in place that runs all the provisioning steps in a reproducible CI pipeline.
Configure a Snowflake connection
After the installation of the Snowflake provider is done, we can add and configure a Snowflake connection. This can be done either from the web frontend or via the Airflow CLI. This is also something we automated in the provisioning steps of a CI pipeline, where we use the following CLI command:
airflow connections add \
  --conn-type snowflake \
  --conn-schema SBX_JUSEITHER \
  --conn-login "$SNOWFLAKE_SERVICE_USERNAME" \
  --conn-password "$SNOWFLAKE_SERVICE_USER_PASSWORD" \
  --conn-extra "{\"account\": \"$ACCOUNT_NAME\", \"warehouse\": \"$SNOWFLAKE_DEFAULT_WAREHOUSE\", \"database\": \"$SNOWFLAKE_DATABASE\", \"role\": \"$SNOWFLAKE_ROLE\"}" \
  snowflake
Again, all those variables need to be set beforehand. In our case, they are injected by GitLab as CI variables.
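To verify that the connection works, a quick check from the Airflow host can be helpful. The following is a minimal sketch (a hypothetical standalone script, not part of our pipeline) that uses the provider's SnowflakeHook with the connection ID from the command above:

# Hypothetical sanity check for the connection configured above.
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

hook = SnowflakeHook(snowflake_conn_id="snowflake")

# get_first() executes the query and returns the first result row.
row = hook.get_first("SELECT CURRENT_VERSION()")
print(f"Connected, Snowflake version: {row[0]}")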
Wrap your code into a stored procedure
To call a Snowflake script – either written in SQL or any Snowpark-supported language (in our case Python) – you need to encapsulate it in a stored procedure. This is usually done with an SQL statement like this:
CREATE OR REPLACE PROCEDURE DEV.SBX_JUSEITHER.HELLO_WORLD(name string)
RETURNS VARCHAR(16777216)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
HANDLER = 'main'
EXECUTE AS OWNER
AS '# This is a simple Hello World script
import snowflake.snowpark as snowpark

def main(session: snowpark.Session, name: str):
    return f"Hello {name}"';
As you can see, the Python logic is implemented inside the procedure definition itself. This may work for very small amounts of code but does not make much sense for larger or more complex code. What you usually want to do is deploy the Python files separately on a Snowflake stage and reference those files from your procedure definition.
So let's first create a stage and make sure you have access to it (we assume you use the role "DEV_DEVELOPER" in this example):
USE ROLE SYSADMIN;

CREATE STAGE DEV.SBX_JUSEITHER.DEPLOY
  DIRECTORY = (ENABLE = TRUE);

GRANT ALL PRIVILEGES ON STAGE DEV.SBX_JUSEITHER.DEPLOY TO ROLE DEV_DEVELOPER;
The DIRECTORY part of the above statement ensures that you can view folders and files on your stage in Snowsight.
Next, let’s create a Python file with the following content and save it as hello_world.py:
# This is a simple Hello World script
import snowflake.snowpark as snowpark


def main(session: snowpark.Session, name: str):
    return f"Hello {name}"
Now upload the Python file to the previously created stage. This can be done with SnowSQL, the Snowflake command-line client:
export WORKING_DIR=$(pwd)

snowsql \
  --accountname $ACCOUNT_NAME \
  --dbname $DB_NAME \
  --schemaname $SCHEMA_NAME \
  --rolename $ROLE_NAME \
  --warehouse $WAREHOUSE \
  --username $USERNAME \
  --query "PUT file://${WORKING_DIR}/hello_world.py @DEV.SBX_JUSEITHER.DEPLOY/hello_world AUTO_COMPRESS=FALSE OVERWRITE=TRUE"
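Alternatively, if your deployment tooling is Python-based anyway, the same upload can be done with the Snowpark file API instead of SnowSQL. The following is a minimal sketch under the assumption that the connection parameters are provided as environment variables; the variable names mirror the snowsql call above, and the PASSWORD variable is an addition for illustration only:

# Hypothetical upload script using the Snowpark file API instead of snowsql.
import os

from snowflake.snowpark import Session

# Connection parameters are assumed to be injected as CI variables.
session = Session.builder.configs({
    "account": os.environ["ACCOUNT_NAME"],
    "user": os.environ["USERNAME"],
    "password": os.environ["PASSWORD"],  # illustrative; not part of the snowsql example
    "role": os.environ["ROLE_NAME"],
    "warehouse": os.environ["WAREHOUSE"],
    "database": os.environ["DB_NAME"],
    "schema": os.environ["SCHEMA_NAME"],
}).create()

# Same effect as the PUT command above: no compression, overwrite existing files.
session.file.put(
    "hello_world.py",
    "@DEV.SBX_JUSEITHER.DEPLOY/hello_world",
    auto_compress=False,
    overwrite=True,
)
session.close()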
After that you can see in Snowsight that the file was placed on the stage:
We can now adapt the definition of our stored procedure to use a reference to the Python file instead of the inline code:
CREATE OR REPLACE PROCEDURE DEV.SBX_JUSEITHER.HELLO_WORLD(name string)
RETURNS VARCHAR(16777216)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python')
IMPORTS = ('@DEV.SBX_JUSEITHER.DEPLOY/hello_world/hello_world.py')
HANDLER = 'hello_world.main'
EXECUTE AS OWNER;
The necessary changes are in the IMPORTS and HANDLER lines. In the IMPORTS part, you tell the procedure to import the hello_world.py file from your stage. Please note that you need to adapt the HANDLER definition so that it is prefixed with the name of the corresponding Python module. The inline code in the AS part can simply be omitted.
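As a side note, the same deployment can also be expressed in Python with the Snowpark registration API instead of a CREATE PROCEDURE statement. We did not use this in the project, so treat the following as a minimal sketch, assuming an open Snowpark session as in the upload example above:

# Hypothetical alternative to the CREATE PROCEDURE statement above.
from snowflake.snowpark.types import StringType

session.sproc.register_from_file(
    file_path="@DEV.SBX_JUSEITHER.DEPLOY/hello_world/hello_world.py",
    func_name="main",
    name="DEV.SBX_JUSEITHER.HELLO_WORLD",
    return_type=StringType(),
    input_types=[StringType()],
    packages=["snowflake-snowpark-python"],
    is_permanent=True,
    stage_location="@DEV.SBX_JUSEITHER.DEPLOY",
    replace=True,
    execute_as="owner",
)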
Again, this is all wrapped in CI pipelines in reality.
You can test your procedure with the following command:
CALL DEV.SBX_JUSEITHER.HELLO_WORLD(name => 'inovex');
As you can see in the following screenshot, the return value is shown as the result of this query:
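The same call can also be issued from Python, for example as a small smoke test after deployment; a minimal sketch, again assuming an open Snowpark session as in the earlier examples:

# Hypothetical smoke test for the deployed procedure.
result = session.call("DEV.SBX_JUSEITHER.HELLO_WORLD", "inovex")
assert result == "Hello inovex"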
Write your DAG
Now that your code is packed into a stored procedure and Airflow can connect to your Snowflake account, we can prepare a DAG that runs periodically and simply calls it. The Snowflake Airflow provider mentioned above has a special operator for running arbitrary SQL code, but at the time of writing it was already marked as deprecated in the docs. The suggested alternative is the generic SQLExecuteQueryOperator.
The task definition in our DAG looks like this:
hello_world_snowflake = SQLExecuteQueryOperator(
    dag=dag,
    task_id="hello_world_snowflake",
    sql="CALL DEV.SBX_JUSEITHER.HELLO_WORLD(name => '{{ run_id }}')",
    conn_id="snowflake",
    show_return_value_in_logs=True,
)
Here are a few things to note:
- You can use any pre-defined Airflow macros in your SQL code; in the above example, I pass the run ID of the current DAG run as the name parameter of the procedure call
- The parameter conn_id references the Snowflake connection that we created a few steps before
- show_return_value_in_logs can be used to print the output of the operator to the Airflow logs. Caution: This should only be used for small amounts of data, for example, while debugging/developing
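For completeness, here is a minimal sketch of how the complete DAG file around this task could look. The DAG ID, schedule, and start date are illustrative assumptions, not our actual production values; inside the with block, the dag argument of the operator is no longer needed:

# Illustrative minimal DAG file; DAG ID, schedule, and start date are assumptions.
from datetime import datetime

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(
    dag_id="hello_world_snowflake",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    hello_world_snowflake = SQLExecuteQueryOperator(
        task_id="hello_world_snowflake",
        sql="CALL DEV.SBX_JUSEITHER.HELLO_WORLD(name => '{{ run_id }}')",
        conn_id="snowflake",
        show_return_value_in_logs=True,
    )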
Monitoring and logging
When it comes to automating production jobs, one important aspect is monitoring the running scripts and finding logs that let you investigate errors.
The monitoring part is covered at least at a basic level by Airflow itself, because in its web UI you have a good overview of the tasks that ran and the corresponding logs, status, and so on. Many of us have experience in operating Airflow DAGs, and there is a lot of material out there covering this topic in detail, so I will not dive deeper into it. But what can you do in addition from the Snowflake side?
First, you can simply use the query history in Snowsight to get a feeling for the runtime of your procedure, the individual steps that are executed (tab "Query Profile"), and possible failures:
Second, you can add logging to your Python script and find the log messages in the so-called event table. The event table is a special table in your account (you can define exactly one) that stores all logs and traces from your applications. Please note that you need to create and configure the event table initially.
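This setup is a one-time action that requires elevated privileges. The following is a rough sketch of the statements involved, here executed through a Snowpark session as in the earlier examples; the table name matches the query further below, while the database and log level are assumptions for illustration:

# Hypothetical one-time event table setup; requires a sufficiently privileged role.
setup_statements = [
    "CREATE EVENT TABLE IF NOT EXISTS COMMON.COMMON.EVENT_TABLE",
    "ALTER ACCOUNT SET EVENT_TABLE = 'COMMON.COMMON.EVENT_TABLE'",
    # The log level can also be set on schema or procedure level instead of the database.
    "ALTER DATABASE DEV SET LOG_LEVEL = 'WARN'",
]
for statement in setup_statements:
    session.sql(statement).collect()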
To add logging to your code, adapt the Python script as follows:
# This is a simple Hello World script
import logging

import snowflake.snowpark as snowpark

logger = logging.getLogger("hello_world")


def main(session: snowpark.Session, name: str):
    logger.warning(f"Hello {name}")
    return f"Hello {name}"
As you can see, we simply use the Python standard logging mechanism.
After updating the procedure with the new code and running it again, you can find your logs in the event table, for example with the following query:
SELECT timestamp, value
FROM COMMON.COMMON.EVENT_TABLE
WHERE resource_attributes['snow.schema.name'] = 'SBX_JUSEITHER'
  AND record['severity_text'] = 'WARN'
ORDER BY timestamp DESC;
Run your Airflow behind a proxy server
Especially in a larger corporate context, servers often run behind a proxy server for security reasons. To route the Snowflake connection through the proxy, you can simply set the following environment variables inside your DAG definition:
import os

os.environ["HTTP_PROXY"] = "http://your.proxy.host.com:8000"
os.environ["HTTPS_PROXY"] = "http://your.proxy.host.com:8000"
Of course, you have to adapt the hostname and port to your actual values.
The proxy brings one more challenge into the game: Snowflake uses OCSP for certificate validation by default. You could disable it in the client (for example, in your Airflow Snowflake connection definition), but it is strongly recommended to leave it enabled for security reasons. Depending on the configuration of your proxy, there is a chance that it intercepts the SSL certificate chain (also known as SSL termination). This breaks the OCSP communication between your Airflow server and the OCSP responder. Your network administrators need to exempt the Snowflake hostnames from this interception on the proxy; otherwise, you will get OCSP errors (or even worse: depending on the OCSP failure mode, the connection will not work at all).
Final thoughts
I showed the steps we take to automate Snowpark Python scripts with Airflow in one of our customer projects at dmTECH. Please note that there might be other options that better suit your requirements, especially the Snowflake-internal way in the form of tasks. At the time of writing, Snowflake has a very interesting preview feature in its Python API that allows you to define more complex DAGs as Python code, almost like in Airflow. If you are exclusively on Snowflake, do not need to schedule tasks on multiple technology stacks from one common orchestration tool, and have simpler DAG logic, check that out!
This article focused mainly on the automation and production aspects of a Snowpark application. If you want to learn more about Snowpark itself and its capabilities please check out this blog post with a more sophisticated code example.