Balance TURN routing options across datacenters
This commit is contained in:
parent
ba12d39121
commit
befcdf55fe
|
@ -8,15 +8,16 @@ package org.whispersystems.textsecuregcm.calls.routing;
|
|||
import com.maxmind.geoip2.DatabaseReader;
|
||||
import com.maxmind.geoip2.exception.GeoIp2Exception;
|
||||
import com.maxmind.geoip2.model.CityResponse;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.whispersystems.textsecuregcm.util.Util;
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.IOException;
|
||||
import java.net.Inet6Address;
|
||||
import java.net.InetAddress;
|
||||
import java.util.*;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
|
@ -58,6 +59,7 @@ public class TurnCallRouter {
|
|||
* @param aci aci of client
|
||||
* @param clientAddress IP address to base routing on
|
||||
* @param instanceLimit max instances to return options for
|
||||
* @return Up to two * instanceLimit options, half in ipv4, half in ipv6
|
||||
*/
|
||||
public TurnServerOptions getRoutingFor(
|
||||
@Nonnull final UUID aci,
|
||||
|
@ -121,8 +123,7 @@ public class TurnCallRouter {
|
|||
List<String> urlsWithIps = getUrlsForInstances(
|
||||
selectInstances(
|
||||
datacenters,
|
||||
instanceLimit,
|
||||
(clientAddress.get() instanceof Inet6Address)
|
||||
instanceLimit
|
||||
));
|
||||
return new TurnServerOptions(hostname, urlsWithIps, minimalRandomUrls());
|
||||
}
|
||||
|
@ -134,25 +135,22 @@ public class TurnCallRouter {
|
|||
.toList();
|
||||
}
|
||||
|
||||
private List<String> selectInstances(List<String> datacenters, int limit, boolean preferV6) {
|
||||
if(datacenters.isEmpty() || limit == 0) {
|
||||
// returns balanced number of instances across provided datacenters, prioritizing the datacenters earlier in the list
|
||||
private List<String> selectInstances(List<String> datacenters, int instanceLimit) {
|
||||
if(datacenters.isEmpty() || instanceLimit == 0) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
int numV6 = preferV6 ? (limit - limit / 3) : limit / 3;
|
||||
int numV4 = limit - numV6;
|
||||
|
||||
CallDnsRecords dnsRecords = this.callDnsRecords.get();
|
||||
List<InetAddress> ipv4Selection = datacenters.stream()
|
||||
.flatMap(dc -> randomNOf(dnsRecords.aByRegion().get(dc), limit, stableSelect).stream())
|
||||
List<List<InetAddress>> ipv4Options = datacenters.stream()
|
||||
.map(dc -> randomNOf(dnsRecords.aByRegion().get(dc), instanceLimit, stableSelect))
|
||||
.toList();
|
||||
List<InetAddress> ipv6Selection = datacenters.stream()
|
||||
.flatMap(dc -> randomNOf(dnsRecords.aaaaByRegion().get(dc), limit, stableSelect).stream())
|
||||
List<List<InetAddress>> ipv6Options = datacenters.stream()
|
||||
.map(dc -> randomNOf(dnsRecords.aaaaByRegion().get(dc), instanceLimit, stableSelect))
|
||||
.toList();
|
||||
|
||||
// increase numV4 if not enough v6 options. vice-versa is also true
|
||||
numV4 = Math.max(numV4, limit - ipv6Selection.size());
|
||||
ipv4Selection = ipv4Selection.stream().limit(numV4).toList();
|
||||
ipv6Selection = ipv6Selection.stream().limit(limit - ipv4Selection.size()).toList();
|
||||
List<InetAddress> ipv4Selection = selectFromOptions(ipv4Options, instanceLimit);
|
||||
List<InetAddress> ipv6Selection = selectFromOptions(ipv6Options, instanceLimit);
|
||||
|
||||
return Stream.concat(
|
||||
ipv4Selection.stream().map(InetAddress::getHostAddress),
|
||||
|
@ -161,6 +159,19 @@ public class TurnCallRouter {
|
|||
).toList();
|
||||
}
|
||||
|
||||
private static List<InetAddress> selectFromOptions(List<List<InetAddress>> recordsByDc, int instanceLimit) {
|
||||
return IntStream.range(0, recordsByDc.size())
|
||||
.mapToObj(dcIndex -> IntStream.range(0, recordsByDc.get(dcIndex).size())
|
||||
.mapToObj(addressIndex -> Triple.of(addressIndex, dcIndex, recordsByDc.get(dcIndex).get(addressIndex))))
|
||||
.flatMap(i -> i)
|
||||
.sorted(Comparator.comparingInt((Triple<Integer, Integer, InetAddress> t) -> t.getLeft())
|
||||
.thenComparingInt(Triple::getMiddle))
|
||||
.limit(instanceLimit)
|
||||
.sorted(Comparator.comparingInt(Triple::getMiddle))
|
||||
.map(Triple::getRight)
|
||||
.toList();
|
||||
}
|
||||
|
||||
private static <E> List<E> randomNOf(List<E> values, int n, boolean stableSelect) {
|
||||
return stableSelect ? Util.randomNOfStable(values, n) : Util.randomNOfShuffled(values, n);
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ import org.whispersystems.websocket.auth.ReadOnly;
|
|||
@io.swagger.v3.oas.annotations.tags.Tag(name = "Calling")
|
||||
public class CallRoutingController {
|
||||
|
||||
private static final int TURN_INSTANCE_LIMIT = 3;
|
||||
private static final int TURN_INSTANCE_LIMIT = 2;
|
||||
private static final Counter INVALID_IP_COUNTER = Metrics.counter(name(CallRoutingController.class, "invalidIP"));
|
||||
private static final Logger log = LoggerFactory.getLogger(CallRoutingController.class);
|
||||
private final RateLimiters rateLimiters;
|
||||
|
|
|
@ -85,7 +85,12 @@ public class TurnCallRouterTest {
|
|||
InetAddress.getByName("9.9.9.2")
|
||||
),
|
||||
"dc-performance2", List.of(InetAddress.getByName("9.9.9.3")),
|
||||
"dc-performance3", List.of(InetAddress.getByName("9.9.9.4"))
|
||||
"dc-performance3", List.of(InetAddress.getByName("9.9.9.4")),
|
||||
"dc-performance4", List.of(
|
||||
InetAddress.getByName("9.9.9.5"),
|
||||
InetAddress.getByName("9.9.9.6"),
|
||||
InetAddress.getByName("9.9.9.7")
|
||||
)
|
||||
),
|
||||
Map.of(
|
||||
"dc-manual", List.of(InetAddress.getByName("2222:1111:0:dead::")),
|
||||
|
@ -94,7 +99,12 @@ public class TurnCallRouterTest {
|
|||
InetAddress.getByName("2222:1111:0:abc1::")
|
||||
),
|
||||
"dc-performance2", List.of(InetAddress.getByName("2222:1111:0:abc2::")),
|
||||
"dc-performance3", List.of(InetAddress.getByName("2222:1111:0:abc3::"))
|
||||
"dc-performance3", List.of(InetAddress.getByName("2222:1111:0:abc3::")),
|
||||
"dc-performance4", List.of(
|
||||
InetAddress.getByName("2222:1111:0:abc4::"),
|
||||
InetAddress.getByName("2222:1111:0:abc5::"),
|
||||
InetAddress.getByName("2222:1111:0:abc6::")
|
||||
)
|
||||
)
|
||||
);
|
||||
} catch (UnknownHostException e) {
|
||||
|
@ -205,11 +215,12 @@ public class TurnCallRouterTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testLimitReturnsPreferredProtocolAndPrioritizesPerformance() throws UnknownHostException {
|
||||
public void testLimitPrioritizesBestDataCenters() throws UnknownHostException {
|
||||
when(performanceTable.getDatacentersFor(any(), any(), any(), any()))
|
||||
.thenReturn(List.of("dc-performance3", "dc-performance2", "dc-performance1"));
|
||||
.thenReturn(List.of("dc-performance3", "dc-performance2", "dc-performance3"));
|
||||
|
||||
assertThat(router().getRoutingFor(aci, Optional.of(InetAddress.getByName("0.0.0.1")), 3))
|
||||
// gets one instance from best two datacenters
|
||||
assertThat(router().getRoutingFor(aci, Optional.of(InetAddress.getByName("0.0.0.1")), 2))
|
||||
.isEqualTo(optionsWithUrls(List.of(
|
||||
"turn:9.9.9.4",
|
||||
"turn:9.9.9.4:80?transport=tcp",
|
||||
|
@ -221,10 +232,14 @@ public class TurnCallRouterTest {
|
|||
|
||||
"turn:[2222:1111:0:abc3:0:0:0:0]",
|
||||
"turn:[2222:1111:0:abc3:0:0:0:0]:80?transport=tcp",
|
||||
"turns:[2222:1111:0:abc3:0:0:0:0]:443?transport=tcp"
|
||||
"turns:[2222:1111:0:abc3:0:0:0:0]:443?transport=tcp",
|
||||
|
||||
"turn:[2222:1111:0:abc2:0:0:0:0]",
|
||||
"turn:[2222:1111:0:abc2:0:0:0:0]:80?transport=tcp",
|
||||
"turns:[2222:1111:0:abc2:0:0:0:0]:443?transport=tcp"
|
||||
)));
|
||||
|
||||
assertThat(router().getRoutingFor(aci, Optional.of(InetAddress.getByName("2222:1111:0:abc2:0:0:0:1")), 3))
|
||||
assertThat(router().getRoutingFor(aci, Optional.of(InetAddress.getByName("2222:1111:0:abc2:0:0:0:1")), 1))
|
||||
.isEqualTo(optionsWithUrls(List.of(
|
||||
"turn:9.9.9.4",
|
||||
"turn:9.9.9.4:80?transport=tcp",
|
||||
|
@ -232,11 +247,56 @@ public class TurnCallRouterTest {
|
|||
|
||||
"turn:[2222:1111:0:abc3:0:0:0:0]",
|
||||
"turn:[2222:1111:0:abc3:0:0:0:0]:80?transport=tcp",
|
||||
"turns:[2222:1111:0:abc3:0:0:0:0]:443?transport=tcp",
|
||||
"turns:[2222:1111:0:abc3:0:0:0:0]:443?transport=tcp"
|
||||
)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBackFillsUpToLimit() throws UnknownHostException {
|
||||
when(performanceTable.getDatacentersFor(any(), any(), any(), any()))
|
||||
.thenReturn(List.of("dc-performance4", "dc-performance2", "dc-performance3"));
|
||||
|
||||
assertThat(router().getRoutingFor(aci, Optional.of(InetAddress.getByName("0.0.0.1")), 5))
|
||||
.isEqualTo(optionsWithUrls(List.of(
|
||||
"turn:9.9.9.5",
|
||||
"turn:9.9.9.5:80?transport=tcp",
|
||||
"turns:9.9.9.5:443?transport=tcp",
|
||||
|
||||
"turn:9.9.9.6",
|
||||
"turn:9.9.9.6:80?transport=tcp",
|
||||
"turns:9.9.9.6:443?transport=tcp",
|
||||
|
||||
"turn:9.9.9.7",
|
||||
"turn:9.9.9.7:80?transport=tcp",
|
||||
"turns:9.9.9.7:443?transport=tcp",
|
||||
|
||||
"turn:9.9.9.3",
|
||||
"turn:9.9.9.3:80?transport=tcp",
|
||||
"turns:9.9.9.3:443?transport=tcp",
|
||||
|
||||
"turn:9.9.9.4",
|
||||
"turn:9.9.9.4:80?transport=tcp",
|
||||
"turns:9.9.9.4:443?transport=tcp",
|
||||
|
||||
"turn:[2222:1111:0:abc4:0:0:0:0]",
|
||||
"turn:[2222:1111:0:abc4:0:0:0:0]:80?transport=tcp",
|
||||
"turns:[2222:1111:0:abc4:0:0:0:0]:443?transport=tcp",
|
||||
|
||||
"turn:[2222:1111:0:abc5:0:0:0:0]",
|
||||
"turn:[2222:1111:0:abc5:0:0:0:0]:80?transport=tcp",
|
||||
"turns:[2222:1111:0:abc5:0:0:0:0]:443?transport=tcp",
|
||||
|
||||
"turn:[2222:1111:0:abc6:0:0:0:0]",
|
||||
"turn:[2222:1111:0:abc6:0:0:0:0]:80?transport=tcp",
|
||||
"turns:[2222:1111:0:abc6:0:0:0:0]:443?transport=tcp",
|
||||
|
||||
"turn:[2222:1111:0:abc2:0:0:0:0]",
|
||||
"turn:[2222:1111:0:abc2:0:0:0:0]:80?transport=tcp",
|
||||
"turns:[2222:1111:0:abc2:0:0:0:0]:443?transport=tcp"
|
||||
"turns:[2222:1111:0:abc2:0:0:0:0]:443?transport=tcp",
|
||||
|
||||
"turn:[2222:1111:0:abc3:0:0:0:0]",
|
||||
"turn:[2222:1111:0:abc3:0:0:0:0]:80?transport=tcp",
|
||||
"turns:[2222:1111:0:abc3:0:0:0:0]:443?transport=tcp"
|
||||
)));
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue