"Sync" Channels in Nextflow

Posted on Apr 8, 2025

Nextflow is based on the dataflow paradigm. The operations on the data are defined in a directed acyclic graph (DAG): the nodes of the DAG are the operations that need to happen, and the edges are the inputs and outputs that connect the nodes. This short post won’t go into much detail about this.

Nextflow handles the parallelization and distribution of tasks for the user. Developers have two basic structures to handle the parallelization of tasks: channels and channel operators. Channels are used to communicate between processes, and channel operators, often simply called operators, are used to consume, transform, and produce channels themselves. Operators borrow concepts from functional programming, as this paradigm is well suited to handling asynchronous tasks.
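As a minimal sketch of these two building blocks (the sample names are hypothetical), the snippet below creates a queue channel and derives two new channels from it with the map and filter operators. Like the documentation snippets at the end of this post, it is written as top-level script code rather than inside a workflow block.

```nextflow
// A queue channel emitting three hypothetical sample names
samples = Channel.of( 'sample_a', 'sample_b', 'sample_c' )

// Operators consume a channel and produce a new one;
// `samples` itself is left unchanged
upper  = samples.map { name -> name.toUpperCase() }
picked = samples.filter { name -> name != 'sample_b' }

upper.view()   // emits SAMPLE_A, SAMPLE_B, SAMPLE_C
picked.view()  // emits sample_a, sample_c
```

Note that `samples` is read by two operators: in DSL2 a channel can feed several consumers, and each of them receives every item.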

From the Nextflow docs:

Processes are executed independently and are isolated from each other, i.e. they do not share a common (writable) state. The only way they can communicate is via asynchronous FIFO queues, called channels in Nextflow.

Figure: Nextflow channel and processes. A schematic of a channel with one process pushing data into it and another process reading from it.

Data enters a channel and is then sent to any processes that read from it. For each input, a process spawns a task to do the computation; you can think of tasks as “instances” of a process. The order in which the tasks run and send their results to the output channel is non-deterministic. It can vary with a number of factors, e.g. the size or complexity of the input file or the load on the shared infrastructure, so a task that received its input later may still finish first.

Figure: Nextflow channels, processes and tasks.

This is a key concept in Nextflow; failing to internalize it can cause some nasty bugs in a pipeline.

Processes with multiple inputs

Our example so far is simplistic, with only one process producing and a second one ingesting. In this scenario the order is not relevant. This changes when a process takes several inputs: then the order becomes really important, especially if a pipeline processes several samples at the same time.

process EXAMPLE {

  input:
  tuple val(meta), path(fasta)
  tuple val(meta2), path(bam)

  output:
  tuple val(meta), path("output.txt")

  script:
  """
  fake_tool -i $fasta -b $bam -o output.txt
  """
}

Here fake_tool is a made-up tool that does some work with a fasta and a bam file, and it will fail if the sequences in the fasta file do not match the ones in the bam file. Each input (the fasta and the bam) comes from an independent channel, and Nextflow builds each task from the next available item of every input channel, so nothing guarantees that the fasta and the bam handed to a task belong to the same sample. This is a common scenario, and one that can cause unexpected bugs if the pipeline is not adjusted to account for it.

Sync channels

One neat trick to “sync” the channels is to use the join and multiMap operators.

I learned this one from @vagkaratzas, who found it in one of the nf-core pipelines. Please check out nf-core if you are interested in Nextflow.

For a refresher on these operators, scroll to the end. The join operator emits the inner join of two source channels using a matching key (by default, the first element of each item; here, the meta map). This allows two different channels to be joined, hence “synced” (we are using the term synced loosely here). The only inconvenience is that join emits a single channel, which does not suit our case, as we need two different channels. We can “split” it into several channels using multiMap, so the pattern is to use join to “sync” the channels and then multiMap to “generate” several named channels.

Example


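workflow {
  // Each item is [ meta, file ]; join will use the meta map as the key
  fastas_ch = Channel.of(
    [ [id: 'example_one'], file('example_one.fasta') ],
    [ [id: 'example_two'], file('example_two.fasta') ]
  )

  bam_ch = Channel.of(
    [ [id: 'example_one'], file('example_one.bam') ],
    [ [id: 'example_two'], file('example_two.bam') ]
  )

  // PROCESS_A and PROCESS_B are not shown here; they are assumed to emit
  // [ meta, file ] tuples on outputs named `fasta` and `bams`
  PROCESS_A(
    fastas_ch
  )

  PROCESS_B(
    bam_ch
  )

  // Pair the outputs by meta, then split them into two named channels
  PROCESS_A.out.fasta
    .join( PROCESS_B.out.bams )
    .multiMap { meta, fasta, bam ->
      fasta: [meta, fasta]
      bam: [meta, bam]
    }
    .set { ch_example }

  EXAMPLE(
    ch_example.fasta,
    ch_example.bam
  )
}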

The key step is as follows:

PROCESS_A.out.fasta
  .join( PROCESS_B.out.bams )
  .multiMap { meta, fasta, bam ->
    fasta: [meta, fasta]
    bam: [meta, bam]
  }
  .set { ch_example }

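First, join pairs the outputs of PROCESS_A and PROCESS_B using the meta map as the key, so each item it emits is [meta, fasta, bam], with the fasta and the bam of the same sample. Keep in mind that join drops any key that is present in only one of the two channels. Then, because EXAMPLE needs two separate input channels, multiMap splits every joined item into the named channels fasta and bam. Both channels receive their items in the same order, so when EXAMPLE takes one item from each, the two belong to the same sample.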
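To try the pattern without PROCESS_A and PROCESS_B, a minimal sketch can replace their outputs with Channel.of stand-ins. The sample IDs and file names are hypothetical plain strings (not real files), and the bam channel deliberately emits the samples in the opposite order to mimic tasks finishing out of order. Like the documentation snippets at the end of this post, it is top-level script code.

```nextflow
// Hypothetical stand-ins for PROCESS_A.out.fasta and PROCESS_B.out.bams:
// plain strings instead of files, and the bam channel emits the samples
// in the opposite order, as if its tasks had finished out of order
fasta_out = Channel.of(
    [ [id: 'example_one'], 'example_one.fasta' ],
    [ [id: 'example_two'], 'example_two.fasta' ]
)
bam_out = Channel.of(
    [ [id: 'example_two'], 'example_two.bam' ],
    [ [id: 'example_one'], 'example_one.bam' ]
)

fasta_out
    .join( bam_out )                  // [ meta, fasta, bam ], matched on meta
    .multiMap { meta, fasta, bam ->
        fasta: [ meta, fasta ]        // both outputs get one item per joined
        bam:   [ meta, bam ]          // tuple, in the same order
    }
    .set { ch_example }

// Items at the same position in both channels belong to the same sample
ch_example.fasta.view { "fasta: ${it[0].id} -> ${it[1]}" }
ch_example.bam.view { "bam: ${it[0].id} -> ${it[1]}" }
```

The order in which the two samples come out of join may vary between runs; what the pattern guarantees is that each fasta is paired with the bam of the same sample.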

Conclusion

Programming in Nextflow requires the developer to understand how data flows through the pipeline, and how queues and operators work with channels. The learning curve is steep if this is the first time you are exposed to this paradigm. Remember: everything is asynchronous, so never assume things will be ordered.

join

Extracted from the official documentation

left  = Channel.of(
  ['X', 1],
  ['Y', 2],
  ['Z', 3],
  ['P', 7]
)

right = Channel.of(
  ['Z', 6],
  ['Y', 5],
  ['X', 4]
)

left.join(right).view()

Output

[Z, 3, 6]
[Y, 2, 5]
[X, 1, 4]
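The output above has no item for P: join only emits keys present in both channels and silently drops the rest. In the fasta/bam example, that would be a sample quietly disappearing. A minimal sketch using the join `remainder` option makes the unmatched key visible instead (recent Nextflow versions also document a `failOnMismatch` option, which stops the run rather than emitting the unmatched item). The emission order may differ from run to run.

```nextflow
left  = Channel.of( ['X', 1], ['Y', 2], ['Z', 3], ['P', 7] )
right = Channel.of( ['Z', 6], ['Y', 5], ['X', 4] )

// remainder: true also emits keys found in only one channel,
// using null for the missing value (P here has no match in `right`)
joined = left.join( right, remainder: true )
joined.view()
```

Besides the three matched items, the output should include `[P, 7, null]`, which a downstream filter or check can then act on.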

multiMap

Extracted from the official documentation

Channel.of( 1, 2, 3, 4 )
    .multiMap { v ->
        foo: v + 1
        bar: v * v
    }
    .set { result }

result.foo.view { v -> "foo $v" }
result.bar.view { v -> "bar $v" }

Output

foo 2
foo 3
foo 4
foo 5
bar 1
bar 4
bar 9
bar 16