Add timer to removeRecipientViewFromMrmData
This commit is contained in:
parent
0601f6a35c
commit
a0770db179
|
@ -148,6 +148,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get"));
|
private final Timer getMessagesTimer = Metrics.timer(name(MessagesCache.class, "get"));
|
||||||
private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist"));
|
private final Timer getQueuesToPersistTimer = Metrics.timer(name(MessagesCache.class, "getQueuesToPersist"));
|
||||||
private final Timer removeByGuidTimer = Metrics.timer(name(MessagesCache.class, "removeByGuid"));
|
private final Timer removeByGuidTimer = Metrics.timer(name(MessagesCache.class, "removeByGuid"));
|
||||||
|
private final Timer removeRecipientViewTimer = Metrics.timer(name(MessagesCache.class, "removeRecipientView"));
|
||||||
private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear"));
|
private final Timer clearQueueTimer = Metrics.timer(name(MessagesCache.class, "clear"));
|
||||||
private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage"));
|
private final Counter pubSubMessageCounter = Metrics.counter(name(MessagesCache.class, "pubSubMessage"));
|
||||||
private final Counter newMessageNotificationCounter = Metrics.counter(
|
private final Counter newMessageNotificationCounter = Metrics.counter(
|
||||||
|
@ -427,8 +428,8 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
final Experiment experiment = new Experiment(MRM_VIEWS_EXPERIMENT_NAME);
|
final Experiment experiment = new Experiment(MRM_VIEWS_EXPERIMENT_NAME);
|
||||||
|
|
||||||
final byte[] key = mrmMessage.getSharedMrmKey().toByteArray();
|
final byte[] key = mrmMessage.getSharedMrmKey().toByteArray();
|
||||||
final byte[] sharedMrmViewKey = MessagesCache.getSharedMrmViewKey(
|
final byte[] sharedMrmViewKey = MessagesCache.getSharedMrmViewKey(new AciServiceIdentifier(destinationUuid),
|
||||||
new AciServiceIdentifier(destinationUuid), destinationDevice);
|
destinationDevice);
|
||||||
|
|
||||||
final Mono<MessageProtos.Envelope> mrmMessageMono = Mono.from(redisCluster.withBinaryClusterReactive(
|
final Mono<MessageProtos.Envelope> mrmMessageMono = Mono.from(redisCluster.withBinaryClusterReactive(
|
||||||
conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey)
|
conn -> conn.reactive().hmget(key, "data".getBytes(StandardCharsets.UTF_8), sharedMrmViewKey)
|
||||||
|
@ -463,13 +464,22 @@ public class MessagesCache extends RedisClusterPubSubAdapter<String, String> imp
|
||||||
*/
|
*/
|
||||||
private void removeRecipientViewFromMrmData(final List<byte[]> sharedMrmKeys, final UUID accountUuid,
|
private void removeRecipientViewFromMrmData(final List<byte[]> sharedMrmKeys, final UUID accountUuid,
|
||||||
final byte deviceId) {
|
final byte deviceId) {
|
||||||
|
|
||||||
|
final Timer.Sample sample = Timer.start();
|
||||||
Flux.fromIterable(sharedMrmKeys)
|
Flux.fromIterable(sharedMrmKeys)
|
||||||
.collectMultimap(SlotHash::getSlot)
|
.collectMultimap(SlotHash::getSlot)
|
||||||
.flatMapMany(slotsAndKeys -> Flux.fromIterable(slotsAndKeys.values()))
|
.flatMapMany(slotsAndKeys -> Flux.fromIterable(slotsAndKeys.values()))
|
||||||
.flatMap(
|
.flatMap(
|
||||||
keys -> removeRecipientViewFromMrmDataScript.execute(keys, new AciServiceIdentifier(accountUuid), deviceId),
|
keys -> removeRecipientViewFromMrmDataScript.execute(keys, new AciServiceIdentifier(accountUuid), deviceId),
|
||||||
REMOVE_MRM_RECIPIENT_VIEW_CONCURRENCY)
|
REMOVE_MRM_RECIPIENT_VIEW_CONCURRENCY)
|
||||||
.subscribe(sharedMrmDataKeyRemovedCounter::increment, e -> logger.warn("Error removing recipient view", e));
|
.doOnNext(sharedMrmDataKeyRemovedCounter::increment)
|
||||||
|
.onErrorResume(e -> {
|
||||||
|
logger.warn("Error removing recipient view", e);
|
||||||
|
return Mono.just(0L);
|
||||||
|
})
|
||||||
|
.then()
|
||||||
|
.doOnTerminate(() -> sample.stop(removeRecipientViewTimer))
|
||||||
|
.subscribe();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Mono<Pair<List<byte[]>, Long>> getNextMessagePage(final UUID destinationUuid,
|
private Mono<Pair<List<byte[]>, Long>> getNextMessagePage(final UUID destinationUuid,
|
||||||
|
|
Loading…
Reference in New Issue