Kontinuierliche Datenpipeline in Snowflake
Business Intelligence

Kontinuierliche Datenpipeline mit Streams und Tasks in Snowflake

Lesezeit
11 ​​min

Willkommen im zweiten Artikel unserer Serie über die Snowflake Data Cloud. Der erste Artikel der Serie hat sich mit Snowpipe beschäftigt und wie es das Laden von Dateien ermöglicht, sobald sie in einem externen Cloudspeicher zur Verfügung stehen.  In diesem Teil stellen wir eine weitere Möglichkeit vor, Daten über eine gesteuerte Pipeline zu laden und im selben Schritt Transformationen anwenden zu können. Bei den Werkzeugen handelt es sich um die Objekttypen Streams und Tasks in Snowflake. Im Folgenden werden wir uns diese detailliert ansehen und gehen auf die Vorteile ein, die sie insbesondere im Zusammenspiel mit sich bringen.Ein Anwendungsfall in nahezu jedem datengetriebenen Projekt ist es, neue oder geänderte Daten einer Quelltabelle zuverlässig weiterzuverarbeiten und in entsprechende Zielstrukturen zu überführen.
Genau für einen solchen Use Case (auch die Bewirtschaftung einer Slowly Changing Dimension Typ 2 ist beispielsweise ein solches Szenario) bieten sich Streams und Tasks hervorragend an, da sie Features wie Scheduling und die automatisierbare Weiterverarbeitung von Änderungen bereits als integrierte Features mitbringen.

Use Case

Um genau zu verstehen, wie die Objekttypen Streams und Tasks in Snowflake funktionieren, bauen wir Schritt für Schritt einen geeigneten Anwendungsfall.
Hierfür das Beispieldataset „TPC-DS“ zum Einsatz. Da es frei verfügbar ist, lassen sich alle Code-Beispiele gut in der eigenen Umgebung nachbauen.

In unserem Use Case werden wir Daten von einer Quell- in eine Zieltabelle via Datenpipeline mit Streams und Tasks überführen und dabei

  • einen Klon der Quelltabelle „CUSTOMER“ erstellen
  • diesen Klon inkrementell mit Daten der Quelltabelle füllen
  • zusätzlich zufällige Aktualisierungen und Löschungen ausführen
  • alle CDC-Änderungen an dieser Tabelle identifizieren und in eine Zieltabelle weiterverarbeiten
  • zur besseren Nutzbarkeit der Daten eine kleine beispielhafte Transformation implementieren
  • verschiedene Tasks erstellen, ausführen und zeitgesteuert einplanen
  • einen Stream erstellen

In den nachfolgenden Skripten muss die Bezeichnung für das Warehouse angepasst werden, wobei hier ein Warehouse der Größe „XSMALL“ ausreichend ist. Um die Skripte 1:1 ausführen zu können, kann mit folgendem Code ein Warehouse mit dem Namen „DATALAB_WH“ erstellt werden.

Um eine saubere Trennung zu den Qelldaten aus dem TPD-DS Dataset zu ermöglichen, erstellen wir zunächst ein separates Schema und erstellen dort alle weiteren Datenbankobjekte.

 

Streams

In Snowflake stellt ein Stream change data capture (CDC) Informationen zu einer Tabelle bereit. Hiermit lassen sich Änderungen (INSERT, UPDATE und DELETE) auf Zeilenebene zu einem Zeitpunkt identifizieren. Diese können dann je nach Anwendungsfall verschiedenartig weiterverarbeitet werden. Der Inhalt eines Streams kann über eine gewöhnliche Query wie eine Tabelle abgefragt werden und enthält neben den Spalten der darunter liegenden Tabelle die folgenden drei zusätzliche Spalten:

  • „METADATA$ACTION“: Information über die Art der Aktion. Kann die Werte INSERT oder DELETE annehmen. Updates werden in einem Stream als DELETE der ursprünglichen Zeile und INSERT der Zeile mit neuem Wert dargestellt.
  • „METADATA$ISUPDATE“: Information, ob es sich bei dieser Zeile um eine UPDATE Operation handelt. Kann die Werte TRUE oder FALSE annehmen.
  • „METADATA$ROW_ID“: Eine eindeutige und unveränderliche ID für jede Zeile in der Quelltabelle. Mit ihr lassen sich über die Zeit hinweg Änderungen eindeutig zuordnen. Auch die durch UPDATES erzeugten DELETE und INSERT Einträge im Stream können auf diese Weise durch dieselbe ID in Zusammenhang gesetzt werden.

Wie zuvor erwähnt, möchten wir einen Staging-Klon der Customer-Tabelle im gerade erzeugten Schema erstellen. Diesen beladen wir später periodisch, um einen sich verändernden Datenbestand einer produktiven Umgebung zu simulieren.
Der Übersicht halber arbeiten wir nur mit einer Auswahl der ursprünglichen Spalten.

