Wait.On
Wait.on returns a PCollection with contents identical to the input PCollection, but delays downstream processing until one or more other PCollections (signals) have finished processing. This is useful for enforcing ordering or dependencies between different parts of a pipeline, especially when some outputs interact with external systems (such as writing to a database).
When you apply Wait.on, the elements of the main PCollection are not emitted for downstream processing until the computations required to produce the specified signal PCollections have completed. In streaming mode, this is enforced per window: the corresponding window of each waited-on PCollection must close before the elements of that window are passed through.
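The per-window behavior described above can be pictured with a small plain-Java model. This is only an illustrative sketch and not Beam code: the class WaitOnModel, its waitOn method, the string window names, and the use of CompletableFuture to stand for "this window has closed in the signal" are all assumptions made for the illustration. It assumes every signal has an entry for every window of the main input.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Plain-Java model of per-window gating. Hypothetical names; not the Beam API. */
public class WaitOnModel {

    /**
     * For each window of the main input, returns a future that yields that window's
     * elements unchanged once the same window has closed in every signal.
     * Assumes each signal map contains an entry for every main-input window.
     */
    public static <T> Map<String, CompletableFuture<List<T>>> waitOn(
            Map<String, List<T>> mainByWindow,
            List<Map<String, CompletableFuture<Void>>> signals) {
        Map<String, CompletableFuture<List<T>>> gated = new LinkedHashMap<>();
        for (Map.Entry<String, List<T>> window : mainByWindow.entrySet()) {
            // Collect the "window closed" future for this window from each signal.
            List<CompletableFuture<Void>> closings = new ArrayList<>();
            for (Map<String, CompletableFuture<Void>> signal : signals) {
                closings.add(signal.get(window.getKey()));
            }
            // The window is released only when all signals have closed it.
            CompletableFuture<Void> allClosed =
                    CompletableFuture.allOf(closings.toArray(new CompletableFuture<?>[0]));
            List<T> elements = window.getValue();
            gated.put(window.getKey(), allClosed.thenApply(ignored -> elements));
        }
        return gated;
    }

    public static void main(String[] args) {
        Map<String, List<String>> main = new LinkedHashMap<>();
        main.put("w1", List.of("a", "b"));
        main.put("w2", List.of("c"));

        CompletableFuture<Void> w1Closed = new CompletableFuture<>();
        CompletableFuture<Void> w2Closed = new CompletableFuture<>();
        Map<String, CompletableFuture<Void>> signal = Map.of("w1", w1Closed, "w2", w2Closed);

        Map<String, CompletableFuture<List<String>>> out = waitOn(main, List.of(signal));

        // Close only window w1 in the signal; w2 stays held back.
        w1Closed.complete(null);
        System.out.println("w1 released: " + out.get("w1").isDone());
        System.out.println("w2 released: " + out.get("w2").isDone());
    }
}
```

In this model, closing w1 in the signal releases only the elements of w1, and w2 stays blocked until its own window closes. In an actual pipeline the runner tracks window closing through watermarks, so no futures are involved.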
Examples
Example 1: Basic usage
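    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Wait;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    PCollection<String> main = p.apply("CreateMain", Create.of("item1", "item2", "item3"));

    // The signal emits no elements; only its completion matters.
    PCollection<Void> signal = p.apply("CreateSignal", Create.of("trigger"))
        .apply("ProcessSignal", ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) throws InterruptedException {
            // Simulate some processing time
            Thread.sleep(2000);
            // Signal processing complete
          }
        }));

    // Wait for 'signal' to complete before processing 'main'.
    // Elements pass through unchanged after 'signal' finishes.
    PCollection<String> processed = main.apply("WaitOnSignal", Wait.on(signal))
        .apply("ProcessAfterWait", MapElements.into(TypeDescriptors.strings())
            .via(item -> "Processed: " + item));

    processed.apply("LogResults", ParDo.of(new DoFn<String, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        System.out.println(c.element());
      }
    }));

    // Nothing executes until the pipeline is run.
    p.run().waitUntilFinish();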
Example 2: Using multiple signals
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Wait;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    // The PCollection to be processed after the signals.
    PCollection<String> main2 = p.apply("CreateMain2", Create.of("data1", "data2"));

    // Signal 1: Simulate a long-running setup task.
    PCollection<Void> signal1 = p.apply("CreateSignal1", Create.of("setup"))
        .apply("SetupDatabase", ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) throws InterruptedException {
            System.out.println("Starting database setup...");
            Thread.sleep(1000);
            System.out.println("Database setup complete.");
          }
        }));

    // Signal 2: Simulate loading a configuration file.
    PCollection<Void> signal2 = p.apply("CreateSignal2", Create.of("config"))
        .apply("LoadConfig", ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) throws InterruptedException {
            System.out.println("Loading configuration...");
            Thread.sleep(1500);
            System.out.println("Configuration loaded.");
          }
        }));

    // Wait for both signal1 and signal2 to complete before processing main2.
    PCollection<String> result2 = main2.apply("WaitOnSignals", Wait.on(signal1, signal2))
        .apply("TransformData", MapElements.into(TypeDescriptors.strings())
            .via(data -> data.toUpperCase() + "_READY"));

    // Log the final results.
    result2.apply("LogResults", ParDo.of(new DoFn<String, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        System.out.println("Final Result: " + c.element());
      }
    }));

    // Nothing executes until the pipeline is run.
    p.run().waitUntilFinish();
Related transforms
- Flatten merges multiple PCollection objects into a single logical PCollection.
- Window logically divides or groups elements into finite windows.
- WithTimestamps assigns timestamps to elements in a collection.
Last updated on 2025/07/25