Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal: Static types #309

Draft
wants to merge 25 commits into
base: dev
Choose a base branch
from
Draft

Proposal: Static types #309

wants to merge 25 commits into from

Conversation

bentsherman
Copy link

@bentsherman bentsherman commented May 1, 2024

This PR is a showcase of the proposed syntax for static types in Nextflow.

While I started with the goal of simply adding type annotations and type checking, I realized that many aspects of the language needed to be re-thought in order to provide a consistent developer experience. Some of these things can be done now, but I suspect they will be more difficult without static types, so I have tried to show them in their "best form" in this PR.

Changes

  • Type annotations. The following declarations can be annotated with a type:

    • workflow params/outputs
    • workflow takes/emits
    • process inputs/outputs
    • function parameters/return
    • local variables (generally not needed)

    Nextflow will use these type annotations to infer the type of every value in the workflow and make sure they are valid.

    The main built-in types are:

    • int, float, boolean, String: primitive types
    • Path: file or directory
    • List<E>, Set<E>, Bag<E>: collections with various constraints on ordering and uniqueness
    • Map<K,V>: map of key-value pairs
    • Channel<E>: channel (i.e. queue channel)
  • User-defined types. Types can be composed in several ways to facilitate domain modeling:

    • tuples: any number of values can be grouped into a tuple, e.g. (1, 'hello', true) has type (int, String, boolean)
    • records: a combination of named values, e.g. a sample is a meta map AND some files:
      record Sample { meta: Map ; files: List<Path> }
    • enums: a union of named values, e.g. a shirt size can be small or medium or large:
      enum TshirtSize { Small, Medium, Large }
    • optionals: any type can be suffixed with ? to denote that it can be null (e.g. String?), otherwise it should never be null
  • Only use params in top-level workflow. Params are not known outside the entry workflow. Pass params into processes and workflows as explicit inputs instead.

  • Replace publishDir with workflow outputs. Publish channels to output targets in the entry workflow instead. Declare an index file for the output target instead of creating a samplesheet in the workflow. Bundle related data in a single channel (e.g. metadata, fastq, and md5 files for each sample). Don't try to anticipate the needs of downstream pipelines, just publish a comprehensive index file that can be filtered/renamed as needed.

  • Use eval output, topic channels to collect tool versions. Send tool versions to a topic instead of emitting them as process outputs. Use the eval() or env() function to emit the tool version. Declare an index file (see versions output target) instead of rendering the YAML directly.

  • Define pipeline params in the main script. Each param has a type. Complex types can be composed from collections, records, and enums. Rather than specifying a particular input format for input files, simply specify a type and Nextflow will use the type to generate a schema and load from various sources. Config params are defined separately in the main config.

  • Queue channels are just channels and value channels are just values. The Channel type always refers to a queue channel. Value channels can be used like any value, e.g. in an if statement:

    // convert a list into a channel and back into a list
    vals = Channel.of(1..10).collect()
    // `vals` is just a value, so just use it!
    if( vals.size() > 2 )
      println 'more than two!'
  • Processes are just functions. Processes cannot be called directly with channels. Instead, call a process with values to execute it once, or call it in an operator to execute it for each value in the source channel:

    // execute FASTQC in parallel on each input file
    Channel.fromPath( "inputs/*.fastq" )
      .map(FASTQC) // short for { fastq -> FASTQC(fastq) }
    
    // execute ACCUMULATE sequentially on each input file
    // (replaces experimental recursion)
    Channel.fromPath( "inputs/*.txt" )
      .reduce { result, file -> ACCUMULATE( result, file ) }
  • Simple operator library. Many operators can be removed in favor of equivalent functions or similar operators. All in all, the operator library can be reduced to the following core operators:

    • collect: collect channel elements into a collection (i.e. bag)
    • cross: cross product of two channels
    • filter: filter a channel based on a condition
    • gather: nested gather (similar to groupTuple)
    • join: relational join of two channels (i.e. horizontal)
    • map: transform a channel
    • mix: concatenate multiple channels (i.e. vertical)
    • reduce: accumulate each channel element into a single value
    • scan: like reduce but emit each intermediate value
    • scatter: nested scatter (similar to flatMap)
    • subscribe: invoke a function for each channel element
    • view: print each channel element

