In this lab, you will learn how Azure Synapse Link enables seamless connectivity between an Azure Cosmos DB account and a Synapse workspace. You will learn how to enable and configure Synapse Link, then how to query the Azure Cosmos DB analytical store using Apache Spark and serverless SQL.
After completing this lab, you will be able to:
- Configure Azure Synapse Link with Azure Cosmos DB
- Query Azure Cosmos DB with Apache Spark for Synapse Analytics
- Query Azure Cosmos DB with serverless SQL pool for Azure Synapse Analytics
Before starting this lab, you should complete Lab 6: Transform data with Azure Data Factory or Azure Synapse Pipelines.
Note: If you have not completed lab 6, but you have completed the lab setup for this course, you can complete these steps to create the required linked service and dataset.
- In Synapse Studio, on the Manage hub, add a new Linked service for Azure Cosmos DB (SQL API) with the following settings:
- Name: asacosmosdb01
- Cosmos DB account name: asacosmosdbxxxxxxx
- Database name: CustomerProfile
- On the Data hub, create the following Integration dataset:
- Source: Azure Cosmos DB (SQL API)
- Name: asal400_customerprofile_cosmosdb
- Linked service: asacosmosdb01
- Collection: OnlineUserProfile01
- Import schema: From connection/store
Tailwind Traders uses Azure Cosmos DB to store user profile data from their eCommerce site. The NoSQL document store provided by the Azure Cosmos DB SQL API provides the familiarity of managing their data using SQL syntax, while being able to read and write the data at a massive, global scale.
While Tailwind Traders is happy with the capabilities and performance of Azure Cosmos DB, they are concerned about the cost of executing a large volume of analytical queries over multiple partitions (cross-partition queries) from their data warehouse. They want to efficiently access all the data without needing to increase the Azure Cosmos DB request units (RUs). They have looked at options for extracting data from their containers to the data lake as it changes, through the Azure Cosmos DB change feed mechanism. The problem with this approach is the extra service and code dependencies and long-term maintenance of the solution. They could perform bulk exports from a Synapse Pipeline, but then they won't have the most up-to-date information at any given moment.
You decide to enable Azure Synapse Link for Cosmos DB and enable the analytical store on their Azure Cosmos DB containers. With this configuration, all transactional data is automatically stored in a fully isolated column store. This store enables large-scale analytics against the operational data in Azure Cosmos DB, without impacting the transactional workloads or incurring request unit (RU) costs. Azure Synapse Link for Cosmos DB creates a tight integration between Azure Cosmos DB and Azure Synapse Analytics, which enables Tailwind Traders to run near real-time analytics over their operational data with no ETL and full performance isolation from their transactional workloads.
By combining the distributed scale of Cosmos DB's transactional processing with the built-in analytical store and the computing power of Azure Synapse Analytics, Azure Synapse Link enables a Hybrid Transactional/Analytical Processing (HTAP) architecture for optimizing Tailwind Traders' business processes. This integration eliminates ETL processes, enabling business analysts, data engineers, and data scientists to self-serve and run near real-time BI, analytics, and Machine Learning pipelines over operational data.
- In the Azure portal (https://portal.azure.com), open the resource group for your lab environment.
- Select the Azure Cosmos DB account.
- In the left-hand menu, select Azure Synapse Link.
- Select Enable.
Before we can create an Azure Cosmos DB container with an analytical store, we must first enable Azure Synapse Link.
- You must wait for this operation to complete before continuing, which should take about a minute. Check the status by selecting the Azure Notifications icon.
You will see a green checkmark next to "Enabling Synapse Link" when it successfully completes.
Tailwind Traders has an Azure Cosmos DB container named OnlineUserProfile01. Since we enabled the Azure Synapse Link feature after the container was already created, we cannot enable the analytical store on the container. We will create a new container that has the same partition key and enable the analytical store.
After creating the container, we will create a new Synapse Pipeline to copy data from the OnlineUserProfile01 container to the new one.
- Select Data Explorer on the left-hand menu.
- Select New Container.
- Create a new container with the following settings:
- Database id: Use the existing CustomerProfile database.
- Container id: Enter UserProfileHTAP
- Partition key: Enter /userId
- Throughput: Select Autoscale
- Container max RU/s: Enter 4000
- Analytical store: On
Here we set the partition key value to userId, because it is a field we use most often in queries and contains a relatively high cardinality (number of unique values) for good partitioning performance. We set the throughput to Autoscale with a maximum value of 4,000 request units (RUs). This means that the container will have a minimum of 400 RUs allocated (10% of the maximum number), and will scale up to a maximum of 4,000 when the scale engine detects a high enough demand to warrant increasing the throughput. Finally, we enable the analytical store on the container, which allows us to take full advantage of the Hybrid Transactional/Analytical Processing (HTAP) architecture from within Synapse Analytics.
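For reference, the same container could also be created programmatically. The following is a minimal sketch using the azure-cosmos Python SDK; it is not a required lab step, the endpoint and key values are placeholders, and the exact SDK surface may vary slightly by version. Setting analytical_storage_ttl to -1 mirrors the portal's Analytical store: On setting.

```python
# Minimal sketch using the azure-cosmos Python SDK (pip install azure-cosmos).
# Endpoint and key are placeholders; this is illustrative, not a lab step.
from azure.cosmos import CosmosClient, PartitionKey, ThroughputProperties

client = CosmosClient("https://<your-account>.documents.azure.com:443/", credential="<your-primary-key>")
database = client.get_database_client("CustomerProfile")

database.create_container(
    id="UserProfileHTAP",
    partition_key=PartitionKey(path="/userId"),  # high-cardinality field used in most queries
    offer_throughput=ThroughputProperties(auto_scale_max_throughput=4000),  # autoscale 400-4,000 RU/s
    analytical_storage_ttl=-1  # -1 keeps analytical store data indefinitely (analytical store enabled)
)
```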
Let's take a quick look at the data we will copy over to the new container.
- Expand the OnlineUserProfile01 container underneath the CustomerProfile database, then select Items. Select one of the documents and view its contents. The documents are stored in JSON format.
- Select Keys in the left-hand menu. You will need the Primary Key and the Cosmos DB account name (in the upper-left corner) later, so keep this tab open.
Now that we have the new Azure Cosmos DB container with the analytical store enabled, we need to copy the contents of the existing container by using a Synapse Pipeline.
- Open Synapse Studio (https://web.azuresynapse.net/) in a different tab, and then navigate to the Integrate hub.
- In the + menu, select Pipeline.
- Under Activities, expand the Move & transform group, then drag the Copy data activity onto the canvas. Set the Name to Copy Cosmos DB Container in the Properties blade.
- Select the new Copy data activity that you added to the canvas, and on the Source tab beneath the canvas, select the asal400_customerprofile_cosmosdb source dataset.
- Select the Sink tab, then select + New.
- Select the Azure Cosmos DB (SQL API) dataset type, then select Continue.
- Set the following properties, then click OK:
- Name: Enter cosmos_db_htap.
- Linked service: Select asacosmosdb01.
- Collection: Select UserProfileHTAP.
- Import schema: Select From connection/store.
- Underneath the new sink dataset you just added, ensure that the Insert write behavior is selected.
- Select Publish all, then Publish to save the new pipeline.
- Above the pipeline canvas, select Add trigger, then Trigger now. Select OK to trigger the run.
- Navigate to the Monitor hub.
- Select Pipeline runs and wait until the pipeline run has successfully completed. You may have to select Refresh a few times.
This may take around 4 minutes to complete.
Tailwind Traders wants to use Apache Spark to run analytical queries against the new Azure Cosmos DB container. In this segment, we will use built-in gestures in Synapse Studio to quickly create a Synapse Notebook that loads data from the analytical store of the HTAP-enabled container, without impacting the transactional store.
Tailwind Traders wants to use the list of preferred products identified for each user, coupled with any matching product IDs in their review history, to show a list of all preferred product reviews.
- Navigate to the Data hub.
- Select the Linked tab and expand the Azure Cosmos DB section (if this is not visible, use the ↻ button at the top right to refresh Synapse Studio), then expand the asacosmosdb01 (CustomerProfile) linked service. Right-click the UserProfileHTAP container, select New notebook, and then select Load to DataFrame.
Notice that the UserProfileHTAP container that we created has a slightly different icon than the other container. This indicates that the analytical store is enabled.
- In the new notebook, select the SparkPool01 Spark pool in the Attach to dropdown list.
- Select Run all.
It will take a few minutes to start the Spark session the first time.
In the generated code within Cell 1, notice that the spark.read format is set to cosmos.olap. This instructs Synapse Link to use the container's analytical store. If we wanted to connect to the transactional store instead, such as to read from the change feed or write to the container, we would use cosmos.oltp instead.
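The gesture-generated cell typically looks similar to the following sketch (the exact code Synapse Studio generates may differ slightly; the linked service and container names below are the ones used in this lab):

```python
# Read from the analytical store (cosmos.olap) of the HTAP-enabled container.
# Option names reflect what the "Load to DataFrame" gesture typically generates.
df = spark.read\
    .format("cosmos.olap")\
    .option("spark.synapse.linkedService", "asacosmosdb01")\
    .option("spark.cosmos.container", "UserProfileHTAP")\
    .load()

display(df.limit(10))
```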
Note: You cannot write to the analytical store, only read from it. If you want to load data into the container, you need to connect to the transactional store.
The first option configures the name of the Azure Cosmos DB linked service. The second option defines the Azure Cosmos DB container from which we want to read.
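For comparison, loading data into the container would go through the transactional store. A minimal sketch, assuming the same linked service and container, might look like this (illustrative only; it is not a step in this lab and is not meant to be run against the lab data):

```python
# Write a DataFrame (for example, df) to the transactional store (cosmos.oltp);
# the analytical store itself is read-only. Illustrative sketch, not a lab step.
(df.write
    .format("cosmos.oltp")
    .option("spark.synapse.linkedService", "asacosmosdb01")
    .option("spark.cosmos.container", "UserProfileHTAP")
    .mode("append")
    .save())
```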
- Select the + Code button underneath the cell you ran. This adds a new code cell beneath the first one.
- The DataFrame contains extra columns that we don't need. Let's remove the unwanted columns and create a clean version of the DataFrame. To do this, enter the following in the new code cell and run it:
unwanted_cols = {'_attachments','_etag','_rid','_self','_ts','collectionType','id'}

# Remove unwanted columns from the columns collection
cols = list(set(df.columns) - unwanted_cols)
profiles = df.select(cols)

display(profiles.limit(10))
The output now only contains the columns that we want. Notice that the preferredProducts and productReviews columns contain child elements. Expand the values on a row to view them. You may recall seeing the raw JSON format in the OnlineUserProfile01 container within the Azure Cosmos DB Data Explorer.
- We should know how many records we're dealing with. To do this, enter the following in a new code cell and run it:
profiles.count()
You should see a count result of 99,999.
- We want to use the preferredProducts column array and productReviews column array for each user and create a graph of products that are from their preferred list that match with products that they have reviewed. To do this, we need to create two new DataFrames that contain flattened values from those two columns so we can join them in a later step. Enter the following in a new code cell and run it:
from pyspark.sql.functions import udf, explode

preferredProductsFlat = profiles.select('userId', explode('preferredProducts').alias('productId'))

productReviewsFlat = profiles.select('userId', explode('productReviews').alias('productReviews'))

display(productReviewsFlat.limit(10))
In this cell, we imported the special PySpark explode function, which returns a new row for each element of the array. This function helps flatten the preferredProducts and productReviews columns for better readability or for easier querying.
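To illustrate what explode does on its own, here is a tiny, self-contained example using toy data (not part of the lab dataset):

```python
from pyspark.sql.functions import explode

# Two users, each with an array of preferred product IDs
toy = spark.createDataFrame([(1, ["p10", "p20"]), (2, ["p30"])], ["userId", "preferredProducts"])

# explode produces one row per array element: (1, p10), (1, p20), (2, p30)
display(toy.select("userId", explode("preferredProducts").alias("productId")))
```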
Observe the cell output where we display the productReviewsFlat DataFrame contents. We see a new productReviews column that contains the productId we want to match to the preferred products list for the user, as well as the reviewText that we want to display or save.
- Let's look at the preferredProductsFlat DataFrame contents. To do this, enter the following in a new cell and run it:
display(preferredProductsFlat.limit(20))
Since we used the explode function on the preferred products array, we have flattened the column values to userId and productId rows, ordered by user.
- Now we need to further flatten the productReviewsFlat DataFrame contents to extract the productReviews.productId and productReviews.reviewText fields and create new rows for each data combination. To do this, enter the following in a new code cell and run it:
productReviews = (productReviewsFlat.select('userId', 'productReviews.productId', 'productReviews.reviewText')
    .orderBy('userId'))

display(productReviews.limit(10))
In the output, notice that we now have multiple rows for each userId.
- The final step is to join the preferredProductsFlat and productReviews DataFrames on the userId and productId values to build our graph of preferred product reviews. To do this, enter the following in a new code cell and run it:
preferredProductReviews = (preferredProductsFlat.join(productReviews,
    (preferredProductsFlat.userId == productReviews.userId) &
    (preferredProductsFlat.productId == productReviews.productId))
)

display(preferredProductReviews.limit(100))
Note: You can click on the column headers in the Table view to sort the result set.
- At the top right of the notebook, use the Stop Session button to stop the notebook session. Then close the notebook, discarding the changes.
Tailwind Traders wants to explore the Azure Cosmos DB analytical store with T-SQL. Ideally, they can create views that can be used for joins with other analytical store containers and files from the data lake, or accessed by external tools such as Power BI.
- Navigate to the Develop hub.
- In the + menu, select SQL script.
- When the script opens, in the Properties pane to the right, change the Name to User Profile HTAP. Then use the Properties button to close the pane.
- Verify that the serverless SQL pool (Built-in) is selected.
- Paste the following SQL query. In the OPENROWSET statement, replace YOUR_ACCOUNT_NAME with the Azure Cosmos DB account name and YOUR_ACCOUNT_KEY with the Azure Cosmos DB Primary Key from the Keys page in the Azure portal (which should still be open in another tab).
USE master
GO

IF DB_ID (N'Profiles') IS NULL
BEGIN
    CREATE DATABASE Profiles;
END
GO

USE Profiles
GO

DROP VIEW IF EXISTS UserProfileHTAP;
GO

CREATE VIEW UserProfileHTAP
AS
SELECT
    *
FROM OPENROWSET(
    'CosmosDB',
    N'account=YOUR_ACCOUNT_NAME;database=CustomerProfile;key=YOUR_ACCOUNT_KEY',
    UserProfileHTAP
)
WITH (
    userId bigint,
    cartId varchar(50),
    preferredProducts varchar(max),
    productReviews varchar(max)
) AS profiles
CROSS APPLY OPENJSON (productReviews)
WITH (
    productId bigint,
    reviewText varchar(1000)
) AS reviews
GO
- Use the Run button to run the query, which:
- Creates a new serverless SQL pool database named Profiles if it does not exist
- Changes the database context to the Profiles database.
- Drops the UserProfileHTAP view if it exists.
- Creates a SQL view named UserProfileHTAP.
- Uses the OPENROWSET statement to set the data source type to CosmosDB, sets the account details, and specifies that we want to create the view over the Azure Cosmos DB analytical store container named UserProfileHTAP.
- Matches the property names in the JSON documents and applies the appropriate SQL data types. Notice that we set the preferredProducts and productReviews fields to varchar(max). This is because both of these properties contain JSON-formatted data within.
- Since the productReviews property in the JSON documents contains nested subarrays, the script needs to "join" the properties from the document with all elements of the array. Synapse SQL enables us to flatten the nested structure by applying the OPENJSON function on the nested array. We flatten the values within productReviews just as we did with the Python explode function earlier in the Synapse Notebook.
- Navigate to the Data hub.
- Select the Workspace tab and expand the SQL database group. Expand the Profiles SQL on-demand database (if you do not see this on the list, refresh the Databases list). Expand Views, then right-click the UserProfileHTAP view, select New SQL script, and then Select TOP 100 rows.
- Ensure the script is connected to the Built-in SQL pool, then run the query and view the results.
The preferredProducts and productReviews fields are included in the view, which both contain JSON-formatted values. Notice how the CROSS APPLY OPENJSON statement in the view successfully flattened the nested subarray values in the productReviews field by extracting the productId and reviewText values into new fields.