Data Quality Framework (DQX)

Data quality is more critical than ever in today’s data-driven world. Organizations are generating and collecting vast amounts of data, and the ability to trust and leverage this information is paramount for success. Poor data quality can have severe negative impacts, ranging from flawed decision-making to regulatory non-compliance and significant financial losses.

Key Dimensions of Data Quality (DAMA-DMBOK or ISO 8000 Standards)

A robust DQX evaluates data across multiple dimensions:

  • Accuracy: Data correctly represents real-world values.
  • Completeness: No missing or null values where expected.
  • Consistency: Data is uniform across systems and over time.
  • Timeliness: Data is up-to-date and available when needed.
  • Validity: Data conforms to defined business rules (e.g., format, range).
  • Uniqueness: No unintended duplicates.
  • Integrity: Relationships between datasets are maintained.
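
To make a few of these dimensions concrete, here is a minimal sketch in plain Python that scores completeness, uniqueness, and validity on a handful of records. The records, the helper functions, and the simplified email pattern are all assumptions made for illustration; none of this is part of DQX.

```python
import re

# Hypothetical sample records; None marks a missing value.
records = [
    {"customer_id": 1, "email": "alice@example.com"},
    {"customer_id": 2, "email": None},
    {"customer_id": 3, "email": "not-an-email"},
    {"customer_id": 3, "email": "carol@example.com"},
]

# Deliberately simple pattern for illustration, not a full RFC 5322 validator.
EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def completeness(rows, column):
    """Share of rows where the column is present and not None."""
    return sum(row.get(column) is not None for row in rows) / len(rows)


def uniqueness(rows, column):
    """Share of distinct values among all values of the column."""
    values = [row[column] for row in rows]
    return len(set(values)) / len(values)


def validity(rows, column, pattern):
    """Share of non-null values that match the pattern."""
    values = [row[column] for row in rows if row[column] is not None]
    return sum(bool(pattern.match(v)) for v in values) / len(values)


email_completeness = completeness(records, "email")          # 3 of 4 present
id_uniqueness = uniqueness(records, "customer_id")           # 3 distinct of 4
email_validity = validity(records, "email", EMAIL_PATTERN)   # 2 of 3 valid
```

Scores like these are usually tracked per column over time, so a sudden drop points to the load that introduced the problem.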

What is the Data Quality Framework (DQX)?

Data Quality Framework (DQX) is an open-source framework from Databricks Labs designed to simplify and automate data quality checks for PySpark workloads on both batch and streaming data.

DQX provides a structured approach to assessing, monitoring, and improving the quality of data within an organization. It lets you define, validate, and enforce data quality rules across your data pipelines, so that data is accurate, consistent, complete, reliable, and fit for its intended use, and can be used confidently for analytics, reporting, compliance, and decision-making.

This article explores how the DQX framework helps improve data reliability, reduce data errors, and enforce compliance with data quality standards. It walks through each step, from installing DQX to using it in a Databricks notebook, with code snippets that implement data quality checks.

DQX usage in the Lakehouse Architecture

In the Lakehouse architecture, new data should be validated as it enters the curated layer so that bad data is not propagated to subsequent layers. With DQX, you can implement the dead-letter pattern: quarantine invalid data and re-ingest it after curation, so that data quality constraints are met. Data quality can be monitored in real time between layers, and the quarantine process can be automated.
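
The dead-letter flow can be pictured with a small plain-Python sketch: invalid records go to a quarantine list, get repaired, and then pass through the same checks again. The rule, the record layout, and the repair step are hypothetical and only illustrate the pattern; DQX itself works on Spark DataFrames.

```python
def is_valid(record):
    """Hypothetical rule: a record needs a non-empty customer_id."""
    return record.get("customer_id") not in (None, "")


def split(records):
    """Route each record to the curated set or to quarantine."""
    curated = [r for r in records if is_valid(r)]
    quarantined = [r for r in records if not is_valid(r)]
    return curated, quarantined


incoming = [
    {"customer_id": "C1", "name": "Alice"},
    {"customer_id": "", "name": "Bob"},
]

curated, quarantine = split(incoming)

# Curation step (hypothetical): someone repairs the quarantined record.
repaired = [{**r, "customer_id": "C2"} for r in quarantine]

# Re-ingest: repaired records pass through the same checks again.
recovered, still_bad = split(repaired)
curated.extend(recovered)
```

The point of running the repaired records through `split` again is that nothing reaches the curated set without passing the checks, including data that was fixed by hand.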

Credits: https://databrickslabs.github.io/dqx/docs/motivation/

Components of a Data Quality Framework

A DQX typically includes:

A. Data Quality Assessment

  • Profiling: Analyze data to identify anomalies (e.g., outliers, nulls).
  • Metrics & KPIs: Define measurable standards (e.g., % completeness, error rates).
  • Benchmarking: Compare against industry standards or past performance.

B. Data Quality Rules & Standards

  • Define validation rules (e.g., “Email must follow RFC 5322 format”).
  • Implement checks at the point of entry (e.g., form validation) and during processing.

C. Governance & Roles

  • Assign data stewards responsible for quality.
  • Establish accountability (e.g., who fixes issues? Who approves changes?).

D. Monitoring & Improvement

  • Automated checks: Use tools like Great Expectations, Talend, or custom scripts.
  • Root Cause Analysis (RCA): Identify why errors occur (e.g., system glitches, human input).
  • Continuous Improvement: Iterative fixes (e.g., process changes, user training).

E. Tools & Technology

  • Data Quality Tools: Informatica DQ, IBM InfoSphere, Ataccama, or open-source (Apache Griffin).
  • Metadata Management: Track data lineage and quality scores.
  • AI/ML: Anomaly detection (e.g., identifying drift in datasets).

F. Culture & Training

  • Promote data literacy across teams.
  • Encourage reporting of data issues without blame.

Using Databricks DQX Framework in a Notebook

Implementing DQX Step by Step

Step 1: Install the DQX Library

Install it with pip in a notebook cell:

%pip install databricks-labs-dqx

# Restart the kernel after the package is installed in the notebook:
# in a separate cell run:
dbutils.library.restartPython()

Step 2: Initialize the Environment and read input data

Set up the environment for running the Databricks DQX framework. This involves the following steps.

Import the key components from the Databricks DQX library:

  • DQProfiler: Profiles the input data to understand its structure and generate summary statistics.
  • DQGenerator: Generates data quality rules based on the profiles.
  • DQDltGenerator: Generates DLT expectations based on the profiles.
  • DQEngine: Executes the defined data quality checks.
  • WorkspaceClient: Handles communication with the Databricks workspace.

from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

Load the input data that you want to evaluate:

# Read the input data from a Delta table
input_df = spark.read.table("catalog.schema.table")

Establish a connection to the Databricks workspace:

# Initialize the WorkspaceClient to interact with the Databricks workspace
ws = WorkspaceClient()

# Initialize a DQProfiler instance with the workspace client
profiler = DQProfiler(ws)

Profile the data:

# Profile the input DataFrame to get summary statistics and data profiles
summary_stats, profiles = profiler.profile(input_df)

Generate DQX quality rules/checks:

generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles)  # with default level "error"

dq_engine = DQEngine(ws)

Save the checks in an arbitrary workspace location:

dq_engine.save_checks_in_workspace_file(checks, workspace_path="/Shared/App1/checks.yml")

Generate DLT expectations:

dlt_generator = DQDltGenerator(ws)

dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="SQL")
print(dlt_expectations)

dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python")
print(dlt_expectations)

dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python_Dict")
print(dlt_expectations)

The profiler samples 30% of the data (sample ratio = 0.3) and limits the input to 1000 records by default.

Profiling a Table

Tables can be loaded and profiled using profile_table.

from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.sdk import WorkspaceClient

# Profile a single table directly
ws = WorkspaceClient()
profiler = DQProfiler(ws)

# Profile a specific table with custom options
summary_stats, profiles = profiler.profile_table(
    table="catalog1.schema1.table1",
    columns=["col1", "col2", "col3"],  # specify columns to profile
    options={
        "sample_fraction": 0.1,  # sample 10% of data
        "limit": 500,            # limit to 500 records
        "remove_outliers": True, # enable outlier detection
        "num_sigmas": 2.5       # use 2.5 standard deviations for outliers
    }
)

print("Summary Statistics:", summary_stats)
print("Generated Profiles:", profiles)
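
To give a feel for what remove_outliers and num_sigmas control, the sketch below drops values that lie more than num_sigmas sample standard deviations from the mean. This is an assumption about the general technique, written in plain Python with made-up prices; the exact calculation inside DQX may differ.

```python
import statistics


def sigma_bounds(values, num_sigmas=3.0):
    """Return (lower, upper) bounds at mean +/- num_sigmas * sample stddev."""
    mean = statistics.mean(values)
    stddev = statistics.stdev(values)
    return mean - num_sigmas * stddev, mean + num_sigmas * stddev


def remove_outliers(values, num_sigmas=3.0):
    """Keep only the values that fall inside the sigma bounds."""
    lower, upper = sigma_bounds(values, num_sigmas)
    return [v for v in values if lower <= v <= upper]


prices = [10, 11, 9, 10, 12, 10, 11, 9, 10, 500]  # hypothetical data

kept_strict = remove_outliers(prices, num_sigmas=2.5)  # 500 is dropped
kept_loose = remove_outliers(prices, num_sigmas=3.0)   # 500 survives
```

With only ten values, the extreme price inflates the standard deviation so much that it stays within three sigmas of the mean, which is one reason to tune num_sigmas on small samples.
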

Profiling Multiple Tables

The profiler can discover and profile multiple tables in Unity Catalog. Tables can be passed explicitly as a list or be included/excluded using regex patterns.

from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.sdk import WorkspaceClient

ws = WorkspaceClient()
profiler = DQProfiler(ws)

# Profile several tables by name:
results = profiler.profile_tables(
    tables=["main.data.table_001", "main.data.table_002"]
)

# Process results for each table
for summary_stats, profiles in results:
    print(f"Table statistics: {summary_stats}")
    print(f"Generated profiles: {profiles}")

# Include tables matching specific patterns
results = profiler.profile_tables(
    patterns=["$main.*", "$data.*"]
)

# Process results for each table
for summary_stats, profiles in results:
    print(f"Table statistics: {summary_stats}")
    print(f"Generated profiles: {profiles}")

# Exclude tables matching specific patterns
results = profiler.profile_tables(
    patterns=["$sys.*", ".*_tmp"],
    exclude_matched=True
)

# Process results for each table
for summary_stats, profiles in results:
    print(f"Table statistics: {summary_stats}")
    print(f"Generated profiles: {profiles}")
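
The include/exclude behaviour can be illustrated without a workspace. The sketch below filters a list of table names with regular expressions in plain Python. The table names, the select_tables helper, and the choice of re.search are assumptions for illustration; how DQX anchors and matches patterns internally is not described in this article.

```python
import re


def select_tables(table_names, patterns, exclude_matched=False):
    """Keep names matching any pattern, or drop them if exclude_matched is True."""
    compiled = [re.compile(p) for p in patterns]

    def matches(name):
        return any(c.search(name) for c in compiled)

    return [n for n in table_names if matches(n) != exclude_matched]


tables = ["main.data.orders", "main.data.orders_tmp", "sys.info.jobs"]

# Include only tables in the hypothetical "main" catalog.
included = select_tables(tables, [r"^main\."])

# Exclude system tables and temporary tables.
without_tmp = select_tables(tables, [r"^sys\.", r"_tmp$"], exclude_matched=True)
```

Testing patterns against a list of known table names like this is a cheap way to confirm what a pattern selects before profiling anything.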

Profiling Options

The profiler supports extensive configuration options to customize the profiling behavior.

from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.sdk import WorkspaceClient

# Custom profiling options
custom_options = {
    # Sampling options
    "sample_fraction": 0.2,  # Sample 20% of the data
    "sample_seed": 42,       # Seed for reproducible sampling
    "limit": 2000,           # Limit to 2000 records after sampling

    # Outlier detection options
    "remove_outliers": True,              # Enable outlier detection for min/max rules
    "outlier_columns": ["price", "age"],  # Only detect outliers in specific columns
    "num_sigmas": 2.5,                    # Use 2.5 standard deviations for outlier detection

    # Null value handling
    "max_null_ratio": 0.05,  # Generate is_not_null rule if <5% nulls

    # String handling
    "trim_strings": True,     # Trim whitespace from strings before analysis
    "max_empty_ratio": 0.02,  # Generate is_not_null_or_empty if <2% empty strings

    # Distinct value analysis
    "distinct_ratio": 0.01,  # Generate is_in rule if <1% distinct values
    "max_in_count": 20,      # Maximum items in is_in rule list

    # Value rounding
    "round": True,  # Round min/max values for cleaner rules
}

ws = WorkspaceClient()
profiler = DQProfiler(ws)

# Apply custom options to profiling
summary_stats, profiles = profiler.profile(input_df, options=custom_options)

# Apply custom options when profiling tables
tables = [
    "dqx.demo.test_table_001",
    "dqx.demo.test_table_002",
    "dqx.demo.test_table_003",  # profiled with default options
]
table_options = {
    "dqx.demo.test_table_001": {"limit": 2000},
    "dqx.demo.test_table_002": {"limit": 5000},
}
# profile_tables yields one result per table, as in the loops shown earlier
results = profiler.profile_tables(tables=tables, options=table_options)
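
The threshold options are easier to read as decisions. The plain-Python sketch below takes one column's values and suggests rule names using the same thresholds as the comments above (max_null_ratio, distinct_ratio, max_in_count). The suggest_rules function and the sample column are hypothetical, and this is a simplified reading of the option descriptions, not DQX's actual logic.

```python
def suggest_rules(values, max_null_ratio=0.05, distinct_ratio=0.01, max_in_count=20):
    """Suggest rule names for one column from simple ratios (illustrative only)."""
    total = len(values)
    non_null = [v for v in values if v is not None]
    null_ratio = (total - len(non_null)) / total
    distinct = set(non_null)

    rules = []
    # Few nulls: the column is probably meant to be non-null.
    if null_ratio < max_null_ratio:
        rules.append("is_not_null")
    # Few distinct values: the column probably holds a fixed set of codes.
    if len(distinct) / total < distinct_ratio and len(distinct) <= max_in_count:
        rules.append("is_in")
    return rules


# Hypothetical column: 1000 values, 3 distinct statuses, 1% nulls.
status = ["new"] * 500 + ["open"] * 300 + ["closed"] * 190 + [None] * 10
suggested = suggest_rules(status)

# Hypothetical column with half of its values missing: no rule is suggested.
sparse = suggest_rules([1, None, 2, None])
```

Raising max_null_ratio makes the profiler more willing to propose a not-null rule for columns that currently contain some nulls, so the generated rules should be reviewed before they are enforced.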

Understanding output

Assuming the sample data is:

customer_id | customer_name | customer_email    | is_active | start_date | end_date
1           | Alice         | alice@mainri.ca   | 1         | 2025-01-24 | null
2           | Bob           | bob_new@mainri.ca | 1         | 2025-01-24 | null
3           | Charlie       | invalid_email     | 1         | 2025-01-24 | null
3           | Charlie       | invalid_email     | 0         | 2025-01-24 | 2025-01-24

# Initialize the WorkspaceClient to interact with the Databricks workspace
ws = WorkspaceClient()

# Initialize a DQProfiler instance with the workspace client
profiler = DQProfiler(ws)

# Read the input data from a Delta table
input_df = spark.read.table("catalog.schema.table")

# Display a sample of the input data
input_df.display()

# Profile the input DataFrame to get summary statistics and data profiles
summary_stats, profiles = profiler.profile(input_df)

The summary statistics and profiles that DQX generated for this input data are shown below.

print(summary_stats)

Summary of the input data for all columns in the input DataFrame:

{
  "customer_id": {
    "count": 4,
    "mean": 2.25,
    "stddev": 0.9574271077563381,
    "min": 1,
    "25%": 1,
    "50%": 2,
    "75%": 3,
    "max": 3,
    "count_non_null": 4,
    "count_null": 0
  },
  "customer_name": {
    "count": 4,
    "mean": null,
    "stddev": null,
    "min": "Alice",
    "25%": null,
    "50%": null,
    "75%": null,
    "max": "Charlie",
    "count_non_null": 4,
    "count_null": 0
  },
  "customer_email": {
    "count": 4,
    "mean": null,
    "stddev": null,
    "min": "alice@example.com",
    "25%": null,
    "50%": null,
    "75%": null,
    "max": "charlie@example.com",
    "count_non_null": 4,
    "count_null": 0
  },
  "is_active": {
    "count": 4,
    "mean": 0.75,
    "stddev": 0.5,
    "min": 0,
    "25%": 0,
    "50%": 1,
    "75%": 1,
    "max": 1,
    "count_non_null": 4,
    "count_null": 0
  },
  "start_date": {
    "count": 4,
    "count_non_null": 4,
    "count_null": 0,
    "min": "2025-01-24",
    "max": "2025-01-24",
    "mean": "2025-01-24"
  },
  "end_date": {
    "count": 4,
    "count_non_null": 1,
    "count_null": 3,
    "min": 1737676800,
    "max": 1737676800
  }
}
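
The numeric figures in this summary can be checked by hand. The sketch below recomputes the mean and standard deviation for customer_id and is_active from the four sample rows using Python's statistics module, and decodes the end_date value on the assumption that it is a Unix timestamp in seconds (UTC).

```python
import statistics
from datetime import date, datetime, timezone

# Column values taken from the four sample rows above.
customer_id = [1, 2, 3, 3]
is_active = [1, 1, 1, 0]

id_mean = statistics.mean(customer_id)        # 2.25
id_stddev = statistics.stdev(customer_id)     # sample standard deviation
active_mean = statistics.mean(is_active)      # 0.75
active_stddev = statistics.stdev(is_active)   # 0.5

# The end_date min/max value, read as seconds since the Unix epoch (UTC).
end_date = datetime.fromtimestamp(1737676800, tz=timezone.utc).date()
```

The standard deviation in the summary is the sample standard deviation (dividing by n - 1), which is why customer_id comes out at roughly 0.957 rather than 0.829.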

print(profiles)

# Default data profiles generated from the input data
DQProfile(
  name='is_not_null',
  column='customer_id',
  description=None,
  parameters=None
),
DQProfile(
  name='min_max',
  column='customer_id',
  description='Real min/max values were used',
  parameters={
    'min': 1,
    'max': 3
  }
),
DQProfile(
  name='is_not_null',
  column='customer_name',
  description=None,
  parameters=None
),
DQProfile(
  name='is_not_null',
  column='customer_email',
  description=None,
  parameters=None
),
DQProfile(
  name='is_not_null',
  column='is_active',
  description=None,
  parameters=None
),
DQProfile(
  name='is_not_null',
  column='start_date',
  description=None,
  parameters=None
)

Step 3: Understanding the checks applied to the data

The snippet below shows the default checks that DQX generates for the input data from the profiles produced in the previous step.

# generate DQX quality rules/checks
generator = DQGenerator(ws)
checks = generator.generate_dq_rules(profiles)

print(checks)

# Checks applied on input data
[{
  'check': {
    'function': 'is_not_null',
    'arguments': {
      'col_name': 'customer_id'
    }
  },
  'name': 'customer_id_is_null',
  'criticality': 'error'
},
{
  'check': {
    'function': 'is_in_range',
    'arguments': {
      'col_name': 'customer_id',
      'min_limit': 1,
      'max_limit': 3
    }
  },
  'name': 'customer_id_isnt_in_range',
  'criticality': 'error'
},
{
  'check': {
    'function': 'is_not_null',
    'arguments': {
      'col_name': 'customer_name'
    }
  },
  'name': 'customer_name_is_null',
  'criticality': 'error'
},
{
  'check': {
    'function': 'is_not_null',
    'arguments': {
      'col_name': 'customer_email'
    }
  },
  'name': 'customer_email_is_null',
  'criticality': 'error'
},
{
  'check': {
    'function': 'is_not_null',
    'arguments': {
      'col_name': 'is_active'
    }
  },
  'name': 'is_active_is_null',
  'criticality': 'error'
},
{
  'check': {
    'function': 'is_not_null',
    'arguments': {
      'col_name': 'start_date'
    }
  },
  'name': 'start_date_is_null',
  'criticality': 'error'
}]
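
Comparing the profiles from Step 2 with the checks above shows a simple correspondence: an is_not_null profile becomes an is_not_null check, and a min_max profile becomes an is_in_range check. The sketch below reproduces that mapping in plain Python. The Profile dataclass and profile_to_check function are stand-ins written for this article, not the DQX classes or the generator's real implementation.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Profile:
    """Stand-in for the DQProfile fields shown earlier (not the DQX class)."""
    name: str
    column: str
    parameters: Optional[dict] = None


def profile_to_check(profile, criticality="error"):
    """Translate one profile into a check dictionary (illustrative mapping)."""
    if profile.name == "is_not_null":
        return {
            "check": {
                "function": "is_not_null",
                "arguments": {"col_name": profile.column},
            },
            "name": f"{profile.column}_is_null",
            "criticality": criticality,
        }
    if profile.name == "min_max":
        return {
            "check": {
                "function": "is_in_range",
                "arguments": {
                    "col_name": profile.column,
                    "min_limit": profile.parameters["min"],
                    "max_limit": profile.parameters["max"],
                },
            },
            "name": f"{profile.column}_isnt_in_range",
            "criticality": criticality,
        }
    raise ValueError(f"Unsupported profile: {profile.name}")


null_check = profile_to_check(Profile("is_not_null", "customer_name"))
range_check = profile_to_check(Profile("min_max", "customer_id", {"min": 1, "max": 3}))
```

Note that the range check inherits the observed minimum and maximum (1 and 3 here), so a check generated from a small sample will reject any new customer_id outside that range until the limits are adjusted.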

Step 4: Define custom Data Quality Expectations

In addition to the automatically generated checks, you can define your own custom rules to enforce business-specific data quality requirements. This is particularly useful when your organization has unique validation criteria that aren’t covered by the default checks. By using a configuration-driven approach (e.g., YAML), you can easily maintain and update these rules without modifying your pipeline code.

For example, you might want to enforce that:

  • Customer IDs must not be null or empty.
  • Email addresses must match a specific domain format (e.g., @example.com).

# Define custom data quality expectations.
import yaml

# A raw string keeps the backslash in the regex intact.
# criticality and name sit next to check, as in the generated checks from Step 3.
checks_custom = yaml.safe_load(r"""
- criticality: error
  name: customer_id_is_null
  check:
    function: is_not_null_and_not_empty
    arguments:
      col_name: customer_id
- criticality: error
  name: customer_email_is_not_valid
  check:
    function: regex_match
    arguments:
      col_name: customer_email
      regex: '^[A-Za-z0-9._%+-]+@example\.com$'
""")
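
# Validate the custom data quality checks
status = DQEngine.validate_checks(checks_custom)

# The custom checks can also be kept in a YAML file in the workspace, loaded, and then validated.
# load_checks_from_workspace_file is assumed here to be the counterpart of
# save_checks_in_workspace_file from Step 2; replace the placeholder path before uncommenting.
# checks_from_file = DQEngine(WorkspaceClient()).load_checks_from_workspace_file(workspace_path="path to yaml file in workspace")
# status = DQEngine.validate_checks(checks_from_file)

# Assert that there are no errors in the validation status
assert not status.has_errors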
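
Before applying a regex rule to a whole table, it helps to try the pattern on a few values. The sketch below runs the same pattern as the custom check against some addresses, including two from the sample data, using Python's re module. Treating a missing value as a failure is a choice made for this sketch; how DQX handles nulls in regex_match is not covered in this article.

```python
import re

# Same pattern as in the custom check above.
EMAIL_RULE = re.compile(r"^[A-Za-z0-9._%+-]+@example\.com$")

emails = ["alice@example.com", "alice@mainri.ca", "invalid_email", None]

# None is treated as a failure in this sketch.
results = {e: bool(e is not None and EMAIL_RULE.match(e)) for e in emails}
```

With this pattern, a well-formed address on another domain such as alice@mainri.ca fails just like invalid_email does, so the rule enforces the domain and not only the general email shape.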

Step 5: Applying the custom rules and generating results

Once your custom data quality rules have been defined and validated, the next step is to apply them to your input data. The DQEngine facilitates this by splitting your dataset into two categories:

  • Silver Data: Records that meet all quality expectations.
  • Quarantined Data: Records that fail one or more quality checks.

This approach allows you to separate valid and invalid data for further inspection and remediation. The valid records can proceed to downstream processes, while the quarantined records can be analyzed to determine the cause of failure (e.g., missing values, incorrect formats).

Here’s how you can apply the rules and generate the results:

# Create a DQEngine instance with the WorkspaceClient
dq_engine = DQEngine(WorkspaceClient())

# Apply quality checks and split the DataFrame into silver and quarantine DataFrames
silver_df, quarantine_df = dq_engine.apply_checks_by_metadata_and_split(input_df, checks_custom)

Quarantined data: records that do not match the rules.
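
To show what a metadata-driven split does conceptually, the sketch below interprets two declarative checks in plain Python and separates a few rows into valid and quarantined sets, recording which checks each quarantined row failed. The check implementations, the rows, and the errors field are assumptions made for this sketch; DQX performs the equivalent work on Spark DataFrames with its own check functions.

```python
import re


def is_not_null_and_not_empty(value):
    """Pass when the value is present and not blank."""
    return value is not None and str(value).strip() != ""


def regex_match(value, regex):
    """Pass when the value is present and matches the pattern."""
    return value is not None and re.search(regex, value) is not None


# Illustrative implementations keyed by the function names used in the checks.
CHECK_FUNCTIONS = {
    "is_not_null_and_not_empty": is_not_null_and_not_empty,
    "regex_match": regex_match,
}

checks = [
    {"name": "customer_id_is_null", "criticality": "error",
     "check": {"function": "is_not_null_and_not_empty",
               "arguments": {"col_name": "customer_id"}}},
    {"name": "customer_email_is_not_valid", "criticality": "error",
     "check": {"function": "regex_match",
               "arguments": {"col_name": "customer_email",
                             "regex": r"^[A-Za-z0-9._%+-]+@example\.com$"}}},
]


def failed_checks(row, checks):
    """Return the names of all checks the row violates."""
    failures = []
    for item in checks:
        args = dict(item["check"]["arguments"])
        value = row.get(args.pop("col_name"))
        if not CHECK_FUNCTIONS[item["check"]["function"]](value, **args):
            failures.append(item["name"])
    return failures


rows = [
    {"customer_id": 1, "customer_email": "alice@example.com"},
    {"customer_id": 3, "customer_email": "invalid_email"},
    {"customer_id": None, "customer_email": None},
]

valid = [r for r in rows if not failed_checks(r, checks)]
quarantine = [{**r, "errors": failed_checks(r, checks)}
              for r in rows if failed_checks(r, checks)]
```

Keeping the names of the failed checks alongside each quarantined row makes remediation much easier, because the person fixing the data can see exactly which rule was violated.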

Summary

In essence, data quality is no longer just an IT concern; it’s a fundamental business imperative. In today’s complex and competitive landscape, the success of an organization hinges on its ability to leverage high-quality, trusted data for every strategic and operational decision.

A Data Quality Framework (DQX) helps organizations:

  • Establish clear quality standards
  • Implement automated checks
  • Track and resolve issues
  • Ensure trust in data

https://databrickslabs.github.io/dqx/docs/motivation

https://medium.com/@nivethanvenkat28/revolutionizing-data-quality-checks-using-databricks-dqx-f2a49d83c3c6