Willkommen zu unserem ersten Artikel in einer längeren Artikelserie über die Snowflake Data Cloud, einer cloud-nativen Plattform, die separate Data Warehouses, Data Lakes und Data Marts überflüssig macht und eine sichere gemeinsame Nutzung von Daten im gesamten Unternehmen ermöglicht.
Snowflake ist seit vielen Jahren als cloud-native Data Warehouse Lösung bekannt, hat aber in den letzten Monaten viele neue Features, wie z.B. Snowpark als Spark-ähnliche Dataframe API für klassisches Data Engineering und Snowpark ML für Data Science, hinzugewonnen. Da damit viele Use-Cases erstmals auf Snowflake ermöglicht werden, die bisher traditionell auf der Databricks Lakehouse Plattform umgesetzt wurden, möchten wir in einer Reihe an Artikeln die Features der Snowflake Data Cloud beleuchten. Ziel dieser Artikelserie ist es, einem breiteren Publikum ein umfassendes Verständnis über die Snowflake Technologien zu vermitteln, um selbst evaluieren zu können, ob Snowflake eine passende Lösung für den eigenen Use-Case sein kann.
In diesem ersten Beitrag unserer Blogserie zu Snowflake schauen wir uns Snowpipe an. Snowpipe ermöglicht das Laden von Dateien, sobald sie in einem externen Cloudspeicher zur Verfügung stehen. Entsprechende Ladestrecken können vollständig automatisiert werden, indem Ereignisbenachrichtigungen verwendet werden, die über das Eintreffen neuer Dateien informieren.
Wir bei inovex nutzen diese Technologie, um die Daten unseres Meetingraum-Monitorings auf einfachem und schnellem Wege in Snowflake zur Verfügung zu stellen. Hierzu werden die von einem Azure IoT Hub in einen Storage Account geschriebenen Daten kontinuierlich in eine Snowflake-Tabelle geladen.
Anhand dieses Use Case zeigen wir Step-by-step, wie eine automatisierte Ladestrecke aus einem Azure Storage Account mit Snowpipe umgesetzt werden kann.
Voraussetzungen
Wir setzen voraus, dass die folgenden Komponenten bereits vorhanden sind:
- Snowflake-Zugang mit entsprechende Berechtigungen sowie Datenbank und Schema
- Azure-Zugang mit entsprechende Berechtigungen
- Storage Account auf dem kontinuierlich Daten eintreffen (kann natürlich auch manuell nachgeahmt werden)
- Storage Queue auf dem Storage Account
Die einzelnen Schritte in der Übersicht
- Erstellen einer Storage Integration
- Erstellen einer Notification Integration
- Importieren der Daten mit Snowpipe
Erstellen einer Storage Integration
Zunächst erstellen wir in Snowflake eine Storage Integration. Diese ermöglicht die Integration mit externen Cloudspeichern, in unserem Falle mit einem Azure Storage Account. Auf diesem Wege können dann Daten von dort abgerufen und dort gespeichert werden.
1 2 3 4 5 6 7 |
-- Create storage integration CREATE OR REPLACE STORAGE INTEGRATION SENSOR_DATA_INTEGRATION TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = 'AZURE' ENABLED = TRUE AZURE_TENANT_ID = '<tenantId>' STORAGE_ALLOWED_LOCATIONS = ('azure://<storageAccountName>.blob.core.windows.net/<containerName>/'); |
Die AZURE_TENANT_ID ist die ID des Office 365 Mandanten, zu dem der verwendete Storage Account gehört.
In STORAGE_ALLOWED_LOCATIONS können beliebig viele Pfade angegeben werden, auf die wir Zugriff gewähren wollen. Wir geben dort also den Pfad zu dem Container an, in den kontinuierlich Daten geliefert werden.
Anschließend lassen wir uns die Storage Integration mittels DESC beschreiben:
1 2 |
-- Get consent url DESC STORAGE INTEGRATION SENSOR_DATA_INTEGRATION; |
Der AZURE_MULTI_TENANT_APP_NAME ist der Name der Snowflake-Clientanwendung, die für die Storage Integration verwendet wird. Diesen werden wir brauchen, um der Anwendung im Azure Portal die benötigten Berechtigungen zu erteilen.
Die AZURE_CONSENT_URL zeigt die URL zu einer Microsoft-Berechtigungsanforderungsseite.
Geben wir diese in unseren Browser ein, werden wir aufgefordert, der Clientanwendung die benötigten Berechtigungen zu erteilen:
Wir akzeptieren die Vergabe dieser Berechtigungen und werden im Anschluss auf die Snowflake-Website weitergeleitet.
Damit die Anwendung die Daten in unserem Storage Account lesen darf, fügen wir ihn im Access Control (IAM) Blade des Storage Accounts mit der Rolle “Storage Blob Data Reader“ hinzu:
Erstellen einer Notification Integration
Eine Notification Integration ist ein Snowflake-Objekt, das als Schnittstelle zwischen Snowflake und Cloud-Messagingdiensten dient. Für unser Szenario bedeutet dies, dass Snowflake von einer Azure Storage Queue über das Eintreffen neuer Daten auf dem Storage Account benachrichtigt wird. Auf diese Benachrichtigung setzen wir dann eine Pipe, die automatisch die neuen Daten importieren wird.
Das Erstellen einer Notification Integration ist ähnlich dem Erstellen einer Storage Integration. Zuerst legen wir das Objekt in Snowflake an:
1 2 3 4 5 6 7 |
-- Create notification integration CREATE OR REPLACE NOTIFICATION INTEGRATION SENSOR_DATA_QUEUE TYPE = QUEUE NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE ENABLED = true AZURE_TENANT_ID = '<tenantId>' AZURE_STORAGE_QUEUE_PRIMARY_URI = 'https://<storageAccountName>.queue.core.windows.net/<storageQueueName>'; |
Wie zuvor ist die AZURE_TENANT_ID die ID des Office 365 Mandanten, zu dem der verwendete Storage Account gehört.
In AZURE_STORAGE_QUEUE_PRIMARY_URI tragen wir die URL der Azure Storage Queue auf unserem Storage Account ein.
Mit DESC sehen wir den Namen der Snowflake-Clientanwendung und holen uns die Consent URL:
1 2 |
-- Get consent url DESC NOTIFICATION INTEGRATION SENSOR_DATA_QUEUE; |
Ab in den Browser damit und die angeforderten Berechtigungen akzeptieren:
Im Access Control (IAM) Blade der Storage Queue muss die Anwendung nun noch mit der Rolle “Storage Queue Data Contributor“ hinzugefügt werden:
Importieren der Daten mit Snowpipe
Für den Import der Daten in unsere Snowflake-Instanz müssen wir dort nun noch eine Stage, ein File Format, eine Tabelle und eine Pipe anlegen:
1 2 3 4 |
-- Create stage CREATE OR REPLACE STAGE SENSOR_DATA_STAGE URL = 'azure://<storageAccountName>.blob.core.windows.net/<containerName>/' STORAGE_INTEGRATION = SENSOR_DATA_INTEGRATION; |
Diese externe Stage zeigt auf den Container unseres Storage Accounts, in den die kontinuierliche Datenlieferung erfolgt.
Als STORAGE_INTEGRATION geben wir die zuvor erstellte Storage Integration an.
1 2 3 4 |
-- Create a json file format CREATE OR REPLACE FILE FORMAT SENSOR_DATA_FORMAT TYPE = JSON BINARY_FORMAT = BASE64; |
Das File Format beschreibt die Struktur der zu ladenden Daten.
In unserem Falle sind dies base64-kodierte json-Dateien.
1 2 3 4 |
-- Create table for json data CREATE OR REPLACE TABLE SENSOR_DATA_RAW ( BODY VARIANT ); |
In diese Tabelle wird die Pipe die auf dem Storage Account ankommenden Daten schreiben.
Da wir base64-kodierte json-Dateien laden, besteht sie lediglich aus einer einzigen Spalte vom Typ VARIANT.
1 2 3 4 5 6 7 |
-- Create pipe CREATE OR REPLACE PIPE SENSOR_DATA_PIPE INTEGRATION = 'SENSOR_DATA_QUEUE' AUTO_INGEST = true AS COPY INTO SENSOR_DATA_RAW FROM @SENSOR_DATA_STAGE FILE_FORMAT = SENSOR_DATA_FORMAT; |
Die Pipe erstellen wir auf der SENSOR_DATA_QUEUE und lassen sie einen COPY INTO Befehl ausführen.
Jedes BlobCreated Event unserer Storage Queue wird nun dazu führen, dass die Pipe ausgeführt wird und die neu hinzugekommenen Daten aus der externen Stage in die zuvor angelegte Tabelle geschrieben werden.
Um die komplette Historie der Daten und damit alle bereits im Azure Storage Account liegenden Daten zu laden, führen wir einen Refresh der Pipe aus:
1 2 |
-- Load the history ALTER PIPE SENSOR_DATA_PIPE REFRESH; |
Nun werden die Daten nach und nach in der zuvor angelegten Tabelle eintreffen:
Um die Ausführungen der Pipe anzusehen, kann beispielsweise die Tabellenfunktion PIPE_USAGE_HISTORY verwendet werden:
1 2 3 4 5 6 |
-- Check pipe executions SELECT * FROM table(information_schema.pipe_usage_history( date_range_start => dateadd('day', -70, current_date()), date_range_end => current_date(), pipe_name => 'SENSOR_DATA_PIPE')); |
Um keine unnötigen Kosten zu verursachen, sollte abschließend die automatische Ausführung der Pipe ausgesetzt werden:
1 2 |
-- Suspend pipe ALTER PIPE SENSOR_DATA_PIPE SET PIPE_EXECUTION_PAUSED = TRUE; |
Fazit
Snowpipes bieten eine einfache und schnelle Möglichkeit, die Integration strukturierter und semi-strukturierter Daten aus externen Cloudspeichern zu automatisieren. So können zeitkritische Anwendungen in real-time oder near real-time mit Daten versorgt werden.
Aufgrund der nahtlosen Integration von Snowflake mit den Cloud-Messagingdiensten können Unternehmen den manuellen Entwicklungsaufwand gering halten und dennoch eine kontinuierliche Versorgung aller Anwendungen mit den relevanten Daten gewährleisten. Die serverlose Architektur von Snowflake Snowpipes vereinfacht die Nutzung im Vergleich zu ähnlichen Technologien auf Azure oder Databricks, da sie automatische Skalierung bieten und einen geringen Verwaltungsaufwand benötigen, ohne dass Nutzer sich um die Provisionierung oder Verwaltung von Servern kümmern müssen. Ferner machen Skalierbarkeit und Kosteneffizienz sie zu einem hervorragenden Werkzeug für Entwicklungsteams, die effiziente und robuste Datenverarbeitungsprozesse etablieren wollen.
Links
https://docs.snowflake.com/en/user-guide/data-load-snowpipe-intro
https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration
https://docs.snowflake.com/en/sql-reference/sql/create-notification-integration