I'm trying to implement optimistic locking with the Redis Lettuce client in reactive Java, according to this article: . I have two approaches, V1 and V2. V2 works but V1 does not. I don't understand why, and V2 doesn't seem very efficient to me.

This is V1, which doesn't work:

public Mono<List<T>> optimisticUpdateAllV1(List<String> ids, Function<T, T> mapper) {
    // map the ids to redis keys
    List<String> keys = // ...
    String[] keysArray = keys.toArray(new String[0]);

    RedisReactiveCommands<String, String> redisReactiveCommands = redisConnection.reactive();
    return redisReactiveCommands.watch(keysArray)
        .then(redisReactiveCommands.mget(keysArray).collectList())
        .map(entries -> {
            List<T> oldItems = // ...

            return oldItems.stream().map(mapper).toList();
        })
        .flatMap(newItems -> {
            // map the entities back to <key, stringified entity>
            Map<String, String> newItemsMap = // ...

            return redisReactiveCommands.multi()
                    .flatMap(multiStatus -> {
                        return redisReactiveCommands.mset(newItemsMap);
                    })
                    .flatMap(mSetStatus -> {
                        return redisReactiveCommands.exec();
                    })
                    .flatMap(executeStatus -> {
                        if(executeStatus.wasDiscarded() || executeStatus.isEmpty()){
                            return optimisticUpdateAllV1(ids, mapper);
                        }

                        return Mono.just(newItems);
                    });
        });
}

And this is V2, which works. Instead of chaining mset after multi() with flatMap, it subscribes to mset separately inside a doOnSuccess callback on the multi() Mono:

public Mono<List<T>> optimisticUpdateAllV2(List<String> ids, Function<T, T> mapper) {
    // map the ids to redis keys
    List<String> keys = // ...
    String[] keysArray = keys.toArray(new String[0]);

    RedisReactiveCommands<String, String> redisReactiveCommands = redisConnection.reactive();
    return redisReactiveCommands.watch(keysArray)
        .then(redisReactiveCommands.mget(keysArray).collectList())
        .map(entries -> {
            List<T> oldItems = // ...

            return oldItems.stream().map(mapper).toList();
        })
        .flatMap(newItems -> {
            // map the entities back to <key, stringified entity>
            Map<String, String> newItemsMap = // ...

            return redisReactiveCommands.multi()
                    .doOnSuccess((e) -> {
                        redisReactiveCommands.mset(newItemsMap).subscribe();
                    })
                    .flatMap(multiStatus -> {
                        return redisReactiveCommands.exec();
                    })
                    .flatMap(executeStatus -> {
                        if(executeStatus.wasDiscarded() || executeStatus.isEmpty()){
                            return optimisticUpdateAllV2(ids, mapper);
                        }

                        return Mono.just(newItems);
                    });
        });
}

First, I don't understand why the first approach doesn't work. Second, I'm not sure whether calling subscribe() on mset inside the multi() callback, outside the main reactive chain, is a wise idea.
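
One possible explanation, offered as a guess rather than a confirmed diagnosis: the Lettuce reference guide says that a command issued after MULTI is answered with QUEUED, and that its actual result is only delivered once EXEC has been processed. If that applies here, V1 only issues exec after mset has emitted, while mset cannot emit until exec has run. The toy model below uses plain CompletableFuture instead of Lettuce or Reactor to show that dependency. The class QueuedCommandModel and all of its methods are hypothetical names made up for this illustration, not Lettuce API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a connection that is already in MULTI state.
// This is NOT Lettuce code; it only models "results arrive after EXEC".
final class QueuedCommandModel {

    // Pending results of queued commands, released only by exec().
    private final List<CompletableFuture<String>> queued = new ArrayList<>();

    // Models MSET issued after MULTI: the command is queued and the
    // caller-visible result stays pending until EXEC is processed.
    CompletableFuture<String> mset() {
        CompletableFuture<String> result = new CompletableFuture<>();
        queued.add(result);
        return result;
    }

    // Models EXEC: completes every queued result and reports how many ran.
    CompletableFuture<Integer> exec() {
        int executed = queued.size();
        queued.forEach(f -> f.complete("OK"));
        queued.clear();
        return CompletableFuture.completedFuture(executed);
    }

    // V1 shape: exec is only issued after mset's result has arrived.
    static boolean chainedExecCompletes() {
        QueuedCommandModel tx = new QueuedCommandModel();
        CompletableFuture<Integer> done = tx.mset().thenCompose(ok -> tx.exec());
        return done.isDone();
    }

    // V2 shape: mset is issued without waiting for it, then exec is issued.
    static boolean independentExecCompletes() {
        QueuedCommandModel tx = new QueuedCommandModel();
        CompletableFuture<String> mset = tx.mset();
        CompletableFuture<Integer> done = tx.exec();
        return done.isDone() && mset.isDone();
    }

    public static void main(String[] args) {
        System.out.println("chained exec completes: " + chainedExecCompletes());         // false
        System.out.println("independent exec completes: " + independentExecCompletes()); // true
    }
}
```

In this model the chained variant never reaches exec, because the future it waits on is only completed by exec itself. The independent variant completes because nothing waits on mset before exec is sent, which is the same shape as the subscribe() call in V2.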
