Creating a Flows Data Flow Spec Reference

A standard Data Flow Spec is the most basic type of Data Flow Spec and is suited to basic use cases where you are performing 1:1 ingestion or loads. It is particularly suited to Bronze Ingestion Use Cases.

Example:

The below sample demonstrates a flows Data Flow Spec for a Silver multi-source streaming use case (refer to Multi-Source Streaming for more information):

{
    "dataFlowId": "etp5stg",
    "dataFlowGroup": "etp5",
    "dataFlowType": "flow",
    "targetFormat": "delta",
    "targetDetails": {
        "table": "staging_table_mrg_p5",
        "schemaPath": "",
        "tableProperties": {
            "delta.enableChangeDataFeed": "true"
        },
        "partitionColumns": []
    },
    "cdcSettings": {
        "keys": [
            "CONTRACT_ID"
        ],
        "sequence_by": "EXTRACT_DTTM",
        "where": "",
        "ignore_null_updates": true,
        "except_column_list": [
            "__START_AT",
            "__END_AT"
        ],
        "scd_type": "2",
        "track_history_column_list": [],
        "track_history_except_column_list": []
    },
    "dataQualityExpectationsEnabled": false,
    "quarantineMode": "off",
    "quarantineTargetDetails": {},
    "flowGroups": [
        {
            "flowGroupId": "et1",
            "stagingTables": {
                "staging_table_apnd_p5": {
                    "type": "ST",
                    "schemaPath": ""
                }
            },
            "flows": {
                "f_contract": {
                    "flowType": "append_view",
                    "flowDetails": {
                        "targetTable": "staging_table_apnd_p5",
                        "sourceView": "v_brz_contract"
                    },
                    "views": {
                        "v_brz_contract": {
                            "mode": "stream",
                            "sourceType": "delta",
                            "sourceDetails": {
                                "database": "main.bronze_test_4",
                                "table": "contract",
                                "cdfEnabled": true,
                                "selectExp": [
                                    "*"
                                ],
                                "whereClause": []
                            }
                        }
                    }
                },
                "f_loan": {
                    "flowType": "append_view",
                    "flowDetails": {
                        "targetTable": "staging_table_apnd_p5",
                        "sourceView": "v_brz_loan"
                    },
                    "views": {
                        "v_brz_loan": {
                            "mode": "stream",
                            "sourceType": "delta",
                            "sourceDetails": {
                                "database": "main.bronze_test_4",
                                "table": "loan",
                                "cdfEnabled": true,
                                "selectExp": [
                                    "*"
                                ],
                                "whereClause": []
                            }
                        }
                    }
                },
                "f_merge": {
                    "flowType": "merge",
                    "flowDetails": {
                        "targetTable": "staging_table_mrg_p5",
                        "sourceView": "staging_table_apnd_p5"
                    }
                }
            }
        }
    ]
}

The above dataflow spec sample contains the following core components:

  • Dataflow metadata configuration

  • Target configuration

  • Data quality and quarantine settings

  • Flow group configuration

The following sections detail each of the above components.

Dataflow Metadata Configuration

These properties define the basic identity and type of the dataflow:

Field

Type

Description

dataFlowId

string

A unique identifier for the data flow.

dataFlowGroup

string

The group to which the data flow belongs, can be the same as dataFlowId if there is no group.

dataFlowType

string

The type of data flow. It can be either flow or standard. Supported: flow, standard

Target Configuration

These properties define where and how the data will be written:

Field

Type

Description

mode

string

The mode of the data flow. Supported: stream, batch

targetFormat

string

The format of the target data. If the format is delta, additional targetDetails must be provided.

targetDetails

object

See Target Details Reference.

Flow Group Configuration

The flowGroupDetails object contains the following properties:

Property

Type

Description

dataFlowID (optional)

string

A unique identifier for the data flow. Only required when dataflow specs are split (see Splitting Flows Data Flow Spec into main and flow files).

flowGroupId

string

A unique identifier for the flow group.

stagingTables (optional)

object

An object containing named objects representing staging tables in the flow group. The key for each nested object in this object will become the table names for the staging tables.

flows

array

An array of flows in the flow group. Items: flow-configuration

Staging Table Configuration

The stagingTableDetails object contains the following properties:

Property

Type

Description

type

string

The type of the staging table can be either a Streaming Table or Materialized View. Supported: ST, MV

