Drop the old (and now unused!) `redis-dispatch` module

This commit is contained in:
Jon Chambers 2022-08-16 13:21:36 -04:00 committed by Jon Chambers
parent 11829d1f9f
commit fd5e9ea016
22 changed files with 23 additions and 1275 deletions

View File

@ -33,7 +33,6 @@
<module>api-doc</module>
<module>event-logger</module>
<module>integration-tests</module>
<module>redis-dispatch</module>
<module>service</module>
<module>websocket-resources</module>
</modules>

View File

@ -1,20 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>TextSecureServer</artifactId>
<groupId>org.whispersystems.textsecure</groupId>
<version>JGITVER</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>redis-dispatch</artifactId>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,11 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.dispatch;
public interface DispatchChannel {
void onDispatchMessage(String channel, byte[] message);
void onDispatchSubscribed(String channel);
void onDispatchUnsubscribed(String channel);
}

View File

@ -1,157 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.dispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory;
import org.whispersystems.dispatch.redis.PubSubConnection;
import org.whispersystems.dispatch.redis.PubSubReply;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class DispatchManager extends Thread {
private final Logger logger = LoggerFactory.getLogger(DispatchManager.class);
private final Executor executor = Executors.newCachedThreadPool();
private final Map<String, DispatchChannel> subscriptions = new ConcurrentHashMap<>();
private final Optional<DispatchChannel> deadLetterChannel;
private final RedisPubSubConnectionFactory redisPubSubConnectionFactory;
private PubSubConnection pubSubConnection;
private volatile boolean running;
public DispatchManager(RedisPubSubConnectionFactory redisPubSubConnectionFactory,
Optional<DispatchChannel> deadLetterChannel)
{
this.redisPubSubConnectionFactory = redisPubSubConnectionFactory;
this.deadLetterChannel = deadLetterChannel;
}
@Override
public void start() {
this.pubSubConnection = redisPubSubConnectionFactory.connect();
this.running = true;
super.start();
}
public void shutdown() {
this.running = false;
this.pubSubConnection.close();
}
public synchronized void subscribe(String name, DispatchChannel dispatchChannel) {
Optional<DispatchChannel> previous = Optional.ofNullable(subscriptions.get(name));
subscriptions.put(name, dispatchChannel);
try {
pubSubConnection.subscribe(name);
} catch (IOException e) {
logger.warn("Subscription error", e);
}
previous.ifPresent(channel -> dispatchUnsubscription(name, channel));
}
public synchronized void unsubscribe(String name, DispatchChannel channel) {
Optional<DispatchChannel> subscription = Optional.ofNullable(subscriptions.get(name));
if (subscription.isPresent() && subscription.get() == channel) {
subscriptions.remove(name);
try {
pubSubConnection.unsubscribe(name);
} catch (IOException e) {
logger.warn("Unsubscribe error", e);
}
dispatchUnsubscription(name, subscription.get());
}
}
public boolean hasSubscription(String name) {
return subscriptions.containsKey(name);
}
@Override
public void run() {
while (running) {
try {
PubSubReply reply = pubSubConnection.read();
switch (reply.getType()) {
case UNSUBSCRIBE: break;
case SUBSCRIBE: dispatchSubscribe(reply); break;
case MESSAGE: dispatchMessage(reply); break;
default: throw new AssertionError("Unknown pubsub reply type! " + reply.getType());
}
} catch (IOException e) {
logger.warn("***** PubSub Connection Error *****", e);
if (running) {
this.pubSubConnection.close();
this.pubSubConnection = redisPubSubConnectionFactory.connect();
resubscribeAll();
}
}
}
logger.warn("DispatchManager Shutting Down...");
}
private void dispatchSubscribe(final PubSubReply reply) {
Optional<DispatchChannel> subscription = Optional.ofNullable(subscriptions.get(reply.getChannel()));
if (subscription.isPresent()) {
dispatchSubscription(reply.getChannel(), subscription.get());
} else {
logger.info("Received subscribe event for non-existing channel: " + reply.getChannel());
}
}
private void dispatchMessage(PubSubReply reply) {
Optional<DispatchChannel> subscription = Optional.ofNullable(subscriptions.get(reply.getChannel()));
if (subscription.isPresent()) {
dispatchMessage(reply.getChannel(), subscription.get(), reply.getContent().get());
} else if (deadLetterChannel.isPresent()) {
dispatchMessage(reply.getChannel(), deadLetterChannel.get(), reply.getContent().get());
} else {
logger.warn("Received message for non-existing channel, with no dead letter handler: " + reply.getChannel());
}
}
private void resubscribeAll() {
new Thread(() -> {
synchronized (DispatchManager.this) {
try {
for (String name : subscriptions.keySet()) {
pubSubConnection.subscribe(name);
}
} catch (IOException e) {
logger.warn("***** RESUBSCRIPTION ERROR *****", e);
}
}
}).start();
}
private void dispatchMessage(final String name, final DispatchChannel channel, final byte[] message) {
executor.execute(() -> channel.onDispatchMessage(name, message));
}
private void dispatchSubscription(final String name, final DispatchChannel channel) {
executor.execute(() -> channel.onDispatchSubscribed(name));
}
private void dispatchUnsubscription(final String name, final DispatchChannel channel) {
executor.execute(() -> channel.onDispatchUnsubscribed(name));
}
}

