Skip to content

Conversation

@TakaHiR07
Copy link
Contributor

@TakaHiR07 TakaHiR07 commented Nov 13, 2025

Fixes #24977

Motivation

As shown in the issue, fix two problem: 1. cleanCacheAndCloseReader() executed twice cause concurrent error, which result in too many orphan reader remain in SystemTopicBasedTopicPoliciesService 2. double update in policyCacheInitMap cause recursive update error

Modifications

  1. no need to do cleanCacheAndCloseReader() when throw exception, since the exception would be catch in outside code. By the way, in previous pulsar-version 2.9.x, cleanCacheAndCloseReader is also executed only once
  2. avoid double update in policyCacheInitMap. use putIfAbsent instead of computeIfAbsent. It is not appropriate to add so many operation into compute().
  3. add two test, to simulate if throw exception in createReader, initPolicyCache, readMorePolicy of prepareInitPoliciesCacheAsync. By the way, it seems lack of unittest in SystemTopicBasedTopicPoliciesService.
  4. new method "newReader()" to ensure only one readerCreateCompletableFuture. Actually this method is add for test. The whole process of prepareInitPoliciesCacheAsync() is : put future -> put reader -> throw exception -> remove reader -> remove future. so even without "newReader()", namespace's reader in readerCache can be ensure only one.

There is one point should be consider in this pr

  1. When use putIfAbsent, if too many getTopicPolicy() trigger prepareInitPoliciesCacheAsync, it would generate many empty completableFuture. Further more, we can use double check in the code to avoid this object gc.(the code would be ugly).

Besides, this case still exist: if failed to close reader in cleanCacheAndCloseReader(), this closing reader maybe have chance to reconnect and become orphan reader. This is not this pr's work.

Verifying this change

  • [] Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Nov 13, 2025
@TakaHiR07 TakaHiR07 force-pushed the branch-3.0-fix_prepareInitPoliciesCacheAsync branch from cdff28e to 8941c47 Compare November 13, 2025 12:37
@TakaHiR07 TakaHiR07 changed the title [fix][broker] fix prepareInitPoliciesCacheAsync in SystemTopicBasedTopicPoliciesService [fix][broker][branch-3.0] fix prepareInitPoliciesCacheAsync in SystemTopicBasedTopicPoliciesService Nov 13, 2025
@TakaHiR07 TakaHiR07 force-pushed the branch-3.0-fix_prepareInitPoliciesCacheAsync branch from 8941c47 to 1466a6f Compare November 13, 2025 13:11
@lhotari
Copy link
Member

lhotari commented Nov 13, 2025

@TakaHiR07 Thanks for the great analysis and fix. Would it be possible to make a fix to master branch too? Does the problem appear there too? Usually we target master branch first and then backport to maintenance branches.

@TakaHiR07 TakaHiR07 force-pushed the branch-3.0-fix_prepareInitPoliciesCacheAsync branch from 1466a6f to b56256f Compare November 14, 2025 09:51
@TakaHiR07
Copy link
Contributor Author

TakaHiR07 commented Nov 14, 2025

@TakaHiR07 Thanks for the great analysis and fix. Would it be possible to make a fix to master branch too? Does the problem appear there too? Usually we target master branch first and then backport to maintenance branches.

@lhotari The problem catch exception and cleanPolicyInitMap twice appear too. Have push a pr, #24980. There is a bit different with branch-3.0, since there are some modification in pr-24658

@lhotari
Copy link
Member

lhotari commented Nov 14, 2025

@TakaHiR07 Please check this test failure:

  Error:  Tests run: 155, Failures: 1, Errors: 0, Skipped: 53, Time elapsed: 853.185 s <<< FAILURE! - in org.apache.pulsar.broker.admin.TopicPoliciesTest
  Error:  org.apache.pulsar.broker.admin.TopicPoliciesTest.testTopicPoliciesAfterCompaction[Clean_Cache](4)  Time elapsed: 0.367 s  <<< FAILURE!
  java.lang.AssertionError: expected [true] but found [false]
  	at org.testng.Assert.fail(Assert.java:110)
  	at org.testng.Assert.failNotEquals(Assert.java:1577)
  	at org.testng.Assert.assertTrue(Assert.java:56)
  	at org.testng.Assert.assertTrue(Assert.java:66)
  	at org.apache.pulsar.broker.admin.TopicPoliciesTest.testTopicPoliciesAfterCompaction(TopicPoliciesTest.java:3432)
  	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
  	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  	at java.base/java.lang.Thread.run(Thread.java:840)

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR fixes two critical issues in the SystemTopicBasedTopicPoliciesService that caused orphan readers and recursive update errors: (1) duplicate execution of cleanCacheAndCloseReader() leading to concurrency problems, and (2) improper use of computeIfAbsent() causing recursive updates in policyCacheInitMap.

