Fix Backup expiration purge time

This commit is contained in:
Ravi Khadiwala 2024-04-02 16:06:39 -05:00 committed by ravi-signal
parent 498dcbbfe8
commit 268c8382ee
1 changed files with 19 additions and 16 deletions

View File

@ -15,6 +15,7 @@ import java.time.Duration;
import java.util.HexFormat; import java.util.HexFormat;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser; import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -115,15 +116,14 @@ public class RemoveExpiredBackupsCommand extends EnvironmentCommand<WhisperServe
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
final AtomicLong backupsExpired = new AtomicLong();
final BackupManager backupManager = commandDependencies.backupManager(); final BackupManager backupManager = commandDependencies.backupManager();
backupManager final long backupsExpired = backupManager
.getExpiredBackups(segments, Schedulers.parallel(), clock.instant().plus(gracePeriod)) .getExpiredBackups(segments, Schedulers.parallel(), clock.instant().minus(gracePeriod))
.flatMap(expiredBackup -> removeExpiredBackup(backupManager, expiredBackup, dryRun), concurrency) .flatMap(expiredBackup -> removeExpiredBackup(backupManager, expiredBackup, dryRun), concurrency)
.doOnNext(ignored -> backupsExpired.incrementAndGet()) .filter(Boolean.TRUE::equals)
.then() .count()
.block(); .block();
logger.info("Expired {} backups", backupsExpired.get()); logger.info("Expired {} backups", backupsExpired);
} finally { } finally {
environment.lifecycle().getManagedObjects().forEach(managedObject -> { environment.lifecycle().getManagedObjects().forEach(managedObject -> {
try { try {
@ -135,28 +135,31 @@ public class RemoveExpiredBackupsCommand extends EnvironmentCommand<WhisperServe
} }
} }
private Mono<Void> removeExpiredBackup( private Mono<Boolean> removeExpiredBackup(
final BackupManager backupManager, final ExpiredBackup expiredBackup, final BackupManager backupManager, final ExpiredBackup expiredBackup,
final boolean dryRun) { final boolean dryRun) {
final Mono<Void> mono; final Mono<Boolean> mono;
if (dryRun) { if (dryRun) {
mono = Mono.empty(); mono = Mono.just(true);
} else { } else {
mono = Mono.fromCompletionStage(() -> backupManager.expireBackup(expiredBackup)); mono = Mono.fromCompletionStage(() -> backupManager.expireBackup(expiredBackup)).map(ignore -> true);
} }
return mono return mono
.doOnSuccess(ignored -> Metrics .doOnSuccess(ignored -> {
logger.info("incrementing metric for {}", HexFormat.of().formatHex(expiredBackup.hashedBackupId()));
Metrics
.counter(EXPIRED_BACKUPS_COUNTER_NAME, .counter(EXPIRED_BACKUPS_COUNTER_NAME,
"tier", expiredBackup.expirationType().name(), "tier", expiredBackup.expirationType().name(),
"dryRun", String.valueOf(dryRun)) "dryRun", String.valueOf(dryRun))
.increment()) .increment();
})
.onErrorResume(throwable -> { .onErrorResume(throwable -> {
logger.warn("Failed to remove tier {} for backup {}", logger.warn("Failed to remove tier {} for backup {}",
expiredBackup.expirationType(), expiredBackup.expirationType(),
HexFormat.of().formatHex(expiredBackup.hashedBackupId())); HexFormat.of().formatHex(expiredBackup.hashedBackupId()));
return Mono.empty(); return Mono.just(false);
}); });
} }