Stop timers on Flux termination

This commit is contained in:
Chris Eager 2024-01-09 14:57:47 -06:00 committed by Chris Eager
parent cc6cf8194f
commit 4a2cbb9ec7
2 changed files with 13 additions and 10 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2021-2022 Signal Messenger, LLC * Copyright 2021 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only * SPDX-License-Identifier: AGPL-3.0-only
*/ */
@ -60,8 +60,8 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
private static final String KEY_ENVELOPE_BYTES = "EB"; private static final String KEY_ENVELOPE_BYTES = "EB";
private final Timer storeTimer = timer(name(getClass(), "store")); private final Timer storeTimer = timer(name(getClass(), "store"));
private final Timer deleteByAccount = timer(name(getClass(), "delete", "account")); private final String DELETE_BY_ACCOUNT_TIMER_NAME = name(getClass(), "delete", "account");
private final Timer deleteByDevice = timer(name(getClass(), "delete", "device")); private final String DELETE_BY_DEVICE_TIMER_NAME = name(getClass(), "delete", "device");
private final DynamoDbAsyncClient dbAsyncClient; private final DynamoDbAsyncClient dbAsyncClient;
private final String tableName; private final String tableName;
@ -237,8 +237,9 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
KEY_SORT, item.get(KEY_SORT))) KEY_SORT, item.get(KEY_SORT)))
.build())), .build())),
DYNAMO_DB_MAX_BATCH_SIZE) DYNAMO_DB_MAX_BATCH_SIZE)
.doOnComplete(() -> sample.stop(deleteByAccount))
.then() .then()
.doOnSuccess(ignored -> sample.stop(timer(DELETE_BY_ACCOUNT_TIMER_NAME, "outcome", "success")))
.doOnError(ignored -> sample.stop(timer(DELETE_BY_ACCOUNT_TIMER_NAME, "outcome", "error")))
.toFuture(); .toFuture();
} }
@ -267,8 +268,9 @@ public class MessagesDynamoDb extends AbstractDynamoDbStore {
KEY_SORT, item.get(KEY_SORT))) KEY_SORT, item.get(KEY_SORT)))
.build())), .build())),
DYNAMO_DB_MAX_BATCH_SIZE) DYNAMO_DB_MAX_BATCH_SIZE)
.doOnComplete(() -> sample.stop(deleteByDevice))
.then() .then()
.doOnSuccess(ignored -> sample.stop(timer(DELETE_BY_DEVICE_TIMER_NAME, "outcome", "success")))
.doOnError(ignored -> sample.stop(timer(DELETE_BY_DEVICE_TIMER_NAME, "outcome", "error")))
.toFuture(); .toFuture();
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2013-2021 Signal Messenger, LLC * Copyright 2013 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only * SPDX-License-Identifier: AGPL-3.0-only
*/ */
@ -79,7 +79,7 @@ public class Profiles {
private static final Timer SET_PROFILES_TIMER = Metrics.timer(name(Profiles.class, "set")); private static final Timer SET_PROFILES_TIMER = Metrics.timer(name(Profiles.class, "set"));
private static final Timer GET_PROFILE_TIMER = Metrics.timer(name(Profiles.class, "get")); private static final Timer GET_PROFILE_TIMER = Metrics.timer(name(Profiles.class, "get"));
private static final Timer DELETE_PROFILES_TIMER = Metrics.timer(name(Profiles.class, "delete")); private static final String DELETE_PROFILES_TIMER_NAME = name(Profiles.class, "delete");
private static final String PARSE_BYTE_ARRAY_COUNTER_NAME = name(Profiles.class, "parseByteArray"); private static final String PARSE_BYTE_ARRAY_COUNTER_NAME = name(Profiles.class, "parseByteArray");
private static final int MAX_CONCURRENCY = 32; private static final int MAX_CONCURRENCY = 32;
@ -188,7 +188,7 @@ public class Profiles {
@VisibleForTesting @VisibleForTesting
static Map<String, AttributeValue> buildUpdateExpressionAttributeValues(final VersionedProfile profile) { static Map<String, AttributeValue> buildUpdateExpressionAttributeValues(final VersionedProfile profile) {
final Map<String, AttributeValue> expressionValues = new HashMap<>(); final Map<String, AttributeValue> expressionValues = new HashMap<>();
expressionValues.put(":commitment", AttributeValues.fromByteArray(profile.commitment())); expressionValues.put(":commitment", AttributeValues.fromByteArray(profile.commitment()));
if (profile.name() != null) { if (profile.name() != null) {
@ -210,7 +210,7 @@ public class Profiles {
if (profile.paymentAddress() != null) { if (profile.paymentAddress() != null) {
expressionValues.put(":paymentAddress", AttributeValues.fromByteArray(profile.paymentAddress())); expressionValues.put(":paymentAddress", AttributeValues.fromByteArray(profile.paymentAddress()));
} }
if (profile.phoneNumberSharing() != null) { if (profile.phoneNumberSharing() != null) {
expressionValues.put(":phoneNumberSharing", AttributeValues.fromByteArray(profile.phoneNumberSharing())); expressionValues.put(":phoneNumberSharing", AttributeValues.fromByteArray(profile.phoneNumberSharing()));
} }
@ -281,8 +281,9 @@ public class Profiles {
KEY_ACCOUNT_UUID, uuidAttributeValue, KEY_ACCOUNT_UUID, uuidAttributeValue,
ATTR_VERSION, item.get(ATTR_VERSION))) ATTR_VERSION, item.get(ATTR_VERSION)))
.build())), MAX_CONCURRENCY) .build())), MAX_CONCURRENCY)
.doOnComplete(() -> sample.stop(DELETE_PROFILES_TIMER))
.then() .then()
.doOnSuccess(ignored -> sample.stop(Metrics.timer(DELETE_PROFILES_TIMER_NAME, "outcome", "success")))
.doOnError(ignored -> sample.stop(Metrics.timer(DELETE_PROFILES_TIMER_NAME, "outcome", "error")))
.toFuture(); .toFuture();
} }
} }