Spring Boot Reactive Programming - Part 7: Reactive Data Access
Building reactive applications requires reactive data access. In this part, we’ll master R2DBC for relational databases, Spring Data R2DBC, transactions, and integrate with MongoDB and Redis reactively.
R2DBC Overview
R2DBC (Reactive Relational Database Connectivity) is the reactive alternative to JDBC.
JDBC vs R2DBC
// JDBC (Blocking)
@Repository
public class JdbcUserRepository {
@Autowired
private JdbcTemplate jdbcTemplate;
public User findById(Long id) {
// Thread BLOCKS waiting for database
return jdbcTemplate.queryForObject(
"SELECT * FROM users WHERE id = ?",
new Object[]{id},
new BeanPropertyRowMapper<>(User.class)
);
}
}
// R2DBC (Non-blocking)
@Repository
public interface R2dbcUserRepository extends ReactiveCrudRepository<User, Long> {
// Thread is NEVER blocked
Mono<User> findById(Long id);
}
Configuration
Dependencies (pom.xml)
<dependencies>
<!-- Spring Data R2DBC -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<!-- PostgreSQL R2DBC Driver -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
</dependency>
<!-- Pool -->
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
</dependency>
</dependencies>
Application Properties
spring:
r2dbc:
url: r2dbc:postgresql://localhost:5432/productdb
username: postgres
password: password
pool:
initial-size: 10
max-size: 20
max-idle-time: 30m
validation-query: SELECT 1
liquibase:
enabled: true
change-log: classpath:db/changelog/db.changelog-master.yaml
Configuration Class
@Configuration
@EnableR2dbcRepositories
public class R2dbcConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
.option(DRIVER, "postgresql")
.option(HOST, "localhost")
.option(PORT, 5432)
.option(USER, "postgres")
.option(PASSWORD, "password")
.option(DATABASE, "productdb")
.build();
ConnectionFactory connectionFactory =
ConnectionFactories.get(options);
// Connection pooling
ConnectionPoolConfiguration poolConfig =
ConnectionPoolConfiguration.builder(connectionFactory)
.maxIdleTime(Duration.ofMinutes(30))
.initialSize(10)
.maxSize(20)
.maxCreateConnectionTime(Duration.ofSeconds(2))
.build();
return new ConnectionPool(poolConfig);
}
@Bean
public R2dbcEntityTemplate r2dbcEntityTemplate(
DatabaseClient databaseClient) {
return new R2dbcEntityTemplate(databaseClient);
}
}
Entity Mapping
@Table("users")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
@Id
private Long id;
@Column("full_name")
private String name;
private String email;
@Column("created_at")
private LocalDateTime createdAt;
@Column("updated_at")
private LocalDateTime updatedAt;
private boolean active;
@Transient // Not persisted to database
private List<Order> orders;
}
@Table("products")
@Data
public class Product {
@Id
private Long id;
private String name;
private BigDecimal price;
private String category;
@Column("in_stock")
private boolean inStock;
private Integer quantity;
@CreatedDate
private LocalDateTime createdAt;
@LastModifiedDate
private LocalDateTime updatedAt;
}
Repository Interfaces
@Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
// Derived query methods
Mono<User> findByEmail(String email);
Flux<User> findByNameContaining(String name);
Flux<User> findByActiveTrue();
Mono<Boolean> existsByEmail(String email);
// Query with pagination
Flux<User> findAllBy(Pageable pageable);
// Custom query
@Query("SELECT * FROM users WHERE created_at >= :date")
Flux<User> findUsersCreatedAfter(LocalDateTime date);
// Query with parameters
@Query("SELECT * FROM users WHERE active = :active AND created_at >= :date")
Flux<User> findActiveUsersCreatedAfter(boolean active, LocalDateTime date);
// Modifying query
@Modifying
@Query("UPDATE users SET active = false WHERE last_login < :date")
Mono<Integer> deactivateInactiveUsers(LocalDateTime date);
// Count
@Query("SELECT COUNT(*) FROM users WHERE active = true")
Mono<Long> countActiveUsers();
}
@Repository
public interface ProductRepository extends ReactiveCrudRepository<Product, Long> {
Flux<Product> findByCategory(String category);
Flux<Product> findByInStockTrue();
Flux<Product> findByPriceBetween(BigDecimal min, BigDecimal max);
Flux<Product> findByNameContainingIgnoreCase(String name);
@Query("SELECT * FROM products WHERE category = :category AND in_stock = true ORDER BY price")
Flux<Product> findAvailableProductsByCategory(String category);
// Custom query with join (if relationships exist)
@Query("SELECT p.* FROM products p WHERE p.category = :category LIMIT :limit")
Flux<Product> findTopProductsByCategory(String category, int limit);
}
CRUD Operations
@Service
@Slf4j
public class UserService {
@Autowired
private UserRepository userRepository;
// Create
public Mono<User> createUser(UserDto dto) {
User user = new User();
user.setName(dto.getName());
user.setEmail(dto.getEmail());
user.setActive(true);
user.setCreatedAt(LocalDateTime.now());
return userRepository.save(user)
.doOnSuccess(saved ->
log.info("Created user: {}", saved.getId())
);
}
// Read
public Mono<User> getUserById(Long id) {
return userRepository.findById(id)
.switchIfEmpty(Mono.error(
new NotFoundException("User not found: " + id)
));
}
// Update
public Mono<User> updateUser(Long id, UserDto dto) {
return userRepository.findById(id)
.flatMap(existing -> {
existing.setName(dto.getName());
existing.setEmail(dto.getEmail());
existing.setUpdatedAt(LocalDateTime.now());
return userRepository.save(existing);
})
.switchIfEmpty(Mono.error(
new NotFoundException("User not found: " + id)
));
}
// Delete
public Mono<Void> deleteUser(Long id) {
return userRepository.findById(id)
.flatMap(user -> userRepository.delete(user))
.switchIfEmpty(Mono.error(
new NotFoundException("User not found: " + id)
));
}
// List all
public Flux<User> getAllUsers() {
return userRepository.findAll();
}
// Pagination
public Flux<User> getUsersPage(int page, int size) {
return userRepository.findAllBy(
PageRequest.of(page, size, Sort.by("createdAt").descending())
);
}
// Search
public Flux<User> searchUsers(String query) {
return userRepository.findByNameContaining(query);
}
// Batch operations
public Flux<User> createUsers(List<UserDto> dtos) {
List<User> users = dtos.stream()
.map(dto -> {
User user = new User();
user.setName(dto.getName());
user.setEmail(dto.getEmail());
user.setActive(true);
return user;
})
.collect(Collectors.toList());
return userRepository.saveAll(users);
}
}
Database Client (Low-level API)
@Service
public class CustomUserService {
@Autowired
private DatabaseClient databaseClient;
// Custom query with DatabaseClient
public Flux<User> findUsersByCustomQuery(String namePattern, boolean active) {
return databaseClient.sql(
"SELECT * FROM users WHERE name LIKE :pattern AND active = :active"
)
.bind("pattern", "%" + namePattern + "%")
.bind("active", active)
.map((row, metadata) -> {
User user = new User();
user.setId(row.get("id", Long.class));
user.setName(row.get("full_name", String.class));
user.setEmail(row.get("email", String.class));
user.setActive(row.get("active", Boolean.class));
return user;
})
.all();
}
// Insert with returning generated ID
public Mono<User> createUserWithGeneratedId(User user) {
return databaseClient.sql(
"INSERT INTO users (full_name, email, active, created_at) " +
"VALUES (:name, :email, :active, :createdAt) " +
"RETURNING id"
)
.bind("name", user.getName())
.bind("email", user.getEmail())
.bind("active", user.isActive())
.bind("createdAt", user.getCreatedAt())
.map((row, metadata) -> {
user.setId(row.get("id", Long.class));
return user;
})
.one();
}
// Update
public Mono<Integer> updateUserEmail(Long userId, String newEmail) {
return databaseClient.sql(
"UPDATE users SET email = :email, updated_at = :updatedAt " +
"WHERE id = :id"
)
.bind("email", newEmail)
.bind("updatedAt", LocalDateTime.now())
.bind("id", userId)
.fetch()
.rowsUpdated();
}
// Complex aggregation
public Flux<UserStatistics> getUserStatistics() {
return databaseClient.sql(
"SELECT " +
" DATE(created_at) as date, " +
" COUNT(*) as user_count, " +
" SUM(CASE WHEN active THEN 1 ELSE 0 END) as active_count " +
"FROM users " +
"GROUP BY DATE(created_at) " +
"ORDER BY date DESC"
)
.map((row, metadata) -> new UserStatistics(
row.get("date", LocalDate.class),
row.get("user_count", Long.class),
row.get("active_count", Long.class)
))
.all();
}
}
Transactions
@Service
public class TransactionalService {
@Autowired
private UserRepository userRepository;
@Autowired
private OrderRepository orderRepository;
@Autowired
private TransactionalOperator transactionalOperator;
// Declarative transaction
@Transactional
public Mono<Order> createOrderWithUserUpdate(Long userId, OrderDto orderDto) {
return userRepository.findById(userId)
.flatMap(user -> {
Order order = new Order();
order.setUserId(userId);
order.setTotal(orderDto.getTotal());
order.setStatus(OrderStatus.PENDING);
return orderRepository.save(order)
.flatMap(savedOrder -> {
user.setUpdatedAt(LocalDateTime.now());
return userRepository.save(user)
.thenReturn(savedOrder);
});
});
// If any operation fails, entire transaction rolls back
}
// Programmatic transaction
public Mono<OrderResult> processOrderProgrammatic(OrderRequest request) {
return userRepository.findById(request.getUserId())
.flatMap(user -> {
Order order = createOrder(request, user);
return orderRepository.save(order)
.flatMap(savedOrder ->
updateUserBalance(user, request.getTotal())
.thenReturn(new OrderResult(savedOrder))
);
})
.as(transactionalOperator::transactional)
.onErrorResume(e -> {
log.error("Transaction failed, rolling back", e);
return Mono.just(OrderResult.failed());
});
}
// Multiple operations in transaction
@Transactional
public Mono<TransferResult> transferFunds(
Long fromUserId, Long toUserId, BigDecimal amount) {
return Mono.zip(
userRepository.findById(fromUserId),
userRepository.findById(toUserId)
)
.flatMap(tuple -> {
User fromUser = tuple.getT1();
User toUser = tuple.getT2();
// Deduct from source
// Add to destination
// (Simplified - real app would have account balance table)
return Mono.zip(
userRepository.save(fromUser),
userRepository.save(toUser)
)
.map(savedTuple -> new TransferResult(true, "Success"));
})
.onErrorReturn(new TransferResult(false, "Transfer failed"));
}
private Order createOrder(OrderRequest request, User user) {
Order order = new Order();
order.setUserId(user.getId());
order.setTotal(request.getTotal());
return order;
}
private Mono<User> updateUserBalance(User user, BigDecimal amount) {
// Update logic
return userRepository.save(user);
}
}
Reactive MongoDB
// Dependencies
/*
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
*/
// Configuration
@Configuration
@EnableReactiveMongoRepositories
public class MongoConfig {
@Bean
public ReactiveMongoTemplate reactiveMongoTemplate(
ReactiveMongoDatabaseFactory factory) {
return new ReactiveMongoTemplate(factory);
}
}
// Document
@Document(collection = "products")
@Data
public class ProductDocument {
@Id
private String id;
@Indexed
private String name;
private BigDecimal price;
@Indexed
private String category;
private List<String> tags;
private Map<String, Object> attributes;
@CreatedDate
private LocalDateTime createdAt;
}
// Repository
@Repository
public interface ProductMongoRepository
extends ReactiveMongoRepository<ProductDocument, String> {
Flux<ProductDocument> findByCategory(String category);
Flux<ProductDocument> findByTagsContaining(String tag);
Flux<ProductDocument> findByPriceGreaterThan(BigDecimal price);
@Query("{ 'category': ?0, 'price': { $lte: ?1 } }")
Flux<ProductDocument> findByCategoryAndMaxPrice(String category, BigDecimal maxPrice);
}
// Service
@Service
public class ProductMongoService {
@Autowired
private ProductMongoRepository repository;
@Autowired
private ReactiveMongoTemplate mongoTemplate;
// Complex query with MongoDB template
public Flux<ProductDocument> searchProducts(ProductSearchCriteria criteria) {
Criteria mongoCriteria = new Criteria();
if (criteria.getCategory() != null) {
mongoCriteria.and("category").is(criteria.getCategory());
}
if (criteria.getMinPrice() != null) {
mongoCriteria.and("price").gte(criteria.getMinPrice());
}
if (criteria.getMaxPrice() != null) {
mongoCriteria.and("price").lte(criteria.getMaxPrice());
}
if (criteria.getTags() != null && !criteria.getTags().isEmpty()) {
mongoCriteria.and("tags").in(criteria.getTags());
}
Query query = new Query(mongoCriteria)
.with(Sort.by(Sort.Direction.DESC, "createdAt"))
.limit(100);
return mongoTemplate.find(query, ProductDocument.class);
}
// Aggregation
public Flux<CategoryStats> getCategoryStatistics() {
Aggregation aggregation = Aggregation.newAggregation(
Aggregation.group("category")
.count().as("count")
.avg("price").as("avgPrice")
.min("price").as("minPrice")
.max("price").as("maxPrice"),
Aggregation.sort(Sort.Direction.DESC, "count")
);
return mongoTemplate.aggregate(
aggregation,
"products",
CategoryStats.class
);
}
}
Reactive Redis
// Dependencies
/*
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
*/
// Configuration
@Configuration
public class RedisConfig {
@Bean
public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
return new LettuceConnectionFactory("localhost", 6379);
}
@Bean
public ReactiveRedisTemplate<String, Object> reactiveRedisTemplate(
ReactiveRedisConnectionFactory factory) {
RedisSerializationContext<String, Object> serializationContext =
RedisSerializationContext
.<String, Object>newSerializationContext(new StringRedisSerializer())
.value(new GenericJackson2JsonRedisSerializer())
.build();
return new ReactiveRedisTemplate<>(factory, serializationContext);
}
}
// Cache Service
@Service
@Slf4j
public class RedisCacheService {
@Autowired
private ReactiveRedisTemplate<String, Object> redisTemplate;
// Set value with TTL
public <T> Mono<Boolean> set(String key, T value, Duration ttl) {
return redisTemplate.opsForValue()
.set(key, value, ttl)
.doOnSuccess(result ->
log.debug("Cached: {} = {}", key, value)
);
}
// Get value
public <T> Mono<T> get(String key, Class<T> type) {
return redisTemplate.opsForValue()
.get(key)
.cast(type)
.doOnNext(value ->
log.debug("Cache hit: {} = {}", key, value)
);
}
// Delete
public Mono<Boolean> delete(String key) {
return redisTemplate.delete(key)
.map(count -> count > 0);
}
// Check exists
public Mono<Boolean> exists(String key) {
return redisTemplate.hasKey(key);
}
// List operations
public Mono<Long> pushToList(String key, Object value) {
return redisTemplate.opsForList().rightPush(key, value);
}
public Flux<Object> getList(String key) {
return redisTemplate.opsForList().range(key, 0, -1);
}
// Set operations
public Mono<Long> addToSet(String key, Object... values) {
return redisTemplate.opsForSet().add(key, values);
}
public Flux<Object> getSet(String key) {
return redisTemplate.opsForSet().members(key);
}
// Hash operations
public Mono<Boolean> setHash(String key, String field, Object value) {
return redisTemplate.opsForHash()
.put(key, field, value);
}
public Mono<Object> getHash(String key, String field) {
return redisTemplate.opsForHash()
.get(key, field);
}
// Increment
public Mono<Long> increment(String key) {
return redisTemplate.opsForValue().increment(key);
}
// Expire
public Mono<Boolean> expire(String key, Duration timeout) {
return redisTemplate.expire(key, timeout);
}
}
// Service with caching
@Service
public class CachedUserService {
@Autowired
private UserRepository userRepository;
@Autowired
private RedisCacheService cacheService;
public Mono<User> getUserById(Long id) {
String cacheKey = "user:" + id;
return cacheService.get(cacheKey, User.class)
.switchIfEmpty(
userRepository.findById(id)
.flatMap(user ->
cacheService.set(cacheKey, user, Duration.ofMinutes(10))
.thenReturn(user)
)
);
}
public Mono<User> updateUser(Long id, UserDto dto) {
return userRepository.findById(id)
.flatMap(user -> {
user.setName(dto.getName());
user.setEmail(dto.getEmail());
return userRepository.save(user);
})
.flatMap(updated ->
cacheService.delete("user:" + id)
.thenReturn(updated)
);
}
}
Key Takeaways
- R2DBC provides reactive database access for SQL databases
- Spring Data R2DBC offers repository abstraction
- Transactions work with
@Transactional
orTransactionalOperator
- DatabaseClient for custom queries and complex operations
- MongoDB Reactive for document-based reactive storage
- Redis Reactive for caching and session management
- Connection pooling is critical for performance
- Always use reactive drivers to maintain non-blocking benefits
What’s Next
In Part 8 (final part), we’ll cover Testing, Debugging, and Production - comprehensive testing strategies with StepVerifier and WebTestClient, debugging reactive code, production monitoring, and deployment best practices.
Practice Exercise: Build a complete reactive data layer:
- User management with R2DBC (PostgreSQL)
- Product catalog with MongoDB
- Caching layer with Redis
- Transactions for order processing
- Complex queries and aggregations
- Proper error handling and logging