View File

@ -1,68 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.dispatch.io;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
public class RedisInputStream {
private static final byte CR = 0x0D;
private static final byte LF = 0x0A;
private final InputStream inputStream;
public RedisInputStream(InputStream inputStream) {
this.inputStream = inputStream;
}
public String readLine() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
boolean foundCr = false;
while (true) {
int character = inputStream.read();
if (character == -1) {
throw new IOException("Stream closed!");
}
baos.write(character);
if (foundCr && character == LF) break;
else if (character == CR) foundCr = true;
else if (foundCr) foundCr = false;
}
byte[] data = baos.toByteArray();
return new String(data, 0, data.length-2);
}
public byte[] readFully(int size) throws IOException {
byte[] result = new byte[size];
int offset = 0;
int remaining = result.length;
while (remaining > 0) {
int read = inputStream.read(result, offset, remaining);
if (read < 0) {
throw new IOException("Stream closed!");
}
offset += read;
remaining -= read;
}
return result;
}
public void close() throws IOException {
inputStream.close();
}
}

View File

@ -1,13 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.dispatch.io;
import org.whispersystems.dispatch.redis.PubSubConnection;
public interface RedisPubSubConnectionFactory {
PubSubConnection connect();
}

View File

@ -1,123 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.dispatch.redis;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.io.RedisInputStream;
import org.whispersystems.dispatch.redis.protocol.ArrayReplyHeader;
import org.whispersystems.dispatch.redis.protocol.IntReply;
import org.whispersystems.dispatch.redis.protocol.StringReplyHeader;
import org.whispersystems.dispatch.util.Util;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
public class PubSubConnection {
private final Logger logger = LoggerFactory.getLogger(PubSubConnection.class);
private static final byte[] UNSUBSCRIBE_TYPE = {'u', 'n', 's', 'u', 'b', 's', 'c', 'r', 'i', 'b', 'e' };
private static final byte[] SUBSCRIBE_TYPE = {'s', 'u', 'b', 's', 'c', 'r', 'i', 'b', 'e' };
private static final byte[] MESSAGE_TYPE = {'m', 'e', 's', 's', 'a', 'g', 'e' };
private static final byte[] SUBSCRIBE_COMMAND = {'S', 'U', 'B', 'S', 'C', 'R', 'I', 'B', 'E', ' ' };
private static final byte[] UNSUBSCRIBE_COMMAND = {'U', 'N', 'S', 'U', 'B', 'S', 'C', 'R', 'I', 'B', 'E', ' '};
private static final byte[] CRLF = {'\r', '\n' };
private final OutputStream outputStream;
private final RedisInputStream inputStream;
private final Socket socket;
private final AtomicBoolean closed;
public PubSubConnection(Socket socket) throws IOException {
this.socket = socket;
this.outputStream = socket.getOutputStream();
this.inputStream = new RedisInputStream(new BufferedInputStream(socket.getInputStream()));
this.closed = new AtomicBoolean(false);
}
public void subscribe(String channelName) throws IOException {
if (closed.get()) throw new IOException("Connection closed!");
byte[] command = Util.combine(SUBSCRIBE_COMMAND, channelName.getBytes(), CRLF);
outputStream.write(command);
}
public void unsubscribe(String channelName) throws IOException {
if (closed.get()) throw new IOException("Connection closed!");
byte[] command = Util.combine(UNSUBSCRIBE_COMMAND, channelName.getBytes(), CRLF);
outputStream.write(command);
}
public PubSubReply read() throws IOException {
if (closed.get()) throw new IOException("Connection closed!");
ArrayReplyHeader replyHeader = new ArrayReplyHeader(inputStream.readLine());
if (replyHeader.getElementCount() != 3) {
throw new IOException("Received array reply header with strange count: " + replyHeader.getElementCount());
}
StringReplyHeader replyTypeHeader = new StringReplyHeader(inputStream.readLine());
byte[] replyType = inputStream.readFully(replyTypeHeader.getStringLength());
inputStream.readLine();
if (Arrays.equals(SUBSCRIBE_TYPE, replyType)) return readSubscribeReply();
else if (Arrays.equals(UNSUBSCRIBE_TYPE, replyType)) return readUnsubscribeReply();
else if (Arrays.equals(MESSAGE_TYPE, replyType)) return readMessageReply();
else throw new IOException("Unknown reply type: " + new String(replyType));
}
public void close() {
try {
this.closed.set(true);
this.inputStream.close();
this.outputStream.close();
this.socket.close();
} catch (IOException e) {
logger.warn("Exception while closing", e);
}
}
private PubSubReply readMessageReply() throws IOException {
StringReplyHeader channelNameHeader = new StringReplyHeader(inputStream.readLine());
byte[] channelName = inputStream.readFully(channelNameHeader.getStringLength());
inputStream.readLine();
StringReplyHeader messageHeader = new StringReplyHeader(inputStream.readLine());
byte[] message = inputStream.readFully(messageHeader.getStringLength());
inputStream.readLine();
return new PubSubReply(PubSubReply.Type.MESSAGE, new String(channelName), Optional.of(message));
}
private PubSubReply readUnsubscribeReply() throws IOException {
String channelName = readSubscriptionReply();
return new PubSubReply(PubSubReply.Type.UNSUBSCRIBE, channelName, Optional.empty());
}
private PubSubReply readSubscribeReply() throws IOException {
String channelName = readSubscriptionReply();
return new PubSubReply(PubSubReply.Type.SUBSCRIBE, channelName, Optional.empty());
}
private String readSubscriptionReply() throws IOException {
StringReplyHeader channelNameHeader = new StringReplyHeader(inputStream.readLine());
byte[] channelName = inputStream.readFully(channelNameHeader.getStringLength());
inputStream.readLine();
new IntReply(inputStream.readLine());
return new String(channelName);
}
}

View File

@ -1,40 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.dispatch.redis;
import java.util.Optional;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class PubSubReply {
public enum Type {
MESSAGE,
SUBSCRIBE,
UNSUBSCRIBE
}
private final Type type;
private final String channel;
private final Optional<byte[]> content;
public PubSubReply(Type type, String channel, Optional<byte[]> content) {
this.type = type;
this.channel = channel;
this.content = content;
}
public Type getType() {
return type;
}
public String getChannel() {
return channel;
}
public Optional<byte[]> getContent() {
return content;
}
}

View File

@ -1,28 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.dispatch.redis.protocol;
import java.io.IOException;
public class ArrayReplyHeader {
private final int elementCount;
public ArrayReplyHeader(String header) throws IOException {
if (header == null || header.length() < 2 || header.charAt(0) != '*') {
throw new IOException("Invalid array reply header: " + header);
}
try {
this.elementCount = Integer.parseInt(header.substring(1));
} catch (NumberFormatException e) {
throw new IOException(e);
}
}
public int getElementCount() {
return elementCount;
}
}

View File

@ -1,28 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.dispatch.redis.protocol;
import java.io.IOException;
public class IntReply {
private final int value;
public IntReply(String reply) throws IOException {
if (reply == null || reply.length() < 2 || reply.charAt(0) != ':') {
throw new IOException("Invalid int reply: " + reply);
}
try {
this.value = Integer.parseInt(reply.substring(1));
} catch (NumberFormatException e) {
throw new IOException(e);
}
}
public int getValue() {
return value;
}
}

