Check for cached/persisted messages in parallel
This commit is contained in:
parent
4ee67064bb
commit
56fdebde75
|
@ -22,6 +22,7 @@ import org.reactivestreams.Publisher;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
|
||||||
|
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
|
||||||
import org.whispersystems.textsecuregcm.util.Pair;
|
import org.whispersystems.textsecuregcm.util.Pair;
|
||||||
import reactor.core.observability.micrometer.Micrometer;
|
import reactor.core.observability.micrometer.Micrometer;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
|
@ -37,6 +38,9 @@ public class MessagesManager {
|
||||||
private static final Counter PERSIST_MESSAGE_COUNTER = Metrics.counter(
|
private static final Counter PERSIST_MESSAGE_COUNTER = Metrics.counter(
|
||||||
name(MessagesManager.class, "persistMessage"));
|
name(MessagesManager.class, "persistMessage"));
|
||||||
|
|
||||||
|
private static final String MAY_HAVE_MESSAGES_COUNTER_NAME =
|
||||||
|
MetricsUtil.name(MessagesManager.class, "mayHaveMessages");
|
||||||
|
|
||||||
private final MessagesDynamoDb messagesDynamoDb;
|
private final MessagesDynamoDb messagesDynamoDb;
|
||||||
private final MessagesCache messagesCache;
|
private final MessagesCache messagesCache;
|
||||||
private final ReportMessageManager reportMessageManager;
|
private final ReportMessageManager reportMessageManager;
|
||||||
|
@ -65,9 +69,24 @@ public class MessagesManager {
|
||||||
|
|
||||||
public CompletableFuture<Boolean> mayHaveMessages(final UUID destinationUuid, final Device destinationDevice) {
|
public CompletableFuture<Boolean> mayHaveMessages(final UUID destinationUuid, final Device destinationDevice) {
|
||||||
return messagesCache.hasMessagesAsync(destinationUuid, destinationDevice.getId())
|
return messagesCache.hasMessagesAsync(destinationUuid, destinationDevice.getId())
|
||||||
.thenCompose(hasMessages -> hasMessages
|
.thenCombine(messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice),
|
||||||
? CompletableFuture.completedFuture(true)
|
(mayHaveCachedMessages, mayHavePersistedMessages) -> {
|
||||||
: messagesDynamoDb.mayHaveMessages(destinationUuid, destinationDevice));
|
final String outcome;
|
||||||
|
|
||||||
|
if (mayHaveCachedMessages && mayHavePersistedMessages) {
|
||||||
|
outcome = "both";
|
||||||
|
} else if (mayHaveCachedMessages) {
|
||||||
|
outcome = "cached";
|
||||||
|
} else if (mayHavePersistedMessages) {
|
||||||
|
outcome = "persisted";
|
||||||
|
} else {
|
||||||
|
outcome = "none";
|
||||||
|
}
|
||||||
|
|
||||||
|
Metrics.counter(MAY_HAVE_MESSAGES_COUNTER_NAME, "outcome", outcome).increment();
|
||||||
|
|
||||||
|
return mayHaveCachedMessages || mayHavePersistedMessages;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasCachedMessages(final UUID destinationUuid, final byte destinationDevice) {
|
public boolean hasCachedMessages(final UUID destinationUuid, final byte destinationDevice) {
|
||||||
|
|
Loading…
Reference in New Issue