Consolidate directory reconciliation on v3 endpoints
This commit is contained in:
parent
bd820e6d2e
commit
dadf43b93e
|
@ -5,37 +5,19 @@
|
|||
package org.whispersystems.textsecuregcm.entities;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
public class DirectoryReconciliationRequest {
|
||||
|
||||
@JsonProperty
|
||||
private UUID fromUuid;
|
||||
|
||||
@JsonProperty
|
||||
private UUID toUuid;
|
||||
|
||||
@JsonProperty
|
||||
private List<User> users;
|
||||
|
||||
public DirectoryReconciliationRequest() {
|
||||
}
|
||||
|
||||
public DirectoryReconciliationRequest(UUID fromUuid, UUID toUuid, List<User> users) {
|
||||
this.fromUuid = fromUuid;
|
||||
this.toUuid = toUuid;
|
||||
this.users = users;
|
||||
}
|
||||
|
||||
public UUID getFromUuid() {
|
||||
return fromUuid;
|
||||
}
|
||||
|
||||
public UUID getToUuid() {
|
||||
return toUuid;
|
||||
public DirectoryReconciliationRequest(List<User> users) {
|
||||
this.users = users;
|
||||
}
|
||||
|
||||
public List<User> getUsers() {
|
||||
|
@ -54,7 +36,7 @@ public class DirectoryReconciliationRequest {
|
|||
}
|
||||
|
||||
public User(UUID uuid, String number) {
|
||||
this.uuid = uuid;
|
||||
this.uuid = uuid;
|
||||
this.number = number;
|
||||
}
|
||||
|
||||
|
|
|
@ -119,9 +119,6 @@ public class AccountDatabaseCrawler implements Managed, Runnable {
|
|||
.getAccountsDynamoDbMigrationConfiguration()
|
||||
.isDynamoCrawlerEnabled();
|
||||
|
||||
listeners.stream().filter(listener -> listener instanceof DirectoryReconciler)
|
||||
.forEach(reconciler -> ((DirectoryReconciler) reconciler).setUseV3Endpoints(useDynamo));
|
||||
|
||||
final Optional<UUID> fromUuid = getLastUuid(useDynamo);
|
||||
|
||||
if (fromUuid.isEmpty()) {
|
||||
|
|
|
@ -37,7 +37,6 @@ public class DeletedAccountsDirectoryReconciler {
|
|||
errorCounter = Counter.builder(name(DeletedAccountsDirectoryReconciler.class, "error"))
|
||||
.tag("replicationName", replicationName)
|
||||
.register(Metrics.globalRegistry);
|
||||
|
||||
}
|
||||
|
||||
public void onCrawlChunk(final List<User> deletedUsers) throws ChunkProcessingFailedException {
|
||||
|
@ -45,17 +44,15 @@ public class DeletedAccountsDirectoryReconciler {
|
|||
try {
|
||||
deleteTimer.recordCallable(() -> {
|
||||
try {
|
||||
final DirectoryReconciliationResponse response = directoryReconciliationClient.delete(new DirectoryReconciliationRequest(null, null, deletedUsers));
|
||||
final DirectoryReconciliationResponse response = directoryReconciliationClient.delete(
|
||||
new DirectoryReconciliationRequest(deletedUsers));
|
||||
|
||||
if (response.getStatus() != DirectoryReconciliationResponse.Status.OK) {
|
||||
errorCounter.increment();
|
||||
|
||||
throw new ChunkProcessingFailedException("Response status: " + response.getStatus());
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
|
||||
errorCounter.increment();
|
||||
|
||||
throw new ChunkProcessingFailedException(e);
|
||||
}
|
||||
|
||||
|
|
|
@ -8,7 +8,6 @@ import static com.codahale.metrics.MetricRegistry.name;
|
|||
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
@ -28,8 +27,6 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
|
|||
private final String replicationName;
|
||||
private final DirectoryReconciliationClient reconciliationClient;
|
||||
|
||||
private boolean useV3Endpoints;
|
||||
|
||||
public DirectoryReconciler(String replicationName, DirectoryReconciliationClient reconciliationClient) {
|
||||
this.reconciliationClient = reconciliationClient;
|
||||
this.replicationName = replicationName;
|
||||
|
@ -41,14 +38,7 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
|
|||
|
||||
@Override
|
||||
public void onCrawlEnd(Optional<UUID> fromUuid) {
|
||||
|
||||
if (useV3Endpoints) {
|
||||
reconciliationClient.complete();
|
||||
} else {
|
||||
final DirectoryReconciliationRequest request = new DirectoryReconciliationRequest(fromUuid.orElse(null), null,
|
||||
Collections.emptyList());
|
||||
sendAdditions(request);
|
||||
}
|
||||
reconciliationClient.complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -69,15 +59,8 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
|
|||
}
|
||||
});
|
||||
|
||||
final Optional<UUID> toUuid;
|
||||
if (!accounts.isEmpty()) {
|
||||
toUuid = Optional.of(accounts.get(accounts.size() - 1).getUuid());
|
||||
} else {
|
||||
toUuid = Optional.empty();
|
||||
}
|
||||
|
||||
addUsersRequest = new DirectoryReconciliationRequest(fromUuid.orElse(null), toUuid.orElse(null), addedUsers);
|
||||
deleteUsersRequest = new DirectoryReconciliationRequest(null, null, deletedUsers);
|
||||
addUsersRequest = new DirectoryReconciliationRequest(addedUsers);
|
||||
deleteUsersRequest = new DirectoryReconciliationRequest(deletedUsers);
|
||||
}
|
||||
|
||||
final DirectoryReconciliationResponse addUsersResponse = sendAdditions(addUsersRequest);
|
||||
|
@ -91,20 +74,12 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
|
|||
}
|
||||
|
||||
private DirectoryReconciliationResponse sendDeletes(final DirectoryReconciliationRequest request) {
|
||||
if (useV3Endpoints) {
|
||||
return sendRequest(request, reconciliationClient::delete, "delete");
|
||||
}
|
||||
return sendRequest(request, reconciliationClient::delete, "delete");
|
||||
|
||||
return new DirectoryReconciliationResponse(DirectoryReconciliationResponse.Status.OK);
|
||||
}
|
||||
|
||||
private DirectoryReconciliationResponse sendAdditions(final DirectoryReconciliationRequest request) {
|
||||
|
||||
if (useV3Endpoints) {
|
||||
return sendRequest(request, reconciliationClient::sendChunkV3, "add");
|
||||
}
|
||||
|
||||
return sendRequest(request, reconciliationClient::sendChunk, "add_v2");
|
||||
return sendRequest(request, reconciliationClient::add, "add");
|
||||
}
|
||||
|
||||
private DirectoryReconciliationResponse sendRequest(final DirectoryReconciliationRequest request,
|
||||
|
@ -117,7 +92,7 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
|
|||
final DirectoryReconciliationResponse response = requestHandler.apply(request);
|
||||
|
||||
if (response.getStatus() != DirectoryReconciliationResponse.Status.OK) {
|
||||
logger.warn("reconciliation error: " + response.getStatus());
|
||||
logger.warn("reconciliation error: {} ({})", response.getStatus(), context);
|
||||
}
|
||||
return response;
|
||||
} catch (ProcessingException ex) {
|
||||
|
@ -127,8 +102,4 @@ public class DirectoryReconciler extends AccountDatabaseCrawlerListener {
|
|||
});
|
||||
}
|
||||
|
||||
public void setUseV3Endpoints(final boolean useV3Endpoints) {
|
||||
this.useV3Endpoints = useV3Endpoints;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -32,25 +32,18 @@ public class DirectoryReconciliationClient {
|
|||
throws CertificateException
|
||||
{
|
||||
this.replicationUrl = directoryServerConfiguration.getReplicationUrl();
|
||||
this.client = initializeClient(directoryServerConfiguration);
|
||||
this.client = initializeClient(directoryServerConfiguration);
|
||||
|
||||
SharedMetricRegistries.getOrCreate(Constants.METRICS_NAME)
|
||||
.register(name(getClass(), directoryServerConfiguration.getReplicationName(), "days_until_certificate_expiration"),
|
||||
new CertificateExpirationGauge(CertificateUtil.getCertificate(directoryServerConfiguration.getReplicationCaCertificate())));
|
||||
}
|
||||
|
||||
public DirectoryReconciliationResponse sendChunk(DirectoryReconciliationRequest request) {
|
||||
public DirectoryReconciliationResponse add(DirectoryReconciliationRequest request) {
|
||||
return client.target(replicationUrl)
|
||||
.path("/v2/directory/reconcile")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.put(Entity.json(request), DirectoryReconciliationResponse.class);
|
||||
}
|
||||
|
||||
public DirectoryReconciliationResponse sendChunkV3(DirectoryReconciliationRequest request) {
|
||||
return client.target(replicationUrl)
|
||||
.path("/v3/directory/exists")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.put(Entity.json(request), DirectoryReconciliationResponse.class);
|
||||
.path("/v3/directory/exists")
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.put(Entity.json(request), DirectoryReconciliationResponse.class);
|
||||
}
|
||||
|
||||
public DirectoryReconciliationResponse delete(DirectoryReconciliationRequest request) {
|
||||
|
@ -68,16 +61,18 @@ public class DirectoryReconciliationClient {
|
|||
}
|
||||
|
||||
private static Client initializeClient(DirectoryServerConfiguration directoryServerConfiguration)
|
||||
throws CertificateException
|
||||
{
|
||||
KeyStore trustStore = CertificateUtil.buildKeyStoreForPem(directoryServerConfiguration.getReplicationCaCertificate());
|
||||
throws CertificateException {
|
||||
KeyStore trustStore = CertificateUtil.buildKeyStoreForPem(
|
||||
directoryServerConfiguration.getReplicationCaCertificate());
|
||||
SSLContext sslContext = SslConfigurator.newInstance()
|
||||
.securityProtocol("TLSv1.2")
|
||||
.trustStore(trustStore)
|
||||
.createSSLContext();
|
||||
.securityProtocol("TLSv1.2")
|
||||
.trustStore(trustStore)
|
||||
.createSSLContext();
|
||||
|
||||
return ClientBuilder.newBuilder()
|
||||
.register(HttpAuthenticationFeature.basic("signal", directoryServerConfiguration.getReplicationPassword().getBytes()))
|
||||
.sslContext(sslContext)
|
||||
.build();
|
||||
.register(
|
||||
HttpAuthenticationFeature.basic("signal", directoryServerConfiguration.getReplicationPassword().getBytes()))
|
||||
.sslContext(sslContext)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,7 @@ import java.util.List;
|
|||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest;
|
||||
import org.whispersystems.textsecuregcm.entities.DirectoryReconciliationRequest.User;
|
||||
|
@ -53,13 +52,10 @@ class DirectoryReconcilerTest {
|
|||
when(undiscoverableAccount.shouldBeVisibleInDirectory()).thenReturn(false);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void testCrawlChunkValid(final boolean useV3Endpoints) throws AccountDatabaseCrawlerRestartException {
|
||||
directoryReconciler.setUseV3Endpoints(useV3Endpoints);
|
||||
@Test
|
||||
void testCrawlChunkValid() throws AccountDatabaseCrawlerRestartException {
|
||||
|
||||
when(reconciliationClient.sendChunk(any())).thenReturn(successResponse);
|
||||
when(reconciliationClient.sendChunkV3(any())).thenReturn(successResponse);
|
||||
when(reconciliationClient.add(any())).thenReturn(successResponse);
|
||||
when(reconciliationClient.delete(any())).thenReturn(successResponse);
|
||||
|
||||
directoryReconciler.timeAndProcessCrawlChunk(Optional.of(VALID_UUID),
|
||||
|
@ -67,24 +63,16 @@ class DirectoryReconcilerTest {
|
|||
|
||||
ArgumentCaptor<DirectoryReconciliationRequest> chunkRequest = ArgumentCaptor.forClass(
|
||||
DirectoryReconciliationRequest.class);
|
||||
if (useV3Endpoints) {
|
||||
verify(reconciliationClient, times(1)).sendChunkV3(chunkRequest.capture());
|
||||
} else {
|
||||
verify(reconciliationClient, times(1)).sendChunk(chunkRequest.capture());
|
||||
}
|
||||
verify(reconciliationClient, times(1)).add(chunkRequest.capture());
|
||||
|
||||
assertThat(chunkRequest.getValue().getFromUuid()).isEqualTo(VALID_UUID);
|
||||
assertThat(chunkRequest.getValue().getToUuid()).isEqualTo(UNDISCOVERABLE_UUID);
|
||||
assertThat(chunkRequest.getValue().getUsers()).isEqualTo(List.of(new User(VALID_UUID, VALID_NUMBER)));
|
||||
|
||||
if (useV3Endpoints) {
|
||||
ArgumentCaptor<DirectoryReconciliationRequest> deletesRequest = ArgumentCaptor.forClass(
|
||||
DirectoryReconciliationRequest.class);
|
||||
verify(reconciliationClient, times(1)).delete(deletesRequest.capture());
|
||||
|
||||
ArgumentCaptor<DirectoryReconciliationRequest> deletesRequest = ArgumentCaptor.forClass(DirectoryReconciliationRequest.class);
|
||||
verify(reconciliationClient, times(1)).delete(deletesRequest.capture());
|
||||
|
||||
assertThat(deletesRequest.getValue().getUsers()).isEqualTo(
|
||||
List.of(new User(UNDISCOVERABLE_UUID, UNDISCOVERABLE_NUMBER)));
|
||||
}
|
||||
assertThat(deletesRequest.getValue().getUsers()).isEqualTo(
|
||||
List.of(new User(UNDISCOVERABLE_UUID, UNDISCOVERABLE_NUMBER)));
|
||||
|
||||
verifyNoMoreInteractions(reconciliationClient);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue