Remove a candidate metric provider.
This commit is contained in:
parent
34bf5112e0
commit
a709a3bcc0
|
@ -80,18 +80,6 @@
|
|||
<version>1.5.3</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.micrometer</groupId>
|
||||
<artifactId>micrometer-registry-signalfx</artifactId>
|
||||
<version>1.5.3</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.signalfx.public</groupId>
|
||||
<artifactId>signalfx-codahale</artifactId>
|
||||
<version>1.0.4</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.amazonaws</groupId>
|
||||
<artifactId>aws-java-sdk-s3</artifactId>
|
||||
|
|
|
@ -84,7 +84,7 @@ public class WhisperServerConfiguration extends Configuration {
|
|||
@NotNull
|
||||
@Valid
|
||||
@JsonProperty
|
||||
private List<MicrometerConfiguration> micrometer = new LinkedList<>();
|
||||
private MicrometerConfiguration micrometer;
|
||||
|
||||
@NotNull
|
||||
@Valid
|
||||
|
@ -309,14 +309,8 @@ public class WhisperServerConfiguration extends Configuration {
|
|||
return cdn;
|
||||
}
|
||||
|
||||
public Map<String, MicrometerConfiguration> getMicrometerConfiguration() {
|
||||
final Map<String, MicrometerConfiguration> micrometerConfigurationByName = new HashMap<>();
|
||||
|
||||
for (final MicrometerConfiguration micrometerConfiguration : micrometer) {
|
||||
micrometerConfigurationByName.put(micrometerConfiguration.getName(), micrometerConfiguration);
|
||||
}
|
||||
|
||||
return micrometerConfigurationByName;
|
||||
public MicrometerConfiguration getMicrometerConfiguration() {
|
||||
return micrometer;
|
||||
}
|
||||
|
||||
public UnidentifiedDeliveryConfiguration getDeliveryCertificate() {
|
||||
|
|
|
@ -22,7 +22,6 @@ import com.amazonaws.auth.AWSStaticCredentialsProvider;
|
|||
import com.amazonaws.auth.BasicAWSCredentials;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.util.EC2MetadataUtils;
|
||||
import com.codahale.metrics.SharedMetricRegistries;
|
||||
import com.codahale.metrics.jdbi3.strategies.DefaultNameStrategy;
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||
|
@ -42,12 +41,8 @@ import io.dropwizard.jdbi3.JdbiFactory;
|
|||
import io.dropwizard.setup.Bootstrap;
|
||||
import io.dropwizard.setup.Environment;
|
||||
import io.micrometer.core.instrument.Clock;
|
||||
import io.micrometer.core.instrument.ImmutableTag;
|
||||
import io.micrometer.core.instrument.Meter;
|
||||
import io.micrometer.core.instrument.Metrics;
|
||||
import io.micrometer.core.instrument.Tag;
|
||||
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
|
||||
import io.micrometer.signalfx.SignalFxConfig;
|
||||
import io.micrometer.wavefront.WavefrontConfig;
|
||||
import io.micrometer.wavefront.WavefrontMeterRegistry;
|
||||
import org.bouncycastle.jce.provider.BouncyCastleProvider;
|
||||
|
@ -95,7 +90,6 @@ import org.whispersystems.textsecuregcm.metrics.MetricsApplicationEventListener;
|
|||
import org.whispersystems.textsecuregcm.metrics.NetworkReceivedGauge;
|
||||
import org.whispersystems.textsecuregcm.metrics.NetworkSentGauge;
|
||||
import org.whispersystems.textsecuregcm.metrics.PushLatencyManager;
|
||||
import org.whispersystems.textsecuregcm.metrics.SignalSignalFxMeterRegistry;
|
||||
import org.whispersystems.textsecuregcm.metrics.TrafficSource;
|
||||
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
|
||||
import org.whispersystems.textsecuregcm.providers.RedisClusterHealthCheck;
|
||||
|
@ -162,7 +156,6 @@ import org.whispersystems.textsecuregcm.workers.ZkParamsCommand;
|
|||
import org.whispersystems.websocket.WebSocketResourceProviderFactory;
|
||||
import org.whispersystems.websocket.setup.WebSocketEnvironment;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.servlet.DispatcherType;
|
||||
import javax.servlet.FilterRegistration;
|
||||
import javax.servlet.ServletRegistration;
|
||||
|
@ -225,79 +218,32 @@ public class WhisperServerService extends Application<WhisperServerConfiguration
|
|||
public void run(WhisperServerConfiguration config, Environment environment)
|
||||
throws Exception
|
||||
{
|
||||
final String instanceId = EC2MetadataUtils.getInstanceId();
|
||||
SharedMetricRegistries.add(Constants.METRICS_NAME, environment.metrics());
|
||||
|
||||
final Map<String, MicrometerConfiguration> micrometerConfigurationByName = config.getMicrometerConfiguration();
|
||||
Metrics.addRegistry(new WavefrontMeterRegistry(new WavefrontConfig() {
|
||||
@Override
|
||||
public String get(final String key) {
|
||||
return null;
|
||||
}
|
||||
|
||||
{
|
||||
final MicrometerConfiguration micrometerWavefrontConfig = micrometerConfigurationByName.get("wavefront");
|
||||
@Override
|
||||
public String uri() {
|
||||
return config.getMicrometerConfiguration().getUri();
|
||||
}
|
||||
|
||||
Metrics.addRegistry(new WavefrontMeterRegistry(new WavefrontConfig() {
|
||||
@Override
|
||||
public String get(final String key) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String uri() {
|
||||
return micrometerWavefrontConfig.getUri();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String apiToken() {
|
||||
return micrometerWavefrontConfig.getApiKey();
|
||||
}
|
||||
}, Clock.SYSTEM) {
|
||||
@Override
|
||||
protected DistributionStatisticConfig defaultHistogramConfig() {
|
||||
return DistributionStatisticConfig.builder()
|
||||
.percentiles(.75, .95, .99, .999)
|
||||
.build()
|
||||
.merge(super.defaultHistogramConfig());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
{
|
||||
final MicrometerConfiguration micrometerSignalfxConfig = micrometerConfigurationByName.get("signalfx");
|
||||
Metrics.addRegistry(new SignalSignalFxMeterRegistry(new SignalFxConfig() {
|
||||
@Override
|
||||
public String get(String key) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String accessToken() {
|
||||
return micrometerSignalfxConfig.getApiKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String source() {
|
||||
return instanceId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String uri() {
|
||||
return micrometerSignalfxConfig.getUri();
|
||||
}
|
||||
}, Clock.SYSTEM) {
|
||||
@Override
|
||||
protected List<Tag> getConventionTags(@Nonnull Meter.Id id) {
|
||||
final List<Tag> tags = super.getConventionTags(id);
|
||||
tags.add(new ImmutableTag("environment", micrometerSignalfxConfig.getEnvironment()));
|
||||
return tags;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DistributionStatisticConfig defaultHistogramConfig() {
|
||||
return DistributionStatisticConfig.builder()
|
||||
.percentiles(.75, .95, .99, .999)
|
||||
.build()
|
||||
.merge(super.defaultHistogramConfig());
|
||||
}
|
||||
});
|
||||
}
|
||||
@Override
|
||||
public String apiToken() {
|
||||
return config.getMicrometerConfiguration().getApiKey();
|
||||
}
|
||||
}, Clock.SYSTEM) {
|
||||
@Override
|
||||
protected DistributionStatisticConfig defaultHistogramConfig() {
|
||||
return DistributionStatisticConfig.builder()
|
||||
.percentiles(.75, .95, .99, .999)
|
||||
.build()
|
||||
.merge(super.defaultHistogramConfig());
|
||||
}
|
||||
});
|
||||
|
||||
environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
environment.getObjectMapper().setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.NONE);
|
||||
|
|
|
@ -6,10 +6,6 @@ import javax.validation.constraints.NotEmpty;
|
|||
|
||||
public class MicrometerConfiguration {
|
||||
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String name;
|
||||
|
||||
@JsonProperty
|
||||
private String uri;
|
||||
|
||||
|
@ -17,13 +13,6 @@ public class MicrometerConfiguration {
|
|||
@NotEmpty
|
||||
private String apiKey;
|
||||
|
||||
@JsonProperty
|
||||
private String environment;
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public String getUri() {
|
||||
return uri;
|
||||
}
|
||||
|
@ -31,8 +20,4 @@ public class MicrometerConfiguration {
|
|||
public String getApiKey() {
|
||||
return apiKey;
|
||||
}
|
||||
|
||||
public String getEnvironment() {
|
||||
return environment;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,216 +0,0 @@
|
|||
package org.whispersystems.textsecuregcm.metrics;
|
||||
|
||||
import com.signalfx.endpoint.SignalFxEndpoint;
|
||||
import com.signalfx.endpoint.SignalFxReceiverEndpoint;
|
||||
import com.signalfx.metrics.SignalFxMetricsException;
|
||||
import com.signalfx.metrics.auth.StaticAuthToken;
|
||||
import com.signalfx.metrics.connection.HttpDataPointProtobufReceiverFactory;
|
||||
import com.signalfx.metrics.connection.HttpEventProtobufReceiverFactory;
|
||||
import com.signalfx.metrics.errorhandler.OnSendErrorHandler;
|
||||
import com.signalfx.metrics.flush.AggregateMetricSender;
|
||||
import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers;
|
||||
import io.micrometer.core.instrument.Clock;
|
||||
import io.micrometer.core.instrument.Counter;
|
||||
import io.micrometer.core.instrument.DistributionSummary;
|
||||
import io.micrometer.core.instrument.FunctionCounter;
|
||||
import io.micrometer.core.instrument.FunctionTimer;
|
||||
import io.micrometer.core.instrument.Gauge;
|
||||
import io.micrometer.core.instrument.LongTaskTimer;
|
||||
import io.micrometer.core.instrument.Meter;
|
||||
import io.micrometer.core.instrument.Tag;
|
||||
import io.micrometer.core.instrument.TimeGauge;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import io.micrometer.core.instrument.config.NamingConvention;
|
||||
import io.micrometer.core.instrument.step.StepMeterRegistry;
|
||||
import io.micrometer.core.instrument.util.MeterPartition;
|
||||
import io.micrometer.core.instrument.util.NamedThreadFactory;
|
||||
import io.micrometer.core.lang.Nullable;
|
||||
import io.micrometer.signalfx.SignalFxConfig;
|
||||
import io.micrometer.signalfx.SignalFxNamingConvention;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.MetricType.COUNTER;
|
||||
import static com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.MetricType.GAUGE;
|
||||
import static java.util.stream.StreamSupport.stream;
|
||||
|
||||
public class SignalSignalFxMeterRegistry extends StepMeterRegistry {
|
||||
private static final ThreadFactory DEFAULT_THREAD_FACTORY = new NamedThreadFactory("signalfx-metrics-publisher");
|
||||
private final Logger logger = LoggerFactory.getLogger(SignalSignalFxMeterRegistry.class);
|
||||
private final SignalFxConfig config;
|
||||
private final HttpDataPointProtobufReceiverFactory dataPointReceiverFactory;
|
||||
private final HttpEventProtobufReceiverFactory eventReceiverFactory;
|
||||
private final Set<OnSendErrorHandler> onSendErrorHandlerCollection = Collections.singleton(metricError -> {
|
||||
final SignalFxMetricsException exception = metricError.getException();
|
||||
if (exception != null ) {
|
||||
this.logger.warn("failed to send metrics: " + metricError.getMessage(), exception);
|
||||
} else {
|
||||
this.logger.warn("failed to send metrics: {}", metricError.getMessage());
|
||||
}
|
||||
});
|
||||
|
||||
public SignalSignalFxMeterRegistry(SignalFxConfig config, Clock clock) {
|
||||
this(config, clock, DEFAULT_THREAD_FACTORY);
|
||||
}
|
||||
|
||||
public SignalSignalFxMeterRegistry(SignalFxConfig config, Clock clock, ThreadFactory threadFactory) {
|
||||
super(config, clock);
|
||||
this.config = config;
|
||||
|
||||
URI apiUri = URI.create(config.uri());
|
||||
int port = apiUri.getPort();
|
||||
if (port == -1) {
|
||||
if ("http" .equals(apiUri.getScheme())) {
|
||||
port = 80;
|
||||
} else if ("https" .equals(apiUri.getScheme())) {
|
||||
port = 443;
|
||||
}
|
||||
}
|
||||
|
||||
SignalFxReceiverEndpoint signalFxEndpoint = new SignalFxEndpoint(apiUri.getScheme(), apiUri.getHost(), port);
|
||||
this.dataPointReceiverFactory = new HttpDataPointProtobufReceiverFactory(signalFxEndpoint);
|
||||
this.eventReceiverFactory = new HttpEventProtobufReceiverFactory(signalFxEndpoint);
|
||||
|
||||
config().namingConvention(new SignalFxNamingConvention());
|
||||
|
||||
start(threadFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void publish() {
|
||||
final long timestamp = clock.wallTime();
|
||||
|
||||
AggregateMetricSender metricSender = new AggregateMetricSender(this.config.source(),
|
||||
this.dataPointReceiverFactory, this.eventReceiverFactory,
|
||||
new StaticAuthToken(this.config.accessToken()), this.onSendErrorHandlerCollection);
|
||||
|
||||
for (List<Meter> batch : MeterPartition.partition(this, config.batchSize())) {
|
||||
try (AggregateMetricSender.Session session = metricSender.createSession()) {
|
||||
batch.stream()
|
||||
.map(meter -> meter.match(
|
||||
this::addGauge,
|
||||
this::addCounter,
|
||||
this::addTimer,
|
||||
this::addDistributionSummary,
|
||||
this::addLongTaskTimer,
|
||||
this::addTimeGauge,
|
||||
this::addFunctionCounter,
|
||||
this::addFunctionTimer,
|
||||
this::addMeter))
|
||||
.flatMap(builders -> builders.map(builder -> builder.setTimestamp(timestamp).build()))
|
||||
.forEach(session::setDatapoint);
|
||||
|
||||
logger.debug("successfully sent {} metrics to SignalFx.", batch.size());
|
||||
} catch (Throwable e) {
|
||||
logger.warn("failed to send metrics", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addMeter(Meter meter) {
|
||||
return stream(meter.measure().spliterator(), false).flatMap(measurement -> {
|
||||
String statSuffix = NamingConvention.camelCase.tagKey(measurement.getStatistic().toString());
|
||||
switch (measurement.getStatistic()) {
|
||||
case TOTAL:
|
||||
case TOTAL_TIME:
|
||||
case COUNT:
|
||||
case DURATION:
|
||||
return Stream.of(addDatapoint(meter, COUNTER, statSuffix, measurement.getValue()));
|
||||
case MAX:
|
||||
case VALUE:
|
||||
case UNKNOWN:
|
||||
case ACTIVE_TASKS:
|
||||
return Stream.of(addDatapoint(meter, GAUGE, statSuffix, measurement.getValue()));
|
||||
}
|
||||
return Stream.empty();
|
||||
});
|
||||
}
|
||||
|
||||
private SignalFxProtocolBuffers.DataPoint.Builder addDatapoint(Meter meter, SignalFxProtocolBuffers.MetricType metricType, @Nullable String statSuffix, Number value) {
|
||||
SignalFxProtocolBuffers.Datum.Builder datumBuilder = SignalFxProtocolBuffers.Datum.newBuilder();
|
||||
SignalFxProtocolBuffers.Datum datum = (value instanceof Double ?
|
||||
datumBuilder.setDoubleValue((Double) value) :
|
||||
datumBuilder.setIntValue(value.longValue())
|
||||
).build();
|
||||
|
||||
String metricName = config().namingConvention().name(statSuffix == null ? meter.getId().getName() : meter.getId().getName() + "." + statSuffix,
|
||||
meter.getId().getType(), meter.getId().getBaseUnit());
|
||||
|
||||
SignalFxProtocolBuffers.DataPoint.Builder dataPointBuilder = SignalFxProtocolBuffers.DataPoint.newBuilder()
|
||||
.setMetric(metricName)
|
||||
.setMetricType(metricType)
|
||||
.setValue(datum);
|
||||
|
||||
for (Tag tag : getConventionTags(meter.getId())) {
|
||||
dataPointBuilder.addDimensions(SignalFxProtocolBuffers.Dimension.newBuilder()
|
||||
.setKey(tag.getKey())
|
||||
.setValue(tag.getValue())
|
||||
.build());
|
||||
}
|
||||
|
||||
return dataPointBuilder;
|
||||
}
|
||||
|
||||
// VisibleForTesting
|
||||
Stream<SignalFxProtocolBuffers.DataPoint.Builder> addLongTaskTimer(LongTaskTimer longTaskTimer) {
|
||||
return Stream.of(
|
||||
addDatapoint(longTaskTimer, GAUGE, "activeTasks", longTaskTimer.activeTasks()),
|
||||
addDatapoint(longTaskTimer, COUNTER, "duration", longTaskTimer.duration(getBaseTimeUnit()))
|
||||
);
|
||||
}
|
||||
|
||||
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addTimeGauge(TimeGauge timeGauge) {
|
||||
return Stream.of(addDatapoint(timeGauge, GAUGE, null, timeGauge.value(getBaseTimeUnit())));
|
||||
}
|
||||
|
||||
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addGauge(Gauge gauge) {
|
||||
return Stream.of(addDatapoint(gauge, GAUGE, null, gauge.value()));
|
||||
}
|
||||
|
||||
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addCounter(Counter counter) {
|
||||
return Stream.of(addDatapoint(counter, COUNTER, null, counter.count()));
|
||||
}
|
||||
|
||||
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addFunctionCounter(FunctionCounter counter) {
|
||||
return Stream.of(addDatapoint(counter, COUNTER, null, counter.count()));
|
||||
}
|
||||
|
||||
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addTimer(Timer timer) {
|
||||
return Stream.of(
|
||||
addDatapoint(timer, COUNTER, "count", timer.count()),
|
||||
addDatapoint(timer, COUNTER, "totalTime", timer.totalTime(getBaseTimeUnit())),
|
||||
addDatapoint(timer, GAUGE, "avg", timer.mean(getBaseTimeUnit())),
|
||||
addDatapoint(timer, GAUGE, "max", timer.max(getBaseTimeUnit()))
|
||||
);
|
||||
}
|
||||
|
||||
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addFunctionTimer(FunctionTimer timer) {
|
||||
return Stream.of(
|
||||
addDatapoint(timer, COUNTER, "count", timer.count()),
|
||||
addDatapoint(timer, COUNTER, "totalTime", timer.totalTime(getBaseTimeUnit())),
|
||||
addDatapoint(timer, GAUGE, "avg", timer.mean(getBaseTimeUnit()))
|
||||
);
|
||||
}
|
||||
|
||||
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addDistributionSummary(DistributionSummary summary) {
|
||||
return Stream.of(
|
||||
addDatapoint(summary, COUNTER, "count", summary.count()),
|
||||
addDatapoint(summary, COUNTER, "totalTime", summary.totalAmount()),
|
||||
addDatapoint(summary, GAUGE, "avg", summary.mean()),
|
||||
addDatapoint(summary, GAUGE, "max", summary.max())
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TimeUnit getBaseTimeUnit() {
|
||||
return TimeUnit.SECONDS;
|
||||
}
|
||||
}
|
|
@ -1,39 +0,0 @@
|
|||
package org.whispersystems.textsecuregcm.metrics;
|
||||
|
||||
import com.amazonaws.util.EC2MetadataUtils;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.ScheduledReporter;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import com.signalfx.codahale.reporter.SignalFxReporter;
|
||||
import com.signalfx.endpoint.SignalFxEndpoint;
|
||||
import com.signalfx.metrics.auth.StaticAuthToken;
|
||||
import io.dropwizard.metrics.BaseReporterFactory;
|
||||
|
||||
import javax.validation.constraints.NotEmpty;
|
||||
|
||||
@JsonTypeName("signalsignalfx")
|
||||
public class SignalSignalfxReporterFactory extends BaseReporterFactory {
|
||||
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String authToken = null;
|
||||
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String environment = null;
|
||||
|
||||
@JsonProperty
|
||||
@NotEmpty
|
||||
private String hostname = null;
|
||||
|
||||
public ScheduledReporter build(MetricRegistry registry) {
|
||||
return new SignalFxReporter.Builder(registry, new StaticAuthToken(authToken), EC2MetadataUtils.getInstanceId())
|
||||
.addDimension("environment", environment)
|
||||
.setEndpoint(new SignalFxEndpoint(SignalFxEndpoint.DEFAULT_SCHEME, hostname, SignalFxEndpoint.DEFAULT_PORT))
|
||||
.setFilter(getFilter())
|
||||
.setDurationUnit(getDurationUnit())
|
||||
.setRateUnit(getRateUnit())
|
||||
.build();
|
||||
}
|
||||
}
|
|
@ -1,2 +1 @@
|
|||
org.whispersystems.textsecuregcm.metrics.JsonMetricsReporterFactory
|
||||
org.whispersystems.textsecuregcm.metrics.SignalSignalfxReporterFactory
|
||||
|
|
Loading…
Reference in New Issue