Modify the "migrate profiles" command to accept a list of UUIDs/versions from a CSV file
This commit is contained in:
parent
65b49b2d9c
commit
bb4f4bc441
|
@ -9,10 +9,16 @@ import io.dropwizard.Application;
|
||||||
import io.dropwizard.cli.EnvironmentCommand;
|
import io.dropwizard.cli.EnvironmentCommand;
|
||||||
import io.dropwizard.jdbi3.JdbiFactory;
|
import io.dropwizard.jdbi3.JdbiFactory;
|
||||||
import io.dropwizard.setup.Environment;
|
import io.dropwizard.setup.Environment;
|
||||||
|
import java.io.FileReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.UUID;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import net.sourceforge.argparse4j.inf.Namespace;
|
import net.sourceforge.argparse4j.inf.Namespace;
|
||||||
import net.sourceforge.argparse4j.inf.Subparser;
|
import net.sourceforge.argparse4j.inf.Subparser;
|
||||||
|
import org.apache.commons.csv.CSVFormat;
|
||||||
|
import org.apache.commons.csv.CSVRecord;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.jdbi.v3.core.Jdbi;
|
import org.jdbi.v3.core.Jdbi;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -53,6 +59,12 @@ public class MigrateProfilesCommand extends EnvironmentCommand<WhisperServerConf
|
||||||
.required(false)
|
.required(false)
|
||||||
.setDefault(64)
|
.setDefault(64)
|
||||||
.help("The maximum number of concurrent DynamoDB requests");
|
.help("The maximum number of concurrent DynamoDB requests");
|
||||||
|
|
||||||
|
subparser.addArgument("-f", "--file")
|
||||||
|
.dest("file")
|
||||||
|
.type(String.class)
|
||||||
|
.required(false)
|
||||||
|
.help("A CSV containing UUID/version pairs to migrate; if not specified, all profiles are migrated");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -76,12 +88,40 @@ public class MigrateProfilesCommand extends EnvironmentCommand<WhisperServerConf
|
||||||
ProfilesDynamoDb profilesDynamoDb = new ProfilesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
|
ProfilesDynamoDb profilesDynamoDb = new ProfilesDynamoDb(dynamoDbClient, dynamoDbAsyncClient,
|
||||||
configuration.getDynamoDbTables().getProfiles().getTableName());
|
configuration.getDynamoDbTables().getProfiles().getTableName());
|
||||||
|
|
||||||
final int fetchSize = namespace.getInt("fetchSize");
|
final String csvFile = namespace.getString("file");
|
||||||
final int concurrency = namespace.getInt("concurrency");
|
|
||||||
|
|
||||||
|
if (StringUtils.isNotBlank(csvFile)) {
|
||||||
|
migrateFromCsvFile(profiles, profilesDynamoDb, csvFile);
|
||||||
|
} else {
|
||||||
|
final int fetchSize = namespace.getInt("fetchSize");
|
||||||
|
final int concurrency = namespace.getInt("concurrency");
|
||||||
|
|
||||||
|
migrateAll(profiles, profilesDynamoDb, concurrency, fetchSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void migrateFromCsvFile(final Profiles profiles, final ProfilesDynamoDb profilesDynamoDb, final String csvFile)
|
||||||
|
throws IOException {
|
||||||
|
log.info("Beginning migration of profiles specified in {}", csvFile);
|
||||||
|
|
||||||
|
try (final FileReader fileReader = new FileReader(csvFile)) {
|
||||||
|
for (final CSVRecord csvRecord : CSVFormat.DEFAULT.parse(fileReader)) {
|
||||||
|
final UUID uuid = UUID.fromString(csvRecord.get(0));
|
||||||
|
final String version = csvRecord.get(1);
|
||||||
|
|
||||||
|
profiles.get(uuid, version).ifPresent(profile -> profilesDynamoDb.set(uuid, profile));
|
||||||
|
log.info("Migrated {}/{}", uuid, version);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("Done");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void migrateAll(final Profiles profiles, final ProfilesDynamoDb profilesDynamoDb, final int concurrency, final int fetchSize)
|
||||||
|
throws InterruptedException {
|
||||||
final Semaphore semaphore = new Semaphore(concurrency);
|
final Semaphore semaphore = new Semaphore(concurrency);
|
||||||
|
|
||||||
log.info("Beginning migration");
|
log.info("Beginning migration of all profiles");
|
||||||
|
|
||||||
final AtomicInteger profilesProcessed = new AtomicInteger(0);
|
final AtomicInteger profilesProcessed = new AtomicInteger(0);
|
||||||
final AtomicInteger profilesMigrated = new AtomicInteger(0);
|
final AtomicInteger profilesMigrated = new AtomicInteger(0);
|
||||||
|
|
Loading…
Reference in New Issue