From fedfc66403eda5199fd150c743a37aab09acac55 Mon Sep 17 00:00:00 2001 From: Moxie Marlinspike Date: Tue, 23 Oct 2018 04:37:04 -0700 Subject: [PATCH] Initial hystrix support --- pom.xml | 12 ++ .../WhisperServerConfiguration.java | 7 + .../textsecuregcm/WhisperServerService.java | 9 ++ .../textsecuregcm/hystrix/GroupKeys.java | 9 ++ .../textsecuregcm/storage/Accounts.java | 2 +- .../storage/AccountsManager.java | 133 +++++++++++------ .../tests/storage/AccountsManagerTest.java | 134 ++++++++++++++++++ 7 files changed, 264 insertions(+), 42 deletions(-) create mode 100644 src/main/java/org/whispersystems/textsecuregcm/hystrix/GroupKeys.java create mode 100644 src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java diff --git a/pom.xml b/pom.xml index 227e33c65..3a1bf41a5 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,7 @@ 1.3.7 2.9.6 + 1.5.12 UTF-8 @@ -110,6 +111,17 @@ 0.5.0 + + com.netflix.hystrix + hystrix-core + ${hystrix.version} + + + com.netflix.hystrix + hystrix-codahale-metrics-publisher + ${hystrix.version} + + com.relayrides pushy diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 9c9938c60..95219af28 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -131,6 +131,10 @@ public class WhisperServerConfiguration extends Configuration { @JsonProperty private UnidentifiedDeliveryConfiguration unidentifiedDelivery; + @NotNull + @JsonProperty + private Map hystrix = new HashMap<>(); + public WebSocketConfiguration getWebSocketConfiguration() { return webSocket; } @@ -225,4 +229,7 @@ public class WhisperServerConfiguration extends Configuration { return results; } + public Map getHystrixConfiguration() { + return hystrix; + } } diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 8d138d821..66b652db8 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -20,6 +20,10 @@ import com.codahale.metrics.SharedMetricRegistries; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.DeserializationFeature; +import com.netflix.config.ConfigurationManager; +import com.netflix.hystrix.contrib.codahalemetricspublisher.HystrixCodaHaleMetricsPublisher; +import com.netflix.hystrix.strategy.HystrixPlugins; +import org.apache.commons.configuration.MapConfiguration; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.eclipse.jetty.servlets.CrossOriginFilter; import org.skife.jdbi.v2.DBI; @@ -102,6 +106,9 @@ public class WhisperServerService extends Application bootstrap) { + HystrixCodaHaleMetricsPublisher hystrixMetricsPublisher = new HystrixCodaHaleMetricsPublisher(bootstrap.getMetricRegistry()); + HystrixPlugins.getInstance().registerMetricsPublisher(hystrixMetricsPublisher); + bootstrap.addCommand(new DirectoryCommand()); bootstrap.addCommand(new VacuumCommand()); bootstrap.addCommand(new TrimMessagesCommand()); @@ -137,6 +144,8 @@ public class WhisperServerService extends Application get(String number) { - Optional account = memcacheGet(number); + Optional account = redisGet(number); if (!account.isPresent()) { - account = Optional.ofNullable(accounts.get(number)); - - if (account.isPresent()) { - memcacheSet(number, account.get()); - } + account = databaseGet(number); + account.ifPresent(value -> redisSet(number, value, true)); } return account; } - public boolean isRelayListed(String number) { - byte[] token = Util.getContactToken(number); - Optional contact = directory.get(token); - - return contact.isPresent() && !Util.isEmpty(contact.get().getRelay()); - } - private void updateDirectory(Account account) { - if (account.isActive()) { - byte[] token = Util.getContactToken(account.getNumber()); - ClientContact clientContact = new ClientContact(token, null, true, true); - directory.add(clientContact); - } else { - directory.remove(account.getNumber()); - } + new HystrixCommand(HystrixCommandGroupKey.Factory.asKey(GroupKeys.DIRECTORY_SERVICE)) { + @Override + protected Void run() { + if (account.isActive()) { + byte[] token = Util.getContactToken(account.getNumber()); + ClientContact clientContact = new ClientContact(token, null, true, true); + directory.add(clientContact); + } else { + directory.remove(account.getNumber()); + } + + return null; + } + }.execute(); } private String getKey(String number) { return Account.class.getSimpleName() + Account.MEMCACHE_VERION + number; } - private void memcacheSet(String number, Account account) { - try (Jedis jedis = cacheClient.getWriteResource()) { - jedis.set(getKey(number), mapper.writeValueAsString(account)); - } catch (JsonProcessingException e) { - throw new IllegalArgumentException(e); - } + private void redisSet(String number, Account account, boolean optional) { + new HystrixCommand(HystrixCommandGroupKey.Factory.asKey(GroupKeys.REDIS_CACHE)) { + @Override + protected Boolean run() { + try (Jedis jedis = cacheClient.getWriteResource()) { + jedis.set(getKey(number), mapper.writeValueAsString(account)); + } catch (JsonProcessingException e) { + throw new HystrixBadRequestException("Json processing error", e); + } + + return true; + } + + @Override + protected Boolean getFallback() { + if (optional) return true; + else return super.getFallback(); + } + }.execute(); } - private Optional memcacheGet(String number) { - try (Jedis jedis = cacheClient.getReadResource()) { - String json = jedis.get(getKey(number)); + private Optional redisGet(String number) { + return new HystrixCommand>(HystrixCommandGroupKey.Factory.asKey(GroupKeys.REDIS_CACHE)) { + @Override + protected Optional run() { + try (Jedis jedis = cacheClient.getReadResource()) { + String json = jedis.get(getKey(number)); - if (json != null) return Optional.of(mapper.readValue(json, Account.class)); - else return Optional.empty(); - } catch (IOException e) { - logger.warn("AccountsManager", "Deserialization error", e); - return Optional.empty(); - } + if (json != null) return Optional.of(mapper.readValue(json, Account.class)); + else return Optional.empty(); + } catch (IOException e) { + logger.warn("AccountsManager", "Deserialization error", e); + return Optional.empty(); + } + } + + @Override + protected Optional getFallback() { + return Optional.empty(); + } + }.execute(); } + private Optional databaseGet(String number) { + return new HystrixCommand>(HystrixCommandGroupKey.Factory.asKey(GroupKeys.DATABASE_ACCOUNTS)) { + @Override + protected Optional run() { + return Optional.ofNullable(accounts.get(number)); + } + }.execute(); + } + + private boolean databaseCreate(Account account) { + return new HystrixCommand(HystrixCommandGroupKey.Factory.asKey(GroupKeys.DATABASE_ACCOUNTS)) { + @Override + protected Boolean run() { + return accounts.create(account); + } + }.execute(); + } + + private void databaseUpdate(Account account) { + new HystrixCommand(HystrixCommandGroupKey.Factory.asKey(GroupKeys.DATABASE_ACCOUNTS)) { + @Override + protected Void run() { + accounts.update(account); + return null; + } + }.execute(); + } } diff --git a/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java new file mode 100644 index 000000000..719eb6d45 --- /dev/null +++ b/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountsManagerTest.java @@ -0,0 +1,134 @@ +package org.whispersystems.textsecuregcm.tests.storage; + +import com.netflix.hystrix.exception.HystrixRuntimeException; +import org.junit.Test; +import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException; +import org.whispersystems.textsecuregcm.redis.ReplicatedJedisPool; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.Accounts; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.DirectoryManager; + +import java.util.HashSet; +import java.util.Optional; + +import static junit.framework.TestCase.assertSame; +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.exceptions.JedisException; + +public class AccountsManagerTest { + + @Test + public void testGetAccountInCache() { + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); + Jedis jedis = mock(Jedis.class ); + Accounts accounts = mock(Accounts.class ); + DirectoryManager directoryManager = mock(DirectoryManager.class ); + + when(cacheClient.getReadResource()).thenReturn(jedis); + when(jedis.get(eq("Account5+14152222222"))).thenReturn("{\"number\": \"+14152222222\", \"name\": \"test\"}"); + + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient); + Optional account = accountsManager.get("+14152222222"); + + assertTrue(account.isPresent()); + assertEquals(account.get().getNumber(), "+14152222222"); + assertEquals(account.get().getProfileName(), "test"); + + verify(jedis, times(1)).get(eq("Account5+14152222222")); + verify(jedis, times(1)).close(); + verifyNoMoreInteractions(jedis); + verifyNoMoreInteractions(accounts); + } + + @Test + public void testGetAccountNotInCache() { + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); + Jedis jedis = mock(Jedis.class ); + Accounts accounts = mock(Accounts.class ); + DirectoryManager directoryManager = mock(DirectoryManager.class ); + Account account = new Account("+14152222222", new HashSet<>(), new byte[16]); + + when(cacheClient.getReadResource()).thenReturn(jedis); + when(cacheClient.getWriteResource()).thenReturn(jedis); + when(jedis.get(eq("Account5+14152222222"))).thenReturn(null); + when(accounts.get(eq("+14152222222"))).thenReturn(account); + + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient); + Optional retrieved = accountsManager.get("+14152222222"); + + assertTrue(retrieved.isPresent()); + assertSame(retrieved.get(), account); + + verify(jedis, times(1)).get(eq("Account5+14152222222")); + verify(jedis, times(1)).set(eq("Account5+14152222222"), anyString()); + verify(jedis, times(2)).close(); + verifyNoMoreInteractions(jedis); + + verify(accounts, times(1)).get(eq("+14152222222")); + verifyNoMoreInteractions(accounts); + } + + @Test + public void testGetAccountBrokenCache() { + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); + Jedis jedis = mock(Jedis.class ); + Accounts accounts = mock(Accounts.class ); + DirectoryManager directoryManager = mock(DirectoryManager.class ); + Account account = new Account("+14152222222", new HashSet<>(), new byte[16]); + + when(cacheClient.getReadResource()).thenReturn(jedis); + when(cacheClient.getWriteResource()).thenReturn(jedis); + when(jedis.get(eq("Account5+14152222222"))).thenThrow(new JedisException("Connection lost!")); + when(accounts.get(eq("+14152222222"))).thenReturn(account); + + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient); + Optional retrieved = accountsManager.get("+14152222222"); + + assertTrue(retrieved.isPresent()); + assertSame(retrieved.get(), account); + + verify(jedis, times(1)).get(eq("Account5+14152222222")); + verify(jedis, times(1)).set(eq("Account5+14152222222"), anyString()); + verify(jedis, times(2)).close(); + verifyNoMoreInteractions(jedis); + + verify(accounts, times(1)).get(eq("+14152222222")); + verifyNoMoreInteractions(accounts); + } + + @Test + public void testGetAccountEmptyCacheBrokenDatabase() { + ReplicatedJedisPool cacheClient = mock(ReplicatedJedisPool.class); + Jedis jedis = mock(Jedis.class ); + Accounts accounts = mock(Accounts.class ); + DirectoryManager directoryManager = mock(DirectoryManager.class ); + Account account = new Account("+14152222222", new HashSet<>(), new byte[16]); + + when(cacheClient.getReadResource()).thenReturn(jedis); + when(cacheClient.getWriteResource()).thenReturn(jedis); + when(jedis.get(eq("Account5+14152222222"))).thenReturn(null); + when(accounts.get(eq("+14152222222"))).thenThrow(new UnableToObtainConnectionException(new Exception())); + + AccountsManager accountsManager = new AccountsManager(accounts, directoryManager, cacheClient); + + try { + Optional retrieved = accountsManager.get("+14152222222"); + throw new AssertionError("Should not have succeeded!"); + } catch (HystrixRuntimeException e) { + // good + verify(jedis, times(1)).get(eq("Account5+14152222222")); + verify(jedis, times(1)).close(); + verifyNoMoreInteractions(jedis); + + verify(accounts, times(1)).get(eq("+14152222222")); + verifyNoMoreInteractions(accounts); + } + } + + +}