Skip to content

Commit

Permalink
[fix][client] Fix concurrent lookup with properties might have differ…
Browse files Browse the repository at this point in the history
…ent results (#23260)
  • Loading branch information
BewareMyPower authored Sep 6, 2024
1 parent 246647f commit 1439529
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,30 @@
package org.apache.pulsar.client.api;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.client.impl.LookupTopicResult;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -72,6 +79,7 @@ private static ServiceConfiguration addCustomConfigs(ServiceConfiguration config

@Test
public void testLookupProperty() throws Exception {
admin.namespaces().unload("public/default");
final var topic = "test-lookup-property";
admin.topics().createPartitionedTopic(topic, 16);
@Cleanup final var client = (PulsarClientImpl) PulsarClient.builder()
Expand All @@ -89,7 +97,35 @@ public void testLookupProperty() throws Exception {
Assert.assertEquals(port, additionalBrokers.get(0).getBrokerListenPort().orElseThrow());
}

@Test
public void testConcurrentLookupProperties() throws Exception {
@Cleanup final var client = (PulsarClientImpl) PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.build();
final var futures = new ArrayList<CompletableFuture<LookupTopicResult>>();
BrokerIdAwareLoadManager.clientIdList.clear();

final var clientIdList = IntStream.range(0, 10).mapToObj(i -> "key-" + i).toList();
for (var clientId : clientIdList) {
client.getConfiguration().setLookupProperties(Collections.singletonMap(CLIENT_KEY, clientId));
futures.add(client.getLookup().getBroker(TopicName.get("test-concurrent-lookup-properties")));
client.getConfiguration().setLookupProperties(Collections.emptyMap());
}
FutureUtil.waitForAll(futures).get();
Assert.assertEquals(clientIdList, BrokerIdAwareLoadManager.clientIdList);
}

public static class BrokerIdAwareLoadManager extends ExtensibleLoadManagerImpl {

static final List<String> clientIdList = Collections.synchronizedList(new ArrayList<>());

@Override
public CompletableFuture<Optional<BrokerLookupData>> assign(Optional<ServiceUnitId> topic,
ServiceUnitId serviceUnit, LookupOptions options) {
getClientId(options).ifPresent(clientIdList::add);
return super.assign(topic, serviceUnit, options);
}

@Override
public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle, Set<String> excludeBrokerSet,
LookupOptions options) {
Expand All @@ -106,5 +142,12 @@ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle, Set
.orElseGet(() -> super.selectAsync(bundle, excludeBrokerSet, options));
});
}

private static Optional<String> getClientId(LookupOptions options) {
if (options.getProperties() == null) {
return Optional.empty();
}
return Optional.ofNullable(options.getProperties().get(CLIENT_KEY));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -32,6 +33,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
Expand Down Expand Up @@ -60,7 +62,7 @@ public class BinaryProtoLookupService implements LookupService {
private final String listenerName;
private final int maxLookupRedirects;

private final ConcurrentHashMap<TopicName, CompletableFuture<LookupTopicResult>>
private final ConcurrentHashMap<Pair<TopicName, Map<String, String>>, CompletableFuture<LookupTopicResult>>
lookupInProgress = new ConcurrentHashMap<>();

private final ConcurrentHashMap<TopicName, CompletableFuture<PartitionedTopicMetadata>>
Expand Down Expand Up @@ -118,10 +120,12 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
long startTime = System.nanoTime();
final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>();
final Pair<TopicName, Map<String, String>> key = Pair.of(topicName,
client.getConfiguration().getLookupProperties());
try {
return lookupInProgress.computeIfAbsent(topicName, tpName -> {
CompletableFuture<LookupTopicResult> newFuture =
findBroker(serviceNameResolver.resolveHost(), false, topicName, 0);
return lookupInProgress.computeIfAbsent(key, tpName -> {
CompletableFuture<LookupTopicResult> newFuture = findBroker(serviceNameResolver.resolveHost(), false,
topicName, 0, key.getRight());
newFutureCreated.setValue(newFuture);

newFuture.thenRun(() -> {
Expand All @@ -135,7 +139,7 @@ public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
} finally {
if (newFutureCreated.getValue() != null) {
newFutureCreated.getValue().whenComplete((v, ex) -> {
lookupInProgress.remove(topicName, newFutureCreated.getValue());
lookupInProgress.remove(key, newFutureCreated.getValue());
});
}
}
Expand Down Expand Up @@ -167,7 +171,7 @@ public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
}

private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress socketAddress,
boolean authoritative, TopicName topicName, final int redirectCount) {
boolean authoritative, TopicName topicName, final int redirectCount, Map<String, String> properties) {
CompletableFuture<LookupTopicResult> addressFuture = new CompletableFuture<>();

if (maxLookupRedirects > 0 && redirectCount > maxLookupRedirects) {
Expand All @@ -179,7 +183,7 @@ private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress socket
client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
long requestId = client.newRequestId();
ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId,
client.getConfiguration().getLookupProperties());
properties);
clientCnx.newLookup(request, requestId).whenComplete((r, t) -> {
if (t != null) {
// lookup failed
Expand All @@ -204,7 +208,7 @@ private CompletableFuture<LookupTopicResult> findBroker(InetSocketAddress socket

// (2) redirect to given address if response is: redirect
if (r.redirect) {
findBroker(responseBrokerAddress, r.authoritative, topicName, redirectCount + 1)
findBroker(responseBrokerAddress, r.authoritative, topicName, redirectCount + 1, properties)
.thenAccept(addressFuture::complete)
.exceptionally((lookupException) -> {
Throwable cause = FutureUtil.unwrapCompletionException(lookupException);
Expand Down

0 comments on commit 1439529

Please sign in to comment.