Within the context of enterprise data warehousing, the effective management of historical data is essential for supporting informed business decision-making. Slowly Changing Dimension (SCD) Type 2 is a widely adopted technique for addressing changes in data over time.
A brief overview of Slowly Changing Dimensions Type 2
Slowly Changing Dimensions Type 2 (SCD Type 2) is a common solution for managing historical data. To ensure clarity, I’ll briefly recap SCD Type 2.
SCD Type 2 retains the full history of values. When the value of a chosen attribute changes, the current record is closed. A new record is created with the changed data values, and this new record becomes the current record.
Existing Dimension data
surrokey depID dep IsActivity
1 1001 IT 1
2 1002 HR 1
3 1003 Sales 1
The dimension changes and new data arrives:
depId dep
1003 wholesale <--- depID is same, name changed from "Sales" to "wholesale"
1004 Finance <--- new data
Mark the existing dimension record as expired (inactive), create a new record carrying the updated values, and insert genuinely new data as new dimension records.
Now, the new Dimension will be:
surrokey depID dep IsActivity
1 1001 IT 1 <-- No action required
2 1002 HR 1 <-- No action required
3 1003 Sales 0 <-- mark as inactive
4 1003 wholesale 1 <-- add updated active value
5 1004 Finance 1 <-- insert new data
This solution demonstrates the core concepts of a Slowly Changing Dimension (SCD) Type 2 implementation. While it covers the major steps involved, real-world production environments often have more complex requirements. When designing dimension tables (e.g., the dep table), I strongly recommend adding more descriptive columns to enhance clarity. Specifically, including [Start_active_date] and [End_active_date] columns significantly improves the traceability and understanding of dimension changes over time.
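For readers who prefer code to the data-flow canvas, here is a minimal PySpark sketch of the same SCD Type 2 logic applied to the sample data above. It is an illustration only: the column names follow the example, and in practice the surrogate key would be generated by the sink (for instance the IDENTITY column created in Step 1 below) rather than left null.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Existing dimension and incoming data, matching the tables above
dim = spark.createDataFrame(
    [(1, 1001, "IT", 1), (2, 1002, "HR", 1), (3, 1003, "Sales", 1)],
    ["surrokey", "depID", "dep", "IsActivity"])
incoming = spark.createDataFrame(
    [(1003, "wholesale"), (1004, "Finance")], ["depID", "dep"])

# Active rows whose tracked attribute changed: expire them (IsActivity = 0)
changed_keys = (dim.filter("IsActivity = 1")
                   .join(incoming.withColumnRenamed("dep", "new_dep"), "depID")
                   .filter("dep <> new_dep")
                   .select("depID"))
expired = dim.join(changed_keys, "depID", "left_semi").withColumn("IsActivity", F.lit(0))
unchanged = dim.join(changed_keys, "depID", "left_anti")

# Incoming rows that are new or carry a changed value become new active records
active = dim.filter("IsActivity = 1").select("depID", F.col("dep").alias("cur_dep"))
new_rows = (incoming.join(active, "depID", "left")
                    .filter("cur_dep IS NULL OR cur_dep <> dep")
                    .select("depID", "dep")
                    .withColumn("IsActivity", F.lit(1)))

# Combine; allowMissingColumns (Spark 3.1+) fills the missing surrogate key with null
result = unchanged.unionByName(expired).unionByName(new_rows, allowMissingColumns=True)
result.orderBy("depID", "IsActivity").show()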
Implementing SCD Type 2
Step 1: Create a Dimension Table- dep
# Create table
create table dep (
surrokey int IDENTITY(1, 1),
depID int,
dep varchar(50),
IsActivity bit);
# Insert data
surrokey depID dep IsActivity
1 1001 IT 1
2 1002 HR 1
3 1003 Sales 1
Step 2: Create Data Flow
Add the source dataset. The dataset should point to the file located in your source layer.
We have two incoming data rows: depID=1003 carries an updated value, and depID=1004 is new and needs to be added to the dimension table.
Step 3: Add derived column
Add a Derived Column transformation, add a column named isactive, and set its value to 1.
Step 4: Sink dimension data
Create a dataset pointing to the SQL Server database table dep.
Add a Sink that uses the dataset above, SQLServer_dep_table.
Configure the sink mappings as shown below.
Step 5: Add SQL dataset as another source.
Step 6: Rename columns from database table dep
Use a Select transformation to rename the columns from the SQL table.
Rename the columns:
depID –> sql_depID
dep –> sql_dep
Isactivity –> sql_IsActivity
Step 7: Lookup
Add a Lookup to join the new dimension data that we imported in "srcDep" at Step 2.
At this step, the existing dimension table is left-joined with the incoming dimension data (rows that need updated information or brand-new dimension values).
In the existing dimension data, depID=1003 previously had dep = "Sales"; it now needs to change to "wholesale".
Step 8: Keep only non-null rows
Add a Filter to keep only the rows that have non-null values in the source-file columns.
Filter expression: the depID column is not null.
!isNull(depid)
This requires filtering the ‘lkpNeedUpdate’ lookup output to include only rows where the depID is not null.
Step 9: Select needed columns
The upstream "filterNonNull" outputs more columns than we need.
Not all columns are required. The objective is to use the new data (containing depid and dep) to update existing information in the dimension table (specifically sql_depID, sql_dep, and sql_isActivity) and mark the old information as inactive.
Add a "Select" to pick only the columns that we are going to insert or update in the database dimension table.
Step 10: Add a new column and set its value to "0"
Add a Derived Column and set its value to "0", which marks the row as inactive.
Step 11: Alter row
Add an "Alter Row" transformation to update row information.
Configure the alter condition:
Update 1==1
Step 12: Sink updated information
We have updated the existing rows and marked them "0" (inactive); it is time to save them into the database dimension table.
Add a "Sink" pointing to the database dimension table, dep.
Map the columns:
sql_depid ---> depID
sql_dep ---> dep
ActivityStatus ---> IsActivity
Step 13: Adjust Sink order
As there are two sinks, one designated for the source data and the other for the updated data, a specific processing order must be enforced.
Click on a blank area of the canvas and, on the "Settings" tab, configure their order: 1. sinkUpdated, 2. sinkToSQLDBdepTable.
Step 14: Create a pipeline
Create a pipeline, add this data flow, and run it.
SELECT TOP (5000) [surrokey]
,[depID]
,[dep]
,[IsActivity]
FROM [williamSQLDB].[dbo].[dep]
surrokey depID dep IsActivity
1 1001 IT 1
2 1002 HR 1
3 1003 Sales 0
4 1003 Wholesale 1
5 1004 Finance 1
Conclusion
In conclusion, we have explored Slowly Changing Dimensions Type 2 end to end. This walkthrough should give you a solid understanding of how to implement SCD Type 2 effectively in your data warehousing projects, leveraging modern technologies and following industry best practices.
By implementing SCD Type 2 according to Ralph Kimball’s approach, organizations can achieve a comprehensive view of dimensional data, enabling accurate trend analysis, comparison of historical performance, and tracking of changes over time. It empowers businesses to make data-driven decisions based on a complete understanding of the data’s evolution, ensuring data integrity and reliability within the data warehousing environment.
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
This article is part of a series dedicated to dynamic ETL Source-to-Target Mapping (STM) solutions, covering both batch and near real-time use cases. The series will explore various mapping scenarios, including one-to-many, many-to-one, and many-to-many relationships, with implementations provided throughout.
You need privileges to create and alter the metadata table.
Scenario
In this article, I will focus on a scenario where the source schema may have new or missing columns, or the destination schema may have columns with different names or may lack columns to accommodate new incoming source fields.
Requirement:
Dynamically handle source variations, map data to a consistent destination schema, and handle missing columns gracefully: give a default value to missing columns and add new columns to the target DB table when new ones arrive.
Source:
CSV. The schema varies between executions (columns may be missing, reordered, or new). Current source column names: name, age, gender, and state.
“name”
The field's name in the file, or the column's name in the DB table.
“type”
Logical Data Type. The abstract or generalized type used by Azure Data Factory (ADF) to interpret data regardless of the underlying system or format. For example, string, integer, double etc.
“physicalType”
The specific type defined by the database or file system where the data resides. For example, VARCHAR, NVARCHAR, CHAR, INT, FLOAT, NUMERIC(18,10), TEXT etc. in database
Each column has such a source-to-sink mapping plan; we will concatenate all columns' mapping plans to generate a complete Source-to-Target Mapping (STM) plan.
Step 2: Creating known field-column mapping plan
For each known field or column, create a Source-to-Target mapping plan and save it in the "mapping" column of the database metadata table as a JSON-formatted string.
Step 4: Reset the activity status of all source fields in the metadata table to False
Save source data name
Since we will process the item's metadata field by field later, it is convenient to save the source data name in a variable.
Add a "Set variable" activity to save the source data name in the variable "var_sourcename".
Reset all source fields to False
Add a "Lookup activity" to reset the activity status of all source fields in the metadata table to False.
lookup query:
UPDATE metadata SET
src_col_activity = 0
WHERE source_filename = '@{variables('var_sourcename')}';
SELECT 1;
This is one of the important steps. It allows us to focus on the incoming source fields. When we build the complete ETL Source-to-Target mapping plan, we will utilize these incoming fields.
Step 5: ForEach over the source data fields
Add a "ForEach activity" to the pipeline, using the "structure" output to process the source data fields one by one.
Save source data field name and data type
In the ForEach activity, add two "Set variable" activities to save the source field name and data type in variables: ForEach's @item().name -> var_field_name; ForEach's @item().type -> var_field_type.
Lookup source fields in metadata table
Continuing in the ForEach activity, add a "Lookup activity" and create a dataset pointing to the metadata table.
Lookup query:
IF NOT EXISTS (
    SELECT src_col FROM metadata
    WHERE source_filename = '@{variables('var_sourcename')}'
      AND src_col = '@{variables('var_field_name')}'
)
BEGIN
    -- Alter target table schema
    IF NOT EXISTS (
        SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME = 'emp'
          AND COLUMN_NAME = '@{variables('var_field_name')}'
    )
        ALTER TABLE emp ADD @{item().name} NVARCHAR(max);
    SELECT 'target altered'; -- return

    -- insert field metadata and STM plan
    INSERT INTO metadata (source_filename, src_col, src_dataType, src_col_activity,
        destination_schema, destination_table, dst_col, dst_dataType, dst_col_activity, mapping)
    VALUES (
        '@{variables('var_sourcename')}',
        '@{variables('var_field_name')}',
        '@{variables('var_field_type')}',
        1,
        'dbo',
        'emp',
        '@{variables('var_field_name')}',
        'NVARCHAR',
        1,
        '{ "source": {
               "name": "@{variables('var_field_name')}",
               "type": "@{variables('var_field_type')}",
               "physicalType": "@{variables('var_field_type')}"
           },
           "sink": {
               "name": "@{variables('var_field_name')}",
               "type": "nvarchar(max)",
               "physicalType": "nvarchar(max)"
           } }'
    );
    SELECT 'insert field metadata'; -- return
END
ELSE
BEGIN
    UPDATE metadata SET src_col_activity = 1
    WHERE source_filename = '@{variables('var_sourcename')}'
      AND src_col = '@{variables('var_field_name')}';
    SELECT 'this field actived'; -- return
END;
Check if the current source field exists in the ‘metadata’ table. If the field’s name is found, update its activity status to True as an existing field. If the field’s name is not present, it indicates a new field. Insert this new field into the metadata table and establish its mapping plan to specify its intended destination.
Check the target table [emp] to verify if the column exists. If the column is not present, alter the schema of the target table [emp] to add a new column to the destination table.
The target table schema is altered.
The new field, "state", has its metadata inserted into the metadata table.
Having established the dynamic mapping plan, we are now prepared to ingest data from the source and deliver it to the target. All preceding steps were dedicated to the development of the ETL mapping plan.
Copy activity: Applying the STM mapping plan
Add a "Copy activity", using the Source and Sink datasets we built previously.
Switch to the "Mapping" tab, click "Add dynamic content", and write the expression:
@json(variables('var_mapping_plan'))
Done !!!
Afterword
This article focuses on explaining the underlying logic of dynamic source-to-target mapping through a step-by-step demonstration. To clearly illustrate the workflow and logic flow, four “Set Variable” activities and four pipeline variables are included. However, in a production environment, these are not required.
Having demonstrated dynamic source-to-target mapping with all necessary logic flow steps, this solution provides a foundation that can be extended to other scenarios, such as one-to-many, many-to-one, and many-to-many mappings. Implementations for these scenarios will be provided later.
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca (remove all space from the email account 😊)
The Get Metadata activity in Azure Data Factory (ADF) is used to retrieve metadata about a file, folder, or database. This activity is particularly useful when you need to dynamically determine properties like file name, size, structure, or existence and use them in subsequent pipeline activities.
We can specify the following metadata types in the Get Metadata activity field list to retrieve the corresponding information:
itemName: Name of the file or folder.
itemType: Type of the file or folder. Returned value is File or Folder.
size: Size of the file, in bytes. Applicable only to files.
created: Created datetime of the file or folder.
lastModified: Last modified datetime of the file or folder.
childItems: List of subfolders and files in the given folder. Applicable only to folders. Returned value is a list of the name and type of each child item.
contentMD5: MD5 of the file. Applicable only to files.
structure: Data structure of the file or relational database table. Returned value is a list of column names and column types.
columnCount: Number of columns in the file or relational table.
exists: Whether a file, folder, or table exists. If exists is specified in the Get Metadata field list, the activity won't fail even if the file, folder, or table doesn't exist. Instead, exists: false is returned in the output.
Metadata structure and columnCount are not supported when getting metadata from Binary, JSON, or XML files.
Wildcard filter on folders/files is not supported for Get Metadata activity.
Select the Get Metadata activity on the canvas if it is not already selected, and open its Settings tab to edit its details.
Sample setting and output
Get a folder’s metadata
Setting
Select a dataset or create a new one.
For a folder's metadata, the fields we can select in the Field list are limited to the folder-applicable types listed above: itemName, itemType, created, lastModified, childItems, and exists.
The Get Metadata activity in Azure Data Factory (ADF) is a versatile tool for building dynamic, efficient, and robust pipelines. It plays a critical role in handling real-time scenarios by providing essential information about data sources, enabling smarter workflows.
Use Case Scenarios Recap
File Verification: Check if a file exists or meets specific conditions (e.g., size or modification date) before processing.
Iterative Processing: Use folder metadata to dynamically loop through files using the ForEach activity.
Schema Validation: Fetch table or file schema for use in dynamic transformations or validations.
Dynamic Path Handling: Adjust source/destination paths based on retrieved metadata properties.
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca (remove all space from the email account 😊)
PySpark supports a variety of data sources, enabling seamless integration and processing of structured and semi-structured data from multiple formats, such as CSV, JSON, Parquet, and ORC, databases via JDBC, and more advanced formats like Avro, Delta Lake, and Hive.
By default, if you try to write directly to a file (e.g., name1.csv), it conflicts because Spark doesn't generate a single file but a collection of part files in a directory.
PySpark writes data in parallel, which results in multiple part files rather than a single CSV file by default. However, you can consolidate the data into a single CSV file by performing a coalesce(1) or repartition(1) operation before writing, which reduces the number of partitions to one.
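For example, a write along these lines (a sketch, assuming df is the DataFrame to save) produces that layout:
# Combine into one partition and write; "name1.csv" ends up as a directory
df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("dbfs:/FileStore/name1.csv")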
At this point, "dbfs:/FileStore/name1.csv" is treated as a directory rather than a file name. PySpark writes the single file inside it with a random name like part-00000-<id>.csv.
We need to take an additional step: rename the file to name1.csv.
# List all files in the directory
files = dbutils.fs.ls("dbfs:/FileStore/name1.csv")
# Filter for the part file
for file in files:
    if file.name.startswith("part-"):
        source_file = file.path  # Full path to the part file
        destination_file = "dbfs:/FileStore/name1.csv"
        # Move and rename the file
        dbutils.fs.mv(source_file, destination_file)
        break
display(dbutils.fs.ls("dbfs:/FileStore/"))
Direct Write with Custom File Name
PySpark doesn’t natively allow specifying a custom file name directly while writing a file because it writes data in parallel using multiple partitions. However, you can achieve a custom file name with a workaround. Here’s how:
Steps:
Use coalesce(1) to combine all data into a single partition.
Save the file to a temporary location.
Rename the part file to the desired name.
# Combine all data into one partition
df.coalesce(1).write.format("csv") \
.option("header", "true") \
.mode("overwrite") \
.save("dbfs:/FileStore/temp_folder")
# Get the name of the part file
files = dbutils.fs.ls("dbfs:/FileStore/temp_folder")
for file in files:
    if file.name.startswith("part-"):
        part_file = file.path
        break
# Move and rename the part file
dbutils.fs.mv(part_file, "dbfs:/FileStore/name1.csv")
# Remove the temporary folder
dbutils.fs.rm("dbfs:/FileStore/temp_folder", True)
Sample data is saved at /tmp/output/people.parquet.
df1 = spark.read.parquet("/tmp/output/people.parquet")
df1.show()
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname| dob|gender|salary|
+---------+----------+--------+-----+------+------+
| James | | Smith|36636| M| 3000|
| Michael | Rose| |40288| M| 4000|
| Robert | |Williams|42114| M| 4000|
| Maria | Anne| Jones|39192| F| 4000|
| Jen| Mary| Brown| | F| -1|
+---------+----------+--------+-----+------+------+
Reading a Parquet file is the same as reading from CSV; nothing special.
Write to a Parquet file
append: appends the data from the DataFrame to the existing output if the destination already exists. If the destination does not exist, it creates a new Parquet file in the specified location.
overwrite: overwrites the destination Parquet file with the data from the DataFrame. If the file does not exist, it creates a new Parquet file.
ignore: if the destination Parquet file already exists, this mode does nothing and does not write the DataFrame. If the file does not exist, it creates a new Parquet file.
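For example, writing with an explicit save mode looks like this (a sketch, assuming df is the DataFrame to save and reusing the sample path below):
# Overwrite the destination Parquet output
df.write.mode("overwrite").parquet("/tmp/output/people.parquet")

# Or append to it instead
df.write.mode("append").parquet("/tmp/output/people.parquet")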
Pay attention to .option("multiline", "true"). My JSON file spans multiple lines (see the sample data above); without this option the load still runs, but the resulting DataFrame does not work. As soon as you call show(), you will get this error:
AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default).
Reading from Multiline JSON (JSON Array) File
[{
"RecordNumber": 2,
"Zipcode": 704,
"ZipCodeType": "STANDARD",
"City": "PASEO COSTA DEL SUR",
"State": "PR"
},
{
"RecordNumber": 10,
"Zipcode": 709,
"ZipCodeType": "STANDARD",
"City": "BDA SAN LUIS",
"State": "PR"
}]
df_jsonarray_simple = spark.read\
.format("json")\
.option("multiline", "true")\
.load("dbfs:/FileStore/jsonArrary.json")
df_jsonarray_simple.show()
+-------------------+------------+-----+-----------+-------+
| City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR| 2| PR| STANDARD| 704|
| BDA SAN LUIS| 10| PR| STANDARD| 709|
+-------------------+------------+-----+-----------+-------+
read complex json
df_complexjson=spark.read\
.option("multiline","true")\
.json("dbfs:/FileStore/jsonArrary2.json")
df_complexjson.select("id","type","name","ppu","batters","topping").show(truncate=False, vertical=True)
-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------
id | 0001
type | donut
name | Cake
ppu | 0.55
batters | {[{1001, Regular}, {1002, Chocolate}, {1003, Blueberry}, {1004, Devil's Food}]}
topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5007, Powdered Sugar}, {5006, Chocolate with Sprinkles}, {5003, Chocolate}, {5004, Maple}]
-RECORD 1--------------------------------------------------------------------------------------------------------------------------------------------
id | 0002
type | donut
name | Raised
ppu | 0.55
batters | {[{1001, Regular}]}
topping | [{5001, None}, {5002, Glazed}, {5005, Sugar}, {5003, Chocolate}, {5004, Maple}]
-RECORD 2--------------------------------------------------------------------------------------------------------------------------------------------
id | 0003
type | donut
name | Old Fashioned
ppu | 0.55
batters | {[{1001, Regular}, {1002, Chocolate}]}
topping | [{5001, None}, {5002, Glazed}, {5003, Chocolate}, {5004, Maple}]
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" +
" (path 'resources/zipcodes.json')")
spark.sql("select * from zipcode").show()
Write to JSON
Options
path: Specifies the path where the JSON files will be saved.
mode: Specifies the behavior when writing to an existing directory.
compression: Specifies the compression codec to use when writing the JSON files (e.g., "gzip", "snappy"). df2.write.option("compression", "gzip")
dateFormat: Specifies the format for date and timestamp columns. df.write.option("dateFormat", "yyyy-MM-dd")
timestampFormat: Specifies the format for timestamp columns. df.write.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
lineSep: Specifies the character sequence to use as a line separator between JSON objects: \n (Unix/Linux newline) or \r\n (Windows newline). df.write.option("lineSep", "\r\n")
encoding: Specifies the character encoding to use when writing the JSON files: UTF-8, UTF-16, ISO-8859-1 (Latin-1), or other Java-supported encodings. df.write.option("encoding", "UTF-8")
Append: Appends the data to the existing data in the target location. If the target location does not exist, it creates a new one.
Overwrite: Overwrites the data in the target location if it already exists. If the target location does not exist, it creates a new one.
Ignore: Ignores the operation and does nothing if the target location already exists. If the target location does not exist, it creates a new one.
Error or ErrorIfExists: Throws an error and fails the operation if the target location already exists. This is the default behavior if no saving mode is specified.
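Combining several of the options above into one write, a sketch might look like this (the output path is illustrative):
# Write gzip-compressed JSON with explicit date/timestamp formats
df2.write.format("json") \
    .option("compression", "gzip") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX") \
    .mode("overwrite") \
    .save("/tmp/spark_output/zipcodes_gzip")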
# Write with savemode example
df2.write.mode('Overwrite').json("/tmp/spark_output/zipcodes.json")
The example above reads the entire table into a PySpark DataFrame. Sometimes you do not need the entire table; to select specific columns, specify the query you want with the query option, as shown below.
Append: mode("append") to append the rows to the existing SQL Server table.
# We have defined these variables before; shown here again
server_name = "mainri-sqldb.database.windows.net"
port = 1433
username = "my login name"
password = "my login password"
database_name = "mainri-sqldb"
table_name = "dep"
jdbc_url = f"jdbc:sqlserver://{server_name}:{port};databaseName={database_name}"
The mode("overwrite") drops the table if already exists by default and re-creates a new one without indexes. Use option(“truncate”,”true”) to retain the index.
With the PySpark jdbc() method and the numPartitions option, you can read the database table in parallel. This option applies to both reading and writing.
The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
# Select columns with where clause
df = spark.read \
.format("jdbc") \
.option("driver","com.mysql.cj.jdbc.Driver") \
.option("url", "jdbc:mysql://localhost:3306/emp") \
.option("query","select id,age from employee where gender='M'") \
.option("numPartitions",5) \
.option("user", "root") \
.option("password", "root") \
.load()
Using fetchsize with numPartitions to Read
The fetchsize is another option, used to specify how many rows to fetch at a time; by default it is set to 10. The JDBC fetch size determines how many rows to retrieve per round trip, which helps the performance of JDBC drivers. Do not set this to a very large number, as you might see issues.
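For example (connection details follow the earlier read; the table name and values are illustrative):
# Read with a larger JDBC fetch size and parallel partitions
df = spark.read \
    .format("jdbc") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/emp") \
    .option("dbtable", "employee") \
    .option("numPartitions", 5) \
    .option("fetchsize", 1000) \
    .option("user", "root") \
    .option("password", "root") \
    .load()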
# Create temporary view
sampleDF.createOrReplaceTempView("sampleView")
# Create a Database CT
spark.sql("CREATE DATABASE IF NOT EXISTS ct")
# Create a Table naming as sampleTable under CT database.
spark.sql("CREATE TABLE ct.sampleTable (id Int, name String, age Int, gender String)")
# Insert into sampleTable using the sampleView.
spark.sql("INSERT INTO TABLE ct.sampleTable SELECT * FROM sampleView")
# Lets view the data in the table
spark.sql("SELECT * FROM ct.sampleTable").show()
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
from pyspark.sql.functions import array_contains
# Using array_contains to check if the array contains 'apple'
df.select("id", array_contains("fruits", "apple").alias("has_apple")).show()
==output==
+---+----------+
| id|has_apple |
+---+----------+
| 1| true|
| 2| true|
| 3| false|
+---+----------+
getItem()
Access individual elements of an array by their index using the getItem() method
# Select the second element (index start from 0) of the 'numbers' array
df1 = df.withColumn("item_1_value", df.numbers.getItem(1))
display(df1)
==output==
id numbers item_1_value
1 [1,2,3] 2
2 [4,5,6] 5
3 [7,8,9] 8
size ()
Returns the size of the array.
from pyspark.sql.functions import size
# Get the size of the 'numbers' array
df.select(size(df.numbers)).show()
==output==
+-------------+
|size(numbers)|
+-------------+
| 3|
| 3|
| 3|
+-------------+
sort_array()
Sorts the array elements.
sort_array(col: "ColumnOrName", asc: bool = True)
If asc is True (the default), the array is sorted in ascending order; if False, descending. When asc=True, the argument can be omitted.
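For example, reusing the 'numbers' array column from above:
from pyspark.sql.functions import sort_array

# Sort each 'numbers' array ascending and descending
df.select(
    sort_array(df.numbers).alias("asc_sorted"),
    sort_array(df.numbers, asc=False).alias("desc_sorted")
).show()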
concat() is used to concatenate arrays (or strings) into a single array (or string). When dealing with ArrayType, concat() is typically used to combine two or more arrays into one.
from pyspark.sql.functions import concat
concat(*cols)
If any of the input columns are null, the entire result can become null. This is why you’re seeing null instead of just the non-null array.
To handle this, you can use coalesce() to substitute null with an empty array before performing the concat(). coalesce() returns the first non-null argument. Here’s how you can modify your code:
from pyspark.sql.functions import concat, coalesce, col, array
# Define an empty array of the same type
empty_array = array()
# Concatenate with null handling using coalesce
df_concat = df.withColumn(
"concatenated_array",
concat(coalesce(col("array1"), empty_array), coalesce(col("array2"), empty_array))
)
df_concat.show(truncate=False)
==output==
+---+------+------+------------------+
|id |array1|array2|concatenated_array|
+---+------+------+------------------+
|1 |[a, b]|[x, y]|[a, b, x, y] |
|2 |[c] |[z] |[c, z] |
|3 |[d, e]|null |[d, e] |
+---+------+------+------------------+
arrays_zip()
Combines multiple arrays into a single array of structs, pairing elements by position.
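A minimal sketch, assuming two array columns of equal length such as array1 and array2 from the concat() example above:
from pyspark.sql.functions import arrays_zip

# Pair element i of 'array1' with element i of 'array2' as a struct
df.select("id", arrays_zip("array1", "array2").alias("zipped")).show(truncate=False)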
MapType column and functions
MapType is used to represent map key-value pairs, similar to a Python dictionary (dict).
from pyspark.sql.types import MapType, StringType, IntegerType
# Define a MapType
my_map = MapType(StringType(), IntegerType(), valueContainsNull=True)
Parameters:
keyType: Data type of the keys in the map. You can use PySpark data types like StringType(), IntegerType(), DoubleType(), etc.
valueType: Data type of the values in the map. It can be any valid PySpark data type
valueContainsNull: Boolean flag (optional). It indicates whether null values are allowed in the map. Default is True.
Sample dataset
# Sample dataset (Product ID and prices in various currencies)
data = [
    (1, {"USD": 100, "EUR": 85, "GBP": 75}),
    (2, {"USD": 150, "EUR": 130, "GBP": 110}),
    (3, {"USD": 200, "EUR": 170, "GBP": 150}),
]
To get the price for a specific currency (e.g., USD) for each product:
from pyspark.sql.functions import col, map_keys, map_values
# Access the value for a specific key in the map
df.select(
col("product_id"),
col("prices").getItem("USD").alias("price_in_usd")
).show(truncate=False)
==output==
+----------+------------+
|product_id|price_in_usd|
+----------+------------+
|1 |100 |
|2 |150 |
|3 |200 |
+----------+------------+
filtering
filter the rows based on conditions involving the map values
from pyspark.sql.functions import col, map_keys, map_values
# Filter rows where price in USD is greater than 150
df.filter(col("prices").getItem("USD") > 150).show(truncate=False)
==output==
+----------+------------------------------------+
|product_id|prices |
+----------+------------------------------------+
|3 |{EUR -> 170, GBP -> 150, USD -> 200}|
+----------+------------------------------------+
map_concat ()
Combines two or more map columns by merging their key-value pairs.
from pyspark.sql.functions import map_concat, create_map, lit
# Define the additional currency as a new map using create_map()
additional_currency = create_map(lit("CAD"), lit(120))
# Add a new currency (e.g., CAD) with a fixed price to all rows
df.withColumn(
"updated_prices",
map_concat(col("prices"), additional_currency)
).show(truncate=False)
==output==
+----------+------------------------------------+------------------------------------------------+
|product_id|prices                              |updated_prices                                  |
+----------+------------------------------------+------------------------------------------------+
|1         |{EUR -> 85, GBP -> 75, USD -> 100}  |{EUR -> 85, GBP -> 75, USD -> 100, CAD -> 120}  |
|2         |{EUR -> 130, GBP -> 110, USD -> 150}|{EUR -> 130, GBP -> 110, USD -> 150, CAD -> 120}|
|3         |{EUR -> 170, GBP -> 150, USD -> 200}|{EUR -> 170, GBP -> 150, USD -> 200, CAD -> 120}|
+----------+------------------------------------+------------------------------------------------+
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
existingName: The current name of the column you want to rename.
newName: The new name for the column.
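For example, with withColumnRenamed() (assuming a DataFrame df that has an age column):
# Rename column 'age' to 'employee_age'
df_renamed = df.withColumnRenamed("age", "employee_age")
df_renamed.show()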
drop ()
In PySpark, you can drop columns from a DataFrame using the drop() method. Here’s a breakdown of the syntax, options, parameters, and examples for dropping columns in PySpark.
Syntax
DataFrame.drop(*cols)
*cols: One or more column names (as strings) that you want to drop from the DataFrame. You can pass these as individual arguments or a list of column names.
# Drop single column 'age'
df_dropped = df.drop("age")
# Drop multiple columns 'id' and 'age'
df_dropped_multiple = df.drop("id", "age")
# Dropping Columns Using a List of Column Names
# Define columns to drop
columns_to_drop = ["id", "age"]
# Drop columns using list
df_dropped_list = df.drop(*columns_to_drop)
df_dropped_list.show()
show()
By default, show() displays 20 rows and truncates strings to 20 characters.
df.show(n=10, truncate= True, vertical=True)
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
When your Delta tables reside in Blob Storage or Azure Data Lake Storage (ADLS), you interact with them directly using their file paths. This differs from how you might access tables managed within a metastore like Unity Catalog, where you’d use a cataloged name.
Reading Delta Tables from Blob Storage or ADLS
To read Delta tables from Blob Storage or ADLS, specify the path to the Delta table and use the delta format.
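For example (the path is illustrative):
# Read a Delta table directly from its path in Blob Storage or ADLS
df = spark.read.format("delta").load("path/to/delta/table")
df.show()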
When writing to Delta tables, use the delta format and specify the path where you want to store the table.
Spark SQL cannot directly write to a Delta table in Blob or ADLS (use PySpark for this). However, you can run SQL queries and insert into a Delta table using INSERT INTO:
# SparkSQL
INSERT INTO delta.`/mnt/path/to/delta/table` SELECT * FROM my_temp_table
caution: " ` " - backticks
# PySpark
df.write.format("delta").mode("overwrite").save("path/to/delta/table")
Options and Parameters for Delta Read/Write
Options for Reading Delta Tables:
You can configure the read operation with options like:
mergeSchema: Allows schema evolution if the structure of the Delta table changes.
spark.sql.files.ignoreCorruptFiles: Ignores corrupt files during reading.
Time travel (versionAsOf / timestampAsOf): enables querying older versions of the Delta table.
Delta supports time travel, allowing you to query previous versions of the data. This is very useful for audits or retrieving data at a specific point in time.
# Read from a specific version
df = spark.read.format("delta").option("versionAsOf", 2).load("path/to/delta/table")
df.show()
# Read data at a specific timestamp
df = spark.read.format("delta").option("timestampAsOf", "2024-10-01").load("path/to/delta/table")
df.show()
Conclusion:
Delta is a powerful format that works well with ADLS or Blob Storage when used with PySpark.
Ensure that you’re using the Delta Lake library to access Delta features, like ACID transactions, schema enforcement, and time travel.
For reading, use .format("delta").load("path").
For writing, use .write.format("delta").save("path").
Please do not hesitate to contact me if you have any questions at William . chen @ mainri.ca
from_json() is a function used to parse a JSON string into a structured DataFrame format (such as StructType, ArrayType, etc.). It is commonly used to deserialize JSON strings stored in a DataFrame column into complex types that PySpark can work with more easily.
Syntax
from_json(column, schema, options={})
Parameters
column: The column containing the JSON string. Can be a string that refers to the column name or a column object.
schema
Specifies the schema of the expected JSON structure. Can be a StructType (or other types like ArrayType depending on the JSON structure).
options
allowUnquotedFieldNames: Allows field names without quotes. (default: false)
allowNumericLeadingZeros: Allows leading zeros in numbers. (default: false)
allowBackslashEscapingAnyCharacter: Allows escaping any character with a backslash. (default: false)
mode: Controls how to handle malformed records. PERMISSIVE (default): sets null values for corrupted fields. DROPMALFORMED: discards rows with malformed JSON strings. FAILFAST: fails the query if any malformed records are found.
Sample DF
+---------------------------------------------------------+
|json_string                                              |
+---------------------------------------------------------+
|{"name": {"first": "John", "last": "Doe"}, "age": 30}    |
|{"name": {"first": "Alice", "last": "Smith"}, "age": 25} |
+---------------------------------------------------------+
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import from_json, col
# Define the schema for the nested JSON
schema = StructType([
StructField("name", StructType([
StructField("first", StringType(), True),
StructField("last", StringType(), True)
]), True),
StructField("age", IntegerType(), True)
])
# Parse the JSON string into structured columns
df_parsed = df.withColumn("parsed_json", from_json(col("json_string"), schema))
# Display the parsed JSON
df_parsed.select("parsed_json.*").show(truncate=False)
+--------------+---+
|name          |age|
+--------------+---+
|{John, Doe}   |30 |
|{Alice, Smith}|25 |
+--------------+---+
to_json()
to_json() is a function that converts a structured column (such as one of type StructType, ArrayType, etc.) into a JSON string.
Syntax
to_json(column, options={})
Parameters
column: The column you want to convert into a JSON string. The column should be of a complex data type, such as StructType, ArrayType, or MapType. Can be a column name (as a string) or a Column object.
options
pretty: If set to true, it pretty-prints the JSON output.
dateFormat: Specifies the format for DateType and TimestampType columns (default: yyyy-MM-dd).
timestampFormat: Specifies the format for TimestampType columns (default: yyyy-MM-dd'T'HH:mm:ss.SSSXXX).
ignoreNullFields: When set to true, null fields are omitted from the resulting JSON string (default: true).
compression: Controls the compression codec used to compress the JSON output, e.g., gzip, bzip2.
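As a quick sketch, converting the struct column parsed in the from_json() example back into a JSON string:
from pyspark.sql.functions import to_json, col

# Serialize the 'parsed_json' struct back to a JSON string column
df_back = df_parsed.select(to_json(col("parsed_json")).alias("json_string"))
df_back.show(truncate=False)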