Directory update bug fix.
This commit is contained in:
parent
67e5794722
commit
53de38fc06
|
@ -26,6 +26,8 @@ import com.yammer.dropwizard.jdbi.args.OptionalArgumentFactory;
|
||||||
import net.sourceforge.argparse4j.inf.Namespace;
|
import net.sourceforge.argparse4j.inf.Namespace;
|
||||||
import net.spy.memcached.MemcachedClient;
|
import net.spy.memcached.MemcachedClient;
|
||||||
import org.skife.jdbi.v2.DBI;
|
import org.skife.jdbi.v2.DBI;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
|
||||||
import org.whispersystems.textsecuregcm.federation.FederatedClientManager;
|
import org.whispersystems.textsecuregcm.federation.FederatedClientManager;
|
||||||
import org.whispersystems.textsecuregcm.providers.MemcachedClientFactory;
|
import org.whispersystems.textsecuregcm.providers.MemcachedClientFactory;
|
||||||
|
@ -38,6 +40,8 @@ import redis.clients.jedis.JedisPool;
|
||||||
|
|
||||||
public class DirectoryCommand extends ConfiguredCommand<WhisperServerConfiguration> {
|
public class DirectoryCommand extends ConfiguredCommand<WhisperServerConfiguration> {
|
||||||
|
|
||||||
|
private final Logger logger = LoggerFactory.getLogger(DirectoryCommand.class);
|
||||||
|
|
||||||
public DirectoryCommand() {
|
public DirectoryCommand() {
|
||||||
super("directory", "Update directory from DB and peers.");
|
super("directory", "Update directory from DB and peers.");
|
||||||
}
|
}
|
||||||
|
@ -68,6 +72,9 @@ public class DirectoryCommand extends ConfiguredCommand<WhisperServerConfigurati
|
||||||
|
|
||||||
update.updateFromLocalDatabase();
|
update.updateFromLocalDatabase();
|
||||||
update.updateFromPeers();
|
update.updateFromPeers();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
logger.warn("Directory Exception", ex);
|
||||||
|
throw new RuntimeException(ex);
|
||||||
} finally {
|
} finally {
|
||||||
Thread.sleep(3000);
|
Thread.sleep(3000);
|
||||||
System.exit(0);
|
System.exit(0);
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.whispersystems.textsecuregcm.storage.AccountsManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
|
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
|
||||||
import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle;
|
import org.whispersystems.textsecuregcm.storage.DirectoryManager.BatchOperationHandle;
|
||||||
import org.whispersystems.textsecuregcm.util.Base64;
|
import org.whispersystems.textsecuregcm.util.Base64;
|
||||||
|
import org.whispersystems.textsecuregcm.util.Hex;
|
||||||
import org.whispersystems.textsecuregcm.util.Util;
|
import org.whispersystems.textsecuregcm.util.Util;
|
||||||
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
@ -85,7 +86,7 @@ public class DirectoryUpdater {
|
||||||
|
|
||||||
for (FederatedClient client : clients) {
|
for (FederatedClient client : clients) {
|
||||||
logger.info("Updating directory from peer: " + client.getPeerName());
|
logger.info("Updating directory from peer: " + client.getPeerName());
|
||||||
BatchOperationHandle handle = directory.startBatchOperation();
|
// BatchOperationHandle handle = directory.startBatchOperation();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
int userCount = client.getUserCount();
|
int userCount = client.getUserCount();
|
||||||
|
@ -94,31 +95,38 @@ public class DirectoryUpdater {
|
||||||
logger.info("Remote peer user count: " + userCount);
|
logger.info("Remote peer user count: " + userCount);
|
||||||
|
|
||||||
while (retrieved < userCount) {
|
while (retrieved < userCount) {
|
||||||
|
logger.info("Retrieving remote tokens...");
|
||||||
List<ClientContact> clientContacts = client.getUserTokens(retrieved);
|
List<ClientContact> clientContacts = client.getUserTokens(retrieved);
|
||||||
|
|
||||||
if (clientContacts == null)
|
if (clientContacts == null) {
|
||||||
|
logger.info("Remote tokens empty, ending...");
|
||||||
break;
|
break;
|
||||||
|
} else {
|
||||||
|
logger.info("Retrieved " + clientContacts.size() + " remote tokens...");
|
||||||
|
}
|
||||||
|
|
||||||
for (ClientContact clientContact : clientContacts) {
|
for (ClientContact clientContact : clientContacts) {
|
||||||
clientContact.setRelay(client.getPeerName());
|
clientContact.setRelay(client.getPeerName());
|
||||||
|
|
||||||
Optional<ClientContact> existing = directory.get(clientContact.getToken());
|
Optional<ClientContact> existing = directory.get(clientContact.getToken());
|
||||||
|
|
||||||
if (!clientContact.isInactive() && (!existing.isPresent() || existing.get().getRelay().equals(client.getPeerName()))) {
|
if (!clientContact.isInactive() && (!existing.isPresent() || client.getPeerName().equals(existing.get().getRelay()))) {
|
||||||
directory.add(handle, clientContact);
|
// directory.add(handle, clientContact);
|
||||||
|
directory.add(clientContact);
|
||||||
} else {
|
} else {
|
||||||
if (existing != null && client.getPeerName().equals(existing.get().getRelay())) {
|
if (existing.isPresent() && client.getPeerName().equals(existing.get().getRelay())) {
|
||||||
directory.remove(clientContact.getToken());
|
directory.remove(clientContact.getToken());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
retrieved += clientContacts.size();
|
retrieved += clientContacts.size();
|
||||||
|
logger.info("Processed: " + retrieved + " remote tokens.");
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info("Update from peer complete.");
|
logger.info("Update from peer complete.");
|
||||||
} finally {
|
} finally {
|
||||||
directory.stopBatchOperation(handle);
|
// directory.stopBatchOperation(handle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue