Skip to content

Commit 89f6015

Browse files
authored
[improve][broker] Fix replicated subscriptions race condition with mark delete update and snapshot completion (#16651)
1 parent 270120c commit 89f6015

File tree

7 files changed

+92
-45
lines changed

7 files changed

+92
-45
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -467,21 +467,6 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
467467
}
468468
}
469469

470-
if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
471-
this.updateLastMarkDeleteAdvancedTimestamp();
472-
473-
// Mark delete position advance
474-
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
475-
if (snapshotCache != null) {
476-
ReplicatedSubscriptionsSnapshot snapshot = snapshotCache
477-
.advancedMarkDeletePosition(cursor.getMarkDeletedPosition());
478-
if (snapshot != null) {
479-
topic.getReplicatedSubscriptionController()
480-
.ifPresent(c -> c.localSubscriptionUpdated(subName, snapshot));
481-
}
482-
}
483-
}
484-
485470
if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
486471
// Notify all consumer that the end of topic was reached
487472
if (dispatcher != null) {
@@ -514,7 +499,7 @@ public void markDeleteComplete(Object ctx) {
514499
dispatcher.afterAckMessages(null, ctx);
515500
}
516501
// Signal the dispatchers to give chance to take extra actions
517-
notifyTheMarkDeletePositionMoveForwardIfNeeded(oldMD);
502+
notifyTheMarkDeletePositionChanged(oldMD);
518503
}
519504

520505
@Override
@@ -541,7 +526,7 @@ public void deleteComplete(Object context) {
541526
if (dispatcher != null) {
542527
dispatcher.afterAckMessages(null, context);
543528
}
544-
notifyTheMarkDeletePositionMoveForwardIfNeeded((Position) context);
529+
notifyTheMarkDeletePositionChanged((Position) context);
545530
}
546531

547532
@Override
@@ -554,11 +539,42 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
554539
}
555540
};
556541