View File

@ -1,28 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.dispatch.redis.protocol;
import java.io.IOException;
public class StringReplyHeader {
private final int stringLength;
public StringReplyHeader(String header) throws IOException {
if (header == null || header.length() < 2 || header.charAt(0) != '$') {
throw new IOException("Invalid string reply header: " + header);
}
try {
this.stringLength = Integer.parseInt(header.substring(1));
} catch (NumberFormatException e) {
throw new IOException(e);
}
}
public int getStringLength() {
return stringLength;
}
}

View File

@ -1,40 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.dispatch.util;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
public class Util {
public static byte[] combine(byte[]... elements) {
try {
int sum = 0;
for (byte[] element : elements) {
sum += element.length;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream(sum);
for (byte[] element : elements) {
baos.write(element);
}
return baos.toByteArray();
} catch (IOException e) {
throw new AssertionError(e);
}
}
public static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
}

View File

@ -1,123 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.dispatch;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;
import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory;
import org.whispersystems.dispatch.redis.PubSubConnection;
import org.whispersystems.dispatch.redis.PubSubReply;
public class DispatchManagerTest {
private PubSubConnection pubSubConnection;
private RedisPubSubConnectionFactory socketFactory;
private DispatchManager dispatchManager;
private PubSubReplyInputStream pubSubReplyInputStream;
@BeforeEach
void setUp() throws Exception {
pubSubConnection = mock(PubSubConnection.class );
socketFactory = mock(RedisPubSubConnectionFactory.class);
pubSubReplyInputStream = new PubSubReplyInputStream();
when(socketFactory.connect()).thenReturn(pubSubConnection);
when(pubSubConnection.read()).thenAnswer((Answer<PubSubReply>) invocationOnMock -> pubSubReplyInputStream.read());
dispatchManager = new DispatchManager(socketFactory, Optional.empty());
dispatchManager.start();
}
@AfterEach
void tearDown() {
dispatchManager.shutdown();
}
@Test
public void testConnect() {
verify(socketFactory).connect();
}
@Test
public void testSubscribe() {
DispatchChannel dispatchChannel = mock(DispatchChannel.class);
dispatchManager.subscribe("foo", dispatchChannel);
pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.SUBSCRIBE, "foo", Optional.empty()));
verify(dispatchChannel, timeout(1000)).onDispatchSubscribed(eq("foo"));
}
@Test
public void testSubscribeUnsubscribe() {
DispatchChannel dispatchChannel = mock(DispatchChannel.class);
dispatchManager.subscribe("foo", dispatchChannel);
dispatchManager.unsubscribe("foo", dispatchChannel);
pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.SUBSCRIBE, "foo", Optional.empty()));
pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.UNSUBSCRIBE, "foo", Optional.empty()));
verify(dispatchChannel, timeout(1000)).onDispatchUnsubscribed(eq("foo"));
}
@Test
public void testMessages() {
DispatchChannel fooChannel = mock(DispatchChannel.class);
DispatchChannel barChannel = mock(DispatchChannel.class);
dispatchManager.subscribe("foo", fooChannel);
dispatchManager.subscribe("bar", barChannel);
pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.SUBSCRIBE, "foo", Optional.empty()));
pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.SUBSCRIBE, "bar", Optional.empty()));
verify(fooChannel, timeout(1000)).onDispatchSubscribed(eq("foo"));
verify(barChannel, timeout(1000)).onDispatchSubscribed(eq("bar"));
pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.MESSAGE, "foo", Optional.of("hello".getBytes())));
pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.MESSAGE, "bar", Optional.of("there".getBytes())));
ArgumentCaptor<byte[]> captor = ArgumentCaptor.forClass(byte[].class);
verify(fooChannel, timeout(1000)).onDispatchMessage(eq("foo"), captor.capture());
assertArrayEquals("hello".getBytes(), captor.getValue());
verify(barChannel, timeout(1000)).onDispatchMessage(eq("bar"), captor.capture());
assertArrayEquals("there".getBytes(), captor.getValue());
}
private static class PubSubReplyInputStream {
private final List<PubSubReply> pubSubReplyList = new LinkedList<>();
public synchronized PubSubReply read() {
try {
while (pubSubReplyList.isEmpty()) wait();
return pubSubReplyList.remove(0);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
public synchronized void write(PubSubReply pubSubReply) {
pubSubReplyList.add(pubSubReply);
notifyAll();
}
}
}

View File

@ -1,264 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.dispatch.redis;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.security.SecureRandom;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
class PubSubConnectionTest {
private static final String REPLY = "*3\r\n" +
"$9\r\n" +
"subscribe\r\n" +
"$5\r\n" +
"abcde\r\n" +
":1\r\n" +
"*3\r\n" +
"$9\r\n" +
"subscribe\r\n" +
"$5\r\n" +
"fghij\r\n" +
":2\r\n" +
"*3\r\n" +
"$9\r\n" +
"subscribe\r\n" +
"$5\r\n" +
"klmno\r\n" +
":2\r\n" +
"*3\r\n" +
"$7\r\n" +
"message\r\n" +
"$5\r\n" +
"abcde\r\n" +
"$10\r\n" +
"1234567890\r\n" +
"*3\r\n" +
"$7\r\n" +
"message\r\n" +
"$5\r\n" +
"klmno\r\n" +
"$10\r\n" +
"0987654321\r\n";
@Test
void testSubscribe() throws IOException {
OutputStream outputStream = mock(OutputStream.class);
Socket socket = mock(Socket.class );
when(socket.getOutputStream()).thenReturn(outputStream);
PubSubConnection connection = new PubSubConnection(socket);
connection.subscribe("foobar");
ArgumentCaptor<byte[]> captor = ArgumentCaptor.forClass(byte[].class);
verify(outputStream).write(captor.capture());
assertArrayEquals(captor.getValue(), "SUBSCRIBE foobar\r\n".getBytes());
}
@Test
void testUnsubscribe() throws IOException {
OutputStream outputStream = mock(OutputStream.class);
Socket socket = mock(Socket.class );
when(socket.getOutputStream()).thenReturn(outputStream);
PubSubConnection connection = new PubSubConnection(socket);
connection.unsubscribe("bazbar");
ArgumentCaptor<byte[]> captor = ArgumentCaptor.forClass(byte[].class);
verify(outputStream).write(captor.capture());
assertArrayEquals(captor.getValue(), "UNSUBSCRIBE bazbar\r\n".getBytes());
}
@Test
void testTricklyResponse() throws Exception {
InputStream inputStream = mockInputStreamFor(new TrickleInputStream(REPLY.getBytes()));
OutputStream outputStream = mock(OutputStream.class);
Socket socket = mock(Socket.class );
when(socket.getOutputStream()).thenReturn(outputStream);
when(socket.getInputStream()).thenReturn(inputStream);
PubSubConnection pubSubConnection = new PubSubConnection(socket);
readResponses(pubSubConnection);
}
@Test
void testFullResponse() throws Exception {
InputStream inputStream = mockInputStreamFor(new FullInputStream(REPLY.getBytes()));
OutputStream outputStream = mock(OutputStream.class);
Socket socket = mock(Socket.class );
when(socket.getOutputStream()).thenReturn(outputStream);
when(socket.getInputStream()).thenReturn(inputStream);
PubSubConnection pubSubConnection = new PubSubConnection(socket);
readResponses(pubSubConnection);
}
@Test
void testRandomLengthResponse() throws Exception {
InputStream inputStream = mockInputStreamFor(new RandomInputStream(REPLY.getBytes()));
OutputStream outputStream = mock(OutputStream.class);
Socket socket = mock(Socket.class );
when(socket.getOutputStream()).thenReturn(outputStream);
when(socket.getInputStream()).thenReturn(inputStream);
PubSubConnection pubSubConnection = new PubSubConnection(socket);
readResponses(pubSubConnection);
}
private InputStream mockInputStreamFor(final MockInputStream stub) throws IOException {
InputStream result = mock(InputStream.class);
when(result.read()).thenAnswer(new Answer<Integer>() {
@Override
public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
return stub.read();
}
});
when(result.read(any(byte[].class))).thenAnswer(new Answer<Integer>() {
@Override
public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
byte[] buffer = (byte[])invocationOnMock.getArguments()[0];
return stub.read(buffer, 0, buffer.length);
}
});
when(result.read(any(byte[].class), anyInt(), anyInt())).thenAnswer(new Answer<Integer>() {
@Override
public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
byte[] buffer = (byte[]) invocationOnMock.getArguments()[0];
int offset = (int) invocationOnMock.getArguments()[1];
int length = (int) invocationOnMock.getArguments()[2];
return stub.read(buffer, offset, length);
}
});
return result;
}
private void readResponses(PubSubConnection pubSubConnection) throws Exception {
PubSubReply reply = pubSubConnection.read();
assertEquals(reply.getType(), PubSubReply.Type.SUBSCRIBE);
assertEquals(reply.getChannel(), "abcde");
assertFalse(reply.getContent().isPresent());
reply = pubSubConnection.read();
assertEquals(reply.getType(), PubSubReply.Type.SUBSCRIBE);
assertEquals(reply.getChannel(), "fghij");
assertFalse(reply.getContent().isPresent());
reply = pubSubConnection.read();
assertEquals(reply.getType(), PubSubReply.Type.SUBSCRIBE);
assertEquals(reply.getChannel(), "klmno");
assertFalse(reply.getContent().isPresent());
reply = pubSubConnection.read();
assertEquals(reply.getType(), PubSubReply.Type.MESSAGE);
assertEquals(reply.getChannel(), "abcde");
assertArrayEquals(reply.getContent().get(), "1234567890".getBytes());
reply = pubSubConnection.read();
assertEquals(reply.getType(), PubSubReply.Type.MESSAGE);
assertEquals(reply.getChannel(), "klmno");
assertArrayEquals(reply.getContent().get(), "0987654321".getBytes());
}
private interface MockInputStream {
public int read();
public int read(byte[] input, int offset, int length);
}
private static class TrickleInputStream implements MockInputStream {
private final byte[] data;
private int index = 0;
private TrickleInputStream(byte[] data) {
this.data = data;
}
public int read() {
return data[index++];
}
public int read(byte[] input, int offset, int length) {
input[offset] = data[index++];
return 1;
}
}
private static class FullInputStream implements MockInputStream {
private final byte[] data;
private int index = 0;
private FullInputStream(byte[] data) {
this.data = data;
}
public int read() {
return data[index++];
}
public int read(byte[] input, int offset, int length) {
int amount = Math.min(data.length - index, length);
System.arraycopy(data, index, input, offset, amount);
index += length;
return amount;
}
}
private static class RandomInputStream implements MockInputStream {
private final byte[] data;
private int index = 0;
private RandomInputStream(byte[] data) {
this.data = data;
}
public int read() {
return data[index++];
}
public int read(byte[] input, int offset, int length) {
int maxCopy = Math.min(data.length - index, length);
int randomCopy = new SecureRandom().nextInt(maxCopy) + 1;
int copyAmount = Math.min(maxCopy, randomCopy);
System.arraycopy(data, index, input, offset, copyAmount);
index += copyAmount;
return copyAmount;
}
}
}