Key changes include:

  • Refactored prepareInitPoliciesCacheAsync() to use putIfAbsent() instead of computeIfAbsent() to avoid recursive updates
  • Removed redundant cleanCacheAndCloseReader() calls from exception handlers in initPolicesCache() since cleanup is now handled once in the outer exception handler
  • Added a new newReader() method to ensure only one reader future per namespace

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.

File Description
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java Refactored prepareInitPoliciesCacheAsync() to prevent double cleanup and recursive updates; added newReader() method for proper reader management; removed redundant cleanup calls in initPolicesCache()
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java Added two comprehensive test cases to verify proper behavior when exceptions occur during reader creation and policy cache initialization, ensuring cleanup happens exactly once

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 702 to 709
// sleep 500ms to make sure clean operation finish.
Thread.sleep(500);
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
Assert.assertNull(future);
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 =
spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
Assert.assertNull(readerCompletableFuture1);
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using Thread.sleep(500) in tests is brittle and can lead to flaky test behavior. The test already uses Awaitility in other places. Consider using Awaitility.await() with appropriate conditions to verify that the cleanup operations have completed instead of relying on a fixed sleep duration.

Suggested change
// sleep 500ms to make sure clean operation finish.
Thread.sleep(500);
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
Assert.assertNull(future);
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 =
spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
Assert.assertNull(readerCompletableFuture1);
// Await cleanup operation to finish.
Awaitility.await()
.atMost(Duration.ofSeconds(2))
.untilAsserted(() -> {
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
Assert.assertNull(spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)));
Assert.assertNull(spyReaderCaches.get(NamespaceName.get(NAMESPACE5)));
});

Copilot uses AI. Check for mistakes.
}
};
appender.start();
logger.get().addAppender(appender, null, null);
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to logger.get().addAppender(appender, null, null) appears redundant with the following line logger.addAppender(appender). Based on the Log4j2 API, Logger.get() returns the underlying LoggerConfig, and the second call to logger.addAppender(appender) should be sufficient. The first call with three parameters (the third being null) is typically used on LoggerConfig objects. Consider removing the first call to simplify the code and avoid potential confusion.

Suggested change
logger.get().addAppender(appender, null, null);

Copilot uses AI. Check for mistakes.
Comment on lines 621 to 623
// sleep 500ms to make sure clean operation finish.
Thread.sleep(500);
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using Thread.sleep(500) in tests is brittle and can lead to flaky test behavior. The test already uses Awaitility in other places (line 595). Consider using Awaitility.await() with appropriate conditions to verify that the cleanup operations have completed instead of relying on a fixed sleep duration.

Suggested change
// sleep 500ms to make sure clean operation finish.
Thread.sleep(500);
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
// Wait until cleanup operation finishes.
Awaitility.await()
.atMost(Duration.ofSeconds(2))
.until(() -> prepareFuture.isCompletedExceptionally()
&& spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)) == null
&& spyReaderCaches.get(NamespaceName.get(NAMESPACE5)) == null);

Copilot uses AI. Check for mistakes.
}
};
appender.start();
logger.get().addAppender(appender, null, null);
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to logger.get().addAppender(appender, null, null) appears redundant with the following line logger.addAppender(appender). Based on the Log4j2 API, Logger.get() returns the underlying LoggerConfig, and the second call to logger.addAppender(appender) should be sufficient. The first call with three parameters (the third being null) is typically used on LoggerConfig objects. Consider removing the first call to simplify the code and avoid potential confusion.

Suggested change
logger.get().addAppender(appender, null, null);

Copilot uses AI. Check for mistakes.
@TakaHiR07
Copy link
Contributor Author

@TakaHiR07 Please check this test failure:

  Error:  Tests run: 155, Failures: 1, Errors: 0, Skipped: 53, Time elapsed: 853.185 s <<< FAILURE! - in org.apache.pulsar.broker.admin.TopicPoliciesTest
  Error:  org.apache.pulsar.broker.admin.TopicPoliciesTest.testTopicPoliciesAfterCompaction[Clean_Cache](4)  Time elapsed: 0.367 s  <<< FAILURE!

@lhotari have fixed. In testTopicPoliciesAfterCompaction#clearTopicPoliciesCache, should also clear readerCaches. readerCaches and policyCacheInitMap put and remove element is always together.

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@lhotari lhotari merged commit 22e0a97 into apache:branch-3.0 Dec 10, 2025
87 of 90 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs ready-to-test release/3.0.16

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants