Skip to content

Commit b56256f

Browse files
author
fanjianye
committed
fix prepareInitPoliciesCacheAsync() in SystemTopicBasedTopicPoliciesService
1 parent 36a659f commit b56256f

File tree

2 files changed

+238
-32
lines changed

2 files changed

+238
-32
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java

Lines changed: 46 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -407,42 +407,58 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
407407
@Nonnull CompletableFuture<Void> prepareInitPoliciesCacheAsync(@Nonnull NamespaceName namespace) {
408408
requireNonNull(namespace);
409409
return pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
410-
.thenCompose(namespacePolicies -> {
411-
if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) {
412-
log.info("[{}] skip prepare init policies cache since the namespace is deleted",
413-
namespace);
414-
return CompletableFuture.completedFuture(null);
415-
}
410+
.thenCompose(namespacePolicies -> {
411+
if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) {
412+
log.info("[{}] skip prepare init policies cache since the namespace is deleted",
413+
namespace);
414+
return CompletableFuture.completedFuture(null);
415+
}
416416

417-
return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
418-
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
419-
createSystemTopicClient(namespace);
420-
readerCaches.put(namespace, readerCompletableFuture);
421-
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
422-
final CompletableFuture<Void> initFuture = readerCompletableFuture
423-
.thenCompose(reader -> {
424-
final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
425-
initPolicesCache(reader, stageFuture);
426-
return stageFuture
427-
// Read policies in background
428-
.thenAccept(__ -> readMorePoliciesAsync(reader));
429-
});
430-
initFuture.exceptionallyAsync(ex -> {
417+
CompletableFuture<Void> initNamespacePolicyFuture = new CompletableFuture<>();
418+
CompletableFuture<Void> existingFuture =
419+
policyCacheInitMap.putIfAbsent(namespace, initNamespacePolicyFuture);
420+
if (existingFuture == null) {
421+
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
422+
newReader(namespace);
423+
ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1));
424+
readerCompletableFuture
425+
.thenCompose(reader -> {
426+
final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
427+
initPolicesCache(reader, stageFuture);
428+
return stageFuture
429+
// Read policies in background
430+
.thenAccept(__ -> readMorePoliciesAsync(reader));
431+
}).thenApply(__ -> {
432+
initNamespacePolicyFuture.complete(null);
433+
return null;
434+
}).exceptionally(ex -> {
431435
try {
432-
log.error("[{}] Failed to create reader on __change_events topic",
433-
namespace, ex);
436+
log.error("[{}] occur exception on reader of __change_events topic. "
437+
+ "try to clean the reader.", namespace, ex);
438+
initNamespacePolicyFuture.completeExceptionally(ex);
434439
cleanCacheAndCloseReader(namespace, false);
435440
} catch (Throwable cleanupEx) {
436441
// Catch cleanup failures here so they do not break the callback chain
437442
log.error("[{}] Failed to cleanup reader on __change_events topic",
438443
namespace, cleanupEx);
439444
}
440445
return null;
441-
}, pulsarService.getExecutor());
442-
// let caller know we've got an exception.
443-
return initFuture;
444-
});
445-
});
446+
});
447+
448+
return initNamespacePolicyFuture;
449+
} else {
450+
return existingFuture;
451+
}
452+
});
453+
}
454+
455+
/**
 * Returns the reader future mapped to {@code ns} in {@code readerCaches}, creating one on demand.
 *
 * <p>The lookup and the creation happen inside a single {@code compute} call: when no future is
 * mapped to {@code ns}, the result of {@code createSystemTopicClient(ns)} is stored and returned;
 * otherwise the already-mapped future is returned unchanged. Only a {@code null} check is made,
 * so an existing future is reused even if it has already failed.
 *
 * @param ns namespace whose reader future is requested
 * @return the existing or newly created reader future for {@code ns}
 */
private CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> newReader(NamespaceName ns) {
456+
return readerCaches.compute(ns, (__, existingFuture) -> {
457+
if (existingFuture == null) {
458+
return createSystemTopicClient(ns);
459+
}
460+
return existingFuture;
461+
});
446462
}
447463

448464
protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClient(
@@ -501,15 +517,13 @@ public boolean test(NamespaceBundle namespaceBundle) {
501517
private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, CompletableFuture<Void> future) {
502518
if (closed.get()) {
503519
future.completeExceptionally(new BrokerServiceException(getClass().getName() + " is closed."));
504-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
505520
return;
506521
}
507522
reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
508523
if (ex != null) {
509524
log.error("[{}] Failed to check the move events for the system topic",
510525
reader.getSystemTopic().getTopicName(), ex);
511526
future.completeExceptionally(ex);
512-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
513527
return;
514528
}
515529
if (hasMore) {
@@ -524,7 +538,6 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
524538
log.error("[{}] Failed to read event from the system topic.",
525539
reader.getSystemTopic().getTopicName(), e);
526540
future.completeExceptionally(e);
527-
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
528541
return null;
529542
});
530543
} else {
@@ -550,11 +563,12 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
550563
});
551564
}
552565

