Python Extensions
Applies To: Pipeline Bundle
Configuration Scope: Pipeline
Overview
Python Extensions allow data engineers to write custom Python modules that extend the framework’s capabilities. Extensions are organized in a central extensions/ directory and can be imported as standard Python modules throughout your dataflow specifications.
Important
Extensions provide a powerful mechanism for implementing custom logic—sources, transforms, and sinks—while maintaining clean separation between framework code and business logic.
This feature allows development teams to:
Centralize Custom Logic: Organize all custom Python code in one location
Reuse Across Dataflows: Reference the same functions from multiple dataflow specs
Maintain Clean Imports: Use standard Python module imports (e.g., transforms.my_function)
Manage Dependencies: Install additional Python packages via requirements_additional.txt
Test Independently: Extensions can be unit tested outside of Spark Declarative Pipelines (see the sketch below)
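For example, the transform defined later under Transform Extensions can be exercised with plain pytest and a local SparkSession. This is a minimal sketch; it assumes src/extensions is on sys.path (e.g., via pytest's pythonpath setting):
import datetime

import pytest
from pyspark.sql import SparkSession

from transforms import explode_deletes_function_transform

@pytest.fixture(scope="session")
def spark():
    # Local SparkSession; no Spark Declarative Pipelines runtime required
    return SparkSession.builder.master("local[1]").getOrCreate()

def test_deletes_are_duplicated(spark):
    df = spark.createDataFrame(
        [
            (datetime.datetime(2024, 1, 1), "delete"),
            (datetime.datetime(2024, 1, 1), "insert"),
        ],
        ["LOAD_TIMESTAMP", "_change_type"],
    )
    result = explode_deletes_function_transform(df)
    # The delete row explodes into two records; the insert passes through once
    assert result.count() == 3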
Note
Extensions are loaded during pipeline initialization when the framework adds the extensions/ directory to the Python path. Any additional dependencies specified in requirements_additional.txt are installed before the pipeline starts.
How It Works
The extension system consists of three main components:
Extensions Directory: A src/extensions/ folder in your pipeline bundle containing Python modules
Module References: Dataflow specs reference extension functions using module syntax (e.g., transforms.my_function)
Dependency Management: Optional requirements_additional.txt files for installing pip packages
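Conceptually, loading and resolution amount to the following. This is an illustrative sketch, not the framework's actual loader, and the bundle path is hypothetical:
import importlib
import sys

# At pipeline initialization the framework puts extensions/ on the Python
# path, so the files inside it become importable as top-level modules.
sys.path.insert(0, "/path/to/my_pipeline_bundle/src/extensions")

def resolve_extension(ref: str):
    """Resolve a reference like 'sources.get_customer_cdf' to a callable."""
    module_name, func_name = ref.rsplit(".", 1)
    module = importlib.import_module(module_name)
    return getattr(module, func_name)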
Directory Structure
Extensions live in the src/extensions/ directory of your pipeline bundle:
my_pipeline_bundle/
├── src/
│   ├── extensions/
│   │   ├── __init__.py          # Optional, for package imports
│   │   ├── sources.py           # Custom source functions
│   │   ├── transforms.py        # Custom transform functions
│   │   └── sinks.py             # Custom sink functions
│   ├── dataflows/
│   │   └── ...
│   └── pipeline_configs/
│       └── ...
└── requirements_additional.txt  # Optional pip dependencies
Dependency Management
Extensions may require additional Python packages beyond the framework’s core dependencies. For detailed information on managing Python dependencies, see Python Dependency Management.
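For instance, the API examples below import the requests package, which would be declared at the bundle root in requirements_additional.txt (the version pin is illustrative):
requests==2.31.0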
Extension Examples
Source Extensions
Custom functions that generate DataFrames for use as data sources.
from pyspark.sql import DataFrame, SparkSession
from typing import Dict

def get_customer_cdf(spark: SparkSession, tokens: Dict) -> DataFrame:
    """
    Get customer data with Change Data Feed enabled.
    """
    source_table = tokens["sourceTable"]
    reader_options = {"readChangeFeed": "true"}
    return (
        spark.readStream
        .options(**reader_options)
        .table(source_table)
    )

def get_api_data(spark: SparkSession, tokens: Dict) -> DataFrame:
    """
    Fetch data from an external API.
    """
    import requests  # From requirements_additional.txt

    api_url = tokens["apiUrl"]
    response = requests.get(api_url)
    data = response.json()
    return spark.createDataFrame(data)
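Note that get_api_data issues the HTTP request and materializes the response on the driver before spark.createDataFrame distributes it, so it is best suited to payloads that comfortably fit in driver memory.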
Reference in Dataflow Spec:
{
  "dataFlowId": "customer_from_extension",
  "dataFlowGroup": "my_dataflows",
  "dataFlowType": "standard",
  "sourceSystem": "custom",
  "sourceType": "python",
  "sourceViewName": "v_customer",
  "sourceDetails": {
    "tokens": {
      "sourceTable": "{staging_schema}.customer"
    },
    "pythonModule": "sources.get_customer_cdf"
  },
  "mode": "stream",
  "targetFormat": "delta",
  "targetDetails": {
    "table": "customer"
  }
}
Or equivalently in YAML:
dataFlowId: customer_from_extension
dataFlowGroup: my_dataflows
dataFlowType: standard
sourceSystem: custom
sourceType: python
sourceViewName: v_customer
sourceDetails:
  tokens:
    sourceTable: '{staging_schema}.customer'
  pythonModule: sources.get_customer_cdf
mode: stream
targetFormat: delta
targetDetails:
  table: customer
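At runtime the framework substitutes placeholders such as {staging_schema} in the tokens, then calls the referenced function with the SparkSession and the resolved dictionary. Conceptually, with a hypothetical substitution of 'staging':
# Equivalent of what the framework does for the spec above (illustrative)
func = resolve_extension("sources.get_customer_cdf")    # see the sketch under How It Works
df = func(spark, {"sourceTable": "staging.customer"})   # '{staging_schema}' resolved to 'staging'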
Transform Extensions
Custom functions that transform DataFrames after they are read from a source.
Function Signatures:
# Without tokens
def my_transform(df: DataFrame) -> DataFrame:
    ...

# With tokens
def my_transform_with_tokens(df: DataFrame, tokens: Dict) -> DataFrame:
    ...
Example:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from typing import Dict
def explode_deletes_function_transform(df: DataFrame) -> DataFrame:
    """
    Duplicates delete records and adjusts sequence_by timestamp.
    For deletes: is_delete=0 gets +1ms, is_delete=1 gets +2ms.
    """
    sequence_column = "LOAD_TIMESTAMP"
    change_type_column = "meta_cdc_operation"
    is_delete = F.col(change_type_column) == "delete"
    # Create array: [0, 1] for deletes, [0] for others, then explode
    array_col = F.when(is_delete, F.array(F.lit(0), F.lit(1))).otherwise(F.array(F.lit(0)))
    return (
        df.withColumnRenamed("_change_type", change_type_column)
        .withColumn("is_delete", F.explode(array_col))
        .withColumn(
            sequence_column,
            F.when(is_delete & (F.col("is_delete") == 0),
                   F.col(sequence_column) + F.expr("INTERVAL 1 millisecond"))
            .when(is_delete & (F.col("is_delete") == 1),
                   F.col(sequence_column) + F.expr("INTERVAL 2 millisecond"))
            .otherwise(F.col(sequence_column))
        )
    )
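For a delete record with LOAD_TIMESTAMP 2024-01-01 00:00:00.000, this emits two rows: is_delete=0 at 00:00:00.001 and is_delete=1 at 00:00:00.002, while non-delete records pass through once with an unchanged timestamp, presumably so that downstream sequencing applies the pre-delete state before the delete itself.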
Reference in Dataflow Spec:
{
  "dataFlowId": "customer",
  "dataFlowGroup": "my_dataflows",
  "dataFlowType": "standard",
  "sourceSystem": "erp",
  "sourceType": "delta",
  "sourceViewName": "v_customer",
  "sourceDetails": {
    "database": "{bronze_schema}",
    "table": "customer",
    "cdfEnabled": true,
    "pythonTransform": {
      "module": "transforms.explode_deletes_function_transform"
    }
  },
  "mode": "stream",
  "targetFormat": "delta",
  "targetDetails": {
    "table": "customer"
  }
}
Or equivalently in YAML:
dataFlowId: customer
dataFlowGroup: my_dataflows
dataFlowType: standard
sourceSystem: erp
sourceType: delta
sourceViewName: v_customer
sourceDetails:
  database: '{bronze_schema}'
  table: customer
  cdfEnabled: true
  pythonTransform:
    module: transforms.explode_deletes_function_transform
mode: stream
targetFormat: delta
targetDetails:
  table: customer
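The two transform signatures shown earlier differ only in whether they accept tokens, which suggests the framework inspects the function before calling it. A plausible dispatch sketch, offered as an assumption rather than the framework's confirmed behavior:
import inspect

from pyspark.sql import DataFrame

def apply_python_transform(df: DataFrame, func, tokens: dict) -> DataFrame:
    # Assumed rule: pass tokens only when the transform declares a
    # second parameter
    if len(inspect.signature(func).parameters) >= 2:
        return func(df, tokens)
    return func(df)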
Sink Extensions
Custom functions for foreach_batch_sink targets that process micro-batches.
Function Signature:
def my_batch_handler(df: DataFrame, batch_id: int, tokens: Dict) -> None:
    """
    Process a micro-batch of data.

    Args:
        df: The micro-batch DataFrame
        batch_id: The batch identifier
        tokens: Dictionary of token values from the dataflow spec
    """
    ...
Example:
from pyspark.sql import DataFrame
from typing import Dict

def write_to_external_api(df: DataFrame, batch_id: int, tokens: Dict) -> None:
    """
    Send each batch to an external API.
    """
    import requests  # From requirements_additional.txt

    api_url = tokens["apiUrl"]
    api_key = tokens["apiKey"]
    # Convert each row to a JSON string and send it
    records = df.toJSON().collect()
    for record in records:
        requests.post(
            api_url,
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            data=record,  # already a JSON string, so use data= rather than json=
        )
Reference in Dataflow Spec:
{
  "dataFlowId": "customer_to_api",
  "dataFlowGroup": "my_dataflows",
  "dataFlowType": "standard",
  "sourceSystem": "erp",
  "sourceType": "delta",
  "sourceViewName": "v_customer_api",
  "sourceDetails": {
    "database": "{silver_schema}",
    "table": "customer",
    "cdfEnabled": true
  },
  "mode": "stream",
  "targetFormat": "foreach_batch_sink",
  "targetDetails": {
    "name": "customer_api_sink",
    "type": "python_function",
    "config": {
      "module": "sinks.write_to_external_api",
      "tokens": {
        "apiUrl": "https://api.example.com/customers",
        "apiKey": "{api_secret_key}"
      }
    }
  }
}
Or equivalently in YAML:
dataFlowId: customer_to_api
dataFlowGroup: my_dataflows
dataFlowType: standard
sourceSystem: erp
sourceType: delta
sourceViewName: v_customer_api
sourceDetails:
  database: '{silver_schema}'
  table: customer
  cdfEnabled: true
mode: stream
targetFormat: foreach_batch_sink
targetDetails:
  name: customer_api_sink
  type: python_function
  config:
    module: sinks.write_to_external_api
    tokens:
      apiUrl: https://api.example.com/customers
      apiKey: '{api_secret_key}'
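Structured Streaming's foreachBatch expects a two-argument callable, so the framework presumably binds tokens before handing the handler to the stream. A minimal sketch of equivalent wiring, where source_df and the checkpoint path are illustrative stand-ins:
from functools import partial

tokens = {
    "apiUrl": "https://api.example.com/customers",
    "apiKey": "...",  # resolved from {api_secret_key}
}
handler = partial(write_to_external_api, tokens=tokens)

query = (
    source_df.writeStream                                  # streaming DataFrame behind v_customer_api
    .foreachBatch(handler)                                 # Spark invokes handler(batch_df, batch_id)
    .option("checkpointLocation", "/path/to/checkpoint")   # hypothetical path
    .start()
)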
Additional Resources
Python Dependency Management - Managing Python dependencies
Python Source - Using Python as a source type
Python Function Transforms - Python transform functions (file path approach)
Data Flow Spec - Source Details - Complete source configuration reference
Target Details Reference - Complete target configuration reference