Support for replicated redis clusters

This commit is contained in:
Moxie Marlinspike 2018-04-10 13:19:40 -07:00
parent 90ecc5c13b
commit 49dad3099a
19 changed files with 304 additions and 157 deletions

View File

@ -1,30 +0,0 @@
/**
* Copyright (C) 2014 Open Whisper Systems
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package textsecure;
option java_package = "org.whispersystems.textsecuregcm.storage";
option java_outer_classname = "StoredMessageProtos";
message StoredMessage {
enum Type {
UNKNOWN = 0;
MESSAGE = 1;
}
optional Type type = 1;
optional bytes content = 2;
}

View File

@ -63,6 +63,7 @@ import org.whispersystems.textsecuregcm.push.GCMSender;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.ReceiptSender;
import org.whispersystems.textsecuregcm.push.WebsocketSender;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.s3.UrlSigner;
import org.whispersystems.textsecuregcm.sms.SmsSender;
import org.whispersystems.textsecuregcm.sms.TwilioSmsSender;
@ -104,7 +105,6 @@ import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.jdbi.DBIFactory;
import io.dropwizard.setup.Bootstrap;
import io.dropwizard.setup.Environment;
import redis.clients.jedis.JedisPool;
public class WhisperServerService extends Application<WhisperServerConfiguration> {
@ -158,10 +158,12 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
Keys keys = database.onDemand(Keys.class);
Messages messages = messagedb.onDemand(Messages.class);
RedisClientFactory cacheClientFactory = new RedisClientFactory(config.getCacheConfiguration().getUrl());
JedisPool cacheClient = cacheClientFactory.getRedisClientPool();
JedisPool directoryClient = new RedisClientFactory(config.getDirectoryConfiguration().getUrl()).getRedisClientPool();
JedisPool messagesClient = new RedisClientFactory(config.getMessageCacheConfiguration().getRedisConfiguration().getUrl()).getRedisClientPool();
RedisClientFactory cacheClientFactory = new RedisClientFactory(config.getCacheConfiguration().getUrl(), config.getCacheConfiguration().getReplicaUrls() );
RedisClientFactory directoryClientFactory = new RedisClientFactory(config.getDirectoryConfiguration().getUrl(), config.getDirectoryConfiguration().getReplicaUrls());
RedisClientFactory messagesClientFactory = new RedisClientFactory(config.getMessageCacheConfiguration().getRedisConfiguration().getUrl(), config.getMessageCacheConfiguration().getRedisConfiguration().getReplicaUrls());
ReplicatedJedisPool cacheClient = cacheClientFactory.getRedisClientPool();
ReplicatedJedisPool directoryClient = directoryClientFactory.getRedisClientPool();
ReplicatedJedisPool messagesClient = messagesClientFactory.getRedisClientPool();
DirectoryManager directory = new DirectoryManager(directoryClient);
PendingAccountsManager pendingAccountsManager = new PendingAccountsManager(pendingAccounts, cacheClient);

View File

@ -21,13 +21,24 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.hibernate.validator.constraints.NotEmpty;
import org.hibernate.validator.constraints.URL;
import javax.validation.constraints.NotNull;
import java.util.List;
public class RedisConfiguration {
@JsonProperty
@NotEmpty
private String url;
@JsonProperty
@NotNull
private List<String> replicaUrls;
public String getUrl() {
return url;
}
public List<String> getReplicaUrls() {
return replicaUrls;
}
}

View File

@ -4,17 +4,17 @@ import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants;
import static com.codahale.metrics.MetricRegistry.name;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class LockingRateLimiter extends RateLimiter {
private final Meter meter;
public LockingRateLimiter(JedisPool cacheClient, String name, int bucketSize, double leakRatePerMinute) {
public LockingRateLimiter(ReplicatedJedisPool cacheClient, String name, int bucketSize, double leakRatePerMinute) {
super(cacheClient, name, bucketSize, leakRatePerMinute);
MetricRegistry metricRegistry = SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME);
@ -41,13 +41,13 @@ public class LockingRateLimiter extends RateLimiter {
}
private void releaseLock(String key) {
try (Jedis jedis = cacheClient.getResource()) {
try (Jedis jedis = cacheClient.getWriteResource()) {
jedis.del(getLockName(key));
}
}
private boolean acquireLock(String key) {
try (Jedis jedis = cacheClient.getResource()) {
try (Jedis jedis = cacheClient.getWriteResource()) {
return jedis.set(getLockName(key), "L", "NX", "EX", 10) != null;
}
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.controllers.RateLimitExceededException;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.SystemMapper;
@ -31,27 +32,26 @@ import java.io.IOException;
import static com.codahale.metrics.MetricRegistry.name;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class RateLimiter {
private final Logger logger = LoggerFactory.getLogger(RateLimiter.class);
private final ObjectMapper mapper = SystemMapper.getMapper();
private final Meter meter;
protected final JedisPool cacheClient;
protected final String name;
private final int bucketSize;
private final double leakRatePerMillis;
private final boolean reportLimits;
private final Meter meter;
protected final ReplicatedJedisPool cacheClient;
protected final String name;
private final int bucketSize;
private final double leakRatePerMillis;
private final boolean reportLimits;
public RateLimiter(JedisPool cacheClient, String name,
public RateLimiter(ReplicatedJedisPool cacheClient, String name,
int bucketSize, double leakRatePerMinute)
{
this(cacheClient, name, bucketSize, leakRatePerMinute, false);
}
public RateLimiter(JedisPool cacheClient, String name,
public RateLimiter(ReplicatedJedisPool cacheClient, String name,
int bucketSize, double leakRatePerMinute,
boolean reportLimits)
{
@ -81,13 +81,13 @@ public class RateLimiter {
}
public void clear(String key) {
try (Jedis jedis = cacheClient.getResource()) {
try (Jedis jedis = cacheClient.getWriteResource()) {
jedis.del(getBucketName(key));
}
}
private void setBucket(String key, LeakyBucket bucket) {
try (Jedis jedis = cacheClient.getResource()) {
try (Jedis jedis = cacheClient.getWriteResource()) {
String serialized = bucket.serialize(mapper);
jedis.setex(getBucketName(key), (int) Math.ceil((bucketSize / leakRatePerMillis) / 1000), serialized);
} catch (JsonProcessingException e) {
@ -96,7 +96,7 @@ public class RateLimiter {
}
private LeakyBucket getBucket(String key) {
try (Jedis jedis = cacheClient.getResource()) {
try (Jedis jedis = cacheClient.getReadResource()) {
String serialized = jedis.get(getBucketName(key));
if (serialized != null) {

View File

@ -18,8 +18,7 @@ package org.whispersystems.textsecuregcm.limits;
import org.whispersystems.textsecuregcm.configuration.RateLimitsConfiguration;
import redis.clients.jedis.JedisPool;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
public class RateLimiters {
@ -41,7 +40,7 @@ public class RateLimiters {
private final RateLimiter profileLimiter;
public RateLimiters(RateLimitsConfiguration config, JedisPool cacheClient) {
public RateLimiters(RateLimitsConfiguration config, ReplicatedJedisPool cacheClient) {
this.smsDestinationLimiter = new RateLimiter(cacheClient, "smsDestination",
config.getSmsDestination().getBucketSize(),
config.getSmsDestination().getLeakRatePerMinute());

View File

@ -20,12 +20,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.io.RedisPubSubConnectionFactory;
import org.whispersystems.dispatch.redis.PubSubConnection;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Util;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.LinkedList;
import java.util.List;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
@ -37,9 +40,9 @@ public class RedisClientFactory implements RedisPubSubConnectionFactory {
private final String host;
private final int port;
private final JedisPool jedisPool;
private final ReplicatedJedisPool jedisPool;
public RedisClientFactory(String url) throws URISyntaxException {
public RedisClientFactory(String url, List<String> replicaUrls) throws URISyntaxException {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setTestOnBorrow(true);
@ -47,11 +50,23 @@ public class RedisClientFactory implements RedisPubSubConnectionFactory {
this.host = redisURI.getHost();
this.port = redisURI.getPort();
this.jedisPool = new JedisPool(poolConfig, host, port,
Protocol.DEFAULT_TIMEOUT, null);
JedisPool masterPool = new JedisPool(poolConfig, host, port, Protocol.DEFAULT_TIMEOUT, null);
List<JedisPool> replicaPools = new LinkedList<>();
for (String replicaUrl : replicaUrls) {
URI replicaURI = new URI(replicaUrl);
replicaPools.add(new JedisPool(poolConfig, replicaURI.getHost(), replicaURI.getPort(),
500, Protocol.DEFAULT_TIMEOUT, null,
Protocol.DEFAULT_DATABASE, null, false, null ,
null, null));
}
this.jedisPool = new ReplicatedJedisPool(masterPool, replicaPools);
}
public JedisPool getRedisClientPool() {
public ReplicatedJedisPool getRedisClientPool() {
return jedisPool;
}

View File

@ -17,21 +17,21 @@
package org.whispersystems.textsecuregcm.providers;
import com.codahale.metrics.health.HealthCheck;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class RedisHealthCheck extends HealthCheck {
private final JedisPool clientPool;
private final ReplicatedJedisPool clientPool;
public RedisHealthCheck(JedisPool clientPool) {
public RedisHealthCheck(ReplicatedJedisPool clientPool) {
this.clientPool = clientPool;
}
@Override
protected Result check() throws Exception {
try (Jedis client = clientPool.getResource()) {
try (Jedis client = clientPool.getWriteResource()) {
client.set("HEALTH", "test");
if (!"test".equals(client.get("HEALTH"))) {

View File

@ -1,27 +1,20 @@
package org.whispersystems.textsecuregcm.redis;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisDataException;
public class LuaScript {
private final JedisPool jedisPool;
private final String script;
private final byte[] sha;
private final ReplicatedJedisPool jedisPool;
private final String script;
private final byte[] sha;
public static LuaScript fromResource(JedisPool jedisPool, String resource) throws IOException {
public static LuaScript fromResource(ReplicatedJedisPool jedisPool, String resource) throws IOException {
InputStream inputStream = LuaScript.class.getClassLoader().getResourceAsStream(resource);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@ -38,14 +31,14 @@ public class LuaScript {
return new LuaScript(jedisPool, new String(baos.toByteArray()));
}
private LuaScript(JedisPool jedisPool, String script) {
private LuaScript(ReplicatedJedisPool jedisPool, String script) {
this.jedisPool = jedisPool;
this.script = script;
this.sha = storeScript(jedisPool, script).getBytes();
}
public Object execute(List<byte[]> keys, List<byte[]> args) {
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = jedisPool.getWriteResource()) {
try {
return jedis.evalsha(sha, keys, args);
} catch (JedisDataException e) {
@ -55,8 +48,8 @@ public class LuaScript {
}
}
private String storeScript(JedisPool jedisPool, String script) {
try (Jedis jedis = jedisPool.getResource()) {
private String storeScript(ReplicatedJedisPool jedisPool, String script) {
try (Jedis jedis = jedisPool.getWriteResource()) {
return jedis.scriptLoad(script);
}
}

View File

@ -0,0 +1,56 @@
package org.whispersystems.textsecuregcm.redis;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisException;
public class ReplicatedJedisPool {
private final Logger logger = LoggerFactory.getLogger(ReplicatedJedisPool.class);
private final AtomicInteger replicaIndex = new AtomicInteger(0);
private final JedisPool master;
private final JedisPool[] replicas;
public ReplicatedJedisPool(JedisPool master, List<JedisPool> replicas) {
if (replicas.size() < 1) throw new IllegalArgumentException("There must be at least one replica");
this.master = master;
this.replicas = new JedisPool[replicas.size()];
for (int i=0;i<this.replicas.length;i++) {
this.replicas[i] = replicas.get(i);
}
}
public Jedis getWriteResource() {
return master.getResource();
}
public void returnWriteResource(Jedis jedis) {
master.returnResource(jedis);
}
public Jedis getReadResource() {
int failureCount = 0;
while (failureCount < replicas.length) {
try {
return replicas[replicaIndex.getAndIncrement() % replicas.length].getResource();
} catch (JedisException e) {
logger.error("Failure obtaining read replica pool", e);
}
failureCount++;
}
throw new JedisException("All read replica pools failed!");
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.base.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.ClientContact;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
@ -31,21 +32,17 @@ import java.util.Iterator;
import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class AccountsManager {
private final Logger logger = LoggerFactory.getLogger(AccountsManager.class);
private final Accounts accounts;
private final JedisPool cacheClient;
private final DirectoryManager directory;
private final ObjectMapper mapper;
private final Accounts accounts;
private final ReplicatedJedisPool cacheClient;
private final DirectoryManager directory;
private final ObjectMapper mapper;
public AccountsManager(Accounts accounts,
DirectoryManager directory,
JedisPool cacheClient)
{
public AccountsManager(Accounts accounts, DirectoryManager directory, ReplicatedJedisPool cacheClient) {
this.accounts = accounts;
this.directory = directory;
this.cacheClient = cacheClient;
@ -114,7 +111,7 @@ public class AccountsManager {
}
private void memcacheSet(String number, Account account) {
try (Jedis jedis = cacheClient.getResource()) {
try (Jedis jedis = cacheClient.getWriteResource()) {
jedis.set(getKey(number), mapper.writeValueAsString(account));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
@ -122,7 +119,7 @@ public class AccountsManager {
}
private Optional<Account> memcacheGet(String number) {
try (Jedis jedis = cacheClient.getResource()) {
try (Jedis jedis = cacheClient.getReadResource()) {
String json = jedis.get(getKey(number));
if (json != null) return Optional.of(mapper.readValue(json, Account.class));

View File

@ -24,6 +24,7 @@ import com.google.common.base.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.entities.ClientContact;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.IterablePair;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.Util;
@ -33,7 +34,6 @@ import java.util.LinkedList;
import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
@ -44,9 +44,9 @@ public class DirectoryManager {
private static final byte[] DIRECTORY_KEY = {'d', 'i', 'r', 'e', 'c', 't', 'o', 'r', 'y'};
private final ObjectMapper objectMapper;
private final JedisPool redisPool;
private final ReplicatedJedisPool redisPool;
public DirectoryManager(JedisPool redisPool) {
public DirectoryManager(ReplicatedJedisPool redisPool) {
this.redisPool = redisPool;
this.objectMapper = new ObjectMapper();
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
@ -61,7 +61,7 @@ public class DirectoryManager {
}
public void remove(byte[] token) {
try (Jedis jedis = redisPool.getResource()) {
try (Jedis jedis = redisPool.getWriteResource()) {
jedis.hdel(DIRECTORY_KEY, token);
}
}
@ -74,7 +74,7 @@ public class DirectoryManager {
public void add(ClientContact contact) {
TokenValue tokenValue = new TokenValue(contact.getRelay(), contact.isVoice(), contact.isVideo());
try (Jedis jedis = redisPool.getResource()) {
try (Jedis jedis = redisPool.getWriteResource()) {
jedis.hset(DIRECTORY_KEY, contact.getToken(), objectMapper.writeValueAsBytes(tokenValue));
} catch (JsonProcessingException e) {
logger.warn("JSON Serialization", e);
@ -98,7 +98,7 @@ public class DirectoryManager {
}
public Optional<ClientContact> get(byte[] token) {
try (Jedis jedis = redisPool.getResource()) {
try (Jedis jedis = redisPool.getWriteResource()) {
byte[] result = jedis.hget(DIRECTORY_KEY, token);
if (result == null) {
@ -114,7 +114,7 @@ public class DirectoryManager {
}
public List<ClientContact> get(List<byte[]> tokens) {
try (Jedis jedis = redisPool.getResource()) {
try (Jedis jedis = redisPool.getWriteResource()) {
Pipeline pipeline = jedis.pipelined();
List<Response<byte[]>> futures = new LinkedList<>();
List<ClientContact> results = new LinkedList<>();
@ -147,7 +147,7 @@ public class DirectoryManager {
}
public BatchOperationHandle startBatchOperation() {
Jedis jedis = redisPool.getResource();
Jedis jedis = redisPool.getWriteResource();
return new BatchOperationHandle(jedis, jedis.pipelined());
}
@ -156,7 +156,7 @@ public class DirectoryManager {
Jedis jedis = handle.jedis;
pipeline.sync();
redisPool.returnResource(jedis);
redisPool.returnWriteResource(jedis);
}
public static class BatchOperationHandle {

View File

@ -14,6 +14,7 @@ import org.whispersystems.textsecuregcm.push.NotPushRegisteredException;
import org.whispersystems.textsecuregcm.push.PushSender;
import org.whispersystems.textsecuregcm.push.TransientPushFailureException;
import org.whispersystems.textsecuregcm.redis.LuaScript;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.Constants;
import org.whispersystems.textsecuregcm.util.Pair;
import org.whispersystems.textsecuregcm.util.Util;
@ -32,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static com.codahale.metrics.MetricRegistry.name;
import io.dropwizard.lifecycle.Managed;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;
import redis.clients.util.SafeEncoder;
@ -48,10 +48,10 @@ public class MessagesCache implements Managed {
private static final Timer clearAccountTimer = metricRegistry.timer(name(MessagesCache.class, "clearAccount"));
private static final Timer clearDeviceTimer = metricRegistry.timer(name(MessagesCache.class, "clearDevice" ));
private final JedisPool jedisPool;
private final Messages database;
private final AccountsManager accountsManager;
private final int delayMinutes;
private final ReplicatedJedisPool jedisPool;
private final Messages database;
private final AccountsManager accountsManager;
private final int delayMinutes;
private InsertOperation insertOperation;
private RemoveOperation removeOperation;
@ -61,7 +61,7 @@ public class MessagesCache implements Managed {
private PushSender pushSender;
private MessagePersister messagePersister;
public MessagesCache(JedisPool jedisPool, Messages database, AccountsManager accountsManager, int delayMinutes) {
public MessagesCache(ReplicatedJedisPool jedisPool, Messages database, AccountsManager accountsManager, int delayMinutes) {
this.jedisPool = jedisPool;
this.database = database;
this.accountsManager = accountsManager;
@ -244,7 +244,7 @@ public class MessagesCache implements Managed {
private static class InsertOperation {
private final LuaScript insert;
InsertOperation(JedisPool jedisPool) throws IOException {
InsertOperation(ReplicatedJedisPool jedisPool) throws IOException {
this.insert = LuaScript.fromResource(jedisPool, "lua/insert_item.lua");
}
@ -265,7 +265,7 @@ public class MessagesCache implements Managed {
private final LuaScript removeBySender;
private final LuaScript removeQueue;
RemoveOperation(JedisPool jedisPool) throws IOException {
RemoveOperation(ReplicatedJedisPool jedisPool) throws IOException {
this.removeById = LuaScript.fromResource(jedisPool, "lua/remove_item_by_id.lua" );
this.removeBySender = LuaScript.fromResource(jedisPool, "lua/remove_item_by_sender.lua");
this.removeQueue = LuaScript.fromResource(jedisPool, "lua/remove_queue.lua" );
@ -305,7 +305,7 @@ public class MessagesCache implements Managed {
private final LuaScript getQueues;
private final LuaScript getItems;
GetOperation(JedisPool jedisPool) throws IOException {
GetOperation(ReplicatedJedisPool jedisPool) throws IOException {
this.getQueues = LuaScript.fromResource(jedisPool, "lua/get_queues_to_persist.lua");
this.getItems = LuaScript.fromResource(jedisPool, "lua/get_items.lua");
}
@ -346,10 +346,10 @@ public class MessagesCache implements Managed {
private final AtomicBoolean running = new AtomicBoolean(true);
private final JedisPool jedisPool;
private final Messages database;
private final long delayTime;
private final TimeUnit delayTimeUnit;
private final ReplicatedJedisPool jedisPool;
private final Messages database;
private final long delayTime;
private final TimeUnit delayTimeUnit;
private final PubSubManager pubSubManager;
private final PushSender pushSender;
@ -360,13 +360,13 @@ public class MessagesCache implements Managed {
private boolean finished = false;
MessagePersister(JedisPool jedisPool,
Messages database,
PubSubManager pubSubManager,
PushSender pushSender,
AccountsManager accountsManager,
long delayTime,
TimeUnit delayTimeUnit)
MessagePersister(ReplicatedJedisPool jedisPool,
Messages database,
PubSubManager pubSubManager,
PushSender pushSender,
AccountsManager accountsManager,
long delayTime,
TimeUnit delayTimeUnit)
throws IOException
{
super(MessagePersister.class.getSimpleName());
@ -416,12 +416,12 @@ public class MessagesCache implements Managed {
while (!finished) Util.wait(this);
}
private void persistQueue(JedisPool jedisPool, Key key) throws IOException {
private void persistQueue(ReplicatedJedisPool jedisPool, Key key) throws IOException {
Timer.Context timer = persistQueueTimer.time();
int messagesPersistedCount = 0;
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = jedisPool.getWriteResource()) {
while (true) {
jedis.setex(key.getUserMessageQueuePersistInProgress(), 30, "1".getBytes());

View File

@ -19,16 +19,15 @@ package org.whispersystems.textsecuregcm.storage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import java.io.IOException;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class PendingAccountsManager {
@ -36,11 +35,11 @@ public class PendingAccountsManager {
private static final String CACHE_PREFIX = "pending_account2::";
private final PendingAccounts pendingAccounts;
private final JedisPool cacheClient;
private final ObjectMapper mapper;
private final PendingAccounts pendingAccounts;
private final ReplicatedJedisPool cacheClient;
private final ObjectMapper mapper;
public PendingAccountsManager(PendingAccounts pendingAccounts, JedisPool cacheClient)
public PendingAccountsManager(PendingAccounts pendingAccounts, ReplicatedJedisPool cacheClient)
{
this.pendingAccounts = pendingAccounts;
this.cacheClient = cacheClient;
@ -72,7 +71,7 @@ public class PendingAccountsManager {
}
private void memcacheSet(String number, StoredVerificationCode code) {
try (Jedis jedis = cacheClient.getResource()) {
try (Jedis jedis = cacheClient.getWriteResource()) {
jedis.set(CACHE_PREFIX + number, mapper.writeValueAsString(code));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
@ -80,7 +79,7 @@ public class PendingAccountsManager {
}
private Optional<StoredVerificationCode> memcacheGet(String number) {
try (Jedis jedis = cacheClient.getResource()) {
try (Jedis jedis = cacheClient.getReadResource()) {
String json = jedis.get(CACHE_PREFIX + number);
if (json == null) return Optional.absent();
@ -92,7 +91,7 @@ public class PendingAccountsManager {
}
private void memcacheDelete(String number) {
try (Jedis jedis = cacheClient.getResource()) {
try (Jedis jedis = cacheClient.getWriteResource()) {
jedis.del(CACHE_PREFIX + number);
}
}

View File

@ -22,12 +22,12 @@ import com.google.common.base.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.util.SystemMapper;
import java.io.IOException;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class PendingDevicesManager {
@ -35,13 +35,11 @@ public class PendingDevicesManager {
private static final String CACHE_PREFIX = "pending_devices2::";
private final PendingDevices pendingDevices;
private final JedisPool cacheClient;
private final ObjectMapper mapper;
private final PendingDevices pendingDevices;
private final ReplicatedJedisPool cacheClient;
private final ObjectMapper mapper;
public PendingDevicesManager(PendingDevices pendingDevices,
JedisPool cacheClient)
{
public PendingDevicesManager(PendingDevices pendingDevices, ReplicatedJedisPool cacheClient) {
this.pendingDevices = pendingDevices;
this.cacheClient = cacheClient;
this.mapper = SystemMapper.getMapper();
@ -72,7 +70,7 @@ public class PendingDevicesManager {
}
private void memcacheSet(String number, StoredVerificationCode code) {
try (Jedis jedis = cacheClient.getResource()) {
try (Jedis jedis = cacheClient.getWriteResource()) {
jedis.set(CACHE_PREFIX + number, mapper.writeValueAsString(code));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
@ -80,7 +78,7 @@ public class PendingDevicesManager {
}
private Optional<StoredVerificationCode> memcacheGet(String number) {
try (Jedis jedis = cacheClient.getResource()) {
try (Jedis jedis = cacheClient.getReadResource()) {
String json = jedis.get(CACHE_PREFIX + number);
if (json == null) return Optional.absent();
@ -92,7 +90,7 @@ public class PendingDevicesManager {
}
private void memcacheDelete(String number) {
try (Jedis jedis = cacheClient.getResource()) {
try (Jedis jedis = cacheClient.getWriteResource()) {
jedis.del(CACHE_PREFIX + number);
}
}

View File

@ -4,13 +4,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.dispatch.DispatchChannel;
import org.whispersystems.dispatch.DispatchManager;
import java.util.Arrays;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import io.dropwizard.lifecycle.Managed;
import static org.whispersystems.textsecuregcm.storage.PubSubProtos.PubSubMessage;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class PubSubManager implements Managed {
@ -18,14 +16,14 @@ public class PubSubManager implements Managed {
private final Logger logger = LoggerFactory.getLogger(PubSubManager.class);
private final DispatchManager dispatchManager;
private final JedisPool jedisPool;
private final DispatchManager dispatchManager;
private final ReplicatedJedisPool jedisPool;
private boolean subscribed = false;
public PubSubManager(JedisPool jedisPool, DispatchManager dispatchManager) {
public PubSubManager(ReplicatedJedisPool jedisPool, DispatchManager dispatchManager) {
this.dispatchManager = dispatchManager;
this.jedisPool = jedisPool;
this.jedisPool = jedisPool;
}
@Override
@ -64,7 +62,7 @@ public class PubSubManager implements Managed {
}
private boolean publish(byte[] channel, PubSubMessage message) {
try (Jedis jedis = jedisPool.getResource()) {
try (Jedis jedis = jedisPool.getWriteResource()) {
long result = jedis.publish(channel, message.toByteArray());
if (result < 0) {

View File

@ -10,6 +10,7 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.auth.AuthenticationCredentials;
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
@ -72,11 +73,11 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
dbi.registerContainerFactory(new ImmutableSetContainerFactory());
dbi.registerContainerFactory(new OptionalContainerFactory());
Accounts accounts = dbi.onDemand(Accounts.class);
JedisPool cacheClient = new RedisClientFactory(configuration.getCacheConfiguration().getUrl()).getRedisClientPool();
JedisPool redisClient = new RedisClientFactory(configuration.getDirectoryConfiguration().getUrl()).getRedisClientPool();
DirectoryManager directory = new DirectoryManager(redisClient);
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient);
Accounts accounts = dbi.onDemand(Accounts.class);
ReplicatedJedisPool cacheClient = new RedisClientFactory(configuration.getCacheConfiguration().getUrl(), configuration.getCacheConfiguration().getReplicaUrls()).getRedisClientPool();
ReplicatedJedisPool redisClient = new RedisClientFactory(configuration.getDirectoryConfiguration().getUrl(), configuration.getDirectoryConfiguration().getReplicaUrls()).getRedisClientPool();
DirectoryManager directory = new DirectoryManager(redisClient);
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient);
for (String user: users) {
Optional<Account> account = accountsManager.get(user);

View File

@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.federation.FederatedClientManager;
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import org.whispersystems.textsecuregcm.storage.Accounts;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
@ -71,11 +72,11 @@ public class DirectoryCommand extends EnvironmentCommand<WhisperServerConfigurat
dbi.registerContainerFactory(new ImmutableSetContainerFactory());
dbi.registerContainerFactory(new OptionalContainerFactory());
Accounts accounts = dbi.onDemand(Accounts.class);
JedisPool cacheClient = new RedisClientFactory(configuration.getCacheConfiguration().getUrl()).getRedisClientPool();
JedisPool redisClient = new RedisClientFactory(configuration.getDirectoryConfiguration().getUrl()).getRedisClientPool();
DirectoryManager directory = new DirectoryManager(redisClient);
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient);
Accounts accounts = dbi.onDemand(Accounts.class);
ReplicatedJedisPool cacheClient = new RedisClientFactory(configuration.getCacheConfiguration().getUrl(), configuration.getCacheConfiguration().getReplicaUrls()).getRedisClientPool();
ReplicatedJedisPool redisClient = new RedisClientFactory(configuration.getDirectoryConfiguration().getUrl(), configuration.getDirectoryConfiguration().getReplicaUrls()).getRedisClientPool();
DirectoryManager directory = new DirectoryManager(redisClient);
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient);
// FederatedClientManager federatedClientManager = new FederatedClientManager(environment,
// configuration.getJerseyClientConfiguration(),
// configuration.getFederationConfiguration());

View File

@ -0,0 +1,107 @@
package org.whispersystems.textsecuregcm.tests.redis;
import org.junit.Test;
import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.*;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.exceptions.JedisException;
public class ReplicatedJedisPoolTest {
@Test
public void testWriteCheckoutNoSlaves() {
JedisPool master = mock(JedisPool.class);
try {
new ReplicatedJedisPool(master, new LinkedList<>());
throw new AssertionError();
} catch (Exception e) {
// good
}
}
@Test
public void testWriteCheckoutWithSlaves() {
JedisPool master = mock(JedisPool.class);
JedisPool slave = mock(JedisPool.class);
Jedis instance = mock(Jedis.class );
when(master.getResource()).thenReturn(instance);
ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Collections.singletonList(slave));
Jedis writeResource = replicatedJedisPool.getWriteResource();
assertThat(writeResource).isEqualTo(instance);
verify(master, times(1)).getResource();
}
@Test
public void testReadCheckouts() {
JedisPool master = mock(JedisPool.class);
JedisPool slaveOne = mock(JedisPool.class);
JedisPool slaveTwo = mock(JedisPool.class);
Jedis instanceOne = mock(Jedis.class );
Jedis instanceTwo = mock(Jedis.class );
when(slaveOne.getResource()).thenReturn(instanceOne);
when(slaveTwo.getResource()).thenReturn(instanceTwo);
ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo));
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceOne);
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceOne);
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceOne);
verifyNoMoreInteractions(master);
}
@Test
public void testBrokenReadCheckout() {
JedisPool master = mock(JedisPool.class);
JedisPool slaveOne = mock(JedisPool.class);
JedisPool slaveTwo = mock(JedisPool.class);
Jedis instanceTwo = mock(Jedis.class );
when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!"));
when(slaveTwo.getResource()).thenReturn(instanceTwo);
ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo));
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
assertThat(replicatedJedisPool.getReadResource()).isEqualTo(instanceTwo);
verifyNoMoreInteractions(master);
}
@Test
public void testAllBrokenReadCheckout() {
JedisPool master = mock(JedisPool.class);
JedisPool slaveOne = mock(JedisPool.class);
JedisPool slaveTwo = mock(JedisPool.class);
when(slaveOne.getResource()).thenThrow(new JedisException("Connection failed!"));
when(slaveTwo.getResource()).thenThrow(new JedisException("Also failed!"));
ReplicatedJedisPool replicatedJedisPool = new ReplicatedJedisPool(master, Arrays.asList(slaveOne, slaveTwo));
try {
replicatedJedisPool.getReadResource();
throw new AssertionError();
} catch (Exception e) {
// good
}
verifyNoMoreInteractions(master);
}
}