diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java index 2f493f50b..0e881cdee 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerConfiguration.java @@ -101,7 +101,12 @@ public class WhisperServerConfiguration extends Configuration { @Valid @NotNull @JsonProperty - private DatabaseConfiguration database = new DatabaseConfiguration(); + private DatabaseConfiguration keysDatabase; + + @Valid + @NotNull + @JsonProperty + private DatabaseConfiguration accountsDatabase; @JsonProperty private DatabaseConfiguration read_database; @@ -209,12 +214,12 @@ public class WhisperServerConfiguration extends Configuration { return abuseDatabase; } - public DatabaseConfiguration getAccountsDatabaseConfiguration() { - return database; + public DatabaseConfiguration getKeysDatabase() { + return keysDatabase; } - public DatabaseConfiguration getAccountsReadDatabaseConfiguration() { - return read_database; + public DatabaseConfiguration getAccountsDatabaseConfiguration() { + return accountsDatabase; } public RateLimitsConfiguration getLimitsConfiguration() { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 279950c71..ba170e0ea 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -114,6 +114,13 @@ public class WhisperServerService extends Application("keysdb", "keysdb.xml") { + @Override + public DataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) { + return configuration.getKeysDatabase(); + } + }); + bootstrap.addBundle(new NameableMigrationsBundle("accountdb", "accountsdb.xml") { @Override public DataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) { @@ -121,6 +128,7 @@ public class WhisperServerService extends Application("messagedb", "messagedb.xml") { @Override public DataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) { @@ -152,9 +160,11 @@ public class WhisperServerService extends Application accountDatabaseCrawlerListeners = List.of(pushFeedbackProcessor, activeUserCounter, directoryReconciler, accountCleaner); AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheClient); - AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accounts, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()); + AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs()); messagesCache.setPubSubManager(pubSubManager, pushSender); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java index 6bee0d934..295532801 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/limits/RateLimiters.java @@ -68,9 +68,9 @@ public class RateLimiters { config.getAutoBlock().getBucketSize(), config.getAutoBlock().getLeakRatePerMinute()); - this.verifyLimiter = new RateLimiter(cacheClient, "verify", - config.getVerifyNumber().getBucketSize(), - config.getVerifyNumber().getLeakRatePerMinute()); + this.verifyLimiter = new LockingRateLimiter(cacheClient, "verify", + config.getVerifyNumber().getBucketSize(), + config.getVerifyNumber().getLeakRatePerMinute()); this.pinLimiter = new LockingRateLimiter(cacheClient, "pin", config.getVerifyPin().getBucketSize(), diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java index 3bfe3fed5..9a34e94dd 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountDatabaseCrawler.java @@ -42,7 +42,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable { private static final long WORKER_TTL_MS = 120_000L; private static final long ACCELERATED_CHUNK_INTERVAL = 10L; - private final Accounts accounts; + private final AccountsManager accounts; private final int chunkSize; private final long chunkIntervalMs; private final String workerId; @@ -52,7 +52,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable { private AtomicBoolean running = new AtomicBoolean(false); private boolean finished; - public AccountDatabaseCrawler(Accounts accounts, + public AccountDatabaseCrawler(AccountsManager accounts, AccountDatabaseCrawlerCache cache, List listeners, int chunkSize, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java index f138760c6..a0d0fe636 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/Accounts.java @@ -66,7 +66,6 @@ public class Accounts { .bind("data", mapper.writeValueAsString(account)) .execute(); - return rows == 0; } catch (JsonProcessingException e) { throw new IllegalArgumentException(e); @@ -98,7 +97,6 @@ public class Accounts { })); } - public List getAllFrom(String from, int length) { return database.with(jdbi -> jdbi.withHandle(handle -> { try (Timer.Context ignored = getAllFromOffsetTimer.time()) { diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java index 79a8eca75..0697e47cb 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/AccountsManager.java @@ -31,6 +31,7 @@ import org.whispersystems.textsecuregcm.util.SystemMapper; import org.whispersystems.textsecuregcm.util.Util; import java.io.IOException; +import java.util.List; import java.util.Optional; import static com.codahale.metrics.MetricRegistry.name; @@ -92,6 +93,14 @@ public class AccountsManager { } } + public List getAllFrom(int length) { + return accounts.getAllFrom(length); + } + + public List getAllFrom(String number, int length) { + return accounts.getAllFrom(number, length); + } + private void updateDirectory(Account account) { if (account.isEnabled()) { byte[] token = Util.getContactToken(account.getNumber()); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccounts.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccounts.java index aa8638e40..7950350f7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccounts.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/PendingAccounts.java @@ -19,10 +19,17 @@ package org.whispersystems.textsecuregcm.storage; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SharedMetricRegistries; import com.codahale.metrics.Timer; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.PreparedBatch; +import org.jdbi.v3.core.statement.StatementContext; +import org.jdbi.v3.core.transaction.TransactionIsolationLevel; import org.whispersystems.textsecuregcm.auth.StoredVerificationCode; import org.whispersystems.textsecuregcm.storage.mappers.StoredVerificationCodeRowMapper; import org.whispersystems.textsecuregcm.util.Constants; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; import java.util.Optional; import static com.codahale.metrics.MetricRegistry.name; @@ -43,10 +50,13 @@ public class PendingAccounts { } public void insert(String number, String verificationCode, long timestamp) { - database.use(jdbi -> jdbi.useHandle(handle -> { + database.use(jdbi -> jdbi.useTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> { try (Timer.Context ignored = insertTimer.time()) { - handle.createUpdate("WITH upsert AS (UPDATE pending_accounts SET verification_code = :verification_code, timestamp = :timestamp WHERE number = :number RETURNING *) " + - "INSERT INTO pending_accounts (number, verification_code, timestamp) SELECT :number, :verification_code, :timestamp WHERE NOT EXISTS (SELECT * FROM upsert)") + handle.createUpdate("DELETE FROM pending_accounts WHERE number = :number") + .bind("number", number) + .execute(); + + handle.createUpdate("INSERT INTO pending_accounts (number, verification_code, timestamp) VALUES (:number, :verification_code, :timestamp)") .bind("verification_code", verificationCode) .bind("timestamp", timestamp) .bind("number", number) @@ -84,4 +94,6 @@ public class PendingAccounts { })); } + + } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/mappers/AccountRowMapper.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/mappers/AccountRowMapper.java index 72b52add1..7b608b1ac 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/mappers/AccountRowMapper.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/mappers/AccountRowMapper.java @@ -20,7 +20,6 @@ public class AccountRowMapper implements RowMapper { try { Account account = mapper.readValue(resultSet.getString(Accounts.DATA), Account.class); account.setNumber(resultSet.getString(Accounts.NUMBER)); - return account; } catch (IOException e) { throw new SQLException(e); diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java index dc1d2b92b..119c49987 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/DeleteUserCommand.java @@ -64,13 +64,13 @@ public class DeleteUserCommand extends EnvironmentCommand WhisperServerConfiguration config) throws Exception { - DatabaseConfiguration accountDbConfig = config.getAccountsDatabaseConfiguration(); + DatabaseConfiguration accountDbConfig = config.getAbuseDatabaseConfiguration(); DatabaseConfiguration messageDbConfig = config.getMessageStoreConfiguration(); Jdbi accountJdbi = Jdbi.create(accountDbConfig.getUrl(), accountDbConfig.getUser(), accountDbConfig.getPassword()); diff --git a/service/src/main/resources/keysdb.xml b/service/src/main/resources/keysdb.xml new file mode 100644 index 000000000..f2a9c315c --- /dev/null +++ b/service/src/main/resources/keysdb.xml @@ -0,0 +1,189 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + CREATE type device_t AS (id integer, "authToken" varchar(255), salt varchar(255), "signalingKey" varchar(255), "gcmId" text, "apnId" text); + CREATE type account_t AS (number varchar(255), "supportsSms" smallint, devices device_t array); + UPDATE accounts SET data = row_to_json(row(number, supports_sms, array[row(1, auth_token, salt, signaling_key, gcm_id, apn_id)::device_t])::account_t) + + + + + + + + + + + DROP type account_t; + DROP type device_t; + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + CREATE OR REPLACE FUNCTION "custom_json_object_set_key"( + "json" json, + "key_to_set" TEXT, + "value_to_set" anyelement + ) + RETURNS json + LANGUAGE sql + IMMUTABLE + STRICT + AS $function$ + SELECT COALESCE( + (SELECT ('{' || string_agg(to_json("key") || ':' || "value", ',') || '}') + FROM (SELECT * + FROM json_each("json") + WHERE "key" <> "key_to_set" + UNION ALL + SELECT "key_to_set", to_json("value_to_set")) AS "fields"), + '{}' + )::json + $function$; + UPDATE accounts SET data = custom_json_object_set_key(data, 'identityKey', k.identity_key) FROM keys k WHERE (data->>'identityKey')::text is null AND k.number = data->>'number' AND k.last_resort = 1; + UPDATE accounts SET data = custom_json_object_set_key(data, 'identityKey', k.identity_key) FROM keys k WHERE (data->>'identityKey')::text is null AND k.number = data->>'number'; + + + + + + + + + + + + + + + + + + + + diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java index 8a4e8732f..41cb6d576 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/tests/storage/AccountDatabaseCrawlerTest.java @@ -26,6 +26,7 @@ import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartExc import org.junit.Before; import org.junit.Test; +import org.whispersystems.textsecuregcm.storage.AccountsManager; import java.util.Arrays; import java.util.Collections; @@ -50,7 +51,7 @@ public class AccountDatabaseCrawlerTest { private final Account account1 = mock(Account.class); private final Account account2 = mock(Account.class); - private final Accounts accounts = mock(Accounts.class); + private final AccountsManager accounts = mock(AccountsManager.class); private final AccountDatabaseCrawlerListener listener = mock(AccountDatabaseCrawlerListener.class); private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class);