diff --git a/pom.xml b/pom.xml
index 53fc1c1ff..ae29bb099 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,6 @@
api-doc
event-logger
integration-tests
- redis-dispatch
service
websocket-resources
diff --git a/redis-dispatch/pom.xml b/redis-dispatch/pom.xml
deleted file mode 100644
index febe3a782..000000000
--- a/redis-dispatch/pom.xml
+++ /dev/null
@@ -1,20 +0,0 @@
-
-
-
- TextSecureServer
- org.whispersystems.textsecure
- JGITVER
-
- 4.0.0
- redis-dispatch
-
-
-
- org.slf4j
- slf4j-api
-
-
-
-
diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/DispatchChannel.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/DispatchChannel.java
deleted file mode 100644
index 761c59619..000000000
--- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/DispatchChannel.java
+++ /dev/null
@@ -1,11 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.dispatch;
-
-public interface DispatchChannel {
- void onDispatchMessage(String channel, byte[] message);
- void onDispatchSubscribed(String channel);
- void onDispatchUnsubscribed(String channel);
-}
diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/DispatchManager.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/DispatchManager.java
deleted file mode 100644
index 2dbf4de2a..000000000
--- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/DispatchManager.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.dispatch;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory;
-import org.whispersystems.dispatch.redis.PubSubConnection;
-import org.whispersystems.dispatch.redis.PubSubReply;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
-public class DispatchManager extends Thread {
-
- private final Logger logger = LoggerFactory.getLogger(DispatchManager.class);
- private final Executor executor = Executors.newCachedThreadPool();
- private final Map subscriptions = new ConcurrentHashMap<>();
-
- private final Optional deadLetterChannel;
- private final RedisPubSubConnectionFactory redisPubSubConnectionFactory;
-
- private PubSubConnection pubSubConnection;
- private volatile boolean running;
-
- public DispatchManager(RedisPubSubConnectionFactory redisPubSubConnectionFactory,
- Optional deadLetterChannel)
- {
- this.redisPubSubConnectionFactory = redisPubSubConnectionFactory;
- this.deadLetterChannel = deadLetterChannel;
- }
-
- @Override
- public void start() {
- this.pubSubConnection = redisPubSubConnectionFactory.connect();
- this.running = true;
- super.start();
- }
-
- public void shutdown() {
- this.running = false;
- this.pubSubConnection.close();
- }
-
- public synchronized void subscribe(String name, DispatchChannel dispatchChannel) {
- Optional previous = Optional.ofNullable(subscriptions.get(name));
- subscriptions.put(name, dispatchChannel);
-
- try {
- pubSubConnection.subscribe(name);
- } catch (IOException e) {
- logger.warn("Subscription error", e);
- }
-
- previous.ifPresent(channel -> dispatchUnsubscription(name, channel));
- }
-
- public synchronized void unsubscribe(String name, DispatchChannel channel) {
- Optional subscription = Optional.ofNullable(subscriptions.get(name));
-
- if (subscription.isPresent() && subscription.get() == channel) {
- subscriptions.remove(name);
-
- try {
- pubSubConnection.unsubscribe(name);
- } catch (IOException e) {
- logger.warn("Unsubscribe error", e);
- }
-
- dispatchUnsubscription(name, subscription.get());
- }
- }
-
- public boolean hasSubscription(String name) {
- return subscriptions.containsKey(name);
- }
-
- @Override
- public void run() {
- while (running) {
- try {
- PubSubReply reply = pubSubConnection.read();
-
- switch (reply.getType()) {
- case UNSUBSCRIBE: break;
- case SUBSCRIBE: dispatchSubscribe(reply); break;
- case MESSAGE: dispatchMessage(reply); break;
- default: throw new AssertionError("Unknown pubsub reply type! " + reply.getType());
- }
- } catch (IOException e) {
- logger.warn("***** PubSub Connection Error *****", e);
- if (running) {
- this.pubSubConnection.close();
- this.pubSubConnection = redisPubSubConnectionFactory.connect();
- resubscribeAll();
- }
- }
- }
-
- logger.warn("DispatchManager Shutting Down...");
- }
-
- private void dispatchSubscribe(final PubSubReply reply) {
- Optional subscription = Optional.ofNullable(subscriptions.get(reply.getChannel()));
-
- if (subscription.isPresent()) {
- dispatchSubscription(reply.getChannel(), subscription.get());
- } else {
- logger.info("Received subscribe event for non-existing channel: " + reply.getChannel());
- }
- }
-
- private void dispatchMessage(PubSubReply reply) {
- Optional subscription = Optional.ofNullable(subscriptions.get(reply.getChannel()));
-
- if (subscription.isPresent()) {
- dispatchMessage(reply.getChannel(), subscription.get(), reply.getContent().get());
- } else if (deadLetterChannel.isPresent()) {
- dispatchMessage(reply.getChannel(), deadLetterChannel.get(), reply.getContent().get());
- } else {
- logger.warn("Received message for non-existing channel, with no dead letter handler: " + reply.getChannel());
- }
- }
-
- private void resubscribeAll() {
- new Thread(() -> {
- synchronized (DispatchManager.this) {
- try {
- for (String name : subscriptions.keySet()) {
- pubSubConnection.subscribe(name);
- }
- } catch (IOException e) {
- logger.warn("***** RESUBSCRIPTION ERROR *****", e);
- }
- }
- }).start();
- }
-
- private void dispatchMessage(final String name, final DispatchChannel channel, final byte[] message) {
- executor.execute(() -> channel.onDispatchMessage(name, message));
- }
-
- private void dispatchSubscription(final String name, final DispatchChannel channel) {
- executor.execute(() -> channel.onDispatchSubscribed(name));
- }
-
- private void dispatchUnsubscription(final String name, final DispatchChannel channel) {
- executor.execute(() -> channel.onDispatchUnsubscribed(name));
- }
-}
diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/io/RedisInputStream.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/io/RedisInputStream.java
deleted file mode 100644
index 98764c223..000000000
--- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/io/RedisInputStream.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.dispatch.io;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class RedisInputStream {
-
- private static final byte CR = 0x0D;
- private static final byte LF = 0x0A;
-
- private final InputStream inputStream;
-
- public RedisInputStream(InputStream inputStream) {
- this.inputStream = inputStream;
- }
-
- public String readLine() throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- boolean foundCr = false;
-
- while (true) {
- int character = inputStream.read();
-
- if (character == -1) {
- throw new IOException("Stream closed!");
- }
-
- baos.write(character);
-
- if (foundCr && character == LF) break;
- else if (character == CR) foundCr = true;
- else if (foundCr) foundCr = false;
- }
-
- byte[] data = baos.toByteArray();
- return new String(data, 0, data.length-2);
- }
-
- public byte[] readFully(int size) throws IOException {
- byte[] result = new byte[size];
- int offset = 0;
- int remaining = result.length;
-
- while (remaining > 0) {
- int read = inputStream.read(result, offset, remaining);
-
- if (read < 0) {
- throw new IOException("Stream closed!");
- }
-
- offset += read;
- remaining -= read;
- }
-
- return result;
- }
-
- public void close() throws IOException {
- inputStream.close();
- }
-
-}
diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/io/RedisPubSubConnectionFactory.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/io/RedisPubSubConnectionFactory.java
deleted file mode 100644
index 898f9f6da..000000000
--- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/io/RedisPubSubConnectionFactory.java
+++ /dev/null
@@ -1,13 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.dispatch.io;
-
-import org.whispersystems.dispatch.redis.PubSubConnection;
-
-public interface RedisPubSubConnectionFactory {
-
- PubSubConnection connect();
-
-}
diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/PubSubConnection.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/PubSubConnection.java
deleted file mode 100644
index 322f6e5b7..000000000
--- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/PubSubConnection.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.dispatch.redis;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.whispersystems.dispatch.io.RedisInputStream;
-import org.whispersystems.dispatch.redis.protocol.ArrayReplyHeader;
-import org.whispersystems.dispatch.redis.protocol.IntReply;
-import org.whispersystems.dispatch.redis.protocol.StringReplyHeader;
-import org.whispersystems.dispatch.util.Util;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.util.Arrays;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class PubSubConnection {
-
- private final Logger logger = LoggerFactory.getLogger(PubSubConnection.class);
-
- private static final byte[] UNSUBSCRIBE_TYPE = {'u', 'n', 's', 'u', 'b', 's', 'c', 'r', 'i', 'b', 'e' };
- private static final byte[] SUBSCRIBE_TYPE = {'s', 'u', 'b', 's', 'c', 'r', 'i', 'b', 'e' };
- private static final byte[] MESSAGE_TYPE = {'m', 'e', 's', 's', 'a', 'g', 'e' };
-
- private static final byte[] SUBSCRIBE_COMMAND = {'S', 'U', 'B', 'S', 'C', 'R', 'I', 'B', 'E', ' ' };
- private static final byte[] UNSUBSCRIBE_COMMAND = {'U', 'N', 'S', 'U', 'B', 'S', 'C', 'R', 'I', 'B', 'E', ' '};
- private static final byte[] CRLF = {'\r', '\n' };
-
- private final OutputStream outputStream;
- private final RedisInputStream inputStream;
- private final Socket socket;
- private final AtomicBoolean closed;
-
- public PubSubConnection(Socket socket) throws IOException {
- this.socket = socket;
- this.outputStream = socket.getOutputStream();
- this.inputStream = new RedisInputStream(new BufferedInputStream(socket.getInputStream()));
- this.closed = new AtomicBoolean(false);
- }
-
- public void subscribe(String channelName) throws IOException {
- if (closed.get()) throw new IOException("Connection closed!");
-
- byte[] command = Util.combine(SUBSCRIBE_COMMAND, channelName.getBytes(), CRLF);
- outputStream.write(command);
- }
-
- public void unsubscribe(String channelName) throws IOException {
- if (closed.get()) throw new IOException("Connection closed!");
-
- byte[] command = Util.combine(UNSUBSCRIBE_COMMAND, channelName.getBytes(), CRLF);
- outputStream.write(command);
- }
-
- public PubSubReply read() throws IOException {
- if (closed.get()) throw new IOException("Connection closed!");
-
- ArrayReplyHeader replyHeader = new ArrayReplyHeader(inputStream.readLine());
-
- if (replyHeader.getElementCount() != 3) {
- throw new IOException("Received array reply header with strange count: " + replyHeader.getElementCount());
- }
-
- StringReplyHeader replyTypeHeader = new StringReplyHeader(inputStream.readLine());
- byte[] replyType = inputStream.readFully(replyTypeHeader.getStringLength());
- inputStream.readLine();
-
- if (Arrays.equals(SUBSCRIBE_TYPE, replyType)) return readSubscribeReply();
- else if (Arrays.equals(UNSUBSCRIBE_TYPE, replyType)) return readUnsubscribeReply();
- else if (Arrays.equals(MESSAGE_TYPE, replyType)) return readMessageReply();
- else throw new IOException("Unknown reply type: " + new String(replyType));
- }
-
- public void close() {
- try {
- this.closed.set(true);
- this.inputStream.close();
- this.outputStream.close();
- this.socket.close();
- } catch (IOException e) {
- logger.warn("Exception while closing", e);
- }
- }
-
- private PubSubReply readMessageReply() throws IOException {
- StringReplyHeader channelNameHeader = new StringReplyHeader(inputStream.readLine());
- byte[] channelName = inputStream.readFully(channelNameHeader.getStringLength());
- inputStream.readLine();
-
- StringReplyHeader messageHeader = new StringReplyHeader(inputStream.readLine());
- byte[] message = inputStream.readFully(messageHeader.getStringLength());
- inputStream.readLine();
-
- return new PubSubReply(PubSubReply.Type.MESSAGE, new String(channelName), Optional.of(message));
- }
-
- private PubSubReply readUnsubscribeReply() throws IOException {
- String channelName = readSubscriptionReply();
- return new PubSubReply(PubSubReply.Type.UNSUBSCRIBE, channelName, Optional.empty());
- }
-
- private PubSubReply readSubscribeReply() throws IOException {
- String channelName = readSubscriptionReply();
- return new PubSubReply(PubSubReply.Type.SUBSCRIBE, channelName, Optional.empty());
- }
-
- private String readSubscriptionReply() throws IOException {
- StringReplyHeader channelNameHeader = new StringReplyHeader(inputStream.readLine());
- byte[] channelName = inputStream.readFully(channelNameHeader.getStringLength());
- inputStream.readLine();
-
- new IntReply(inputStream.readLine());
-
- return new String(channelName);
- }
-
-}
diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/PubSubReply.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/PubSubReply.java
deleted file mode 100644
index 929de484c..000000000
--- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/PubSubReply.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.dispatch.redis;
-
-import java.util.Optional;
-
-@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
-public class PubSubReply {
-
- public enum Type {
- MESSAGE,
- SUBSCRIBE,
- UNSUBSCRIBE
- }
-
- private final Type type;
- private final String channel;
- private final Optional content;
-
- public PubSubReply(Type type, String channel, Optional content) {
- this.type = type;
- this.channel = channel;
- this.content = content;
- }
-
- public Type getType() {
- return type;
- }
-
- public String getChannel() {
- return channel;
- }
-
- public Optional getContent() {
- return content;
- }
-
-}
diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/ArrayReplyHeader.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/ArrayReplyHeader.java
deleted file mode 100644
index 37cccb77f..000000000
--- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/ArrayReplyHeader.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.dispatch.redis.protocol;
-
-import java.io.IOException;
-
-public class ArrayReplyHeader {
-
- private final int elementCount;
-
- public ArrayReplyHeader(String header) throws IOException {
- if (header == null || header.length() < 2 || header.charAt(0) != '*') {
- throw new IOException("Invalid array reply header: " + header);
- }
-
- try {
- this.elementCount = Integer.parseInt(header.substring(1));
- } catch (NumberFormatException e) {
- throw new IOException(e);
- }
- }
-
- public int getElementCount() {
- return elementCount;
- }
-}
diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/IntReply.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/IntReply.java
deleted file mode 100644
index d28150bbd..000000000
--- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/IntReply.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.dispatch.redis.protocol;
-
-import java.io.IOException;
-
-public class IntReply {
-
- private final int value;
-
- public IntReply(String reply) throws IOException {
- if (reply == null || reply.length() < 2 || reply.charAt(0) != ':') {
- throw new IOException("Invalid int reply: " + reply);
- }
-
- try {
- this.value = Integer.parseInt(reply.substring(1));
- } catch (NumberFormatException e) {
- throw new IOException(e);
- }
- }
-
- public int getValue() {
- return value;
- }
-}
diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/StringReplyHeader.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/StringReplyHeader.java
deleted file mode 100644
index ece0258a6..000000000
--- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/StringReplyHeader.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.dispatch.redis.protocol;
-
-import java.io.IOException;
-
-public class StringReplyHeader {
-
- private final int stringLength;
-
- public StringReplyHeader(String header) throws IOException {
- if (header == null || header.length() < 2 || header.charAt(0) != '$') {
- throw new IOException("Invalid string reply header: " + header);
- }
-
- try {
- this.stringLength = Integer.parseInt(header.substring(1));
- } catch (NumberFormatException e) {
- throw new IOException(e);
- }
- }
-
- public int getStringLength() {
- return stringLength;
- }
-}
diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/util/Util.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/util/Util.java
deleted file mode 100644
index 21dbcc532..000000000
--- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/util/Util.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.dispatch.util;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-public class Util {
-
- public static byte[] combine(byte[]... elements) {
- try {
- int sum = 0;
-
- for (byte[] element : elements) {
- sum += element.length;
- }
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream(sum);
-
- for (byte[] element : elements) {
- baos.write(element);
- }
-
- return baos.toByteArray();
- } catch (IOException e) {
- throw new AssertionError(e);
- }
- }
-
-
- public static void sleep(long millis) {
- try {
- Thread.sleep(millis);
- } catch (InterruptedException e) {
- throw new AssertionError(e);
- }
- }
-}
diff --git a/redis-dispatch/src/test/java/org/whispersystems/dispatch/DispatchManagerTest.java b/redis-dispatch/src/test/java/org/whispersystems/dispatch/DispatchManagerTest.java
deleted file mode 100644
index 4410659d6..000000000
--- a/redis-dispatch/src/test/java/org/whispersystems/dispatch/DispatchManagerTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.dispatch;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.stubbing.Answer;
-import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory;
-import org.whispersystems.dispatch.redis.PubSubConnection;
-import org.whispersystems.dispatch.redis.PubSubReply;
-
-public class DispatchManagerTest {
-
- private PubSubConnection pubSubConnection;
- private RedisPubSubConnectionFactory socketFactory;
- private DispatchManager dispatchManager;
- private PubSubReplyInputStream pubSubReplyInputStream;
-
- @BeforeEach
- void setUp() throws Exception {
- pubSubConnection = mock(PubSubConnection.class );
- socketFactory = mock(RedisPubSubConnectionFactory.class);
- pubSubReplyInputStream = new PubSubReplyInputStream();
-
- when(socketFactory.connect()).thenReturn(pubSubConnection);
- when(pubSubConnection.read()).thenAnswer((Answer) invocationOnMock -> pubSubReplyInputStream.read());
-
- dispatchManager = new DispatchManager(socketFactory, Optional.empty());
- dispatchManager.start();
- }
-
- @AfterEach
- void tearDown() {
- dispatchManager.shutdown();
- }
-
- @Test
- public void testConnect() {
- verify(socketFactory).connect();
- }
-
- @Test
- public void testSubscribe() {
- DispatchChannel dispatchChannel = mock(DispatchChannel.class);
- dispatchManager.subscribe("foo", dispatchChannel);
- pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.SUBSCRIBE, "foo", Optional.empty()));
-
- verify(dispatchChannel, timeout(1000)).onDispatchSubscribed(eq("foo"));
- }
-
- @Test
- public void testSubscribeUnsubscribe() {
- DispatchChannel dispatchChannel = mock(DispatchChannel.class);
- dispatchManager.subscribe("foo", dispatchChannel);
- dispatchManager.unsubscribe("foo", dispatchChannel);
-
- pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.SUBSCRIBE, "foo", Optional.empty()));
- pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.UNSUBSCRIBE, "foo", Optional.empty()));
-
- verify(dispatchChannel, timeout(1000)).onDispatchUnsubscribed(eq("foo"));
- }
-
- @Test
- public void testMessages() {
- DispatchChannel fooChannel = mock(DispatchChannel.class);
- DispatchChannel barChannel = mock(DispatchChannel.class);
-
- dispatchManager.subscribe("foo", fooChannel);
- dispatchManager.subscribe("bar", barChannel);
-
- pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.SUBSCRIBE, "foo", Optional.empty()));
- pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.SUBSCRIBE, "bar", Optional.empty()));
-
- verify(fooChannel, timeout(1000)).onDispatchSubscribed(eq("foo"));
- verify(barChannel, timeout(1000)).onDispatchSubscribed(eq("bar"));
-
- pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.MESSAGE, "foo", Optional.of("hello".getBytes())));
- pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.MESSAGE, "bar", Optional.of("there".getBytes())));
-
- ArgumentCaptor captor = ArgumentCaptor.forClass(byte[].class);
- verify(fooChannel, timeout(1000)).onDispatchMessage(eq("foo"), captor.capture());
-
- assertArrayEquals("hello".getBytes(), captor.getValue());
-
- verify(barChannel, timeout(1000)).onDispatchMessage(eq("bar"), captor.capture());
-
- assertArrayEquals("there".getBytes(), captor.getValue());
- }
-
- private static class PubSubReplyInputStream {
-
- private final List pubSubReplyList = new LinkedList<>();
-
- public synchronized PubSubReply read() {
- try {
- while (pubSubReplyList.isEmpty()) wait();
- return pubSubReplyList.remove(0);
- } catch (InterruptedException e) {
- throw new AssertionError(e);
- }
- }
-
- public synchronized void write(PubSubReply pubSubReply) {
- pubSubReplyList.add(pubSubReply);
- notifyAll();
- }
- }
-
-}
diff --git a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/PubSubConnectionTest.java b/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/PubSubConnectionTest.java
deleted file mode 100644
index 4e0459359..000000000
--- a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/PubSubConnectionTest.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.dispatch.redis;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.security.SecureRandom;
-import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-class PubSubConnectionTest {
-
- private static final String REPLY = "*3\r\n" +
- "$9\r\n" +
- "subscribe\r\n" +
- "$5\r\n" +
- "abcde\r\n" +
- ":1\r\n" +
- "*3\r\n" +
- "$9\r\n" +
- "subscribe\r\n" +
- "$5\r\n" +
- "fghij\r\n" +
- ":2\r\n" +
- "*3\r\n" +
- "$9\r\n" +
- "subscribe\r\n" +
- "$5\r\n" +
- "klmno\r\n" +
- ":2\r\n" +
- "*3\r\n" +
- "$7\r\n" +
- "message\r\n" +
- "$5\r\n" +
- "abcde\r\n" +
- "$10\r\n" +
- "1234567890\r\n" +
- "*3\r\n" +
- "$7\r\n" +
- "message\r\n" +
- "$5\r\n" +
- "klmno\r\n" +
- "$10\r\n" +
- "0987654321\r\n";
-
-
- @Test
- void testSubscribe() throws IOException {
- OutputStream outputStream = mock(OutputStream.class);
- Socket socket = mock(Socket.class );
- when(socket.getOutputStream()).thenReturn(outputStream);
- PubSubConnection connection = new PubSubConnection(socket);
-
- connection.subscribe("foobar");
-
- ArgumentCaptor captor = ArgumentCaptor.forClass(byte[].class);
- verify(outputStream).write(captor.capture());
-
- assertArrayEquals(captor.getValue(), "SUBSCRIBE foobar\r\n".getBytes());
- }
-
- @Test
- void testUnsubscribe() throws IOException {
- OutputStream outputStream = mock(OutputStream.class);
- Socket socket = mock(Socket.class );
- when(socket.getOutputStream()).thenReturn(outputStream);
- PubSubConnection connection = new PubSubConnection(socket);
-
- connection.unsubscribe("bazbar");
-
- ArgumentCaptor captor = ArgumentCaptor.forClass(byte[].class);
- verify(outputStream).write(captor.capture());
-
- assertArrayEquals(captor.getValue(), "UNSUBSCRIBE bazbar\r\n".getBytes());
- }
-
- @Test
- void testTricklyResponse() throws Exception {
- InputStream inputStream = mockInputStreamFor(new TrickleInputStream(REPLY.getBytes()));
- OutputStream outputStream = mock(OutputStream.class);
- Socket socket = mock(Socket.class );
- when(socket.getOutputStream()).thenReturn(outputStream);
- when(socket.getInputStream()).thenReturn(inputStream);
-
- PubSubConnection pubSubConnection = new PubSubConnection(socket);
- readResponses(pubSubConnection);
- }
-
- @Test
- void testFullResponse() throws Exception {
- InputStream inputStream = mockInputStreamFor(new FullInputStream(REPLY.getBytes()));
- OutputStream outputStream = mock(OutputStream.class);
- Socket socket = mock(Socket.class );
- when(socket.getOutputStream()).thenReturn(outputStream);
- when(socket.getInputStream()).thenReturn(inputStream);
-
- PubSubConnection pubSubConnection = new PubSubConnection(socket);
- readResponses(pubSubConnection);
- }
-
- @Test
- void testRandomLengthResponse() throws Exception {
- InputStream inputStream = mockInputStreamFor(new RandomInputStream(REPLY.getBytes()));
- OutputStream outputStream = mock(OutputStream.class);
- Socket socket = mock(Socket.class );
- when(socket.getOutputStream()).thenReturn(outputStream);
- when(socket.getInputStream()).thenReturn(inputStream);
-
- PubSubConnection pubSubConnection = new PubSubConnection(socket);
- readResponses(pubSubConnection);
- }
-
- private InputStream mockInputStreamFor(final MockInputStream stub) throws IOException {
- InputStream result = mock(InputStream.class);
-
- when(result.read()).thenAnswer(new Answer() {
- @Override
- public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
- return stub.read();
- }
- });
-
- when(result.read(any(byte[].class))).thenAnswer(new Answer() {
- @Override
- public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
- byte[] buffer = (byte[])invocationOnMock.getArguments()[0];
- return stub.read(buffer, 0, buffer.length);
- }
- });
-
- when(result.read(any(byte[].class), anyInt(), anyInt())).thenAnswer(new Answer() {
- @Override
- public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
- byte[] buffer = (byte[]) invocationOnMock.getArguments()[0];
- int offset = (int) invocationOnMock.getArguments()[1];
- int length = (int) invocationOnMock.getArguments()[2];
-
- return stub.read(buffer, offset, length);
- }
- });
-
- return result;
- }
-
- private void readResponses(PubSubConnection pubSubConnection) throws Exception {
- PubSubReply reply = pubSubConnection.read();
-
- assertEquals(reply.getType(), PubSubReply.Type.SUBSCRIBE);
- assertEquals(reply.getChannel(), "abcde");
- assertFalse(reply.getContent().isPresent());
-
- reply = pubSubConnection.read();
-
- assertEquals(reply.getType(), PubSubReply.Type.SUBSCRIBE);
- assertEquals(reply.getChannel(), "fghij");
- assertFalse(reply.getContent().isPresent());
-
- reply = pubSubConnection.read();
-
- assertEquals(reply.getType(), PubSubReply.Type.SUBSCRIBE);
- assertEquals(reply.getChannel(), "klmno");
- assertFalse(reply.getContent().isPresent());
-
- reply = pubSubConnection.read();
-
- assertEquals(reply.getType(), PubSubReply.Type.MESSAGE);
- assertEquals(reply.getChannel(), "abcde");
- assertArrayEquals(reply.getContent().get(), "1234567890".getBytes());
-
- reply = pubSubConnection.read();
-
- assertEquals(reply.getType(), PubSubReply.Type.MESSAGE);
- assertEquals(reply.getChannel(), "klmno");
- assertArrayEquals(reply.getContent().get(), "0987654321".getBytes());
- }
-
- private interface MockInputStream {
- public int read();
- public int read(byte[] input, int offset, int length);
- }
-
- private static class TrickleInputStream implements MockInputStream {
-
- private final byte[] data;
- private int index = 0;
-
- private TrickleInputStream(byte[] data) {
- this.data = data;
- }
-
- public int read() {
- return data[index++];
- }
-
- public int read(byte[] input, int offset, int length) {
- input[offset] = data[index++];
- return 1;
- }
-
- }
-
- private static class FullInputStream implements MockInputStream {
-
- private final byte[] data;
- private int index = 0;
-
- private FullInputStream(byte[] data) {
- this.data = data;
- }
-
- public int read() {
- return data[index++];
- }
-
- public int read(byte[] input, int offset, int length) {
- int amount = Math.min(data.length - index, length);
- System.arraycopy(data, index, input, offset, amount);
- index += length;
-
- return amount;
- }
- }
-
- private static class RandomInputStream implements MockInputStream {
- private final byte[] data;
- private int index = 0;
-
- private RandomInputStream(byte[] data) {
- this.data = data;
- }
-
- public int read() {
- return data[index++];
- }
-
- public int read(byte[] input, int offset, int length) {
- int maxCopy = Math.min(data.length - index, length);
- int randomCopy = new SecureRandom().nextInt(maxCopy) + 1;
- int copyAmount = Math.min(maxCopy, randomCopy);
-
- System.arraycopy(data, index, input, offset, copyAmount);
- index += copyAmount;
-
- return copyAmount;
- }
-
- }
-
-}
diff --git a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/ArrayReplyHeaderTest.java b/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/ArrayReplyHeaderTest.java
deleted file mode 100644
index d7346019d..000000000
--- a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/ArrayReplyHeaderTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.dispatch.redis.protocol;
-
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import java.io.IOException;
-import org.junit.jupiter.api.Test;
-
-class ArrayReplyHeaderTest {
-
- @Test
- void testNull() {
- assertThrows(IOException.class, () -> new ArrayReplyHeader(null));
- }
-
- @Test
- void testBadPrefix() {
- assertThrows(IOException.class, () -> new ArrayReplyHeader(":3"));
- }
-
- @Test
- void testEmpty() {
- assertThrows(IOException.class, () -> new ArrayReplyHeader(""));
- }
-
- @Test
- void testTruncated() {
- assertThrows(IOException.class, () -> new ArrayReplyHeader("*"));
- }
-
- @Test
- void testBadNumber() {
- assertThrows(IOException.class, () -> new ArrayReplyHeader("*ABC"));
- }
-
- @Test
- void testValid() throws IOException {
- assertEquals(4, new ArrayReplyHeader("*4").getElementCount());
- }
-
-
-
-
-
-
-
-
-
-}
diff --git a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/IntReplyHeaderTest.java b/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/IntReplyHeaderTest.java
deleted file mode 100644
index a5efb357d..000000000
--- a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/IntReplyHeaderTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.dispatch.redis.protocol;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import java.io.IOException;
-import org.junit.jupiter.api.Test;
-
-class IntReplyHeaderTest {
-
- @Test
- void testNull() {
- assertThrows(IOException.class, () -> new IntReply(null));
- }
-
- @Test
- void testEmpty() {
- assertThrows(IOException.class, () -> new IntReply(""));
- }
-
- @Test
- void testBadNumber() {
- assertThrows(IOException.class, () -> new IntReply(":A"));
- }
-
- @Test
- void testBadFormat() {
- assertThrows(IOException.class, () -> new IntReply("*"));
- }
-
- @Test
- void testValid() throws IOException {
- assertEquals(23, new IntReply(":23").getValue());
- }
-}
diff --git a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/StringReplyHeaderTest.java b/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/StringReplyHeaderTest.java
deleted file mode 100644
index 206041b6f..000000000
--- a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/StringReplyHeaderTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.dispatch.redis.protocol;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import java.io.IOException;
-import org.junit.jupiter.api.Test;
-
-class StringReplyHeaderTest {
-
- @Test
- void testNull() {
- assertThrows(IOException.class, () -> new StringReplyHeader(null));
- }
-
- @Test
- void testBadNumber() {
- assertThrows(IOException.class, () -> new StringReplyHeader("$100A"));
- }
-
- @Test
- void testBadPrefix() {
- assertThrows(IOException.class, () -> new StringReplyHeader("*"));
- }
-
- @Test
- void testValid() throws IOException {
- assertEquals(1000, new StringReplyHeader("$1000").getStringLength());
- }
-
-}
diff --git a/service/pom.xml b/service/pom.xml
index e0493cb66..3d04a229b 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -41,11 +41,6 @@
event-logger
${project.version}
-
- org.whispersystems.textsecure
- redis-dispatch
- ${project.version}
-
org.whispersystems.textsecure
websocket-resources
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java b/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java
deleted file mode 100644
index 24ede2139..000000000
--- a/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-package org.whispersystems.textsecuregcm.providers;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory;
-import org.whispersystems.dispatch.redis.PubSubConnection;
-import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration;
-import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
-import org.whispersystems.textsecuregcm.util.Util;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.LinkedList;
-import java.util.List;
-
-import redis.clients.jedis.JedisPool;
-import redis.clients.jedis.JedisPoolConfig;
-import redis.clients.jedis.Protocol;
-
-public class RedisClientFactory implements RedisPubSubConnectionFactory {
-
- private final Logger logger = LoggerFactory.getLogger(RedisClientFactory.class);
-
- private final String host;
- private final int port;
- private final ReplicatedJedisPool jedisPool;
-
- public RedisClientFactory(String name, String url, List replicaUrls, CircuitBreakerConfiguration circuitBreakerConfiguration)
- throws URISyntaxException
- {
- JedisPoolConfig poolConfig = new JedisPoolConfig();
- poolConfig.setTestOnBorrow(true);
- poolConfig.setMaxWaitMillis(10000);
-
- URI redisURI = new URI(url);
-
- this.host = redisURI.getHost();
- this.port = redisURI.getPort();
-
- JedisPool masterPool = new JedisPool(poolConfig, host, port, Protocol.DEFAULT_TIMEOUT, null);
- List replicaPools = new LinkedList<>();
-
- for (String replicaUrl : replicaUrls) {
- URI replicaURI = new URI(replicaUrl);
-
- replicaPools.add(new JedisPool(poolConfig, replicaURI.getHost(), replicaURI.getPort(),
- 500, Protocol.DEFAULT_TIMEOUT, null,
- Protocol.DEFAULT_DATABASE, null, false, null ,
- null, null));
- }
-
- this.jedisPool = new ReplicatedJedisPool(name, masterPool, replicaPools, circuitBreakerConfiguration);
- }
-
- public ReplicatedJedisPool getRedisClientPool() {
- return jedisPool;
- }
-
- @Override
- public PubSubConnection connect() {
- while (true) {
- try {
- Socket socket = new Socket(host, port);
- return new PubSubConnection(socket);
- } catch (IOException e) {
- logger.warn("Error connecting", e);
- Util.sleep(200);
- }
- }
- }
-}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java
deleted file mode 100644
index e2ba58d92..000000000
--- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright 2013-2020 Signal Messenger, LLC
- * SPDX-License-Identifier: AGPL-3.0-only
- */
-
-package org.whispersystems.textsecuregcm.storage;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.whispersystems.dispatch.DispatchChannel;
-import org.whispersystems.dispatch.DispatchManager;
-import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
-
-import io.dropwizard.lifecycle.Managed;
-import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
-import redis.clients.jedis.Jedis;
-
-public class PubSubManager implements Managed {
-
- private static final String KEEPALIVE_CHANNEL = "KEEPALIVE";
-
- private final Logger logger = LoggerFactory.getLogger(PubSubManager.class);
-
- private final DispatchManager dispatchManager;
- private final ReplicatedJedisPool jedisPool;
-
- private boolean subscribed = false;
-
- public PubSubManager(ReplicatedJedisPool jedisPool, DispatchManager dispatchManager) {
- this.dispatchManager = dispatchManager;
- this.jedisPool = jedisPool;
- }
-
- @Override
- public void start() throws Exception {
- this.dispatchManager.start();
-
- KeepaliveDispatchChannel keepaliveDispatchChannel = new KeepaliveDispatchChannel();
- this.dispatchManager.subscribe(KEEPALIVE_CHANNEL, keepaliveDispatchChannel);
-
- synchronized (this) {
- while (!subscribed) wait(0);
- }
-
- new KeepaliveSender().start();
- }
-
- @Override
- public void stop() throws Exception {
- dispatchManager.shutdown();
- }
-
- public void subscribe(PubSubAddress address, DispatchChannel channel) {
- dispatchManager.subscribe(address.serialize(), channel);
- }
-
- public void unsubscribe(PubSubAddress address, DispatchChannel dispatchChannel) {
- dispatchManager.unsubscribe(address.serialize(), dispatchChannel);
- }
-
- public boolean hasLocalSubscription(PubSubAddress address) {
- return dispatchManager.hasSubscription(address.serialize());
- }
-
- public boolean publish(PubSubAddress address, PubSubMessage message) {
- return publish(address.serialize().getBytes(), message);
- }
-
- private boolean publish(byte[] channel, PubSubMessage message) {
- try (Jedis jedis = jedisPool.getWriteResource()) {
- long result = jedis.publish(channel, message.toByteArray());
-
- if (result < 0) {
- logger.warn("**** Jedis publish result < 0");
- }
-
- return result > 0;
- }
- }
-
- private class KeepaliveDispatchChannel implements DispatchChannel {
-
- @Override
- public void onDispatchMessage(String channel, byte[] message) {
- // Good
- }
-
- @Override
- public void onDispatchSubscribed(String channel) {
- if (KEEPALIVE_CHANNEL.equals(channel)) {
- synchronized (PubSubManager.this) {
- subscribed = true;
- PubSubManager.this.notifyAll();
- }
- }
- }
-
- @Override
- public void onDispatchUnsubscribed(String channel) {
- logger.warn("***** KEEPALIVE CHANNEL UNSUBSCRIBED *****");
- }
- }
-
- private class KeepaliveSender extends Thread {
- @Override
- public void run() {
- while (true) {
- try {
- Thread.sleep(20000);
- publish(KEEPALIVE_CHANNEL.getBytes(), PubSubMessage.newBuilder()
- .setType(PubSubMessage.Type.KEEPALIVE)
- .build());
- } catch (Throwable e) {
- logger.warn("***** KEEPALIVE EXCEPTION ******", e);
- }
- }
- }
- }
-}
diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/ProcessorCustomer.java b/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/ProcessorCustomer.java
index 73c5d6b46..c994439c3 100644
--- a/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/ProcessorCustomer.java
+++ b/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/ProcessorCustomer.java
@@ -6,11 +6,16 @@
package org.whispersystems.textsecuregcm.subscriptions;
import java.nio.charset.StandardCharsets;
-import org.whispersystems.dispatch.util.Util;
public record ProcessorCustomer(String customerId, SubscriptionProcessor processor) {
public byte[] toDynamoBytes() {
- return Util.combine(new byte[]{processor.getId()}, customerId.getBytes(StandardCharsets.UTF_8));
+ final byte[] customerIdBytes = customerId.getBytes(StandardCharsets.UTF_8);
+ final byte[] combinedBytes = new byte[customerIdBytes.length + 1];
+
+ combinedBytes[0] = processor.getId();
+ System.arraycopy(customerIdBytes, 0, combinedBytes, 1, customerIdBytes.length);
+
+ return combinedBytes;
}
}
diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/subscriptions/ProcessorCustomerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/subscriptions/ProcessorCustomerTest.java
new file mode 100644
index 000000000..1fe65c941
--- /dev/null
+++ b/service/src/test/java/org/whispersystems/textsecuregcm/subscriptions/ProcessorCustomerTest.java
@@ -0,0 +1,16 @@
+package org.whispersystems.textsecuregcm.subscriptions;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class ProcessorCustomerTest {
+
+ @Test
+ void toDynamoBytes() {
+ final ProcessorCustomer processorCustomer = new ProcessorCustomer("Test", SubscriptionProcessor.BRAINTREE);
+
+ assertArrayEquals(new byte[] { SubscriptionProcessor.BRAINTREE.getId(), 'T', 'e', 's', 't' },
+ processorCustomer.toDynamoBytes());
+ }
+}