From 84be8cc045019874436033ae5b0227f4e80a3b32 Mon Sep 17 00:00:00 2001 From: Moxie Marlinspike Date: Mon, 24 Apr 2017 18:46:06 -0700 Subject: [PATCH] Add push command // FREEBIE --- .../textsecuregcm/WhisperServerService.java | 2 + .../textsecuregcm/push/PushSender.java | 2 +- .../textsecuregcm/storage/Messages.java | 13 ++ .../textsecuregcm/workers/PushCommand.java | 173 ++++++++++++++++++ 4 files changed, 189 insertions(+), 1 deletion(-) create mode 100644 src/main/java/org/whispersystems/textsecuregcm/workers/PushCommand.java diff --git a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 6ae5dcc2e..218fa2ee7 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -89,6 +89,7 @@ import org.whispersystems.textsecuregcm.websocket.WebSocketAccountAuthenticator; import org.whispersystems.textsecuregcm.workers.DeleteUserCommand; import org.whispersystems.textsecuregcm.workers.DirectoryCommand; import org.whispersystems.textsecuregcm.workers.PeriodicStatsCommand; +import org.whispersystems.textsecuregcm.workers.PushCommand; import org.whispersystems.textsecuregcm.workers.TrimMessagesCommand; import org.whispersystems.textsecuregcm.workers.VacuumCommand; import org.whispersystems.websocket.WebSocketResourceProviderFactory; @@ -123,6 +124,7 @@ public class WhisperServerService extends Application("accountdb", "accountsdb.xml") { @Override public DataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) { diff --git a/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java b/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java index 3d2295a0a..991de7cbc 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java +++ b/src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java @@ -41,7 +41,7 @@ public class PushSender implements Managed { private final Logger logger = LoggerFactory.getLogger(PushSender.class); - private static final String APN_PAYLOAD = "{\"aps\":{\"sound\":\"default\",\"badge\":%d,\"alert\":{\"loc-key\":\"APN_Message\"}}}"; + public static final String APN_PAYLOAD = "{\"aps\":{\"sound\":\"default\",\"badge\":%d,\"alert\":{\"loc-key\":\"APN_Message\"}}}"; private final ApnFallbackManager apnFallbackManager; private final PushServiceClient pushServiceClient; diff --git a/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java b/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java index 54be51292..deb2723c6 100644 --- a/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java +++ b/src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java @@ -12,6 +12,7 @@ import org.skife.jdbi.v2.sqlobject.customizers.Mapper; import org.skife.jdbi.v2.tweak.ResultSetMapper; import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope; import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity; +import org.whispersystems.textsecuregcm.util.Pair; import java.lang.annotation.Annotation; import java.lang.annotation.ElementType; @@ -56,6 +57,10 @@ public abstract class Messages { @Bind("source") String source, @Bind("timestamp") long timestamp); + @Mapper(DestinationMapper.class) + @SqlQuery("SELECT DISTINCT ON (destination, destination_device) destination, destination_device FROM messages WHERE timestamp > :timestamp ORDER BY destination, destination_device OFFSET :offset LIMIT :limit") + public abstract List> getPendingDestinations(@Bind("timestamp") long sinceTimestamp, @Bind("offset") int offset, @Bind("limit") int limit); + @Mapper(MessageMapper.class) @SqlUpdate("DELETE FROM messages WHERE " + ID + " = :id AND " + DESTINATION + " = :destination") abstract void remove(@Bind("destination") String destination, @Bind("id") long id); @@ -72,6 +77,14 @@ public abstract class Messages { @SqlUpdate("VACUUM messages") public abstract void vacuum(); + public static class DestinationMapper implements ResultSetMapper> { + + @Override + public Pair map(int i, ResultSet resultSet, StatementContext statementContext) throws SQLException { + return new Pair<>(resultSet.getString(DESTINATION), resultSet.getInt(DESTINATION_DEVICE)); + } + } + public static class MessageMapper implements ResultSetMapper { @Override public OutgoingMessageEntity map(int i, ResultSet resultSet, StatementContext statementContext) diff --git a/src/main/java/org/whispersystems/textsecuregcm/workers/PushCommand.java b/src/main/java/org/whispersystems/textsecuregcm/workers/PushCommand.java new file mode 100644 index 000000000..2f0cb1f89 --- /dev/null +++ b/src/main/java/org/whispersystems/textsecuregcm/workers/PushCommand.java @@ -0,0 +1,173 @@ +package org.whispersystems.textsecuregcm.workers; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.google.common.base.Optional; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import org.glassfish.jersey.client.ClientProperties; +import org.skife.jdbi.v2.DBI; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.whispersystems.textsecuregcm.WhisperServerConfiguration; +import org.whispersystems.textsecuregcm.entities.ApnMessage; +import org.whispersystems.textsecuregcm.entities.GcmMessage; +import org.whispersystems.textsecuregcm.providers.RedisClientFactory; +import org.whispersystems.textsecuregcm.push.ApnFallbackManager; +import org.whispersystems.textsecuregcm.push.PushSender; +import org.whispersystems.textsecuregcm.push.PushServiceClient; +import org.whispersystems.textsecuregcm.push.TransientPushFailureException; +import org.whispersystems.textsecuregcm.storage.Account; +import org.whispersystems.textsecuregcm.storage.Accounts; +import org.whispersystems.textsecuregcm.storage.AccountsManager; +import org.whispersystems.textsecuregcm.storage.Device; +import org.whispersystems.textsecuregcm.storage.DirectoryManager; +import org.whispersystems.textsecuregcm.storage.Messages; +import org.whispersystems.textsecuregcm.util.Pair; +import org.whispersystems.textsecuregcm.util.Util; + +import javax.ws.rs.client.Client; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import io.dropwizard.Application; +import io.dropwizard.cli.EnvironmentCommand; +import io.dropwizard.client.JerseyClientBuilder; +import io.dropwizard.jdbi.DBIFactory; +import io.dropwizard.setup.Environment; +import redis.clients.jedis.JedisPool; + +public class PushCommand extends EnvironmentCommand { + + private final Logger logger = LoggerFactory.getLogger(DirectoryCommand.class); + + private static final int LIMIT = 1000; + + public PushCommand() { + super(new Application() { + @Override + public void run(WhisperServerConfiguration configuration, Environment environment) + throws Exception + { + + } + }, "push", "send pushes"); + } + + @Override + public void configure(Subparser subparser) { + super.configure(subparser); + subparser.addArgument("-t", "--time") + .dest("timestamp") + .type(Long.class) + .required(true) + .help("The starting timestamp to notify users from"); + + subparser.addArgument("-o", "--offset") + .dest("offset") + .type(Integer.class) + .required(true) + .help("The starting offset in the user query"); + } + + @Override + protected void run(Environment environment, Namespace namespace, + WhisperServerConfiguration configuration) + throws Exception + { + try { + long timestampStart = namespace.getLong("timestamp"); + int offset = namespace.getInt("offset"); + + environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + DBIFactory dbiFactory = new DBIFactory(); + DBI database = dbiFactory.build(environment, configuration.getDataSourceFactory(), "accountdb" ); + DBI messagedb = dbiFactory.build(environment, configuration.getMessageStoreConfiguration(), "messagedb"); + + Accounts accounts = database.onDemand(Accounts.class); + Messages messages = messagedb.onDemand(Messages.class); + + JedisPool cacheClient = new RedisClientFactory(configuration.getCacheConfiguration().getUrl()).getRedisClientPool(); + JedisPool redisClient = new RedisClientFactory(configuration.getDirectoryConfiguration().getUrl()).getRedisClientPool(); + DirectoryManager directory = new DirectoryManager(redisClient); + AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient); + + Client httpClient = initializeHttpClient(environment, configuration); + PushServiceClient pushServiceClient = new PushServiceClient(httpClient, configuration.getPushConfiguration()); + + while (true) { + List> pendingDestinations = messages.getPendingDestinations(timestampStart, offset, LIMIT); + + if (pendingDestinations == null || pendingDestinations.size() == 0) { + break; + } + + for (Pair pendingDestination : pendingDestinations) { + Optional account = accountsManager.get(pendingDestination.first()); + + if (account.isPresent()) { + Optional device = account.get().getDevice(pendingDestination.second()); + + if (device.isPresent()) { + if (device.get().getGcmId() != null) { + sendGcm(pushServiceClient, account.get(), device.get()); + } else if (device.get().getApnId() != null) { + sendApn(pushServiceClient, account.get(), device.get()); + } + } else { + logger.warn("No device found: " + pendingDestination.first() + ", " + pendingDestination.second()); + } + } else { + logger.warn("No account found: " + pendingDestination.first()); + } + } + + logger.warn("Processed " + LIMIT + "..."); + offset += LIMIT; + } + + logger.warn("Finished!"); + + } catch (Exception ex) { + logger.warn("Exception", ex); + } + } + + private void sendGcm(PushServiceClient pushServiceClient, Account account, Device device) { + try { + GcmMessage gcmMessage = new GcmMessage(device.getGcmId(), account.getNumber(), + (int)device.getId(), "", false, true); + + logger.warn("Sending GCM: " + account.getNumber()); + pushServiceClient.send(gcmMessage); + } catch (TransientPushFailureException e) { + logger.warn("Push failure", e); + } + } + + private void sendApn(PushServiceClient pushServiceClient, Account account, Device device) { + if (!Util.isEmpty(device.getVoipApnId())) { + try { + ApnMessage apnMessage = new ApnMessage(device.getVoipApnId(), account.getNumber(), (int)device.getId(), + String.format(PushSender.APN_PAYLOAD, 1), + true, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(ApnFallbackManager.FALLBACK_DURATION)); + + logger.warn("Sending APN: " + account.getNumber()); + pushServiceClient.send(apnMessage); + } catch (TransientPushFailureException e) { + logger.warn("SILENT PUSH LOSS", e); + } + } + } + + private Client initializeHttpClient(Environment environment, WhisperServerConfiguration config) { + Client httpClient = new JerseyClientBuilder(environment).using(config.getJerseyClientConfiguration()) + .build(getName()); + + httpClient.property(ClientProperties.CONNECT_TIMEOUT, 1000); + httpClient.property(ClientProperties.READ_TIMEOUT, 1000); + + return httpClient; + } + +}