diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java index 772bb98ba..5a7b85c65 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/ClientPresenceManager.java @@ -107,10 +107,7 @@ public class ClientPresenceManager extends RedisClusterPubSubAdapter event instanceof ClusterTopologyChangedEvent) - .handle((event, sink) -> { - resubscribeAll(); - sink.next(event); - }); + .subscribe(event -> resubscribeAll()); final String presenceChannel = getManagerPresenceChannel(managerId); final int slot = SlotHash.getSlot(presenceChannel); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index 887f8ee33..3fc85448a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -115,10 +115,7 @@ public class MessagesCache extends RedisClusterPubSubAdapter imp connection.addListener(this); connection.getResources().eventBus().get() .filter(event -> event instanceof ClusterTopologyChangedEvent) - .handle((event, sink) -> { - resubscribeAll(); - sink.next(event); - }); + .subscribe(event -> resubscribeAll()); }); }