Code cleanup

This commit is contained in:
Ehren Kret 2021-06-08 17:22:45 -05:00
parent 2c33d22a30
commit 827a3af419
3 changed files with 16 additions and 36 deletions

View File

@ -5,7 +5,7 @@
package org.whispersystems.dispatch; package org.whispersystems.dispatch;
public interface DispatchChannel { public interface DispatchChannel {
public void onDispatchMessage(String channel, byte[] message); void onDispatchMessage(String channel, byte[] message);
public void onDispatchSubscribed(String channel); void onDispatchSubscribed(String channel);
public void onDispatchUnsubscribed(String channel); void onDispatchUnsubscribed(String channel);
} }

View File

@ -59,9 +59,7 @@ public class DispatchManager extends Thread {
logger.warn("Subscription error", e); logger.warn("Subscription error", e);
} }
if (previous.isPresent()) { previous.ifPresent(channel -> dispatchUnsubscription(name, channel));
dispatchUnsubscription(name, previous.get());
}
} }
public synchronized void unsubscribe(String name, DispatchChannel channel) { public synchronized void unsubscribe(String name, DispatchChannel channel) {
@ -132,9 +130,7 @@ public class DispatchManager extends Thread {
} }
private void resubscribeAll() { private void resubscribeAll() {
new Thread() { new Thread(() -> {
@Override
public void run() {
synchronized (DispatchManager.this) { synchronized (DispatchManager.this) {
try { try {
for (String name : subscriptions.keySet()) { for (String name : subscriptions.keySet()) {
@ -144,34 +140,18 @@ public class DispatchManager extends Thread {
logger.warn("***** RESUBSCRIPTION ERROR *****", e); logger.warn("***** RESUBSCRIPTION ERROR *****", e);
} }
} }
} }).start();
}.start();
} }
private void dispatchMessage(final String name, final DispatchChannel channel, final byte[] message) { private void dispatchMessage(final String name, final DispatchChannel channel, final byte[] message) {
executor.execute(new Runnable() { executor.execute(() -> channel.onDispatchMessage(name, message));
@Override
public void run() {
channel.onDispatchMessage(name, message);
}
});
} }
private void dispatchSubscription(final String name, final DispatchChannel channel) { private void dispatchSubscription(final String name, final DispatchChannel channel) {
executor.execute(new Runnable() { executor.execute(() -> channel.onDispatchSubscribed(name));
@Override
public void run() {
channel.onDispatchSubscribed(name);
}
});
} }
private void dispatchUnsubscription(final String name, final DispatchChannel channel) { private void dispatchUnsubscription(final String name, final DispatchChannel channel) {
executor.execute(new Runnable() { executor.execute(() -> channel.onDispatchUnsubscribed(name));
@Override
public void run() {
channel.onDispatchUnsubscribed(name);
}
});
} }
} }

View File

@ -8,6 +8,6 @@ import org.whispersystems.dispatch.redis.PubSubConnection;
public interface RedisPubSubConnectionFactory { public interface RedisPubSubConnectionFactory {
public PubSubConnection connect(); PubSubConnection connect();
} }