Python Source

Applies To:

Pipeline Bundle

Configuration Scope:

Data Flow Spec

Databricks Docs:

NA

Overview

You can specify a Python function as a source type in your Data Flow Specs. These allow for flexibility and more complex data retrieval to be supported, as needed, without overly complicating the Framework.

There are two approaches to defining Python sources:

  1. Extensions: Define functions in the src/extensions/ directory and reference them by module name using pythonModule

  2. File Path: Define functions in ./python_functions/ directories and reference by file path using functionPath

Sample Bundle

Samples are available in the bronze_sample bundle in the src/dataflows/feature_samples folder.

Configuration

Using Extensions

The extensions approach allows you to organize your Python source functions in a central location and import them as standard Python modules.

1. Create an Extension Module

Create your source functions in the extensions/ directory at the bundle root:

my_pipeline_bundle/
├── src/
│   ├── extensions/
│   │   └── sources.py      # Your source functions
│   ├── dataflows/
│   │   └── ...

Your extension module can contain multiple functions. Each function must:

  • Accept spark (SparkSession) and tokens (Dict) as parameters

  • Return a DataFrame

# src/extensions/sources.py
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from typing import Dict

def get_customer_cdf(spark: SparkSession, tokens: Dict) -> DataFrame:
    """
    Get customer data with Change Data Feed enabled.
    """
    source_table = tokens["sourceTable"]
    reader_options = {
        "readChangeFeed": "true"
    }

    df = spark.readStream.options(**reader_options).table(source_table)
    return df.withColumn("TEST_COLUMN", F.lit("testing from extension..."))

def get_orders_batch(spark: SparkSession, tokens: Dict) -> DataFrame:
    """
    Get orders data as a batch read.
    """
    source_table = tokens["sourceTable"]
    return spark.read.table(source_table)

2. Reference in Data Flow Spec

Use pythonModule in sourceDetails to reference your function:

{
    "dataFlowId": "feature_python_extension_source",
    "dataFlowGroup": "feature_samples",
    "dataFlowType": "standard",
    "sourceSystem": "testSystem",
    "sourceType": "python",
    "sourceViewName": "v_feature_python_extension_source",
    "sourceDetails": {
        "tokens": {
            "sourceTable": "{staging_schema}.customer"
        },
        "pythonModule": "sources.get_customer_cdf"
    },
    "mode": "stream",
    "targetFormat": "delta",
    "targetDetails": {
        "table": "feature_python_extension_source",
        "tableProperties": {
            "delta.enableChangeDataFeed": "true"
        }
    }
}

Using File Path

To define a python source function using file paths, create a python_functions folder under the base folder for your dataflowspec:

my_pipeline_bundle/
├── src/
│   ├── dataflows/
│   │   ├── use_case_1/
│   │   │   ├── dataflowspec/
│   │   │   │   └── my_data_flow_spec_main.json
│   │   │   ├── python_functions/
│   │   │   │   └── my_source_function.py
│   │   │   └── schemas/

Your file must contain a function called get_df that:

  • Accepts spark (SparkSession) and tokens (Dict) as parameters

  • Returns a DataFrame

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from typing import Dict

def get_df(spark: SparkSession, tokens: Dict) -> DataFrame:
    """
    Get a DataFrame from the source details with applied transformations.
    """
    source_table = tokens["sourceTable"]
    reader_options = {
        "readChangeFeed": "true"
    }

    df = spark.readStream.options(**reader_options).table(source_table)
    return df.withColumn("TEST_COLUMN", F.lit("testing..."))

Reference using functionPath:

{
    "dataFlowId": "feature_python_function_source",
    "dataFlowGroup": "feature_samples",
    "dataFlowType": "standard",
    "sourceSystem": "testSystem",
    "sourceType": "python",
    "sourceViewName": "v_feature_python_function_source",
    "sourceDetails": {
        "tokens": {
            "sourceTable": "{staging_schema}.customer"
        },
        "functionPath": "my_source_function.py"
    },
    "mode": "stream",
    "targetFormat": "delta",
    "targetDetails": {
        "table": "feature_python_function_source"
    }
}

sourceDetails Schema for Python Source

When using sourceType: "python", the sourceDetails object supports the following properties:

Property

Required

Description

pythonModule

One of pythonModule/functionPath

Module and function reference (e.g., sources.get_customer_cdf). The module must be in the src/extensions/ directory.

functionPath

One of pythonModule/functionPath

Path to a Python file containing a get_df function. Resolved relative to the ./python_functions/ directory.

tokens

No

Dictionary of token values to pass to the source function. Supports substitution variables like {staging_schema}.

Function Signatures

For Extensions (pythonModule)

The function name can be anything, but it must accept spark and tokens:

def my_source_function(spark: SparkSession, tokens: Dict) -> DataFrame:
    ...

For File Path (functionPath)

The function must be named get_df:

def get_df(spark: SparkSession, tokens: Dict) -> DataFrame:
    ...

Additional Resources

Refer to the Data Flow Spec - Source Details section of the Data Flow Spec Reference documentation for more information on source configuration.