Benefits

  • Well-defined workflow inputs. Workflow inputs are explicitly defined alongside the entry workflow as a set of params. Each param has a type, and complex params can be loaded transparently from any source (file, database, API, etc) as long as the runtime supports it. The JSON schema of a param can be inferred from the param's type.

  • Well-defined workflow outputs. Workflow outputs are explicitly defined as a set of "output targets". Each target has a type and can create an index file, which is essentially a serialization of a channel to external storage (file, database, API, etc), and each target defines how its published files are organized (e.g. in a directory tree). The JSON schema of an output target can be inferred from the target's type.

  • Make pipeline import-able. Separating the "core" workflow (i.e. SRA) from params and publishing makes it easy to import the pipeline into larger pipelines. See https://github.com/bentsherman/fetchngs2rnaseq for a more complete example.

  • Simpler dataflow logic. Since value channels are just values to the user, they can be used with native constructs like an if statement, rather than only callbacks. Since processes are called in the operator closure, their inputs can be different from the source channel structure, and their outputs can be different from the output channel structure. The amount of boilerplate, both in the workflow logic and process definition, is significantly reduced.

  • Simpler operator library. With a minimal set of operators, users can easily determine which operator to use based on their needs. The operators listed above are statically typed, pertain only to generic stream operations, and work with any type of value, not just tuples and lists.

  • Simpler process inputs/outputs. Process inputs and outputs are defined similarly to workflow inputs and outputs, rather than a custom set of type qualifiers. Files are automatically staged/unstaged based on their type declaration. Inputs can have default values. Thanks to the simplified dataflow logic described above, tuples are generally not needed.

Extra Notes

This proposed syntax will be enabled by the following internal improvements:

  • New script/config parser, which enables us to evolve the Nextflow language into whatever we want, without being constrained by Groovy syntax (though it still must compile to Groovy AST).

  • Static analysis, which can infer the type of every value based on the declared types of pipeline/workflow/process inputs, and infer whether to handle an async value (i.e. value channel) with a callback or an await.

  • Automatic generation of JSON schemas for workflow params and outputs based on their types. Preserves support for external tools like Seqera Platform, and lays the groundwork to transparently support different connectors (CSV/JSON file, HTTP API, SQL database, etc).

Comment on lines 114 to 117
|> map { meta ->
def sample = new Sample( meta, meta.fastq_aspera.tokenize(';').take(2).collect( name -> file(name) ) )
ASPERA_CLI ( sample, 'era-fasp', aspera_cli_args )
} // fastq: Channel<Sample>, md5: Channel<Sample>
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mahesh-panchal to your point about dynamic args, I think we can do even better in DSL3:

|> map { meta ->
  def sample = new Sample( /* ... */ )
  ASPERA_CLI ( sample, 'era-fasp', "${meta.key}" )
}

Because we call the process in a map operator explicitly (currently it is implied), we can control how the process is invoked for each task within the operator closure, instead of passing multiple queue channels.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this specifically is much better. Treating processes like functions is soooo much better since there's no implicit transformation stuff going on with all the singleton/queue channel stuff. It has to be formed and then mapped. And tuples disappear too, except in channels (?).

Actually I think what's worrying me about this syntax is the mixing of input types. An input could be a channel (e.g. MULTIQC_MAPPINGS_CONFIG ( mappings ) lower down ) or it could be an input set (e.g. this dynamically defined Sample). This is already confusing to new comers where we commonly see people trying to use channels inside map, branch, etc.
I guess one could explain the second option as passing dynamically defined singleton channels.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this proposal, there are no "value" channels, only queue channels and regular values. So the MULTIQC_MAPPINGS_CONFIG ( mappings ) is no different because mappings is just a value. It may be an async value, and Nextflow might represent it as a value channel under the hood, but to the user it should be indistinguishable from a regular value