Den Inhalt und die Änderungen an dieser Tabelle wollen wir später in einer kuratierten Zieltabelle festhalten. Um diese Änderungen zu erkennen, erstellen wir einen Stream.

Initial ist dieser Stream leer, da noch keine Operationen auf der darunter liegenden Tabelle ausgeführt wurden.

 

Tasks

Ein Task bietet in Snowflake die Möglichkeit, auf einfache Art und Weise Transformationen, beispielsweise via SQL Statements, wiederholt ausführen zu lassen.

In unserem Fall werden wir mit einem Task den Inhalt eines Streams, also das Delta dessen zugrunde liegender Tabelle, gezielt mit einem SQL Statement weiterverarbeiten. Da ein Stream laufend Änderungen ansammelt und ein Task zur wiederkehrenden Ausführung gedacht ist, bietet sich die Verwendung in Kombination hervorragend für die Modellierung einer kontinuierlichen Datenpipeline an.

Tasks kommen außerdem mit einer Vielzahl an zusätzlichen Features daher, wie beispielsweise automatischem Logging und Monitoring, Scheduling sowie die an Bedingungen gekoppelte Ausführung.

Nachfolgend erzeugen wir einen Task, der ein Subset an Zeilen der Customer-Tabelle aus dem TPC-DS Dataset in den Staging-Klon lädt. Wie eingangs erwähnt, verschlanken wir die sehr breite Tabelle und arbeiten mit einer Auswahl an Spalten weiter.

Ein Task ist bei der Erstellung standardmäßig deaktiviert. Es gibt die Möglichkeit

  • den Task auszuführen („EXECUTE“)
  • den konfigurierten zeitgesteuerten Plan zu aktivieren („RESUME“)
  • den zeitgesteuerten Plan zu deaktivieren  („SUPEND“).

Um einem realen Szenario möglichst nahe zu kommen, erstellen wir auch Tasks, die Löschungen und Änderungen vornehmen.

Bei dem Task „STAGING_CUSTOMER_DELETES“ wird eine willkürliche Zeile gelöscht.
Bei dem Task „STAGING_CUSTOMER_UPDATES“ ändern wir den Wert des Felds „C_FIRST_NAME“ ebenfalls für eine willkürliche Zeile. Als Operation setzen wir den Inhalt des Felds über die Funktion UPPER() auf Großbuchstaben. Beide Tasks werden noch über den „EXECUTE“-Befehl ausgeführt.

Die drei historischen Ausführungen der Tasks (pro Task eine Ausführung) lassen sich übersichtlich mit Bordmitteln abfragen. Es sind umfassende Metadaten zu den Läufen vorhanden. Zu den bereitgestellten Informationen zählen beispielsweise

  • der komplette Ausgeführte Code in der Spalte „QUERY TEXT“,
  • sofern konfiguriert, ist auch die Bedingung für jeden Ladelauf in der Spalte „CONDITION_TEXT“ einsehbar,
  • auch werden genaue Zeitstempel von Start „SCHEDULED_TIME“ und Ende „COMPLETED_TIME“ der Ausführung des Tasks, Start der Ausführung Query „QUERY_START_TIME“ und nächster planmäßiger Start „NEXT_SCHEDULED_TIME“ des Tasks automatisch gepflegt.

Zieltabelle

Nachdem wir nun eine Quelltabelle für unsere Zwecke gebaut haben und auf dieser mit passenden Tasks INSERTS, UPDATES und DELETES erzeugen können, folgt wie eingangs erwähnt unsere Zieltabelle. In dieser sollen sich grundsätzlich alle durch die Tasks erzeugten oder veränderten Daten wiederfinden. Zusätzlich sollen Löschungen erkenntlich gemacht und das umständlich formatierte Geburtsdatum in den Datentyp DATE transformiert werden. Außerdem beladen wir die folgenden selbst erstellten Metadatenspalten:

  • „CREATED_AT“: Zeitstempel der initialen Beladung.
  • „DELETED_AT“: Zeitstempel der Löschung. NULL sofern Datensatz in der Quelltabelle nicht gelöscht wurde.
  • „IS_DELETED“: Information darüber, ob der Datensatz in der Quelltabelle gelöscht wurde. Kann die Werte TRUE oder FALSE annehmen.
  • „LAST_MODIFIED_AT“: Zeitstempel der letzten Änderung des Datensatzes.

Als Quelle dient uns der Inhalt des Streams. Die Weiterverarbeitung der Streamdaten lässt sich je nach Anforderung gestalten.

