diff --git a/pom.xml b/pom.xml
index 227e33c65..3a1bf41a5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,6 +14,7 @@
1.3.7
2.9.6
+ 1.5.12
UTF-8
@@ -110,6 +111,17 @@
0.5.0
+
+ com.netflix.hystrix
+ hystrix-core
+ ${hystrix.version}
+
+
+ com.netflix.hystrix
+ hystrix-codahale-metrics-publisher
+ ${hystrix.version}
+
+
com.relayrides
pushy
diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
index 9c9938c60..95219af28 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java
@@ -131,6 +131,10 @@ public class WhisperServerConfiguration extends Configuration {
@JsonProperty
private UnidentifiedDeliveryConfiguration unidentifiedDelivery;
+ @NotNull
+ @JsonProperty
+ private Map hystrix = new HashMap<>();
+
public WebSocketConfiguration getWebSocketConfiguration() {
return webSocket;
}
@@ -225,4 +229,7 @@ public class WhisperServerConfiguration extends Configuration {
return results;
}
+ public Map getHystrixConfiguration() {
+ return hystrix;
+ }
}
diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
index 8d138d821..66b652db8 100644
--- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
+++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java
@@ -20,6 +20,10 @@ import com.codahale.metrics.SharedMetricRegistries;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.netflix.config.ConfigurationManager;
+import com.netflix.hystrix.contrib.codahalemetricspublisher.HystrixCodaHaleMetricsPublisher;
+import com.netflix.hystrix.strategy.HystrixPlugins;
+import org.apache.commons.configuration.MapConfiguration;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.eclipse.jetty.servlets.CrossOriginFilter;
import org.skife.jdbi.v2.DBI;
@@ -102,6 +106,9 @@ public class WhisperServerService extends Application bootstrap) {
+ HystrixCodaHaleMetricsPublisher hystrixMetricsPublisher = new HystrixCodaHaleMetricsPublisher(bootstrap.getMetricRegistry());
+ HystrixPlugins.getInstance().registerMetricsPublisher(hystrixMetricsPublisher);
+
bootstrap.addCommand(new DirectoryCommand());
bootstrap.addCommand(new VacuumCommand());
bootstrap.addCommand(new TrimMessagesCommand());
@@ -137,6 +144,8 @@ public class WhisperServerService extends Application get(String number) {
- Optional account = memcacheGet(number);
+ Optional account = redisGet(number);
if (!account.isPresent()) {
- account = Optional.ofNullable(accounts.get(number));
-
- if (account.isPresent()) {
- memcacheSet(number, account.get());
- }
+ account = databaseGet(number);
+ account.ifPresent(value -> redisSet(number, value, true));
}
return account;
}
- public boolean isRelayListed(String number) {
- byte[] token = Util.getContactToken(number);
- Optional contact = directory.get(token);
-
- return contact.isPresent() && !Util.isEmpty(contact.get().getRelay());
- }
-
private void updateDirectory(Account account) {
- if (account.isActive()) {
- byte[] token = Util.getContactToken(account.getNumber());
- ClientContact clientContact = new ClientContact(token, null, true, true);
- directory.add(clientContact);
- } else {
- directory.remove(account.getNumber());
- }
+ new HystrixCommand(HystrixCommandGroupKey.Factory.asKey(GroupKeys.DIRECTORY_SERVICE)) {
+ @Override
+ protected Void run() {
+ if (account.isActive()) {
+ byte[] token = Util.getContactToken(account.getNumber());
+ ClientContact clientContact = new ClientContact(token, null, true, true);
+ directory.add(clientContact);
+ } else {
+ directory.remove(account.getNumber());
+ }
+
+ return null;
+ }
+ }.execute();
}
private String getKey(String number) {
return Account.class.getSimpleName() + Account.MEMCACHE_VERION + number;
}
- private void memcacheSet(String number, Account account) {
- try (Jedis jedis = cacheClient.getWriteResource()) {
- jedis.set(getKey(number), mapper.writeValueAsString(account));
- } catch (JsonProcessingException e) {
- throw new IllegalArgumentException(e);
- }
+ private void redisSet(String number, Account account, boolean optional) {
+ new HystrixCommand(HystrixCommandGroupKey.Factory.asKey(GroupKeys.REDIS_CACHE)) {
+ @Override
+ protected Boolean run() {
+ try (Jedis jedis = cacheClient.getWriteResource()) {
+ jedis.set(getKey(number), mapper.writeValueAsString(account));
+ } catch (JsonProcessingException e) {
+ throw new HystrixBadRequestException("Json processing error", e);
+ }
+
+ return true;
+ }
+
+ @Override
+ protected Boolean getFallback() {
+ if (optional) return true;
+ else return super.getFallback();
+ }
+ }.execute();
}
- private Optional memcacheGet(String number) {
- try (Jedis jedis = cacheClient.getReadResource()) {
- String json = jedis.get(getKey(number));
+ private Optional redisGet(String number) {
+ return new HystrixCommand>(HystrixCommandGroupKey.Factory.asKey(GroupKeys.REDIS_CACHE)) {
+ @Override
+ protected Optional run() {
+ try (Jedis jedis = cacheClient.getReadResource()) {
+ String json = jedis.get(getKey(number));
- if (json != null) return Optional.of(mapper.readValue(json, Account.class));
- else return Optional.empty();
- } catch (IOException e) {
- logger.warn("AccountsManager", "Deserialization error", e);
- return Optional.empty();
- }
+ if (json != null) return Optional.of(mapper.readValue(json, Account.class));
+ else return Optional.empty();
+ } catch (IOException e) {
+ logger.warn("AccountsManager", "Deserialization error", e);
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ protected Optional getFallback() {
+ return Optional.empty();
+ }
+ }.execute();
}
+ private Optional databaseGet(String number) {
+ return new HystrixCommand>(HystrixCommandGroupKey.Factory.asKey(GroupKeys.DATABASE_ACCOUNTS)) {
+ @Override
+ protected Optional run() {
+ return Optional.ofNullable(accounts.get(number));
+ }
+ }.execute();
+ }
+
+ private boolean databaseCreate(Account account) {
+ return new HystrixCommand(HystrixCommandGroupKey.Factory.asKey(GroupKeys.DATABASE_ACCOUNTS)) {
+ @Override
+ protected Boolean run() {
+ return accounts.create(account);
+ }
+ }.execute();
+ }
+
+ private void databaseUpdate(Account account) {
+ new HystrixCommand(HystrixCommandGroupKey.Factory.asKey(GroupKeys.DATABASE_ACCOUNTS)) {
+ @Override
+ protected Void run() {
+ accounts.update(account);
+ return null;
+ }
+ }.execute();
+ }
}
diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java
new file mode 100644
index 000000000..719eb6d45
--- /dev/null
+++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java
@@ -0,0 +1,134 @@
+package org.whispersystems.textsecuregcm.tests.storage;
+
+import com.netflix.hystrix.exception.HystrixRuntimeException;
+import org.junit.Test;
+import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
+import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool;
+import org.whispersystems.textsecuregcm.storage.Account;
+import org.whispersystems.textsecuregcm.storage.Accounts;
+import org.whispersystems.textsecuregcm.storage.AccountsManager;
+import org.whispersystems.textsecuregcm.storage.DirectoryManager;
+
+import java.util.HashSet;
+import java.util.Optional;
+
+import static junit.framework.TestCase.assertSame;
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.*;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.exceptions.JedisException;
+
+public class AccountsManagerTest {
+
+ @Test
+ public void testGetAccountInCache() {
+ ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
+ Jedis jedis = mock(Jedis.class );
+ Accounts accounts = mock(Accounts.class );
+ DirectoryManager directoryManager = mock(DirectoryManager.class );
+
+ when(cacheClient.getReadResource()).thenReturn(jedis);
+ when(jedis.get(eq("Account5+14152222222"))).thenReturn("{\"number\": \"+14152222222\", \"name\": \"test\"}");
+
+ AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient);
+ Optional account = accountsManager.get("+14152222222");
+
+ assertTrue(account.isPresent());
+ assertEquals(account.get().getNumber(), "+14152222222");
+ assertEquals(account.get().getProfileName(), "test");
+
+ verify(jedis, times(1)).get(eq("Account5+14152222222"));
+ verify(jedis, times(1)).close();
+ verifyNoMoreInteractions(jedis);
+ verifyNoMoreInteractions(accounts);
+ }
+
+ @Test
+ public void testGetAccountNotInCache() {
+ ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
+ Jedis jedis = mock(Jedis.class );
+ Accounts accounts = mock(Accounts.class );
+ DirectoryManager directoryManager = mock(DirectoryManager.class );
+ Account account = new Account("+14152222222", new HashSet<>(), new byte[16]);
+
+ when(cacheClient.getReadResource()).thenReturn(jedis);
+ when(cacheClient.getWriteResource()).thenReturn(jedis);
+ when(jedis.get(eq("Account5+14152222222"))).thenReturn(null);
+ when(accounts.get(eq("+14152222222"))).thenReturn(account);
+
+ AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient);
+ Optional retrieved = accountsManager.get("+14152222222");
+
+ assertTrue(retrieved.isPresent());
+ assertSame(retrieved.get(), account);
+
+ verify(jedis, times(1)).get(eq("Account5+14152222222"));
+ verify(jedis, times(1)).set(eq("Account5+14152222222"), anyString());
+ verify(jedis, times(2)).close();
+ verifyNoMoreInteractions(jedis);
+
+ verify(accounts, times(1)).get(eq("+14152222222"));
+ verifyNoMoreInteractions(accounts);
+ }
+
+ @Test
+ public void testGetAccountBrokenCache() {
+ ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
+ Jedis jedis = mock(Jedis.class );
+ Accounts accounts = mock(Accounts.class );
+ DirectoryManager directoryManager = mock(DirectoryManager.class );
+ Account account = new Account("+14152222222", new HashSet<>(), new byte[16]);
+
+ when(cacheClient.getReadResource()).thenReturn(jedis);
+ when(cacheClient.getWriteResource()).thenReturn(jedis);
+ when(jedis.get(eq("Account5+14152222222"))).thenThrow(new JedisException("Connection lost!"));
+ when(accounts.get(eq("+14152222222"))).thenReturn(account);
+
+ AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient);
+ Optional retrieved = accountsManager.get("+14152222222");
+
+ assertTrue(retrieved.isPresent());
+ assertSame(retrieved.get(), account);
+
+ verify(jedis, times(1)).get(eq("Account5+14152222222"));
+ verify(jedis, times(1)).set(eq("Account5+14152222222"), anyString());
+ verify(jedis, times(2)).close();
+ verifyNoMoreInteractions(jedis);
+
+ verify(accounts, times(1)).get(eq("+14152222222"));
+ verifyNoMoreInteractions(accounts);
+ }
+
+ @Test
+ public void testGetAccountEmptyCacheBrokenDatabase() {
+ ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class);
+ Jedis jedis = mock(Jedis.class );
+ Accounts accounts = mock(Accounts.class );
+ DirectoryManager directoryManager = mock(DirectoryManager.class );
+ Account account = new Account("+14152222222", new HashSet<>(), new byte[16]);
+
+ when(cacheClient.getReadResource()).thenReturn(jedis);
+ when(cacheClient.getWriteResource()).thenReturn(jedis);
+ when(jedis.get(eq("Account5+14152222222"))).thenReturn(null);
+ when(accounts.get(eq("+14152222222"))).thenThrow(new UnableToObtainConnectionException(new Exception()));
+
+ AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient);
+
+ try {
+ Optional retrieved = accountsManager.get("+14152222222");
+ throw new AssertionError("Should not have succeeded!");
+ } catch (HystrixRuntimeException e) {
+ // good
+ verify(jedis, times(1)).get(eq("Account5+14152222222"));
+ verify(jedis, times(1)).close();
+ verifyNoMoreInteractions(jedis);
+
+ verify(accounts, times(1)).get(eq("+14152222222"));
+ verifyNoMoreInteractions(accounts);
+ }
+ }
+
+
+}