Creating a Standard Data Flow Spec Reference
A standard Data Flow Spec is the most basic type of Data Flow Spec and is suited to simple use cases where you perform 1:1 ingestion or loads. It is particularly well suited to Bronze ingestion use cases.
Example:
The example below demonstrates a standard Data Flow Spec for a Bronze ingestion use case (refer to patterns_streaming_basic_1_to_1 for more information):
```json
{
  "dataFlowId": "crm_1",
  "dataFlowGroup": "crm",
  "dataFlowType": "standard",
  "sourceType": "delta",
  "sourceSystem": "crm",
  "sourceViewName": "v_customer_address",
  "sourceDetails": {
    "database": "source_db",
    "table": "customer_address",
    "cdfEnabled": true,
    "schemaPath": "schemas/customer_address.json"
  },
  "mode": "stream",
  "targetFormat": "delta",
  "targetDetails": {
    "table": "customer_address",
    "tableProperties": {
      "delta.autoOptimize.optimizeWrite": "true",
      "delta.autoOptimize.autoCompact": "true"
    },
    "partitionColumns": ["country_code"],
    "schemaPath": "schemas/customer_address.json"
  },
  "dataQualityExpectationsEnabled": true,
  "quarantineMode": "table",
  "quarantineTargetDetails": {
    "targetFormat": "delta",
    "table": "customer_address_quarantine",
    "tableProperties": {}
  },
  "cdcSettings": {
    "keys": ["address_id"],
    "sequence_by": "updated_timestamp",
    "scd_type": "2",
    "where": "",
    "ignore_null_updates": true,
    "except_column_list": ["updated_timestamp"],
    "apply_as_deletes": "DELETE_FLAG = True"
  }
}
```
The same spec expressed as YAML:
```yaml
dataFlowId: crm_1
dataFlowGroup: crm
dataFlowType: standard
sourceType: delta
sourceSystem: crm
sourceViewName: v_customer_address
sourceDetails:
  database: source_db
  table: customer_address
  cdfEnabled: true
  schemaPath: schemas/customer_address.json
mode: stream
targetFormat: delta
targetDetails:
  table: customer_address
  tableProperties:
    delta.autoOptimize.optimizeWrite: 'true'
    delta.autoOptimize.autoCompact: 'true'
  partitionColumns:
    - country_code
  schemaPath: schemas/customer_address.json
dataQualityExpectationsEnabled: true
quarantineMode: table
quarantineTargetDetails:
  targetFormat: delta
  table: customer_address_quarantine
  tableProperties: {}
cdcSettings:
  keys:
    - address_id
  sequence_by: updated_timestamp
  scd_type: '2'
  where: ''
  ignore_null_updates: true
  except_column_list:
    - updated_timestamp
  apply_as_deletes: DELETE_FLAG = True
```
The above dataflow spec sample contains the following core components:
Dataflow metadata configuration
Source configuration
Target configuration
Data quality and quarantine settings
CDC (SCD2) configuration
The following sections detail each of the above components.
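For illustration, a spec like the one above can be loaded and inspected with a few lines of Python. This is only a sketch that assumes the spec is stored as a local JSON file named `crm_1.json`; it does not show the framework's own loading or validation logic:

```python
import json

# Load a dataflow spec from disk (hypothetical local file name).
with open("crm_1.json") as f:
    spec = json.load(f)

# Basic sanity checks on the core identity fields described below.
for field in ("dataFlowId", "dataFlowGroup", "dataFlowType"):
    assert field in spec, f"missing required field: {field}"

print(spec["dataFlowId"], spec["dataFlowType"], spec["targetDetails"]["table"])
```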
Dataflow Metadata Configuration
These properties define the basic identity and type of the dataflow:
| Field | Type | Description |
|---|---|---|
| dataFlowId | | A unique identifier for the data flow. |
| dataFlowGroup | | The group to which the data flow belongs; can be the same as dataFlowId if there is no group. |
| dataFlowType | | The type of data flow. It can be either flow or standard. Supported: `["flow", "standard"]` |
Source Configuration
These properties define the source of the data:
| Field | Type | Description |
|---|---|---|
| sourceSystem (optional) | | The source system name. The value is not used to determine or change any behaviour; required if dataFlowType is standard. |
| sourceType | | The type of source; required if dataFlowType is standard. Supported: |
| sourceViewName | | The name to assign the source view; required if dataFlowType is standard. String pattern: `v_([A-Za-z0-9_]+)` |
| sourceDetails | | See Data Flow Spec - Source Details for more information. |
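As a sketch of what the sourceViewName pattern implies, a name can be checked against the documented regular expression before the spec is used. This is illustrative only, not the framework's validation code:

```python
import re

# Pattern from the table above: source view names must start with "v_".
SOURCE_VIEW_NAME_PATTERN = re.compile(r"v_([A-Za-z0-9_]+)")

def is_valid_source_view_name(name: str) -> bool:
    # fullmatch ensures the whole name conforms, not just a prefix.
    return SOURCE_VIEW_NAME_PATTERN.fullmatch(name) is not None

print(is_valid_source_view_name("v_customer_address"))  # True
print(is_valid_source_view_name("customer_address"))    # False
```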
Target Configuration
These properties define where and how the data will be written:
| Field | Type | Description |
|---|---|---|
| mode | | The mode of the data flow. Supported: |
| targetFormat | | The format of the target data. If the format is delta, additional targetDetails must be provided. |
| targetDetails | | |
Change Data Capture (CDC) Configuration
The cdcSettings and cdcSnapshotSettings objects enable CDC processing and pass configuration info to the CDC APIs.

| Field | Type | Description |
|---|---|---|
| cdcSettings | | See cdcSettings for more information. |
| cdcSnapshotSettings | | See cdcSnapshotSettings for more information. |
cdcSettings
The cdcSettings object contains the following properties:
| Parameter | Type | Description |
|---|---|---|
| keys | | The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table. |
| sequence_by | str | The column name specifying the logical order of CDC events in the source data. Delta Live Tables uses this sequencing to handle change events that arrive out of order. |
| scd_type | | Whether to store records as SCD type 1 or SCD type 2. Set to 1 for SCD type 1 or 2 for SCD type 2. |
| apply_as_deletes | | (optional) Specifies when a CDC event should be treated as a DELETE rather than an upsert. |
| where | | (optional) Filter the rows by a condition. |
| ignore_null_updates | | (optional) Allow ingesting updates containing a subset of the target columns. When a CDC event matches an existing row and ignore_null_updates is True, columns with a null value retain their existing values in the target. This also applies to nested columns with a value of null. When ignore_null_updates is False, existing values are overwritten with null values. |
| except_column_list | | (optional) A list of columns to exclude from the upsert into the target table. |
| track_history_column_list / track_history_except_column_list | | A subset of output columns to be tracked for history in the target table. Use track_history_column_list to specify the complete list of columns to be tracked. Use track_history_except_column_list to specify the columns to be excluded from tracking. |
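The cdcSettings fields correspond closely to the parameters of the Delta Live Tables apply_changes API. The sketch below shows how the settings from the example spec could be passed to that API; it is an illustration of the mapping, not the framework's actual implementation (the source view and target table names are taken from the example above):

```python
import dlt
from pyspark.sql.functions import expr

# Target streaming table, using the targetDetails from the example spec.
dlt.create_streaming_table(
    name="customer_address",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
    },
    partition_cols=["country_code"],
)

# cdcSettings from the example mapped onto apply_changes parameters.
dlt.apply_changes(
    target="customer_address",
    source="v_customer_address",
    keys=["address_id"],                          # cdcSettings.keys
    sequence_by="updated_timestamp",              # cdcSettings.sequence_by
    stored_as_scd_type=2,                         # cdcSettings.scd_type
    ignore_null_updates=True,                     # cdcSettings.ignore_null_updates
    except_column_list=["updated_timestamp"],     # cdcSettings.except_column_list
    apply_as_deletes=expr("DELETE_FLAG = True"),  # cdcSettings.apply_as_deletes
)
```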
cdcSnapshotSettings
The cdcSnapshotSettings object contains the following properties:
| Parameter | Type | Description |
|---|---|---|
| keys | | The column or combination of columns that uniquely identify a row in the source data. This is used to identify which CDC events apply to specific records in the target table. |
| snapshotType | str | The type of snapshot to process. Set to |
| scd_type | | Whether to store records as SCD type 1 or SCD type 2. Set to 1 for SCD type 1 or 2 for SCD type 2. |
| sourceType | | The type of source to ingest the snapshots from. Set to |
| source | | The source to ingest the snapshots from. This is required for |
| track_history_column_list | | (optional) A subset of output columns to be tracked for history in the target table. Use this to specify the complete list of columns to be tracked. This cannot be used in conjunction with track_history_except_column_list. |
| track_history_except_column_list | | (optional) A subset of output columns to be excluded from history tracking in the target table. Use this to specify which columns should not be tracked. This cannot be used in conjunction with track_history_column_list. |
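The cdcSnapshotSettings fields similarly line up with the Delta Live Tables apply_changes_from_snapshot API. A minimal sketch follows, assuming a hypothetical snapshot table named source_db.customer_address_snapshots and the keys from the example spec; it is not the framework's actual code:

```python
import dlt

dlt.create_streaming_table(name="customer_address")

# Snapshot-based CDC: DLT compares successive snapshots of the source and
# derives the inserts, updates and deletes to apply to the target table.
dlt.apply_changes_from_snapshot(
    target="customer_address",
    source="source_db.customer_address_snapshots",  # hypothetical snapshot table
    keys=["address_id"],
    stored_as_scd_type=2,
)
```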
CDC Historical Snapshot Source Configuration
The `source` object contains the following properties for `file` based sources:

| Parameter | Type | Description |
|---|---|---|
| format | string | The format of the source data, e.g. `table`, `parquet`, `csv`, `json`. All formats supported by Spark are available; see the PySpark Data Sources API. |
| path | string | The location to load the source data from. This can be a table name or a path to a file or directory with multiple snapshots. A placeholder `{version}` can be used in this path, which will be substituted with the version value at run time. |
| versionType | string | The type of versioning to use. Can be either `int` or `datetime`. |
| datetimeFormat | string | (conditional) Required if `versionType` is `datetime`. The format of the `startingVersion` datetime value. |
| microSecondMaskLength | integer | (optional) WARNING: Edge cases only! Specify this if your `versionType` is `datetime` and your filename includes microseconds, but not the full 6 digits. The number of microsecond digits included at the end of the datetime value. The default value is 6. |
| startingVersion | string or integer | (optional) The version to start processing from. |
| readerOptions | object | (optional) Additional options to pass to the reader. |
| schemaPath | string | (optional) The schema path to use for the source data. |
| selectExp | list | (optional) A list of select expressions to apply to the source data. |
| filter | string | (optional) A filter expression to apply to the source data. This filter is applied to the dataframe as a WHERE clause when the source is read. A placeholder `{version}` can be used in this filter expression, which will be substituted with the version value at run time. |
| recursiveFileLookup | boolean | (optional) When set to `true`, enables recursive directory traversal to find snapshot files. This should be used when snapshots are stored in a nested directory structure such as Hive-style partitioning (e.g. `/data/{version}/file.parquet`). When set to `false` (default), only files in the immediate directory are searched. Default: `false`. |

Note: If `recursiveFileLookup` is set to `true`, ensure that the `path` parameter is specified in a way that is compatible with recursive directory traversal, i.e. the `{version}` placeholder is used in the path and not the filename.
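To make the `{version}` placeholder and `versionType` concrete, the sketch below shows the general shape of a snapshot-provider function for a file-based source with integer versions, of the kind that can be passed to apply_changes_from_snapshot. It is a simplified illustration under assumed paths, not the framework's internal reader:

```python
from typing import Optional, Tuple
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder.getOrCreate()

# Assumed file-based source settings: format=parquet, versionType=int,
# and a path that uses the {version} placeholder.
SNAPSHOT_PATH = "/data/customer_address/{version}/snapshot.parquet"

def next_snapshot_and_version(latest_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
    """Return the next snapshot DataFrame and its version, or None when done."""
    version = 1 if latest_version is None else latest_version + 1
    path = SNAPSHOT_PATH.format(version=version)  # substitute {version} at run time
    try:
        return spark.read.format("parquet").load(path), version
    except AnalysisException:
        # No snapshot exists for this version: signal that processing is finished.
        return None
```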
The `source` object contains the following properties for `table` based sources:

| Parameter | Type | Description |
|---|---|---|
| table | string | The table name to load the source data from. |
| versionColumn | string | The column name to use for versioning. |
| startingVersion | string or integer | (optional) The version to start processing from. |
| selectExp | list | (optional) A list of select expressions to apply to the source data. |
Data Quality and Quarantine Configuration
These properties control how data quality issues are handled:
| Field | Type | Description |
|---|---|---|
| dataQualityExpectationsEnabled (optional) | | A flag indicating whether data quality expectations are enabled (see Data Quality - Quarantine). |
| dataQualityExpectationsPath (optional) | | Either a relative path or filename for the expectations file. Note that the framework automatically calculates all relative paths from the appropriate expectations sub-folder in the Pipeline Bundle. Examples: |
| quarantineMode (optional) | | The mode for handling quarantined data. It can be off, flag, or table. Supported: `["off", "flag", "table"]` |
| quarantineTargetDetails (optional) | | Details about the quarantine target; only required if quarantineMode is set to table. |
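As an illustration of the flag and table quarantine modes, the sketch below splits a DataFrame on a combined expectations expression and writes failing rows to a separate quarantine table. This is plain PySpark under assumed rule and table names; the framework's own behaviour is driven by the expectations file referenced in dataQualityExpectationsPath:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical expectations, as might appear in an expectations file.
expectations = {
    "valid_address_id": "address_id IS NOT NULL",
    "valid_country_code": "country_code RLIKE '^[A-Z]{2}$'",
}

df = spark.read.table("source_db.customer_address")

# "flag" mode: add a boolean column marking rows that fail any expectation.
passes_all = F.expr(" AND ".join(f"({rule})" for rule in expectations.values()))
flagged = df.withColumn("is_quarantined", ~passes_all)

# "table" mode: route failing rows to a separate quarantine table instead.
flagged.filter("is_quarantined").drop("is_quarantined") \
    .write.format("delta").mode("append").saveAsTable("customer_address_quarantine")
flagged.filter(~F.col("is_quarantined")).drop("is_quarantined") \
    .write.format("delta").mode("append").saveAsTable("customer_address")
```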
quarantineTargetDetails
The quarantineTargetDetails object contains the following properties:
| Parameter | Type | Description |
|---|---|---|
| targetFormat | | The format of the quarantine target. Currently, only delta is supported. Supported: `["delta"]`. Default: `"delta"` |
| table | | (conditional) The table name, required if |
| tableProperties | | (conditional) Additional properties for the table, required if |
| path | | (conditional) The path to the table, required if |
Table Migration Configuration
These properties control table migration:
| Field | Type | Description |
|---|---|---|
| tableMigrationDetails (optional) | | Details about table migration; only required if a table migration is needed. See the Table Migration Details section below. |
tableMigrationDetails
The tableMigrationDetails object contains the following properties:
| Property | Type | Description |
|---|---|---|
| enabled | | A flag indicating whether table migration is enabled. |
| catalogType | | The type of catalog, either hms or uc. Supported values: `["hms", "uc"]` |
| autoStartingVersionsEnabled (optional) | | Flag to enable automatic starting version management. When enabled, the system automatically tracks source table versions and manages starting versions for views. Defaults to |
| sourceDetails | | Details about the source for migration. See sourceDetails. |
sourceDetails
The sourceDetails object can potentially cater to different types of sources but is currently limited to the following:
sourceMigrateDelta
sourceMigrateDelta
The sourceMigrateDelta object contains the following properties:
| Field | Type | Description |
|---|---|---|
| database | | The database name. |
| table | | The table name. |
| selectExp (optional) | | An array of select expressions. |
| whereClause (optional) | | An array of where clauses. |
| exceptColumns (optional) | | An array of columns to exclude. |
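To show how the sourceMigrateDelta options relate to a Delta read, the sketch below applies selectExp, whereClause and exceptColumns to the source table with plain PySpark. The values are hypothetical and this only illustrates the shape of the transformation, not the migration logic itself:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical sourceMigrateDelta values.
database = "source_db"
table = "customer_address"
select_exp = ["address_id", "upper(country_code) AS country_code", "updated_timestamp"]
where_clause = ["updated_timestamp >= '2020-01-01'"]
except_columns = ["updated_timestamp"]

df = spark.read.table(f"{database}.{table}")

if select_exp:
    df = df.selectExpr(*select_exp)   # selectExp: array of select expressions
for clause in where_clause:
    df = df.where(clause)             # whereClause: array of where clauses
if except_columns:
    df = df.drop(*except_columns)     # exceptColumns: columns to exclude

df.show()
```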