Creating a Flows Data Flow Spec Reference
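A Flows Data Flow Spec (dataFlowType flow) defines one or more flow groups. Each flow group can contain staging tables, views, and multiple flows that write to those staging tables or to the target table. This makes it suited to use cases that a 1:1 load cannot express, such as combining several sources into a single target.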
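Example:
The sample below shows a Flows Data Flow Spec for a Silver multi-source streaming use case (see Multi-Source Streaming for more information). Two append_view flows load the contract and loan Bronze tables into the staging table staging_table_apnd_p5, and a merge flow then merges the staged rows into the target table staging_table_mrg_p5.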
```json
{
  "dataFlowId": "etp5stg",
  "dataFlowGroup": "etp5",
  "dataFlowType": "flow",
  "targetFormat": "delta",
  "targetDetails": {
    "table": "staging_table_mrg_p5",
    "schemaPath": "",
    "tableProperties": {
      "delta.enableChangeDataFeed": "true"
    },
    "partitionColumns": []
  },
  "cdcSettings": {
    "keys": [
      "CONTRACT_ID"
    ],
    "sequence_by": "EXTRACT_DTTM",
    "where": "",
    "ignore_null_updates": true,
    "except_column_list": [
      "__START_AT",
      "__END_AT"
    ],
    "scd_type": "2",
    "track_history_column_list": [],
    "track_history_except_column_list": []
  },
  "dataQualityExpectationsEnabled": false,
  "quarantineMode": "off",
  "quarantineTargetDetails": {},
  "flowGroups": [
    {
      "flowGroupId": "et1",
      "stagingTables": {
        "staging_table_apnd_p5": {
          "type": "ST",
          "schemaPath": ""
        }
      },
      "flows": {
        "f_contract": {
          "flowType": "append_view",
          "flowDetails": {
            "targetTable": "staging_table_apnd_p5",
            "sourceView": "v_brz_contract"
          },
          "views": {
            "v_brz_contract": {
              "mode": "stream",
              "sourceType": "delta",
              "sourceDetails": {
                "database": "main.bronze_test_4",
                "table": "contract",
                "cdfEnabled": true,
                "selectExp": [
                  "*"
                ],
                "whereClause": []
              }
            }
          }
        },
        "f_loan": {
          "flowType": "append_view",
          "flowDetails": {
            "targetTable": "staging_table_apnd_p5",
            "sourceView": "v_brz_loan"
          },
          "views": {
            "v_brz_loan": {
              "mode": "stream",
              "sourceType": "delta",
              "sourceDetails": {
                "database": "main.bronze_test_4",
                "table": "loan",
                "cdfEnabled": true,
                "selectExp": [
                  "*"
                ],
                "whereClause": []
              }
            }
          }
        },
        "f_merge": {
          "flowType": "merge",
          "flowDetails": {
            "targetTable": "staging_table_mrg_p5",
            "sourceView": "staging_table_apnd_p5"
          }
        }
      }
    }
  ]
}
```
The same spec in YAML:

```yaml
dataFlowId: etp5stg
dataFlowGroup: etp5
dataFlowType: flow
targetFormat: delta
targetDetails:
  table: staging_table_mrg_p5
  schemaPath: ''
  tableProperties:
    delta.enableChangeDataFeed: 'true'
  partitionColumns: []
cdcSettings:
  keys:
    - CONTRACT_ID
  sequence_by: EXTRACT_DTTM
  where: ''
  ignore_null_updates: true
  except_column_list:
    - __START_AT
    - __END_AT
  scd_type: '2'
  track_history_column_list: []
  track_history_except_column_list: []
dataQualityExpectationsEnabled: false
quarantineMode: 'off'
quarantineTargetDetails: {}
flowGroups:
  - flowGroupId: et1
    stagingTables:
      staging_table_apnd_p5:
        type: ST
        schemaPath: ''
    flows:
      f_contract:
        flowType: append_view
        flowDetails:
          targetTable: staging_table_apnd_p5
          sourceView: v_brz_contract
        views:
          v_brz_contract:
            mode: stream
            sourceType: delta
            sourceDetails:
              database: main.bronze_test_4
              table: contract
              cdfEnabled: true
              selectExp:
                - '*'
              whereClause: []
      f_loan:
        flowType: append_view
        flowDetails:
          targetTable: staging_table_apnd_p5
          sourceView: v_brz_loan
        views:
          v_brz_loan:
            mode: stream
            sourceType: delta
            sourceDetails:
              database: main.bronze_test_4
              table: loan
              cdfEnabled: true
              selectExp:
                - '*'
              whereClause: []
      f_merge:
        flowType: merge
        flowDetails:
          targetTable: staging_table_mrg_p5
          sourceView: staging_table_apnd_p5
```
The sample spec above contains the following core components:
- Data flow metadata configuration
- Target configuration
- Change data capture (CDC) settings
- Data quality and quarantine settings
- Flow group configuration

The following sections describe each of these components.
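To make the wiring of the sample concrete, the Python sketch below walks the flowGroups of a trimmed copy of the sample spec and lists, for each flow, what it reads from and which table it writes to. It is an illustration only: flow_edges and SAMPLE_SPEC are hypothetical names, not part of the framework.

```python
import json

# Trimmed copy of the sample spec above: only the fields this sketch reads.
SAMPLE_SPEC = json.loads("""
{
  "dataFlowId": "etp5stg",
  "targetDetails": {"table": "staging_table_mrg_p5"},
  "flowGroups": [
    {
      "flowGroupId": "et1",
      "stagingTables": {"staging_table_apnd_p5": {"type": "ST"}},
      "flows": {
        "f_contract": {"flowType": "append_view",
                       "flowDetails": {"targetTable": "staging_table_apnd_p5",
                                       "sourceView": "v_brz_contract"}},
        "f_loan": {"flowType": "append_view",
                   "flowDetails": {"targetTable": "staging_table_apnd_p5",
                                   "sourceView": "v_brz_loan"}},
        "f_merge": {"flowType": "merge",
                    "flowDetails": {"targetTable": "staging_table_mrg_p5",
                                    "sourceView": "staging_table_apnd_p5"}}
      }
    }
  ]
}
""")


def flow_edges(spec):
    """Return (flow name, flow type, source, target) for every flow, in spec order."""
    edges = []
    for group in spec.get("flowGroups", []):
        for name, flow in group.get("flows", {}).items():
            details = flow["flowDetails"]
            # append_sql flows name a SQL file (sqlPath) instead of a source view.
            source = details.get("sourceView", details.get("sqlPath", ""))
            edges.append((name, flow["flowType"], source, details["targetTable"]))
    return edges


for name, flow_type, source, target in flow_edges(SAMPLE_SPEC):
    print(f"{name} ({flow_type}): {source} -> {target}")
```

For the sample this prints one line per flow, ending with f_merge (merge): staging_table_apnd_p5 -> staging_table_mrg_p5, which shows that the merge flow reads the staging table that both append flows write to.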
Dataflow Metadata Configuration
These properties define the basic identity and type of the dataflow:
| Field | Type | Description |
|---|---|---|
| dataFlowId | string | A unique identifier for the data flow. |
| dataFlowGroup | string | The group to which the data flow belongs. This can be the same as dataFlowId if there is no group. |
| dataFlowType | string | The type of data flow, either flow or standard. Use flow for a Flows Data Flow Spec. Supported: ["flow", "standard"] |
Target Configuration
These properties define where and how the data will be written:
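| Field | Type | Description |
|---|---|---|
| mode | | The mode of the data flow. Supported: |
| targetFormat | string | The format of the target data. If the format is delta, additional targetDetails must be provided. |
| targetDetails | object | Details about the target table, such as table, schemaPath, tableProperties, and partitionColumns, as in the sample above. |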
Flow Group Configuration
Each item in the flowGroups array is a flowGroupDetails object with the following properties:

| Property | Type | Description |
|---|---|---|
| dataFlowId (optional) | string | A unique identifier for the data flow. Only required when Data Flow Specs are split (see Splitting Flows Data Flow Spec into main and flow files). |
| flowGroupId | string | A unique identifier for the flow group. |
| stagingTables (optional) | object | An object containing named staging table objects (see Staging Table Configuration). The key of each nested object becomes the name of that staging table. |
| flows | object | An object containing named flow objects (see Flow Configuration). The key of each nested object becomes the name of that flow. |
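Because the keys of stagingTables and views become table and view names, a flow group's references can be checked before deployment. The sketch below is a hypothetical lint, not a framework feature. It assumes that every targetTable must be a staging table in the same group or the spec's main target table, and that every sourceView must be a view defined on the same flow or a staging table in the group; references the framework may legitimately resolve elsewhere would also be reported.

```python
def unresolved_references(spec):
    """Hypothetical lint: report flow references that cannot be resolved in their group."""
    main_target = spec["targetDetails"]["table"]
    problems = []
    for group in spec.get("flowGroups", []):
        group_id = group["flowGroupId"]
        staging = set(group.get("stagingTables", {}))  # keys are staging table names
        for name, flow in group.get("flows", {}).items():
            details = flow.get("flowDetails", {})
            views = set(flow.get("views", {}))  # keys are view names
            target = details.get("targetTable")
            if target not in staging and target != main_target:
                problems.append(f"{group_id}/{name}: unknown targetTable {target!r}")
            source = details.get("sourceView")
            if source is not None and source not in views | staging:
                problems.append(f"{group_id}/{name}: unknown sourceView {source!r}")
    return problems


spec = {
    "targetDetails": {"table": "staging_table_mrg_p5"},
    "flowGroups": [{
        "flowGroupId": "et1",
        "stagingTables": {"staging_table_apnd_p5": {"type": "ST"}},
        "flows": {
            "f_contract": {
                "flowType": "append_view",
                "flowDetails": {"targetTable": "staging_table_apnd_p5",
                                "sourceView": "v_brz_contract"},
                "views": {"v_brz_contract": {"mode": "stream", "sourceType": "delta"}},
            },
            "f_merge": {
                "flowType": "merge",
                # Deliberate typo: the staging table is named staging_table_apnd_p5.
                "flowDetails": {"targetTable": "staging_table_mrg_p5",
                                "sourceView": "staging_table_apnd"},
            },
        },
    }],
}
print(unresolved_references(spec))
```

Only the misspelled sourceView of f_merge is reported; f_contract resolves its view and its staging table.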
Staging Table Configuration
The stagingTableDetails object contains the following properties:

| Property | Type | Description |
|---|---|---|
| type | string | The type of the staging table: either a Streaming Table (ST, as in the sample above) or a Materialized View. Supported: |
| schemaPath (optional) | string | The schema path of the staging table. |
| partitionColumns (optional) | array | An array of partition columns for the staging table. Items: string |
| cdcSettings (optional) | object | Change data capture (CDC) settings. See Change Data Capture (CDC) Configuration. |
Recommendation
Avoid specifying a schema path for staging tables: this reduces maintenance overhead and lets the staging tables take advantage of schema evolution.
Flow Configuration
A flow object contains the following properties:

| Property | Type | Description |
|---|---|---|
| enabled | boolean | A flag indicating whether the flow is enabled. |
| flowType | string | The type of the flow. Supported: ["append_sql", "append_view", "merge"] |
| flowDetails | object | Details about the flow. The required properties depend on flowType; see Flow Details. |
| views (optional) | object | An object containing named view objects used in the flow (see View Configuration). The key of each nested object becomes the name of that view. |
Flow Details
The flowDetails object contains the following properties:
| Flow Type | Property | Type | Description |
|---|---|---|---|
| append_sql | targetTable | string | The target table for the SQL append flow. |
| | sqlPath | string | The path to the SQL file for the append flow. |
| append_view | targetTable | string | The target table for the view append flow. |
| | sourceView | string | The source view for the append flow. |
| | column_prefix (optional) | string | The prefix for columns in the target table. |
| | column_prefix_exceptions (optional) | array | An array of columns that are exceptions to the prefix rule. |
| merge | targetTable | string | The target table for the merge flow. |
| | sourceView | string | The source view for the merge flow. |
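The Flow Details table can be encoded as a lookup of required properties per flowType. The sketch below is a minimal, hypothetical check built from that table: it assumes a flow is invalid when it uses an unlisted flowType or omits a required property, and it does not check optional properties such as column_prefix. check_flow is not a framework function.

```python
# Required flowDetails properties per flowType, taken from the Flow Details table.
REQUIRED_FLOW_DETAILS = {
    "append_sql": ("targetTable", "sqlPath"),
    "append_view": ("targetTable", "sourceView"),
    "merge": ("targetTable", "sourceView"),
}


def check_flow(name, flow):
    """Hypothetical check: return a list of problems found in one flow object."""
    flow_type = flow.get("flowType")
    if flow_type not in REQUIRED_FLOW_DETAILS:
        return [f"{name}: unsupported flowType {flow_type!r}"]
    details = flow.get("flowDetails", {})
    return [
        f"{name}: flowDetails.{prop} is required for {flow_type} flows"
        for prop in REQUIRED_FLOW_DETAILS[flow_type]
        if prop not in details
    ]


# A merge flow that forgot its sourceView.
print(check_flow("f_merge", {"flowType": "merge",
                             "flowDetails": {"targetTable": "staging_table_mrg_p5"}}))
```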
View Configuration
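The viewDetails object contains the following properties:

| Property | Type | Description |
|---|---|---|
| mode | string | The mode of the view, either batch or stream. Supported: ["batch", "stream"] |
| sourceType | string | The type of the source (delta in the sample above). Supported: |
| columnsToUpdate (optional) | array | An array of columns to update. Items: string |
| sourceDetails (conditional) | object | Details of the source to read. For a delta source these include database, table, cdfEnabled, selectExp, and whereClause, as in the sample above. |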
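For a delta view, sourceDetails names a table and optionally narrows it with selectExp and whereClause. The sketch below renders those fields as an equivalent SELECT statement to show which rows and columns a view reads. It assumes that multiple whereClause entries are combined with AND, and it ignores mode and cdfEnabled, which presumably control how the table is read (as a stream, or through the Delta change data feed) rather than what is selected. view_query is a hypothetical helper.

```python
def view_query(source_details):
    """Hypothetical helper: render a delta view's sourceDetails as a SELECT statement."""
    table = f"{source_details['database']}.{source_details['table']}"
    columns = ", ".join(source_details.get("selectExp") or ["*"])
    query = f"SELECT {columns} FROM {table}"
    conditions = source_details.get("whereClause") or []
    if conditions:
        # Assumption: multiple whereClause entries are combined with AND.
        query += " WHERE " + " AND ".join(f"({c})" for c in conditions)
    return query


# sourceDetails of v_brz_contract from the sample; mode and cdfEnabled are not used here.
contract = {
    "database": "main.bronze_test_4",
    "table": "contract",
    "cdfEnabled": True,
    "selectExp": ["*"],
    "whereClause": [],
}
print(view_query(contract))  # SELECT * FROM main.bronze_test_4.contract
```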
Change Data Capture (CDC) Configuration
The cdcSettings and cdcSnapshotSettings objects enable CDC processing and pass their configuration to the CDC APIs.
| Field | Type | Description |
|---|---|---|
| cdcSettings | object | See cdcSettings for more information. |
| cdcSnapshotSettings | object | See cdcSnapshotSettings for more information. |
cdcSettings
The cdcSettings object contains the following properties:

| Parameter | Type | Description |
|---|---|---|
| keys | array | The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table. |
| sequence_by | string | The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order. |
| scd_type | string | Whether to store records as SCD type 1 or SCD type 2. Set to "1" for SCD type 1 or "2" for SCD type 2. |
| apply_as_deletes | string | (optional) Specifies when a CDC event should be treated as a DELETE rather than an upsert. |
| where | string | (optional) Filter the rows by a condition. |
| ignore_null_updates | boolean | (optional) Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and ignore_null_updates is true, columns with a null value retain their existing values in the target. This also applies to nested columns with a value of null. When ignore_null_updates is false, existing values are overwritten with null values. |
| except_column_list | array | (optional) A list of columns to exclude from the upsert into the target table. |
| track_history_column_list | array | (optional) A subset of output columns to be tracked for history in the target table. Use this to specify the complete list of columns to be tracked. |
| track_history_except_column_list | array | (optional) Use this to specify the output columns to be excluded from history tracking in the target table. |
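The cdcSettings field names follow the parameters of the Delta Live Tables apply_changes Python API, with scd_type corresponding to its stored_as_scd_type argument. The sketch below shows one plausible translation of a cdcSettings object into keyword arguments; it is an assumption for illustration, not the framework's own code. It keeps where out of the arguments and returns it separately, on the assumption that it is applied as a filter on the source, and it rejects specs that set both track_history lists, mirroring the restriction stated for cdcSnapshotSettings. apply_changes_kwargs is a hypothetical name.

```python
def apply_changes_kwargs(cdc_settings):
    """Sketch: translate a cdcSettings object into apply_changes-style keyword arguments.

    Returns (kwargs, where). `where` is kept separate on the assumption that it is
    applied as a filter on the source rather than passed to the CDC API.
    """
    # Empty strings and empty lists in the spec are treated as "not set".
    history = cdc_settings.get("track_history_column_list") or None
    history_except = cdc_settings.get("track_history_except_column_list") or None
    if history and history_except:
        raise ValueError("set track_history_column_list or "
                         "track_history_except_column_list, not both")
    kwargs = {
        "keys": cdc_settings["keys"],
        "sequence_by": cdc_settings["sequence_by"],
        # The spec stores the SCD type as a string ("1" or "2").
        "stored_as_scd_type": int(cdc_settings.get("scd_type", "1")),
        "ignore_null_updates": cdc_settings.get("ignore_null_updates", False),
        "apply_as_deletes": cdc_settings.get("apply_as_deletes") or None,
        "except_column_list": cdc_settings.get("except_column_list") or None,
        "track_history_column_list": history,
        "track_history_except_column_list": history_except,
    }
    return kwargs, cdc_settings.get("where") or None


# cdcSettings from the sample spec.
sample = {
    "keys": ["CONTRACT_ID"],
    "sequence_by": "EXTRACT_DTTM",
    "where": "",
    "ignore_null_updates": True,
    "except_column_list": ["__START_AT", "__END_AT"],
    "scd_type": "2",
    "track_history_column_list": [],
    "track_history_except_column_list": [],
}
kwargs, where = apply_changes_kwargs(sample)
print(kwargs["stored_as_scd_type"], where)  # 2 None
```

In a pipeline, kwargs could then be passed to dlt.apply_changes together with its target and source arguments.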
cdcSnapshotSettings
The cdcSnapshotSettings object contains the following properties:

| Parameter | Type | Description |
|---|---|---|
| keys | array | The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table. |
| snapshotType | string | The type of snapshot to process. Set to |
| scd_type | string | Whether to store records as SCD type 1 or SCD type 2. Set to "1" for SCD type 1 or "2" for SCD type 2. |
| sourceType | string | The type of source to ingest the snapshots from. Set to "file" or "table" (see CDC Historical Snapshot Source Configuration). |
| source | object | The source to ingest the snapshots from. This is required for |
| track_history_column_list | array | (optional) A subset of output columns to be tracked for history in the target table. Use this to specify the complete list of columns to be tracked. This cannot be used in conjunction with track_history_except_column_list. |
| track_history_except_column_list | array | (optional) A subset of output columns to be excluded from history tracking in the target table. Use this to specify which columns should not be tracked. This cannot be used in conjunction with track_history_column_list. |
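cdcSnapshotSettings processes full snapshots rather than individual change events. To illustrate the idea, the sketch below compares two consecutive snapshots on the keys columns and derives the INSERT, UPDATE, and DELETE events that take the target from one snapshot to the next. This is a plain-Python illustration of snapshot differencing in general, not how the framework or Delta Live Tables implement it; snapshot_changes and the sample rows are hypothetical.

```python
def snapshot_changes(previous, current, keys):
    """Derive CDC events between two snapshots (lists of dict rows) keyed on `keys`."""
    def by_key(rows):
        return {tuple(row[k] for k in keys): row for row in rows}

    before, after = by_key(previous), by_key(current)
    events = []
    for key, row in after.items():
        if key not in before:
            events.append(("INSERT", row))  # new key in the later snapshot
        elif row != before[key]:
            events.append(("UPDATE", row))  # same key, changed values
    for key, row in before.items():
        if key not in after:
            events.append(("DELETE", row))  # key missing from the later snapshot
    return events


# Hypothetical rows for two daily snapshots of a contract table.
day1 = [{"CONTRACT_ID": 1, "STATUS": "open"}, {"CONTRACT_ID": 2, "STATUS": "open"}]
day2 = [{"CONTRACT_ID": 1, "STATUS": "closed"}, {"CONTRACT_ID": 3, "STATUS": "open"}]
for op, row in snapshot_changes(day1, day2, keys=["CONTRACT_ID"]):
    print(op, row)
```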
CDC Historical Snapshot Source Configuration
The source object contains the following properties for file-based sources:

| Parameter | Type | Description |
|---|---|---|
| format | string | The format of the source data, for example table, parquet, csv, or json. Any format supported by Spark can be used; see the PySpark Data Sources API. |
| path | string | The location to load the source data from. This can be a table name or a path to a file or to a directory containing multiple snapshots. A {version} placeholder can be used in this path; it is substituted with the version value at run time. |
| versionType | string | The type of versioning to use, either int or datetime. |
| datetimeFormat | string | (conditional) Required if versionType is datetime. The format of the startingVersion datetime value. |
| microSecondMaskLength | integer | (optional) WARNING: edge cases only! Specify this if versionType is datetime and your filenames include microseconds, but not the full 6 digits. The number of microsecond digits included at the end of the datetime value. The default value is 6. |
| startingVersion | string or integer | (optional) The version to start processing from. |
| readerOptions | object | (optional) Additional options to pass to the reader. |
| schemaPath | string | (optional) The schema path to use for the source data. |
| selectExp | list | (optional) A list of select expressions to apply to the source data. |
| filter | string | (optional) A filter expression to apply to the source data. The filter is applied to the DataFrame as a WHERE clause when the source is read. A {version} placeholder can be used in this expression; it is substituted with the version value at run time. |
| recursiveFileLookup | boolean | (optional) When set to true, enables recursive directory traversal to find snapshot files. Use this when snapshots are stored in a nested directory structure such as Hive-style partitioning (e.g., /data/{version}/file.parquet). When set to false, only files in the immediate directory are searched. Default: false. |

Note
If recursiveFileLookup is set to true, make sure the path parameter is compatible with recursive directory traversal, i.e. the {version} placeholder is used in the directory part of the path and not in the filename.

The source object contains the following properties for table-based sources:

| Parameter | Type | Description |
|---|---|---|
| table | string | The table name to load the source data from. |
| versionColumn | string | The column name to use for versioning. |
| startingVersion | string or integer | (optional) The version to start processing from. |
| selectExp | list | (optional) A list of select expressions to apply to the source data. |
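The version parameters of a file-based source work together: versionType decides how a version is rendered, the {version} placeholder in path or filter receives that rendering, and startingVersion decides where processing begins. The sketch below illustrates this under stated assumptions. The helper names and the /landing/contract/{version}/ path are hypothetical; datetimeFormat is assumed to be a Python strftime pattern (the framework may expect a different pattern syntax); startingVersion is treated as inclusive; and microSecondMaskLength is applied by trimming trailing microsecond digits, which only works when %f is the last directive in the format.

```python
from datetime import datetime


def format_version(version, version_type, datetime_format=None, micro_second_mask_length=6):
    """Render a snapshot version the way it is assumed to appear in a path or filter."""
    if version_type == "int":
        return str(version)
    if version_type == "datetime":
        text = version.strftime(datetime_format)
        # Assumption: %f is the last directive; strftime always emits 6 microsecond
        # digits, so drop the ones the filenames do not carry.
        return text[: len(text) - (6 - micro_second_mask_length)]
    raise ValueError(f"unsupported versionType {version_type!r}")


def resolve_placeholder(template, version_text):
    """Substitute the {version} placeholder in a path or filter expression."""
    return template.replace("{version}", version_text)


def versions_to_process(available, starting_version=None):
    """Order the available versions, skipping any before startingVersion (assumed inclusive)."""
    ordered = sorted(available)
    if starting_version is None:
        return ordered
    return [v for v in ordered if v >= starting_version]


snapshot_time = datetime(2024, 1, 31, 12, 0, 0, 123000)
version_text = format_version(snapshot_time, "datetime", "%Y%m%d%H%M%S%f",
                              micro_second_mask_length=3)
print(version_text)  # 20240131120000123
print(resolve_placeholder("/landing/contract/{version}/", version_text))
print(versions_to_process([3, 1, 2], starting_version=2))  # [2, 3]
```

With a mask length of 3, the 6-digit %f output 123000 becomes 123 in the rendered version. Python's datetime.strptime accepts 1 to 6 digits for %f, so the trimmed string still parses back to the same timestamp.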
Data Quality and Quarantine Configuration
These properties control how data quality issues are handled:

| Field | Type | Description |
|---|---|---|
| dataQualityExpectationsEnabled (optional) | boolean | A flag indicating whether data quality expectations are enabled (see Data Quality - Quarantine). |
| dataQualityExpectationsPath (optional) | string | Either a relative path or a filename for the expectations file. The framework resolves all relative paths from the appropriate expectations sub-folder in the Pipeline Bundle. Examples: |
| quarantineMode (optional) | string | The mode for handling quarantined data: off, flag, or table. Supported: ["off", "flag", "table"] |
| quarantineTargetDetails (optional) | object | Details about the quarantine target, only required if quarantineMode is table. See quarantineTargetDetails. |
quarantineTargetDetails
The quarantineTargetDetails object contains the following properties:

| Parameter | Type | Description |
|---|---|---|
| targetFormat | string | The format of the quarantine target. Currently only delta is supported. Supported: ["delta"] Default: "delta" |
| table | string | (conditional) The table name, required if |
| tableProperties | object | (conditional) Additional properties for the table, required if |
| path | string | (conditional) The path to the table, required if |
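A minimal sketch of checking the quarantine settings, assuming (as the sample suggests with quarantineMode off and an empty quarantineTargetDetails) that a quarantine target is only needed in table mode. check_quarantine is hypothetical, and treating a missing quarantineMode as off is also an assumption.

```python
QUARANTINE_MODES = ("off", "flag", "table")


def check_quarantine(spec):
    """Hypothetical check of a spec's quarantine settings; returns a list of problems."""
    mode = spec.get("quarantineMode", "off")  # assumption: absent means "off"
    if mode not in QUARANTINE_MODES:
        return [f"unsupported quarantineMode {mode!r}"]
    if mode != "table":
        return []  # assumption: only table mode writes quarantined rows to a target
    details = spec.get("quarantineTargetDetails") or {}
    if not details:
        return ["quarantineTargetDetails is required when quarantineMode is 'table'"]
    # targetFormat defaults to delta, which is currently the only supported format.
    if details.get("targetFormat", "delta") != "delta":
        return ["quarantine targetFormat must be 'delta'"]
    return []


print(check_quarantine({"quarantineMode": "off", "quarantineTargetDetails": {}}))  # []
print(check_quarantine({"quarantineMode": "table"}))
```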
Table Migration Configuration
These properties control table migration:

| Field | Type | Description |
|---|---|---|
| tableMigrationDetails (optional) | object | Details about table migration, only required if a table migration is needed. See tableMigrationDetails below. |
tableMigrationDetails
The tableMigrationDetails object contains the following properties:

| Property | Type | Description |
|---|---|---|
| enabled | boolean | A flag indicating whether table migration is enabled. |
| catalogType | string | The type of catalog, either hms or uc. Supported: ["hms", "uc"] |
| autoStartingVersionsEnabled (optional) | boolean | Flag to enable automatic starting version management. When enabled, the system automatically tracks source table versions and manages starting versions for views. Defaults to |
| sourceDetails | object | Details about the source for migration. See sourceDetails. |
sourceDetails
The sourceDetails object is designed to cater to different types of sources but is currently limited to the following:
- sourceMigrateDelta
sourceMigrateDelta
The sourceMigrateDelta object contains the following properties:

| Field | Type | Description |
|---|---|---|
| database | string | The database name. |
| table | string | The table name. |
| selectExp (optional) | array | An array of select expressions. |
| whereClause (optional) | array | An array of where clauses. |
| exceptColumns (optional) | array | An array of columns to exclude. |
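The sourceMigrateDelta fields map naturally onto a query over the existing table. The sketch below renders them as Databricks SQL, using the SELECT * EXCEPT (...) star clause for exceptColumns. It is a hypothetical helper for illustration only: combining whereClause entries with AND and removing exceptColumns after selectExp is applied are assumptions, not documented framework behaviour, and legacy_db is a made-up database name.

```python
def migration_query(source):
    """Hypothetical helper: render a sourceMigrateDelta object as a Databricks SQL query."""
    columns = ", ".join(source.get("selectExp") or ["*"])
    query = f"SELECT {columns} FROM {source['database']}.{source['table']}"
    conditions = source.get("whereClause") or []
    if conditions:
        # Assumption: multiple whereClause entries are combined with AND.
        query += " WHERE " + " AND ".join(f"({c})" for c in conditions)
    except_columns = source.get("exceptColumns") or []
    if not except_columns:
        return query
    # Assumption: exceptColumns are dropped from whatever selectExp produced.
    return f"SELECT * EXCEPT ({', '.join(except_columns)}) FROM ({query})"


print(migration_query({
    "database": "legacy_db",
    "table": "contract",
    "whereClause": ["IS_ACTIVE = 1"],
    "exceptColumns": ["_rescued_data"],
}))
```

Wrapping the filtered query in a subquery keeps EXCEPT valid even when selectExp lists explicit expressions instead of *.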