553-
private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount) {
566+
void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount) {
554567
cleanCacheAndCloseReader(namespace, cleanOwnedBundlesCount, false);
555568
}
556569

557-
private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount,
570+
@VisibleForTesting
571+
void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean cleanOwnedBundlesCount,
558572
boolean cleanWriterCache) {
559573
if (cleanWriterCache) {
560574
writerCaches.synchronous().invalidate(namespace);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,19 @@
1919
package org.apache.pulsar.broker.service;
2020

2121
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.ArgumentMatchers.anyBoolean;
2224
import static org.mockito.Mockito.spy;
25+
import static org.mockito.Mockito.times;
26+
import static org.mockito.Mockito.verify;
2327
import static org.testng.AssertJUnit.assertEquals;
28+
import static org.testng.AssertJUnit.assertFalse;
2429
import static org.testng.AssertJUnit.assertNotNull;
2530
import static org.testng.AssertJUnit.assertNull;
2631
import static org.testng.AssertJUnit.assertTrue;
2732
import java.lang.reflect.Field;
2833
import java.time.Duration;
34+
import java.util.ArrayList;
2935
import java.util.HashSet;
3036
import java.util.List;
3137
import java.util.Map;
@@ -42,6 +48,10 @@
4248
import lombok.Cleanup;
4349
import lombok.extern.slf4j.Slf4j;
4450
import org.apache.commons.lang3.reflect.FieldUtils;
51+
import org.apache.logging.log4j.LogManager;
52+
import org.apache.logging.log4j.core.LogEvent;
53+
import org.apache.logging.log4j.core.Logger;
54+
import org.apache.logging.log4j.core.appender.AbstractAppender;
4555
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
4656
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
4757
import org.apache.pulsar.broker.systopic.SystemTopicClient;
@@ -532,4 +542,186 @@ public void testCreateNamespaceEventsSystemTopicFactoryException() throws Except
532542
Assert.assertNotNull(topicPolicies);
533543
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
534544
}
545+
546+
@Test
547+
public void testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws Exception {
548+
// catch the log output in SystemTopicBasedTopicPoliciesService
549+
Logger logger = (Logger) LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
550+
List<String> logMessages = new ArrayList<>();
551+
AbstractAppender appender = new AbstractAppender("TestAppender", null, null) {
552+
@Override
553+
public void append(LogEvent event) {
554+
logMessages.add(event.getMessage().getFormattedMessage());
555+
}
556+
};
557+
appender.start();
558+
logger.get().addAppender(appender, null, null);
559+
logger.addAppender(appender);
560+
561+
// create namespace-5 and topic
562+
SystemTopicBasedTopicPoliciesService spyService = Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
563+
FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
564+
565+
566+
admin.namespaces().createNamespace(NAMESPACE5);
567+
final String topic = "persistent://" + NAMESPACE5 + "/test" + UUID.randomUUID();
568+
admin.topics().createPartitionedTopic(topic, 1);
569+
570+
CompletableFuture<Void> future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
571+
Assert.assertNull(future);
572+
573+
// Replace readerCaches with a fresh map and put a newly created reader into it.
574+
// Creating the reader triggers creation of the __change_events topic of namespace-5
575+
// and, in turn, prepareInitPoliciesCacheAsync().
576+
ConcurrentHashMap<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
577+
spyReaderCaches = new ConcurrentHashMap<>();
578+
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
579+
spyService.createSystemTopicClient(NamespaceName.get(NAMESPACE5));
580+
spyReaderCaches.put(NamespaceName.get(NAMESPACE5), readerCompletableFuture);
581+
FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
582+
583+
// set topic policy. create producer for __change_event topic
584+
admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
585+
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
586+
Assert.assertNotNull(future);
587+
588+
// Close the __change_events reader directly to simulate the reader
589+
// being closed for some reason, such as a topic unload or a broker restart.
590+
// Since prepareInitPoliciesCacheAsync() has already run, readMorePoliciesAsync() will
591+
// fail, log "Closing the topic policies reader for" and call cleanCacheAndCloseReader().
592+
SystemTopicClient.Reader<PulsarEvent> reader = readerCompletableFuture.get();
593+
reader.close();
594+
log.info("successfully close spy reader");
595+
Awaitility.await().untilAsserted(() -> {
596+
boolean logFound = logMessages.stream()
597+
.anyMatch(msg -> msg.contains("Closing the topic policies reader for"));
598+
assertTrue(logFound);
599+
});
600+
601+
// cleanCacheAndCloseReader() has now run, so put the closed reader back into readerCaches.
602+
// SystemTopicBasedTopicPoliciesService then holds a closed reader in readerCaches
603+
// while policyCacheInitMap contains no future for the namespace.
604+
// This simulates getTopicPolicy() calling prepareInitPoliciesCacheAsync(), which then
605+
// uses the closed reader to read the __change_events topic and fails with an exception.
606+
spyReaderCaches.put(NamespaceName.get(NAMESPACE5), readerCompletableFuture);
607+
FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
608+
609+
CompletableFuture<Void> prepareFuture = new CompletableFuture<>();
610+
try {
611+
prepareFuture = spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
612+
prepareFuture.get();
613+
Assert.fail();
614+
} catch (Exception e) {
615+
// that is ok
616+
}
617+
618+
619+
// prepareInitPoliciesCacheAsync() failed inside initPolicesCache(), so readerCaches
620+
// and policyCacheInitMap should both be cleaned for this namespace.
621+
// Sleep 500ms to make sure the cleanup has finished.
622+
Thread.sleep(500);
623+
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
624+
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
625+
Assert.assertNull(future);
626+
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 =
627+
spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
628+
Assert.assertNull(readerCompletableFuture1);
629+
630+
631+
// Make sure cleanCacheAndCloseReader() is not called twice for a single failure.
632+
// prepareInitPoliciesCacheAsync() was triggered twice in total, so cleanCacheAndCloseReader() must run 2 times;
633+
// with the previous code it would run 3 times.
634+
boolean logFound = logMessages.stream()
635+
.anyMatch(msg -> msg.contains("occur exception on reader of __change_events topic"));
636+
assertTrue(logFound);
637+
boolean logFound2 = logMessages.stream()
638+
.anyMatch(msg -> msg.contains("Failed to check the move events for the system topic"));
639+
assertTrue(logFound2);
640+
verify(spyService, times(2)).cleanCacheAndCloseReader(any(), anyBoolean(), anyBoolean());
641+
642+
// Make sure no "Recursive update" error was logged.
643+
boolean logFound3 = logMessages.stream()
644+
.anyMatch(msg -> msg.contains("Recursive update"));
645+
assertFalse(logFound3);
646+
647+
// clean log appender
648+
appender.stop();
649+
logger.removeAppender(appender);
650+
}
651+
652+
@Test
653+
public void testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() throws Exception {
654+
// catch the log output in SystemTopicBasedTopicPoliciesService
655+
Logger logger = (Logger) LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
656+
List<String> logMessages = new ArrayList<>();
657+
AbstractAppender appender = new AbstractAppender("TestAppender", null, null) {
658+
@Override
659+
public void append(LogEvent event) {
660+
logMessages.add(event.getMessage().getFormattedMessage());
661+
}
662+
};
663+
appender.start();
664+
logger.get().addAppender(appender, null, null);
665+
logger.addAppender(appender);
666+
667+
// create namespace-5 and topic
668+
SystemTopicBasedTopicPoliciesService spyService =
669+
Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
670+
FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
671+
672+
673+
admin.namespaces().createNamespace(NAMESPACE5);
674+
final String topic = "persistent://" + NAMESPACE5 + "/test" + UUID.randomUUID();
675+
admin.topics().createPartitionedTopic(topic, 1);
676+
677+
CompletableFuture<Void> future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
678+
Assert.assertNull(future);
679+
680+
// Replace readerCaches with a map that holds an already-failed reader-creation future,
681+
// so that prepareInitPoliciesCacheAsync(), once triggered,
682+
// picks up this failed future and follows the reader-creation failure path.
683+
ConcurrentHashMap<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
684+
spyReaderCaches = new ConcurrentHashMap<>();
685+
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = new CompletableFuture<>();
686+
readerCompletableFuture.completeExceptionally(new Exception("create reader fail"));
687+
spyReaderCaches.put(NamespaceName.get(NAMESPACE5), readerCompletableFuture);
688+
FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
689+
690+
// trigger prepareInitPoliciesCacheAsync()
691+
CompletableFuture<Void> prepareFuture = new CompletableFuture<>();
692+
try {
693+
prepareFuture = spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
694+
prepareFuture.get();
695+
Assert.fail();
696+
} catch (Exception e) {
697+
// that is ok
698+
}
699+
700+
// prepareInitPoliciesCacheAsync() failed while creating the reader, so readerCaches
701+
// and policyCacheInitMap should both be cleaned for this namespace.
702+
// Sleep 500ms to make sure the cleanup has finished.
703+
Thread.sleep(500);
704+
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
705+
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
706+
Assert.assertNull(future);
707+
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 =
708+
spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
709+
Assert.assertNull(readerCompletableFuture1);
710+
711+
712+
// Make sure cleanCacheAndCloseReader() is not called twice for a single failure.
713+
// prepareInitPoliciesCacheAsync() was triggered once in total, so cleanCacheAndCloseReader() must run 1 time.
714+
boolean logFound = logMessages.stream()
715+
.anyMatch(msg -> msg.contains("occur exception on reader of __change_events topic"));
716+
assertTrue(logFound);
717+
boolean logFound2 = logMessages.stream()
718+
.anyMatch(msg -> msg.contains("Failed to check the move events for the system topic")
719+
|| msg.contains("Failed to read event from the system topic"));
720+
assertFalse(logFound2);
721+
verify(spyService, times(1)).cleanCacheAndCloseReader(any(), anyBoolean(), anyBoolean());
722+
723+
// clean log appender
724+
appender.stop();
725+
logger.removeAppender(appender);
726+
}
535727
}

0 commit comments

Comments
 (0)