Project Structure: Modular and Scalable
This project is designed to be modular and easy to maintain. The basic structure is as follows:
project/
│
├── config/
│   ├── config_loader.py
│   └── database_config.py
│
├── connection/
│   └── database_connection.py
│
├── processing/
│   └── database_processor.py
│
├── validation/
│   ├── validator.py
│   └── great_expectations.yml
│
├── test/
│   └── customers_test.py
│
└── .env
.env File: Environment Configuration
The .env file contains sensitive information, such as database credentials and environment-specific configuration, and should never be committed to version control. To enforce this, add .env to your .gitignore file:
# .gitignore
.env
This practice prevents sensitive information from being exposed in your Git repository, keeping it secure and accessible only in your local environment.
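For reference, a minimal .env might look like the following. The variable names match those read by the configuration loader later in the article; the values are placeholders, not real credentials:

```
# .env — never commit this file
DEV_DB_SERVER=localhost
DEV_DB_DATABASE=CustomersDB
DEV_DB_USERNAME=dev_user
DEV_DB_PASSWORD=change-me
APP_ENV=development
```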
Configuring and Connecting to the Database
config_loader.py: Loading Configuration
The config/config_loader.py file uses environment variables to load the database connection configuration.
import os
from dotenv import load_dotenv
from config.database_config import DatabaseConfig

# Load environment variables from the .env file
load_dotenv()

class ConfigLoader:
    @staticmethod
    def load_config(env="development"):
        """Load the database configuration for the specified environment."""
        if env == "development":
            return DatabaseConfig(
                server=os.getenv("DEV_DB_SERVER", "DBServer"),
                database=os.getenv("DEV_DB_DATABASE", "DBName"),
                username=os.getenv("DEV_DB_USERNAME", "User"),
                password=os.getenv("DEV_DB_PASSWORD", "password")
            )
        elif env == "production":
            return DatabaseConfig(
                server=os.getenv("PROD_DB_SERVER"),
                database=os.getenv("PROD_DB_DATABASE"),
                username=os.getenv("PROD_DB_USERNAME"),
                password=os.getenv("PROD_DB_PASSWORD")
            )
        else:
            raise ValueError(f"Unknown environment: {env}")
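The DatabaseConfig class imported above lives in config/database_config.py, which the article does not show. A minimal sketch might look like this; the ODBC driver name is an assumption and should be adjusted to whatever driver is installed in your environment:

```python
class DatabaseConfig:
    """Holds connection settings and builds a pyodbc connection string."""

    def __init__(self, server, database, username, password):
        self.server = server
        self.database = database
        self.username = username
        self.password = password

    def get_connection_string(self):
        # "ODBC Driver 17 for SQL Server" is an assumption; use your installed driver.
        return (
            "DRIVER={ODBC Driver 17 for SQL Server};"
            f"SERVER={self.server};"
            f"DATABASE={self.database};"
            f"UID={self.username};"
            f"PWD={self.password}"
        )
```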
database_connection.py: Establishing the Connection
In connection/database_connection.py, we define the DatabaseConnection class, which establishes and manages the connection to the database:
import pyodbc

# Class to manage the database connection
class DatabaseConnection:
    def __init__(self, config):
        self.config = config
        self.connection = None

    def connect(self):
        try:
            self.connection = pyodbc.connect(self.config.get_connection_string())
            print("Successfully connected to the database.")
        except Exception as e:
            print("Error connecting to the database:", e)

    def close_connection(self):
        if self.connection:
            self.connection.close()
            print("Connection closed.")
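Because manual connect/close pairs are easy to get wrong, one optional refinement (not part of the original code) is to wrap the connection in a context manager so it is closed even when a query raises. The sketch below demonstrates the pattern with a stub in place of a real pyodbc connection:

```python
class StubConnection:
    """Stand-in for a pyodbc connection, used here only to demo the pattern."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

class ManagedConnection:
    """Context manager that guarantees the connection is closed on exit."""
    def __init__(self, connection_factory):
        self.connection_factory = connection_factory
        self.connection = None

    def __enter__(self):
        self.connection = self.connection_factory()
        return self.connection

    def __exit__(self, exc_type, exc, tb):
        if self.connection:
            self.connection.close()
        return False  # propagate any exception raised inside the block

with ManagedConnection(StubConnection) as conn:
    pass  # run queries here
print(conn.closed)  # True: closed automatically when the block exited
```

With a real driver, the factory would be a lambda calling pyodbc.connect with the connection string.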
Processing and Retrieving Data
database_processor.py: Processing Data
The DatabaseProcessor module manages the database connection and query execution, returning results as a pandas DataFrame.
import pandas as pd
from connection.database_connection import DatabaseConnection
from validation.validator import DataValidator

class DatabaseProcessor:
    def __init__(self, config):
        self.config = config
        self.db_connection = None
        self.data = None

    def connect(self):
        """Establish a database connection."""
        self.db_connection = DatabaseConnection(self.config)
        self.db_connection.connect()

    def fetch_data(self, query):
        """Execute a SQL query and return the results as a pandas DataFrame."""
        if self.db_connection is None:
            raise ConnectionError("No database connection available.")
        self.data = pd.read_sql(query, self.db_connection.connection)
        return self.data

    def close_connection(self):
        """Close the database connection."""
        if self.db_connection:
            self.db_connection.close_connection()

    def validate_data(self, expectations_file):
        """Run the data validation process on the fetched data."""
        if self.data is not None:
            validator = DataValidator(self.data, expectations_file)
            validator.run_validations()
        else:
            print("No data available for validation.")
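The fetch_data flow (connect, query, close) is database-agnostic; note that recent pandas versions emit a warning when read_sql is given a raw pyodbc connection rather than a SQLAlchemy engine, though it still works. The same sequence can be sketched with the standard-library sqlite3 in place of pyodbc, purely for illustration — the article's code targets SQL Server:

```python
import sqlite3

# In-memory database stands in for the real server
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Customers (CustomerID INTEGER, Email TEXT)")
conn.execute("INSERT INTO Customers VALUES (1, 'a@example.com'), (2, 'b@example.com')")

# Same connect -> query -> close sequence as DatabaseProcessor
rows = conn.execute("SELECT * FROM Customers").fetchall()
print(rows)  # [(1, 'a@example.com'), (2, 'b@example.com')]

conn.close()
```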
Data Validation with Great Expectations
validator.py: Validation Class
We use Great Expectations to define validation rules in validator.py. The rules themselves are specified in a YAML file.
import great_expectations as ge
import yaml

class DataValidator:
    def __init__(self, dataframe, expectations_file):
        self.data_ge = ge.from_pandas(dataframe)
        self.expectations_file = expectations_file

    def load_expectations(self):
        """Load expectations from a YAML file."""
        with open(self.expectations_file, 'r') as file:
            expectations = yaml.safe_load(file).get('expectations', [])
        return expectations

    def run_validations(self):
        """Run the validations defined in the YAML file."""
        expectations = self.load_expectations()
        all_success = True
        for expectation in expectations:
            expectation_type = expectation['expectation_type']
            kwargs = expectation['kwargs']
            # Dispatch to the matching Great Expectations method by name
            result = getattr(self.data_ge, expectation_type)(**kwargs)
            success = result.success
            message = f"{expectation_type} on {kwargs.get('column')}: {'Passed' if success else 'Failed'}"
            print(message)
            all_success = all_success and success
        if all_success:
            print("All validations passed.")
        else:
            print("Some validations failed.")
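The key trick in run_validations is resolving each expectation_type string to a method with getattr, which is what lets the YAML file drive the checks. The self-contained sketch below reproduces that dispatch with a stub validator in place of Great Expectations, so the mechanism is visible without the library:

```python
class Result:
    """Minimal stand-in for a Great Expectations validation result."""
    def __init__(self, success):
        self.success = success

class StubValidator:
    """Mimics the expectation-method interface for two common checks."""
    def __init__(self, columns):
        self.columns = columns

    def expect_column_values_to_be_unique(self, column):
        values = self.columns[column]
        return Result(len(values) == len(set(values)))

    def expect_column_values_to_not_be_null(self, column):
        return Result(all(v is not None for v in self.columns[column]))

data = {"CustomerID": [1, 2, 3], "Email": ["a@x.com", None, "c@x.com"]}
expectations = [
    {"expectation_type": "expect_column_values_to_be_unique",
     "kwargs": {"column": "CustomerID"}},
    {"expectation_type": "expect_column_values_to_not_be_null",
     "kwargs": {"column": "Email"}},
]

validator = StubValidator(data)
results = []
for exp in expectations:
    # Same string-to-method dispatch as DataValidator.run_validations
    result = getattr(validator, exp["expectation_type"])(**exp["kwargs"])
    results.append(result.success)
print(results)  # [True, False]: CustomerID is unique, Email has a null
```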
great_expectations.yml: Defining Expectations
The YAML file specifies rules, such as primary-key uniqueness and non-null constraints, ensuring that the data meets defined standards.
expectations:
  - expectation_type: expect_column_values_to_be_unique
    kwargs:
      column: CustomerID
  - expectation_type: expect_column_values_to_not_be_null
    kwargs:
      column: Email
  - expectation_type: expect_column_value_lengths_to_be_between
    kwargs:
      column: Phone
      min_value: 0
      max_value: 15
Full Workflow Execution
customers_test.py: Sample Execution
The test script runs the complete workflow: loading configuration, connecting to the database, extracting data, and validating it.
import sys
import os

# Add the project root directory to sys.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from config.config_loader import ConfigLoader
from processing.database_processor import DatabaseProcessor

# Main execution
if __name__ == "__main__":
    env = os.getenv("APP_ENV", "development")
    print(f"Starting application in '{env}' environment.")
    processor = None
    try:
        config = ConfigLoader.load_config(env)
        print(f"Connecting to database '{config.database}' at '{config.server}' for '{env}' environment.")
        processor = DatabaseProcessor(config)
        processor.connect()
        query = "SELECT * FROM dbo.Customers"
        processor.fetch_data(query)
        expectations_file = "./validation/great_expectations.yml"
        processor.validate_data(expectations_file)
    except Exception as e:
        print("Critical error:", e)
    finally:
        if processor:
            processor.close_connection()
Setting Up Environment Variables in a CI/CD Pipeline
In a CI/CD pipeline, such as those provided by GitHub Actions, GitLab CI, or Jenkins, you’ll need to securely manage and access these environment variables for database connections and other configurations.
Steps for Configuring Environment Variables in CI/CD
Define Variables in the Pipeline Configuration: Most CI/CD platforms let you store environment variables securely in their settings. In GitHub Actions, for example, add secrets by navigating to Settings > Secrets and variables > Actions > New repository secret, creating one entry per variable (e.g., DEV_DB_SERVER, DEV_DB_DATABASE).
Access the Variables in the Pipeline: Once defined, these variables are read exactly like any other environment variable in your code. When the pipeline runs, they are injected into the environment, so the database connection and other configurations work without code changes.
Sample Pipeline Configuration: Here's a brief example of how to set this up in GitHub Actions:
# .github/workflows/test.yml
name: Database Validation Pipeline

on:
  push:
    branches:
      - main

jobs:
  run-tests:
    runs-on: ubuntu-latest
    env:
      DEV_DB_SERVER: ${{ secrets.DEV_DB_SERVER }}
      DEV_DB_DATABASE: ${{ secrets.DEV_DB_DATABASE }}
      DEV_DB_USERNAME: ${{ secrets.DEV_DB_USERNAME }}
      DEV_DB_PASSWORD: ${{ secrets.DEV_DB_PASSWORD }}
      APP_ENV: development
    steps:
      - name: Check out code
        uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.8'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
      - name: Run database validations
        run: python test/customers_test.py
In this example, the secrets are accessed securely in the env section and are available to the script when it connects to the database.
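Whether the variables come from a local .env or from pipeline secrets, it helps to fail fast when one is missing rather than discover it mid-run. A small sketch of such a check (the helper function is illustrative, not part of the project code):

```python
import os

REQUIRED_VARS = ["DEV_DB_SERVER", "DEV_DB_DATABASE", "DEV_DB_USERNAME", "DEV_DB_PASSWORD"]

def check_required_env(names):
    """Return the names of any variables that are missing or empty."""
    return [name for name in names if not os.getenv(name)]

# Example: simulate a pipeline where one secret was never configured
os.environ["DEV_DB_SERVER"] = "db.example.com"
os.environ["DEV_DB_DATABASE"] = "CustomersDB"
os.environ["DEV_DB_USERNAME"] = "ci_user"
os.environ.pop("DEV_DB_PASSWORD", None)

missing = check_required_env(REQUIRED_VARS)
print(missing)  # ['DEV_DB_PASSWORD']
```

Calling this at startup and raising on a non-empty result turns a cryptic connection failure into an actionable error message.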
Conclusion
This project illustrates how to structure a robust data validation workflow in Python, integrating Great Expectations to ensure data quality in ETL and data analysis projects. With its modular organization and environment separation, this approach lets you validate data before analysis, reducing errors and supporting informed decision-making.
Key Benefits
Guaranteed Data Quality: Detects data issues before they impact critical analysis.
Scalability and Flexibility: Easily adapt to new environments and database configurations.
Integration with Great Expectations: A powerful and flexible tool for data validation in projects of any size.
Secure Management of Sensitive Information: The .env file is ignored by Git, and sensitive environment variables are handled securely in CI/CD pipelines.
This modular and proactive approach to data management is essential for organizations that rely on data quality for strategic decision-making.
GitHub repo: https://github.com/engaudy/Sample