Creating a Standard Data Flow Spec Reference

A standard Data Flow Spec is the most basic type of Data Flow Spec and is suited to basic use cases where you are performing 1:1 ingestion or loads. It is particularly suited to Bronze Ingestion Use Cases.

Example:

The below demonstrates a standard Data Flow Spec for a Bronze ingestion use case (refer to patterns_streaming_basic_1_to_1 for more information):

{
    "dataFlowId": "crm_1",
    "dataFlowGroup": "crm",
    "dataFlowType": "standard",
    "sourceType": "delta",
    "sourceSystem": "crm",
    "sourceViewName": "v_customer_address",
    "sourceDetails": {
        "database": "source_db",
        "table": "customer_address",
        "cdfEnabled": true,
        "schemaPath": "schemas/customer_address.json"
    },
    "mode": "stream",
    "targetFormat": "delta",
    "targetDetails": {
        "table": "customer_address",
        "tableProperties": {
            "delta.autoOptimize.optimizeWrite": "true",
            "delta.autoOptimize.autoCompact": "true"
        },
        "partitionColumns": ["country_code"],
        "schemaPath": "schemas/customer_address.json"
    },
    "dataQualityExpectationsEnabled": true,
    "quarantineMode": "table",
    "quarantineTargetDetails": {
        "targetFormat": "delta",
        "table": "customer_address_quarantine",
        "tableProperties": {}
    },
    "cdcSettings": {
        "keys": ["address_id"],
        "sequence_by": "updated_timestamp",
        "scd_type": "2",
        "where": "",
        "ignore_null_updates": true,
        "except_column_list": ["updated_timestamp"],
        "apply_as_deletes": "DELETE_FLAG = True"
    }
}

The above dataflow spec sample contains the following core components:

  • Dataflow metadata configuration

  • Source configuration

  • Target configuration

  • Data quality and quarantine settings

  • CDC (SCD2) configuration

The following sections detail each of the above components.

Dataflow Metadata Configuration

These properties define the basic identity and type of the dataflow:

Field

Type

Description

dataFlowId

string

A unique identifier for the data flow.

dataFlowGroup

string

The group to which the data flow belongs, can be the same as dataFlowId if there is no group.

dataFlowType

string

The type of data flow. It can be either flow or standard. Supported: [“flow”, “standard”]

Source Configuration

These properties define the source of the data:

Field

Type

Description

sourceSystem (optional)

string

The source system name. Value is not used to determine or change any behaviour, required if dataFlowType is standard.

sourceType

string

The type of source, required if dataFlowType is standard. Supported: cloudFiles, delta, deltaJoin, kafka

sourceViewName

string

The name to assign the source view, required if dataFlowType is standard. String Pattern: v_([A-Za-z0-9_]+)

sourceDetails

object

See Data Flow Spec - Source Details for more information.

Target Configuration

These properties define where and how the data will be written:

Field

Type

Description

mode

string

The mode of the data flow. Supported: ["stream", "batch"]

targetFormat

string

The format of the target data. If the format is delta, additional targetDetails must be provided.

targetDetails

object

See Target Details Reference.

Change Data Capture (CDC) Configuration

The cdcSettings and cdcSnapshotSettings enable and pass configuration info to the CDC API’s.

Field

Type

Description

cdcSettings

object

See cdcSettings for more information.

cdcSnapshotSettings

object

See cdcSnapshotSettings for more information.

cdcSettings

The cdcSettings object contains the following properties:

Parameter

Type

Description

keys

list

The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table.

sequence_by

str

The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order.

scd_type

string

Whether to store records as SCD type 1 or SCD type 2. Set to 1 for SCD type 1 or 2 for SCD type 2.

apply_as_deletes

string

(optional) Specifies when a CDC event should be treated as a DELETE rather than an upsert.

where

string

(optional) Filter the rows by a condition.

ignore_null_updates

boolean

(optional) Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and ignore_null_updates is True, columns with a null retain their existing values in the target. This also applies to nested columns with a value of null. When ignore_null_updates is False, existing values are overwritten with null values.

except_column_list

list

(optional) A list of columns to exclude from the upsert into the target table.

track_history_column_list
track_history_except_column_list

list

A subset of output columns to be tracked for history in the target table. Use track_history_column_list to specify the complete list of columns to be tracked. Use track_history_except_column_list to specify the columns to be excluded from tracking.

cdcSnapshotSettings

The cdcSnapshotSettings object contains the following properties:

Parameter

Type

Description

keys

list

The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table.

snapshotType

str

The type of snapshot to process. Set to periodic for periodic snapshots or historical for historical snapshots (refer to CDC Historical Snapshot Source Configuration for which type to use). Note that historical snapshot types are not supported in flow data flow types.

scd_type

string

Whether to store records as SCD type 1 or SCD type 2. Set to 1 for SCD type 1 or 2 for SCD type 2.

sourceType

string

The type of source to ingest the snapshots from. Set to file for file based sources.

source

object

