"Sync" Channels in Nextflow
Nextflow is based on the dataflow paradigm. The operations on the data are defined in a directed acyclic graph (DAG): the nodes of the DAG are the operations that need to happen, and the edges are the inputs and outputs that connect the nodes. This short post won’t go into much detail about this.
Nextflow handles the parallelization and distribution of tasks for the user. Developers have two basic structures to handle the parallelization of tasks: channels and channel operators. Channels are used to communicate between processes. Channel operators, often simply called operators, are used to consume, transform, and produce channels. Operators borrow concepts from functional programming, as this paradigm is well suited to handling asynchronous tasks.
Processes are executed independently and are isolated from each other, i.e. they do not share a common (writable) state. The only way they can communicate is via asynchronous FIFO queues, called channels in Nextflow.
Figure: Nextflow channel and processes. Schematic representation of a channel and two processes, one producing data and the other ingesting it.
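Here is a minimal sketch of this FIFO behaviour. It is written as a top-level script, in the same style as the operator examples at the end of this post, and the channel names are illustrative. Within a single channel, items are read in the order they were emitted, and an operator such as map preserves that order.

```groovy
// Items enter the channel in the order they are listed
letters_ch = Channel.of( 'a', 'b', 'c' )

// map consumes the queue in FIFO order and emits a new, transformed channel
upper_ch = letters_ch.map { letter -> letter.toUpperCase() }

// Prints A, B and C, in that order
upper_ch.view()
```

Ordering only becomes a problem once items are handed to a process, as described next.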
Data enters a channel and is then sent to any processes that read from it. For each input, a process spawns a task to do the computation; you can think of tasks as “instances” of a process. The order in which tasks finish and emit their results into the output channel is non-deterministic. It can vary due to a number of factors, e.g. the size or complexity of the input file or the load on the shared infrastructure, so a task started later may finish before one started earlier.
Figure: Nextflow channels, processes and tasks.
This is a key concept of Nextflow; failing to internalize it can cause some nasty bugs in a pipeline.
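The sketch below makes this concrete. Its assumptions: SLOW_ECHO is a hypothetical process, and sleeping for a given number of seconds stands in for inputs of different size or complexity. The inputs enter the channel as 3, 1, 2, but the results are emitted in whatever order the tasks finish.

```groovy
// Hypothetical inputs: seconds to wait, standing in for inputs of varying cost
delays = [ 3, 1, 2 ]

// Hypothetical demo process: sleeps for the given time, then reports
process SLOW_ECHO {
    input:
    val seconds

    output:
    stdout

    script:
    """
    sleep ${seconds}
    echo "task with input ${seconds} finished"
    """
}

workflow {
    // Inputs enter the channel in the order 3, 1, 2; each one spawns its own task
    SLOW_ECHO( Channel.fromList( delays ) )
        // Results arrive in completion order, typically 1, 2, 3 on a local
        // machine, but no particular order is guaranteed
        .view { it.trim() }
}
```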
Processes with multiple inputs
Our example so far is simplistic, with only one process producing and a second one ingesting. In this scenario the order is not relevant. This changes when a process takes several inputs: then the order becomes really important, especially if a pipeline processes several samples at the same time.
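process EXAMPLE {
    input:
    // meta holds sample information, e.g. [id: 'sample_1']
    tuple val(meta), path(fasta)
    tuple val(meta2), path(bam)

    output:
    tuple val(meta), path("output.txt")

    script:
    // fake_tool is a stand-in for any tool that needs a matching fasta and bam
    """
    fake_tool -i $fasta -b $bam -o output.txt
    """
}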
fake_tool is a made-up tool that does some work with a fasta and a bam file. It will fail if the sequences in the fasta file do not match the ones in the bam file. The two inputs (fasta and bam) come from independent channels. The process simply pairs the first item of each channel, then the second, and so on, so nothing guarantees that a fasta and a bam paired this way belong to the same sample. This is a common scenario, and one that can cause unexpected bugs if the pipeline is not adjusted to account for it.
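Here is a minimal sketch of that failure mode. It uses val inputs (sample ids) instead of files so it runs without any data. The process SHOW_PAIR and the sample ids are hypothetical. Both channels carry the same samples, but in a different order, as can happen when upstream tasks finish in a different order.

```groovy
// Hypothetical sample ids: same samples, different emission order
fasta_samples = [ 'sample_1', 'sample_2' ]
bam_samples   = [ 'sample_2', 'sample_1' ]

// Hypothetical process with two queue-channel inputs, like EXAMPLE
process SHOW_PAIR {
    input:
    val fasta_id
    val bam_id

    output:
    stdout

    script:
    """
    echo "fasta of ${fasta_id} paired with bam of ${bam_id}"
    """
}

workflow {
    // Items are paired by arrival position, not by sample id, so here
    // every fasta ends up paired with the bam of the other sample
    SHOW_PAIR(
        Channel.fromList( fasta_samples ),
        Channel.fromList( bam_samples )
    ).view { it.trim() }
}
```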
Sync channels
One neat trick to “sync” the channels is to use the join and multiMap operators.
I learned this one from @vagkaratzas, who found it in one of the nf-core pipelines. Please check out nf-core if you are interested in Nextflow.
For a refresher about these operators, scroll to the end. The join operator matches the items of two source channels on a shared key, much like an inner join. Items whose key appears in both channels are emitted together, and unmatched items are dropped. This allows two different channels to be joined, hence “synced” (we are using the term synced loosely here). The only inconvenience is that the join operator emits a single channel, which is not suitable in our case because we need two different channels. We can “split” it into several channels using multiMap. So the pattern is to use join to “sync” the channels and then multiMap to “generate” several named channels.
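Here is a minimal, self-contained sketch of the pattern. It is an assumption-laden stand-in for a real pipeline: plain strings play the role of fasta and bam files, the sample ids are hypothetical, and the two channels deliberately emit the samples in different orders.

```groovy
// Hypothetical sample data, emitted in different orders by the two channels
fasta_ch = Channel.of(
    [ [id: 'sample_1'], 'sample_1.fasta' ],
    [ [id: 'sample_2'], 'sample_2.fasta' ]
)
bam_ch = Channel.of(
    [ [id: 'sample_2'], 'sample_2.bam' ],
    [ [id: 'sample_1'], 'sample_1.bam' ]
)

// join matches items whose first element (the meta map) is equal;
// each emitted item looks like [ meta, fasta, bam ]
joined = fasta_ch.join( bam_ch )

// multiMap splits every joined item into two named channels, so item n of
// synced.fasta and item n of synced.bam belong to the same sample
joined
    .multiMap { meta, fasta, bam ->
        fasta: [ meta, fasta ]
        bam:   [ meta, bam ]
    }
    .set { synced }

synced.fasta.view { "fasta: ${it[0].id} -> ${it[1]}" }
synced.bam.view { "bam:   ${it[0].id} -> ${it[1]}" }
```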
Example
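workflow {
    // One [ meta, file ] tuple per sample; file names are placeholders
    fastas_ch = Channel.of(
        [ [id: 'example_one'], file('example_one.fasta') ],
        [ [id: 'example_two'], file('example_two.fasta') ]
    )
    bam_ch = Channel.of(
        [ [id: 'example_one'], file('example_one.bam') ],
        [ [id: 'example_two'], file('example_two.bam') ]
    )

    // PROCESS_A and PROCESS_B stand for any upstream processes; they are
    // assumed to emit named outputs called fasta and bams
    PROCESS_A( fastas_ch )
    PROCESS_B( bam_ch )

    // Match fastas and bams on meta, then split them back into two channels
    PROCESS_A.out.fasta
        .join( PROCESS_B.out.bams )
        .multiMap { meta, fasta, bam ->
            fasta: [meta, fasta]
            bam: [meta, bam]
        }
        .set { ch_example }

    EXAMPLE(
        ch_example.fasta,
        ch_example.bam
    )
}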
The key step is as follows:
PROCESS_A.out.fasta
    .join( PROCESS_B.out.bams )
    .multiMap { meta, fasta, bam ->
        fasta: [meta, fasta]
        bam: [meta, bam]
    }
    .set { ch_example }
First, we join the outputs of PROCESS_A and PROCESS_B on the meta value. Once that is done, each emitted item holds the meta map together with the fasta and the bam that belong to that same key, so they can no longer get mixed up. Finally, we use multiMap to split these items into the two channels that feed the EXAMPLE process.
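The join above only matches items whose meta maps are exactly equal. If an upstream process adds a field to the meta map of one channel, the two maps no longer match, and join drops the samples. The sorted field below is a hypothetical example of such a field. A common workaround is to join on a simple key such as meta.id and keep only one of the meta maps. The sketch uses hypothetical sample data.

```groovy
// Hypothetical data: the bam-side meta maps carry an extra field
fasta_ch = Channel.of(
    [ [id: 'sample_1'], 'sample_1.fasta' ],
    [ [id: 'sample_2'], 'sample_2.fasta' ]
)
bam_ch = Channel.of(
    [ [id: 'sample_2', sorted: true], 'sample_2.bam' ],
    [ [id: 'sample_1', sorted: true], 'sample_1.bam' ]
)

fasta_ch
    // put the sample id in front of each item to use it as the join key
    .map { meta, fasta -> [ meta.id, meta, fasta ] }
    // keep only the key and the file on the bam side
    .join( bam_ch.map { meta, bam -> [ meta.id, bam ] } )
    // joined items look like [ id, meta, fasta, bam ]
    .multiMap { id, meta, fasta, bam ->
        fasta: [ meta, fasta ]
        bam:   [ meta, bam ]
    }
    .set { synced }

synced.fasta.view()
synced.bam.view()
```

Which meta map to keep is a design choice. This sketch keeps the fasta-side one; keep the richer map if downstream processes need its extra fields.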
Conclusion
Programming in Nextflow requires the developer to understand how data flows through the pipeline, and how queues and operators work with channels. It is a steep learning curve if this is the first time you are exposed to this paradigm. Remember: everything is asynchronous, so never assume things will be ordered.
join
left = Channel.of(
    ['X', 1],
    ['Y', 2],
    ['Z', 3],
    ['P', 7]
)
right = Channel.of(
    ['Z', 6],
    ['Y', 5],
    ['X', 4]
)
left.join(right).view()
Output
[Z, 3, 6]
[Y, 2, 5]
[X, 1, 4]
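Note that P is missing from the output above. Its key has no match in right, so join drops it. In the sync pattern, this means a sample missing from one channel quietly disappears. A minimal sketch of how to make such items visible, using join's remainder option:

```groovy
// Same channels as in the join example above
left = Channel.of( ['X', 1], ['Y', 2], ['Z', 3], ['P', 7] )
right = Channel.of( ['Z', 6], ['Y', 5], ['X', 4] )

// With remainder: true, join also emits items whose key has no match in
// the other channel, filling the missing value with null
left.join( right, remainder: true ).view()
```

The Nextflow join documentation also describes a failOnMismatch option. It reports an error instead of dropping unmatched items, which may be preferable when every sample is expected in both channels. Check that your Nextflow version supports it.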
multiMap
Channel.of( 1, 2, 3, 4 )
    .multiMap { v ->
        foo: v + 1
        bar: v * v
    }
    .set { result }
result.foo.view { v -> "foo $v" }
result.bar.view { v -> "bar $v" }
Output
foo 2
foo 3
foo 4
foo 5
bar 1
bar 4
bar 9
bar 16
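When the same split is needed in several places, Nextflow also provides multiMapCriteria. It defines the criteria once so they can be passed to multiMap. Here is a minimal sketch with a single hypothetical sample tuple, shaped like the output of join in the sync pattern:

```groovy
// Define the split once; it can be reused by several multiMap calls
def split_fasta_bam = multiMapCriteria { meta, fasta, bam ->
    fasta: [ meta, fasta ]
    bam:   [ meta, bam ]
}

// Hypothetical joined item: [ meta, fasta, bam ]
Channel.of( [ [id: 'sample_1'], 'sample_1.fasta', 'sample_1.bam' ] )
    .multiMap( split_fasta_bam )
    .set { synced }

synced.fasta.view()
synced.bam.view()
```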