Das Snowflake-Logo auf hellblauem Hintergrund
Business Intelligence

Kontinuierlicher Import von Daten mit Snowpipe

Lesezeit
7 ​​min

Willkommen zu unserem ersten Artikel in einer längeren Artikelserie über die Snowflake Data Cloud, einer cloud-nativen Plattform, die separate Data Warehouses, Data Lakes und Data Marts überflüssig macht und eine sichere gemeinsame Nutzung von Daten im gesamten Unternehmen ermöglicht.

Snowflake ist seit vielen Jahren als cloud-native Data Warehouse Lösung bekannt, hat aber in den letzten Monaten viele neue Features, wie z.B. Snowpark als Spark-ähnliche Dataframe API für klassisches Data Engineering und Snowpark ML für Data Science, hinzugewonnen. Da damit viele Use-Cases erstmals auf Snowflake ermöglicht werden, die bisher traditionell auf der Databricks Lakehouse Plattform umgesetzt wurden, möchten wir in einer Reihe an Artikeln die Features der Snowflake Data Cloud beleuchten. Ziel dieser Artikelserie ist es, einem breiteren Publikum ein umfassendes Verständnis über die Snowflake Technologien zu vermitteln, um selbst evaluieren zu können, ob Snowflake eine passende Lösung für den eigenen Use-Case sein kann.

In diesem ersten Beitrag unserer Blogserie zu Snowflake schauen wir uns Snowpipe an. Snowpipe ermöglicht das Laden von Dateien, sobald sie in einem externen Cloudspeicher zur Verfügung stehen. Entsprechende Ladestrecken können vollständig automatisiert werden, indem Ereignisbenachrichtigungen verwendet werden, die über das Eintreffen neuer Dateien informieren.

Wir bei inovex nutzen diese Technologie, um die Daten unseres Meetingraum-Monitorings auf einfachem und schnellem Wege in Snowflake zur Verfügung zu stellen. Hierzu werden die von einem Azure IoT Hub in einen Storage Account geschriebenen Daten kontinuierlich in eine Snowflake-Tabelle geladen.

Anhand dieses Use Case zeigen wir Step-by-step, wie eine automatisierte Ladestrecke aus einem Azure Storage Account mit Snowpipe umgesetzt werden kann.

Voraussetzungen

Wir setzen voraus, dass die folgenden Komponenten bereits vorhanden sind:

  • Snowflake-Zugang mit entsprechende Berechtigungen sowie Datenbank und Schema
  • Azure-Zugang mit entsprechende Berechtigungen
  • Storage Account auf dem kontinuierlich Daten eintreffen (kann natürlich auch manuell nachgeahmt werden)
  • Storage Queue auf dem Storage Account

Die einzelnen Schritte in der Übersicht

  1. Erstellen einer Storage Integration
  2. Erstellen einer Notification Integration
  3. Importieren der Daten mit Snowpipe

Erstellen einer Storage Integration

Zunächst erstellen wir in Snowflake eine Storage Integration. Diese ermöglicht die Integration mit externen Cloudspeichern, in unserem Falle mit einem Azure Storage Account. Auf diesem Wege können dann Daten von dort abgerufen und dort gespeichert werden.

Die AZURE_TENANT_ID ist die ID des Office 365 Mandanten, zu dem der verwendete Storage Account gehört.

In STORAGE_ALLOWED_LOCATIONS können beliebig viele Pfade angegeben werden, auf die wir Zugriff gewähren wollen. Wir geben dort also den Pfad zu dem Container an, in den kontinuierlich Daten geliefert werden.

Anschließend lassen wir uns die Storage Integration mittels DESC beschreiben:

Abfrageergebnis des DESC-Befehls

Der AZURE_MULTI_TENANT_APP_NAME ist der Name der Snowflake-Clientanwendung, die für die Storage Integration verwendet wird. Diesen werden wir brauchen, um der Anwendung im Azure Portal die benötigten Berechtigungen zu erteilen.

Die AZURE_CONSENT_URL zeigt die URL zu einer Microsoft-Berechtigungsanforderungsseite.

Geben wir diese in unseren Browser ein, werden wir aufgefordert, der Clientanwendung die benötigten Berechtigungen zu erteilen:

Microsoft-Berechtigungsanforderungsseite

Wir akzeptieren die Vergabe dieser Berechtigungen und werden im Anschluss auf die Snowflake-Website weitergeleitet.

Damit die Anwendung die Daten in unserem Storage Account lesen darf, fügen wir ihn im Access Control (IAM) Blade des Storage Accounts mit der Rolle “Storage Blob Data Reader“ hinzu:

