[fix][broker][branch-3.0] fix prepareInitPoliciesCacheAsync in SystemTopicBasedTopicPoliciesService #24978
@TakaHiR07 Thanks for the great analysis and fix. Would it be possible to fix the master branch too? Does the problem appear there as well? We usually target the master branch first and then backport to the maintenance branches.
@lhotari The problem of catching the exception and calling cleanPolicyInitMap twice appears on master too. I have pushed a PR, #24980. It differs slightly from the branch-3.0 fix because of the modifications in pr-24658.
@TakaHiR07 Please check this test failure:
Pull Request Overview
This PR fixes two critical issues in the SystemTopicBasedTopicPoliciesService that caused orphan readers and recursive update errors: (1) duplicate execution of cleanCacheAndCloseReader() leading to concurrency problems, and (2) improper use of computeIfAbsent() causing recursive updates in policyCacheInitMap.
Key changes include:
- Refactored prepareInitPoliciesCacheAsync() to use putIfAbsent() instead of computeIfAbsent() to avoid recursive updates
- Removed redundant cleanCacheAndCloseReader() calls from exception handlers in initPolicesCache() since cleanup is now handled once in the outer exception handler
- Added a new newReader() method to ensure only one reader future per namespace
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java | Refactored prepareInitPoliciesCacheAsync() to prevent double cleanup and recursive updates; added newReader() method for proper reader management; removed redundant cleanup calls in initPolicesCache() |
| pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java | Added two comprehensive test cases to verify proper behavior when exceptions occur during reader creation and policy cache initialization, ensuring cleanup happens exactly once |
```java
// sleep 500ms to make sure clean operation finish.
Thread.sleep(500);
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
Assert.assertNull(future);
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 =
        spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
Assert.assertNull(readerCompletableFuture1);
```
Copilot AI, Nov 14, 2025:
Using Thread.sleep(500) in tests is brittle and can lead to flaky test behavior. The test already uses Awaitility in other places. Consider using Awaitility.await() with appropriate conditions to verify that the cleanup operations have completed instead of relying on a fixed sleep duration.
```diff
-// sleep 500ms to make sure clean operation finish.
-Thread.sleep(500);
-Assert.assertTrue(prepareFuture.isCompletedExceptionally());
-future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
-Assert.assertNull(future);
-CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 =
-        spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
-Assert.assertNull(readerCompletableFuture1);
+// Await cleanup operation to finish.
+Awaitility.await()
+        .atMost(Duration.ofSeconds(2))
+        .untilAsserted(() -> {
+            Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+            Assert.assertNull(spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)));
+            Assert.assertNull(spyReaderCaches.get(NamespaceName.get(NAMESPACE5)));
+        });
```
```java
    }
};
appender.start();
logger.get().addAppender(appender, null, null);
```
Copilot AI, Nov 14, 2025:
The call to logger.get().addAppender(appender, null, null) appears redundant with the following line logger.addAppender(appender). Based on the Log4j2 API, Logger.get() returns the underlying LoggerConfig, and the second call to logger.addAppender(appender) should be sufficient. The first call with three parameters (the third being null) is typically used on LoggerConfig objects. Consider removing the first call to simplify the code and avoid potential confusion.
```diff
-logger.get().addAppender(appender, null, null);
```
```java
// sleep 500ms to make sure clean operation finish.
Thread.sleep(500);
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
```
Copilot AI, Nov 14, 2025:
Using Thread.sleep(500) in tests is brittle and can lead to flaky test behavior. The test already uses Awaitility in other places (line 595). Consider using Awaitility.await() with appropriate conditions to verify that the cleanup operations have completed instead of relying on a fixed sleep duration.
```diff
-// sleep 500ms to make sure clean operation finish.
-Thread.sleep(500);
-Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+// Wait until cleanup operation finishes.
+Awaitility.await()
+        .atMost(Duration.ofSeconds(2))
+        .until(() -> prepareFuture.isCompletedExceptionally()
+                && spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)) == null
+                && spyReaderCaches.get(NamespaceName.get(NAMESPACE5)) == null);
```
```java
    }
};
appender.start();
logger.get().addAppender(appender, null, null);
```
Copilot AI, Nov 14, 2025:
The call to logger.get().addAppender(appender, null, null) appears redundant with the following line logger.addAppender(appender). Based on the Log4j2 API, Logger.get() returns the underlying LoggerConfig, and the second call to logger.addAppender(appender) should be sufficient. The first call with three parameters (the third being null) is typically used on LoggerConfig objects. Consider removing the first call to simplify the code and avoid potential confusion.
```diff
-logger.get().addAppender(appender, null, null);
```
@lhotari Fixed. testTopicPoliciesAfterCompaction#clearTopicPoliciesCache should also clear readerCaches, because entries are always added to and removed from readerCaches and policyCacheInitMap together.
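The pairing described in this reply can be sketched as follows. This is only an illustration under simplified assumptions: `PairedCachesSketch`, `register`, `clear`, and `consistent` are hypothetical names, the keys are plain strings instead of NamespaceName, and the reader is represented by a string rather than a SystemTopicClient.Reader.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class PairedCachesSketch {
    // Hypothetical, simplified stand-ins for the two per-namespace maps.
    final ConcurrentHashMap<String, CompletableFuture<Void>> policyCacheInitMap =
            new ConcurrentHashMap<>();
    final ConcurrentHashMap<String, CompletableFuture<String>> readerCaches =
            new ConcurrentHashMap<>();

    // Entries for a namespace are added to both maps together.
    void register(String namespace) {
        policyCacheInitMap.put(namespace, CompletableFuture.completedFuture(null));
        readerCaches.put(namespace, CompletableFuture.completedFuture("reader-for-" + namespace));
    }

    // A test helper that clears one map should clear the other as well,
    // otherwise a stale reader future survives the reset.
    void clear(String namespace) {
        policyCacheInitMap.remove(namespace);
        readerCaches.remove(namespace);
    }

    // True when the namespace is present in both maps or in neither.
    boolean consistent(String namespace) {
        return policyCacheInitMap.containsKey(namespace) == readerCaches.containsKey(namespace);
    }
}
```

A helper that removed only the policyCacheInitMap entry would leave `consistent` returning false for that namespace.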
lhotari left a comment:
LGTM
Fixes #24977
Motivation
As shown in the issue, this PR fixes two problems:
1. cleanCacheAndCloseReader() is executed twice, which causes a concurrency error and leaves too many orphan readers in SystemTopicBasedTopicPoliciesService.
2. A double update of policyCacheInitMap causes a recursive update error.
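A minimal sketch of the "clean up exactly once" idea behind the first problem, assuming a simplified model rather than the real service: the inner stage only reports failure, and a single outer handler performs the cleanup. The class `SingleCleanupSketch`, the `cleanupCalls` counter, the string keys, and the method bodies are hypothetical; only the method names echo the ones discussed in this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class SingleCleanupSketch {
    // Hypothetical stand-in for policyCacheInitMap.
    static final ConcurrentHashMap<String, CompletableFuture<Void>> initMap =
            new ConcurrentHashMap<>();
    // Counts cleanup invocations so the "exactly once" property can be checked.
    static final AtomicInteger cleanupCalls = new AtomicInteger();

    // Hypothetical stand-in for cleanCacheAndCloseReader().
    static void cleanCacheAndCloseReader(String namespace) {
        cleanupCalls.incrementAndGet();
        initMap.remove(namespace);
    }

    // Inner stage: reports failure through the future but does no cleanup itself.
    static CompletableFuture<Void> initPoliciesCache(String namespace, boolean fail) {
        CompletableFuture<Void> stage = new CompletableFuture<>();
        if (fail) {
            stage.completeExceptionally(new IllegalStateException("read failed"));
        } else {
            stage.complete(null);
        }
        return stage;
    }

    // Outer stage: the only place where cleanup runs on failure.
    static CompletableFuture<Void> prepare(String namespace, boolean fail) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        CompletableFuture<Void> existing = initMap.putIfAbsent(namespace, result);
        if (existing != null) {
            return existing;
        }
        initPoliciesCache(namespace, fail).whenComplete((ignored, error) -> {
            if (error != null) {
                cleanCacheAndCloseReader(namespace); // single cleanup site
                result.completeExceptionally(error);
            } else {
                result.complete(null);
            }
        });
        return result;
    }
}
```

If the inner stage also called the cleanup method on failure, the counter would reach two for a single failed initialization, which is the shape of the duplicate cleanup described above.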
Modifications
One point should be considered in this PR.
In addition, one case remains: if closing the reader fails in cleanCacheAndCloseReader(), the reader being closed may get a chance to reconnect and become an orphan reader. That is outside the scope of this PR.