In unserem Beispiel möchten wir zunächst einmal alle neuen Einträge und alle Aktualisierungen überführen. Löschungen sollen in der Zieltabelle bestehen bleiben und lediglich entsprechende Metadaten zur Löschung in die Felder „DELETED_AT“ und „IS_DELETED“ eingetragen werden.

Bei dieser Gelegenheit (Überführung von Staging zu kuratiert) implementieren wir außerdem kleinere Datentransformationen, um die Daten für Endnutzer:innen besser lesbar zu machen. Konkret geht es um das Geburtsdatum, das in der Quelltabelle in separaten Spalten für Jahr, Monat und Tag gespeichert wurde. In unserem Fall bietet eine Umwandlung dieser Spalten in den Datentyp DATE eine bessere Lesbarkeit.
Die Zieltabelle wird mit folgendem Code erzeugt:

Um diese Tabelle nun automatisiert zu befüllen, erstellen wir einen letzten Task. Diesen konfigurieren wir so, dass er minütlich gestartet wird, und schalten über den „RESUME“-Befehl die periodische Ausführung aktiv.

Da die Tasks zur Befüllung der Staging-Tabelle bisher nur manuell ausgeführt wurden und noch nicht zeitlich eingeplant sind, zeigt die Ausführungshistorie nur eine Aktivität (grüne Markierung im nächsten Screenshot). Danach wurde die Ausführung aufgrund der implementierten Bedingung übersprungen (gelbe Markierung). Es handelt sich dabei um die Prüfung auf den Inhalt des Streams, so dass der Task nur dann startet, wenn auch zu verarbeitende Zeilen vorhanden sind. Konkret umgesetzt wurde das elegant über die Funktion „system$stream_has_data()“. Anschaulich wird das Ganze, wenn wir wenige Minuten, also ein paar Starts des Tasks, abwarten und dann die Historie abfragen. Über diese Selektion sind eine Vielzahl an Metainformationen zu den Aufrufen des Tasks einsehbar.

Abschließend aktivieren wir die zeitgesteuerte Ausführung der zu Beginn angelegten Tasks. Diese wurden ebenfalls mit einem minütlichen Zeitintervall erstellt.  Wir warten nochmals ein paar Minuten und fragen die Ausführungshistorie über obige Query erneut ab. Wir sehen nun mehrfach erfolgreich durchgeführte Läufe.

Um Kosten zu sparen, deaktivieren wir am Ende die automatische Ausführung aller Tasks und stoppen sicherheitshalber auch den Betrieb des Warehouses.

Fazit zu Streams und Tasks in Snowflake

Selbst getrennt betrachtet bieten Streams und auch Tasks je für sich von Haus aus schon sehr viel Funktionalität und Möglichkeiten zur Konfiguration.

Tasks sind ein sehr einfaches und schlankes Mittel, um wiederkehrende SQL Statements ausführen zu lassen. Der zeitliche Turnus lässt sich dabei je nach Anforderung bis auf eine minütliche Ausführung takten. Sollte mehr Individualität notwendig sein, kann das Scheduling auch via Crontab beliebig an die Gegebenheiten angepasst werden.

Für komplexere Abläufe können Tasks ganz einfach in Reihe / in Abhängigkeit geschaltet werden und so beliebige Abläufe steuern. Definiert werden diese dann über einen Directed Acyclic Graph (DAG). Zum Einsatz kommt dies in unseren Projekten beispielsweise bei der Beladung von Data Warehouses. Bevor die Faktentabellen prozessiert werden können, muss sichergestellt sein, dass die dazugehörigen Quellsysteme und Stammdaten (Dimensionen) vollständig beladen sind:

Mit Streams haben wir ebenfalls ein sehr mächtiges, aber dennoch schlankes Werkzeug zur Orchestrierung von Daten zur Verfügung. Je nach Anwendungsfall kommt es in unseren größeren Datenplattformen dazu, dass wir mehrere Datenströme für den Inhalt eines Streams haben. Da dieser aber bei Verarbeitung sofort geleert wird, bietet es sich an, für verschiedene Abläufe separate Streams auf dieselbe Quelltabelle zu setzen. Ein Stream kann übrigens nicht nur eine Tabelle, sondern auch Verzeichnistabellen, externe Tabellen, oder die einer View zugrunde liegenden Tabellen auf Änderungen überwachen.

Die individuellen Stärken entfalten sich – wie Eingangs im Artikel erwähnt – umso mehr, wenn sie im Zusammenspiel als Datenpipeline implementiert werden. Auf anderen Stacks und insbesondere bei früheren Lösungen zu Datenplattformen musste hier noch sehr viel manueller Code geschrieben und gewartet werden. Es fühlt sich sehr gut an, wie diese beiden Tools nahtlos in Snowflake integriert sind und wie smooth sie sich in verschiedenen Konstellationen nutzen lassen.

Hat dir der Beitrag gefallen?

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert