From fd5e9ea0169b4de226f7e437ef8434fcc74631d4 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Tue, 16 Aug 2022 13:21:36 -0400 Subject: [PATCH] Drop the old (and now unused!) `redis-dispatch` module --- pom.xml | 1 - redis-dispatch/pom.xml | 20 -- .../dispatch/DispatchChannel.java | 11 - .../dispatch/DispatchManager.java | 157 ----------- .../dispatch/io/RedisInputStream.java | 68 ----- .../io/RedisPubSubConnectionFactory.java | 13 - .../dispatch/redis/PubSubConnection.java | 123 -------- .../dispatch/redis/PubSubReply.java | 40 --- .../redis/protocol/ArrayReplyHeader.java | 28 -- .../dispatch/redis/protocol/IntReply.java | 28 -- .../redis/protocol/StringReplyHeader.java | 28 -- .../whispersystems/dispatch/util/Util.java | 40 --- .../dispatch/DispatchManagerTest.java | 123 -------- .../dispatch/redis/PubSubConnectionTest.java | 264 ------------------ .../redis/protocol/ArrayReplyHeaderTest.java | 54 ---- .../redis/protocol/IntReplyHeaderTest.java | 39 --- .../redis/protocol/StringReplyHeaderTest.java | 35 --- service/pom.xml | 5 - .../providers/RedisClientFactory.java | 77 ----- .../textsecuregcm/storage/PubSubManager.java | 119 -------- .../subscriptions/ProcessorCustomer.java | 9 +- .../subscriptions/ProcessorCustomerTest.java | 16 ++ 22 files changed, 23 insertions(+), 1275 deletions(-) delete mode 100644 redis-dispatch/pom.xml delete mode 100644 redis-dispatch/src/main/java/org/whispersystems/dispatch/DispatchChannel.java delete mode 100644 redis-dispatch/src/main/java/org/whispersystems/dispatch/DispatchManager.java delete mode 100644 redis-dispatch/src/main/java/org/whispersystems/dispatch/io/RedisInputStream.java delete mode 100644 redis-dispatch/src/main/java/org/whispersystems/dispatch/io/RedisPubSubConnectionFactory.java delete mode 100644 redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/PubSubConnection.java delete mode 100644 redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/PubSubReply.java delete mode 100644 redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/ArrayReplyHeader.java delete mode 100644 redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/IntReply.java delete mode 100644 redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/StringReplyHeader.java delete mode 100644 redis-dispatch/src/main/java/org/whispersystems/dispatch/util/Util.java delete mode 100644 redis-dispatch/src/test/java/org/whispersystems/dispatch/DispatchManagerTest.java delete mode 100644 redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/PubSubConnectionTest.java delete mode 100644 redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/ArrayReplyHeaderTest.java delete mode 100644 redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/IntReplyHeaderTest.java delete mode 100644 redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/StringReplyHeaderTest.java delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java delete mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java create mode 100644 service/src/test/java/org/whispersystems/textsecuregcm/subscriptions/ProcessorCustomerTest.java diff --git a/pom.xml b/pom.xml index 53fc1c1ff..ae29bb099 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,6 @@ api-doc event-logger integration-tests - redis-dispatch service websocket-resources diff --git a/redis-dispatch/pom.xml b/redis-dispatch/pom.xml deleted file mode 100644 index febe3a782..000000000 --- a/redis-dispatch/pom.xml +++ /dev/null @@ -1,20 +0,0 @@ - - - - TextSecureServer - org.whispersystems.textsecure - JGITVER - - 4.0.0 - redis-dispatch - - - - org.slf4j - slf4j-api - - - - diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/DispatchChannel.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/DispatchChannel.java deleted file mode 100644 index 761c59619..000000000 --- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/DispatchChannel.java +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.dispatch; - -public interface DispatchChannel { - void onDispatchMessage(String channel, byte[] message); - void onDispatchSubscribed(String channel); - void onDispatchUnsubscribed(String channel); -} diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/DispatchManager.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/DispatchManager.java deleted file mode 100644 index 2dbf4de2a..000000000 --- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/DispatchManager.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.dispatch; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory; -import org.whispersystems.dispatch.redis.PubSubConnection; -import org.whispersystems.dispatch.redis.PubSubReply; - -import java.io.IOException; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; - -@SuppressWarnings("OptionalUsedAsFieldOrParameterType") -public class DispatchManager extends Thread { - - private final Logger logger = LoggerFactory.getLogger(DispatchManager.class); - private final Executor executor = Executors.newCachedThreadPool(); - private final Map subscriptions = new ConcurrentHashMap<>(); - - private final Optional deadLetterChannel; - private final RedisPubSubConnectionFactory redisPubSubConnectionFactory; - - private PubSubConnection pubSubConnection; - private volatile boolean running; - - public DispatchManager(RedisPubSubConnectionFactory redisPubSubConnectionFactory, - Optional deadLetterChannel) - { - this.redisPubSubConnectionFactory = redisPubSubConnectionFactory; - this.deadLetterChannel = deadLetterChannel; - } - - @Override - public void start() { - this.pubSubConnection = redisPubSubConnectionFactory.connect(); - this.running = true; - super.start(); - } - - public void shutdown() { - this.running = false; - this.pubSubConnection.close(); - } - - public synchronized void subscribe(String name, DispatchChannel dispatchChannel) { - Optional previous = Optional.ofNullable(subscriptions.get(name)); - subscriptions.put(name, dispatchChannel); - - try { - pubSubConnection.subscribe(name); - } catch (IOException e) { - logger.warn("Subscription error", e); - } - - previous.ifPresent(channel -> dispatchUnsubscription(name, channel)); - } - - public synchronized void unsubscribe(String name, DispatchChannel channel) { - Optional subscription = Optional.ofNullable(subscriptions.get(name)); - - if (subscription.isPresent() && subscription.get() == channel) { - subscriptions.remove(name); - - try { - pubSubConnection.unsubscribe(name); - } catch (IOException e) { - logger.warn("Unsubscribe error", e); - } - - dispatchUnsubscription(name, subscription.get()); - } - } - - public boolean hasSubscription(String name) { - return subscriptions.containsKey(name); - } - - @Override - public void run() { - while (running) { - try { - PubSubReply reply = pubSubConnection.read(); - - switch (reply.getType()) { - case UNSUBSCRIBE: break; - case SUBSCRIBE: dispatchSubscribe(reply); break; - case MESSAGE: dispatchMessage(reply); break; - default: throw new AssertionError("Unknown pubsub reply type! " + reply.getType()); - } - } catch (IOException e) { - logger.warn("***** PubSub Connection Error *****", e); - if (running) { - this.pubSubConnection.close(); - this.pubSubConnection = redisPubSubConnectionFactory.connect(); - resubscribeAll(); - } - } - } - - logger.warn("DispatchManager Shutting Down..."); - } - - private void dispatchSubscribe(final PubSubReply reply) { - Optional subscription = Optional.ofNullable(subscriptions.get(reply.getChannel())); - - if (subscription.isPresent()) { - dispatchSubscription(reply.getChannel(), subscription.get()); - } else { - logger.info("Received subscribe event for non-existing channel: " + reply.getChannel()); - } - } - - private void dispatchMessage(PubSubReply reply) { - Optional subscription = Optional.ofNullable(subscriptions.get(reply.getChannel())); - - if (subscription.isPresent()) { - dispatchMessage(reply.getChannel(), subscription.get(), reply.getContent().get()); - } else if (deadLetterChannel.isPresent()) { - dispatchMessage(reply.getChannel(), deadLetterChannel.get(), reply.getContent().get()); - } else { - logger.warn("Received message for non-existing channel, with no dead letter handler: " + reply.getChannel()); - } - } - - private void resubscribeAll() { - new Thread(() -> { - synchronized (DispatchManager.this) { - try { - for (String name : subscriptions.keySet()) { - pubSubConnection.subscribe(name); - } - } catch (IOException e) { - logger.warn("***** RESUBSCRIPTION ERROR *****", e); - } - } - }).start(); - } - - private void dispatchMessage(final String name, final DispatchChannel channel, final byte[] message) { - executor.execute(() -> channel.onDispatchMessage(name, message)); - } - - private void dispatchSubscription(final String name, final DispatchChannel channel) { - executor.execute(() -> channel.onDispatchSubscribed(name)); - } - - private void dispatchUnsubscription(final String name, final DispatchChannel channel) { - executor.execute(() -> channel.onDispatchUnsubscribed(name)); - } -} diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/io/RedisInputStream.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/io/RedisInputStream.java deleted file mode 100644 index 98764c223..000000000 --- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/io/RedisInputStream.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.dispatch.io; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; - -public class RedisInputStream { - - private static final byte CR = 0x0D; - private static final byte LF = 0x0A; - - private final InputStream inputStream; - - public RedisInputStream(InputStream inputStream) { - this.inputStream = inputStream; - } - - public String readLine() throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - boolean foundCr = false; - - while (true) { - int character = inputStream.read(); - - if (character == -1) { - throw new IOException("Stream closed!"); - } - - baos.write(character); - - if (foundCr && character == LF) break; - else if (character == CR) foundCr = true; - else if (foundCr) foundCr = false; - } - - byte[] data = baos.toByteArray(); - return new String(data, 0, data.length-2); - } - - public byte[] readFully(int size) throws IOException { - byte[] result = new byte[size]; - int offset = 0; - int remaining = result.length; - - while (remaining > 0) { - int read = inputStream.read(result, offset, remaining); - - if (read < 0) { - throw new IOException("Stream closed!"); - } - - offset += read; - remaining -= read; - } - - return result; - } - - public void close() throws IOException { - inputStream.close(); - } - -} diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/io/RedisPubSubConnectionFactory.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/io/RedisPubSubConnectionFactory.java deleted file mode 100644 index 898f9f6da..000000000 --- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/io/RedisPubSubConnectionFactory.java +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.dispatch.io; - -import org.whispersystems.dispatch.redis.PubSubConnection; - -public interface RedisPubSubConnectionFactory { - - PubSubConnection connect(); - -} diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/PubSubConnection.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/PubSubConnection.java deleted file mode 100644 index 322f6e5b7..000000000 --- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/PubSubConnection.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.dispatch.redis; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.dispatch.io.RedisInputStream; -import org.whispersystems.dispatch.redis.protocol.ArrayReplyHeader; -import org.whispersystems.dispatch.redis.protocol.IntReply; -import org.whispersystems.dispatch.redis.protocol.StringReplyHeader; -import org.whispersystems.dispatch.util.Util; - -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.Socket; -import java.util.Arrays; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; - -public class PubSubConnection { - - private final Logger logger = LoggerFactory.getLogger(PubSubConnection.class); - - private static final byte[] UNSUBSCRIBE_TYPE = {'u', 'n', 's', 'u', 'b', 's', 'c', 'r', 'i', 'b', 'e' }; - private static final byte[] SUBSCRIBE_TYPE = {'s', 'u', 'b', 's', 'c', 'r', 'i', 'b', 'e' }; - private static final byte[] MESSAGE_TYPE = {'m', 'e', 's', 's', 'a', 'g', 'e' }; - - private static final byte[] SUBSCRIBE_COMMAND = {'S', 'U', 'B', 'S', 'C', 'R', 'I', 'B', 'E', ' ' }; - private static final byte[] UNSUBSCRIBE_COMMAND = {'U', 'N', 'S', 'U', 'B', 'S', 'C', 'R', 'I', 'B', 'E', ' '}; - private static final byte[] CRLF = {'\r', '\n' }; - - private final OutputStream outputStream; - private final RedisInputStream inputStream; - private final Socket socket; - private final AtomicBoolean closed; - - public PubSubConnection(Socket socket) throws IOException { - this.socket = socket; - this.outputStream = socket.getOutputStream(); - this.inputStream = new RedisInputStream(new BufferedInputStream(socket.getInputStream())); - this.closed = new AtomicBoolean(false); - } - - public void subscribe(String channelName) throws IOException { - if (closed.get()) throw new IOException("Connection closed!"); - - byte[] command = Util.combine(SUBSCRIBE_COMMAND, channelName.getBytes(), CRLF); - outputStream.write(command); - } - - public void unsubscribe(String channelName) throws IOException { - if (closed.get()) throw new IOException("Connection closed!"); - - byte[] command = Util.combine(UNSUBSCRIBE_COMMAND, channelName.getBytes(), CRLF); - outputStream.write(command); - } - - public PubSubReply read() throws IOException { - if (closed.get()) throw new IOException("Connection closed!"); - - ArrayReplyHeader replyHeader = new ArrayReplyHeader(inputStream.readLine()); - - if (replyHeader.getElementCount() != 3) { - throw new IOException("Received array reply header with strange count: " + replyHeader.getElementCount()); - } - - StringReplyHeader replyTypeHeader = new StringReplyHeader(inputStream.readLine()); - byte[] replyType = inputStream.readFully(replyTypeHeader.getStringLength()); - inputStream.readLine(); - - if (Arrays.equals(SUBSCRIBE_TYPE, replyType)) return readSubscribeReply(); - else if (Arrays.equals(UNSUBSCRIBE_TYPE, replyType)) return readUnsubscribeReply(); - else if (Arrays.equals(MESSAGE_TYPE, replyType)) return readMessageReply(); - else throw new IOException("Unknown reply type: " + new String(replyType)); - } - - public void close() { - try { - this.closed.set(true); - this.inputStream.close(); - this.outputStream.close(); - this.socket.close(); - } catch (IOException e) { - logger.warn("Exception while closing", e); - } - } - - private PubSubReply readMessageReply() throws IOException { - StringReplyHeader channelNameHeader = new StringReplyHeader(inputStream.readLine()); - byte[] channelName = inputStream.readFully(channelNameHeader.getStringLength()); - inputStream.readLine(); - - StringReplyHeader messageHeader = new StringReplyHeader(inputStream.readLine()); - byte[] message = inputStream.readFully(messageHeader.getStringLength()); - inputStream.readLine(); - - return new PubSubReply(PubSubReply.Type.MESSAGE, new String(channelName), Optional.of(message)); - } - - private PubSubReply readUnsubscribeReply() throws IOException { - String channelName = readSubscriptionReply(); - return new PubSubReply(PubSubReply.Type.UNSUBSCRIBE, channelName, Optional.empty()); - } - - private PubSubReply readSubscribeReply() throws IOException { - String channelName = readSubscriptionReply(); - return new PubSubReply(PubSubReply.Type.SUBSCRIBE, channelName, Optional.empty()); - } - - private String readSubscriptionReply() throws IOException { - StringReplyHeader channelNameHeader = new StringReplyHeader(inputStream.readLine()); - byte[] channelName = inputStream.readFully(channelNameHeader.getStringLength()); - inputStream.readLine(); - - new IntReply(inputStream.readLine()); - - return new String(channelName); - } - -} diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/PubSubReply.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/PubSubReply.java deleted file mode 100644 index 929de484c..000000000 --- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/PubSubReply.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.dispatch.redis; - -import java.util.Optional; - -@SuppressWarnings("OptionalUsedAsFieldOrParameterType") -public class PubSubReply { - - public enum Type { - MESSAGE, - SUBSCRIBE, - UNSUBSCRIBE - } - - private final Type type; - private final String channel; - private final Optional content; - - public PubSubReply(Type type, String channel, Optional content) { - this.type = type; - this.channel = channel; - this.content = content; - } - - public Type getType() { - return type; - } - - public String getChannel() { - return channel; - } - - public Optional getContent() { - return content; - } - -} diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/ArrayReplyHeader.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/ArrayReplyHeader.java deleted file mode 100644 index 37cccb77f..000000000 --- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/ArrayReplyHeader.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.dispatch.redis.protocol; - -import java.io.IOException; - -public class ArrayReplyHeader { - - private final int elementCount; - - public ArrayReplyHeader(String header) throws IOException { - if (header == null || header.length() < 2 || header.charAt(0) != '*') { - throw new IOException("Invalid array reply header: " + header); - } - - try { - this.elementCount = Integer.parseInt(header.substring(1)); - } catch (NumberFormatException e) { - throw new IOException(e); - } - } - - public int getElementCount() { - return elementCount; - } -} diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/IntReply.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/IntReply.java deleted file mode 100644 index d28150bbd..000000000 --- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/IntReply.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.dispatch.redis.protocol; - -import java.io.IOException; - -public class IntReply { - - private final int value; - - public IntReply(String reply) throws IOException { - if (reply == null || reply.length() < 2 || reply.charAt(0) != ':') { - throw new IOException("Invalid int reply: " + reply); - } - - try { - this.value = Integer.parseInt(reply.substring(1)); - } catch (NumberFormatException e) { - throw new IOException(e); - } - } - - public int getValue() { - return value; - } -} diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/StringReplyHeader.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/StringReplyHeader.java deleted file mode 100644 index ece0258a6..000000000 --- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/redis/protocol/StringReplyHeader.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.dispatch.redis.protocol; - -import java.io.IOException; - -public class StringReplyHeader { - - private final int stringLength; - - public StringReplyHeader(String header) throws IOException { - if (header == null || header.length() < 2 || header.charAt(0) != '$') { - throw new IOException("Invalid string reply header: " + header); - } - - try { - this.stringLength = Integer.parseInt(header.substring(1)); - } catch (NumberFormatException e) { - throw new IOException(e); - } - } - - public int getStringLength() { - return stringLength; - } -} diff --git a/redis-dispatch/src/main/java/org/whispersystems/dispatch/util/Util.java b/redis-dispatch/src/main/java/org/whispersystems/dispatch/util/Util.java deleted file mode 100644 index 21dbcc532..000000000 --- a/redis-dispatch/src/main/java/org/whispersystems/dispatch/util/Util.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.dispatch.util; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -public class Util { - - public static byte[] combine(byte[]... elements) { - try { - int sum = 0; - - for (byte[] element : elements) { - sum += element.length; - } - - ByteArrayOutputStream baos = new ByteArrayOutputStream(sum); - - for (byte[] element : elements) { - baos.write(element); - } - - return baos.toByteArray(); - } catch (IOException e) { - throw new AssertionError(e); - } - } - - - public static void sleep(long millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException e) { - throw new AssertionError(e); - } - } -} diff --git a/redis-dispatch/src/test/java/org/whispersystems/dispatch/DispatchManagerTest.java b/redis-dispatch/src/test/java/org/whispersystems/dispatch/DispatchManagerTest.java deleted file mode 100644 index 4410659d6..000000000 --- a/redis-dispatch/src/test/java/org/whispersystems/dispatch/DispatchManagerTest.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.dispatch; - -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.stubbing.Answer; -import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory; -import org.whispersystems.dispatch.redis.PubSubConnection; -import org.whispersystems.dispatch.redis.PubSubReply; - -public class DispatchManagerTest { - - private PubSubConnection pubSubConnection; - private RedisPubSubConnectionFactory socketFactory; - private DispatchManager dispatchManager; - private PubSubReplyInputStream pubSubReplyInputStream; - - @BeforeEach - void setUp() throws Exception { - pubSubConnection = mock(PubSubConnection.class ); - socketFactory = mock(RedisPubSubConnectionFactory.class); - pubSubReplyInputStream = new PubSubReplyInputStream(); - - when(socketFactory.connect()).thenReturn(pubSubConnection); - when(pubSubConnection.read()).thenAnswer((Answer) invocationOnMock -> pubSubReplyInputStream.read()); - - dispatchManager = new DispatchManager(socketFactory, Optional.empty()); - dispatchManager.start(); - } - - @AfterEach - void tearDown() { - dispatchManager.shutdown(); - } - - @Test - public void testConnect() { - verify(socketFactory).connect(); - } - - @Test - public void testSubscribe() { - DispatchChannel dispatchChannel = mock(DispatchChannel.class); - dispatchManager.subscribe("foo", dispatchChannel); - pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.SUBSCRIBE, "foo", Optional.empty())); - - verify(dispatchChannel, timeout(1000)).onDispatchSubscribed(eq("foo")); - } - - @Test - public void testSubscribeUnsubscribe() { - DispatchChannel dispatchChannel = mock(DispatchChannel.class); - dispatchManager.subscribe("foo", dispatchChannel); - dispatchManager.unsubscribe("foo", dispatchChannel); - - pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.SUBSCRIBE, "foo", Optional.empty())); - pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.UNSUBSCRIBE, "foo", Optional.empty())); - - verify(dispatchChannel, timeout(1000)).onDispatchUnsubscribed(eq("foo")); - } - - @Test - public void testMessages() { - DispatchChannel fooChannel = mock(DispatchChannel.class); - DispatchChannel barChannel = mock(DispatchChannel.class); - - dispatchManager.subscribe("foo", fooChannel); - dispatchManager.subscribe("bar", barChannel); - - pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.SUBSCRIBE, "foo", Optional.empty())); - pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.SUBSCRIBE, "bar", Optional.empty())); - - verify(fooChannel, timeout(1000)).onDispatchSubscribed(eq("foo")); - verify(barChannel, timeout(1000)).onDispatchSubscribed(eq("bar")); - - pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.MESSAGE, "foo", Optional.of("hello".getBytes()))); - pubSubReplyInputStream.write(new PubSubReply(PubSubReply.Type.MESSAGE, "bar", Optional.of("there".getBytes()))); - - ArgumentCaptor captor = ArgumentCaptor.forClass(byte[].class); - verify(fooChannel, timeout(1000)).onDispatchMessage(eq("foo"), captor.capture()); - - assertArrayEquals("hello".getBytes(), captor.getValue()); - - verify(barChannel, timeout(1000)).onDispatchMessage(eq("bar"), captor.capture()); - - assertArrayEquals("there".getBytes(), captor.getValue()); - } - - private static class PubSubReplyInputStream { - - private final List pubSubReplyList = new LinkedList<>(); - - public synchronized PubSubReply read() { - try { - while (pubSubReplyList.isEmpty()) wait(); - return pubSubReplyList.remove(0); - } catch (InterruptedException e) { - throw new AssertionError(e); - } - } - - public synchronized void write(PubSubReply pubSubReply) { - pubSubReplyList.add(pubSubReply); - notifyAll(); - } - } - -} diff --git a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/PubSubConnectionTest.java b/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/PubSubConnectionTest.java deleted file mode 100644 index 4e0459359..000000000 --- a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/PubSubConnectionTest.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.dispatch.redis; - -import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.security.SecureRandom; -import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -class PubSubConnectionTest { - - private static final String REPLY = "*3\r\n" + - "$9\r\n" + - "subscribe\r\n" + - "$5\r\n" + - "abcde\r\n" + - ":1\r\n" + - "*3\r\n" + - "$9\r\n" + - "subscribe\r\n" + - "$5\r\n" + - "fghij\r\n" + - ":2\r\n" + - "*3\r\n" + - "$9\r\n" + - "subscribe\r\n" + - "$5\r\n" + - "klmno\r\n" + - ":2\r\n" + - "*3\r\n" + - "$7\r\n" + - "message\r\n" + - "$5\r\n" + - "abcde\r\n" + - "$10\r\n" + - "1234567890\r\n" + - "*3\r\n" + - "$7\r\n" + - "message\r\n" + - "$5\r\n" + - "klmno\r\n" + - "$10\r\n" + - "0987654321\r\n"; - - - @Test - void testSubscribe() throws IOException { - OutputStream outputStream = mock(OutputStream.class); - Socket socket = mock(Socket.class ); - when(socket.getOutputStream()).thenReturn(outputStream); - PubSubConnection connection = new PubSubConnection(socket); - - connection.subscribe("foobar"); - - ArgumentCaptor captor = ArgumentCaptor.forClass(byte[].class); - verify(outputStream).write(captor.capture()); - - assertArrayEquals(captor.getValue(), "SUBSCRIBE foobar\r\n".getBytes()); - } - - @Test - void testUnsubscribe() throws IOException { - OutputStream outputStream = mock(OutputStream.class); - Socket socket = mock(Socket.class ); - when(socket.getOutputStream()).thenReturn(outputStream); - PubSubConnection connection = new PubSubConnection(socket); - - connection.unsubscribe("bazbar"); - - ArgumentCaptor captor = ArgumentCaptor.forClass(byte[].class); - verify(outputStream).write(captor.capture()); - - assertArrayEquals(captor.getValue(), "UNSUBSCRIBE bazbar\r\n".getBytes()); - } - - @Test - void testTricklyResponse() throws Exception { - InputStream inputStream = mockInputStreamFor(new TrickleInputStream(REPLY.getBytes())); - OutputStream outputStream = mock(OutputStream.class); - Socket socket = mock(Socket.class ); - when(socket.getOutputStream()).thenReturn(outputStream); - when(socket.getInputStream()).thenReturn(inputStream); - - PubSubConnection pubSubConnection = new PubSubConnection(socket); - readResponses(pubSubConnection); - } - - @Test - void testFullResponse() throws Exception { - InputStream inputStream = mockInputStreamFor(new FullInputStream(REPLY.getBytes())); - OutputStream outputStream = mock(OutputStream.class); - Socket socket = mock(Socket.class ); - when(socket.getOutputStream()).thenReturn(outputStream); - when(socket.getInputStream()).thenReturn(inputStream); - - PubSubConnection pubSubConnection = new PubSubConnection(socket); - readResponses(pubSubConnection); - } - - @Test - void testRandomLengthResponse() throws Exception { - InputStream inputStream = mockInputStreamFor(new RandomInputStream(REPLY.getBytes())); - OutputStream outputStream = mock(OutputStream.class); - Socket socket = mock(Socket.class ); - when(socket.getOutputStream()).thenReturn(outputStream); - when(socket.getInputStream()).thenReturn(inputStream); - - PubSubConnection pubSubConnection = new PubSubConnection(socket); - readResponses(pubSubConnection); - } - - private InputStream mockInputStreamFor(final MockInputStream stub) throws IOException { - InputStream result = mock(InputStream.class); - - when(result.read()).thenAnswer(new Answer() { - @Override - public Integer answer(InvocationOnMock invocationOnMock) throws Throwable { - return stub.read(); - } - }); - - when(result.read(any(byte[].class))).thenAnswer(new Answer() { - @Override - public Integer answer(InvocationOnMock invocationOnMock) throws Throwable { - byte[] buffer = (byte[])invocationOnMock.getArguments()[0]; - return stub.read(buffer, 0, buffer.length); - } - }); - - when(result.read(any(byte[].class), anyInt(), anyInt())).thenAnswer(new Answer() { - @Override - public Integer answer(InvocationOnMock invocationOnMock) throws Throwable { - byte[] buffer = (byte[]) invocationOnMock.getArguments()[0]; - int offset = (int) invocationOnMock.getArguments()[1]; - int length = (int) invocationOnMock.getArguments()[2]; - - return stub.read(buffer, offset, length); - } - }); - - return result; - } - - private void readResponses(PubSubConnection pubSubConnection) throws Exception { - PubSubReply reply = pubSubConnection.read(); - - assertEquals(reply.getType(), PubSubReply.Type.SUBSCRIBE); - assertEquals(reply.getChannel(), "abcde"); - assertFalse(reply.getContent().isPresent()); - - reply = pubSubConnection.read(); - - assertEquals(reply.getType(), PubSubReply.Type.SUBSCRIBE); - assertEquals(reply.getChannel(), "fghij"); - assertFalse(reply.getContent().isPresent()); - - reply = pubSubConnection.read(); - - assertEquals(reply.getType(), PubSubReply.Type.SUBSCRIBE); - assertEquals(reply.getChannel(), "klmno"); - assertFalse(reply.getContent().isPresent()); - - reply = pubSubConnection.read(); - - assertEquals(reply.getType(), PubSubReply.Type.MESSAGE); - assertEquals(reply.getChannel(), "abcde"); - assertArrayEquals(reply.getContent().get(), "1234567890".getBytes()); - - reply = pubSubConnection.read(); - - assertEquals(reply.getType(), PubSubReply.Type.MESSAGE); - assertEquals(reply.getChannel(), "klmno"); - assertArrayEquals(reply.getContent().get(), "0987654321".getBytes()); - } - - private interface MockInputStream { - public int read(); - public int read(byte[] input, int offset, int length); - } - - private static class TrickleInputStream implements MockInputStream { - - private final byte[] data; - private int index = 0; - - private TrickleInputStream(byte[] data) { - this.data = data; - } - - public int read() { - return data[index++]; - } - - public int read(byte[] input, int offset, int length) { - input[offset] = data[index++]; - return 1; - } - - } - - private static class FullInputStream implements MockInputStream { - - private final byte[] data; - private int index = 0; - - private FullInputStream(byte[] data) { - this.data = data; - } - - public int read() { - return data[index++]; - } - - public int read(byte[] input, int offset, int length) { - int amount = Math.min(data.length - index, length); - System.arraycopy(data, index, input, offset, amount); - index += length; - - return amount; - } - } - - private static class RandomInputStream implements MockInputStream { - private final byte[] data; - private int index = 0; - - private RandomInputStream(byte[] data) { - this.data = data; - } - - public int read() { - return data[index++]; - } - - public int read(byte[] input, int offset, int length) { - int maxCopy = Math.min(data.length - index, length); - int randomCopy = new SecureRandom().nextInt(maxCopy) + 1; - int copyAmount = Math.min(maxCopy, randomCopy); - - System.arraycopy(data, index, input, offset, copyAmount); - index += copyAmount; - - return copyAmount; - } - - } - -} diff --git a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/ArrayReplyHeaderTest.java b/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/ArrayReplyHeaderTest.java deleted file mode 100644 index d7346019d..000000000 --- a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/ArrayReplyHeaderTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.dispatch.redis.protocol; - - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.io.IOException; -import org.junit.jupiter.api.Test; - -class ArrayReplyHeaderTest { - - @Test - void testNull() { - assertThrows(IOException.class, () -> new ArrayReplyHeader(null)); - } - - @Test - void testBadPrefix() { - assertThrows(IOException.class, () -> new ArrayReplyHeader(":3")); - } - - @Test - void testEmpty() { - assertThrows(IOException.class, () -> new ArrayReplyHeader("")); - } - - @Test - void testTruncated() { - assertThrows(IOException.class, () -> new ArrayReplyHeader("*")); - } - - @Test - void testBadNumber() { - assertThrows(IOException.class, () -> new ArrayReplyHeader("*ABC")); - } - - @Test - void testValid() throws IOException { - assertEquals(4, new ArrayReplyHeader("*4").getElementCount()); - } - - - - - - - - - -} diff --git a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/IntReplyHeaderTest.java b/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/IntReplyHeaderTest.java deleted file mode 100644 index a5efb357d..000000000 --- a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/IntReplyHeaderTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.dispatch.redis.protocol; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.io.IOException; -import org.junit.jupiter.api.Test; - -class IntReplyHeaderTest { - - @Test - void testNull() { - assertThrows(IOException.class, () -> new IntReply(null)); - } - - @Test - void testEmpty() { - assertThrows(IOException.class, () -> new IntReply("")); - } - - @Test - void testBadNumber() { - assertThrows(IOException.class, () -> new IntReply(":A")); - } - - @Test - void testBadFormat() { - assertThrows(IOException.class, () -> new IntReply("*")); - } - - @Test - void testValid() throws IOException { - assertEquals(23, new IntReply(":23").getValue()); - } -} diff --git a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/StringReplyHeaderTest.java b/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/StringReplyHeaderTest.java deleted file mode 100644 index 206041b6f..000000000 --- a/redis-dispatch/src/test/java/org/whispersystems/dispatch/redis/protocol/StringReplyHeaderTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.dispatch.redis.protocol; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.io.IOException; -import org.junit.jupiter.api.Test; - -class StringReplyHeaderTest { - - @Test - void testNull() { - assertThrows(IOException.class, () -> new StringReplyHeader(null)); - } - - @Test - void testBadNumber() { - assertThrows(IOException.class, () -> new StringReplyHeader("$100A")); - } - - @Test - void testBadPrefix() { - assertThrows(IOException.class, () -> new StringReplyHeader("*")); - } - - @Test - void testValid() throws IOException { - assertEquals(1000, new StringReplyHeader("$1000").getStringLength()); - } - -} diff --git a/service/pom.xml b/service/pom.xml index e0493cb66..3d04a229b 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -41,11 +41,6 @@ event-logger ${project.version} - - org.whispersystems.textsecure - redis-dispatch - ${project.version} - org.whispersystems.textsecure websocket-resources diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java b/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java deleted file mode 100644 index 24ede2139..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ -package org.whispersystems.textsecuregcm.providers; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory; -import org.whispersystems.dispatch.redis.PubSubConnection; -import org.whispersystems.textsecuregcm.configuration.CircuitBreakerConfiguration; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; -import org.whispersystems.textsecuregcm.util.Util; - -import java.io.IOException; -import java.net.Socket; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.LinkedList; -import java.util.List; - -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.JedisPoolConfig; -import redis.clients.jedis.Protocol; - -public class RedisClientFactory implements RedisPubSubConnectionFactory { - - private final Logger logger = LoggerFactory.getLogger(RedisClientFactory.class); - - private final String host; - private final int port; - private final ReplicatedJedisPool jedisPool; - - public RedisClientFactory(String name, String url, List replicaUrls, CircuitBreakerConfiguration circuitBreakerConfiguration) - throws URISyntaxException - { - JedisPoolConfig poolConfig = new JedisPoolConfig(); - poolConfig.setTestOnBorrow(true); - poolConfig.setMaxWaitMillis(10000); - - URI redisURI = new URI(url); - - this.host = redisURI.getHost(); - this.port = redisURI.getPort(); - - JedisPool masterPool = new JedisPool(poolConfig, host, port, Protocol.DEFAULT_TIMEOUT, null); - List replicaPools = new LinkedList<>(); - - for (String replicaUrl : replicaUrls) { - URI replicaURI = new URI(replicaUrl); - - replicaPools.add(new JedisPool(poolConfig, replicaURI.getHost(), replicaURI.getPort(), - 500, Protocol.DEFAULT_TIMEOUT, null, - Protocol.DEFAULT_DATABASE, null, false, null , - null, null)); - } - - this.jedisPool = new ReplicatedJedisPool(name, masterPool, replicaPools, circuitBreakerConfiguration); - } - - public ReplicatedJedisPool getRedisClientPool() { - return jedisPool; - } - - @Override - public PubSubConnection connect() { - while (true) { - try { - Socket socket = new Socket(host, port); - return new PubSubConnection(socket); - } catch (IOException e) { - logger.warn("Error connecting", e); - Util.sleep(200); - } - } - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java deleted file mode 100644 index e2ba58d92..000000000 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright 2013-2020 Signal Messenger, LLC - * SPDX-License-Identifier: AGPL-3.0-only - */ - -package org.whispersystems.textsecuregcm.storage; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.whispersystems.dispatch.DispatchChannel; -import org.whispersystems.dispatch.DispatchManager; -import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; - -import io.dropwizard.lifecycle.Managed; -import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage; -import redis.clients.jedis.Jedis; - -public class PubSubManager implements Managed { - - private static final String KEEPALIVE_CHANNEL = "KEEPALIVE"; - - private final Logger logger = LoggerFactory.getLogger(PubSubManager.class); - - private final DispatchManager dispatchManager; - private final ReplicatedJedisPool jedisPool; - - private boolean subscribed = false; - - public PubSubManager(ReplicatedJedisPool jedisPool, DispatchManager dispatchManager) { - this.dispatchManager = dispatchManager; - this.jedisPool = jedisPool; - } - - @Override - public void start() throws Exception { - this.dispatchManager.start(); - - KeepaliveDispatchChannel keepaliveDispatchChannel = new KeepaliveDispatchChannel(); - this.dispatchManager.subscribe(KEEPALIVE_CHANNEL, keepaliveDispatchChannel); - - synchronized (this) { - while (!subscribed) wait(0); - } - - new KeepaliveSender().start(); - } - - @Override - public void stop() throws Exception { - dispatchManager.shutdown(); - } - - public void subscribe(PubSubAddress address, DispatchChannel channel) { - dispatchManager.subscribe(address.serialize(), channel); - } - - public void unsubscribe(PubSubAddress address, DispatchChannel dispatchChannel) { - dispatchManager.unsubscribe(address.serialize(), dispatchChannel); - } - - public boolean hasLocalSubscription(PubSubAddress address) { - return dispatchManager.hasSubscription(address.serialize()); - } - - public boolean publish(PubSubAddress address, PubSubMessage message) { - return publish(address.serialize().getBytes(), message); - } - - private boolean publish(byte[] channel, PubSubMessage message) { - try (Jedis jedis = jedisPool.getWriteResource()) { - long result = jedis.publish(channel, message.toByteArray()); - - if (result < 0) { - logger.warn("**** Jedis publish result < 0"); - } - - return result > 0; - } - } - - private class KeepaliveDispatchChannel implements DispatchChannel { - - @Override - public void onDispatchMessage(String channel, byte[] message) { - // Good - } - - @Override - public void onDispatchSubscribed(String channel) { - if (KEEPALIVE_CHANNEL.equals(channel)) { - synchronized (PubSubManager.this) { - subscribed = true; - PubSubManager.this.notifyAll(); - } - } - } - - @Override - public void onDispatchUnsubscribed(String channel) { - logger.warn("***** KEEPALIVE CHANNEL UNSUBSCRIBED *****"); - } - } - - private class KeepaliveSender extends Thread { - @Override - public void run() { - while (true) { - try { - Thread.sleep(20000); - publish(KEEPALIVE_CHANNEL.getBytes(), PubSubMessage.newBuilder() - .setType(PubSubMessage.Type.KEEPALIVE) - .build()); - } catch (Throwable e) { - logger.warn("***** KEEPALIVE EXCEPTION ******", e); - } - } - } - } -} diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/ProcessorCustomer.java b/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/ProcessorCustomer.java index 73c5d6b46..c994439c3 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/ProcessorCustomer.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/subscriptions/ProcessorCustomer.java @@ -6,11 +6,16 @@ package org.whispersystems.textsecuregcm.subscriptions; import java.nio.charset.StandardCharsets; -import org.whispersystems.dispatch.util.Util; public record ProcessorCustomer(String customerId, SubscriptionProcessor processor) { public byte[] toDynamoBytes() { - return Util.combine(new byte[]{processor.getId()}, customerId.getBytes(StandardCharsets.UTF_8)); + final byte[] customerIdBytes = customerId.getBytes(StandardCharsets.UTF_8); + final byte[] combinedBytes = new byte[customerIdBytes.length + 1]; + + combinedBytes[0] = processor.getId(); + System.arraycopy(customerIdBytes, 0, combinedBytes, 1, customerIdBytes.length); + + return combinedBytes; } } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/subscriptions/ProcessorCustomerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/subscriptions/ProcessorCustomerTest.java new file mode 100644 index 000000000..1fe65c941 --- /dev/null +++ b/service/src/test/java/org/whispersystems/textsecuregcm/subscriptions/ProcessorCustomerTest.java @@ -0,0 +1,16 @@ +package org.whispersystems.textsecuregcm.subscriptions; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class ProcessorCustomerTest { + + @Test + void toDynamoBytes() { + final ProcessorCustomer processorCustomer = new ProcessorCustomer("Test", SubscriptionProcessor.BRAINTREE); + + assertArrayEquals(new byte[] { SubscriptionProcessor.BRAINTREE.getId(), 'T', 'e', 's', 't' }, + processorCustomer.toDynamoBytes()); + } +}