The source to ingest the snapshots from. This is required for historical snapshot types. See CDC Historical Snapshot Source Configuration for more information.

track_history_column_list

list

(optional) A subset of output columns to be tracked for history in the target table. Use this to specify the complete list of columns to be tracked. This cannot be used in conjunction with track_history_except_column_list.

track_history_except_column_list

list

(optional) A subset of output columns to be excluded from history tracking in the target table. Use this to specify which columns should not be tracked. This cannot be used in conjunction with track_history_column_list.

CDC Historical Snapshot Source Configuration

The source object contains the following properties for file based sources:

Parameter

Type

Description

format

string

The format of the source data. E.g. supported formats are table, parquet, csv, json. All formats supported by spark see PySpark Data Sources API.

path

string

The location to load the source data from. This can be a table name or a path to a a file or directory with multiple snapshots. A placeholder {version} can be used in this path which will be substituted with the version value in run time.

versionType

string

The type of versioning to use. Can be either int or datetime.

datetimeFormat

string

(conditional) Required if versionType is datetime. The format of startingVersion datetime value.

microSecondMaskLength

integer

(optional) WARNING: Edge Cases Only! - Specify this if your versionType is datetime and your filename includes microsends, but not the full 6 digits. The number of microsecond digits to included at the end of the datetime value. - The default value is 6.

startingVersion

string or integer

(optional) The version to start processing from.

readerOptions

object

(optional) Additional options to pass to the reader.

schemaPath

string

(optional) The schema path to use for the source data.

selectExp

list

(optional) A list of select expressions to apply to the source data.

filter

string

(optional) A filter expression to apply to the source data. This filter is applied to the dataframe as a WHERE clause when the source is read. A placeholder {version} can be used in this filter expression which will be substituted with the version value in run time.

recursiveFileLookup

boolean

(optional) When set to true, enables recursive directory traversal to find snapshot files. This should be used when snapshots are stored in a nested directory structure such as Hive-style partitioning (e.g., /data/{version}/file.parquet). When set to false (default), only files in the immediate directory are searched. Default: false.

Note

If recursiveFileLookup is set to true, ensure that the path parameter is specified in a way that is compatible with recursive directory traversal. I.e. the {version} placeholder is used in the path and not the filename.

The source object contains the following properties for table based sources:

Parameter

Type

Description

table

string

The table name to load the source data from.

versionColumn

string

The column name to use for versioning.

startingVersion

string or integer

(optional) The version to start processing from.

selectExp

list

(optional) A list of select expressions to apply to the source data.

Data Quality and Quarantine Configuration

These properties control how data quality issues are handled:

Field

Type

Description

dataQualityExpectationsEnabled (optional)

boolean

A flag indicating whether data quality expectations are enabled (see Data Quality - Quarantine).

dataQualityExpectationsPath (optional)

string

Either a relative path or filename for the expectations file. Note that the framework automatically calculates all relative paths from the appropriate expectations sub-folder, in the Pipeline Bundle. Examples:

  • All expectations files in the expectations sub-folder: . or *

  • A specific expectations file: my_table_dqe.json

quarantineMode (optional)

string

The mode for handling quarantined data. It can be off, flag, or table. Supported: [“off”, “flag”, “table”]

quarantineTargetDetails (optional)

object

Details about the quarantine target, only required if quarantineMode is set to table. See quarantineTargetDetails section below.

quarantineTargetDetails

The quarantineTargetDetails object contains the following properties:

Parameter

Type

Description

targetFormat

string

The format of the quarantine target. Currently, only delta is supported.

Supported: ["delta"]
Default: "delta"

table

string

(conditional) The table name, required if targetFormat is delta.

tableProperties

object

(conditional) Additional properties for the table, required if targetFormat is delta.

path

string

(conditional) The path to the table, required if targetFormat is delta.

Table Migration Configuration

These properties control table migration:

Field

Type

Description

tableMigrationDetails (optional)

object

Details about table migration, only required if a table migration is needed. See Table Migration Details section below.

tableMigrationDetails

The tableMigrationDetails object contains the following properties:

Property

Type

Description

enabled

boolean

A flag indicating whether table migration is enabled.

catalogType

string

The type of catalog, either hms or uc. Supported values: [“hms”, “uc”]

autoStartingVersionsEnabled (optional)

boolean

Flag to enable automatic starting version management. When enabled, the system automatically tracks source table versions and manages starting versions for views. Defaults to true.

sourceDetails

object

Details about the source for migration. See sourceDetails.

sourceDetails

The sourceDetails object can potentially cater to different types of sources but is currently limited to the following:

  • sourceMigrateDelta

sourceMigrateDelta

The sourceMigrateDelta object contains the following properties:

Field

Type

Description

database

string

The database name.

table

string

The table name.

selectExp (optional)

array (items: string)

An array of select expressions.

whereClause (optional)

array (items: string)

An array of where clauses.

exceptColumns (optional)

array (items: string)

An array of columns to exclude.