View File

@ -1,54 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.dispatch.redis.protocol;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.io.IOException;
import org.junit.jupiter.api.Test;
class ArrayReplyHeaderTest {
@Test
void testNull() {
assertThrows(IOException.class, () -> new ArrayReplyHeader(null));
}
@Test
void testBadPrefix() {
assertThrows(IOException.class, () -> new ArrayReplyHeader(":3"));
}
@Test
void testEmpty() {
assertThrows(IOException.class, () -> new ArrayReplyHeader(""));
}
@Test
void testTruncated() {
assertThrows(IOException.class, () -> new ArrayReplyHeader("*"));
}
@Test
void testBadNumber() {
assertThrows(IOException.class, () -> new ArrayReplyHeader("*ABC"));
}
@Test
void testValid() throws IOException {
assertEquals(4, new ArrayReplyHeader("*4").getElementCount());
}
}

View File

@ -1,39 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.dispatch.redis.protocol;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.io.IOException;
import org.junit.jupiter.api.Test;
class IntReplyHeaderTest {
@Test
void testNull() {
assertThrows(IOException.class, () -> new IntReply(null));
}
@Test
void testEmpty() {
assertThrows(IOException.class, () -> new IntReply(""));
}
@Test
void testBadNumber() {
assertThrows(IOException.class, () -> new IntReply(":A"));
}
@Test
void testBadFormat() {
assertThrows(IOException.class, () -> new IntReply("*"));
}
@Test
void testValid() throws IOException {
assertEquals(23, new IntReply(":23").getValue());
}
}

