Python Source
| | |
|---|---|
| Applies To: | Pipeline Bundle |
| Configuration Scope: | Data Flow Spec |
| Databricks Docs: | NA |
Overview
You can specify a Python function as a source type in your Data Flow Specs. Python sources give you the flexibility to support more complex data retrieval when needed, without overcomplicating the Framework.
There are two approaches to defining Python sources:
- Extensions: Define functions in the `src/extensions/` directory and reference them by module name using `pythonModule`.
- File Path: Define functions in `./python_functions/` directories and reference them by file path using `functionPath`.
Sample Bundle
Samples are available in the `bronze_sample` bundle in the `src/dataflows/feature_samples` folder.
Configuration
Using Extensions
The extensions approach allows you to organize your Python source functions in a central location and import them as standard Python modules.
1. Create an Extension Module
Create your source functions in the `src/extensions/` directory of your bundle:
```
my_pipeline_bundle/
├── src/
│   ├── extensions/
│   │   └── sources.py    # Your source functions
│   ├── dataflows/
│   │   └── ...
```
Your extension module can contain multiple functions. Each function must:

- Accept `spark` (`SparkSession`) and `tokens` (`Dict`) as parameters
- Return a `DataFrame`
```python
# src/extensions/sources.py
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from typing import Dict


def get_customer_cdf(spark: SparkSession, tokens: Dict) -> DataFrame:
    """
    Get customer data with Change Data Feed enabled.
    """
    source_table = tokens["sourceTable"]
    reader_options = {
        "readChangeFeed": "true"
    }
    df = spark.readStream.options(**reader_options).table(source_table)
    return df.withColumn("TEST_COLUMN", F.lit("testing from extension..."))


def get_orders_batch(spark: SparkSession, tokens: Dict) -> DataFrame:
    """
    Get orders data as a batch read.
    """
    source_table = tokens["sourceTable"]
    return spark.read.table(source_table)
```
2. Reference in Data Flow Spec
Use `pythonModule` in `sourceDetails` to reference your function:
```json
{
  "dataFlowId": "feature_python_extension_source",
  "dataFlowGroup": "feature_samples",
  "dataFlowType": "standard",
  "sourceSystem": "testSystem",
  "sourceType": "python",
  "sourceViewName": "v_feature_python_extension_source",
  "sourceDetails": {
    "tokens": {
      "sourceTable": "{staging_schema}.customer"
    },
    "pythonModule": "sources.get_customer_cdf"
  },
  "mode": "stream",
  "targetFormat": "delta",
  "targetDetails": {
    "table": "feature_python_extension_source",
    "tableProperties": {
      "delta.enableChangeDataFeed": "true"
    }
  }
}
```
```yaml
dataFlowId: feature_python_extension_source
dataFlowGroup: feature_samples
dataFlowType: standard
sourceSystem: testSystem
sourceType: python
sourceViewName: v_feature_python_extension_source
sourceDetails:
  tokens:
    sourceTable: '{staging_schema}.customer'
  pythonModule: sources.get_customer_cdf
mode: stream
targetFormat: delta
targetDetails:
  table: feature_python_extension_source
  tableProperties:
    delta.enableChangeDataFeed: 'true'
```
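For reference, a `pythonModule` value such as `sources.get_customer_cdf` is simply a `module.function` string that the Framework resolves from your extensions and then calls with `spark` and the configured `tokens`. The sketch below approximates that resolution step for illustration only; the helper names are hypothetical and this is not the Framework's actual loader.

```python
# Illustrative sketch only; resolve_python_module and build_source_df
# are hypothetical names, not part of the Framework's API.
import importlib
from typing import Callable, Dict

from pyspark.sql import DataFrame, SparkSession


def resolve_python_module(reference: str) -> Callable[[SparkSession, Dict], DataFrame]:
    """Split a 'module.function' string and return the callable it names."""
    module_name, function_name = reference.rsplit(".", 1)
    module = importlib.import_module(module_name)
    return getattr(module, function_name)


def build_source_df(spark: SparkSession, source_details: Dict) -> DataFrame:
    """Call the referenced source function with spark and the configured tokens."""
    source_fn = resolve_python_module(source_details["pythonModule"])
    return source_fn(spark, source_details.get("tokens", {}))
```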
Using File Path
To define a Python source function using file paths, create a `python_functions` folder under the base folder of your Data Flow Spec:
```
my_pipeline_bundle/
├── src/
│   ├── dataflows/
│   │   ├── use_case_1/
│   │   │   ├── dataflowspec/
│   │   │   │   └── my_data_flow_spec_main.json
│   │   │   ├── python_functions/
│   │   │   │   └── my_source_function.py
│   │   │   └── schemas/
```
Your file must contain a function called `get_df` that:

- Accepts `spark` (`SparkSession`) and `tokens` (`Dict`) as parameters
- Returns a `DataFrame`
```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from typing import Dict


def get_df(spark: SparkSession, tokens: Dict) -> DataFrame:
    """
    Get a DataFrame from the source details with applied transformations.
    """
    source_table = tokens["sourceTable"]
    reader_options = {
        "readChangeFeed": "true"
    }
    df = spark.readStream.options(**reader_options).table(source_table)
    return df.withColumn("TEST_COLUMN", F.lit("testing..."))
```
Reference the file using `functionPath`:
```json
{
  "dataFlowId": "feature_python_function_source",
  "dataFlowGroup": "feature_samples",
  "dataFlowType": "standard",
  "sourceSystem": "testSystem",
  "sourceType": "python",
  "sourceViewName": "v_feature_python_function_source",
  "sourceDetails": {
    "tokens": {
      "sourceTable": "{staging_schema}.customer"
    },
    "functionPath": "my_source_function.py"
  },
  "mode": "stream",
  "targetFormat": "delta",
  "targetDetails": {
    "table": "feature_python_function_source"
  }
}
```
```yaml
dataFlowId: feature_python_function_source
dataFlowGroup: feature_samples
dataFlowType: standard
sourceSystem: testSystem
sourceType: python
sourceViewName: v_feature_python_function_source
sourceDetails:
  tokens:
    sourceTable: '{staging_schema}.customer'
  functionPath: my_source_function.py
mode: stream
targetFormat: delta
targetDetails:
  table: feature_python_function_source
```
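The `get_df` example above returns a streaming DataFrame to match `mode: stream`. If your data flow reads the source as a batch instead (as in the `get_orders_batch` extension example above), the same `get_df` contract applies; a minimal sketch, assuming a simple full-table read:

```python
from pyspark.sql import DataFrame, SparkSession
from typing import Dict


def get_df(spark: SparkSession, tokens: Dict) -> DataFrame:
    """Read the configured source table as a one-off batch DataFrame."""
    return spark.read.table(tokens["sourceTable"])
```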
sourceDetails Schema for Python Source
When using `sourceType: "python"`, the `sourceDetails` object supports the following properties:

| Property | Required | Description |
|---|---|---|
| `pythonModule` | One of `pythonModule`/`functionPath` | Module and function reference (e.g., `sources.get_customer_cdf`) |
| `functionPath` | One of `pythonModule`/`functionPath` | Path to a Python file containing a `get_df` function |
| `tokens` | No | Dictionary of token values to pass to the source function. Supports substitution variables like `{staging_schema}` |
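To illustrate the substitution behavior: a token value such as `{staging_schema}.customer` is expanded to a concrete table name before the `tokens` dict reaches your function. The sketch below approximates that expansion with `str.format`; it is an illustration of the idea rather than the Framework's implementation, and the variable values shown are hypothetical.

```python
from typing import Dict


def substitute_tokens(tokens: Dict[str, str], variables: Dict[str, str]) -> Dict[str, str]:
    """Expand {variable} placeholders in each token value (illustration only)."""
    return {key: value.format(**variables) for key, value in tokens.items()}


# With a hypothetical staging_schema value, the source function receives
# the expanded dict rather than the raw template:
substitute_tokens(
    {"sourceTable": "{staging_schema}.customer"},
    {"staging_schema": "dev_staging"},
)
# -> {"sourceTable": "dev_staging.customer"}
```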
Function Signatures
For Extensions (pythonModule)
The function name can be anything, but it must accept `spark` and `tokens`:

```python
def my_source_function(spark: SparkSession, tokens: Dict) -> DataFrame:
    ...
```
For File Path (functionPath)
The function must be named `get_df`:

```python
def get_df(spark: SparkSession, tokens: Dict) -> DataFrame:
    ...
```
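Because both styles take only `spark` and `tokens`, source functions are straightforward to exercise locally before deploying. A hedged sketch, assuming `src/extensions` is on your Python path and using a throwaway temp view in place of the real source table:

```python
from pyspark.sql import SparkSession

from sources import get_orders_batch  # assumes src/extensions is on sys.path


def test_get_orders_batch_returns_rows():
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    # Tiny in-memory stand-in for the real source table (hypothetical data).
    sample = spark.createDataFrame([(1, "widget")], ["order_id", "item"])
    sample.createOrReplaceTempView("orders_sample")

    df = get_orders_batch(spark, {"sourceTable": "orders_sample"})
    assert df.count() == 1
```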
Additional Resources
Refer to the Data Flow Spec - Source Details section of the Data Flow Spec Reference documentation for more information on source configuration.