In other words, you can not call a process with a channel, only values

@bentsherman bentsherman changed the title DSL2+ / DSL3 preview DSL2+ / DSL3 proof-of-concept May 1, 2024
@bentsherman bentsherman changed the title DSL2+ / DSL3 proof-of-concept Preview: DSL2+ (and beyond) May 1, 2024
main.nf Outdated Show resolved Hide resolved
types/types.nf Outdated Show resolved Hide resolved
workflows/sra/main.nf Outdated Show resolved Hide resolved
Copy link
Member

@mahesh-panchal mahesh-panchal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like the pipe replacement |> is easier to see, and provides some visual directionality, helping readability.

Something extra: How about also shifting:

workflow {
    workflow.onComplete {
    }
}

to

workflow {
    onStart:
    ...

    take:
    ...

    main:
    ...
    
    onComplete:
    ...
}

There are some things I really like here, but I have reservations about other stuff like how channels are obfuscated with their channel values, and process outputs

main.nf Outdated Show resolved Hide resolved
main.nf Outdated Show resolved Hide resolved
main.nf Outdated Show resolved Hide resolved
workflows/sra/main.nf Outdated Show resolved Hide resolved
Comment on lines 23 to 25
topic:
[ task.process, 'sratools', eval("fasterq-dump --version 2>&1 | grep -Eo '[0-9.]+'") ] >> 'versions'
[ task.process, 'pigz', eval("pigz --version 2>&1 | sed 's/pigz //g'") ] >> 'versions'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kind of like this, but I dislike the name topic. I don't feel like the word communicates what it's function is.

It would also be nice if we could supply a regex to validate what should be returned by the eval for some fast fail behavior when there's extra stuff being emitted. Where would one define a global variable pattern? E.g.

def SOFTWARE_VERSION = /\d+.../
def SHASUM = /\w{16}/ 

Or maybe this should be a class? like you can filter { Number }.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

topic is a term from stream processing, used to collect related events from many different sources. in this case we are sending the tool version info to a custom "versions" topic, then the workflow reads from that topic to build the versions yaml file.

eval is just a function defined in the output / topic scope, so you could wrap it in a custom validation function:

def validate( pattern, text ) {
  // ...
}

// ...
  topic:
  validate( /foo/, eval('...') ) >> 'versions'

workflows/sra/nextflow.config Show resolved Hide resolved
subworkflows/local/utils_nfcore_fetchngs_pipeline/main.nf Outdated Show resolved Hide resolved
workflows/sra/main.nf Outdated Show resolved Hide resolved
workflows/sra/main.nf Outdated Show resolved Hide resolved
@samuell

This comment was marked as off-topic.

@bentsherman

This comment was marked as off-topic.

@samuell

This comment was marked as off-topic.

@bentsherman

This comment was marked as off-topic.

@mahesh-panchal

This comment was marked as outdated.

@bentsherman

This comment was marked as outdated.

@mahesh-panchal

This comment was marked as outdated.

@bentsherman

This comment was marked as outdated.

@mahesh-panchal

This comment was marked as outdated.

@bentsherman

This comment was marked as outdated.

@bentsherman bentsherman changed the title Preview: DSL2+ (and beyond) Proposal: Beyond DSL2 May 21, 2024
Signed-off-by: Ben Sherman <[email protected]>
@bentsherman bentsherman changed the title Proposal: Beyond DSL2 Proposal: Static types Nov 2, 2024
Comment on lines +24 to +25
def args_fasterqdump = task.ext.args_fasterqdump ?: ''
def args_pigz = task.ext.args_pigz ?: ''
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rethinking the approach to ext args. People seem to really like being able to set these args from the config without the hassle of custom params and passing them through the workflow logic. If nothing else, I think we can keep supporting the ext approach, even with static types, so that it doesn't become a roadblock.

One way we could evolve the ext approach is to allow a process to declare additional params/args in a separate section:

  input:
  // ...

  params:
  args_fasterqdump: String
  args_pigz: String
  prefix: String

These are essentially process-level params, i.e. inputs that are passed in through the config rather than the pipeline code.