App mit der Berechtigung Storage Blob Data Reader

Erstellen einer Notification Integration

Eine Notification Integration ist ein Snowflake-Objekt, das als Schnittstelle zwischen Snowflake und Cloud-Messagingdiensten dient. Für unser Szenario bedeutet dies, dass Snowflake von einer Azure Storage Queue über das Eintreffen neuer Daten auf dem Storage Account benachrichtigt wird. Auf diese Benachrichtigung setzen wir dann eine Pipe, die automatisch die neuen Daten importieren wird.

Das Erstellen einer Notification Integration ist ähnlich dem Erstellen einer Storage Integration. Zuerst legen wir das Objekt in Snowflake an:

Wie zuvor ist die AZURE_TENANT_ID die ID des Office 365 Mandanten, zu dem der verwendete Storage Account gehört.

In AZURE_STORAGE_QUEUE_PRIMARY_URI tragen wir die URL der Azure Storage Queue auf unserem Storage Account ein.

Mit DESC sehen wir den Namen der Snowflake-Clientanwendung und holen uns die Consent URL:

Abfrageergebnis des DESC-Befehls

Ab in den Browser damit und die angeforderten Berechtigungen akzeptieren:

Microsoft-Berechtigungsanforderungsseite

Im Access Control (IAM) Blade der Storage Queue muss die Anwendung nun noch mit der Rolle “Storage Queue Data Contributor“ hinzugefügt werden:

App mit der Berechtigung Storage Queue Data Contributor

Importieren der Daten mit Snowpipe

Für den Import der Daten in unsere Snowflake-Instanz müssen wir dort nun noch eine Stage, ein File Format, eine Tabelle und eine Pipe anlegen:

Diese externe Stage zeigt auf den Container unseres Storage Accounts, in den die kontinuierliche Datenlieferung erfolgt.
Als STORAGE_INTEGRATION geben wir die zuvor erstellte Storage Integration an.

Das File Format beschreibt die Struktur der zu ladenden Daten.
In unserem Falle sind dies base64-kodierte json-Dateien.

In diese Tabelle wird die Pipe die auf dem Storage Account ankommenden Daten schreiben.
Da wir base64-kodierte json-Dateien laden, besteht sie lediglich aus einer einzigen Spalte vom Typ VARIANT.

Die Pipe erstellen wir auf der SENSOR_DATA_QUEUE und lassen sie einen COPY INTO Befehl ausführen.

Jedes BlobCreated Event unserer Storage Queue wird nun dazu führen, dass die Pipe ausgeführt wird und die neu hinzugekommenen Daten aus der externen Stage in die zuvor angelegte Tabelle geschrieben werden.

Um die komplette Historie der Daten und damit alle bereits im Azure Storage Account liegenden Daten zu laden, führen wir einen Refresh der Pipe aus:

Nun werden die Daten nach und nach in der zuvor angelegten Tabelle eintreffen:

Enkodierte Sensordaten in einer Snowflake-Tabelle

Um die Ausführungen der Pipe anzusehen, kann beispielsweise die Tabellenfunktion PIPE_USAGE_HISTORY verwendet werden:

Um keine unnötigen Kosten zu verursachen, sollte abschließend die automatische Ausführung der Pipe ausgesetzt werden:

Fazit

Snowpipes bieten eine einfache und schnelle Möglichkeit, die Integration strukturierter und semi-strukturierter Daten aus externen Cloudspeichern zu automatisieren. So können zeitkritische Anwendungen in real-time oder near real-time mit Daten versorgt werden.

Aufgrund der nahtlosen Integration von Snowflake mit den Cloud-Messagingdiensten können Unternehmen den manuellen Entwicklungsaufwand gering halten und dennoch eine kontinuierliche Versorgung aller Anwendungen mit den relevanten Daten gewährleisten. Die serverlose Architektur von Snowflake Snowpipes vereinfacht die Nutzung im Vergleich zu ähnlichen Technologien auf Azure oder Databricks, da sie automatische Skalierung bieten und einen geringen Verwaltungsaufwand benötigen, ohne dass Nutzer sich um die Provisionierung oder Verwaltung von Servern kümmern müssen. Ferner machen Skalierbarkeit und Kosteneffizienz sie zu einem hervorragenden Werkzeug für Entwicklungsteams, die effiziente und robuste Datenverarbeitungsprozesse etablieren wollen.

Links
https://docs.snowflake.com/en/user-guide/data-load-snowpipe-intro
https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration
https://docs.snowflake.com/en/sql-reference/sql/create-notification-integration

Hat dir der Beitrag gefallen?

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert