Expire in-memory queues after 30 days of inactivity.
// FREEBIE
This commit is contained in:
parent
1f0acd0622
commit
08291502eb
|
@ -27,6 +27,7 @@ import org.whispersystems.textsecuregcm.websocket.WebsocketAddress;
|
||||||
|
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static com.codahale.metrics.MetricRegistry.name;
|
import static com.codahale.metrics.MetricRegistry.name;
|
||||||
import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
|
import static org.whispersystems.textsecuregcm.entities.MessageProtos.OutgoingMessageSignal;
|
||||||
|
@ -57,14 +58,17 @@ public class StoredMessages {
|
||||||
|
|
||||||
public void insert(WebsocketAddress address, OutgoingMessageSignal message) {
|
public void insert(WebsocketAddress address, OutgoingMessageSignal message) {
|
||||||
try (Jedis jedis = jedisPool.getResource()) {
|
try (Jedis jedis = jedisPool.getResource()) {
|
||||||
|
byte[] queue = getKey(address);
|
||||||
StoredMessage storedMessage = StoredMessage.newBuilder()
|
StoredMessage storedMessage = StoredMessage.newBuilder()
|
||||||
.setType(StoredMessage.Type.MESSAGE)
|
.setType(StoredMessage.Type.MESSAGE)
|
||||||
.setContent(message.toByteString())
|
.setContent(message.toByteString())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
long queueSize = jedis.lpush(getKey(address), storedMessage.toByteArray());
|
long queueSize = jedis.lpush(queue, storedMessage.toByteArray());
|
||||||
queueSizeHistogram.update(queueSize);
|
queueSizeHistogram.update(queueSize);
|
||||||
|
|
||||||
|
jedis.expireAt(queue, (System.currentTimeMillis() / 1000) + TimeUnit.DAYS.toSeconds(30));
|
||||||
|
|
||||||
if (queueSize > 1000) {
|
if (queueSize > 1000) {
|
||||||
jedis.ltrim(getKey(address), 0, 999);
|
jedis.ltrim(getKey(address), 0, 999);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue