Skip to content

Commit cdff28e

Browse files
author
fanjianye
committed
fix prepareInitPoliciesCacheAsync() in SystemTopicBasedTopicPoliciesService
1 parent 36a659f commit cdff28e

File tree

2 files changed

+151
-33
lines changed

2 files changed

+151
-33
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -407,42 +407,59 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
407407
@Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
408408
requireNonNull(namespace);
409409
return pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
410-
.thenCompose(namespacePolicies -> {
411-
if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) {
412-
log.info("[{}] skip prepare init policies cache since the namespace is deleted",
413-
namespace);
414-
return CompletableFuture.completedFuture(null);
415-
}
410+
.thenCompose(namespacePolicies -> {
411+
if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) {
412+
log.info("[{}] skip prepare init policies cache since the namespace is deleted",
413+
namespace);
414+
return CompletableFuture.completedFuture(null);
415+
}
416416

417-
return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
418-
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
419-
createSystemTopicClient(namespace);
420-
readerCaches.put(namespace, readerCompletableFuture);
421-
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
422-
final CompletableFuture<Void> initFuture = readerCompletableFuture
423-
.thenCompose(reader -> {
424-
final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
425-
initPolicesCache(reader, stageFuture);
426-
return stageFuture
427-
// Read policies in background
428-
.thenAccept(__ -> readMorePoliciesAsync(reader));
429-
});
430-
initFuture.exceptionallyAsync(ex -> {
417+
CompletableFuture<Void> initNamespacePolicyFuture = new CompletableFuture<>();
418+
CompletableFuture<Void> existingFuture =
419+
policyCacheInitMap.putIfAbsent(namespace, initNamespacePolicyFuture);
420+
if (existingFuture == null) {
421+
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
422+
newReader(namespace);
423+
readerCaches.put(namespace, readerCompletableFuture);
424+
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
425+
readerCompletableFuture
426+
.thenCompose(reader -> {
427+
final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
428+
initPolicesCache(reader, stageFuture);
429+
return stageFuture
430+
// Read policies in background
431+
.thenAccept(__ -> readMorePoliciesAsync(reader));
432+
}).thenApply(__ -> {
433+
initNamespacePolicyFuture.complete(null);
434+
return null;
435+
}).exceptionally(ex -> {
431436
try {
432-
log.error("[{}] Failed to create reader on __change_events topic",
433-
namespace, ex);
437+
log.error("[{}] occur exception on reader of __change_events topic. "
438+
+ "try to clean the reader.", namespace, ex);
439+
initNamespacePolicyFuture.completeExceptionally(ex);
434440
cleanCacheAndCloseReader(namespace, false);
435441
} catch (Throwable cleanupEx) {
436442
// Adding this catch to avoid break callback chain
437443
log.error("[{}] Failed to cleanup reader on __change_events topic",
438444
namespace, cleanupEx);
439445
}
440446
return null;
441-
}, pulsarService.getExecutor());
442-
// let caller know we've got an exception.
443-
return initFuture;
444-
});
445-
});
447+
});
448+
449+
return initNamespacePolicyFuture;
450+
} else {
451+
return existingFuture;
452+
}
453+
});
454+
}
455+
456+
private CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> newReader(NamespaceName ns) {
457+
return readerCaches.compute(ns, (__, existingFuture) -> {
458+
if (existingFuture == null) {
459+
return createSystemTopicClient(ns);
460+
}
461+
return existingFuture;
462+
});
446463
}
447464

448465
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClient(
@@ -501,15 +518,13 @@ public boolean test(NamespaceBundle namespaceBundle) {
501518
private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, CompletableFuture<Void> future) {
502519
if (closed.get()) {
503520
future.completeExceptionally(new BrokerServiceException(getClass().getName() + " is closed."));
504-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
505521
return;
506522
}
507523
reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
508524
if (ex != null) {
509525
log.error("[{}] Failed to check the move events for the system topic",
510526
reader.getSystemTopic().getTopicName(), ex);
511527
future.completeExceptionally(ex);
512-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
513528
return;
514529
}
515530
if (hasMore) {
@@ -524,7 +539,6 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
524539
log.error("[{}] Failed to read event from the system topic.",
525540
reader.getSystemTopic().getTopicName(), e);
526541
future.completeExceptionally(e);
527-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
528542
return null;
529543
});
530544
} else {
@@ -550,11 +564,12 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
550564
});
551565
}
552566

553-
private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount) {
567+
void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount) {
554568
cleanCacheAndCloseReader(namespace, cleanOwnedBundlesCount, false);
555569
}
556570