View File

@ -1,35 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.dispatch.redis.protocol;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.io.IOException;
import org.junit.jupiter.api.Test;
class StringReplyHeaderTest {
@Test
void testNull() {
assertThrows(IOException.class, () -> new StringReplyHeader(null));
}
@Test
void testBadNumber() {
assertThrows(IOException.class, () -> new StringReplyHeader("$100A"));
}
@Test
void testBadPrefix() {
assertThrows(IOException.class, () -> new StringReplyHeader("*"));
}
@Test
void testValid() throws IOException {
assertEquals(1000, new StringReplyHeader("$1000").getStringLength());
}
}

View File

@ -41,11 +41,6 @@
<artifactId>event-logger</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.whispersystems.textsecure</groupId>
<artifactId>redis-dispatch</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.whispersystems.textsecure</groupId>
<artifactId>websocket-resources</artifactId>

View File

@ -1,77 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.providers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory;
import org.whispersystems.dispatch.redis.PubSubConnection;
import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Util;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.LinkedList;
import java.util.List;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Protocol;
public class RedisClientFactory implements RedisPubSubConnectionFactory {
private final Logger logger = LoggerFactory.getLogger(RedisClientFactory.class);
private final String host;
private final int port;
private final ReplicatedJedisPool jedisPool;
public RedisClientFactory(String name, String url, List<String> replicaUrls, CircuitBreakerConfiguration circuitBreakerConfiguration)
throws URISyntaxException
{
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setTestOnBorrow(true);
poolConfig.setMaxWaitMillis(10000);
URI redisURI = new URI(url);
this.host = redisURI.getHost();
this.port = redisURI.getPort();
JedisPool masterPool = new JedisPool(poolConfig, host, port, Protocol.DEFAULT_TIMEOUT, null);
List<JedisPool> replicaPools = new LinkedList<>();
for (String replicaUrl : replicaUrls) {
URI replicaURI = new URI(replicaUrl);
replicaPools.add(new JedisPool(poolConfig, replicaURI.getHost(), replicaURI.getPort(),
500, Protocol.DEFAULT_TIMEOUT, null,
Protocol.DEFAULT_DATABASE, null, false, null ,
null, null));
}
this.jedisPool = new ReplicatedJedisPool(name, masterPool, replicaPools, circuitBreakerConfiguration);
}
public ReplicatedJedisPool getRedisClientPool() {
return jedisPool;
}
@Override
public PubSubConnection connect() {
while (true) {
try {
Socket socket = new Socket(host, port);
return new PubSubConnection(socket);
} catch (IOException e) {
logger.warn("Error connecting", e);
Util.sleep(200);
}
}
}
}

