Since their introduction in Java 8, Streams have fundamentally changed how developers handle collections of data. They brought a declarative style to data processing, allowing programmers to express what they wanted to achieve rather than how to iterate through the elements. For years, the java.util.stream package has remained largely static, relying on a fixed set of intermediate operations like map, filter, and flatMap.
While these operations cover the vast majority of use cases, they exhibit a significant limitation regarding stateful operations. Developers often find themselves in a bind when they need to perform complex transformations that require maintaining state between elements, such as sliding windows, dedicated batching logic, or custom deduplication, without terminating the stream. The Stream Gatherers API, introduced as a preview feature in Java 22 (JEP 461) and standardized in Java 24 (JEP 485), addresses this gap. It provides a new intermediate operation, gather, which allows custom, stateful intermediate operations to be defined and used within stream pipelines.
This article explores the technical depth of the Gatherer API, demonstrating how it expands the capabilities of the Java Stream ecosystem and enables a new level of expressiveness in data processing pipelines.
To understand the necessity of the Gatherer API, one must first recognize the constraints of the existing Stream toolkit. Standard intermediate operations in Java are predominantly stateless. Operations like map or filter look at one element at a time without any knowledge of previous or subsequent elements.
When developers need stateful logic, they typically have two options. The first is to use Stream.collect() at an intermediate stage, effectively terminating the stream, processing the data into a collection, and then streaming it again. This breaks the laziness of the stream and forces the entire dataset into memory, which is inefficient for large or infinite streams. The second approach involves using side-effects within stateless operations, such as manipulating an external AtomicInteger or a custom state object inside a map operation. This approach is discouraged because it leads to thread-safety issues and breaks the functional programming paradigm that Streams are built upon.
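To make the second workaround concrete, the following sketch (class and method names are illustrative) numbers elements by mutating an external AtomicInteger inside map. It happens to work for a sequential stream, but it is exactly the side-effecting pattern the Stream documentation discourages:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SideEffectAntiPattern {
    // Discouraged workaround: an external AtomicInteger mutated inside map().
    // It works sequentially, but a parallel stream gives no ordering guarantee
    // for the side effect, so indices could be assigned out of encounter order.
    static List<String> numberElements(List<String> input) {
        AtomicInteger index = new AtomicInteger();
        return input.stream()
                .map(s -> index.getAndIncrement() + ":" + s)
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(numberElements(List.of("a", "b", "c"))); // [0:a, 1:b, 2:c]
    }
}
```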
Consider a scenario where a developer needs to process a stream of sensor readings but only wants to emit a value if the reading has changed significantly from the previous one. This requires a stateful filter. Using standard filter logic is impossible because the predicate only receives the current element. Developers often resort to hacks involving reduce or external mutable state. The Gatherer API solves this by providing a structured, safe, and native way to inject state into the middle of a stream pipeline.
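As a preview of the approach, the sensor-reading scenario can be sketched with the Gatherer API described in the rest of this article. The 1.0-unit threshold, the double readings, and the method name are illustrative assumptions:

```java
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class ChangeFilter {
    // A stateful filter: emit a reading only when it differs from the last
    // emitted reading by at least `threshold` (hypothetical value below).
    static Gatherer<Double, double[], Double> significantChanges(double threshold) {
        return Gatherer.ofSequential(
                () -> new double[] { Double.NaN },  // state: last emitted reading
                (state, reading, downstream) -> {
                    if (Double.isNaN(state[0]) || Math.abs(reading - state[0]) >= threshold) {
                        state[0] = reading;
                        return downstream.push(reading);
                    }
                    return true;                    // skip, keep consuming
                });
    }

    public static void main(String[] args) {
        Stream.of(20.0, 20.2, 21.5, 21.6, 25.0)
                .gather(significantChanges(1.0))
                .forEach(System.out::println);      // 20.0, 21.5, 25.0
    }
}
```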
Architecture of the Gatherer API
The Gatherer API introduces a new interface, java.util.stream.Gatherer. It represents an intermediate operation that transforms a sequence of elements from type T to type R. Unlike a Collector, which reduces a stream to a single result or a container, a Gatherer produces a new stream. This distinction is crucial. Gatherer is effectively a stream-to-stream transformation machine.
The interface is defined with four core components, mirroring the design of the Collector interface but adapting it for the push-based nature of streams.
The first component is the Initializer. This is a supplier that creates a new instance of the private state object to be used during the evaluation of the stream. This state is isolated per parallel evaluation, ensuring thread safety without requiring the developer to manage synchronization manually.
The second component is the Integrator. This is the heart of the Gatherer. It is a function that integrates a new element from the input stream into the state and potentially pushes elements to the downstream receiver (the next stage in the pipeline). It takes three arguments: the current state, the incoming element, and a Downstream object. The Downstream object acts as a sink: whatever is pushed to it becomes the input for the next stage of the stream. The Integrator returns a boolean indicating whether the stream should continue processing or terminate early (short-circuit).
The third component is the Finisher. This is a function that runs after all elements of the input stream have been processed by the integrator. It allows the Gatherer to flush any remaining data held in the state. For instance, in a batching Gatherer, the last batch might not be full when the stream ends. The Finisher is where that partial batch is pushed downstream.
The fourth component is the Combiner. This is relevant only for parallel streams. It defines how the states of two parallel evaluations are merged. This allows Gatherers to be parallelized safely and efficiently.
Understanding the Gatherer Lifecycle
When a stream pipeline containing a gather operation is executed, the lifecycle is strictly defined. The Java runtime invokes the initializer to create a state instance. For a parallel stream, this happens for each split segment of the data.
The integrator is then invoked for every element in the stream segment. The integrator logic can inspect the element, update the private state, and decide what to push downstream. The Downstream interface provided to the integrator offers methods like push(R element) and isRejecting(). The integrator can choose to push zero, one, or multiple elements for a single input element. It can also check isRejecting() to see if the downstream stage is no longer accepting data (for example, if a limit() operation further down the line has been satisfied).
Once the input elements are exhausted, the finisher is called. The finisher receives the state and the Downstream sink. It can perform cleanup or push final accumulated data.
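A minimal sketch can make this lifecycle concrete. The gatherer below (names are illustrative) creates a counter in the initializer, tags each element with its position in the integrator, and pushes a trailing summary element from the finisher:

```java
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class LifecycleDemo {
    // Illustrates the three lifecycle phases: the initializer creates a counter,
    // the integrator pushes a transformed element per input, and the finisher
    // pushes one trailing summary element once the input is exhausted.
    static Gatherer<String, int[], String> enumerateWithSummary() {
        return Gatherer.ofSequential(
                () -> new int[] { 0 },                          // initializer
                (state, element, downstream) -> {               // integrator
                    state[0]++;
                    return downstream.push(state[0] + ":" + element);
                },
                (state, downstream) ->                          // finisher
                        downstream.push("total=" + state[0]));
    }

    public static void main(String[] args) {
        Stream.of("a", "b")
                .gather(enumerateWithSummary())
                .forEach(System.out::println); // 1:a, 2:b, total=2
    }
}
```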
Built-in Gatherers in the JDK
The JDK includes a set of built-in Gatherers accessible via static methods on the Gatherers class. These implementations cover common patterns that were previously difficult to express.
One of the most useful built-in Gatherers is windowFixed. This Gatherer groups elements into lists of a specific size. Before Gatherers, achieving a sliding window or fixed window required complex logic or third-party libraries. With Gatherers.windowFixed(int), the process is declarative and efficient.
```java
Stream.of(1, 2, 3, 4, 5, 6, 7)
    .gather(Gatherers.windowFixed(3))
    .forEach(System.out::println);
// Output:
// [1, 2, 3]
// [4, 5, 6]
// [7]
```
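A close companion is Gatherers.windowSliding, which emits overlapping windows that advance one element at a time:

```java
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class SlidingWindows {
    public static void main(String[] args) {
        // windowSliding emits overlapping windows of the given size,
        // advancing one element at a time.
        Stream.of(1, 2, 3, 4, 5)
                .gather(Gatherers.windowSliding(3))
                .forEach(System.out::println);
        // [1, 2, 3]
        // [2, 3, 4]
        // [3, 4, 5]
    }
}
```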
Another powerful built-in is fold. While Stream.reduce is a terminal operation, Gatherers.fold performs an ordered, incremental accumulation and emits the single accumulated value downstream once the input is exhausted. Because fold is an intermediate operation, the folded result can flow into further stream stages, such as a subsequent map or filter.
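A brief sketch of fold in use, with the single folded result flowing into a further map stage:

```java
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class FoldExample {
    public static void main(String[] args) {
        // fold accumulates all input into one value and emits it as a single
        // downstream element, so further intermediate operations can follow.
        String result = Stream.of(1, 2, 3, 4)
                .gather(Gatherers.fold(() -> 0, Integer::sum))
                .map(sum -> "sum=" + sum)   // still an ordinary stream stage
                .findFirst()
                .orElseThrow();
        System.out.println(result); // sum=10
    }
}
```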
The scan Gatherer is another essential addition. It performs a prefix scan (a cumulative operation such as a running sum or running maximum). Unlike reduce, which produces one final result, scan emits an intermediate result for every input element.
```java
Stream.of(1, 2, 3, 4, 5)
    .gather(Gatherers.scan(() -> 0, Integer::sum))
    .forEach(System.out::println);
// Output:
// 1
// 3
// 6
// 10
// 15
```
Implementing a Custom Gatherer
While built-ins are helpful, the true power of the Gatherer API lies in creating custom implementations. Let us consider a real-world requirement: stateful deduplication. We want to filter a stream of log events so that only the first event of each specific severity level is passed through, ignoring subsequent duplicates of the same level until a different level appears. This is distinct from distinct(), which would eliminate all duplicates globally; here, we want local deduplication based on state.
To implement this, we define a Gatherer. We start by defining the state object. In this case, we need to store the last seen severity.
```java
public class LogEvent {
    private final String message;
    private final int severity;

    public LogEvent(String message, int severity) {
        this.message = message;
        this.severity = severity;
    }

    public int getSeverity() { return severity; }
    public String getMessage() { return message; }
}
```
Now we implement the Gatherer distinctBySeverity. Because the operation is stateful and inherently ordered, we use the ofSequential factory method on the Gatherer interface.
```java
Gatherer<LogEvent, int[], LogEvent> distinctBySeverity = Gatherer.ofSequential(
    // Initializer: the state is the last seen severity (-1 = none yet)
    () -> new int[] { -1 },
    // Integrator: push only when the severity changes
    (state, element, downstream) -> {
        if (state[0] != element.getSeverity()) {
            state[0] = element.getSeverity();
            downstream.push(element);
        }
        return true; // continue processing
    }
    // No finisher needed; no combiner for a sequential stream
);
```
This custom Gatherer maintains an int[] as state. Since arrays are mutable, we can update the state directly. The integrator checks if the current element’s severity differs from the stored state. If it differs, it updates the state and pushes the element downstream. If it matches, it ignores the element. This logic runs lazily, handling infinite streams perfectly without buffering the entire dataset.
Short-Circuiting and Infinite Streams
One of the most sophisticated aspects of the Gatherer API is its support for short-circuiting. In standard Streams, operations like limit() or findFirst() can stop the pipeline early. Custom Gatherers can implement this same behavior.
The Integrator returns a boolean. If the integrator returns false, the stream source is signaled to stop producing elements. This allows for the creation of Gatherers that can process infinite streams and stop based on a condition internal to the Gatherer.
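A deterministic sketch of short-circuiting (class and method names are illustrative): the gatherer below consumes an infinite source and returns false from its integrator once a running sum reaches a limit:

```java
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class TakeWhileSumBelow {
    // Emits elements while the running sum stays below the limit; returning
    // false from the integrator stops the (possibly infinite) source.
    static Gatherer<Integer, int[], Integer> takeWhileSumBelow(int limit) {
        return Gatherer.ofSequential(
                () -> new int[] { 0 },           // state: running sum
                (state, element, downstream) -> {
                    state[0] += element;
                    if (state[0] >= limit) {
                        return false;            // short-circuit: stop upstream
                    }
                    return downstream.push(element);
                });
    }

    public static void main(String[] args) {
        Stream.iterate(1, n -> n + 1)            // infinite: 1, 2, 3, ...
                .gather(takeWhileSumBelow(15))
                .forEach(System.out::println);   // 1, 2, 3, 4
    }
}
```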
Imagine a Take Until Duration Gatherer. This Gatherer should process elements until a specific amount of time has elapsed since the start of processing.
```java
static <T> Gatherer<T, long[], T> takeUntilDuration(Duration duration) {
    return Gatherer.ofSequential(
        // State: start time, captured when evaluation begins
        () -> new long[] { System.nanoTime() },
        // Integrator
        (state, element, downstream) -> {
            long elapsed = System.nanoTime() - state[0];
            if (elapsed < duration.toNanos()) {
                downstream.push(element);
                return true;  // continue
            }
            return false;     // stop the stream
        }
    );
}
```

This example illustrates the power of the API. We have created an intermediate operation that terminates a stream based on time, logic that previously required a custom Spliterator or an external side-effect. The return value `false` in the integrator efficiently stops the data pull from the source, preserving resources.

Parallelism and the Combiner

For sequential streams, the combiner component of a Gatherer is unnecessary. However, to support parallel streams correctly, a Gatherer must define how to merge states. When a stream is processed in parallel, the input is split into segments. Each segment runs its own Gatherer instance with its own isolated state. Once the segments are processed, the states must be combined, and the accumulated results must be pushed downstream. This logic resides in the Combiner.

Let us look at a parallel-friendly "Summing Gatherer" that pushes the sum downstream only at the end. Note that this is distinct from `reduce` because it is an intermediate operation; you could map the sum to another object or filter it afterward.

```java
Gatherer<Integer, int[], Integer> sumGatherer = Gatherer.of(
    // State: mutable integer
    () -> new int[] { 0 },
    // Integrator: accumulate
    (state, element, downstream) -> {
        state[0] += element;
        return true;
    },
    // Combiner: merge two states (for parallel execution)
    (state1, state2) -> {
        state1[0] += state2[0];
        return state1;
    },
    // Finisher: push the final sum
    (state, downstream) -> downstream.push(state[0])
);
```
This Gatherer works correctly in a parallel stream. The runtime splits the stream, accumulates sums in different threads, combines the partial sums using the Combiner, and finally pushes the total result via the Finisher. The ability to define this logic explicitly ensures that Gatherers are not just syntactic sugar but robust tools for high-performance data processing.
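As a self-contained sketch (class and method names are illustrative), the same combiner-equipped summing pattern can be exercised on a parallel stream:

```java
import java.util.stream.Gatherer;
import java.util.stream.IntStream;

public class ParallelSum {
    // Each parallel segment accumulates into its own state, the combiner
    // merges segment states, and the finisher pushes the single final sum.
    static Gatherer<Integer, int[], Integer> summing() {
        return Gatherer.of(
                () -> new int[] { 0 },
                (state, element, downstream) -> { state[0] += element; return true; },
                (left, right) -> { left[0] += right[0]; return left; },
                (state, downstream) -> downstream.push(state[0]));
    }

    public static void main(String[] args) {
        int total = IntStream.rangeClosed(1, 100)
                .boxed()
                .parallel()
                .gather(summing())
                .findFirst()
                .orElseThrow();
        System.out.println(total); // 5050
    }
}
```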
Comparison with Existing Approaches
To fully appreciate the Gatherer API, it is helpful to compare it with existing strategies for similar problems.
Comparison of Stream Processing Strategies
| Feature | Stream.collect() | Side-Effects (map/filter) | Gatherer API |
|---|---|---|---|
| Laziness | No (Eager) | Yes | Yes |
| State Management | Terminal State | External/Mutable | Internal/Encapsulated |
| Thread Safety | Safe (Concurrent Collectors) | Risky (Manual sync needed) | Safe (Isolated state per segment) |
| Short-Circuiting | No | Possible but complex | Native Support |
| Stream Output | Single Result (Collection) | Stream | Stream |
The table highlights that collect is eager and terminal, making it unsuitable for intermediate transformations. Side-effects within map or filter break the functional purity and thread-safety guarantees of streams. The Gatherer API sits in the sweet spot, offering the laziness of intermediate operations with the stateful capability of collectors, all while maintaining thread safety and encapsulation.
Real-World Use Case: Intelligent Batching
A common requirement in enterprise systems is batching data for external API calls or database inserts. Often, simple fixed-size batching (as provided by Gatherers.windowFixed) is insufficient. We might need time-window batching or condition-based batching.
For instance, consider a stream of network packets. We want to batch packets until we reach a specific byte size limit or a specific count, whichever comes first. This is a stateful decision process.
We can implement a smartBatch Gatherer.
```java
public static Gatherer<Packet, ?, List<Packet>> smartBatch(int maxSize, int maxCount) {
    // State: the current batch and its accumulated byte size
    class BatchState {
        final List<Packet> batch = new ArrayList<>();
        int currentSize = 0;
    }
    return Gatherer.ofSequential(
        BatchState::new,
        // Integrator: add the element, flush when either threshold is reached
        (state, element, downstream) -> {
            state.batch.add(element);
            state.currentSize += element.getSize();
            if (state.currentSize >= maxSize || state.batch.size() >= maxCount) {
                downstream.push(new ArrayList<>(state.batch));
                state.batch.clear();
                state.currentSize = 0;
            }
            return true;
        },
        // Finisher: flush the remaining partial batch
        (state, downstream) -> {
            if (!state.batch.isEmpty()) {
                downstream.push(new ArrayList<>(state.batch));
            }
        }
    );
}
```
This Gatherer logic is encapsulated and reusable. It pushes List batches downstream whenever either the byte-size or the count threshold is reached, and the finisher flushes the final partial batch when the stream ends.
Conclusion
The Stream Gatherers API represents a significant maturation of the Java Stream framework. It acknowledges that real-world data processing is rarely a simple stateless mapping or filtering exercise. By providing a structured mechanism for stateful intermediate operations, the API fills a long-standing capability gap.
Developers can now express complex logic, such as sliding windows, custom batching, and stateful deduplication, in a way that is lazy, parallelizable, and encapsulated. The separation of concerns enforced by the Gatherer interface (Initializer, Integrator, Finisher, Combiner) ensures that code remains readable and maintainable, even when the logic is intricate.
As Java continues to evolve, features like the Gatherer API demonstrate a commitment to enabling developers to write high-performance, declarative code without sacrificing the flexibility required for sophisticated data manipulation. Whether utilizing the built-in Gatherers utility class or implementing custom interfaces for domain-specific logic, the gather method is poised to become a standard tool in the professional Java developer’s arsenal, transforming how we think about intermediate stream operations.

