diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/calls/routing/TurnCallRouter.java b/service/src/main/java/org/whispersystems/textsecuregcm/calls/routing/TurnCallRouter.java index 23af12dd4..f5a9422b7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/calls/routing/TurnCallRouter.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/calls/routing/TurnCallRouter.java @@ -8,15 +8,16 @@ package org.whispersystems.textsecuregcm.calls.routing; import com.maxmind.geoip2.DatabaseReader; import com.maxmind.geoip2.exception.GeoIp2Exception; import com.maxmind.geoip2.model.CityResponse; +import org.apache.commons.lang3.tuple.Triple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.util.Util; import javax.annotation.Nonnull; import java.io.IOException; -import java.net.Inet6Address; import java.net.InetAddress; import java.util.*; import java.util.function.Supplier; +import java.util.stream.IntStream; import java.util.stream.Stream; /** @@ -58,6 +59,7 @@ public class TurnCallRouter { * @param aci aci of client * @param clientAddress IP address to base routing on * @param instanceLimit max instances to return options for + * @return Up to two * instanceLimit options, half in ipv4, half in ipv6 */ public TurnServerOptions getRoutingFor( @Nonnull final UUID aci, @@ -121,8 +123,7 @@ public class TurnCallRouter { List urlsWithIps = getUrlsForInstances( selectInstances( datacenters, - instanceLimit, - (clientAddress.get() instanceof Inet6Address) + instanceLimit )); return new TurnServerOptions(hostname, urlsWithIps, minimalRandomUrls()); } @@ -134,25 +135,22 @@ public class TurnCallRouter { .toList(); } - private List selectInstances(List datacenters, int limit, boolean preferV6) { - if(datacenters.isEmpty() || limit == 0) { + // returns balanced number of instances across provided datacenters, prioritizing the datacenters earlier in the list + private List selectInstances(List datacenters, int instanceLimit) { + if(datacenters.isEmpty() || instanceLimit == 0) { return Collections.emptyList(); } - int numV6 = preferV6 ? (limit - limit / 3) : limit / 3; - int numV4 = limit - numV6; CallDnsRecords dnsRecords = this.callDnsRecords.get(); - List ipv4Selection = datacenters.stream() - .flatMap(dc -> randomNOf(dnsRecords.aByRegion().get(dc), limit, stableSelect).stream()) + List> ipv4Options = datacenters.stream() + .map(dc -> randomNOf(dnsRecords.aByRegion().get(dc), instanceLimit, stableSelect)) .toList(); - List ipv6Selection = datacenters.stream() - .flatMap(dc -> randomNOf(dnsRecords.aaaaByRegion().get(dc), limit, stableSelect).stream()) + List> ipv6Options = datacenters.stream() + .map(dc -> randomNOf(dnsRecords.aaaaByRegion().get(dc), instanceLimit, stableSelect)) .toList(); - // increase numV4 if not enough v6 options. vice-versa is also true - numV4 = Math.max(numV4, limit - ipv6Selection.size()); - ipv4Selection = ipv4Selection.stream().limit(numV4).toList(); - ipv6Selection = ipv6Selection.stream().limit(limit - ipv4Selection.size()).toList(); + List ipv4Selection = selectFromOptions(ipv4Options, instanceLimit); + List ipv6Selection = selectFromOptions(ipv6Options, instanceLimit); return Stream.concat( ipv4Selection.stream().map(InetAddress::getHostAddress), @@ -161,6 +159,19 @@ public class TurnCallRouter { ).toList(); } + private static List selectFromOptions(List> recordsByDc, int instanceLimit) { + return IntStream.range(0, recordsByDc.size()) + .mapToObj(dcIndex -> IntStream.range(0, recordsByDc.get(dcIndex).size()) + .mapToObj(addressIndex -> Triple.of(addressIndex, dcIndex, recordsByDc.get(dcIndex).get(addressIndex)))) + .flatMap(i -> i) + .sorted(Comparator.comparingInt((Triple t) -> t.getLeft()) + .thenComparingInt(Triple::getMiddle)) + .limit(instanceLimit) + .sorted(Comparator.comparingInt(Triple::getMiddle)) + .map(Triple::getRight) + .toList(); + } + private static List randomNOf(List values, int n, boolean stableSelect) { return stableSelect ? Util.randomNOfStable(values, n) : Util.randomNOfShuffled(values, n); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/CallRoutingController.java b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/CallRoutingController.java index c394e646d..0ec9f231a 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/controllers/CallRoutingController.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/controllers/CallRoutingController.java @@ -35,7 +35,7 @@ import org.whispersystems.websocket.auth.ReadOnly; @io.swagger.v3.oas.annotations.tags.Tag(name = "Calling") public class CallRoutingController { - private static final int TURN_INSTANCE_LIMIT = 3; + private static final int TURN_INSTANCE_LIMIT = 2; private static final Counter INVALID_IP_COUNTER = Metrics.counter(name(CallRoutingController.class, "invalidIP")); private static final Logger log = LoggerFactory.getLogger(CallRoutingController.class); private final RateLimiters rateLimiters; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/calls/routing/TurnCallRouterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/calls/routing/TurnCallRouterTest.java index 458d0e11b..ab71087d3 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/calls/routing/TurnCallRouterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/calls/routing/TurnCallRouterTest.java @@ -85,7 +85,12 @@ public class TurnCallRouterTest { InetAddress.getByName("9.9.9.2") ), "dc-performance2", List.of(InetAddress.getByName("9.9.9.3")), - "dc-performance3", List.of(InetAddress.getByName("9.9.9.4")) + "dc-performance3", List.of(InetAddress.getByName("9.9.9.4")), + "dc-performance4", List.of( + InetAddress.getByName("9.9.9.5"), + InetAddress.getByName("9.9.9.6"), + InetAddress.getByName("9.9.9.7") + ) ), Map.of( "dc-manual", List.of(InetAddress.getByName("2222:1111:0:dead::")), @@ -94,7 +99,12 @@ public class TurnCallRouterTest { InetAddress.getByName("2222:1111:0:abc1::") ), "dc-performance2", List.of(InetAddress.getByName("2222:1111:0:abc2::")), - "dc-performance3", List.of(InetAddress.getByName("2222:1111:0:abc3::")) + "dc-performance3", List.of(InetAddress.getByName("2222:1111:0:abc3::")), + "dc-performance4", List.of( + InetAddress.getByName("2222:1111:0:abc4::"), + InetAddress.getByName("2222:1111:0:abc5::"), + InetAddress.getByName("2222:1111:0:abc6::") + ) ) ); } catch (UnknownHostException e) { @@ -205,11 +215,12 @@ public class TurnCallRouterTest { } @Test - public void testLimitReturnsPreferredProtocolAndPrioritizesPerformance() throws UnknownHostException { + public void testLimitPrioritizesBestDataCenters() throws UnknownHostException { when(performanceTable.getDatacentersFor(any(), any(), any(), any())) - .thenReturn(List.of("dc-performance3", "dc-performance2", "dc-performance1")); + .thenReturn(List.of("dc-performance3", "dc-performance2", "dc-performance3")); - assertThat(router().getRoutingFor(aci, Optional.of(InetAddress.getByName("0.0.0.1")), 3)) + // gets one instance from best two datacenters + assertThat(router().getRoutingFor(aci, Optional.of(InetAddress.getByName("0.0.0.1")), 2)) .isEqualTo(optionsWithUrls(List.of( "turn:9.9.9.4", "turn:9.9.9.4:80?transport=tcp", @@ -221,10 +232,14 @@ public class TurnCallRouterTest { "turn:[2222:1111:0:abc3:0:0:0:0]", "turn:[2222:1111:0:abc3:0:0:0:0]:80?transport=tcp", - "turns:[2222:1111:0:abc3:0:0:0:0]:443?transport=tcp" + "turns:[2222:1111:0:abc3:0:0:0:0]:443?transport=tcp", + + "turn:[2222:1111:0:abc2:0:0:0:0]", + "turn:[2222:1111:0:abc2:0:0:0:0]:80?transport=tcp", + "turns:[2222:1111:0:abc2:0:0:0:0]:443?transport=tcp" ))); - assertThat(router().getRoutingFor(aci, Optional.of(InetAddress.getByName("2222:1111:0:abc2:0:0:0:1")), 3)) + assertThat(router().getRoutingFor(aci, Optional.of(InetAddress.getByName("2222:1111:0:abc2:0:0:0:1")), 1)) .isEqualTo(optionsWithUrls(List.of( "turn:9.9.9.4", "turn:9.9.9.4:80?transport=tcp", @@ -232,11 +247,56 @@ public class TurnCallRouterTest { "turn:[2222:1111:0:abc3:0:0:0:0]", "turn:[2222:1111:0:abc3:0:0:0:0]:80?transport=tcp", - "turns:[2222:1111:0:abc3:0:0:0:0]:443?transport=tcp", + "turns:[2222:1111:0:abc3:0:0:0:0]:443?transport=tcp" + ))); + } + + @Test + public void testBackFillsUpToLimit() throws UnknownHostException { + when(performanceTable.getDatacentersFor(any(), any(), any(), any())) + .thenReturn(List.of("dc-performance4", "dc-performance2", "dc-performance3")); + + assertThat(router().getRoutingFor(aci, Optional.of(InetAddress.getByName("0.0.0.1")), 5)) + .isEqualTo(optionsWithUrls(List.of( + "turn:9.9.9.5", + "turn:9.9.9.5:80?transport=tcp", + "turns:9.9.9.5:443?transport=tcp", + + "turn:9.9.9.6", + "turn:9.9.9.6:80?transport=tcp", + "turns:9.9.9.6:443?transport=tcp", + + "turn:9.9.9.7", + "turn:9.9.9.7:80?transport=tcp", + "turns:9.9.9.7:443?transport=tcp", + + "turn:9.9.9.3", + "turn:9.9.9.3:80?transport=tcp", + "turns:9.9.9.3:443?transport=tcp", + + "turn:9.9.9.4", + "turn:9.9.9.4:80?transport=tcp", + "turns:9.9.9.4:443?transport=tcp", + + "turn:[2222:1111:0:abc4:0:0:0:0]", + "turn:[2222:1111:0:abc4:0:0:0:0]:80?transport=tcp", + "turns:[2222:1111:0:abc4:0:0:0:0]:443?transport=tcp", + + "turn:[2222:1111:0:abc5:0:0:0:0]", + "turn:[2222:1111:0:abc5:0:0:0:0]:80?transport=tcp", + "turns:[2222:1111:0:abc5:0:0:0:0]:443?transport=tcp", + + "turn:[2222:1111:0:abc6:0:0:0:0]", + "turn:[2222:1111:0:abc6:0:0:0:0]:80?transport=tcp", + "turns:[2222:1111:0:abc6:0:0:0:0]:443?transport=tcp", "turn:[2222:1111:0:abc2:0:0:0:0]", "turn:[2222:1111:0:abc2:0:0:0:0]:80?transport=tcp", - "turns:[2222:1111:0:abc2:0:0:0:0]:443?transport=tcp" + "turns:[2222:1111:0:abc2:0:0:0:0]:443?transport=tcp", + + "turn:[2222:1111:0:abc3:0:0:0:0]", + "turn:[2222:1111:0:abc3:0:0:0:0]:80?transport=tcp", + "turns:[2222:1111:0:abc3:0:0:0:0]:443?transport=tcp" ))); }