557-
private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount,
571+
@VisibleForTesting
572+
void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount,
558573
boolean cleanWriterCache) {
559574
if (cleanWriterCache) {
560575
writerCaches.synchronous().invalidate(namespace);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,19 @@
1919
package org.apache.pulsar.broker.service;
2020

2121
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.ArgumentMatchers.anyBoolean;
2224
import static org.mockito.Mockito.spy;
25+
import static org.mockito.Mockito.times;
26+
import static org.mockito.Mockito.verify;
2327
import static org.testng.AssertJUnit.assertEquals;
28+
import static org.testng.AssertJUnit.assertFalse;
2429
import static org.testng.AssertJUnit.assertNotNull;
2530
import static org.testng.AssertJUnit.assertNull;
2631
import static org.testng.AssertJUnit.assertTrue;
2732
import java.lang.reflect.Field;
2833
import java.time.Duration;
34+
import java.util.ArrayList;
2935
import java.util.HashSet;
3036
import java.util.List;
3137
import java.util.Map;
@@ -42,6 +48,10 @@
4248
import lombok.Cleanup;
4349
import lombok.extern.slf4j.Slf4j;
4450
import org.apache.commons.lang3.reflect.FieldUtils;
51+
import org.apache.logging.log4j.LogManager;
52+
import org.apache.logging.log4j.core.LogEvent;
53+
import org.apache.logging.log4j.core.Logger;
54+
import org.apache.logging.log4j.core.appender.AbstractAppender;
4555
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
4656
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
4757
import org.apache.pulsar.broker.systopic.SystemTopicClient;
@@ -398,7 +408,7 @@ public void testGetTopicPoliciesWithCleanCache() throws Exception {
398408
Mockito.doAnswer(invocation -> {
399409
Thread.sleep(1000);
400410
return invocation.callRealMethod();
401-
}).when(spyPoliciesCache).get(Mockito.any());
411+
}).when(spyPoliciesCache).get(any());
402412

403413
CompletableFuture<Void> result = new CompletableFuture<>();
404414
Thread thread = new Thread(() -> {
@@ -532,4 +542,97 @@ public void testCreateNamespaceEventsSystemTopicFactoryException() throws Except
532542
Assert.assertNotNull(topicPolicies);
533543
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
534544
}
545+
546+
@Test
547+
public void testPrepareInitPoliciesCacheAsyncWithErrorReader() throws Exception {
548+
// catch the log output in SystemTopicBasedTopicPoliciesService
549+
Logger logger = (Logger) LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
550+
List<String> logMessages = new ArrayList<>();
551+
AbstractAppender appender = new AbstractAppender("TestAppender", null, null) {
552+
@Override
553+
public void append(LogEvent event) {
554+
logMessages.add(event.getMessage().getFormattedMessage());
555+
}
556+
};
557+
appender.start();
558+
logger.get().addAppender(appender, null, null);
559+
logger.addAppender(appender);
560+
561+
// create namespace-5 and topic
562+
SystemTopicBasedTopicPoliciesService spyService = Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
563+
FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
564+
565+
566+
admin.namespaces().createNamespace(NAMESPACE5);
567+
final String topic = "persistent://" + NAMESPACE5 + "/test" + UUID.randomUUID();
568+
admin.topics().createPartitionedTopic(topic, 1);
569+
570+
CompletableFuture<Void> future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
571+
Assert.assertNull(future);
572+
573+
// mock readerCache and new a reader, then put this reader in readerCache.
574+
// when new reader, would trigger __change_event topic of namespace-5 created
575+
// and would trigger prepareInitPoliciesCacheAsync()
576+
ConcurrentHashMap<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
577+
spyReaderCaches = new ConcurrentHashMap<>();
578+
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
579+
spyService.createSystemTopicClient(NamespaceName.get(NAMESPACE5));
580+
spyReaderCaches.put(NamespaceName.get(NAMESPACE5), readerCompletableFuture);
581+
FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
582+
583+
// set topic policy. create producer for __change_event topic
584+
admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
585+
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
586+
Assert.assertNotNull(future);
587+
588+
// trigger close reader of __change_event directly, simulate that reader
589+
// is closed for some reason, such as topic unload or broker restart.
590+
// since prepareInitPoliciesCacheAsync() has been executed, it would go into readMorePoliciesAsync(),
591+
// throw exception, output "Closing the topic policies reader for" and do cleanCacheAndCloseReader()
592+
SystemTopicClient.Reader<PulsarEvent> reader = readerCompletableFuture.get();
593+
reader.close();
594+
log.info("successfully close spy reader");
595+
596+
// Since cleanCacheAndCloseReader() is executed, should add the failed reader into readerCache again.
597+
// Then in SystemTopicBasedTopicPoliciesService, readerCache has a closed reader,
598+
// and policyCacheInitMap do not contain a future.
599+
// To simulate the situation: when getTopicPolicy() execute, it will do prepareInitPoliciesCacheAsync() and
600+
// use a closed reader to reader the __change_event topic. Then throw exception
601+
spyReaderCaches.put(NamespaceName.get(NAMESPACE5), readerCompletableFuture);
602+
FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
603+
604+
CompletableFuture<Void> prepareFuture = new CompletableFuture<>();
605+
try {
606+
prepareFuture = spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
607+
prepareFuture.get();
608+
Assert.fail();
609+
} catch (Exception e) {
610+
// that is ok
611+
}
612+
613+
614+
// since prepareInitPoliciesCacheAsync() throw exception when initPolicesCache(),
615+
// would clean readerCache and policyCacheInitMap
616+
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
617+
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
618+
Assert.assertNull(future);
619+
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 =
620+
spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
621+
Assert.assertNull(readerCompletableFuture1);
622+
623+
624+
// make sure not do cleanCacheAndCloseReader() twice
625+
boolean logFound = logMessages.stream()
626+
.anyMatch(msg -> msg.contains("occur exception on reader of __change_events topic"));
627+
assertTrue(logFound);
628+
boolean logFound2 = logMessages.stream()
629+
.anyMatch(msg -> msg.contains("Failed to check the move events for the system topic"));
630+
assertTrue(logFound2);
631+
verify(spyService, times(2)).cleanCacheAndCloseReader(any(), anyBoolean(), anyBoolean());
632+
633+
// make sure not occur Recursive update
634+
boolean logFound3 = logMessages.stream()
635+
.anyMatch(msg -> msg.contains("Recursive update"));
636+
assertFalse(logFound3);
637+
}
535638
}

0 commit comments

Comments
 (0)