Break out keys database and accounts database

This commit is contained in:
Moxie Marlinspike 2019-06-11 11:51:47 -07:00
parent fa2d838e60
commit 5b69ff7e94
12 changed files with 248 additions and 25 deletions

View File

@ -101,7 +101,12 @@ public class WhisperServerConfiguration extends Configuration {
@Valid
@NotNull
@JsonProperty
private DatabaseConfiguration database = new DatabaseConfiguration();
private DatabaseConfiguration keysDatabase;
@Valid
@NotNull
@JsonProperty
private DatabaseConfiguration accountsDatabase;
@JsonProperty
private DatabaseConfiguration read_database;
@ -209,12 +214,12 @@ public class WhisperServerConfiguration extends Configuration {
return abuseDatabase;
}
public DatabaseConfiguration getAccountsDatabaseConfiguration() {
return database;
public DatabaseConfiguration getKeysDatabase() {
return keysDatabase;
}
public DatabaseConfiguration getAccountsReadDatabaseConfiguration() {
return read_database;
public DatabaseConfiguration getAccountsDatabaseConfiguration() {
return accountsDatabase;
}
public RateLimitsConfiguration getLimitsConfiguration() {

View File

@ -114,6 +114,13 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
bootstrap.addCommand(new VacuumCommand());
bootstrap.addCommand(new DeleteUserCommand());
bootstrap.addCommand(new CertificateCommand());
bootstrap.addBundle(new NameableMigrationsBundle<WhisperServerConfiguration>("keysdb", "keysdb.xml") {
@Override
public DataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) {
return configuration.getKeysDatabase();
}
});
bootstrap.addBundle(new NameableMigrationsBundle<WhisperServerConfiguration>("accountdb", "accountsdb.xml") {
@Override
public DataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) {
@ -121,6 +128,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
}
});
bootstrap.addBundle(new NameableMigrationsBundle<WhisperServerConfiguration>("messagedb", "messagedb.xml") {
@Override
public DataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) {
@ -152,9 +160,11 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
JdbiFactory jdbiFactory = new JdbiFactory(DefaultNameStrategy.CHECK_EMPTY);
Jdbi accountJdbi = jdbiFactory.build(environment, config.getAccountsDatabaseConfiguration(), "accountdb");
Jdbi messageJdbi = jdbiFactory.build(environment, config.getMessageStoreConfiguration(), "messagedb" );
Jdbi abuseJdbi = jdbiFactory.build(environment, config.getAbuseDatabaseConfiguration (), "abusedb" );
Jdbi keysJdbi = jdbiFactory.build(environment, config.getKeysDatabase(), "keysdb");
Jdbi messageJdbi = jdbiFactory.build(environment, config.getMessageStoreConfiguration(), "messagedb" );
Jdbi abuseJdbi = jdbiFactory.build(environment, config.getAbuseDatabaseConfiguration(), "abusedb" );
FaultTolerantDatabase keysDatabase = new FaultTolerantDatabase("keys_database", keysJdbi, config.getKeysDatabase().getCircuitBreakerConfiguration());
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("accounts_database", accountJdbi, config.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration());
FaultTolerantDatabase messageDatabase = new FaultTolerantDatabase("message_database", messageJdbi, config.getMessageStoreConfiguration().getCircuitBreakerConfiguration());
FaultTolerantDatabase abuseDatabase = new FaultTolerantDatabase("abuse_database", abuseJdbi, config.getAbuseDatabaseConfiguration().getCircuitBreakerConfiguration());
@ -162,7 +172,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
Accounts accounts = new Accounts(accountDatabase);
PendingAccounts pendingAccounts = new PendingAccounts(accountDatabase);
PendingDevices pendingDevices = new PendingDevices(accountDatabase);
Keys keys = new Keys(accountDatabase);
Keys keys = new Keys(keysDatabase);
Messages messages = new Messages(messageDatabase);
AbusiveHostRules abusiveHostRules = new AbusiveHostRules(abuseDatabase);
@ -213,7 +223,7 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
List<AccountDatabaseCrawlerListener> accountDatabaseCrawlerListeners = List.of(pushFeedbackProcessor, activeUserCounter, directoryReconciler, accountCleaner);
AccountDatabaseCrawlerCache accountDatabaseCrawlerCache = new AccountDatabaseCrawlerCache(cacheClient);
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accounts, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs());
AccountDatabaseCrawler accountDatabaseCrawler = new AccountDatabaseCrawler(accountsManager, accountDatabaseCrawlerCache, accountDatabaseCrawlerListeners, config.getAccountDatabaseCrawlerConfiguration().getChunkSize(), config.getAccountDatabaseCrawlerConfiguration().getChunkIntervalMs());
messagesCache.setPubSubManager(pubSubManager, pushSender);

View File

@ -68,9 +68,9 @@ public class RateLimiters {
config.getAutoBlock().getBucketSize(),
config.getAutoBlock().getLeakRatePerMinute());
this.verifyLimiter = new RateLimiter(cacheClient, "verify",
config.getVerifyNumber().getBucketSize(),
config.getVerifyNumber().getLeakRatePerMinute());
this.verifyLimiter = new LockingRateLimiter(cacheClient, "verify",
config.getVerifyNumber().getBucketSize(),
config.getVerifyNumber().getLeakRatePerMinute());
this.pinLimiter = new LockingRateLimiter(cacheClient, "pin",
config.getVerifyPin().getBucketSize(),

View File

@ -42,7 +42,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
private static final long WORKER_TTL_MS = 120_000L;
private static final long ACCELERATED_CHUNK_INTERVAL = 10L;
private final Accounts accounts;
private final AccountsManager accounts;
private final int chunkSize;
private final long chunkIntervalMs;
private final String workerId;
@ -52,7 +52,7 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
private AtomicBoolean running = new AtomicBoolean(false);
private boolean finished;
public AccountDatabaseCrawler(Accounts accounts,
public AccountDatabaseCrawler(AccountsManager accounts,
AccountDatabaseCrawlerCache cache,
List<AccountDatabaseCrawlerListener> listeners,
int chunkSize,

View File

@ -66,7 +66,6 @@ public class Accounts {
.bind("data", mapper.writeValueAsString(account))
.execute();
return rows == 0;
} catch (JsonProcessingException e) {
throw new IllegalArgumentException(e);
@ -98,7 +97,6 @@ public class Accounts {
}));
}
public List<Account> getAllFrom(String from, int length) {
return database.with(jdbi -> jdbi.withHandle(handle -> {
try (Timer.Context ignored = getAllFromOffsetTimer.time()) {

View File

@ -31,6 +31,7 @@ import org.whispersystems.textsecuregcm.util.SystemMapper;
import org.whispersystems.textsecuregcm.util.Util;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import static com.codahale.metrics.MetricRegistry.name;
@ -92,6 +93,14 @@ public class AccountsManager {
}
}
public List<Account> getAllFrom(int length) {
return accounts.getAllFrom(length);
}
public List<Account> getAllFrom(String number, int length) {
return accounts.getAllFrom(number, length);
}
private void updateDirectory(Account account) {
if (account.isEnabled()) {
byte[] token = Util.getContactToken(account.getNumber());

View File

@ -19,10 +19,17 @@ package org.whispersystems.textsecuregcm.storage;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.PreparedBatch;
import org.jdbi.v3.core.statement.StatementContext;
import org.jdbi.v3.core.transaction.TransactionIsolationLevel;
import org.whispersystems.textsecuregcm.auth.StoredVerificationCode;
import org.whispersystems.textsecuregcm.storage.mappers.StoredVerificationCodeRowMapper;
import org.whispersystems.textsecuregcm.util.Constants;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import static com.codahale.metrics.MetricRegistry.name;
@ -43,10 +50,13 @@ public class PendingAccounts {
}
public void insert(String number, String verificationCode, long timestamp) {
database.use(jdbi -> jdbi.useHandle(handle -> {
database.use(jdbi -> jdbi.useTransaction(TransactionIsolationLevel.SERIALIZABLE, handle -> {
try (Timer.Context ignored = insertTimer.time()) {
handle.createUpdate("WITH upsert AS (UPDATE pending_accounts SET verification_code = :verification_code, timestamp = :timestamp WHERE number = :number RETURNING *) " +
"INSERT INTO pending_accounts (number, verification_code, timestamp) SELECT :number, :verification_code, :timestamp WHERE NOT EXISTS (SELECT * FROM upsert)")
handle.createUpdate("DELETE FROM pending_accounts WHERE number = :number")
.bind("number", number)
.execute();
handle.createUpdate("INSERT INTO pending_accounts (number, verification_code, timestamp) VALUES (:number, :verification_code, :timestamp)")
.bind("verification_code", verificationCode)
.bind("timestamp", timestamp)
.bind("number", number)
@ -84,4 +94,6 @@ public class PendingAccounts {
}));
}
}

View File

@ -20,7 +20,6 @@ public class AccountRowMapper implements RowMapper<Account> {
try {
Account account = mapper.readValue(resultSet.getString(Accounts.DATA), Account.class);
account.setNumber(resultSet.getString(Accounts.NUMBER));
return account;
} catch (IOException e) {
throw new SQLException(e);

View File

@ -64,13 +64,13 @@ public class DeleteUserCommand extends EnvironmentCommand<WhisperServerConfigura
JdbiFactory jdbiFactory = new JdbiFactory();
Jdbi accountJdbi = jdbiFactory.build(environment, configuration.getAccountsDatabaseConfiguration(), "accountdb");
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_delete_user", accountJdbi, configuration.getAbuseDatabaseConfiguration().getCircuitBreakerConfiguration());
FaultTolerantDatabase accountDatabase = new FaultTolerantDatabase("account_database_delete_user", accountJdbi, configuration.getAccountsDatabaseConfiguration().getCircuitBreakerConfiguration());
Accounts accounts = new Accounts(accountDatabase);
ReplicatedJedisPool cacheClient = new RedisClientFactory("main_cache_delete_command", configuration.getCacheConfiguration().getUrl(), configuration.getCacheConfiguration().getReplicaUrls(), configuration.getCacheConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
ReplicatedJedisPool redisClient = new RedisClientFactory("directory_cache_delete_command", configuration.getDirectoryConfiguration().getRedisConfiguration().getUrl(), configuration.getDirectoryConfiguration().getRedisConfiguration().getReplicaUrls(), configuration.getDirectoryConfiguration().getRedisConfiguration().getCircuitBreakerConfiguration()).getRedisClientPool();
DirectoryQueue directoryQueue = new DirectoryQueue(configuration.getDirectoryConfiguration().getSqsConfiguration());
DirectoryManager directory = new DirectoryManager(redisClient);
DirectoryQueue directoryQueue = new DirectoryQueue (configuration.getDirectoryConfiguration().getSqsConfiguration());
DirectoryManager directory = new DirectoryManager(redisClient );
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient);
for (String user: users) {

View File

@ -30,7 +30,7 @@ public class VacuumCommand extends ConfiguredCommand<WhisperServerConfiguration>
WhisperServerConfiguration config)
throws Exception
{
DatabaseConfiguration accountDbConfig = config.getAccountsDatabaseConfiguration();
DatabaseConfiguration accountDbConfig = config.getAbuseDatabaseConfiguration();
DatabaseConfiguration messageDbConfig = config.getMessageStoreConfiguration();
Jdbi accountJdbi = Jdbi.create(accountDbConfig.getUrl(), accountDbConfig.getUser(), accountDbConfig.getPassword());

View File

@ -0,0 +1,189 @@
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-2.0.xsd"
logicalFilePath="migrations.xml">
<changeSet id="1" author="moxie">
<createTable tableName="accounts">
<column name="id" type="bigint" autoIncrement="true">
<constraints primaryKey="true" nullable="false"/>
</column>
<column name="number" type="varchar(255)">
<constraints unique="true" nullable="false"/>
</column>
<column name="auth_token" type="varchar(255)">
<constraints nullable="false"/>
</column>
<column name="salt" type="varchar(255)">
<constraints nullable="false"/>
</column>
<column name="signaling_key" type="varchar(255)"/>
<column name="gcm_id" type="text"/>
<column name="apn_id" type="text"/>
<column name="supports_sms" type="smallint" defaultValue="0"/>
</createTable>
<createTable tableName="pending_accounts">
<column name="id" type="bigint" autoIncrement="true">
<constraints primaryKey="true" nullable="false"/>
</column>
<column name="number" type="varchar(255)">
<constraints unique="true" nullable="false"/>
</column>
<column name="verification_code" type="varchar(255)">
<constraints nullable="false"/>
</column>
</createTable>
<createTable tableName="keys">
<column name="id" type="bigint" autoIncrement="true">
<constraints primaryKey="true" nullable="false"/>
</column>
<column name="number" type="varchar(255)">
<constraints nullable="false"/>
</column>
<column name="key_id" type="bigint">
<constraints nullable="false"/>
</column>
<column name="public_key" type="text">
<constraints nullable="false"/>
</column>
<column name="identity_key" type="text">
<constraints nullable="false"/>
</column>
<column name="last_resort" type="smallint" defaultValue="0"/>
</createTable>
<createIndex tableName="keys" indexName="keys_number_index">
<column name="number"/>
</createIndex>
</changeSet>
<changeSet id="2" author="matt">
<addColumn tableName="accounts">
<column name="data" type="json" />
</addColumn>
<sql>CREATE type device_t AS (id integer, "authToken" varchar(255), salt varchar(255), "signalingKey" varchar(255), "gcmId" text, "apnId" text);</sql>
<sql>CREATE type account_t AS (number varchar(255), "supportsSms" smallint, devices device_t array);</sql>
<sql>UPDATE accounts SET data = row_to_json(row(number, supports_sms, array[row(1, auth_token, salt, signaling_key, gcm_id, apn_id)::device_t])::account_t)</sql>
<addNotNullConstraint tableName="accounts" columnName="data"/>
<dropColumn tableName="accounts" columnName="auth_token"/>
<dropColumn tableName="accounts" columnName="salt"/>
<dropColumn tableName="accounts" columnName="signaling_key"/>
<dropColumn tableName="accounts" columnName="gcm_id"/>
<dropColumn tableName="accounts" columnName="apn_id"/>
<dropColumn tableName="accounts" columnName="supports_sms"/>
<sql>DROP type account_t;</sql>
<sql>DROP type device_t;</sql>
<addColumn tableName="keys">
<column name="device_id" type="bigint" defaultValue="1">
<constraints nullable="false" />
</column>
</addColumn>
<createTable tableName="pending_devices">
<column name="id" type="bigint" autoIncrement="true">
<constraints primaryKey="true" nullable="false"/>
</column>
<column name="number" type="text">
<constraints unique="true" nullable="false"/>
</column>
<column name="verification_code" type="text">
<constraints nullable="false"/>
</column>
</createTable>
<createTable tableName="messages">
<column name="id" type="bigint" autoIncrement="true">
<constraints primaryKey="true" nullable="false"/>
</column>
<column name="account_id" type="bigint">
<constraints nullable="false"/>
</column>
<column name="device_id" type="bigint">
<constraints nullable="false"/>
</column>
<column name="encrypted_message" type="text">
<constraints nullable="false"/>
</column>
</createTable>
<createIndex tableName="messages" indexName="messages_account_and_device">
<column name="account_id"/>
<column name="device_id"/>
</createIndex>
</changeSet>
<changeSet id="3" author="moxie">
<sql>CREATE OR REPLACE FUNCTION "custom_json_object_set_key"(
"json" json,
"key_to_set" TEXT,
"value_to_set" anyelement
)
RETURNS json
LANGUAGE sql
IMMUTABLE
STRICT
AS $function$
SELECT COALESCE(
(SELECT ('{' || string_agg(to_json("key") || ':' || "value", ',') || '}')
FROM (SELECT *
FROM json_each("json")
WHERE "key" &lt;&gt; "key_to_set"
UNION ALL
SELECT "key_to_set", to_json("value_to_set")) AS "fields"),
'{}'
)::json
$function$;</sql>
<sql>UPDATE accounts SET data = custom_json_object_set_key(data, 'identityKey', k.identity_key) FROM keys k WHERE (data->>'identityKey')::text is null AND k.number = data->>'number' AND k.last_resort = 1;</sql>
<sql>UPDATE accounts SET data = custom_json_object_set_key(data, 'identityKey', k.identity_key) FROM keys k WHERE (data->>'identityKey')::text is null AND k.number = data->>'number';</sql>
</changeSet>
<changeSet id="4" author="moxie">
<dropColumn tableName="keys" columnName="identity_key"/>
</changeSet>
<changeSet id="5" author="moxie">
<addColumn tableName="pending_accounts">
<column name="timestamp" type="bigint" defaultValueComputed="extract(epoch from now()) * 1000">
<constraints nullable="false"/>
</column>
</addColumn>
<addColumn tableName="pending_devices">
<column name="timestamp" type="bigint" defaultValueComputed="extract(epoch from now()) * 1000">
<constraints nullable="false"/>
</column>
</addColumn>
</changeSet>
</databaseChangeLog>

View File

@ -26,6 +26,7 @@ import org.whispersystems.textsecuregcm.storage.AccountDatabaseCrawlerRestartExc
import org.junit.Before;
import org.junit.Test;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import java.util.Arrays;
import java.util.Collections;
@ -50,7 +51,7 @@ public class AccountDatabaseCrawlerTest {
private final Account account1 = mock(Account.class);
private final Account account2 = mock(Account.class);
private final Accounts accounts = mock(Accounts.class);
private final AccountsManager accounts = mock(AccountsManager.class);
private final AccountDatabaseCrawlerListener listener = mock(AccountDatabaseCrawlerListener.class);
private final AccountDatabaseCrawlerCache cache = mock(AccountDatabaseCrawlerCache.class);