Python Extensions

Applies To: Pipeline Bundle

Configuration Scope: Pipeline

Overview

Python Extensions allow data engineers to write custom Python modules that extend the framework’s capabilities. Extensions are organized in a central extensions/ directory and can be imported as standard Python modules throughout your dataflow specifications.

Important

Extensions provide a powerful mechanism for implementing custom logic—sources, transforms, and sinks—while maintaining clean separation between framework code and business logic.

This feature allows development teams to:

  • Centralize Custom Logic: Organize all custom Python code in one location

  • Reuse Across Dataflows: Reference the same functions from multiple dataflow specs

  • Maintain Clean Imports: Use standard Python module imports (e.g., transforms.my_function)

  • Manage Dependencies: Install additional Python packages via requirements_additional.txt

  • Test Independently: Extensions can be unit tested outside of Spark Declarative Pipelines (see the test sketches under Transform Extensions and Sink Extensions below)

Note

Extensions are loaded during pipeline initialization when the framework adds the extensions/ directory to the Python path. Any additional dependencies specified in requirements_additional.txt are installed before the pipeline starts.

How It Works

The extension system consists of three main components:

  1. Extensions Directory: A src/extensions/ folder in your pipeline bundle containing Python modules

  2. Module References: Dataflow specs reference extension functions using module syntax (e.g., transforms.my_function)

  3. Dependency Management: Optional requirements_additional.txt files for installing pip packages
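
At runtime, a reference such as transforms.my_function resolves to a plain Python callable once the extensions/ directory is on the Python path. The following sketch illustrates the idea; resolve_extension is a hypothetical helper, and the framework's actual resolution logic may differ:

import importlib
import sys
from pathlib import Path
from typing import Callable

def resolve_extension(extensions_dir: str, reference: str) -> Callable:
    """Resolve a 'module.function' reference such as 'transforms.my_function'.

    Hypothetical helper for illustration only.
    """
    # Make modules in extensions/ importable as top-level modules
    path = str(Path(extensions_dir).resolve())
    if path not in sys.path:
        sys.path.insert(0, path)

    module_name, _, function_name = reference.rpartition(".")
    module = importlib.import_module(module_name)  # e.g., transforms
    return getattr(module, function_name)          # e.g., my_function

# e.g., fn = resolve_extension("src/extensions", "transforms.my_function")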

Directory Structure

Extensions live in the src/extensions/ directory of your pipeline bundle:

my_pipeline_bundle/
├── src/
│   ├── extensions/
│   │   ├── __init__.py           # Optional, for package imports
│   │   ├── sources.py            # Custom source functions
│   │   ├── transforms.py         # Custom transform functions
│   │   └── sinks.py              # Custom sink functions
│   ├── dataflows/
│   │   └── ...
│   └── pipeline_configs/
│       └── ...
└── requirements_additional.txt   # Optional pip dependencies

Dependency Management

Extensions may require additional Python packages beyond the framework’s core dependencies. For detailed information on managing Python dependencies, see Python Dependency Management.
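
For example, the API-based source and sink shown later on this page import the requests library; a requirements_additional.txt covering it might look like this (the pinned version is illustrative):

# requirements_additional.txt (example; pin versions to match your environment)
requests==2.32.3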

Extension Examples

Source Extensions

Custom functions that generate DataFrames for use as data sources.

src/extensions/sources.py
 from pyspark.sql import DataFrame, SparkSession
 from typing import Dict

 def get_customer_cdf(spark: SparkSession, tokens: Dict) -> DataFrame:
     """
     Get customer data with Change Data Feed enabled.
     """
     source_table = tokens["sourceTable"]
     reader_options = {"readChangeFeed": "true"}

     return (
         spark.readStream
         .options(**reader_options)
         .table(source_table)
     )

 def get_api_data(spark: SparkSession, tokens: Dict) -> DataFrame:
     """
     Fetch data from an external API.
     """
     import requests  # From requirements_additional.txt

     api_url = tokens["apiUrl"]
     response = requests.get(api_url)
     response.raise_for_status()  # Fail fast on HTTP errors
     data = response.json()  # Assumes the API returns a JSON array of records

     return spark.createDataFrame(data)

Reference in Dataflow Spec:

{
    "dataFlowId": "customer_from_extension",
    "dataFlowGroup": "my_dataflows",
    "dataFlowType": "standard",
    "sourceSystem": "custom",
    "sourceType": "python",
    "sourceViewName": "v_customer",
    "sourceDetails": {
        "tokens": {
            "sourceTable": "{staging_schema}.customer"
        },
        "pythonModule": "sources.get_customer_cdf"
    },
    "mode": "stream",
    "targetFormat": "delta",
    "targetDetails": {
        "table": "customer"
    }
}

Transform Extensions

Custom functions that transform DataFrames after they are read from a source.

Function Signatures:

# Without tokens
def my_transform(df: DataFrame) -> DataFrame:
    ...

# With tokens
def my_transform_with_tokens(df: DataFrame, tokens: Dict) -> DataFrame:
    ...

Example:

src/extensions/transforms.py
 from pyspark.sql import DataFrame
 from pyspark.sql import functions as F

 def explode_deletes_function_transform(df: DataFrame) -> DataFrame:
     """
     Duplicates delete records and adjusts sequence_by timestamp.
     For deletes: is_delete=0 gets +1ms, is_delete=1 gets +2ms.
     """
     sequence_column = "LOAD_TIMESTAMP"
     change_type_column = "meta_cdc_operation"

     # Create array: [0,1] for deletes, [0] for others, then explode
     is_delete = F.col(change_type_column) == "delete"
     array_col = F.when(is_delete, F.array(F.lit(0), F.lit(1))).otherwise(F.array(F.lit(0)))

     return (
         df.withColumnRenamed("_change_type", change_type_column)
         .withColumn("is_delete", F.explode(array_col))
         .withColumn(
             sequence_column,
             F.when(is_delete & (F.col("is_delete") == 0),
                 F.col(sequence_column) + F.expr("INTERVAL 1 millisecond"))
             .when(is_delete & (F.col("is_delete") == 1),
                 F.col(sequence_column) + F.expr("INTERVAL 2 millisecond"))
             .otherwise(F.col(sequence_column))
         )
     )

Reference in Dataflow Spec:

{
    "dataFlowId": "customer",
    "dataFlowGroup": "my_dataflows",
    "dataFlowType": "standard",
    "sourceSystem": "erp",
    "sourceType": "delta",
    "sourceViewName": "v_customer",
    "sourceDetails": {
        "database": "{bronze_schema}",
        "table": "customer",
        "cdfEnabled": true,
        "pythonTransform": {
            "module": "transforms.explode_deletes_function_transform",
        }
    },
    "mode": "stream",
    "targetFormat": "delta",
    "targetDetails": {
        "table": "customer"
    }
}
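
Because transform extensions are plain Python functions, they can be unit tested with a local SparkSession, entirely outside of Spark Declarative Pipelines. Below is a minimal pytest sketch for the transform above, assuming src/extensions is on the test's Python path (for example via pytest's pythonpath setting):

import datetime

import pytest
from pyspark.sql import SparkSession

from transforms import explode_deletes_function_transform

@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.master("local[1]").getOrCreate()

def test_delete_rows_are_duplicated(spark):
    ts = datetime.datetime(2024, 1, 1, 12, 0, 0)
    df = spark.createDataFrame(
        [("c1", "delete", ts), ("c2", "insert", ts)],
        ["customer_id", "_change_type", "LOAD_TIMESTAMP"],
    )

    result = explode_deletes_function_transform(df)

    # The delete record is exploded into two rows; the insert stays as one
    assert result.filter("customer_id = 'c1'").count() == 2
    assert result.filter("customer_id = 'c2'").count() == 1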

Sink Extensions

Custom functions for foreach_batch_sink targets that process micro-batches.

Function Signature:

def my_batch_handler(df: DataFrame, batch_id: int, tokens: Dict) -> None:
    """
    Process a micro-batch of data.

    Args:
        df: The micro-batch DataFrame
        batch_id: The batch identifier
        tokens: Dictionary of token values from the dataflow spec
    """
    ...

Example:

src/extensions/sinks.py
 from pyspark.sql import DataFrame
 from typing import Dict

 def write_to_external_api(df: DataFrame, batch_id: int, tokens: Dict) -> None:
     """
     Send each batch to an external API.
     """
     import requests  # From requirements_additional.txt

     api_url = tokens["apiUrl"]
     api_key = tokens["apiKey"]

     # Serialize each row to a JSON string and send it.
     # Note: collect() pulls the whole micro-batch to the driver,
     # so this pattern suits small batches only.
     records = df.toJSON().collect()
     for record in records:
         requests.post(
             api_url,
             headers={
                 "Authorization": f"Bearer {api_key}",
                 "Content-Type": "application/json",
             },
             data=record,  # already a JSON string; json= would double-encode it
         )

Reference in Dataflow Spec:

{
    "dataFlowId": "customer_to_api",
    "dataFlowGroup": "my_dataflows",
    "dataFlowType": "standard",
    "sourceSystem": "erp",
    "sourceType": "delta",
    "sourceViewName": "v_customer_api",
    "sourceDetails": {
        "database": "{silver_schema}",
        "table": "customer",
        "cdfEnabled": true
    },
    "mode": "stream",
    "targetFormat": "foreach_batch_sink",
    "targetDetails": {
        "name": "customer_api_sink",
        "type": "python_function",
        "config": {
            "module": "sinks.write_to_external_api",
            "tokens": {
                "apiUrl": "https://api.example.com/customers",
                "apiKey": "{api_secret_key}"
            }
        }
    }
}
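
Sink handlers can be exercised locally as well by mocking the HTTP call, so no external API is needed. A sketch using unittest.mock, under the same path assumption as the transform test above and with requests installed locally:

from unittest.mock import patch

from pyspark.sql import SparkSession

from sinks import write_to_external_api

def test_sink_posts_each_record():
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([("c1",), ("c2",)], ["customer_id"])
    tokens = {"apiUrl": "https://api.example.com/customers", "apiKey": "test-key"}

    # Patch requests.post so no real HTTP request is sent
    with patch("requests.post") as mock_post:
        write_to_external_api(df, batch_id=0, tokens=tokens)

    assert mock_post.call_count == 2  # one POST per record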

Additional Resources