Advanced Java Streams: Beyond the Basics
Streams transform collections processing in Java. Beyond map()
and filter()
, advanced operations unlock performance gains, cleaner code, and complex transformations. We’ll explore parallel streams, custom collectors, stateful operations, and production patterns.
Parallel Streams
Parallel streams split work across multiple threads automatically.
Basic Parallelism
List<Integer> numbers = IntStream.rangeClosed(1, 1000000)
.boxed()
.collect(Collectors.toList());
// Sequential
long sum = numbers.stream()
.mapToLong(Integer::longValue)
.sum();
// Parallel
long parallelSum = numbers.parallelStream()
.mapToLong(Integer::longValue)
.sum();
How it works: Parallel streams use ForkJoinPool to split data into chunks, process in parallel, then combine results. For aggregations like sum()
, this combines partial sums from each thread.
When to Use Parallel Streams
Good candidates:
- Large datasets (100,000+ elements)
- CPU-intensive operations per element
- Independent operations (no shared state)
Bad candidates:
- Small datasets (overhead exceeds benefit)
- I/O bound operations (database, network)
- Operations with side effects
// Good: CPU-intensive computation
List<BigInteger> primes = LongStream.rangeClosed(2, 10000000)
.parallel()
.filter(n -> isPrime(n))
.mapToObj(BigInteger::valueOf)
.collect(Collectors.toList());
// Bad: I/O bound
List<User> users = userIds.parallelStream()
.map(id -> userRepository.findById(id)) // Database call - sequential better
.collect(Collectors.toList());
Controlling Parallelism
Default parallelism = number of CPU cores. Override with custom pool:
ForkJoinPool customPool = new ForkJoinPool(4);
List<String> result = customPool.submit(() ->
data.parallelStream()
.map(String::toUpperCase)
.collect(Collectors.toList())
).get();
Production pattern:
public class ParallelStreamProcessor {
private final ForkJoinPool pool;
public ParallelStreamProcessor(int parallelism) {
this.pool = new ForkJoinPool(parallelism);
}
public <T, R> List<R> process(List<T> items, Function<T, R> mapper) {
try {
return pool.submit(() ->
items.parallelStream()
.map(mapper)
.collect(Collectors.toList())
).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Parallel processing failed", e);
}
}
public void shutdown() {
pool.shutdown();
}
}
Performance Comparison
@Benchmark
public long sequentialSum() {
return LongStream.rangeClosed(1, 10_000_000)
.sum();
}
@Benchmark
public long parallelSum() {
return LongStream.rangeClosed(1, 10_000_000)
.parallel()
.sum();
}
Results (4-core CPU):
Benchmark Mode Cnt Score Error Units
sequentialSum avgt 10 6.234 ± 0.124 ms/op
parallelSum avgt 10 2.156 ± 0.089 ms/op
Parallel is 3x faster for large computations. Small datasets show opposite: sequential wins due to thread overhead.
Custom Collectors
Collectors define how stream elements accumulate into results.
Collector Anatomy
public interface Collector<T, A, R> {
Supplier<A> supplier(); // Create accumulator
BiConsumer<A, T> accumulator(); // Add element to accumulator
BinaryOperator<A> combiner(); // Combine accumulators (parallel)
Function<A, R> finisher(); // Transform accumulator to result
Set<Characteristics> characteristics();
}
Custom Collector: Immutable List
public class ImmutableListCollector<T>
implements Collector<T, ImmutableList.Builder<T>, ImmutableList<T>> {
@Override
public Supplier<ImmutableList.Builder<T>> supplier() {
return ImmutableList::builder;
}
@Override
public BiConsumer<ImmutableList.Builder<T>, T> accumulator() {
return ImmutableList.Builder::add;
}
@Override
public BinaryOperator<ImmutableList.Builder<T>> combiner() {
return (left, right) -> {
left.addAll(right.build());
return left;
};
}
@Override
public Function<ImmutableList.Builder<T>, ImmutableList<T>> finisher() {
return ImmutableList.Builder::build;
}
@Override
public Set<Characteristics> characteristics() {
return Collections.emptySet();
}
}
Usage:
ImmutableList<String> names = users.stream()
.map(User::getName)
.collect(new ImmutableListCollector<>());
Collector Factory Method
Simpler syntax with Collector.of()
:
public static <T> Collector<T, ?, ImmutableList<T>> toImmutableList() {
return Collector.of(
ImmutableList::<T>builder,
ImmutableList.Builder::add,
(left, right) -> left.addAll(right.build()),
ImmutableList.Builder::build
);
}
// Usage
ImmutableList<String> names = users.stream()
.map(User::getName)
.collect(toImmutableList());
Real-World Collector: Partitioned Map
Group elements into map where keys map to lists, but only if condition met:
public static <T, K> Collector<T, ?, Map<K, List<T>>>
partitioningBy(Function<T, K> classifier, Predicate<T> filter) {
return Collector.of(
HashMap::new,
(map, element) -> {
if (filter.test(element)) {
map.computeIfAbsent(classifier.apply(element), k -> new ArrayList<>())
.add(element);
}
},
(map1, map2) -> {
map2.forEach((key, list) ->
map1.merge(key, list, (l1, l2) -> {
l1.addAll(l2);
return l1;
})
);
return map1;
}
);
}
Usage:
// Group active users by city
Map<String, List<User>> activeUsersByCity = users.stream()
.collect(partitioningBy(User::getCity, User::isActive));
Statistics Collector
Collect multiple aggregates in single pass:
public class Statistics {
private long count;
private double sum;
private double min = Double.POSITIVE_INFINITY;
private double max = Double.NEGATIVE_INFINITY;
public void accept(double value) {
count++;
sum += value;
min = Math.min(min, value);
max = Math.max(max, value);
}
public Statistics combine(Statistics other) {
count += other.count;
sum += other.sum;
min = Math.min(min, other.min);
max = Math.max(max, other.max);
return this;
}
public double getAverage() {
return count > 0 ? sum / count : 0.0;
}
}
public static Collector<Double, Statistics, Statistics> statistics() {
return Collector.of(
Statistics::new,
Statistics::accept,
Statistics::combine,
Function.identity()
);
}
Usage:
Statistics orderStats = orders.stream()
.map(Order::getTotal)
.collect(statistics());
System.out.printf("Count: %d, Avg: %.2f, Min: %.2f, Max: %.2f%n",
orderStats.count, orderStats.getAverage(),
orderStats.min, orderStats.max);
Stateful vs Stateless Operations
Stateless Operations
Each element processed independently:
// Stateless: map, filter, flatMap
List<String> upperCase = names.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
Safe for parallelism. No shared state between elements.
Stateful Operations
Depend on previously seen elements:
// Stateful: sorted, distinct, limit, skip
List<String> distinctSorted = names.stream()
.distinct()
.sorted()
.collect(Collectors.toList());
Performance impact: Stateful operations require buffering data. sorted()
must see all elements before emitting any. limit()
and skip()
in parallel streams lose efficiency.
Custom Stateful Operation
Running total with reduce()
:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> runningTotals = numbers.stream()
.reduce(
new ArrayList<>(),
(list, num) -> {
int newTotal = list.isEmpty() ? num : list.get(list.size() - 1) + num;
List<Integer> newList = new ArrayList<>(list);
newList.add(newTotal);
return newList;
},
(list1, list2) -> {
List<Integer> combined = new ArrayList<>(list1);
combined.addAll(list2);
return combined;
}
);
// Output: [1, 3, 6, 10, 15]
Not parallel-safe. Sequential only.
Better approach with Stream builders:
public static <T, R> Stream<R> scan(Stream<T> stream, R identity,
BinaryOperator<R> accumulator,
Function<T, R> mapper) {
Stream.Builder<R> builder = Stream.builder();
AtomicReference<R> current = new AtomicReference<>(identity);
stream.forEach(element -> {
R next = accumulator.apply(current.get(), mapper.apply(element));
current.set(next);
builder.add(next);
});
return builder.build();
}
// Usage
List<Integer> runningTotals = scan(
numbers.stream(),
0,
Integer::sum,
Function.identity()
).collect(Collectors.toList());
Advanced Grouping
Nested Grouping
// Group orders by customer, then by status
Map<Long, Map<OrderStatus, List<Order>>> ordersByCustomerAndStatus =
orders.stream()
.collect(Collectors.groupingBy(
Order::getCustomerId,
Collectors.groupingBy(Order::getStatus)
));
// Access
Map<OrderStatus, List<Order>> customerOrders =
ordersByCustomerAndStatus.get(customerId);
List<Order> completedOrders = customerOrders.get(OrderStatus.COMPLETED);
Grouping with Downstream Collectors
// Group by city, count users per city
Map<String, Long> userCountByCity = users.stream()
.collect(Collectors.groupingBy(
User::getCity,
Collectors.counting()
));
// Group by city, sum order totals
Map<String, Double> totalRevenueByCity = orders.stream()
.collect(Collectors.groupingBy(
order -> order.getUser().getCity(),
Collectors.summingDouble(Order::getTotal)
));
// Group by city, collect names as comma-separated string
Map<String, String> namesByCity = users.stream()
.collect(Collectors.groupingBy(
User::getCity,
Collectors.mapping(
User::getName,
Collectors.joining(", ")
)
));
Partitioning with Statistics
Map<Boolean, DoubleSummaryStatistics> orderStatsByCompletion =
orders.stream()
.collect(Collectors.partitioningBy(
order -> order.getStatus() == OrderStatus.COMPLETED,
Collectors.summarizingDouble(Order::getTotal)
));
DoubleSummaryStatistics completedStats = orderStatsByCompletion.get(true);
DoubleSummaryStatistics pendingStats = orderStatsByCompletion.get(false);
System.out.printf("Completed orders: count=%d, avg=%.2f%n",
completedStats.getCount(), completedStats.getAverage());
Custom Grouping Key
// Group by age range
Map<String, List<User>> usersByAgeRange = users.stream()
.collect(Collectors.groupingBy(user -> {
int age = user.getAge();
if (age < 18) return "Minor";
if (age < 30) return "Young Adult";
if (age < 50) return "Adult";
return "Senior";
}));
FlatMap Patterns
Flatten Nested Collections
class Department {
List<Employee> employees;
}
List<Department> departments = ...;
// Get all employees across departments
List<Employee> allEmployees = departments.stream()
.flatMap(dept -> dept.getEmployees().stream())
.collect(Collectors.toList());
// Get all unique skills across all employees
Set<String> allSkills = departments.stream()
.flatMap(dept -> dept.getEmployees().stream())
.flatMap(emp -> emp.getSkills().stream())
.collect(Collectors.toSet());
FlatMap with Optional
List<Optional<User>> optionalUsers = ...;
// Extract present users only
List<User> users = optionalUsers.stream()
.flatMap(Optional::stream)
.collect(Collectors.toList());
Before Java 9: Use filter(Optional::isPresent).map(Optional::get)
Cartesian Product
List<String> colors = Arrays.asList("Red", "Green", "Blue");
List<String> sizes = Arrays.asList("S", "M", "L");
// Generate all color-size combinations
List<String> products = colors.stream()
.flatMap(color -> sizes.stream()
.map(size -> color + "-" + size))
.collect(Collectors.toList());
// Output: [Red-S, Red-M, Red-L, Green-S, Green-M, Green-L, Blue-S, Blue-M, Blue-L]
FlatMap for String Processing
List<String> sentences = Arrays.asList(
"Java Streams are powerful",
"Functional programming is elegant",
"Lambda expressions simplify code"
);
// Extract all unique words
Set<String> words = sentences.stream()
.flatMap(sentence -> Arrays.stream(sentence.split("\\s+")))
.map(String::toLowerCase)
.collect(Collectors.toSet());
Performance Optimization
Lazy Evaluation
Streams are lazy: intermediate operations don’t execute until terminal operation called.
List<String> names = users.stream()
.peek(u -> System.out.println("Filtering: " + u.getName()))
.filter(User::isActive)
.peek(u -> System.out.println("Mapping: " + u.getName()))
.map(User::getName)
.peek(name -> System.out.println("Collecting: " + name))
.collect(Collectors.toList());
Output shows: Each element flows through entire pipeline before next element starts. Stream processes element-by-element, not operation-by-operation.
Short-Circuiting
Operations that don’t need full stream:
// findFirst stops after first match
Optional<User> firstActiveUser = users.stream()
.filter(User::isActive)
.findFirst();
// anyMatch stops after first true
boolean hasAdmin = users.stream()
.anyMatch(user -> user.getRole() == Role.ADMIN);
// limit stops after n elements
List<User> firstTen = users.stream()
.limit(10)
.collect(Collectors.toList());
Use short-circuiting to avoid processing entire stream when possible.
Primitive Streams
Avoid boxing overhead with specialized streams:
// Inefficient: boxing/unboxing
int sum = orders.stream()
.map(Order::getQuantity) // Returns Integer (boxed)
.reduce(0, Integer::sum);
// Efficient: primitive stream
int sum = orders.stream()
.mapToInt(Order::getQuantity) // IntStream (no boxing)
.sum();
Primitive streams: IntStream
, LongStream
, DoubleStream
Benefits:
- No boxing/unboxing overhead
- Specialized operations:
sum()
,average()
,max()
,min()
- Range generators:
IntStream.range(0, 100)
Avoiding Stream Recreation
// Bad: recreates stream each time
public long countActiveUsers(List<User> users) {
return users.stream().filter(User::isActive).count();
}
public List<String> getActiveUserNames(List<User> users) {
return users.stream().filter(User::isActive).map(User::getName).collect(Collectors.toList());
}
// Better: single pass with custom collector
public class UserStats {
long activeCount;
List<String> activeNames;
}
public UserStats analyzeUsers(List<User> users) {
return users.stream()
.filter(User::isActive)
.collect(Collector.of(
UserStats::new,
(stats, user) -> {
stats.activeCount++;
stats.activeNames.add(user.getName());
},
(s1, s2) -> {
s1.activeCount += s2.activeCount;
s1.activeNames.addAll(s2.activeNames);
return s1;
}
));
}
Real-World Patterns
Pattern 1: Top-N Elements
// Get top 5 customers by total spending
List<Customer> topCustomers = orders.stream()
.collect(Collectors.groupingBy(
Order::getCustomerId,
Collectors.summingDouble(Order::getTotal)
))
.entrySet().stream()
.sorted(Map.Entry.<Long, Double>comparingByValue().reversed())
.limit(5)
.map(entry -> customerRepository.findById(entry.getKey()))
.collect(Collectors.toList());
Pattern 2: Conditional Accumulation
// Calculate discounted total only for orders above threshold
double discountedTotal = orders.stream()
.filter(order -> order.getTotal() > 1000)
.mapToDouble(order -> order.getTotal() * 0.9)
.sum();
Pattern 3: Multi-Level Filtering
// Get active users in specific cities with orders in last 30 days
LocalDate thirtyDaysAgo = LocalDate.now().minusDays(30);
Set<String> targetCities = Set.of("Mumbai", "Delhi", "Bangalore");
List<User> qualifiedUsers = users.stream()
.filter(User::isActive)
.filter(user -> targetCities.contains(user.getCity()))
.filter(user -> user.getOrders().stream()
.anyMatch(order -> order.getDate().isAfter(thirtyDaysAgo)))
.collect(Collectors.toList());
Pattern 4: Lookup Map
// Create lookup map for quick access
Map<Long, User> userMap = users.stream()
.collect(Collectors.toMap(User::getId, Function.identity()));
// Handle duplicate keys
Map<String, User> userByEmail = users.stream()
.collect(Collectors.toMap(
User::getEmail,
Function.identity(),
(existing, replacement) -> existing // Keep first
));
Pattern 5: Hierarchical Data
class Category {
Long id;
String name;
Long parentId;
}
// Build category tree
Map<Long, List<Category>> categoriesByParent = categories.stream()
.collect(Collectors.groupingBy(Category::getParentId));
// Find all descendants of category
Set<Long> findDescendants(Long categoryId, Map<Long, List<Category>> map) {
return Stream.concat(
Stream.of(categoryId),
map.getOrDefault(categoryId, Collections.emptyList()).stream()
.flatMap(child -> findDescendants(child.getId(), map).stream())
).collect(Collectors.toSet());
}
Pattern 6: Data Validation
class ValidationResult {
List<String> errors = new ArrayList<>();
void addError(String error) {
errors.add(error);
}
boolean isValid() {
return errors.isEmpty();
}
}
ValidationResult validateUsers(List<User> users) {
return users.stream()
.collect(Collector.of(
ValidationResult::new,
(result, user) -> {
if (user.getEmail() == null || !user.getEmail().contains("@")) {
result.addError("Invalid email for user: " + user.getId());
}
if (user.getAge() < 0 || user.getAge() > 150) {
result.addError("Invalid age for user: " + user.getId());
}
},
(r1, r2) -> {
r1.errors.addAll(r2.errors);
return r1;
}
));
}
Common Pitfalls
Pitfall 1: Side Effects in Streams
// Bad: modifying external state
List<String> results = new ArrayList<>();
users.stream()
.map(User::getName)
.forEach(results::add); // Side effect - not thread-safe
// Good: use collector
List<String> results = users.stream()
.map(User::getName)
.collect(Collectors.toList());
Pitfall 2: Reusing Streams
// Bad: stream can only be used once
Stream<User> userStream = users.stream();
long count = userStream.count();
List<User> list = userStream.collect(Collectors.toList()); // ERROR: stream already operated upon
// Good: create new stream
long count = users.stream().count();
List<User> list = users.stream().collect(Collectors.toList());
Pitfall 3: Null Values
// Bad: NullPointerException if name is null
List<String> names = users.stream()
.map(User::getName)
.map(String::toUpperCase) // Crashes if name is null
.collect(Collectors.toList());
// Good: filter nulls or use Optional
List<String> names = users.stream()
.map(User::getName)
.filter(Objects::nonNull)
.map(String::toUpperCase)
.collect(Collectors.toList());
Pitfall 4: Parallel Stream Overhead
// Bad: parallel for small dataset
List<Integer> small = Arrays.asList(1, 2, 3, 4, 5);
int sum = small.parallelStream().mapToInt(i -> i).sum(); // Slower than sequential
// Good: parallel for large dataset
int sum = IntStream.rangeClosed(1, 1_000_000)
.parallel()
.sum();
Pitfall 5: Boxing in Reductions
// Bad: unnecessary boxing
Optional<Integer> max = numbers.stream()
.reduce(Integer::max); // Boxes each integer
// Good: use primitive stream
OptionalInt max = numbers.stream()
.mapToInt(Integer::intValue)
.max();
Summary
Advanced streams unlock powerful data processing:
- Parallel streams: Speed up CPU-intensive operations, use custom ForkJoinPool for control
- Custom collectors: Build domain-specific aggregations, combine multiple metrics in single pass
- Stateful vs stateless: Understand performance implications, prefer stateless when possible
- Advanced grouping: Nest groupings, use downstream collectors for complex aggregations
- FlatMap: Flatten nested structures, combine streams, generate combinations
- Performance: Leverage lazy evaluation, short-circuiting, primitive streams
- Patterns: Top-N, conditional accumulation, lookups, hierarchical data, validation
Streams make complex data transformations readable and efficient. Master these patterns and your code becomes cleaner, faster, and more maintainable.