schemaPath (optional)

string

The schema path of the staging table.

partitionColumns (optional)

array

An array of partition columns for the staging table. Items: string

cdcSettings (optional)

object

Change data capture (CDC) settings. Object: Change Data Capture (CDC) Configuration

Recommendation

It is recommended that you avoid specifying a schema path for staging tables, in order to reduce maintenance overhead and to take advantage of schema evolution.

Flow Configuration

A flow object contains the following properties:

Property

Type

Description

enabled

boolean

A flag indicating whether the flow is enabled.

flowType

string

The type of the flow. Supported: append_view, append_sql, merge

flowDetails

object

Details about the flow, required based on flowType. Properties vary based on flowType. See Flow Details.

views (optional)

object

An object containing views used in the flow. The key for each nested object in this object will become the view names.

Flow Details

The flowDetails object contains the following properties:

Flow Type

Property

Type

Description

append_sql

targetTable

string

The target table for the SQL append flow.

sqlPath

string

The path to the SQL file for the append flow.

append_view

targetTable

string

The target table for the view append flow.

sourceView

string

The source view for the append flow.

column_prefix (optional)

string

The prefix for columns in the target table.

column_prefix_exceptions (optional)

array

An array of columns that are exceptions to the prefix rule.

merge

targetTable

string

The target table for the merge flow.

sourceView

string

The source view for the merge flow.

View Configuration

The viewDetails object contains the following properties:

Property

Type

Description

mode

string

The mode of the view, either batch or stream. Supported: batch, stream

sourceType

string

The type of the source. Supported: cloudFiles, delta, deltaJoin, sql

columnsToUpdate (optional)

array

An array of columns to update. Items: string

sourceDetails (conditional)

object

See Data Flow Spec - Source Details.

Change Data Capture (CDC) Configuration

The cdcSettings and cdcSnapshotSettings enable and pass configuration info to the CDC API’s.

Field

Type

Description

cdcSettings

object

See cdcSettings for more information.

cdcSnapshotSettings

object

See cdcSnapshotSettings for more information.

cdcSettings

The cdcSettings object contains the following properties:

Parameter

Type

Description

keys

list

The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table.

sequence_by

str

The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order.

scd_type

string

Whether to store records as SCD type 1 or SCD type 2. Set to 1 for SCD type 1 or 2 for SCD type 2.

apply_as_deletes

string

(optional) Specifies when a CDC event should be treated as a DELETE rather than an upsert.

where

string

(optional) Filter the rows by a condition.

ignore_null_updates

boolean

(optional) Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and ignore_null_updates is True, columns with a null retain their existing values in the target. This also applies to nested columns with a value of null. When ignore_null_updates is False, existing values are overwritten with null values.

except_column_list

list

(optional) A list of columns to exclude from the upsert into the target table.

track_history_column_list
track_history_except_column_list

list

A subset of output columns to be tracked for history in the target table. Use track_history_column_list to specify the complete list of columns to be tracked. Use track_history_except_column_list to specify the columns to be excluded from tracking.

cdcSnapshotSettings

The cdcSnapshotSettings object contains the following properties:

Parameter

Type

Description

keys

list

The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table.

snapshotType

str

The type of snapshot to process. Set to periodic for periodic snapshots or historical for historical snapshots (refer to CDC Historical Snapshot Source Configuration for which type to use). Note that historical snapshot types are not supported in flow data flow types.

scd_type

string

Whether to store records as SCD type 1 or SCD type 2. Set to 1 for SCD type 1 or 2 for SCD type 2.

sourceType

string

The type of source to ingest the snapshots from. Set to file for file based sources.

source

object

The source to ingest the snapshots from. This is required for historical snapshot types. See CDC Historical Snapshot Source Configuration for more information.

track_history_column_list

list

(optional) A subset of output columns to be tracked for history in the target table. Use this to specify the complete list of columns to be tracked. This cannot be used in conjunction with track_history_except_column_list.

track_history_except_column_list

list

(optional) A subset of output columns to be excluded from history tracking in the target table. Use this to specify which columns should not be tracked. This cannot be used in conjunction with track_history_column_list.

CDC Historical Snapshot Source Configuration

The source object contains the following properties for file based sources:

Parameter

Type

Description

format

string