View File

@ -1,119 +0,0 @@
/*
* Copyright 2013-2020 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.storage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.DispatchChannel;
import org.whispersystems.dispatch.DispatchManager;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import io.dropwizard.lifecycle.Managed;
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
import redis.clients.jedis.Jedis;
public class PubSubManager implements Managed {
private static final String KEEPALIVE_CHANNEL = "KEEPALIVE";
private final Logger logger = LoggerFactory.getLogger(PubSubManager.class);
private final DispatchManager dispatchManager;
private final ReplicatedJedisPool jedisPool;
private boolean subscribed = false;
public PubSubManager(ReplicatedJedisPool jedisPool, DispatchManager dispatchManager) {
this.dispatchManager = dispatchManager;
this.jedisPool = jedisPool;
}
@Override
public void start() throws Exception {
this.dispatchManager.start();
KeepaliveDispatchChannel keepaliveDispatchChannel = new KeepaliveDispatchChannel();
this.dispatchManager.subscribe(KEEPALIVE_CHANNEL, keepaliveDispatchChannel);
synchronized (this) {
while (!subscribed) wait(0);
}
new KeepaliveSender().start();
}
@Override
public void stop() throws Exception {
dispatchManager.shutdown();
}
public void subscribe(PubSubAddress address, DispatchChannel channel) {
dispatchManager.subscribe(address.serialize(), channel);
}
public void unsubscribe(PubSubAddress address, DispatchChannel dispatchChannel) {
dispatchManager.unsubscribe(address.serialize(), dispatchChannel);
}
public boolean hasLocalSubscription(PubSubAddress address) {
return dispatchManager.hasSubscription(address.serialize());
}
public boolean publish(PubSubAddress address, PubSubMessage message) {
return publish(address.serialize().getBytes(), message);
}
private boolean publish(byte[] channel, PubSubMessage message) {
try (Jedis jedis = jedisPool.getWriteResource()) {
long result = jedis.publish(channel, message.toByteArray());
if (result < 0) {
logger.warn("**** Jedis publish result < 0");
}
return result > 0;
}
}
private class KeepaliveDispatchChannel implements DispatchChannel {
@Override
public void onDispatchMessage(String channel, byte[] message) {
// Good
}
@Override
public void onDispatchSubscribed(String channel) {
if (KEEPALIVE_CHANNEL.equals(channel)) {
synchronized (PubSubManager.this) {
subscribed = true;
PubSubManager.this.notifyAll();
}
}
}
@Override
public void onDispatchUnsubscribed(String channel) {
logger.warn("***** KEEPALIVE CHANNEL UNSUBSCRIBED *****");
}
}
private class KeepaliveSender extends Thread {
@Override
public void run() {
while (true) {
try {
Thread.sleep(20000);
publish(KEEPALIVE_CHANNEL.getBytes(), PubSubMessage.newBuilder()
.setType(PubSubMessage.Type.KEEPALIVE)
.build());
} catch (Throwable e) {
logger.warn("***** KEEPALIVE EXCEPTION ******", e);
}
}
}
}
}

View File

@ -6,11 +6,16 @@
package org.whispersystems.textsecuregcm.subscriptions;
import java.nio.charset.StandardCharsets;
import org.whispersystems.dispatch.util.Util;
public record ProcessorCustomer(String customerId, SubscriptionProcessor processor) {
public byte[] toDynamoBytes() {
return Util.combine(new byte[]{processor.getId()}, customerId.getBytes(StandardCharsets.UTF_8));
final byte[] customerIdBytes = customerId.getBytes(StandardCharsets.UTF_8);
final byte[] combinedBytes = new byte[customerIdBytes.length + 1];
combinedBytes[0] = processor.getId();
System.arraycopy(customerIdBytes, 0, combinedBytes, 1, customerIdBytes.length);
return combinedBytes;
}
}

View File

@ -0,0 +1,16 @@
package org.whispersystems.textsecuregcm.subscriptions;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
class ProcessorCustomerTest {
@Test
void toDynamoBytes() {
final ProcessorCustomer processorCustomer = new ProcessorCustomer("Test", SubscriptionProcessor.BRAINTREE);
assertArrayEquals(new byte[] { SubscriptionProcessor.BRAINTREE.getId(), 'T', 'e', 's', 't' },
processorCustomer.toDynamoBytes());
}
}