Data Validation in Python with Great Expectations: Elevating data quality in ETL

In ETL and data analysis projects, data quality is crucial. An error in a column or a duplicate value can have serious repercussions. Great Expectations enables us to establish validation rules to proactively detect these errors. This project demonstrates a complete workflow, from configuration and connection to data validation.

Project Structure: Modular and Scalable

This project is designed to be modular and easy to maintain. The basic structure is as follows:

project/
│
├── config/
│   ├── config_loader.py
│   └── database_config.py
│
├── connection/
│   └── database_connection.py
│
├── processing/
│   └── database_processor.py
│
├── validation/
│   ├── validator.py
│   └── great_expectations.yml
│
├── test/
│   └── customers_test.py
│
└── .env

.env File: Environment Configuration

The .env file contains sensitive information, such as database credentials and environment-specific configurations, and should never be committed to version control for security reasons. To ensure this, add the .env file to your .gitignore file:

# .gitignore
.env

This practice prevents sensitive information from being exposed in your Git repository, keeping it secure and accessible only in your local environment.

Configuring and Connecting to the Database

config_loader.py: Loading Configuration

The config/config_loader.py file uses environment variables to load the database connection configuration.

import os
from dotenv import load_dotenv
from config.database_config import DatabaseConfig

# Loading environment variables from the .env file
load_dotenv()

class ConfigLoader:
    @staticmethod
    def load_config(env="development"):
        """Load the database configuration according to the specified environment."""
        if env == "development":
            return DatabaseConfig(
                server=os.getenv("DEV_DB_SERVER", "DBServer"),
                database=os.getenv("DEV_DB_DATABASE", "DBName"),
                username=os.getenv("DEV_DB_USERNAME", "User"),
                password=os.getenv("DEV_DB_PASSWORD", "password")
            )
        elif env == "production":
            return DatabaseConfig(
                server=os.getenv("PROD_DB_SERVER"),
                database=os.getenv("PROD_DB_DATABASE"),
                username=os.getenv("PROD_DB_USERNAME"),
                password=os.getenv("PROD_DB_PASSWORD")
            )
        else:
            raise ValueError(f"Unknown environment: {env}")
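
The loader above depends on a DatabaseConfig class from config/database_config.py, which is not shown here. As a rough sketch of what it might look like: the field names follow the keyword arguments used in ConfigLoader, and get_connection_string() is the method that DatabaseConnection calls later. The driver name and the fail-fast check are assumptions added for illustration, not the project's actual code.

```python
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class DatabaseConfig:
    """Hypothetical shape of config/database_config.py (not shown in the article)."""
    server: str
    database: str
    username: str
    password: str
    # Assumed driver name; use whichever ODBC driver is installed locally.
    driver: str = "ODBC Driver 17 for SQL Server"

    def __post_init__(self):
        # os.getenv returns None for unset variables, so fail fast here
        # instead of passing "None" into the connection string.
        missing = [f.name for f in fields(self) if not getattr(self, f.name)]
        if missing:
            raise ValueError(f"Missing database settings: {', '.join(missing)}")

    def get_connection_string(self):
        """Build an ODBC connection string for pyodbc.connect()."""
        return (
            f"DRIVER={{{self.driver}}};"
            f"SERVER={self.server};"
            f"DATABASE={self.database};"
            f"UID={self.username};"
            f"PWD={self.password}"
        )
```

A check like this matters most for the production branch, where os.getenv is called without fallback values and an unset variable would otherwise only surface as a connection error.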

database_connection.py: Establishing the Connection

In connection/database_connection.py, we define the DatabaseConnection class, which establishes and manages the connection to the database:

import pyodbc

# Class to manage the database connection
class DatabaseConnection:
    def __init__(self, config):
        self.config = config
        self.connection = None

    def connect(self):
        try:
            self.connection = pyodbc.connect(self.config.get_connection_string())
            print("Successfully connected to the database.")
        except Exception as e:
            print("Error connecting to the database:", e)
            raise  # Propagate the failure so callers never continue with connection=None

    def close_connection(self):
        if self.connection:
            self.connection.close()
            self.connection = None  # Avoid reusing or re-closing a closed connection
            print("Connection closed.")
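
The connect/close pair can also be expressed as a context manager, so the connection is closed even when a query raises. The following is only a sketch of that idea: open_connection is a hypothetical helper, and sqlite3 from the standard library stands in for pyodbc so the example runs without a database server.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def open_connection(connect, *args, **kwargs):
    """Open a DB-API connection and always close it, even on errors.

    `connect` is any DB-API connect function, e.g. pyodbc.connect;
    sqlite3.connect is used below only as a stand-in.
    """
    connection = connect(*args, **kwargs)
    try:
        yield connection
    finally:
        connection.close()


# Stand-in usage with an in-memory SQLite database.
with open_connection(sqlite3.connect, ":memory:") as conn:
    conn.execute("CREATE TABLE Customers (CustomerID INTEGER, Email TEXT)")
    conn.execute("INSERT INTO Customers VALUES (1, 'a@example.com')")
    rows = conn.execute("SELECT CustomerID, Email FROM Customers").fetchall()
```

With pyodbc, the same helper would presumably be called as open_connection(pyodbc.connect, config.get_connection_string()).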

Processing and Retrieving Data

database_processor.py: Processing Data

The DatabaseProcessor class in processing/database_processor.py manages the database connection and runs queries, returning the results as a pandas DataFrame.

import pandas as pd
from connection.database_connection import DatabaseConnection
from validation.validator import DataValidator

class DatabaseProcessor:
    def __init__(self, config):
        self.config = config
        self.db_connection = None
        self.data = None

    def connect(self):
        """Establish a database connection."""
        self.db_connection = DatabaseConnection(self.config)
        self.db_connection.connect()

    def fetch_data(self, query):
        """Execute a SQL query and retrieve data as a pandas DataFrame."""
        if self.db_connection is None or self.db_connection.connection is None:
            raise ConnectionError("No database connection available.")
        self.data = pd.read_sql(query, self.db_connection.connection)
        return self.data

    def close_connection(self):
        """Close the database connection."""
        if self.db_connection:
            self.db_connection.close_connection()

    def validate_data(self, expectations_file):
        """Initialize data validation process."""
        if self.data is not None:
            validator = DataValidator(self.data, expectations_file)
            validator.run_validations()
        else:
            print("No data available for validation.")

Data Validation with Great Expectations

validator.py: Validation Class

The DataValidator class in validator.py uses Great Expectations to run the validation rules. The rules themselves are specified in a YAML file, so they can be changed without touching the code.

import great_expectations as ge
import yaml

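class DataValidator:
    def __init__(self, dataframe, expectations_file):
        # ge.from_pandas is part of the legacy pandas dataset API; recent
        # Great Expectations releases no longer provide it, so pin a version that does.
        self.data_ge = ge.from_pandas(dataframe)
        self.expectations_file = expectations_file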

    def load_expectations(self):
        """Load expectations from a YAML file."""
        with open(self.expectations_file, 'r') as file:
            expectations = yaml.safe_load(file).get('expectations', [])
        return expectations

    def run_validations(self):
        """Run validations based on expectations loaded from YAML file."""
        expectations = self.load_expectations()
        all_success = True

        for expectation in expectations:
            expectation_type = expectation['expectation_type']
            kwargs = expectation['kwargs']
            result = getattr(self.data_ge, expectation_type)(**kwargs)
            success = result.success
            message = f"{expectation_type} on {kwargs.get('column')}: {'Passed' if success else 'Failed'}"
            print(message)
            all_success = all_success and success

        if all_success:
            print("All validations passed.")
        else:
            print("Some validations failed.")

great_expectations.yml: Defining Expectations

The YAML file specifies rules, such as primary key uniqueness or non-null constraints, ensuring that data meets certain standards.

expectations:
  - expectation_type: expect_column_values_to_be_unique
    kwargs:
      column: CustomerID

  - expectation_type: expect_column_values_to_not_be_null
    kwargs:
      column: Email

  - expectation_type: expect_column_value_lengths_to_be_between
    kwargs:
      column: Phone
      min_value: 0
      max_value: 15
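
To make the three rules concrete, here is roughly what each one checks, written in plain Python over lists of values. This is an illustration of the intended semantics under the assumption that nulls are skipped by the uniqueness and length checks; it is not how Great Expectations implements them, and the function names and sample data are made up. Note that since a length can never be negative, min_value: 0 has no effect in practice and the rule only caps Phone at 15 characters.

```python
def values_are_unique(values):
    """True if no non-null value appears more than once."""
    present = [v for v in values if v is not None]
    return len(present) == len(set(present))


def values_are_not_null(values):
    """True if every value is present."""
    return all(v is not None for v in values)


def lengths_are_between(values, min_value, max_value):
    """True if each non-null value's length is within [min_value, max_value]."""
    return all(min_value <= len(v) <= max_value for v in values if v is not None)


customer_ids = [1, 2, 3]
emails = ["a@example.com", None, "c@example.com"]
phones = ["+31 20 123 4567", "0612345678", "1234567890123456"]

print(values_are_unique(customer_ids))       # True
print(values_are_not_null(emails))           # False
print(lengths_are_between(phones, 0, 15))    # False
```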

Full Workflow Execution

customers_test.py: Sample Execution

The test script runs the complete workflow: loading configuration, connecting to the database, extracting data, and validating it.

import os
import sys

# Add the project root directory to sys.path so the packages below resolve
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from config.config_loader import ConfigLoader
from processing.database_processor import DatabaseProcessor

# Main execution
if __name__ == "__main__":
    env = os.getenv("APP_ENV", "development")
    print(f"Starting application in '{env}' environment.")

    processor = None  # Defined up front so the finally block is safe if setup fails

    try:
        config = ConfigLoader.load_config(env)
        print(f"Connecting to database '{config.database}' at '{config.server}' for '{env}' environment.")

        processor = DatabaseProcessor(config)
        processor.connect()

        query = "SELECT * FROM dbo.Customers"
        processor.fetch_data(query)

        expectations_file = "./validation/great_expectations.yml"
        processor.validate_data(expectations_file)

    except Exception as e:
        print("Critical error:", e)

    finally:
        if processor is not None:
            processor.close_connection()

Setting Up Environment Variables in a CI/CD Pipeline

In a CI/CD pipeline, such as those provided by GitHub Actions, GitLab CI, or Jenkins, you’ll need to securely manage and access these environment variables for database connections and other configurations.

Steps for Configuring Environment Variables in CI/CD

  1. Define Variables in the Pipeline Configuration: Most CI/CD platforms allow you to securely store environment variables in their settings. For example, in GitHub Actions, you can add secrets by navigating to Settings > Secrets and variables > Actions > New repository secret. Add each variable (e.g., DEV_DB_SERVER, DEV_DB_DATABASE) there.

  2. Access the Variables in the Pipeline: Once added to the pipeline, these variables can be accessed the same way as environment variables in your code. When the pipeline runs, the variables will be injected into the environment, allowing the database connection and other configurations to work seamlessly.

  3. Sample Pipeline Configuration: Here’s a brief example of how to set this up in GitHub Actions:

# .github/workflows/test.yml
name: Database Validation Pipeline

on:
  push:
    branches:
      - main

jobs:
  run-tests:
    runs-on: ubuntu-latest
    env:
      DEV_DB_SERVER: ${{ secrets.DEV_DB_SERVER }}
      DEV_DB_DATABASE: ${{ secrets.DEV_DB_DATABASE }}
      DEV_DB_USERNAME: ${{ secrets.DEV_DB_USERNAME }}
      DEV_DB_PASSWORD: ${{ secrets.DEV_DB_PASSWORD }}
      APP_ENV: development
    steps:
      - name: Check out code
        uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.8'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
      - name: Run database validations
        run: python test/customers_test.py

In this example, the secrets are accessed securely in the env section and are available for the script to use when connecting to the database.
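
One caveat for the pipeline step: as written, customers_test.py only prints validation results and catches every exception, so the process would exit with status 0 and the step would be reported as successful even when validations fail. A minimal sketch of one way to address this is to turn the outcomes into an exit code. It assumes the validation results are available as a list of booleans, which would require run_validations to return them; exit_code_for and main are hypothetical names.

```python
def exit_code_for(results):
    """Return 0 when every validation passed, 1 otherwise."""
    return 0 if all(results) else 1


def main(results):
    """Report the overall outcome and return the process exit code."""
    code = exit_code_for(results)
    print("All validations passed." if code == 0 else "Some validations failed.")
    return code


# Hypothetical outcomes of three expectations.
code = main([True, False, True])

# In customers_test.py the script would then end with something like:
#     sys.exit(main(results))
```

A non-zero exit status is what CI systems such as GitHub Actions use to mark a step as failed.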

Conclusion

This project illustrates how to structure a data validation workflow in Python, integrating Great Expectations to check data quality in ETL and data analysis projects. With its modular organization and environment separation, this approach lets you validate data before analysis, reducing errors and supporting better-informed decisions.

Key Benefits

  • Early Detection of Data Issues: Catches data problems before they affect critical analysis.

  • Scalability and Flexibility: Adapts easily to new environments and database configurations.

  • Integration with Great Expectations: A powerful and flexible tool for data validation in projects of any size.

  • Secure Management of Sensitive Information: The .env file is ignored in Git, and sensitive environment variables are securely handled in CI/CD pipelines.

This modular and proactive approach to data management is essential for organizations that rely on data quality for strategic decision-making.

GitHub repo: https://github.com/engaudy/Sample
