Skip to content

Latest commit

 

History

History
1142 lines (817 loc) · 55.1 KB

lab01-data-integration-part-1.md

File metadata and controls

1142 lines (817 loc) · 55.1 KB

Data Integration Part 1

Resource naming throughout this lab

For the remainder of this guide, the following terms will be used for various ASA-related resources (make sure you replace them with actual names and values):

Azure Synapse Analytics Resource To be referred to
Workspace resource group WorkspaceResourceGroup
Workspace / workspace name Workspace
Primary Storage Account PrimaryStorage
Default file system container DefaultFileSystem
SQL Pool SqlPool01

Lab prerequisite

Start the SQL Pool in your lab environment.

  1. Open Synapse Studio (https://web.azuresynapse.net/). If you see a prompt to select your workspace, select the Azure subscription and workspace name used for the lab. When using a hosted lab environment, the workspace name will end with the same SUFFIX as your user name, as shown in the screenshot below. Make note of the suffix, as it is referenced throughout this and the remaining labs.

    The username suffix and workspace suffix are highlighted.

    NOTE You may need to logout of whatever account you are already logged into before you can gain access to the target workspace. You can also attempt to utilize a new InPrivate browser window.

  2. If this is your first time connecting to the Synapse Analytics workspace, you may see the Getting started prompt. Select Close to continue.

    The Close button is highlighted on the Getting started dialog.

  3. Once the Synapse Studio workspace is loaded, navigate to the Manage hub.

    The Manage menu item is highlighted.

  4. From the center menu, select SQL pools from beneath the Analytics pools heading. Locate SQLPool01, and select the Resume button.

    The Manage menu item is selected, with SQL pools selected from the center menu. The resume button is selected next to the SQLPool01 item.

Exercise 1: Configure linked service and create datasets

Task 1: Create linked service

Our data sources for labs 1 and 2 include files stored in ADLS Gen2 and Azure Cosmos DB. The linked service for ADLS Gen2 already exists as it is the primary ADLS Gen2 account for the workspace.

  1. After connecting to the Synapse Analytics workspace, navigate to the Manage hub.

    The Manage menu item is highlighted.

  2. Open Linked services and select + New to create a new linked service. Select Azure Cosmos DB (SQL API) in the list of options, then select Continue.

    Manage, New, and the Azure Cosmos DB linked service option are highlighted.

  3. Name the linked service asacosmosdb01. Set the Account selection method to From Azure subscription and select the Azure Labs X subscription. For Azure Cosmos DB account name select asacosmosdb{Suffix} and set the Database name value to CustomerProfile.

    New Azure Cosmos DB linked service.

  4. Select Create to create the linked service.

Task 2: Create datasets

  1. Navigate to the Data hub.

    The Data menu item is highlighted.

  2. With the Workspace tab selected under Data, select + in the toolbar, then select Integration dataset to create a new dataset.

    Create new Dataset.

  3. Create a new Azure Cosmos DB (SQL API) dataset with the following characteristics:

    • Name: Enter asal400_customerprofile_cosmosdb.
    • Linked service: Select the Azure Cosmos DB linked service.
    • Collection: Select OnlineUserProfile01.

    New Azure Cosmos DB dataset.

  4. After creating the dataset, navigate to its Connection tab, then select Preview data.

    The preview data button on the dataset is highlighted.

  5. Preview data queries the selected Azure Cosmos DB collection and returns a sample of the documents within. The documents are stored in JSON format and include a userId field, cartId, preferredProducts (an array of product IDs that may be empty), and productReviews (an array of written product reviews that may be empty). We will use this data in lab 2.

    A preview of the Azure Cosmos DB data is displayed.

  6. Select the Schema tab, then select Import schema. Synapse Analytics evaluates the JSON documents within the collection and infers the schema based on the nature of the data within. Since we are only storing one document type in this collection, you will see the inferred schema for all documents within.

    The inferred schema for the Azure Cosmos DB documents is displayed.

  7. Remaining in the Data Hub, on the Data blade, expand the + menu, and select Integration dataset. Create a new Azure Data Lake Storage Gen2 dataset with the Parquet format type with the following characteristics (remember, you can create integration datasets in the Data Hub):

    • Name: Enter asal400_sales_adlsgen2.
    • Linked service: Select the asadatalakeXX linked service that already exists.
    • File path: Browse to the wwi-02/sale-small path.
    • Import schema: Select From connection/store.

    The create ADLS Gen2 dataset form is displayed.

  8. Remaining in the Data Hub, on the Data blade, expand the + menu, and select Integration dataset. Create a new Azure Data Lake Storage Gen2 integration dataset with the JSON format type with the following characteristics (Data Hub + New Integration Dataset):

    • Name: Enter asal400_ecommerce_userprofiles_source.
    • Linked service: Select the asadatalakeXX linked service that already exists.
    • File path: Browse to the wwi-02/online-user-profiles-02 path.
    • Import schema: Select From connection/store.
  9. Select Publish all, then Publish to save your new resources.

    Publish all is highlighted.

Exercise 2: Explore source data in the Data hub

Understanding data through data exploration is one of the core challenges faced today by data engineers and data scientists as well. Depending on the underlying structure of the data as well as the specific requirements of the exploration process, different data processing engines will offer varying degrees of performance, complexity, and flexibility.

In Azure Synapse Analytics, you have the possibility of using either the Synapse serverless SQL engine, the big-data Spark engine, or both.

In this exercise, you will explore the data lake using both options.

Task 1: Query sales Parquet data with a serverless SQL pool

When you query Parquet files using a serverless SQL pool, you can explore the data with T-SQL syntax.

  1. In Synapse Analytics Studio, navigate to the Data hub.

    The Data menu item is highlighted.

  2. Select the Linked tab and expand Azure Data Lake Storage Gen2*. Expand the asaworkspaceXX primary ADLS Gen2 account and select wwi-02.

  3. Navigate to the sale-small/Year=2016/Quarter=Q4/Month=12/Day=20161231 folder. Right-click on the sale-small-20161231-snappy.parquet file, select New SQL script, then Select TOP 100 rows.

    The Data hub is displayed with the options highlighted.

  4. Ensure Built-in is selected in the Connect to dropdown list above the query window, then run the query. Data is loaded by the Synapse SQL Serverless endpoint and processed as if was coming from any regular relational database.

    The serverless SQL connection is highlighted.

  5. Modify the SQL query to perform aggregates and grouping operations to better understand the data. Replace the query with the following, making sure that the file path in OPENROWSET matches your current file path:

    SELECT
        TransactionDate, ProductId,
            CAST(SUM(ProfitAmount) AS decimal(18,2)) AS [(sum) Profit],
            CAST(AVG(ProfitAmount) AS decimal(18,2)) AS [(avg) Profit],
            SUM(Quantity) AS [(sum) Quantity]
    FROM
        OPENROWSET(
            BULK 'https://asadatalakeSUFFIX.dfs.core.windows.net/wwi-02/sale-small/Year=2016/Quarter=Q4/Month=12/Day=20161231/sale-small-20161231-snappy.parquet',
            FORMAT='PARQUET'
        ) AS [r] GROUP BY r.TransactionDate, r.ProductId;

    The T-SQL query above is displayed within the query window.

  6. Now let's figure out how many records are contained within the Parquet files for 2019 data. This information is important for planning how we optimize for importing the data into Azure Synapse Analytics. To do this, replace your query with the following (be sure to update the name of your data lake in BULK statement, by replacing [asadatalakeSUFFIX]):

    SELECT
        COUNT(*)
    FROM
        OPENROWSET(
            BULK 'https://asadatalakeSUFFIX.dfs.core.windows.net/wwi-02/sale-small/Year=2019/*/*/*/*',
            FORMAT='PARQUET'
        ) AS [r];

    Notice how we updated the path to include all Parquet files in all subfolders of sale-small/Year=2019.

    The output should be 339507246 records.

    Optional: If you wish to keep this SQL script for future reference, select the Properties button, provide a descriptive name, such as ASAL400 - Lab1 - Explore sales data, then select Publish all.

    The SQL Script properties is displayed with the new script name, and the Publish all button is highlighted.

Task 2: Query sales Parquet data with Azure Synapse Spark

  1. Navigate to the Data hub, browse to the data lake storage account folder sale-small/Year=2010/Quarter=Q4/Month=12/Day=20101231 if needed, then right-click the Parquet file and select New notebook, then Load to DataFrame.

    The Parquet file is displayed with the New notebook menu item highlighted.

  2. This will generate a notebook with PySpark code to load the data in a dataframe and display 100 rows with the header.

  3. Attach the notebook to a Spark pool.

    The Spark pool list is displayed.

  4. Select Run all on the notebook toolbar to execute the notebook.

    Note: The first time you run a notebook in a Spark pool, Synapse creates a new session. This can take approximately 3-5 minutes.

    Note: To run just the cell, either hover over the cell and select the Run cell icon to the left of the cell, or select the cell then type Ctrl+Enter on your keyboard.

  5. Create a new cell underneath by hovering over the cell and selecting the + Code button beneath the notebook cell.

    The Add Code menu option is highlighted.

  6. The Spark engine can analyze the Parquet files and infer the schema. To do this, enter the following in the new cell:

    df.printSchema()

    Your output should look like the following:

    root
        |-- TransactionId: string (nullable = true)
        |-- CustomerId: integer (nullable = true)
        |-- ProductId: short (nullable = true)
        |-- Quantity: short (nullable = true)
        |-- Price: decimal(29,2) (nullable = true)
        |-- TotalAmount: decimal(29,2) (nullable = true)
        |-- TransactionDate: integer (nullable = true)
        |-- ProfitAmount: decimal(29,2) (nullable = true)
        |-- Hour: byte (nullable = true)
        |-- Minute: byte (nullable = true)
        |-- StoreId: short (nullable = true)
    
  7. Now let's use the dataframe to perform the same grouping and aggregate query we performed with the serverless SQL pool. Create a new cell and enter the following:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import *
    from pyspark.sql.functions import *
    
    profitByDateProduct = (df.groupBy("TransactionDate","ProductId")
        .agg(
            sum("ProfitAmount").alias("(sum)ProfitAmount"),
            round(avg("Quantity"), 4).alias("(avg)Quantity"),
            sum("Quantity").alias("(sum)Quantity"))
        .orderBy("TransactionDate"))
    profitByDateProduct.show(100)

We import required Python libraries to use aggregation functions and types defined in the schema to successfully execute the query.

Task 3: Query user profile JSON data with Apache Spark in Azure Synapse Analytics

In addition to the sales data, we have customer profile data from an e-commerce system that provides top product purchases for each visitor of the site (customer) over the past 12 months. This data is stored within JSON files in the data lake. We will import this data in the next lab, but let's explore it while we're in the Spark notebook.

  1. Create a new cell in the Spark notebook, enter the following code, replace <asadatalakeNNNNNN> with your data lake name (you can find this value in the first cell of the notebook), and execute the cell:

    df = (spark.read \
            .option("inferSchema", "true") \
            .json("abfss://[email protected]/online-user-profiles-02/*.json", multiLine=True)
        )
    
    df.printSchema()

    Your output should look like the following:

    root
    |-- topProductPurchases: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- itemsPurchasedLast12Months: long (nullable = true)
    |    |    |-- productId: long (nullable = true)
    |-- visitorId: long (nullable = true)
    

    Notice that we are selecting all JSON files within the online-user-profiles-02 directory. Each JSON file contains several rows, which is why we specified the multiLine=True option. Also, we set the inferSchema option to true, which instructs the Spark engine to review the files and create a schema based on the nature of the data.

  2. We have been using Python code in these cells up to this point. If we want to query the files using SQL syntax, one option is to create a temporary view of the data within the dataframe. Execute the following in a new cell to create a view named user_profiles:

    # create a view called user_profiles
    df.createOrReplaceTempView("user_profiles")
  3. Create a new cell. Since we want to use SQL instead of Python, we use the %%sql magic to set the language of the cell to SQL. Execute the following code in the cell:

    %%sql
    
    SELECT * FROM user_profiles LIMIT 10

    Notice that the output shows nested data for topProductPurchases, which includes an array of productId and itemsPurchasedLast12Months values. You can expand the fields by clicking the right triangle in each row.

    JSON nested output.

    This makes analyzing the data a bit difficult. This is because the JSON file contents look like the following:

    [
    {
        "visitorId": 9529082,
        "topProductPurchases": [
        {
            "productId": 4679,
            "itemsPurchasedLast12Months": 26
        },
        {
            "productId": 1779,
            "itemsPurchasedLast12Months": 32
        },
        {
            "productId": 2125,
            "itemsPurchasedLast12Months": 75
        },
        {
            "productId": 2007,
            "itemsPurchasedLast12Months": 39
        },
        {
            "productId": 1240,
            "itemsPurchasedLast12Months": 31
        },
        {
            "productId": 446,
            "itemsPurchasedLast12Months": 39
        },
        {
            "productId": 3110,
            "itemsPurchasedLast12Months": 40
        },
        {
            "productId": 52,
            "itemsPurchasedLast12Months": 2
        },
        {
            "productId": 978,
            "itemsPurchasedLast12Months": 81
        },
        {
            "productId": 1219,
            "itemsPurchasedLast12Months": 56
        },
        {
            "productId": 2982,
            "itemsPurchasedLast12Months": 59
        }
        ]
    },
    {
        ...
    },
    {
        ...
    }
    ]
  4. PySpark contains a special explode function, which returns a new row for each element of the array. This will help flatten the topProductPurchases column for better readability or for easier querying. Execute the following in a new cell:

    from pyspark.sql.functions import udf, explode
    
    flat=df.select('visitorId',explode('topProductPurchases').alias('topProductPurchases_flat'))
    flat.show(100)

    In this cell, we created a new dataframe named flat that includes the visitorId field and a new aliased field named topProductPurchases_flat. As you can see, the output is a bit easier to read and, by extension, easier to query.

    The improved output is displayed.

  5. Create a new cell and execute the following code to create a new flattened version of the dataframe that extracts the topProductPurchases_flat.productId and topProductPurchases_flat.itemsPurchasedLast12Months fields to create new rows for each data combination:

    topPurchases = (flat.select('visitorId','topProductPurchases_flat.productId','topProductPurchases_flat.itemsPurchasedLast12Months')
        .orderBy('visitorId'))
    
    topPurchases.show(100)

    In the output, notice that we now have multiple rows for each visitorId.

    The vistorId rows are highlighted.

  6. Let's order the rows by the number of items purchased in the last 12 months. Create a new cell and execute the following code:

    # Let's order by the number of items purchased in the last 12 months
    sortedTopPurchases = topPurchases.orderBy("itemsPurchasedLast12Months")
    
    sortedTopPurchases.show(100)
  7. How do we sort in reverse order? One might conclude that we could make a call like this: topPurchases.orderBy("itemsPurchasedLast12Months desc"). Try it in a new cell:

    topPurchases.orderBy("itemsPurchasedLast12Months desc")

    An error is displayed.

    Notice that there is an AnalysisException error, because itemsPurchasedLast12Months desc does not match up with a column name.

    Why does this not work?

    • The DataFrames API is built upon an SQL engine.
    • There is a lot of familiarity with this API and SQL syntax in general.
    • The problem is that orderBy(..) expects the name of the column.
    • What we specified was an SQL expression in the form of requests desc.
    • What we need is a way to programmatically express such an expression.
    • This leads us to the second variant, orderBy(Column) and more specifically, the class Column.
  8. The Column class is an object that encompasses more than just the name of the column, but also column-level-transformations, such as sorting in a descending order. Execute the following code in a new cell:

    sortedTopPurchases = (topPurchases
        .orderBy( col("itemsPurchasedLast12Months").desc() ))
    
    sortedTopPurchases.show(100)
  9. How many types of products did each customer purchase? To figure this out, we need to group by visitorId and aggregate on the number of rows per customer. Execute the following code in a new cell:

    groupedTopPurchases = (sortedTopPurchases.select("visitorId")
        .groupBy("visitorId")
        .agg(count("*").alias("total"))
        .orderBy("visitorId") )
    
    groupedTopPurchases.show(100)

    The query output is displayed.

  10. How many total items did each customer purchase? To figure this out, we need to group by visitorId and aggregate on the sum of itemsPurchasedLast12Months values per customer. Execute the following code in a new cell:

    groupedTopPurchases = (sortedTopPurchases.select("visitorId","itemsPurchasedLast12Months")
        .groupBy("visitorId")
        .agg(sum("itemsPurchasedLast12Months").alias("totalItemsPurchased"))
        .orderBy("visitorId") )
    
    groupedTopPurchases.show(100)

    The query output is displayed.

Exercise 3: Import sales data with PolyBase and COPY using T-SQL

There are different options for loading large amounts and varying types of data into Azure Synapse Analytics, such as through T-SQL commands using a Synapse SQL Pool, and with Azure Synapse pipelines. In our scenario, Wide World Importers stores most of their raw data in a data lake and in different formats. Among the data loading options available to them, WWI's data engineers are most comfortable using T-SQL.

However, even with their familiarity with SQL, there are some things to consider when loading large or disparate file types and formats. Since the files are stored in ADLS Gen2, WWI can use either PolyBase external tables or the new COPY statement. Both options enable fast and scalable data load operations, but there are some differences between the two:

PolyBase COPY
GA, stable GA, stable
Needs CONTROL permission Relaxed permission
Has row width limits No row width limit
No delimiters within text Supports delimiters in text
Fixed line delimiter Supports custom column and row delimiters
Complex to set up in code Reduces amount of code

WWI has heard that PolyBase is generally faster than COPY, especially when working with large data sets.

In this exercise, you will help WWI compare ease of setup, flexibility, and speed between these loading strategies.

Task 1: Create staging tables

The Sale table has a columnstore index to optimize for read-heavy workloads. It is also used heavily for reporting and ad-hoc queries. To achieve the fastest loading speed and minimize the impact of heavy data inserts on the Sale table, WWI has decided to create a staging table for loads.

In this task, you will create a new staging table named SaleHeap in a new schema named wwi_staging. You will define it as a heap and use round-robin distribution. When WWI finalizes their data loading pipeline, they will load the data into SaleHeap, then insert from the heap table into Sale. Although this is a two-step process, the second step of inserting the rows to the production table does not incur data movement across the distributions.

You will also create a new Sale clustered columnstore table within the wwi_staging to compare data load speeds.

  1. Open Synapse Analytics Studio (https://web.azuresynapse.net/), and then navigate to the Develop hub.

    The Develop menu item is highlighted.

  2. From the Develop menu, select the + button and choose SQL Script from the context menu.

    The SQL script context menu item is highlighted.

  3. In the toolbar menu, connect to the Dedicated SQL Pool resource to execute the query.

    The connect to option is highlighted in the query toolbar.

  4. In the query window, replace the script with the following to create the wwi_staging schema:

    CREATE SCHEMA [wwi_staging]
  5. Select Run from the toolbar menu to execute the SQL command.

    The run button is highlighted in the query toolbar.

    Note: If you receive the following error, continue to the next step: Failed to execute query. Error: There is already an object named 'wwi_staging' in the database. CREATE SCHEMA failed due to previous errors.

  6. In the query window, replace the script with the following to create the heap table:

    CREATE TABLE [wwi_staging].[SaleHeap]
    ( 
        [TransactionId] [uniqueidentifier]  NOT NULL,
        [CustomerId] [int]  NOT NULL,
        [ProductId] [smallint]  NOT NULL,
        [Quantity] [smallint]  NOT NULL,
        [Price] [decimal](9,2)  NOT NULL,
        [TotalAmount] [decimal](9,2)  NOT NULL,
        [TransactionDate] [int]  NOT NULL,
        [ProfitAmount] [decimal](9,2)  NOT NULL,
        [Hour] [tinyint]  NOT NULL,
        [Minute] [tinyint]  NOT NULL,
        [StoreId] [smallint]  NOT NULL
    )
    WITH
    (
        DISTRIBUTION = ROUND_ROBIN,
        HEAP
    )
  7. Select Run from the toolbar menu to execute the SQL command.

  8. In the query window, replace the script with the following to create the Sale table in the wwi_staging schema for load comparisons:

    CREATE TABLE [wwi_staging].[Sale]
    (
        [TransactionId] [uniqueidentifier]  NOT NULL,
        [CustomerId] [int]  NOT NULL,
        [ProductId] [smallint]  NOT NULL,
        [Quantity] [smallint]  NOT NULL,
        [Price] [decimal](9,2)  NOT NULL,
        [TotalAmount] [decimal](9,2)  NOT NULL,
        [TransactionDate] [int]  NOT NULL,
        [ProfitAmount] [decimal](9,2)  NOT NULL,
        [Hour] [tinyint]  NOT NULL,
        [Minute] [tinyint]  NOT NULL,
        [StoreId] [smallint]  NOT NULL
    )
    WITH
    (
        DISTRIBUTION = HASH ( [CustomerId] ),
        CLUSTERED COLUMNSTORE INDEX,
        PARTITION
        (
            [TransactionDate] RANGE RIGHT FOR VALUES (20100101, 20100201, 20100301, 20100401, 20100501, 20100601, 20100701, 20100801, 20100901, 20101001, 20101101, 20101201, 20110101, 20110201, 20110301, 20110401, 20110501, 20110601, 20110701, 20110801, 20110901, 20111001, 20111101, 20111201, 20120101, 20120201, 20120301, 20120401, 20120501, 20120601, 20120701, 20120801, 20120901, 20121001, 20121101, 20121201, 20130101, 20130201, 20130301, 20130401, 20130501, 20130601, 20130701, 20130801, 20130901, 20131001, 20131101, 20131201, 20140101, 20140201, 20140301, 20140401, 20140501, 20140601, 20140701, 20140801, 20140901, 20141001, 20141101, 20141201, 20150101, 20150201, 20150301, 20150401, 20150501, 20150601, 20150701, 20150801, 20150901, 20151001, 20151101, 20151201, 20160101, 20160201, 20160301, 20160401, 20160501, 20160601, 20160701, 20160801, 20160901, 20161001, 20161101, 20161201, 20170101, 20170201, 20170301, 20170401, 20170501, 20170601, 20170701, 20170801, 20170901, 20171001, 20171101, 20171201, 20180101, 20180201, 20180301, 20180401, 20180501, 20180601, 20180701, 20180801, 20180901, 20181001, 20181101, 20181201, 20190101, 20190201, 20190301, 20190401, 20190501, 20190601, 20190701, 20190801, 20190901, 20191001, 20191101, 20191201)
        )
    )
  9. Select Run from the toolbar menu to execute the SQL command.

Task 2: Configure and run PolyBase load operation

PolyBase requires the following elements:

  • An external data source that points to the abfss path in ADLS Gen2 where the Parquet files are located
  • An external file format for Parquet files
  • An external table that defines the schema for the files, as well as the location, data source, and file format
  1. In the query window, replace the script with the following to create the external data source. Be sure to replace SUFFIX with the lab workspace id:

    -- Replace SUFFIX with the lab workspace id.
    CREATE EXTERNAL DATA SOURCE ABSS
    WITH
    ( TYPE = HADOOP,
        LOCATION = 'abfss://[email protected]'
    );
  2. Select Run from the toolbar menu to execute the SQL command.

  3. In the query window, replace the script with the following to create the external file format and external data table. Notice that we defined TransactionId as an nvarchar(36) field instead of uniqueidentifier. This is because external tables do not currently support uniqueidentifier columns:

    CREATE EXTERNAL FILE FORMAT [ParquetFormat]
    WITH (
        FORMAT_TYPE = PARQUET,
        DATA_COMPRESSION = 'org.apache.hadoop.io.compress.SnappyCodec'
    )
    GO
    
    CREATE SCHEMA [wwi_external];
    GO
    
    CREATE EXTERNAL TABLE [wwi_external].Sales
        (
            [TransactionId] [nvarchar](36)  NOT NULL,
            [CustomerId] [int]  NOT NULL,
            [ProductId] [smallint]  NOT NULL,
            [Quantity] [smallint]  NOT NULL,
            [Price] [decimal](9,2)  NOT NULL,
            [TotalAmount] [decimal](9,2)  NOT NULL,
            [TransactionDate] [int]  NOT NULL,
            [ProfitAmount] [decimal](9,2)  NOT NULL,
            [Hour] [tinyint]  NOT NULL,
            [Minute] [tinyint]  NOT NULL,
            [StoreId] [smallint]  NOT NULL
        )
    WITH
        (
            LOCATION = '/sale-small/Year=2019',  
            DATA_SOURCE = ABSS,
            FILE_FORMAT = [ParquetFormat]  
        )  
    GO

    Note: The /sale-small/Year=2019/ folder's Parquet files contain 339,507,246 rows.

  4. Select Run from the toolbar menu to execute the SQL command.

  5. In the query window, replace the script with the following to load the data into the wwi_staging.SalesHeap table:

    INSERT INTO [wwi_staging].[SaleHeap]
    SELECT *
    FROM [wwi_external].[Sales]
  6. Select Run from the toolbar menu to execute the SQL command. It will take a few minutes to execute this command. Take note of how long it took to execute this query.

  7. In the query window, replace the script with the following to see how many rows were imported:

    SELECT COUNT(1) FROM wwi_staging.SaleHeap(nolock)
  8. Select Run from the toolbar menu to execute the SQL command. You should see a result of 339507246.

Task 3: Configure and run the COPY statement

Now let's see how to perform the same load operation with the COPY statement.

  1. In the query window, replace the script with the following to truncate the heap table and load data using the COPY statement. Be sure to replace SUFFIX with the id from your workspace:

    TRUNCATE TABLE wwi_staging.SaleHeap;
    GO
    
    -- Replace SUFFIX with the id from your workspace.
    COPY INTO wwi_staging.SaleHeap
    FROM 'https://asadatalakeSUFFIX.dfs.core.windows.net/wwi-02/sale-small%2FYear%3D2019'
    WITH (
        FILE_TYPE = 'PARQUET',
        COMPRESSION = 'SNAPPY'
    )
    GO
  2. Select Run from the toolbar menu to execute the SQL command. It takes a few minutes to execute this command. Take note of how long it took to execute this query.

  3. In the query window, replace the script with the following to see how many rows were imported:

    SELECT COUNT(1) FROM wwi_staging.SaleHeap(nolock)
  4. Select Run from the toolbar menu to execute the SQL command. You should see a result of 339507246.

Do the number of rows match for both load operations? Which activity was fastest? You should see that both copied the same amount of data in roughly the same amount of time.

Task 4: Load data into the clustered columnstore table

For both of the load operations above, we inserted data into the heap table. What if we inserted into the clustered columnstore table instead? Is there really a performance difference? Let's find out!

  1. In the query window, replace the script with the following to load data into the clustered columnstore Sale table using the COPY statement. Be sure to replace SUFFIX with the id for your workspace:

    -- Replace SUFFIX with the workspace default storage account name.
    COPY INTO wwi_staging.Sale
    FROM 'https://asadatalakeSUFFIX.dfs.core.windows.net/wwi-02/sale-small%2FYear%3D2019'
    WITH (
        FILE_TYPE = 'PARQUET',
        COMPRESSION = 'SNAPPY'
    )
    GO
  2. Select Run from the toolbar menu to execute the SQL command. It takes a few minutes to execute this command. Take note of how long it took to execute this query.

  3. In the query window, replace the script with the following to see how many rows were imported:

    SELECT COUNT(1) FROM wwi_staging.Sale(nolock)
  4. Select Run from the toolbar menu to execute the SQL command.

What were the results? Did the load operation take more or less time writing to Sale table vs. the heap (SaleHeap) table?

In our case, the results are as follows:

PolyBase vs. COPY (DW2000) (insert 2019 small data set (339,507,246 rows)):

  • COPY (Heap: 2:31, clustered columnstore: 3:26)
  • PolyBase (Heap: 2:38)

Task 5: Use COPY to load text file with non-standard row delimiters

One of the advantages COPY has over PolyBase is that it supports custom column and row delimiters.

WWI has a nightly process that ingests regional sales data from a partner analytics system and saves the files in the data lake. The text files use non-standard column and row delimiters where columns are delimited by a . and rows by a ,:

20200421.114892.130282.159488.172105.196533,20200420.109934.108377.122039.101946.100712,20200419.253714.357583.452690.553447.653921

The data has the following fields: Date, NorthAmerica, SouthAmerica, Europe, Africa, and Asia. They must process this data and store it in Synapse Analytics.

  1. In the query window, replace the script with the following to create the DailySalesCounts table and load data using the COPY statement. Be sure to replace <PrimaryStorage>` with the default storage account name for your workspace:

    CREATE TABLE [wwi_staging].DailySalesCounts
        (
            [Date] [int]  NOT NULL,
            [NorthAmerica] [int]  NOT NULL,
            [SouthAmerica] [int]  NOT NULL,
            [Europe] [int]  NOT NULL,
            [Africa] [int]  NOT NULL,
            [Asia] [int]  NOT NULL
        )
    GO
    
    -- Replace <PrimaryStorage> with the workspace default storage account name.
    COPY INTO wwi_staging.DailySalesCounts
    FROM 'https://asadatalakeSUFFIX.dfs.core.windows.net/wwi-02/campaign-analytics/dailycounts.txt'
    WITH (
        FILE_TYPE = 'CSV',
        FIELDTERMINATOR='.',
        ROWTERMINATOR=','
    )
    GO

    Notice the FIELDTERMINATOR and ROWTERMINATOR properties that help us correctly parse the file.

  2. Select Run from the toolbar menu to execute the SQL command.

  3. In the query window, replace the script with the following to view the imported data:

    SELECT * FROM [wwi_staging].DailySalesCounts
    ORDER BY [Date] DESC
  4. Select Run from the toolbar menu to execute the SQL command.

  5. Try viewing the results in a Chart and set the Category column to Date:

    The results are displayed in a chart.

Task 6: Use PolyBase to load text file with non-standard row delimiters

Let's try this same operation using PolyBase.

  1. In the query window, replace the script with the following to create a new external file format, external table, and load data using PolyBase:

    CREATE EXTERNAL FILE FORMAT csv_dailysales
    WITH (
        FORMAT_TYPE = DELIMITEDTEXT,
        FORMAT_OPTIONS (
            FIELD_TERMINATOR = '.',
            DATE_FORMAT = '',
            USE_TYPE_DEFAULT = False
        )
    );
    GO
    
    CREATE EXTERNAL TABLE [wwi_external].DailySalesCounts
        (
            [Date] [int]  NOT NULL,
            [NorthAmerica] [int]  NOT NULL,
            [SouthAmerica] [int]  NOT NULL,
            [Europe] [int]  NOT NULL,
            [Africa] [int]  NOT NULL,
            [Asia] [int]  NOT NULL
        )
    WITH
        (
            LOCATION = '/campaign-analytics/dailycounts.txt',  
            DATA_SOURCE = ABSS,
            FILE_FORMAT = csv_dailysales
        )  
    GO
    INSERT INTO [wwi_staging].[DailySalesCounts]
    SELECT *
    FROM [wwi_external].[DailySalesCounts]
  2. Select Run from the toolbar menu to execute the SQL command.

You should see an error similar to: Failed to execute query. Error: HdfsBridge::recordReaderFillBuffer - Unexpected error encountered filling record reader buffer: HadoopExecutionException: Too many columns in the line..

Why is this? According to PolyBase documentation:

The row delimiter in delimited-text files must be supported by Hadoop's LineRecordReader. That is, it must be either \r, \n, or \r\n. These delimiters are not user-configurable.

This is an example of where COPY's flexibility gives it an advantage over PolyBase.

Exercise 4: Import sales data with COPY using a pipeline

Now that WWI has gone through the process of loading data using PolyBase and COPY via T-SQL statements, it's time for them to experiment with loading sales data through a Synapse pipeline.

When moving data into a data warehouse, there is oftentimes a level of orchestration involved, coordinating movement from one or more data sources and sometimes some level of transformation. The transformation step can occur during (extract-transform-load - ETL) or after (extract-load-transform - ELT) data movement. Any modern data platform must provide a seamless experience for all the typical data wrangling actions like extractions, parsing, joining, standardizing, augmenting, cleansing, consolidating, and filtering. Azure Synapse Analytics provides two significant categories of features - data flows and data orchestrations (implemented as pipelines).

In this exercise, we will focus on the orchestration aspect. Lab 2 will focus more on the transformation (data flow) pipelines. You will create a new pipeline to import a large Parquet file, following best practices to improve the load performance.

Task 1: Configure workload management classification

When loading a large amount of data, it is best to run only one load job at a time for fastest performance. If this isn't possible, run a minimal number of loads concurrently. If you expect a large loading job, consider scaling up your SQL pool before the load.

Be sure that you allocate enough memory to the pipeline session. To do this, increase the resource class of a user which has permissions to rebuild the index on this table to the recommended minimum.

To run loads with appropriate compute resources, create loading users designated for running loads. Assign each loading user to a specific resource class or workload group. To run a load, sign in as one of the loading users, and then run the load. The load runs with the user's resource class.

  1. In the query window, replace the script with the following to create a workload group, BigDataLoad, that uses workload isolation by reserving a minimum of 50% resources with a cap of 100%:

    IF NOT EXISTS (SELECT * FROM sys.workload_management_workload_classifiers WHERE group_name = 'BigDataLoad')
    BEGIN
        CREATE WORKLOAD GROUP BigDataLoad WITH  
        (
            MIN_PERCENTAGE_RESOURCE = 50 -- integer value
            ,REQUEST_MIN_RESOURCE_GRANT_PERCENT = 25 --  (guaranteed a minimum of 4 concurrency)
            ,CAP_PERCENTAGE_RESOURCE = 100
        );
    END
  2. Select Run from the toolbar menu to execute the SQL command.

  3. In the query window, replace the script with the following to create a new workload classifier, HeavyLoader that assigns the asa.sql.import01 user we created in your environment to the BigDataLoad workload group. At the end, we select from sys.workload_management_workload_classifiers to view all classifiers, including the one we just created:

    IF NOT EXISTS (SELECT * FROM sys.workload_management_workload_classifiers WHERE [name] = 'HeavyLoader')
    BEGIN
        CREATE WORKLOAD Classifier HeavyLoader WITH
        (
            Workload_Group ='BigDataLoad',
            MemberName='asa.sql.import01',
            IMPORTANCE = HIGH
        );
    END
    
    SELECT * FROM sys.workload_management_workload_classifiers
  4. Select Run from the toolbar menu to execute the SQL command. You should see the new classifier in the query results:

    The new workload classifier is highlighted.

  5. Navigate to the Manage hub.

    The Manage menu item is highlighted.

  6. Locate and select a linked service named sqlpool01_import01. Notice that the user name for the SQL Pool connection is the asa.sql.import01 user we added to the HeavyLoader classifier. We will use this linked service in our new pipeline to reserve resources for the data load activity.

    The user name is highlighted.

Task 2: Create pipeline with copy activity

  1. Navigate to the Integrate hub.

    The Integrate hub is highlighted.

  2. Select + then Pipeline to create a new pipeline.

    The new pipeline context menu item is selected.

  3. In the Properties pane for the new pipeline, enter the following Name: ASAL400 - Copy December Sales.

    The Name property is highlighted.

  4. Expand Move & transform within the Activities list, then drag the Copy data activity onto the pipeline canvas.

    Copy data is dragged to the canvas

  5. Select the Copy data activity on the canvas and set the Name to Copy Sales.

  6. Select the Source tab, then select + New next to Source dataset.

  7. Select the Azure Data Lake Storage Gen2 data store, then select Continue.

  8. Choose the Parquet format, then select Continue.

  9. In the properties, set the name to asal400_december_sales and select the asadatalakeSUFFIX linked service. Browse to the wwi-02/campaign-analytics/sale-20161230-snappy.parquet file location, select From sample file for schema import. Download this sample file to your computer, then browse to it in the Select file field. Select OK.

    The properties are displayed.

  10. Select the Sink tab, then select + New next to Sink dataset.

  11. Select the Azure Synapse Analytics data store, then select Continue.

  12. In the properties, set the name to asal400_saleheap_asa and select the sqlpool01_import01 linked service that connects to Synapse Analytics with the asa.sql.import01 user. For the table name, scroll the Table name dropdown and choose the wwi_staging.SaleHeap table then select OK.

    The properties are displayed.

  13. In the Sink tab, select the Copy command copy method and enter the following in the pre-copy script to clear the table before import: TRUNCATE TABLE wwi_staging.SaleHeap.

    The described settings are displayed.

  14. Select the Mapping tab and select Import schemas to create mappings for each source and destination field.

    The mapping is displayed.

  15. Select Settings and set the Data integration unit to 8. This is required due to the large size of the source Parquet file.

    The data integration unit value is set to 32.

  16. Select Publish all to save your new resources.

    Publish all is highlighted.

  17. Select Add trigger, then Trigger now. Select OK in the pipeline run trigger to begin.

    Trigger now.

  18. Navigate to the Monitor hub.

    The Monitor hub menu item is selected.

  19. Select Pipeline Runs. You can see the status of your pipeline run here. Note that you may need to refresh the view. Once the pipeline run is complete, you can query the wwi_staging.SaleHeap table to view the imported data.

    The completed pipeline run is displayed.

  20. Clean up! In a SQL script, execute the following:

    DROP WORKLOAD CLASSIFIER HeavyLoader;
    DROP WORKLOAD GROUP BigDataLoad;

Exercise 5: Post load maintenance

Task 1: Statistics

The more the SQL pool resource knows about your data, the faster it can execute queries. After loading data into SQL pool, collecting statistics on your data is one of the most important things you can do for query optimization. The SQL pool query optimizer is a cost-based optimizer. It compares the cost of various query plans, and then chooses the plan with the lowest cost. In most cases, it chooses the plan that will execute the fastest.

For example, if the optimizer estimates that the date your query is filtering on will return one row it will choose one plan. If it estimates that the selected date will return 1 million rows, it will return a different plan.

  1. To determine if statistics are set to be automatically created in your database, execute the following query in a new SQL script tab in Synapse Studio. The is_auto_create_stats_on column from the sys.databases table indicates whether statistics have been enabled.

    SELECT name, is_auto_create_stats_on
    FROM sys.databases
  2. To retrieve a listing of statistics that have been automatically created, use the sys.dm_pdw_exec_requests DMV.

    SELECT
        *
    FROM 
        sys.dm_pdw_exec_requests
    WHERE 
        Command like 'CREATE STATISTICS%'

    Notice the special name pattern used for automatically created statistics:

    View automatically created statistics

  3. To determine if there are any statistics created for a specific column, you can leverage DBCC SHOW_STATISTICS. In our scenario, let's determine the statistics set on the CustomerId column from the wwi_perf.Sale_Hash table.

    DBCC SHOW_STATISTICS ('wwi_perf.Sale_Heap', CustomerId) WITH HISTOGRAM
  4. If there is no statistics for CustomerId, an error will occur. To create statistics for the CustomerId column, use the CREATE STATISTICS statement.

    CREATE STATISTICS Sale_Heap_CustomerId ON wwi_perf.Sale_Heap (CustomerId)
  5. Execute the following query to view the statistics created by the CREATE STATISTICS command:

    DBCC SHOW_STATISTICS ('wwi_perf.Sale_Heap', Sale_Heap_CustomerId) WITH HISTOGRAM
  6. An example of statistics retrieved using DBCC SHOW_STATISTICS is as follows. The Chart option has been selected to see the visual.

    Statistics created for CustomerId

Task 2: Check for skew

Data skew occurs when data is not distributed correctly across different storage distributions. Skew decreases the performance of your dedicated SQL pools by forcing some compute nodes to work harder to read and retrieve data from the tables.

Azure Synapse Analytics dedicated SQL pools have 60 storage distributions and when selecting a distribution key for your hash, the goal is to select the optimal column for distributing data evenly.

To identify data skew in your hash distributed tables:

  1. Use DBCC PDW_SHOWSPACEUSED() to find data skew. In a new SQL script tab, copy, paste and execute the following query:

    -- Find data skew for a distributed table
    DBCC PDW_SHOWSPACEUSED('wwi_staging.Sale');

    This is a very quick and simple way to see the number of table rows that are stored in each of the 60 distributions of your database. Remember that for the most balanced performance, the rows in your distributed table should be spread evenly across all the distributions.

  2. By leveraging Azure Synapse Analytics Dynamic Management Views (DMVs), you can perform a more detailed skew analysis. Copy, paste, and execute the following script to create a view to identify which tables have data skew.

    CREATE VIEW dbo.vDistributionSkew
    AS
    WITH base
    AS
    (
        SELECT 
            SUBSTRING(@@version,34,4) AS [build_number]
            ,GETDATE() AS [execution_time]
            ,DB_NAME() AS [database_name]
            ,s.name AS [schema_name]
            ,t.name AS [table_name]
            ,QUOTENAME(s.name)+'.'+QUOTENAME(t.name) AS [two_part_name]
            ,nt.[name] AS [node_table_name]
            ,ROW_NUMBER() OVER(PARTITION BY nt.[name] ORDER BY (SELECT NULL)) AS [node_table_name_seq]
            ,tp.[distribution_policy_desc] AS [distribution_policy_name]
            ,nt.[distribution_id] AS [distribution_id]
            ,nt.[pdw_node_id] AS [pdw_node_id]
            ,pn.[type] AS [pdw_node_type]
            ,pn.[name] AS [pdw_node_name]
            ,nps.[partition_number] AS [partition_nmbr]
            ,nps.[reserved_page_count] AS [reserved_space_page_count]
            ,nps.[reserved_page_count] - nps.[used_page_count] AS [unused_space_page_count]
            ,nps.[in_row_data_page_count] 
            + nps.[row_overflow_used_page_count] + nps.[lob_used_page_count] AS [data_space_page_count]
            ,nps.[reserved_page_count] 
            - (nps.[reserved_page_count] - nps.[used_page_count]) 
            - ([in_row_data_page_count]+[row_overflow_used_page_count]+[lob_used_page_count]) AS  [index_space_page_count]
            ,nps.[row_count] AS [row_count]
        FROM sys.schemas s
        JOIN sys.tables t ON s.[schema_id] = t.[schema_id]
        JOIN sys.pdw_table_distribution_properties tp ON t.[object_id] = tp.[object_id]
        JOIN sys.pdw_table_mappings tm ON t.[object_id] = tm.[object_id]
        JOIN sys.pdw_nodes_tables nt ON tm.[physical_name] = nt.[name]
        JOIN sys.dm_pdw_nodes pn ON nt.[pdw_node_id] = pn.[pdw_node_id]
        JOIN sys.dm_pdw_nodes_db_partition_stats nps ON nt.[object_id] = nps.[object_id]
                                                        AND nt.[pdw_node_id] = nps.[pdw_node_id]
                                                        AND nt.[distribution_id] = nps.[distribution_id]
    )
    ,size
    AS
    (
        SELECT
            [build_number]
            ,[execution_time]
            ,[database_name]
            ,[schema_name]
            ,[table_name]
            ,[two_part_name]
            ,[node_table_name]
            ,[node_table_name_seq]
            ,[distribution_policy_name]
            ,[distribution_id]
            ,[pdw_node_id]
            ,[pdw_node_type]
            ,[pdw_node_name]
            ,[partition_nmbr]
            ,[reserved_space_page_count]
            ,[unused_space_page_count]
            ,[data_space_page_count]
            ,[index_space_page_count]
            ,[row_count]
            ,([reserved_space_page_count] * 8.0) AS [reserved_space_KB]
            ,([reserved_space_page_count] * 8.0)/1000 AS [reserved_space_MB]
            ,([reserved_space_page_count] * 8.0)/1000000 AS [reserved_space_GB]
            ,([reserved_space_page_count] * 8.0)/1000000000 AS [reserved_space_TB]
            ,([unused_space_page_count] * 8.0) AS [unused_space_KB]
            ,([unused_space_page_count] * 8.0)/1000 AS [unused_space_MB]
            ,([unused_space_page_count] * 8.0)/1000000 AS [unused_space_GB]
            ,([unused_space_page_count] * 8.0)/1000000000 AS [unused_space_TB]
            ,([data_space_page_count] * 8.0) AS [data_space_KB]
            ,([data_space_page_count] * 8.0)/1000 AS [data_space_MB]
            ,([data_space_page_count] * 8.0)/1000000 AS [data_space_GB]
            ,([data_space_page_count] * 8.0)/1000000000 AS [data_space_TB]
            ,([index_space_page_count] * 8.0) AS [index_space_KB]
            ,([index_space_page_count] * 8.0)/1000 AS [index_space_MB]
            ,([index_space_page_count] * 8.0)/1000000 AS [index_space_GB]
            ,([index_space_page_count] * 8.0)/1000000000 AS [index_space_TB]
        FROM base
    )
    SELECT * FROM size;
  3. Query the view:

    SELECT
        [two_part_name]
        ,[distribution_id]
        ,[row_count]
        ,[reserved_space_GB]
        ,[unused_space_GB]
        ,[data_space_GB]
        ,[index_space_GB]
    FROM [dbo].[vDistributionSkew]
    WHERE [table_name] = 'Sale'
        AND [schema_name] = 'wwi_staging'
    ORDER BY [row_count] DESC

    To decide if you should resolve data skew in a table, you should understand as much as possible about the data volumes and queries in your workload.

    Distributing data is a matter of finding the right balance between minimizing data skew and minimizing data movement. These can be opposing goals, and sometimes you will want to keep data skew in order to reduce data movement. For example, when the distribution column is frequently the shared column in joins and aggregations, you will be minimizing data movement. The benefit of having the minimal data movement might outweigh the impact of having data skew.

Cleanup: Pause the dedicated SQL pool

  1. Navigate to the Manage hub.

    The Manage menu item is highlighted.

  2. From the center menu, select SQL pools from beneath the Analytics pools heading. Locate SQLPool01, and select the Pause button.

    The Manage menu item is selected, with SQL pools selected from the center menu. The resume button is selected next to the SQLPool01 item.

  3. When prompted, select Pause.