admin管理员组文章数量:1334421
In our reactive application (Spring WebFlux, project Reactor) we carry important logging information within the reactive context. The problem is when we use the Caffeine AsyncCache
, the context is not automatically transferred between the calling Mono
and the caffeine cache Future
.
I'd like to share our solution as I think some others might have the same need.
In our reactive application (Spring WebFlux, project Reactor) we carry important logging information within the reactive context. The problem is when we use the Caffeine AsyncCache
, the context is not automatically transferred between the calling Mono
and the caffeine cache Future
.
I'd like to share our solution as I think some others might have the same need.
Share Improve this question edited Nov 21, 2024 at 8:00 Honza Zidek asked Nov 20, 2024 at 15:32 Honza ZidekHonza Zidek 20.5k9 gold badges91 silver badges141 bronze badges1 Answer
Reset to default 2The solution is in the code, with added comments
This is the main testing class:
import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import .junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import java.util.Map;
class ContextFromMonoToCaffeineCache {
@Test
void context() {
final UserService userService = new UserService();
// Call the user service 4 times, each time with a different context.
// Fetch the user with ID=1 twice, using the method which loses the context.
// The 1st fetch is from the database, the 2nd is already from the cache.
userService.getUserWithoutContext(1)
.delayUntil(this::printUser)
.contextWrite(context -> context.put("myContext", "FIRST"))
.block();
userService.getUserWithoutContext(1)
.delayUntil(this::printUser)
.contextWrite(context -> context.put("myContext", "SECOND"))
.block();
// Fetch the user with ID=2 twice, using the context preserving fetch.
// The 1st fetch is from the database, the 2nd is already from the cache.
userService.getUserWithContext(2)
.delayUntil(this::printUser)
.contextWrite(context -> context.put("myContext", "THIRD"))
.block();
userService.getUserWithContext(2)
.delayUntil(this::printUser)
.contextWrite(context -> context.put("myContext", "FORTH"))
.block();
}
private Mono<Object> printUser(User user) {
return Mono.deferContextual(context -> {
System.out.printf("Found %s, with myContext: %s %n", user, context.get("myContext"));
return Mono.empty();
});
}
}
The output is:
Fetched for ID: 1, with myContext: N/A
Found User[id=1, name=Alex], with myContext: FIRST
Found User[id=1, name=Alex], with myContext: SECOND
Fetched for ID: 2, with myContext: THIRD
Found User[id=2, name=Betty], with myContext: THIRD
Found User[id=2, name=Betty], with myContext: FORTH
As you can see, the cache works as expected. Each ID is fetched from the "database" only once.
ID=1 was fetched without the context handling,
ID=2 has the context properly handled.
The UserService
class uses Caffeine AsyncCache
.
cache.get()
has 2 parameters:
- The key to search for.
- A lambda telling how to fetch the data if they are not yet in the cache.
class UserService {
private final UserRepository userRepository = new UserRepository();
private final AsyncCache<Integer, User> cache = Caffeine.newBuilder().buildAsync();
public Mono<User> getUserWithoutContext(Integer userKey) {
return Mono.deferContextual(contextView -> Mono.fromCompletionStage(cache.get(userKey,
// Executed when the key is not in the cache yet:
(key, executor) -> userRepository.fetchUser(key)
.toFuture()
)));
}
public Mono<User> getUserWithContext(Integer userKey) {
return Mono.deferContextual(contextView -> Mono.fromCompletionStage(cache.get(userKey,
// Executed when the key is not in the cache yet:
(key, executor) -> userRepository.fetchUser(key)
// Copy the context.
// Notice that the call of contextWrite() must be AFTER the call of fetchUser(),
// as it is propagated upstream and not downstream.
.contextWrite(context -> context.put("myContext", contextView.get("myContext")))
.toFuture()
)));
}
}
UserRepository
is just a simulated database repository.
In the fetchUser()
method we inspect the context and print it out to demonstrate if it has or has not been properly propagated.
class UserRepository {
// Simulated database table
final private Map<Integer, User> users = Map.of(
1, new User(1, "Alex"),
2, new User(2, "Betty")
);
public Mono<User> fetchUser(Integer userKey) {
return Mono.just(userKey)
// Let's check what is in the context
.transformDeferredContextual((integerMono, contextView) -> {
System.out.printf("Fetched for ID: %s, with myContext: %s %n",
userKey, contextView.getOrDefault("myContext", "N/A"));
return integerMono;
})
// Simulate the database fetch
.mapNotNull(users::get);
}
}
User
is our business object:
record User(Integer id, String name) {}
本文标签: javaPropagate reactive Mono context to Caffeine AsyncCacheStack Overflow
版权声明:本文标题:java - Propagate reactive Mono context to Caffeine AsyncCache - Stack Overflow 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/web/1742348860a2458078.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论