Willkommen im zweiten Artikel unserer Serie über die Snowflake Data Cloud. Der erste Artikel der Serie hat sich mit Snowpipe beschäftigt und wie es das Laden von Dateien ermöglicht, sobald sie in einem externen Cloudspeicher zur Verfügung stehen. In diesem Teil stellen wir eine weitere Möglichkeit vor, Daten über eine gesteuerte Pipeline zu laden und im selben Schritt Transformationen anwenden zu können. Bei den Werkzeugen handelt es sich um die Objekttypen Streams und Tasks in Snowflake. Im Folgenden werden wir uns diese detailliert ansehen und gehen auf die Vorteile ein, die sie insbesondere im Zusammenspiel mit sich bringen.Ein Anwendungsfall in nahezu jedem datengetriebenen Projekt ist es, neue oder geänderte Daten einer Quelltabelle zuverlässig weiterzuverarbeiten und in entsprechende Zielstrukturen zu überführen.
Genau für einen solchen Use Case (auch die Bewirtschaftung einer Slowly Changing Dimension Typ 2 ist beispielsweise ein solches Szenario) bieten sich Streams und Tasks hervorragend an, da sie Features wie Scheduling und die automatisierbare Weiterverarbeitung von Änderungen bereits als integrierte Features mitbringen.
Use Case
Um genau zu verstehen, wie die Objekttypen Streams und Tasks in Snowflake funktionieren, bauen wir Schritt für Schritt einen geeigneten Anwendungsfall.
Hierfür das Beispieldataset „TPC-DS“ zum Einsatz. Da es frei verfügbar ist, lassen sich alle Code-Beispiele gut in der eigenen Umgebung nachbauen.
In unserem Use Case werden wir Daten von einer Quell- in eine Zieltabelle via Datenpipeline mit Streams und Tasks überführen und dabei
- einen Klon der Quelltabelle „CUSTOMER“ erstellen
- diesen Klon inkrementell mit Daten der Quelltabelle füllen
- zusätzlich zufällige Aktualisierungen und Löschungen ausführen
- alle CDC-Änderungen an dieser Tabelle identifizieren und in eine Zieltabelle weiterverarbeiten
- zur besseren Nutzbarkeit der Daten eine kleine beispielhafte Transformation implementieren
- verschiedene Tasks erstellen, ausführen und zeitgesteuert einplanen
- einen Stream erstellen
In den nachfolgenden Skripten muss die Bezeichnung für das Warehouse angepasst werden, wobei hier ein Warehouse der Größe „XSMALL“ ausreichend ist. Um die Skripte 1:1 ausführen zu können, kann mit folgendem Code ein Warehouse mit dem Namen „DATALAB_WH“ erstellt werden.
1 2 3 4 5 6 |
-- create warehouse for following executions CREATE OR REPLACE WAREHOUSE DATALAB_WH WITH WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 10 AUTO_RESUME = TRUE INITIALLY_SUSPENDED = TRUE; |
Um eine saubere Trennung zu den Qelldaten aus dem TPD-DS Dataset zu ermöglichen, erstellen wir zunächst ein separates Schema und erstellen dort alle weiteren Datenbankobjekte.
1 2 |
-- create a separate schema CREATE OR replace SCHEMA TPC_DS.BLOG; |
Streams
In Snowflake stellt ein Stream change data capture (CDC) Informationen zu einer Tabelle bereit. Hiermit lassen sich Änderungen (INSERT, UPDATE und DELETE) auf Zeilenebene zu einem Zeitpunkt identifizieren. Diese können dann je nach Anwendungsfall verschiedenartig weiterverarbeitet werden. Der Inhalt eines Streams kann über eine gewöhnliche Query wie eine Tabelle abgefragt werden und enthält neben den Spalten der darunter liegenden Tabelle die folgenden drei zusätzliche Spalten:
- „METADATA$ACTION“: Information über die Art der Aktion. Kann die Werte INSERT oder DELETE annehmen. Updates werden in einem Stream als DELETE der ursprünglichen Zeile und INSERT der Zeile mit neuem Wert dargestellt.
- „METADATA$ISUPDATE“: Information, ob es sich bei dieser Zeile um eine UPDATE Operation handelt. Kann die Werte TRUE oder FALSE annehmen.
- „METADATA$ROW_ID“: Eine eindeutige und unveränderliche ID für jede Zeile in der Quelltabelle. Mit ihr lassen sich über die Zeit hinweg Änderungen eindeutig zuordnen. Auch die durch UPDATES erzeugten DELETE und INSERT Einträge im Stream können auf diese Weise durch dieselbe ID in Zusammenhang gesetzt werden.
Wie zuvor erwähnt, möchten wir einen Staging-Klon der Customer-Tabelle im gerade erzeugten Schema erstellen. Diesen beladen wir später periodisch, um einen sich verändernden Datenbestand einer produktiven Umgebung zu simulieren.
Der Übersicht halber arbeiten wir nur mit einer Auswahl der ursprünglichen Spalten.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
-- create staging table clone from CUSTOMER as a new source we want to fill periodically CREATE OR replace TABLE TPC_DS.BLOG.STAGING_CUSTOMER ( C_CUSTOMER_SK NUMBER(38, 0) ,C_CUSTOMER_ID VARCHAR(16777216) ,C_SALUTATION VARCHAR(16777216) ,C_FIRST_NAME VARCHAR(16777216) ,C_LAST_NAME VARCHAR(16777216) ,C_PREFERRED_CUST_FLAG VARCHAR(16777216) ,C_BIRTH_DAY NUMBER(38, 0) ,C_BIRTH_MONTH NUMBER(38, 0) ,C_BIRTH_YEAR NUMBER(38, 0) ,C_BIRTH_COUNTRY VARCHAR(16777216) ,C_LOGIN VARCHAR(16777216) ,C_EMAIL_ADDRESS VARCHAR(16777216) ,LAST_MODIFIED DATETIME ); |
Den Inhalt und die Änderungen an dieser Tabelle wollen wir später in einer kuratierten Zieltabelle festhalten. Um diese Änderungen zu erkennen, erstellen wir einen Stream.
1 2 |
-- create stream on staging table CREATE OR replace stream STAGING_CUSTOMER_STREAM ON TABLE TPC_DS.BLOG.STAGING_CUSTOMER; |
Initial ist dieser Stream leer, da noch keine Operationen auf der darunter liegenden Tabelle ausgeführt wurden.
1 2 3 |
-- select stream SELECT * FROM TPC_DS.BLOG.STAGING_CUSTOMER_STREAM; |
Tasks
Ein Task bietet in Snowflake die Möglichkeit, auf einfache Art und Weise Transformationen, beispielsweise via SQL Statements, wiederholt ausführen zu lassen.
In unserem Fall werden wir mit einem Task den Inhalt eines Streams, also das Delta dessen zugrunde liegender Tabelle, gezielt mit einem SQL Statement weiterverarbeiten. Da ein Stream laufend Änderungen ansammelt und ein Task zur wiederkehrenden Ausführung gedacht ist, bietet sich die Verwendung in Kombination hervorragend für die Modellierung einer kontinuierlichen Datenpipeline an.
Tasks kommen außerdem mit einer Vielzahl an zusätzlichen Features daher, wie beispielsweise automatischem Logging und Monitoring, Scheduling sowie die an Bedingungen gekoppelte Ausführung.
Nachfolgend erzeugen wir einen Task, der ein Subset an Zeilen der Customer-Tabelle aus dem TPC-DS Dataset in den Staging-Klon lädt. Wie eingangs erwähnt, verschlanken wir die sehr breite Tabelle und arbeiten mit einer Auswahl an Spalten weiter.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
-- create task for incremental staging table load CREATE OR replace task STAGING_CUSTOMER_LOAD warehouse = DATALAB_WH schedule = '1 minute' AS INSERT INTO TPC_DS.BLOG.STAGING_CUSTOMER ( SELECT C_CUSTOMER_SK ,C_CUSTOMER_ID ,C_SALUTATION ,C_FIRST_NAME ,C_LAST_NAME ,C_PREFERRED_CUST_FLAG ,C_BIRTH_DAY ,C_BIRTH_MONTH ,C_BIRTH_YEAR ,C_BIRTH_COUNTRY ,C_LOGIN ,C_EMAIL_ADDRESS ,current_timestamp FROM TPC_DS.silver_1gb.customer WHERE round(c_customer_sk / 10) = ( SELECT COALESCE(MAX(ROUND(C_CUSTOMER_SK/ 10)) + 1, 1) FROM TPC_DS.BLOG.STAGING_CUSTOMER ) ); |
Ein Task ist bei der Erstellung standardmäßig deaktiviert. Es gibt die Möglichkeit
- den Task auszuführen („EXECUTE“)
- den konfigurierten zeitgesteuerten Plan zu aktivieren („RESUME“)
- den zeitgesteuerten Plan zu deaktivieren („SUPEND“).
1 2 3 4 |
-- commands for the recent task execute task TPC_DS.BLOG.STAGING_CUSTOMER_LOAD; -- alter task TPC_DS.BLOG.STAGING_CUSTOMER_LOAD resume; -- alter task TPC_DS.BLOG.STAGING_CUSTOMER_LOAD suspend; |
Um einem realen Szenario möglichst nahe zu kommen, erstellen wir auch Tasks, die Löschungen und Änderungen vornehmen.
Bei dem Task „STAGING_CUSTOMER_DELETES“ wird eine willkürliche Zeile gelöscht.
Bei dem Task „STAGING_CUSTOMER_UPDATES“ ändern wir den Wert des Felds „C_FIRST_NAME“ ebenfalls für eine willkürliche Zeile. Als Operation setzen wir den Inhalt des Felds über die Funktion UPPER() auf Großbuchstaben. Beide Tasks werden noch über den „EXECUTE“-Befehl ausgeführt.
1 2 3 4 5 6 7 8 9 |
-- create task for incremental delete simulation CREATE OR replace task STAGING_CUSTOMER_DELETES warehouse = DATALAB_WH schedule = '1 minute' AS DELETE FROM TPC_DS.BLOG.STAGING_CUSTOMER AS a USING ( SELECT TOP 1 c_customer_id FROM TPC_DS.BLOG.STAGING_CUSTOMER ORDER BY random() ) AS b WHERE a.c_customer_id = b.c_customer_id; |
1 2 3 4 |
-- commands for the recent task execute task TPC_DS.BLOG.STAGING_CUSTOMER_DELETES; -- alter task TPC_DS.BLOG.STAGING_CUSTOMER_DELETES resume; -- alter task TPC_DS.BLOG.STAGING_CUSTOMER_DELETES suspend; |
1 2 3 4 5 6 7 8 9 10 |
-- create task for incremental update simulation CREATE OR replace task STAGING_CUSTOMER_UPDATES warehouse = DATALAB_WH schedule = '1 minute' AS UPDATE TPC_DS.BLOG.STAGING_CUSTOMER SET C_FIRST_NAME = UPPER(C_FIRST_NAME) WHERE c_customer_id = ( SELECT TOP 1 c_customer_id FROM TPC_DS.BLOG.STAGING_CUSTOMER ORDER BY random() ); |
1 2 3 4 |
-- commands for the recent task execute task TPC_DS.BLOG.STAGING_CUSTOMER_UPDATES; -- alter task TPC_DS.BLOG.STAGING_CUSTOMER_UPDATES resume; -- alter task TPC_DS.BLOG.STAGING_CUSTOMER_UPDATES suspend; |
Die drei historischen Ausführungen der Tasks (pro Task eine Ausführung) lassen sich übersichtlich mit Bordmitteln abfragen. Es sind umfassende Metadaten zu den Läufen vorhanden. Zu den bereitgestellten Informationen zählen beispielsweise
- der komplette Ausgeführte Code in der Spalte „QUERY TEXT“,
- sofern konfiguriert, ist auch die Bedingung für jeden Ladelauf in der Spalte „CONDITION_TEXT“ einsehbar,
- auch werden genaue Zeitstempel von Start „SCHEDULED_TIME“ und Ende „COMPLETED_TIME“ der Ausführung des Tasks, Start der Ausführung Query „QUERY_START_TIME“ und nächster planmäßiger Start „NEXT_SCHEDULED_TIME“ des Tasks automatisch gepflegt.
1 2 3 4 5 6 7 8 9 |
-- query task history SELECT * FROM TABLE (information_schema.task_history()) WHERE name IN ( 'STAGING_CUSTOMER_LOAD' ,'STAGING_CUSTOMER_DELETES' ,'STAGING_CUSTOMER_UPDATES' ) ORDER BY scheduled_time DESC; |
Zieltabelle
Nachdem wir nun eine Quelltabelle für unsere Zwecke gebaut haben und auf dieser mit passenden Tasks INSERTS, UPDATES und DELETES erzeugen können, folgt wie eingangs erwähnt unsere Zieltabelle. In dieser sollen sich grundsätzlich alle durch die Tasks erzeugten oder veränderten Daten wiederfinden. Zusätzlich sollen Löschungen erkenntlich gemacht und das umständlich formatierte Geburtsdatum in den Datentyp DATE transformiert werden. Außerdem beladen wir die folgenden selbst erstellten Metadatenspalten:
- „CREATED_AT“: Zeitstempel der initialen Beladung.
- „DELETED_AT“: Zeitstempel der Löschung. NULL sofern Datensatz in der Quelltabelle nicht gelöscht wurde.
- „IS_DELETED“: Information darüber, ob der Datensatz in der Quelltabelle gelöscht wurde. Kann die Werte TRUE oder FALSE annehmen.
- „LAST_MODIFIED_AT“: Zeitstempel der letzten Änderung des Datensatzes.
Als Quelle dient uns der Inhalt des Streams. Die Weiterverarbeitung der Streamdaten lässt sich je nach Anforderung gestalten.
In unserem Beispiel möchten wir zunächst einmal alle neuen Einträge und alle Aktualisierungen überführen. Löschungen sollen in der Zieltabelle bestehen bleiben und lediglich entsprechende Metadaten zur Löschung in die Felder „DELETED_AT“ und „IS_DELETED“ eingetragen werden.
Bei dieser Gelegenheit (Überführung von Staging zu kuratiert) implementieren wir außerdem kleinere Datentransformationen, um die Daten für Endnutzer:innen besser lesbar zu machen. Konkret geht es um das Geburtsdatum, das in der Quelltabelle in separaten Spalten für Jahr, Monat und Tag gespeichert wurde. In unserem Fall bietet eine Umwandlung dieser Spalten in den Datentyp DATE eine bessere Lesbarkeit.
Die Zieltabelle wird mit folgendem Code erzeugt:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
-- create target table for customer stream data CREATE OR replace TABLE TPC_DS.BLOG.CUSTOMER ( C_CUSTOMER_ID VARCHAR(16777216) ,C_SALUTATION VARCHAR(16777216) ,C_FIRST_NAME VARCHAR(16777216) ,C_LAST_NAME VARCHAR(16777216) ,C_BIRTH_COUNTRY VARCHAR(16777216) ,c_BIRTH_DATE DATE ,C_EMAIL_ADDRESS VARCHAR(16777216) ,CREATED_AT DATETIME ,DELETED_AT DATETIME ,IS_DELETED VARCHAR(7) ,LAST_MODIFIED_AT DATETIME ); |
Um diese Tabelle nun automatisiert zu befüllen, erstellen wir einen letzten Task. Diesen konfigurieren wir so, dass er minütlich gestartet wird, und schalten über den „RESUME“-Befehl die periodische Ausführung aktiv.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
-- create task to fill new customer target table (insert, update, delete) with stream data as source CREATE OR replace task CUSTOMER_LOAD warehouse = DATALAB_WH schedule = '1 minute' when system$stream_has_data('TPC_DS.BLOG.STAGING_CUSTOMER_STREAM') AS MERGE INTO TPC_DS.BLOG.CUSTOMER t USING TPC_DS.BLOG.STAGING_CUSTOMER_STREAM s ON t.c_customer_id = s.c_customer_id -- new entry: insert entry WHEN NOT MATCHED AND METADATA$ACTION = 'INSERT' AND METADATA$ISUPDATE = 'False' THEN INSERT ( C_CUSTOMER_ID ,C_SALUTATION ,C_FIRST_NAME ,C_LAST_NAME ,C_BIRTH_COUNTRY ,C_BIRTH_DATE ,C_EMAIL_ADDRESS ,CREATED_AT ,DELETED_AT ,IS_DELETED ,LAST_MODIFIED_AT ) VALUES ( s.C_CUSTOMER_ID ,s.C_SALUTATION ,s.C_FIRST_NAME ,s.C_LAST_NAME ,s.C_BIRTH_COUNTRY ,TO_DATE(CONCAT(s.C_BIRTH_YEAR, '-', s.C_BIRTH_MONTH, '-', s.C_BIRTH_DAY)) ,s.C_EMAIL_ADDRESS ,current_timestamp ,NULL ,'False' ,current_timestamp ) -- deleted entry: update entry WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' AND s.METADATA$ISUPDATE = 'False' THEN UPDATE SET t.IS_DELETED = 'True' ,DELETED_AT = current_timestamp -- updated entry: update entry WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' AND s.METADATA$ISUPDATE = 'True' THEN UPDATE SET t.C_SALUTATION = s.C_SALUTATION ,t.C_FIRST_NAME = s.C_FIRST_NAME ,t.C_LAST_NAME = s.C_LAST_NAME ,t.C_BIRTH_COUNTRY = s.C_BIRTH_COUNTRY ,t.C_BIRTH_DATE = TO_DATE(CONCAT(s.C_BIRTH_YEAR, '-', s.C_BIRTH_MONTH, '-', s.C_BIRTH_DAY)) ,t.C_EMAIL_ADDRESS = s.C_EMAIL_ADDRESS ,t.LAST_MODIFIED_AT = current_timestamp; |
1 2 3 4 |
-- commands for the recent task -- execute task TPC_DS.BLOG.CUSTOMER_LOAD; alter task TPC_DS.BLOG.CUSTOMER_LOAD resume; -- alter task TPC_DS.BLOG.CUSTOMER_LOAD suspend; |
Da die Tasks zur Befüllung der Staging-Tabelle bisher nur manuell ausgeführt wurden und noch nicht zeitlich eingeplant sind, zeigt die Ausführungshistorie nur eine Aktivität (grüne Markierung im nächsten Screenshot). Danach wurde die Ausführung aufgrund der implementierten Bedingung übersprungen (gelbe Markierung). Es handelt sich dabei um die Prüfung auf den Inhalt des Streams, so dass der Task nur dann startet, wenn auch zu verarbeitende Zeilen vorhanden sind. Konkret umgesetzt wurde das elegant über die Funktion „system$stream_has_data()“. Anschaulich wird das Ganze, wenn wir wenige Minuten, also ein paar Starts des Tasks, abwarten und dann die Historie abfragen. Über diese Selektion sind eine Vielzahl an Metainformationen zu den Aufrufen des Tasks einsehbar.
1 2 3 4 5 6 7 |
-- query task history SELECT * FROM TABLE (information_schema.task_history()) WHERE name IN ( 'CUSTOMER_LOAD' ) ORDER BY scheduled_time DESC; |
Abschließend aktivieren wir die zeitgesteuerte Ausführung der zu Beginn angelegten Tasks. Diese wurden ebenfalls mit einem minütlichen Zeitintervall erstellt. Wir warten nochmals ein paar Minuten und fragen die Ausführungshistorie über obige Query erneut ab. Wir sehen nun mehrfach erfolgreich durchgeführte Läufe.
1 2 3 4 |
-- resume recent tasks alter task TPC_DS.BLOG.STAGING_CUSTOMER_LOAD resume; alter task TPC_DS.BLOG.STAGING_CUSTOMER_DELETES resume; alter task TPC_DS.BLOG.STAGING_CUSTOMER_UPDATES resume; |
1 2 3 4 5 6 7 |
-- query task history SELECT * FROM TABLE (information_schema.task_history()) WHERE name IN ( 'CUSTOMER_LOAD' ) ORDER BY scheduled_time DESC; |
Um Kosten zu sparen, deaktivieren wir am Ende die automatische Ausführung aller Tasks und stoppen sicherheitshalber auch den Betrieb des Warehouses.
1 2 3 4 5 6 7 |
-- suspend all tasks alter task TPC_DS.BLOG.STAGING_CUSTOMER_LOAD suspend; alter task TPC_DS.BLOG.STAGING_CUSTOMER_DELETES suspend; alter task TPC_DS.BLOG.STAGING_CUSTOMER_UPDATES suspend; alter task TPC_DS.BLOG.CUSTOMER_LOAD suspend; -- suspend warehouse alter warehouse DATALAB_WH suspend; |
Fazit zu Streams und Tasks in Snowflake
Selbst getrennt betrachtet bieten Streams und auch Tasks je für sich von Haus aus schon sehr viel Funktionalität und Möglichkeiten zur Konfiguration.
Tasks sind ein sehr einfaches und schlankes Mittel, um wiederkehrende SQL Statements ausführen zu lassen. Der zeitliche Turnus lässt sich dabei je nach Anforderung bis auf eine minütliche Ausführung takten. Sollte mehr Individualität notwendig sein, kann das Scheduling auch via Crontab beliebig an die Gegebenheiten angepasst werden.
Für komplexere Abläufe können Tasks ganz einfach in Reihe / in Abhängigkeit geschaltet werden und so beliebige Abläufe steuern. Definiert werden diese dann über einen Directed Acyclic Graph (DAG). Zum Einsatz kommt dies in unseren Projekten beispielsweise bei der Beladung von Data Warehouses. Bevor die Faktentabellen prozessiert werden können, muss sichergestellt sein, dass die dazugehörigen Quellsysteme und Stammdaten (Dimensionen) vollständig beladen sind:
Mit Streams haben wir ebenfalls ein sehr mächtiges, aber dennoch schlankes Werkzeug zur Orchestrierung von Daten zur Verfügung. Je nach Anwendungsfall kommt es in unseren größeren Datenplattformen dazu, dass wir mehrere Datenströme für den Inhalt eines Streams haben. Da dieser aber bei Verarbeitung sofort geleert wird, bietet es sich an, für verschiedene Abläufe separate Streams auf dieselbe Quelltabelle zu setzen. Ein Stream kann übrigens nicht nur eine Tabelle, sondern auch Verzeichnistabellen, externe Tabellen, oder die einer View zugrunde liegenden Tabellen auf Änderungen überwachen.
Die individuellen Stärken entfalten sich – wie Eingangs im Artikel erwähnt – umso mehr, wenn sie im Zusammenspiel als Datenpipeline implementiert werden. Auf anderen Stacks und insbesondere bei früheren Lösungen zu Datenplattformen musste hier noch sehr viel manueller Code geschrieben und gewartet werden. Es fühlt sich sehr gut an, wie diese beiden Tools nahtlos in Snowflake integriert sind und wie smooth sie sich in verschiedenen Konstellationen nutzen lassen.