diff --git a/protobuf/StoredMessage.proto b/protobuf/StoredMessage.proto
deleted file mode 100644
index 05a6b88e5..000000000
--- a/protobuf/StoredMessage.proto
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Copyright (C) 2014 Open Whisper Systems
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-package textsecure;
-
-option java_package = "org.whispersystems.textsecuregcm.storage";
-option java_outer_classname = "StoredMessageProtos";
-
-message StoredMessage {
- enum Type {
- UNKNOWN = 0;
- MESSAGE = 1;
- }
-
- optional Type type = 1;
- optional bytes content = 2;
-}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
index 4dfb071ae..639f42964 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
@@ -63,6 +63,7 @@ import org.whispersystems.textsecuregcm.push.GCMSender;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.push.WebsocketSender;
+import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.s3.UrlSigner;
import org.whispersystems.textsecuregcm.sms.SmsSender;
import org.whispersystems.textsecuregcm.sms.TwilioSmsSender;
@@ -104,7 +105,6 @@ import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.jdbi.DBIFactory;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
-import redis.clients.jedis.JedisPool;
public class WhisperServerService extends Application {
@@ -158,10 +158,12 @@ public class WhisperServerService extends Application replicaUrls;
+
public String getUrl() {
return url;
}
+
+ public List getReplicaUrls() {
+ return replicaUrls;
+ }
}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java b/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java
index 26b6ad055..fbcbf31f4 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/limits/LockingRateLimiter.java
@@ -4,17 +4,17 @@ import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
+import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants;
import static com.codahale.metrics.MetricRegistry.name;
import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
public class LockingRateLimiter extends RateLimiter {
private final Meter meter;
- public LockingRateLimiter(JedisPool cacheClient, String name, int bucketSize, double leakRatePerMinute) {
+ public LockingRateLimiter(ReplicatedJedisPool cacheClient, String name, int bucketSize, double leakRatePerMinute) {
super(cacheClient, name, bucketSize, leakRatePerMinute);
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
@@ -41,13 +41,13 @@ public class LockingRateLimiter extends RateLimiter {
}
private void releaseLock(String key) {
- try (Jedis jedis = cacheClient.getResource()) {
+ try (Jedis jedis = cacheClient.getWriteResource()) {
jedis.del(getLockName(key));
}
}
private boolean acquireLock(String key) {
- try (Jedis jedis = cacheClient.getResource()) {
+ try (Jedis jedis = cacheClient.getWriteResource()) {
return jedis.set(getLockName(key), "L", "NX", "EX", 10) != null;
}
}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java b/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java
index 5da82f6aa..99332ddef 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiter.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
+import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.SystemMapper;
@@ -31,27 +32,26 @@ import java.io.IOException;
import static com.codahale.metrics.MetricRegistry.name;
import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
public class RateLimiter {
private final Logger logger = LoggerFactory.getLogger(RateLimiter.class);
private final ObjectMapper mapper = SystemMapper.getMapper();
- private final Meter meter;
- protected final JedisPool cacheClient;
- protected final String name;
- private final int bucketSize;
- private final double leakRatePerMillis;
- private final boolean reportLimits;
+ private final Meter meter;
+ protected final ReplicatedJedisPool cacheClient;
+ protected final String name;
+ private final int bucketSize;
+ private final double leakRatePerMillis;
+ private final boolean reportLimits;
- public RateLimiter(JedisPool cacheClient, String name,
+ public RateLimiter(ReplicatedJedisPool cacheClient, String name,
int bucketSize, double leakRatePerMinute)
{
this(cacheClient, name, bucketSize, leakRatePerMinute, false);
}
- public RateLimiter(JedisPool cacheClient, String name,
+ public RateLimiter(ReplicatedJedisPool cacheClient, String name,
int bucketSize, double leakRatePerMinute,
boolean reportLimits)
{
@@ -81,13 +81,13 @@ public class RateLimiter {
}
public void clear(String key) {
- try (Jedis jedis = cacheClient.getResource()) {
+ try (Jedis jedis = cacheClient.getWriteResource()) {
jedis.del(getBucketName(key));
}
}
private void setBucket(String key, LeakyBucket bucket) {
- try (Jedis jedis = cacheClient.getResource()) {
+ try (Jedis jedis = cacheClient.getWriteResource()) {
String serialized = bucket.serialize(mapper);
jedis.setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized);
} catch (JsonProcessingException e) {
@@ -96,7 +96,7 @@ public class RateLimiter {
}
private LeakyBucket getBucket(String key) {
- try (Jedis jedis = cacheClient.getResource()) {
+ try (Jedis jedis = cacheClient.getReadResource()) {
String serialized = jedis.get(getBucketName(key));
if (serialized != null) {
diff --git a/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java b/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java
index 1dff0e157..4bbaabeee 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java
@@ -18,8 +18,7 @@ package org.whispersystems.textsecuregcm.limits;
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration;
-
-import redis.clients.jedis.JedisPool;
+import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
public class RateLimiters {
@@ -41,7 +40,7 @@ public class RateLimiters {
private final RateLimiter profileLimiter;
- public RateLimiters(RateLimitsConfiguration config, JedisPool cacheClient) {
+ public RateLimiters(RateLimitsConfiguration config, ReplicatedJedisPool cacheClient) {
this.smsDestinationLimiter = new RateLimiter(cacheClient, "smsDestination",
config.getSmsDestination().getBucketSize(),
config.getSmsDestination().getLeakRatePerMinute());
diff --git a/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java b/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java
index f6134df52..7333f73f5 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/providers/RedisClientFactory.java
@@ -20,12 +20,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory;
import org.whispersystems.dispatch.redis.PubSubConnection;
+import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Util;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.LinkedList;
+import java.util.List;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
@@ -37,9 +40,9 @@ public class RedisClientFactory implements RedisPubSubConnectionFactory {
private final String host;
private final int port;
- private final JedisPool jedisPool;
+ private final ReplicatedJedisPool jedisPool;
- public RedisClientFactory(String url) throws URISyntaxException {
+ public RedisClientFactory(String url, List replicaUrls) throws URISyntaxException {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setTestOnBorrow(true);
@@ -47,11 +50,23 @@ public class RedisClientFactory implements RedisPubSubConnectionFactory {
this.host = redisURI.getHost();
this.port = redisURI.getPort();
- this.jedisPool = new JedisPool(poolConfig, host, port,
- Protocol.DEFAULT_TIMEOUT, null);
+
+ JedisPool masterPool = new JedisPool(poolConfig, host, port, Protocol.DEFAULT_TIMEOUT, null);
+ List replicaPools = new LinkedList<>();
+
+ for (String replicaUrl : replicaUrls) {
+ URI replicaURI = new URI(replicaUrl);
+
+ replicaPools.add(new JedisPool(poolConfig, replicaURI.getHost(), replicaURI.getPort(),
+ 500, Protocol.DEFAULT_TIMEOUT, null,
+ Protocol.DEFAULT_DATABASE, null, false, null ,
+ null, null));
+ }
+
+ this.jedisPool = new ReplicatedJedisPool(masterPool, replicaPools);
}
- public JedisPool getRedisClientPool() {
+ public ReplicatedJedisPool getRedisClientPool() {
return jedisPool;
}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/providers/RedisHealthCheck.java b/src/main/java/org/whispersystems/textsecuregcm/providers/RedisHealthCheck.java
index 5340ecf24..71f39d5e2 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/providers/RedisHealthCheck.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/providers/RedisHealthCheck.java
@@ -17,21 +17,21 @@
package org.whispersystems.textsecuregcm.providers;
import com.codahale.metrics.health.HealthCheck;
+import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
public class RedisHealthCheck extends HealthCheck {
- private final JedisPool clientPool;
+ private final ReplicatedJedisPool clientPool;
- public RedisHealthCheck(JedisPool clientPool) {
+ public RedisHealthCheck(ReplicatedJedisPool clientPool) {
this.clientPool = clientPool;
}
@Override
protected Result check() throws Exception {
- try (Jedis client = clientPool.getResource()) {
+ try (Jedis client = clientPool.getWriteResource()) {
client.set("HEALTH", "test");
if (!"test".equals(client.get("HEALTH"))) {
diff --git a/src/main/java/org/whispersystems/textsecuregcm/redis/LuaScript.java b/src/main/java/org/whispersystems/textsecuregcm/redis/LuaScript.java
index 643b57dcb..24d067d67 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/redis/LuaScript.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/redis/LuaScript.java
@@ -1,27 +1,20 @@
package org.whispersystems.textsecuregcm.redis;
import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
import java.util.List;
import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisDataException;
public class LuaScript {
- private final JedisPool jedisPool;
- private final String script;
- private final byte[] sha;
+ private final ReplicatedJedisPool jedisPool;
+ private final String script;
+ private final byte[] sha;
- public static LuaScript fromResource(JedisPool jedisPool, String resource) throws IOException {
+ public static LuaScript fromResource(ReplicatedJedisPool jedisPool, String resource) throws IOException {
InputStream inputStream = LuaScript.class.getClassLoader().getResourceAsStream(resource);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -38,14 +31,14 @@ public class LuaScript {
return new LuaScript(jedisPool, new String(baos.toByteArray()));
}
- private LuaScript(JedisPool jedisPool, String script) {
+ private LuaScript(ReplicatedJedisPool jedisPool, String script) {
this.jedisPool = jedisPool;
this.script = script;
this.sha = storeScript(jedisPool, script).getBytes();
}
public Object execute(List keys, List args) {
- try (Jedis jedis = jedisPool.getResource()) {
+ try (Jedis jedis = jedisPool.getWriteResource()) {
try {
return jedis.evalsha(sha, keys, args);
} catch (JedisDataException e) {
@@ -55,8 +48,8 @@ public class LuaScript {
}
}
- private String storeScript(JedisPool jedisPool, String script) {
- try (Jedis jedis = jedisPool.getResource()) {
+ private String storeScript(ReplicatedJedisPool jedisPool, String script) {
+ try (Jedis jedis = jedisPool.getWriteResource()) {
return jedis.scriptLoad(script);
}
}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java b/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java
new file mode 100644
index 000000000..d112d6e4a
--- /dev/null
+++ b/src/main/java/org/whispersystems/textsecuregcm/redis/ReplicatedJedisPool.java
@@ -0,0 +1,56 @@
+package org.whispersystems.textsecuregcm.redis;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.exceptions.JedisException;
+
+public class ReplicatedJedisPool {
+
+ private final Logger logger = LoggerFactory.getLogger(ReplicatedJedisPool.class);
+ private final AtomicInteger replicaIndex = new AtomicInteger(0);
+
+ private final JedisPool master;
+ private final JedisPool[] replicas;
+
+ public ReplicatedJedisPool(JedisPool master, List replicas) {
+ if (replicas.size() < 1) throw new IllegalArgumentException("There must be at least one replica");
+
+ this.master = master;
+ this.replicas = new JedisPool[replicas.size()];
+
+ for (int i=0;i memcacheGet(String number) {
- try (Jedis jedis = cacheClient.getResource()) {
+ try (Jedis jedis = cacheClient.getReadResource()) {
String json = jedis.get(getKey(number));
if (json != null) return Optional.of(mapper.readValue(json, Account.class));
diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java
index 47e077f8c..cff2024af 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/storage/DirectoryManager.java
@@ -24,6 +24,7 @@ import com.google.common.base.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.ClientContact;
+import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.IterablePair;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.Util;
@@ -33,7 +34,6 @@ import java.util.LinkedList;
import java.util.List;
import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
@@ -44,9 +44,9 @@ public class DirectoryManager {
private static final byte[] DIRECTORY_KEY = {'d', 'i', 'r', 'e', 'c', 't', 'o', 'r', 'y'};
private final ObjectMapper objectMapper;
- private final JedisPool redisPool;
+ private final ReplicatedJedisPool redisPool;
- public DirectoryManager(JedisPool redisPool) {
+ public DirectoryManager(ReplicatedJedisPool redisPool) {
this.redisPool = redisPool;
this.objectMapper = new ObjectMapper();
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@@ -61,7 +61,7 @@ public class DirectoryManager {
}
public void remove(byte[] token) {
- try (Jedis jedis = redisPool.getResource()) {
+ try (Jedis jedis = redisPool.getWriteResource()) {
jedis.hdel(DIRECTORY_KEY, token);
}
}
@@ -74,7 +74,7 @@ public class DirectoryManager {
public void add(ClientContact contact) {
TokenValue tokenValue = new TokenValue(contact.getRelay(), contact.isVoice(), contact.isVideo());
- try (Jedis jedis = redisPool.getResource()) {
+ try (Jedis jedis = redisPool.getWriteResource()) {
jedis.hset(DIRECTORY_KEY, contact.getToken(), objectMapper.writeValueAsBytes(tokenValue));
} catch (JsonProcessingException e) {
logger.warn("JSON Serialization", e);
@@ -98,7 +98,7 @@ public class DirectoryManager {
}
public Optional get(byte[] token) {
- try (Jedis jedis = redisPool.getResource()) {
+ try (Jedis jedis = redisPool.getWriteResource()) {
byte[] result = jedis.hget(DIRECTORY_KEY, token);
if (result == null) {
@@ -114,7 +114,7 @@ public class DirectoryManager {
}
public List get(List tokens) {
- try (Jedis jedis = redisPool.getResource()) {
+ try (Jedis jedis = redisPool.getWriteResource()) {
Pipeline pipeline = jedis.pipelined();
List> futures = new LinkedList<>();
List results = new LinkedList<>();
@@ -147,7 +147,7 @@ public class DirectoryManager {
}
public BatchOperationHandle startBatchOperation() {
- Jedis jedis = redisPool.getResource();
+ Jedis jedis = redisPool.getWriteResource();
return new BatchOperationHandle(jedis, jedis.pipelined());
}
@@ -156,7 +156,7 @@ public class DirectoryManager {
Jedis jedis = handle.jedis;
pipeline.sync();
- redisPool.returnResource(jedis);
+ redisPool.returnWriteResource(jedis);
}
public static class BatchOperationHandle {
diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java b/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java
index b99636c49..459ce4ab8 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/storage/MessagesCache.java
@@ -14,6 +14,7 @@ import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.TransientPushFailureException;
import org.whispersystems.textsecuregcm.redis.LuaScript;
+import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.Util;
@@ -32,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.lifecycle.Managed;
import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;
import redis.clients.util.SafeEncoder;
@@ -48,10 +48,10 @@ public class MessagesCache implements Managed {
private static final Timer clearAccountTimer = metricRegistry.timer(name(MessagesCache.class, "clearAccount"));
private static final Timer clearDeviceTimer = metricRegistry.timer(name(MessagesCache.class, "clearDevice" ));
- private final JedisPool jedisPool;
- private final Messages database;
- private final AccountsManager accountsManager;
- private final int delayMinutes;
+ private final ReplicatedJedisPool jedisPool;
+ private final Messages database;
+ private final AccountsManager accountsManager;
+ private final int delayMinutes;
private InsertOperation insertOperation;
private RemoveOperation removeOperation;
@@ -61,7 +61,7 @@ public class MessagesCache implements Managed {
private PushSender pushSender;
private MessagePersister messagePersister;
- public MessagesCache(JedisPool jedisPool, Messages database, AccountsManager accountsManager, int delayMinutes) {
+ public MessagesCache(ReplicatedJedisPool jedisPool, Messages database, AccountsManager accountsManager, int delayMinutes) {
this.jedisPool = jedisPool;
this.database = database;
this.accountsManager = accountsManager;
@@ -244,7 +244,7 @@ public class MessagesCache implements Managed {
private static class InsertOperation {
private final LuaScript insert;
- InsertOperation(JedisPool jedisPool) throws IOException {
+ InsertOperation(ReplicatedJedisPool jedisPool) throws IOException {
this.insert = LuaScript.fromResource(jedisPool, "lua/insert_item.lua");
}
@@ -265,7 +265,7 @@ public class MessagesCache implements Managed {
private final LuaScript removeBySender;
private final LuaScript removeQueue;
- RemoveOperation(JedisPool jedisPool) throws IOException {
+ RemoveOperation(ReplicatedJedisPool jedisPool) throws IOException {
this.removeById = LuaScript.fromResource(jedisPool, "lua/remove_item_by_id.lua" );
this.removeBySender = LuaScript.fromResource(jedisPool, "lua/remove_item_by_sender.lua");
this.removeQueue = LuaScript.fromResource(jedisPool, "lua/remove_queue.lua" );
@@ -305,7 +305,7 @@ public class MessagesCache implements Managed {
private final LuaScript getQueues;
private final LuaScript getItems;
- GetOperation(JedisPool jedisPool) throws IOException {
+ GetOperation(ReplicatedJedisPool jedisPool) throws IOException {
this.getQueues = LuaScript.fromResource(jedisPool, "lua/get_queues_to_persist.lua");
this.getItems = LuaScript.fromResource(jedisPool, "lua/get_items.lua");
}
@@ -346,10 +346,10 @@ public class MessagesCache implements Managed {
private final AtomicBoolean running = new AtomicBoolean(true);
- private final JedisPool jedisPool;
- private final Messages database;
- private final long delayTime;
- private final TimeUnit delayTimeUnit;
+ private final ReplicatedJedisPool jedisPool;
+ private final Messages database;
+ private final long delayTime;
+ private final TimeUnit delayTimeUnit;
private final PubSubManager pubSubManager;
private final PushSender pushSender;
@@ -360,13 +360,13 @@ public class MessagesCache implements Managed {
private boolean finished = false;
- MessagePersister(JedisPool jedisPool,
- Messages database,
- PubSubManager pubSubManager,
- PushSender pushSender,
- AccountsManager accountsManager,
- long delayTime,
- TimeUnit delayTimeUnit)
+ MessagePersister(ReplicatedJedisPool jedisPool,
+ Messages database,
+ PubSubManager pubSubManager,
+ PushSender pushSender,
+ AccountsManager accountsManager,
+ long delayTime,
+ TimeUnit delayTimeUnit)
throws IOException
{
super(MessagePersister.class.getSimpleName());
@@ -416,12 +416,12 @@ public class MessagesCache implements Managed {
while (!finished) Util.wait(this);
}
- private void persistQueue(JedisPool jedisPool, Key key) throws IOException {
+ private void persistQueue(ReplicatedJedisPool jedisPool, Key key) throws IOException {
Timer.Context timer = persistQueueTimer.time();
int messagesPersistedCount = 0;
- try (Jedis jedis = jedisPool.getResource()) {
+ try (Jedis jedis = jedisPool.getWriteResource()) {
while (true) {
jedis.setex(key.getUserMessageQueuePersistInProgress(), 30, "1".getBytes());
diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java
index cef593dcd..a271bdba2 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccountsManager.java
@@ -19,16 +19,15 @@ package org.whispersystems.textsecuregcm.storage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
+import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import java.io.IOException;
import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
public class PendingAccountsManager {
@@ -36,11 +35,11 @@ public class PendingAccountsManager {
private static final String CACHE_PREFIX = "pending_account2::";
- private final PendingAccounts pendingAccounts;
- private final JedisPool cacheClient;
- private final ObjectMapper mapper;
+ private final PendingAccounts pendingAccounts;
+ private final ReplicatedJedisPool cacheClient;
+ private final ObjectMapper mapper;
- public PendingAccountsManager(PendingAccounts pendingAccounts, JedisPool cacheClient)
+ public PendingAccountsManager(PendingAccounts pendingAccounts, ReplicatedJedisPool cacheClient)
{
this.pendingAccounts = pendingAccounts;
this.cacheClient = cacheClient;
@@ -72,7 +71,7 @@ public class PendingAccountsManager {
}
private void memcacheSet(String number, StoredVerificationCode code) {
- try (Jedis jedis = cacheClient.getResource()) {
+ try (Jedis jedis = cacheClient.getWriteResource()) {
jedis.set(CACHE_PREFIX + number, mapper.writeValueAsString(code));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
@@ -80,7 +79,7 @@ public class PendingAccountsManager {
}
private Optional memcacheGet(String number) {
- try (Jedis jedis = cacheClient.getResource()) {
+ try (Jedis jedis = cacheClient.getReadResource()) {
String json = jedis.get(CACHE_PREFIX + number);
if (json == null) return Optional.absent();
@@ -92,7 +91,7 @@ public class PendingAccountsManager {
}
private void memcacheDelete(String number) {
- try (Jedis jedis = cacheClient.getResource()) {
+ try (Jedis jedis = cacheClient.getWriteResource()) {
jedis.del(CACHE_PREFIX + number);
}
}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java
index 20e418542..0c131f837 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/storage/PendingDevicesManager.java
@@ -22,12 +22,12 @@ import com.google.common.base.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
+import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import java.io.IOException;
import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
public class PendingDevicesManager {
@@ -35,13 +35,11 @@ public class PendingDevicesManager {
private static final String CACHE_PREFIX = "pending_devices2::";
- private final PendingDevices pendingDevices;
- private final JedisPool cacheClient;
- private final ObjectMapper mapper;
+ private final PendingDevices pendingDevices;
+ private final ReplicatedJedisPool cacheClient;
+ private final ObjectMapper mapper;
- public PendingDevicesManager(PendingDevices pendingDevices,
- JedisPool cacheClient)
- {
+ public PendingDevicesManager(PendingDevices pendingDevices, ReplicatedJedisPool cacheClient) {
this.pendingDevices = pendingDevices;
this.cacheClient = cacheClient;
this.mapper = SystemMapper.getMapper();
@@ -72,7 +70,7 @@ public class PendingDevicesManager {
}
private void memcacheSet(String number, StoredVerificationCode code) {
- try (Jedis jedis = cacheClient.getResource()) {
+ try (Jedis jedis = cacheClient.getWriteResource()) {
jedis.set(CACHE_PREFIX + number, mapper.writeValueAsString(code));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
@@ -80,7 +78,7 @@ public class PendingDevicesManager {
}
private Optional memcacheGet(String number) {
- try (Jedis jedis = cacheClient.getResource()) {
+ try (Jedis jedis = cacheClient.getReadResource()) {
String json = jedis.get(CACHE_PREFIX + number);
if (json == null) return Optional.absent();
@@ -92,7 +90,7 @@ public class PendingDevicesManager {
}
private void memcacheDelete(String number) {
- try (Jedis jedis = cacheClient.getResource()) {
+ try (Jedis jedis = cacheClient.getWriteResource()) {
jedis.del(CACHE_PREFIX + number);
}
}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java
index f721a1866..ca0f7b8bf 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/storage/PubSubManager.java
@@ -4,13 +4,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.DispatchChannel;
import org.whispersystems.dispatch.DispatchManager;
-
-import java.util.Arrays;
+import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import io.dropwizard.lifecycle.Managed;
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
import redis.clients.jedis.Jedis;
-import redis.clients.jedis.JedisPool;
public class PubSubManager implements Managed {
@@ -18,14 +16,14 @@ public class PubSubManager implements Managed {
private final Logger logger = LoggerFactory.getLogger(PubSubManager.class);
- private final DispatchManager dispatchManager;
- private final JedisPool jedisPool;
+ private final DispatchManager dispatchManager;
+ private final ReplicatedJedisPool jedisPool;
private boolean subscribed = false;
- public PubSubManager(JedisPool jedisPool, DispatchManager dispatchManager) {
+ public PubSubManager(ReplicatedJedisPool jedisPool, DispatchManager dispatchManager) {
this.dispatchManager = dispatchManager;
- this.jedisPool = jedisPool;
+ this.jedisPool = jedisPool;
}
@Override
@@ -64,7 +62,7 @@ public class PubSubManager implements Managed {
}
private boolean publish(byte[] channel, PubSubMessage message) {
- try (Jedis jedis = jedisPool.getResource()) {
+ try (Jedis jedis = jedisPool.getWriteResource()) {
long result = jedis.publish(channel, message.toByteArray());
if (result < 0) {
diff --git a/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java
index c00f6a6fc..09a23ac13 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java
@@ -10,6 +10,7 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.auth.AuthenticationCredentials;
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
+import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
@@ -72,11 +73,11 @@ public class DeleteUserCommand extends EnvironmentCommand account = accountsManager.get(user);
diff --git a/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java b/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java
index 58a501c2e..bcef230fe 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/workers/DirectoryCommand.java
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.federation.FederatedClientManager;
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
+import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
@@ -71,11 +72,11 @@ public class DirectoryCommand extends EnvironmentCommand());
+ throw new AssertionError();
+ } catch (Exception e) {
+ // good
+ }
+ }
+
+ @Test
+ public void testWriteCheckoutWithSlaves() {
+ JedisPool master = mock(JedisPool.class);
+ JedisPool slave = mock(JedisPool.class);
+ Jedis instance = mock(Jedis.class );
+
+ when(master.getResource()).thenReturn(instance);
+
+ ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Collections.singletonList(slave));
+ Jedis writeResource = replicatedJedisPool.getWriteResource();
+
+ assertThat(writeResource).isEqualTo(instance);
+ verify(master, times(1)).getResource();
+ }
+
+ @Test
+ public void testReadCheckouts() {
+ JedisPool master = mock(JedisPool.class);
+ JedisPool slaveOne = mock(JedisPool.class);
+ JedisPool slaveTwo = mock(JedisPool.class);
+ Jedis instanceOne = mock(Jedis.class );
+ Jedis instanceTwo = mock(Jedis.class );
+
+ when(slaveOne.getResource()).thenReturn(instanceOne);
+ when(slaveTwo.getResource()).thenReturn(instanceTwo);
+
+ ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo));
+
+ assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceOne);
+ assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
+ assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceOne);
+ assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
+ assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceOne);
+
+ verifyNoMoreInteractions(master);
+ }
+
+ @Test
+ public void testBrokenReadCheckout() {
+ JedisPool master = mock(JedisPool.class);
+ JedisPool slaveOne = mock(JedisPool.class);
+ JedisPool slaveTwo = mock(JedisPool.class);
+ Jedis instanceTwo = mock(Jedis.class );
+
+ when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!"));
+ when(slaveTwo.getResource()).thenReturn(instanceTwo);
+
+ ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo));
+
+ assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
+ assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
+ assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
+
+ verifyNoMoreInteractions(master);
+ }
+
+ @Test
+ public void testAllBrokenReadCheckout() {
+ JedisPool master = mock(JedisPool.class);
+ JedisPool slaveOne = mock(JedisPool.class);
+ JedisPool slaveTwo = mock(JedisPool.class);
+
+ when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!"));
+ when(slaveTwo.getResource()).thenThrow(new JedisException("Also failed!"));
+
+ ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo));
+
+ try {
+ replicatedJedisPool.getReadResource();
+ throw new AssertionError();
+ } catch (Exception e) {
+ // good
+ }
+
+ verifyNoMoreInteractions(master);
+ }
+
+}