From 49dad3099a1cdb91c0afdaac4e16eb4ee15ac94e Mon Sep 17 00:00:00 2001 From: Moxie Marlinspike Date: Tue, 10 Apr 2018 13:19:40 -0700 Subject: [PATCH] Support for replicated redis clusters --- protobuf/StoredMessage.proto | 30 ----- .../textsecuregcm/WhisperServerService.java | 12 +- .../configuration/RedisConfiguration.java | 11 ++ .../limits/LockingRateLimiter.java | 8 +- .../textsecuregcm/limits/RateLimiter.java | 24 ++-- .../textsecuregcm/limits/RateLimiters.java | 5 +- .../providers/RedisClientFactory.java | 25 +++- .../providers/RedisHealthCheck.java | 8 +- .../textsecuregcm/redis/LuaScript.java | 23 ++-- .../redis/ReplicatedJedisPool.java | 56 +++++++++ .../storage/AccountsManager.java | 19 ++-- .../storage/DirectoryManager.java | 18 +-- .../textsecuregcm/storage/MessagesCache.java | 44 +++---- .../storage/PendingAccountsManager.java | 17 ++- .../storage/PendingDevicesManager.java | 18 ++- .../textsecuregcm/storage/PubSubManager.java | 14 +-- .../workers/DeleteUserCommand.java | 11 +- .../workers/DirectoryCommand.java | 11 +- .../tests/redis/ReplicatedJedisPoolTest.java | 107 ++++++++++++++++++ 19 files changed, 304 insertions(+), 157 deletions(-) delete mode 100644 protobuf/StoredMessage.proto create mode 100644 src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java create mode 100644 src/test/java/org/whispersystems/textsecuregcm/tests/redis/ReplicatedJedisPoolTest.java diff --git a/protobuf/StoredMessage.proto b/protobuf/StoredMessage.proto deleted file mode 100644 index 05a6b88e5..000000000 --- a/protobuf/StoredMessage.proto +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Copyright (C) 2014 Open Whisper Systems - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package textsecure; - -option java_package = "org.whispersystems.textsecuregcm.storage"; -option java_outer_classname = "StoredMessageProtos"; - -message StoredMessage { - enum Type { - UNKNOWN = 0; - MESSAGE = 1; - } - - optional Type type = 1; - optional bytes content = 2; -} diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 4dfb071ae..639f42964 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -63,6 +63,7 @@ import org.whispersystems.textsecuregcm.push.GCMSender; import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.push.ReceiptSender; import org.whispersystems.textsecuregcm.push.WebsocketSender; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.s3.UrlSigner; import org.whispersystems.textsecuregcm.sms.SmsSender; import org.whispersystems.textsecuregcm.sms.TwilioSmsSender; @@ -104,7 +105,6 @@ import io.dropwizard.db.DataSourceFactory; import io.dropwizard.jdbi.DBIFactory; import io.dropwizard.setup.Bootstrap; import io.dropwizard.setup.Environment; -import redis.clients.jedis.JedisPool; public class WhisperServerService extends Application { @@ -158,10 +158,12 @@ public class WhisperServerService extends Application replicaUrls; + public String getUrl() { return url; } + + public List getReplicaUrls() { + return replicaUrls; + } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java b/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java index 26b6ad055..fbcbf31f4 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java +++ b/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java @@ -4,17 +4,17 @@ import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.Constants; import static com.codahale.metrics.MetricRegistry.name; import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; public class LockingRateLimiter extends RateLimiter { private final Meter meter; - public LockingRateLimiter(JedisPool cacheClient, String name, int bucketSize, double leakRatePerMinute) { + public LockingRateLimiter(ReplicatedJedisPool cacheClient, String name, int bucketSize, double leakRatePerMinute) { super(cacheClient, name, bucketSize, leakRatePerMinute); MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME); @@ -41,13 +41,13 @@ public class LockingRateLimiter extends RateLimiter { } private void releaseLock(String key) { - try (Jedis jedis = cacheClient.getResource()) { + try (Jedis jedis = cacheClient.getWriteResource()) { jedis.del(getLockName(key)); } } private boolean acquireLock(String key) { - try (Jedis jedis = cacheClient.getResource()) { + try (Jedis jedis = cacheClient.getWriteResource()) { return jedis.set(getLockName(key), "L", "NX", "EX", 10) != null; } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java b/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java index 5da82f6aa..99332ddef 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java +++ b/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.SystemMapper; @@ -31,27 +32,26 @@ import java.io.IOException; import static com.codahale.metrics.MetricRegistry.name; import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; public class RateLimiter { private final Logger logger = LoggerFactory.getLogger(RateLimiter.class); private final ObjectMapper mapper = SystemMapper.getMapper(); - private final Meter meter; - protected final JedisPool cacheClient; - protected final String name; - private final int bucketSize; - private final double leakRatePerMillis; - private final boolean reportLimits; + private final Meter meter; + protected final ReplicatedJedisPool cacheClient; + protected final String name; + private final int bucketSize; + private final double leakRatePerMillis; + private final boolean reportLimits; - public RateLimiter(JedisPool cacheClient, String name, + public RateLimiter(ReplicatedJedisPool cacheClient, String name, int bucketSize, double leakRatePerMinute) { this(cacheClient, name, bucketSize, leakRatePerMinute, false); } - public RateLimiter(JedisPool cacheClient, String name, + public RateLimiter(ReplicatedJedisPool cacheClient, String name, int bucketSize, double leakRatePerMinute, boolean reportLimits) { @@ -81,13 +81,13 @@ public class RateLimiter { } public void clear(String key) { - try (Jedis jedis = cacheClient.getResource()) { + try (Jedis jedis = cacheClient.getWriteResource()) { jedis.del(getBucketName(key)); } } private void setBucket(String key, LeakyBucket bucket) { - try (Jedis jedis = cacheClient.getResource()) { + try (Jedis jedis = cacheClient.getWriteResource()) { String serialized = bucket.serialize(mapper); jedis.setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized); } catch (JsonProcessingException e) { @@ -96,7 +96,7 @@ public class RateLimiter { } private LeakyBucket getBucket(String key) { - try (Jedis jedis = cacheClient.getResource()) { + try (Jedis jedis = cacheClient.getReadResource()) { String serialized = jedis.get(getBucketName(key)); if (serialized != null) { diff --git a/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java b/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java index 1dff0e157..4bbaabeee 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java +++ b/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java @@ -18,8 +18,7 @@ package org.whispersystems.textsecuregcm.limits; import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration; - -import redis.clients.jedis.JedisPool; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; public class RateLimiters { @@ -41,7 +40,7 @@ public class RateLimiters { private final RateLimiter profileLimiter; - public RateLimiters(RateLimitsConfiguration config, JedisPool cacheClient) { + public RateLimiters(RateLimitsConfiguration config, ReplicatedJedisPool cacheClient) { this.smsDestinationLimiter = new RateLimiter(cacheClient, "smsDestination", config.getSmsDestination().getBucketSize(), config.getSmsDestination().getLeakRatePerMinute()); diff --git a/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java b/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java index f6134df52..7333f73f5 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java +++ b/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java @@ -20,12 +20,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory; import org.whispersystems.dispatch.redis.PubSubConnection; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.Util; import java.io.IOException; import java.net.Socket; import java.net.URI; import java.net.URISyntaxException; +import java.util.LinkedList; +import java.util.List; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; @@ -37,9 +40,9 @@ public class RedisClientFactory implements RedisPubSubConnectionFactory { private final String host; private final int port; - private final JedisPool jedisPool; + private final ReplicatedJedisPool jedisPool; - public RedisClientFactory(String url) throws URISyntaxException { + public RedisClientFactory(String url, List replicaUrls) throws URISyntaxException { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setTestOnBorrow(true); @@ -47,11 +50,23 @@ public class RedisClientFactory implements RedisPubSubConnectionFactory { this.host = redisURI.getHost(); this.port = redisURI.getPort(); - this.jedisPool = new JedisPool(poolConfig, host, port, - Protocol.DEFAULT_TIMEOUT, null); + + JedisPool masterPool = new JedisPool(poolConfig, host, port, Protocol.DEFAULT_TIMEOUT, null); + List replicaPools = new LinkedList<>(); + + for (String replicaUrl : replicaUrls) { + URI replicaURI = new URI(replicaUrl); + + replicaPools.add(new JedisPool(poolConfig, replicaURI.getHost(), replicaURI.getPort(), + 500, Protocol.DEFAULT_TIMEOUT, null, + Protocol.DEFAULT_DATABASE, null, false, null , + null, null)); + } + + this.jedisPool = new ReplicatedJedisPool(masterPool, replicaPools); } - public JedisPool getRedisClientPool() { + public ReplicatedJedisPool getRedisClientPool() { return jedisPool; } diff --git a/src/main/java/org/whispersystems/textsecuregcm/providers/RedisHealthCheck.java b/src/main/java/org/whispersystems/textsecuregcm/providers/RedisHealthCheck.java index 5340ecf24..71f39d5e2 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/providers/RedisHealthCheck.java +++ b/src/main/java/org/whispersystems/textsecuregcm/providers/RedisHealthCheck.java @@ -17,21 +17,21 @@ package org.whispersystems.textsecuregcm.providers; import com.codahale.metrics.health.HealthCheck; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; public class RedisHealthCheck extends HealthCheck { - private final JedisPool clientPool; + private final ReplicatedJedisPool clientPool; - public RedisHealthCheck(JedisPool clientPool) { + public RedisHealthCheck(ReplicatedJedisPool clientPool) { this.clientPool = clientPool; } @Override protected Result check() throws Exception { - try (Jedis client = clientPool.getResource()) { + try (Jedis client = clientPool.getWriteResource()) { client.set("HEALTH", "test"); if (!"test".equals(client.get("HEALTH"))) { diff --git a/src/main/java/org/whispersystems/textsecuregcm/redis/LuaScript.java b/src/main/java/org/whispersystems/textsecuregcm/redis/LuaScript.java index 643b57dcb..24d067d67 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/redis/LuaScript.java +++ b/src/main/java/org/whispersystems/textsecuregcm/redis/LuaScript.java @@ -1,27 +1,20 @@ package org.whispersystems.textsecuregcm.redis; import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; import java.util.List; import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; import redis.clients.jedis.exceptions.JedisDataException; public class LuaScript { - private final JedisPool jedisPool; - private final String script; - private final byte[] sha; + private final ReplicatedJedisPool jedisPool; + private final String script; + private final byte[] sha; - public static LuaScript fromResource(JedisPool jedisPool, String resource) throws IOException { + public static LuaScript fromResource(ReplicatedJedisPool jedisPool, String resource) throws IOException { InputStream inputStream = LuaScript.class.getClassLoader().getResourceAsStream(resource); ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -38,14 +31,14 @@ public class LuaScript { return new LuaScript(jedisPool, new String(baos.toByteArray())); } - private LuaScript(JedisPool jedisPool, String script) { + private LuaScript(ReplicatedJedisPool jedisPool, String script) { this.jedisPool = jedisPool; this.script = script; this.sha = storeScript(jedisPool, script).getBytes(); } public Object execute(List keys, List args) { - try (Jedis jedis = jedisPool.getResource()) { + try (Jedis jedis = jedisPool.getWriteResource()) { try { return jedis.evalsha(sha, keys, args); } catch (JedisDataException e) { @@ -55,8 +48,8 @@ public class LuaScript { } } - private String storeScript(JedisPool jedisPool, String script) { - try (Jedis jedis = jedisPool.getResource()) { + private String storeScript(ReplicatedJedisPool jedisPool, String script) { + try (Jedis jedis = jedisPool.getWriteResource()) { return jedis.scriptLoad(script); } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java b/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java new file mode 100644 index 000000000..d112d6e4a --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java @@ -0,0 +1,56 @@ +package org.whispersystems.textsecuregcm.redis; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.exceptions.JedisException; + +public class ReplicatedJedisPool { + + private final Logger logger = LoggerFactory.getLogger(ReplicatedJedisPool.class); + private final AtomicInteger replicaIndex = new AtomicInteger(0); + + private final JedisPool master; + private final JedisPool[] replicas; + + public ReplicatedJedisPool(JedisPool master, List replicas) { + if (replicas.size() < 1) throw new IllegalArgumentException("There must be at least one replica"); + + this.master = master; + this.replicas = new JedisPool[replicas.size()]; + + for (int i=0;i memcacheGet(String number) { - try (Jedis jedis = cacheClient.getResource()) { + try (Jedis jedis = cacheClient.getReadResource()) { String json = jedis.get(getKey(number)); if (json != null) return Optional.of(mapper.readValue(json, Account.class)); diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java index 47e077f8c..cff2024af 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java @@ -24,6 +24,7 @@ import com.google.common.base.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.entities.ClientContact; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.IterablePair; import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.Util; @@ -33,7 +34,6 @@ import java.util.LinkedList; import java.util.List; import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; import redis.clients.jedis.Pipeline; import redis.clients.jedis.Response; @@ -44,9 +44,9 @@ public class DirectoryManager { private static final byte[] DIRECTORY_KEY = {'d', 'i', 'r', 'e', 'c', 't', 'o', 'r', 'y'}; private final ObjectMapper objectMapper; - private final JedisPool redisPool; + private final ReplicatedJedisPool redisPool; - public DirectoryManager(JedisPool redisPool) { + public DirectoryManager(ReplicatedJedisPool redisPool) { this.redisPool = redisPool; this.objectMapper = new ObjectMapper(); this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); @@ -61,7 +61,7 @@ public class DirectoryManager { } public void remove(byte[] token) { - try (Jedis jedis = redisPool.getResource()) { + try (Jedis jedis = redisPool.getWriteResource()) { jedis.hdel(DIRECTORY_KEY, token); } } @@ -74,7 +74,7 @@ public class DirectoryManager { public void add(ClientContact contact) { TokenValue tokenValue = new TokenValue(contact.getRelay(), contact.isVoice(), contact.isVideo()); - try (Jedis jedis = redisPool.getResource()) { + try (Jedis jedis = redisPool.getWriteResource()) { jedis.hset(DIRECTORY_KEY, contact.getToken(), objectMapper.writeValueAsBytes(tokenValue)); } catch (JsonProcessingException e) { logger.warn("JSON Serialization", e); @@ -98,7 +98,7 @@ public class DirectoryManager { } public Optional get(byte[] token) { - try (Jedis jedis = redisPool.getResource()) { + try (Jedis jedis = redisPool.getWriteResource()) { byte[] result = jedis.hget(DIRECTORY_KEY, token); if (result == null) { @@ -114,7 +114,7 @@ public class DirectoryManager { } public List get(List tokens) { - try (Jedis jedis = redisPool.getResource()) { + try (Jedis jedis = redisPool.getWriteResource()) { Pipeline pipeline = jedis.pipelined(); List> futures = new LinkedList<>(); List results = new LinkedList<>(); @@ -147,7 +147,7 @@ public class DirectoryManager { } public BatchOperationHandle startBatchOperation() { - Jedis jedis = redisPool.getResource(); + Jedis jedis = redisPool.getWriteResource(); return new BatchOperationHandle(jedis, jedis.pipelined()); } @@ -156,7 +156,7 @@ public class DirectoryManager { Jedis jedis = handle.jedis; pipeline.sync(); - redisPool.returnResource(jedis); + redisPool.returnWriteResource(jedis); } public static class BatchOperationHandle { diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java index b99636c49..459ce4ab8 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java @@ -14,6 +14,7 @@ import org.whispersystems.textsecuregcm.push.NotPushRegisteredException; import org.whispersystems.textsecuregcm.push.PushSender; import org.whispersystems.textsecuregcm.push.TransientPushFailureException; import org.whispersystems.textsecuregcm.redis.LuaScript; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.Constants; import org.whispersystems.textsecuregcm.util.Pair; import org.whispersystems.textsecuregcm.util.Util; @@ -32,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import static com.codahale.metrics.MetricRegistry.name; import io.dropwizard.lifecycle.Managed; import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; import redis.clients.jedis.Tuple; import redis.clients.util.SafeEncoder; @@ -48,10 +48,10 @@ public class MessagesCache implements Managed { private static final Timer clearAccountTimer = metricRegistry.timer(name(MessagesCache.class, "clearAccount")); private static final Timer clearDeviceTimer = metricRegistry.timer(name(MessagesCache.class, "clearDevice" )); - private final JedisPool jedisPool; - private final Messages database; - private final AccountsManager accountsManager; - private final int delayMinutes; + private final ReplicatedJedisPool jedisPool; + private final Messages database; + private final AccountsManager accountsManager; + private final int delayMinutes; private InsertOperation insertOperation; private RemoveOperation removeOperation; @@ -61,7 +61,7 @@ public class MessagesCache implements Managed { private PushSender pushSender; private MessagePersister messagePersister; - public MessagesCache(JedisPool jedisPool, Messages database, AccountsManager accountsManager, int delayMinutes) { + public MessagesCache(ReplicatedJedisPool jedisPool, Messages database, AccountsManager accountsManager, int delayMinutes) { this.jedisPool = jedisPool; this.database = database; this.accountsManager = accountsManager; @@ -244,7 +244,7 @@ public class MessagesCache implements Managed { private static class InsertOperation { private final LuaScript insert; - InsertOperation(JedisPool jedisPool) throws IOException { + InsertOperation(ReplicatedJedisPool jedisPool) throws IOException { this.insert = LuaScript.fromResource(jedisPool, "lua/insert_item.lua"); } @@ -265,7 +265,7 @@ public class MessagesCache implements Managed { private final LuaScript removeBySender; private final LuaScript removeQueue; - RemoveOperation(JedisPool jedisPool) throws IOException { + RemoveOperation(ReplicatedJedisPool jedisPool) throws IOException { this.removeById = LuaScript.fromResource(jedisPool, "lua/remove_item_by_id.lua" ); this.removeBySender = LuaScript.fromResource(jedisPool, "lua/remove_item_by_sender.lua"); this.removeQueue = LuaScript.fromResource(jedisPool, "lua/remove_queue.lua" ); @@ -305,7 +305,7 @@ public class MessagesCache implements Managed { private final LuaScript getQueues; private final LuaScript getItems; - GetOperation(JedisPool jedisPool) throws IOException { + GetOperation(ReplicatedJedisPool jedisPool) throws IOException { this.getQueues = LuaScript.fromResource(jedisPool, "lua/get_queues_to_persist.lua"); this.getItems = LuaScript.fromResource(jedisPool, "lua/get_items.lua"); } @@ -346,10 +346,10 @@ public class MessagesCache implements Managed { private final AtomicBoolean running = new AtomicBoolean(true); - private final JedisPool jedisPool; - private final Messages database; - private final long delayTime; - private final TimeUnit delayTimeUnit; + private final ReplicatedJedisPool jedisPool; + private final Messages database; + private final long delayTime; + private final TimeUnit delayTimeUnit; private final PubSubManager pubSubManager; private final PushSender pushSender; @@ -360,13 +360,13 @@ public class MessagesCache implements Managed { private boolean finished = false; - MessagePersister(JedisPool jedisPool, - Messages database, - PubSubManager pubSubManager, - PushSender pushSender, - AccountsManager accountsManager, - long delayTime, - TimeUnit delayTimeUnit) + MessagePersister(ReplicatedJedisPool jedisPool, + Messages database, + PubSubManager pubSubManager, + PushSender pushSender, + AccountsManager accountsManager, + long delayTime, + TimeUnit delayTimeUnit) throws IOException { super(MessagePersister.class.getSimpleName()); @@ -416,12 +416,12 @@ public class MessagesCache implements Managed { while (!finished) Util.wait(this); } - private void persistQueue(JedisPool jedisPool, Key key) throws IOException { + private void persistQueue(ReplicatedJedisPool jedisPool, Key key) throws IOException { Timer.Context timer = persistQueueTimer.time(); int messagesPersistedCount = 0; - try (Jedis jedis = jedisPool.getResource()) { + try (Jedis jedis = jedisPool.getWriteResource()) { while (true) { jedis.setex(key.getUserMessageQueuePersistInProgress(), 30, "1".getBytes()); diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java index cef593dcd..a271bdba2 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java @@ -19,16 +19,15 @@ package org.whispersystems.textsecuregcm.storage; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.StoredVerificationCode; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.SystemMapper; import java.io.IOException; import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; public class PendingAccountsManager { @@ -36,11 +35,11 @@ public class PendingAccountsManager { private static final String CACHE_PREFIX = "pending_account2::"; - private final PendingAccounts pendingAccounts; - private final JedisPool cacheClient; - private final ObjectMapper mapper; + private final PendingAccounts pendingAccounts; + private final ReplicatedJedisPool cacheClient; + private final ObjectMapper mapper; - public PendingAccountsManager(PendingAccounts pendingAccounts, JedisPool cacheClient) + public PendingAccountsManager(PendingAccounts pendingAccounts, ReplicatedJedisPool cacheClient) { this.pendingAccounts = pendingAccounts; this.cacheClient = cacheClient; @@ -72,7 +71,7 @@ public class PendingAccountsManager { } private void memcacheSet(String number, StoredVerificationCode code) { - try (Jedis jedis = cacheClient.getResource()) { + try (Jedis jedis = cacheClient.getWriteResource()) { jedis.set(CACHE_PREFIX + number, mapper.writeValueAsString(code)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); @@ -80,7 +79,7 @@ public class PendingAccountsManager { } private Optional memcacheGet(String number) { - try (Jedis jedis = cacheClient.getResource()) { + try (Jedis jedis = cacheClient.getReadResource()) { String json = jedis.get(CACHE_PREFIX + number); if (json == null) return Optional.absent(); @@ -92,7 +91,7 @@ public class PendingAccountsManager { } private void memcacheDelete(String number) { - try (Jedis jedis = cacheClient.getResource()) { + try (Jedis jedis = cacheClient.getWriteResource()) { jedis.del(CACHE_PREFIX + number); } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java index 20e418542..0c131f837 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java @@ -22,12 +22,12 @@ import com.google.common.base.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.auth.StoredVerificationCode; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.util.SystemMapper; import java.io.IOException; import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; public class PendingDevicesManager { @@ -35,13 +35,11 @@ public class PendingDevicesManager { private static final String CACHE_PREFIX = "pending_devices2::"; - private final PendingDevices pendingDevices; - private final JedisPool cacheClient; - private final ObjectMapper mapper; + private final PendingDevices pendingDevices; + private final ReplicatedJedisPool cacheClient; + private final ObjectMapper mapper; - public PendingDevicesManager(PendingDevices pendingDevices, - JedisPool cacheClient) - { + public PendingDevicesManager(PendingDevices pendingDevices, ReplicatedJedisPool cacheClient) { this.pendingDevices = pendingDevices; this.cacheClient = cacheClient; this.mapper = SystemMapper.getMapper(); @@ -72,7 +70,7 @@ public class PendingDevicesManager { } private void memcacheSet(String number, StoredVerificationCode code) { - try (Jedis jedis = cacheClient.getResource()) { + try (Jedis jedis = cacheClient.getWriteResource()) { jedis.set(CACHE_PREFIX + number, mapper.writeValueAsString(code)); } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); @@ -80,7 +78,7 @@ public class PendingDevicesManager { } private Optional memcacheGet(String number) { - try (Jedis jedis = cacheClient.getResource()) { + try (Jedis jedis = cacheClient.getReadResource()) { String json = jedis.get(CACHE_PREFIX + number); if (json == null) return Optional.absent(); @@ -92,7 +90,7 @@ public class PendingDevicesManager { } private void memcacheDelete(String number) { - try (Jedis jedis = cacheClient.getResource()) { + try (Jedis jedis = cacheClient.getWriteResource()) { jedis.del(CACHE_PREFIX + number); } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java index f721a1866..ca0f7b8bf 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java @@ -4,13 +4,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.dispatch.DispatchChannel; import org.whispersystems.dispatch.DispatchManager; - -import java.util.Arrays; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import io.dropwizard.lifecycle.Managed; import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage; import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; public class PubSubManager implements Managed { @@ -18,14 +16,14 @@ public class PubSubManager implements Managed { private final Logger logger = LoggerFactory.getLogger(PubSubManager.class); - private final DispatchManager dispatchManager; - private final JedisPool jedisPool; + private final DispatchManager dispatchManager; + private final ReplicatedJedisPool jedisPool; private boolean subscribed = false; - public PubSubManager(JedisPool jedisPool, DispatchManager dispatchManager) { + public PubSubManager(ReplicatedJedisPool jedisPool, DispatchManager dispatchManager) { this.dispatchManager = dispatchManager; - this.jedisPool = jedisPool; + this.jedisPool = jedisPool; } @Override @@ -64,7 +62,7 @@ public class PubSubManager implements Managed { } private boolean publish(byte[] channel, PubSubMessage message) { - try (Jedis jedis = jedisPool.getResource()) { + try (Jedis jedis = jedisPool.getWriteResource()) { long result = jedis.publish(channel, message.toByteArray()); if (result < 0) { diff --git a/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index c00f6a6fc..09a23ac13 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -10,6 +10,7 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.auth.AuthenticationCredentials; import org.whispersystems.textsecuregcm.providers.RedisClientFactory; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.storage.Account; import org.whispersystems.textsecuregcm.storage.Accounts; import org.whispersystems.textsecuregcm.storage.AccountsManager; @@ -72,11 +73,11 @@ public class DeleteUserCommand extends EnvironmentCommand account = accountsManager.get(user); diff --git a/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java b/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java index 58a501c2e..bcef230fe 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java +++ b/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.WhisperServerConfiguration; import org.whispersystems.textsecuregcm.federation.FederatedClientManager; import org.whispersystems.textsecuregcm.providers.RedisClientFactory; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; import org.whispersystems.textsecuregcm.storage.Accounts; import org.whispersystems.textsecuregcm.storage.AccountsManager; import org.whispersystems.textsecuregcm.storage.DirectoryManager; @@ -71,11 +72,11 @@ public class DirectoryCommand extends EnvironmentCommand()); + throw new AssertionError(); + } catch (Exception e) { + // good + } + } + + @Test + public void testWriteCheckoutWithSlaves() { + JedisPool master = mock(JedisPool.class); + JedisPool slave = mock(JedisPool.class); + Jedis instance = mock(Jedis.class ); + + when(master.getResource()).thenReturn(instance); + + ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Collections.singletonList(slave)); + Jedis writeResource = replicatedJedisPool.getWriteResource(); + + assertThat(writeResource).isEqualTo(instance); + verify(master, times(1)).getResource(); + } + + @Test + public void testReadCheckouts() { + JedisPool master = mock(JedisPool.class); + JedisPool slaveOne = mock(JedisPool.class); + JedisPool slaveTwo = mock(JedisPool.class); + Jedis instanceOne = mock(Jedis.class ); + Jedis instanceTwo = mock(Jedis.class ); + + when(slaveOne.getResource()).thenReturn(instanceOne); + when(slaveTwo.getResource()).thenReturn(instanceTwo); + + ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo)); + + assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceOne); + assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo); + assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceOne); + assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo); + assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceOne); + + verifyNoMoreInteractions(master); + } + + @Test + public void testBrokenReadCheckout() { + JedisPool master = mock(JedisPool.class); + JedisPool slaveOne = mock(JedisPool.class); + JedisPool slaveTwo = mock(JedisPool.class); + Jedis instanceTwo = mock(Jedis.class ); + + when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!")); + when(slaveTwo.getResource()).thenReturn(instanceTwo); + + ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo)); + + assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo); + assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo); + assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo); + + verifyNoMoreInteractions(master); + } + + @Test + public void testAllBrokenReadCheckout() { + JedisPool master = mock(JedisPool.class); + JedisPool slaveOne = mock(JedisPool.class); + JedisPool slaveTwo = mock(JedisPool.class); + + when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!")); + when(slaveTwo.getResource()).thenThrow(new JedisException("Also failed!")); + + ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo)); + + try { + replicatedJedisPool.getReadResource(); + throw new AssertionError(); + } catch (Exception e) { + // good + } + + verifyNoMoreInteractions(master); + } + +}