From 0e074d3a5aae3d7fdb8b7a8aa307eb4a7e5c8ba7 Mon Sep 17 00:00:00 2001 From: Ehren Kret Date: Fri, 7 Aug 2020 14:43:03 -0500 Subject: [PATCH] Copy SignalFxMeterRegistry into a new class to get better logging --- .../textsecuregcm/WhisperServerService.java | 4 +- .../metrics/SignalSignalFxMeterRegistry.java | 216 ++++++++++++++++++ 2 files changed, 218 insertions(+), 2 deletions(-) create mode 100644 service/src/main/java/org/whispersystems/textsecuregcm/metrics/SignalSignalFxMeterRegistry.java diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java index 897c1f4e9..8db2d9c88 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java @@ -51,7 +51,6 @@ import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; import io.micrometer.datadog.DatadogConfig; import io.micrometer.datadog.DatadogMeterRegistry; import io.micrometer.signalfx.SignalFxConfig; -import io.micrometer.signalfx.SignalFxMeterRegistry; import io.micrometer.wavefront.WavefrontConfig; import io.micrometer.wavefront.WavefrontMeterRegistry; import org.bouncycastle.jce.provider.BouncyCastleProvider; @@ -99,6 +98,7 @@ import org.whispersystems.textsecuregcm.metrics.MetricsApplicationEventListener; import org.whispersystems.textsecuregcm.metrics.NetworkReceivedGauge; import org.whispersystems.textsecuregcm.metrics.NetworkSentGauge; import org.whispersystems.textsecuregcm.metrics.PushLatencyManager; +import org.whispersystems.textsecuregcm.metrics.SignalSignalFxMeterRegistry; import org.whispersystems.textsecuregcm.metrics.TrafficSource; import org.whispersystems.textsecuregcm.providers.RedisClientFactory; import org.whispersystems.textsecuregcm.providers.RedisClusterHealthCheck; @@ -296,7 +296,7 @@ public class WhisperServerService extends 
// NOTE(review): the lines below are the added-lines body of the patch that creates
// SignalSignalFxMeterRegistry.java, with the original line breaks collapsed. Most " + " sequences
// are diff added-line markers rather than Java operators. Text that stood inside angle brackets
// (generic type arguments, and apparently the class header, imports and field declarations) looks
// to be missing from this copy -- TODO confirm against the original commit before applying.
//
// Contents of the next physical line, in order:
//  * onSendErrorHandlerCollection: a singleton collection holding one handler that logs
//    "failed to send metrics" at WARN, attaching the exception when metricError.getException()
//    is non-null and logging only the message otherwise.
//  * Two-argument constructor: delegates to the three-argument one with DEFAULT_THREAD_FACTORY.
//  * Three-argument constructor: calls super(config, clock), stores the config, parses config.uri(),
//    and when the URI carries no port (-1) substitutes 80 for "http" and 443 for "https".
//    NOTE(review): for any other scheme the port is left at -1 -- confirm that is intended.
//    It then builds the data-point and event receiver factories from the endpoint, installs a
//    SignalFxNamingConvention, and calls start(threadFactory).
//  * First part of publish(): captures one wall-clock timestamp, creates an AggregateMetricSender
//    from the config source, the two receiver factories, a StaticAuthToken and the error handlers,
//    then for each batch from MeterPartition.partition(this, config.batchSize()) opens a session
//    and maps every meter through meter.match(...) to the per-type add* converters.
Application onSendErrorHandlerCollection = Collections.singleton(metricError -> { + final SignalFxMetricsException exception = metricError.getException(); + if (exception != null ) { + this.logger.warn("failed to send metrics: " + metricError.getMessage(), exception); + } else { + this.logger.warn("failed to send metrics: {}", metricError.getMessage()); + } + }); + + public SignalSignalFxMeterRegistry(SignalFxConfig config, Clock clock) { + this(config, clock, DEFAULT_THREAD_FACTORY); + } + + public SignalSignalFxMeterRegistry(SignalFxConfig config, Clock clock, ThreadFactory threadFactory) { + super(config, clock); + this.config = config; + + URI apiUri = URI.create(config.uri()); + int port = apiUri.getPort(); + if (port == -1) { + if ("http" .equals(apiUri.getScheme())) { + port = 80; + } else if ("https" .equals(apiUri.getScheme())) { + port = 443; + } + } + + SignalFxReceiverEndpoint signalFxEndpoint = new SignalFxEndpoint(apiUri.getScheme(), apiUri.getHost(), port); + this.dataPointReceiverFactory = new HttpDataPointProtobufReceiverFactory(signalFxEndpoint); + this.eventReceiverFactory = new HttpEventProtobufReceiverFactory(signalFxEndpoint); + + config().namingConvention(new SignalFxNamingConvention()); + + start(threadFactory); + } + + @Override + protected void publish() { + final long timestamp = clock.wallTime(); + + AggregateMetricSender metricSender = new AggregateMetricSender(this.config.source(), + this.dataPointReceiverFactory, this.eventReceiverFactory, + new StaticAuthToken(this.config.accessToken()), this.onSendErrorHandlerCollection); + + for (List batch : MeterPartition.partition(this, config.batchSize())) { + try (AggregateMetricSender.Session session = metricSender.createSession()) { + batch.stream() + .map(meter -> meter.match( + this::addGauge, + this::addCounter, + this::addTimer, + this::addDistributionSummary, + this::addLongTaskTimer, + this::addTimeGauge, + this::addFunctionCounter, + this::addFunctionTimer, + this::addMeter)) + 
// Contents of the next physical line, in order:
//  * Remainder of publish(): every data-point builder is stamped with the shared timestamp, built,
//    and handed to session.setDatapoint; a debug message with the batch size follows. Any Throwable
//    raised while handling a batch is logged at WARN and the loop moves on to the next batch.
//    NOTE(review): the "successfully sent" debug line runs inside the try block, i.e. before the
//    try-with-resources closes the session -- confirm whether the send has happened by then.
//  * addMeter(Meter): the last handler passed to meter.match(...) in publish(). Emits one data point
//    per measurement, suffixed with the statistic's toString() run through
//    NamingConvention.camelCase.tagKey: COUNTER for TOTAL, TOTAL_TIME, COUNT and DURATION; GAUGE for
//    MAX, VALUE, UNKNOWN and ACTIVE_TASKS; an empty stream for any statistic not listed.
//  * First part of addDatapoint(...): wraps the value in a Datum -- as a double when the Number is a
//    Double, otherwise via longValue() -- and starts building the metric name from the meter name,
//    appending "." and statSuffix when statSuffix is non-null.
.flatMap(builders -> builders.map(builder -> builder.setTimestamp(timestamp).build())) + .forEach(session::setDatapoint); + + logger.debug("successfully sent {} metrics to SignalFx.", batch.size()); + } catch (Throwable e) { + logger.warn("failed to send metrics", e); + } + } + } + + private Stream addMeter(Meter meter) { + return stream(meter.measure().spliterator(), false).flatMap(measurement -> { + String statSuffix = NamingConvention.camelCase.tagKey(measurement.getStatistic().toString()); + switch (measurement.getStatistic()) { + case TOTAL: + case TOTAL_TIME: + case COUNT: + case DURATION: + return Stream.of(addDatapoint(meter, COUNTER, statSuffix, measurement.getValue())); + case MAX: + case VALUE: + case UNKNOWN: + case ACTIVE_TASKS: + return Stream.of(addDatapoint(meter, GAUGE, statSuffix, measurement.getValue())); + } + return Stream.empty(); + }); + } + + private SignalFxProtocolBuffers.DataPoint.Builder addDatapoint(Meter meter, SignalFxProtocolBuffers.MetricType metricType, @Nullable String statSuffix, Number value) { + SignalFxProtocolBuffers.Datum.Builder datumBuilder = SignalFxProtocolBuffers.Datum.newBuilder(); + SignalFxProtocolBuffers.Datum datum = (value instanceof Double ? + datumBuilder.setDoubleValue((Double) value) : + datumBuilder.setIntValue(value.longValue()) + ).build(); + + String metricName = config().namingConvention().name(statSuffix == null ? meter.getId().getName() : meter.getId().getName() + "." 
// Contents of the next physical line, in order:
//  * Remainder of addDatapoint(...): the name is passed through the configured naming convention
//    together with the meter's type and base unit; the builder receives the metric name, metric type
//    and value, plus one Dimension per tag returned by getConventionTags(meter.getId()). The builder
//    itself is returned, not a built DataPoint.
//  * Per-type converters, each returning a stream of data-point builders:
//      addLongTaskTimer   -> GAUGE "activeTasks", COUNTER "duration"
//      addTimeGauge       -> GAUGE (no suffix)
//      addGauge           -> GAUGE (no suffix)
//      addCounter         -> COUNTER (no suffix)
//      addFunctionCounter -> COUNTER (no suffix)
//      addTimer           -> COUNTER "count", COUNTER "totalTime", GAUGE "avg", GAUGE "max"
//      addFunctionTimer   -> COUNTER "count", COUNTER "totalTime", GAUGE "avg"
//    Time-based values are read in getBaseTimeUnit().
//  * NOTE(review): in this collapsed form the "// VisibleForTesting" marker shares its physical line
//    with the methods that follow it; restore the original line breaks before treating this as Java.
+ statSuffix, + meter.getId().getType(), meter.getId().getBaseUnit()); + + SignalFxProtocolBuffers.DataPoint.Builder dataPointBuilder = SignalFxProtocolBuffers.DataPoint.newBuilder() + .setMetric(metricName) + .setMetricType(metricType) + .setValue(datum); + + for (Tag tag : getConventionTags(meter.getId())) { + dataPointBuilder.addDimensions(SignalFxProtocolBuffers.Dimension.newBuilder() + .setKey(tag.getKey()) + .setValue(tag.getValue()) + .build()); + } + + return dataPointBuilder; + } + + // VisibleForTesting + Stream addLongTaskTimer(LongTaskTimer longTaskTimer) { + return Stream.of( + addDatapoint(longTaskTimer, GAUGE, "activeTasks", longTaskTimer.activeTasks()), + addDatapoint(longTaskTimer, COUNTER, "duration", longTaskTimer.duration(getBaseTimeUnit())) + ); + } + + private Stream addTimeGauge(TimeGauge timeGauge) { + return Stream.of(addDatapoint(timeGauge, GAUGE, null, timeGauge.value(getBaseTimeUnit()))); + } + + private Stream addGauge(Gauge gauge) { + return Stream.of(addDatapoint(gauge, GAUGE, null, gauge.value())); + } + + private Stream addCounter(Counter counter) { + return Stream.of(addDatapoint(counter, COUNTER, null, counter.count())); + } + + private Stream addFunctionCounter(FunctionCounter counter) { + return Stream.of(addDatapoint(counter, COUNTER, null, counter.count())); + } + + private Stream addTimer(Timer timer) { + return Stream.of( + addDatapoint(timer, COUNTER, "count", timer.count()), + addDatapoint(timer, COUNTER, "totalTime", timer.totalTime(getBaseTimeUnit())), + addDatapoint(timer, GAUGE, "avg", timer.mean(getBaseTimeUnit())), + addDatapoint(timer, GAUGE, "max", timer.max(getBaseTimeUnit())) + ); + } + + private Stream addFunctionTimer(FunctionTimer timer) { + return Stream.of( + addDatapoint(timer, COUNTER, "count", timer.count()), + addDatapoint(timer, COUNTER, "totalTime", timer.totalTime(getBaseTimeUnit())), + addDatapoint(timer, GAUGE, "avg", timer.mean(getBaseTimeUnit())) + ); + } + + private Stream 
// Contents of the next physical line, in order:
//  * addDistributionSummary(DistributionSummary): COUNTER "count", COUNTER "totalTime" (carrying
//    summary.totalAmount()), GAUGE "avg", GAUGE "max".
//    NOTE(review): the total amount is published under the "totalTime" suffix even though a
//    distribution summary is not necessarily a duration -- presumably kept for metric-name
//    compatibility; confirm before renaming.
//  * getBaseTimeUnit(): returns TimeUnit.SECONDS, so the time-based values above are in seconds.
addDistributionSummary(DistributionSummary summary) { + return Stream.of( + addDatapoint(summary, COUNTER, "count", summary.count()), + addDatapoint(summary, COUNTER, "totalTime", summary.totalAmount()), + addDatapoint(summary, GAUGE, "avg", summary.mean()), + addDatapoint(summary, GAUGE, "max", summary.max()) + ); + } + + @Override + protected TimeUnit getBaseTimeUnit() { + return TimeUnit.SECONDS; + } +}