The format of the source data. E.g. supported formats are table, parquet, csv, json. All formats supported by spark see PySpark Data Sources API.

path

string

The location to load the source data from. This can be a table name or a path to a a file or directory with multiple snapshots. A placeholder {version} can be used in this path which will be substituted with the version value in run time.

versionType

string

The type of versioning to use. Can be either int or datetime.

datetimeFormat

string

(conditional) Required if versionType is datetime. The format of startingVersion datetime value.

microSecondMaskLength

integer

(optional) WARNING: Edge Cases Only! - Specify this if your versionType is datetime and your filename includes microsends, but not the full 6 digits. The number of microsecond digits to included at the end of the datetime value. - The default value is 6.

startingVersion

string or integer

(optional) The version to start processing from.

readerOptions

object

(optional) Additional options to pass to the reader.

schemaPath

string

(optional) The schema path to use for the source data.

selectExp

list

(optional) A list of select expressions to apply to the source data.

filter

string

(optional) A filter expression to apply to the source data. This filter is applied to the dataframe as a WHERE clause when the source is read. A placeholder {version} can be used in this filter expression which will be substituted with the version value in run time.

recursiveFileLookup

boolean

(optional) When set to true, enables recursive directory traversal to find snapshot files. This should be used when snapshots are stored in a nested directory structure such as Hive-style partitioning (e.g., /data/{version}/file.parquet). When set to false (default), only files in the immediate directory are searched. Default: false.

Note

If recursiveFileLookup is set to true, ensure that the path parameter is specified in a way that is compatible with recursive directory traversal. I.e. the {version} placeholder is used in the path and not the filename.

The source object contains the following properties for table based sources:

Parameter

Type

Description

table

string

The table name to load the source data from.

versionColumn

string

The column name to use for versioning.

startingVersion

string or integer

(optional) The version to start processing from.

selectExp

list

(optional) A list of select expressions to apply to the source data.

Data Quality and Quarantine Configuration

These properties control how data quality issues are handled:

Field

Type

Description

dataQualityExpectationsEnabled (optional)

boolean

A flag indicating whether data quality expectations are enabled (see Data Quality - Quarantine).

dataQualityExpectationsPath (optional)

string

Either a relative path or filename for the expectations file. Note that the framework automatically calculates all relative paths from the appropriate expectations sub-folder, in the Pipeline Bundle. Examples:

  • All expectations files in the expectations sub-folder: . or *

  • A specific expectations file: my_table_dqe.json

quarantineMode (optional)

string

The mode for handling quarantined data. It can be off, flag, or table. Supported: [“off”, “flag”, “table”]

quarantineTargetDetails (optional)

object

Details about the quarantine target, only required if quarantineMode is set to table. See quarantineTargetDetails section below.

quarantineTargetDetails

The quarantineTargetDetails object contains the following properties:

Parameter

Type

Description

targetFormat

string

The format of the quarantine target. Currently, only delta is supported.

Supported: ["delta"]
Default: "delta"

table

string

(conditional) The table name, required if targetFormat is delta.

tableProperties

object

(conditional) Additional properties for the table, required if targetFormat is delta.

path

string

(conditional) The path to the table, required if targetFormat is delta.

Table Migration Configuration

These properties control table migration:

Field

Type

Description

tableMigrationDetails (optional)

object

Details about table migration, only required if a table migration is needed. See Table Migration Details section below.

tableMigrationDetails

The tableMigrationDetails object contains the following properties:

Property

Type

Description

enabled

boolean

A flag indicating whether table migration is enabled.

catalogType

string

The type of catalog, either hms or uc. Supported values: [“hms”, “uc”]

autoStartingVersionsEnabled (optional)

boolean

Flag to enable automatic starting version management. When enabled, the system automatically tracks source table versions and manages starting versions for views. Defaults to true.

sourceDetails

object

Details about the source for migration. See sourceDetails.

sourceDetails

The sourceDetails object can potentially cater to different types of sources but is currently limited to the following:

  • sourceMigrateDelta

sourceMigrateDelta

The sourceMigrateDelta object contains the following properties:

Field

Type

Description

database

string

The database name.

table

string

The table name.

selectExp (optional)

array (items: string)

An array of select expressions.

whereClause (optional)

array (items: string)

An array of where clauses.

exceptColumns (optional)

array (items: string)

An array of columns to exclude.