557-
private void notifyTheMarkDeletePositionMoveForwardIfNeeded(Position oldPosition) {
558-
Position oldMD = oldPosition;
542+
/**
543+
* This method is called after acknowledgements (such as individual acks) have been processed and the mark-delete
544+
* position has possibly been updated and advanced after "ack holes" have been filled up by the latest individual
545+
* acknowledgements.
546+
* @param oldPosition previous mark-delete position before the update
547+
*/
548+
private void notifyTheMarkDeletePositionChanged(Position oldPosition) {
559549
Position newMD = cursor.getMarkDeletedPosition();
560-
if (dispatcher != null && newMD.compareTo(oldMD) > 0) {
561-
dispatcher.markDeletePositionMoveForward();
550+
551+
// check if the mark delete position has changed since the last call
552+
if (newMD.compareTo(oldPosition) != 0) {
553+
updateLastMarkDeleteAdvancedTimestamp();
554+
handleReplicatedSubscriptionsUpdate(newMD);
555+
556+
if (dispatcher != null) {
557+
dispatcher.markDeletePositionMoveForward();
558+
}
559+
}
560+
}
561+
562+
/**
563+
* Checks the snapshot cache for a snapshot that corresponds to the given mark-delete position.
564+
* If a snapshot is found, it will notify the replicated subscription controller that the local subscription
565+
* has been updated.
566+
* This method is called when the mark-delete position is advanced or when a new snapshot is added to the cache.
567+
* When the new snapshot is added, it might be suitable for the current mark-delete position.
568+
* @param markDeletePosition the mark delete position to check for a snapshot
569+
*/
570+
private void handleReplicatedSubscriptionsUpdate(Position markDeletePosition) {
571+
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
572+
if (snapshotCache != null) {
573+
ReplicatedSubscriptionsSnapshot snapshot = snapshotCache.advancedMarkDeletePosition(markDeletePosition);
574+
if (snapshot != null) {
575+
topic.getReplicatedSubscriptionController()
576+
.ifPresent(c -> c.localSubscriptionUpdated(subName, snapshot));
577+
}
562578
}
563579
}
564580

@@ -1569,6 +1585,8 @@ public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapsho
15691585
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
15701586
if (snapshotCache != null) {
15711587
snapshotCache.addNewSnapshot(new ReplicatedSubscriptionsSnapshot().copyFrom(snapshot));
1588+
// check if the newly added snapshot can be used with the current mark delete position
1589+
handleReplicatedSubscriptionsUpdate(cursor.getMarkDeletedPosition());
15721590
}
15731591
}
15741592

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.service.persistent;
2020

21+
import java.util.Map;
2122
import java.util.NavigableMap;
2223
import java.util.TreeMap;
2324
import lombok.extern.slf4j.Slf4j;
@@ -65,9 +66,15 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot
6566
public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(Position pos) {
6667
ReplicatedSubscriptionsSnapshot snapshot = null;
6768
while (!snapshots.isEmpty()) {
68-
Position first = snapshots.firstKey();
69+
Map.Entry<Position, ReplicatedSubscriptionsSnapshot> firstEntry =
70+
snapshots.firstEntry();
71+
Position first = firstEntry.getKey();
6972
if (first.compareTo(pos) > 0) {
7073
// Snapshot is associated which an higher position, so it cannot be used now
74+
if (log.isDebugEnabled()) {
75+
log.debug("[{}] Snapshot {} is associated with an higher position {} so it cannot be used for mark "
76+
+ "delete position {}", subscription, firstEntry.getValue(), first, pos);
77+
}
7178
break;
7279
} else {
7380
// This snapshot is potentially good. Continue the search for to see if there is a higher snapshot we

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ public void receivedReplicatedSubscriptionMarker(Position position, int markerTy
128128

129129
public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscriptionsSnapshot snapshot) {
130130
if (log.isDebugEnabled()) {
131-
log.debug("[{}][{}] Updating subscription to snapshot {}", topic, subscriptionName,
131+
log.debug("[{}][{}][{}] Updating subscription to snapshot {}",
132+
topic.getBrokerService().pulsar().getBrokerId(), topic, subscriptionName,
132133
snapshot.getClustersList().stream()
133134
.map(cmid -> String.format("%s -> %d:%d", cmid.getCluster(),
134135
cmid.getMessageId().getLedgerId(), cmid.getMessageId().getEntryId()))
@@ -157,7 +158,8 @@ private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest requ
157158
// message id.
158159
Position lastMsgId = topic.getLastPosition();
159160
if (log.isDebugEnabled()) {
160-
log.debug("[{}] Received snapshot request. Last msg id: {}", topic.getName(), lastMsgId);
161+
log.debug("[{}][{}] Received snapshot request. Last msg id: {}",
162+
topic.getBrokerService().pulsar().getBrokerId(), topic.getName(), lastMsgId);
161163
}
162164

163165
ByteBuf marker = Markers.newReplicatedSubscriptionsSnapshotResponse(
@@ -242,7 +244,8 @@ private void startNewSnapshot() {
242244
|| topic.getLastMaxReadPositionMovedForwardTimestamp() == 0) {
243245
// There was no message written since the last snapshot, we can skip creating a new snapshot
244246
if (log.isDebugEnabled()) {
245-
log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName());
247+
log.debug("[{}][{}] There is no new data in topic. Skipping snapshot creation.",
248+
topic.getBrokerService().pulsar().getBrokerId(), topic.getName());
246249
}
247250
return;
248251
}
@@ -264,7 +267,8 @@ private void startNewSnapshot() {
264267
}
265268

266269
if (log.isDebugEnabled()) {
267-
log.debug("[{}] Starting snapshot creation.", topic.getName());
270+
log.debug("[{}][{}] Starting snapshot creation.", topic.getBrokerService().pulsar().getBrokerId(),
271+
topic.getName());
268272
}
269273

270274
pendingSnapshotsMetric.inc();
@@ -328,7 +332,8 @@ public void completed(Exception e, long ledgerId, long entryId) {
328332
// Nothing to do in case of publish errors since the retry logic is applied upstream after a snapshot is not
329333
// closed
330334
if (log.isDebugEnabled()) {
331-
log.debug("[{}] Published marker at {}:{}. Exception: {}", topic.getName(), ledgerId, entryId, e);
335+
log.debug("[{}][{}] Published marker at {}:{}. Exception: {}",
336+
topic.getBrokerService().pulsar().getBrokerId(), topic.getName(), ledgerId, entryId, e);
332337
}
333338

334339
this.positionOfLastLocalMarker = PositionFactory.create(ledgerId, entryId);

pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,14 +130,16 @@ public static String toJson(Object object) {
130130

131131
/**
132132
* Logs the topic stats and internal stats for the given topic.
133-
* @param logger logger to use
133+
*
134+
* @param logger logger to use
134135
* @param pulsarAdmin PulsarAdmin client to use
135-
* @param topic topic name
136+
* @param topic topic name
137+
* @param description
136138
*/
137-
public static void logTopicStats(Logger logger, PulsarAdmin pulsarAdmin, String topic) {
139+
public static void logTopicStats(Logger logger, PulsarAdmin pulsarAdmin, String topic, String description) {
138140
try {
139-
logger.info("[{}] stats: {}", topic, toJson(pulsarAdmin.topics().getStats(topic)));
140-
logger.info("[{}] internalStats: {}", topic,
141+
logger.info("[{}] {} stats: {}", topic, description, toJson(pulsarAdmin.topics().getStats(topic)));
142+
logger.info("[{}] {} internalStats: {}", topic, description,
141143
toJson(pulsarAdmin.topics().getInternalStats(topic, true)));
142144
} catch (PulsarAdminException e) {
143145
logger.warn("Failed to get stats for topic {}", topic, e);

pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -806,7 +806,11 @@ protected void assertOtelMetricLongSumValue(String metricName, int value) {
806806
}
807807

808808
protected void logTopicStats(String topic) {
809-
BrokerTestUtil.logTopicStats(log, admin, topic);
809+
logTopicStats(topic, "");
810+
}
811+
812+
protected void logTopicStats(String topic, String description) {
813+
BrokerTestUtil.logTopicStats(log, admin, topic, description);
810814
}
811815

812816
@DataProvider(name = "trueFalse")

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,7 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
106106
String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
107107
String topicName = "persistent://" + namespace + "/mytopic";
108108
String subscriptionName = "cluster-subscription";
109-
// Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054
110-
// TODO: duplications shouldn't be allowed, change to "false" when fixing the issue
111-
boolean allowDuplicates = true;
112-
// this setting can be used to manually run the test with subscription replication disabled
113-
// it shows that subscription replication has no impact in behavior for this test case
109+
boolean allowDuplicates = false;
114110
boolean replicateSubscriptionState = true;
115111

116112
admin1.namespaces().createNamespace(namespace);
@@ -134,7 +130,8 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
134130

135131
Set<String> sentMessages = new LinkedHashSet<>();
136132

137-
// send messages in r1
133+
log.info("Send messages in r1");
134+
138135
{
139136
@Cleanup
140137
Producer<byte[]> producer = client1.newProducer().topic(topicName)
@@ -144,32 +141,41 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
144141
int numMessages = 6;
145142
for (int i = 0; i < numMessages; i++) {
146143
String body = "message" + i;
147-
producer.send(body.getBytes(StandardCharsets.UTF_8));
144+
MessageId messageId = producer.send(body.getBytes(StandardCharsets.UTF_8));
145+
log.info("Sent message: {} with msgId: {}", body, messageId);
148146
sentMessages.add(body);
147+
if (i == 2) {
148+
// wait for subscription snapshot to be created
149+
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
150+
}
149151
}
150152
}
151153

152154
Set<String> receivedMessages = new LinkedHashSet<>();
153155

156+
log.info("Consuming 3 messages in r1");
157+
154158
// consume 3 messages in r1
155159
try (Consumer<byte[]> consumer1 = client1.newConsumer()
156160
.topic(topicName)
161+
.receiverQueueSize(2)
157162
.subscriptionName(subscriptionName)
158163
.replicateSubscriptionState(replicateSubscriptionState)
159164
.subscribe()) {
160165
readMessages(consumer1, receivedMessages, 3, allowDuplicates);
166+
log.info("Waiting after reading 3 messages in r1.");
167+
Thread.sleep(config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
161168
}
162169

163-
// wait for subscription to be replicated
164-
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
165-
166-
// consume remaining messages in r2
170+
log.info("Consume remaining messages in r2");
167171
try (Consumer<byte[]> consumer2 = client2.newConsumer()
168172
.topic(topicName)
169173
.subscriptionName(subscriptionName)
170174
.replicateSubscriptionState(replicateSubscriptionState)
171175
.subscribe()) {
172176
readMessages(consumer2, receivedMessages, -1, allowDuplicates);
177+
} finally {
178+
printStats(topicName);
173179
}
174180

175181
// assert that all messages have been received
@@ -188,6 +194,11 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
188194
histogramPoint -> histogramPoint.hasSumGreaterThan(0.0))));
189195
}
190196

197+
private void printStats(String topicName) throws PulsarAdminException {
198+
BrokerTestUtil.logTopicStats(log, admin1, topicName, "admin1");
199+
BrokerTestUtil.logTopicStats(log, admin2, topicName, "admin2");
200+
}
201+
191202
/**
192203
* Tests replicated subscriptions across two regions and can read successful.
193204
*/

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
128128
protected final String cluster2 = "r2";
129129
protected final String cluster3 = "r3";
130130
protected final String cluster4 = "r4";
131-
protected String loadManagerClassName;
131+
protected String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl";
132132

133133
protected String getLoadManagerClassName() {
134134
return loadManagerClassName;

0 commit comments

Comments
 (0)