Data Flow script (DFS)

What is the DFS?

The data flow script (DFS) is the underlying text, similar to a coding language, that is used to execute the transformations included in a mapping data flow. Every transformation is represented by a series of properties that provide the information needed to run the job properly.

For instance, setting allowSchemaDrift: true in a source transformation tells the service to include all columns from the source dataset in the data flow, even if they are not included in the schema projection.
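To make the effect of this property concrete, here is a toy Python model of what allowSchemaDrift controls. It is only an illustration, not the service's implementation, and it assumes that with drift disabled only the projected columns are kept. The function name apply_projection and the sample row are hypothetical.

```python
# Toy model of the allowSchemaDrift behavior described above. This is an
# illustration only, not how the service implements it. Rows are dicts and
# the projection is the list of columns declared in output(...).


def apply_projection(row, projected_columns, allow_schema_drift):
    """Return the columns of `row` that would flow downstream."""
    if allow_schema_drift:
        # Drift allowed: every column in the source row is kept, including
        # columns that are absent from the projection.
        return dict(row)
    # Assumption: with drift disabled, only projected columns are kept.
    return {col: row[col] for col in projected_columns if col in row}


projection = ["movieId", "title", "genres"]
row = {"movieId": "1", "title": "Toy Story", "genres": "Animation", "year": "1995"}

print(sorted(apply_projection(row, projection, allow_schema_drift=True)))
print(sorted(apply_projection(row, projection, allow_schema_drift=False)))
```

In this model the extra year column survives only when drift is allowed.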

Use cases

The DFS is usually hidden from users and is produced automatically by the user interface, so most of the time there is no need to read or edit it directly. In some cases, though, understanding the script is helpful or even necessary when debugging and producing data flows.

Here are a few examples:

  • Programmatically producing many data flows that are fairly similar
  • Complex expressions that are difficult to manage in the UI or are resulting in validation issues
  • Debugging and better understanding various errors returned during execution
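For the first case, a script can stamp out near-identical data flows from one template. The sketch below is a minimal Python example under stated assumptions: the helper make_uppercase_flow, the list of columns, and the upperXxx stream-naming scheme are all hypothetical, and the output is plain DFS text that you would still need to place into a data flow definition yourself.

```python
# Sketch: generate several near-identical DFS scripts that differ only in
# the column being upper-cased and the derive stream name. The helper name
# and naming scheme are hypothetical examples.


def make_uppercase_flow(column):
    """Build a source -> derive -> sink script that upper-cases `column`."""
    derive_name = f"upper{column[0].upper()}{column[1:]}"
    return "\n".join([
        "source(output(",
        "\t\tmovieId as string,",
        "\t\ttitle as string,",
        "\t\tgenres as string",
        "\t),",
        "\tallowSchemaDrift: true,",
        "\tvalidateSchema: false) ~> source1",
        f"source1 derive({column} = upper({column})) ~> {derive_name}",
        f"{derive_name} sink(allowSchemaDrift: true,",
        "\tvalidateSchema: false) ~> sink1",
    ])


# One script per column; only the derive line and the sink's input differ.
scripts = {col: make_uppercase_flow(col) for col in ["title", "genres"]}
print(scripts["genres"])
```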

How to add transforms

Adding a transformation requires three basic steps: adding the core transformation data, rerouting the input stream, and then rerouting the output stream. This is easiest to see in an example. Suppose we start with a simple source-to-sink data flow like the following:

source(output(
		movieId as string,
		title as string,
		genres as string
	),
	allowSchemaDrift: true,
	validateSchema: false) ~> source1
source1 sink(allowSchemaDrift: true,
	validateSchema: false) ~> sink1

To add a derive transformation, we first create the core transformation text. Here it contains a simple expression that adds a new column, upperCaseTitle, holding the uppercase version of title:

derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
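When generating scripts, this core text can be built from a mapping of column names to expressions. A minimal Python sketch, assuming multiple derived columns are written comma-separated inside derive(...); derive_text is a hypothetical helper that only formats strings and does not validate the expression language.

```python
# Sketch: build the core text of a derive transformation from a mapping of
# column name -> expression. Hypothetical helper; string formatting only.


def derive_text(columns, stream_name):
    """Return e.g. 'derive(a = expr) ~> stream_name'."""
    assignments = ", ".join(f"{name} = {expr}" for name, expr in columns.items())
    return f"derive({assignments}) ~> {stream_name}"


print(derive_text({"upperCaseTitle": "upper(title)"}, "deriveTransformationName"))
```

The printed line matches the derive text shown above.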

Next, we add this text to the existing DFS:

source(output(
		movieId as string,
		title as string,
		genres as string
	),
	allowSchemaDrift: true,
	validateSchema: false) ~> source1
derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
	validateSchema: false) ~> sink1

Then we reroute the incoming stream: identify the transformation that the new transformation should come after (in this case, source1) and copy its stream name to the start of the new transformation:

source(output(
		movieId as string,
		title as string,
		genres as string
	),
	allowSchemaDrift: true,
	validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
source1 sink(allowSchemaDrift: true,
	validateSchema: false) ~> sink1
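The first two steps can be automated together: place the new statement right after the one that produces the upstream stream, and prefix it with that stream's name. A minimal Python sketch, assuming the last line of every statement ends with "~> streamName"; insert_after_stream is a hypothetical helper.

```python
# Sketch of steps one and two: insert a transformation directly after the
# statement that produces `upstream`, prefixed with that stream name.
# Hypothetical helper; assumes each statement ends with "~> streamName".

ORIGINAL = "\n".join([
    "source(output(",
    "\t\tmovieId as string,",
    "\t\ttitle as string,",
    "\t\tgenres as string",
    "\t),",
    "\tallowSchemaDrift: true,",
    "\tvalidateSchema: false) ~> source1",
    "source1 sink(allowSchemaDrift: true,",
    "\tvalidateSchema: false) ~> sink1",
])


def insert_after_stream(script, upstream, transformation):
    lines = script.splitlines()
    # Find the last line of the statement that defines `upstream`.
    end = next(i for i, line in enumerate(lines)
               if line.rstrip().endswith(f"~> {upstream}"))
    # Put the new statement right after it, reading from `upstream`.
    lines.insert(end + 1, f"{upstream} {transformation}")
    return "\n".join(lines)


updated = insert_after_stream(
    ORIGINAL, "source1",
    "derive(upperCaseTitle = upper(title)) ~> deriveTransformationName")
print(updated)
```

As in the script above, the sink still reads from source1 at this point; rerouting it is a separate step.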

Finally, we identify the transformation that should come after the new transformation (in this case, sink1) and replace its input stream, source1, with the output stream name of our new transformation:

source(output(
		movieId as string,
		title as string,
		genres as string
	),
	allowSchemaDrift: true,
	validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
	validateSchema: false) ~> sink1
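The last step, repointing the downstream transformation, can be sketched in Python as well. This assumes the downstream statement has a single input stream (multi-input transformations such as joins would need more care) and that statements end with "~> streamName"; reroute_input is a hypothetical helper.

```python
import re

# Matches a line that ends a statement, e.g. "... ~> deriveTransformationName".
STATEMENT_END = re.compile(r"~>\s*\w+\s*$")

SCRIPT = "\n".join([
    "source(output(",
    "\t\tmovieId as string,",
    "\t\ttitle as string,",
    "\t\tgenres as string",
    "\t),",
    "\tallowSchemaDrift: true,",
    "\tvalidateSchema: false) ~> source1",
    "source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName",
    "source1 sink(allowSchemaDrift: true,",
    "\tvalidateSchema: false) ~> sink1",
])


def reroute_input(script, downstream, old_input, new_input):
    """Make the statement producing `downstream` read from `new_input`."""
    lines = script.splitlines()
    end = next(i for i, line in enumerate(lines)
               if line.rstrip().endswith(f"~> {downstream}"))
    # Walk back to the statement's first line: the line right after the
    # previous statement terminator, or the top of the script.
    start = end
    while start > 0 and not STATEMENT_END.search(lines[start - 1]):
        start -= 1
    prefix = f"{old_input} "
    if not lines[start].startswith(prefix):
        raise ValueError(f"{downstream} does not read from {old_input}")
    lines[start] = f"{new_input} {lines[start][len(prefix):]}"
    return "\n".join(lines)


rerouted = reroute_input(SCRIPT, "sink1", "source1", "deriveTransformationName")
print(rerouted)
```

The check on old_input guards against silently rewiring a transformation that was already reading from a different stream.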

DFS fundamentals

The DFS is composed of a series of connected transformations, including sources, sinks, and various others that can add new columns, filter data, join data, and much more. Usually, the script starts with one or more sources, continues with a series of transformations, and ends with one or more sinks.
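Because every statement names its input streams, its transformation type, and its output stream, a script can be read back as a graph. The Python sketch below is a rough parser under stated assumptions: each statement ends with "~> name", inputs (if any) are comma-separated names before the transformation type, and no string literal in the script contains "~>". parse_streams is a hypothetical helper, not a DFS API.

```python
import re

# A statement is everything up to and including "~> outputStreamName".
STATEMENT = re.compile(r"(.*?)~>\s*(\w+)", re.S)
# Optional comma-separated input streams, then the transformation type.
HEAD = re.compile(r"\s*(?:(\w+(?:\s*,\s*\w+)*)\s+)?(\w+)\(")


def parse_streams(script):
    """Return a list of (input_streams, transformation_type, output_stream)."""
    parsed = []
    for match in STATEMENT.finditer(script):
        body, output = match.group(1), match.group(2)
        head = HEAD.match(body)
        if head is None:
            raise ValueError(f"cannot parse statement ending in {output}")
        inputs = [s.strip() for s in head.group(1).split(",")] if head.group(1) else []
        parsed.append((inputs, head.group(2), output))
    return parsed


SCRIPT = """source(output(
    movieId as string,
    title as string,
    genres as string
  ),
  allowSchemaDrift: true,
  validateSchema: false) ~> source1
source1 derive(upperCaseTitle = upper(title)) ~> deriveTransformationName
deriveTransformationName sink(allowSchemaDrift: true,
  validateSchema: false) ~> sink1"""

streams = parse_streams(SCRIPT)
for inputs, kind, output in streams:
    print(inputs, kind, output)
```

Sources show up with no inputs, which makes it easy to check that the flow begins with a source and ends with a sink.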

Sources all have the same basic construction:

source(
  source properties
) ~> source_name

For instance, a simple source with three columns (movieId, title, genres) would be:

source(output(
		movieId as string,
		title as string,
		genres as string
	),
	allowSchemaDrift: true,
	validateSchema: false) ~> source1
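Since every source follows this shape, a source statement can be rendered from a list of columns. A minimal Python sketch, assuming columns are given as name/type pairs and that only the two properties shown above are needed; source_text and dfs_bool are hypothetical helpers.

```python
# Sketch: render a source statement in the layout shown above.
# Hypothetical helpers; only output(), allowSchemaDrift, and
# validateSchema are supported.


def dfs_bool(value):
    """Format a Python bool as a DFS true/false literal."""
    return "true" if value else "false"


def source_text(columns, name, allow_schema_drift=True, validate_schema=False):
    cols = ",\n".join(f"\t\t{col} as {typ}" for col, typ in columns.items())
    return (
        "source(output(\n"
        f"{cols}\n"
        "\t),\n"
        f"\tallowSchemaDrift: {dfs_bool(allow_schema_drift)},\n"
        f"\tvalidateSchema: {dfs_bool(validate_schema)}) ~> {name}"
    )


print(source_text({"movieId": "string", "title": "string", "genres": "string"}, "source1"))
```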

All transformations other than sources have the same basic construction:

name_of_incoming_stream transformation_type(
  properties
) ~> new_stream_name
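The same template can be expressed as a small formatting function. A minimal Python sketch, assuming properties are written comma-separated inside the parentheses; transformation_text is a hypothetical helper that reproduces the layout of the template above.

```python
# Sketch: render any non-source transformation using the template above.
# Hypothetical helper; it formats text and does not validate properties.


def transformation_text(incoming, kind, properties, new_stream):
    body = ",\n".join(f"  {prop}" for prop in properties)
    return f"{incoming} {kind}(\n{body}\n) ~> {new_stream}"


print(transformation_text("source1", "derive", ["title = upper(title)"], "derive1"))
```

With these arguments the output is the derive example that follows.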

For example, a simple derive transformation that takes a column (title) and overwrites it with an uppercase version would be as follows:

source1 derive(
  title = upper(title)
) ~> derive1

And a sink with no schema would simply be:

derive1 sink(allowSchemaDrift: true,
	validateSchema: false) ~> sink1
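Putting the pieces together, a linear flow is a source followed by statements that each read the previous stream. A minimal Python sketch, assuming a single linear chain with no branches or joins; chain and the step tuples are hypothetical.

```python
import re


def chain(source_statement, steps):
    """Append steps so each one reads from the previous stream.

    steps: list of (transformation_type, properties, new_stream_name).
    """
    # The source's stream name is whatever follows the final "~>".
    previous = re.search(r"~>\s*(\w+)\s*$", source_statement).group(1)
    lines = [source_statement]
    for kind, properties, name in steps:
        lines.append(f"{previous} {kind}({properties}) ~> {name}")
        previous = name
    return "\n".join(lines)


SOURCE = "\n".join([
    "source(output(",
    "\t\tmovieId as string,",
    "\t\ttitle as string,",
    "\t\tgenres as string",
    "\t),",
    "\tallowSchemaDrift: true,",
    "\tvalidateSchema: false) ~> source1",
])

script = chain(SOURCE, [
    ("derive", "title = upper(title)", "derive1"),
    ("sink", "allowSchemaDrift: true, validateSchema: false", "sink1"),
])
print(script)
```

Each step's input is filled in automatically, which avoids the manual rerouting described earlier for linear flows.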