The config would look basically the same, perhaps with a clearer syntax:

params {
    // no glob patterns, only simple names
    withName: SRATOOLS_FASTERQDUMP {
        args_fasterqdump = '--split-files --include-technical'
    }
}

So it would be basically the same approach as the ext config, but since the process declares these settings, Nextflow can validate them in the config, provide auto-completion, etc.

My only concern is that these "process params" could be abused. At the same time, these params can basically only be strings, so that limits their usability to things like args, prefix, etc, so maybe it doesn't matter. Need to think on it more

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the potential for abuse, but the ability to use closures has been a very strong attraction here in the usage of ext.args. It's essentially a necessity for ext.prefix and highly desired to be able to select parameters based on pipeline input:s too.

Something I'm wondering is if a profiles approach might also solve this issue ( although it would have to be something that supports nesting to some degree ) so it could be set using a json schema or something but this might be too complex for the average user.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also using only simple names is not going to help, for example nf-core rnaseq has 5 patterns for SAMTOOLS_INDEX

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I fully support having this in params though because anything that could potentially change the results should be in the -params-file and not half in params and half in process such as a nextflow.config in the launch directory which could be forgotten if someone comes back later to rerun something.

Comment on lines +93 to +95
ftp_samples
.mix(sratools_samples)
.mix(aspera_samples)
Copy link
Author

@bentsherman bentsherman Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of doing a separate filter/map for each branch, you could handle all three branches in a single map operator:

ncbi_settings = params.download_method == DownloadMethod.SRATOOLS
    ? CUSTOM_SRATOOLSNCBISETTINGS( sra_metadata.collect() )
    : null

samples = sra_metadata.map { meta ->
    def method = getDownloadMethod(meta, params.download_method)
    match( method ) { // or switch, or if-else, etc
        DownloadMethod.FTP -> {
            def out = SRA_FASTQ_FTP ( meta )
            new Sample(meta.id, out.fastq_1, out.fastq_2, out.md5_1, out.md5_2)
        },
        DownloadMethod.SRATOOLS -> {
            def sra = SRATOOLS_PREFETCH ( meta, ncbi_settings, dbgap_key )
            def fastq = SRATOOLS_FASTERQDUMP ( meta, sra, ncbi_settings, dbgap_key )
            def fastq_1 = fastq[0]
            def fastq_2 = !meta.single_end ? fastq[1] : null
            new Sample(meta.id, fastq_1, fastq_2, null, null)
        },
        DownloadMethod.ASPERA -> {
            def out = ASPERA_CLI ( meta, 'era-fasp' )
            new Sample(meta.id, out.fastq_1, out.fastq_2, out.md5_1, out.md5_2)
        }
    }
}

(the match is like a switch-case or if-else chain, just an idea I had for how we could do it in Nextflow)

I think this is a more concise description of the workflow, because you basically just write "regular code" and less operator logic. Now I enjoy playing with operators as much as anyone, but beyond a certain point they become a distraction in my opinion. Others may disagree.

On the other hand, this approach isn't very amenable to subworkflows. You can't call a workflow inside the map closure, and you can't factor out any of this code into a separate function. I might be able to remove this limitation in the future. But for now I think this approach would lead to fewer subworkflows. Maybe that's fine if the refactor simplifies the overall workflow logic by a lot. You certainly wouldn't have to argue so much about naming workflows then 😉

At the end of the day, either way works just fine. You can split up the logic and use more operators, which allows you to factor out subworkflows like for the SRATOOLS route, or you can do it all in one giant map operator, or something in between.

Either way is easy to do for fetchngs because you just mix the various routes at the end. But when you need to join several related channels from different processes -- think like raw fastqc + trimmed fastqc + bam for each sample in rnaseq -- I think that's where the giant map operator is much easier. A good rule of thumb might be to use separate operators for mixing, but use a single map operator for joining.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be nice if subworkflows behaved in a similar fashion to processes. At the moment, they're just zip-ties for channels, and I think people would find working with subworkflows nicer if they behaved more like functions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants