Skip to content

Commit 5b0fa98

Browse files
lhotari authored and Technoboy- committed
[improve][broker] Fix replicated subscriptions race condition with mark delete update and snapshot completion (#16651)
1 parent 342ea43 commit 5b0fa98

File tree

7 files changed

+92
-45
lines changed

7 files changed

+92
-45
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

Lines changed: 39 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -454,21 +454,6 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
454454
}
455455
}
456456

457-
if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
458-
this.updateLastMarkDeleteAdvancedTimestamp();
459-
460-
// Mark delete position advance
461-
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
462-
if (snapshotCache != null) {
463-
ReplicatedSubscriptionsSnapshot snapshot = snapshotCache
464-
.advancedMarkDeletePosition(cursor.getMarkDeletedPosition());
465-
if (snapshot != null) {
466-
topic.getReplicatedSubscriptionController()
467-
.ifPresent(c -> c.localSubscriptionUpdated(subName, snapshot));
468-
}
469-
}
470-
}
471-
472457
if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
473458
// Notify all consumer that the end of topic was reached
474459
if (dispatcher != null) {
@@ -501,7 +486,7 @@ public void markDeleteComplete(Object ctx) {
501486
dispatcher.afterAckMessages(null, ctx);
502487
}
503488
// Signal the dispatchers to give chance to take extra actions
504-
notifyTheMarkDeletePositionMoveForwardIfNeeded(oldMD);
489+
notifyTheMarkDeletePositionChanged(oldMD);
505490
}
506491

507492
@Override
@@ -528,7 +513,7 @@ public void deleteComplete(Object context) {
528513
if (dispatcher != null) {
529514
dispatcher.afterAckMessages(null, context);
530515
}
531-
notifyTheMarkDeletePositionMoveForwardIfNeeded((Position) context);
516+
notifyTheMarkDeletePositionChanged((Position) context);
532517
}
533518

534519
@Override
@@ -541,11 +526,42 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
541526
}
542527
};
543528

544-
private void notifyTheMarkDeletePositionMoveForwardIfNeeded(Position oldPosition) {
545-
Position oldMD = oldPosition;
529+
/**
530+
* This method is called after acknowledgements (such as individual acks) have been processed and the mark-delete
531+
* position has possibly been updated and advanced after "ack holes" have been filled up by the latest individual
532+
* acknowledgements.
533+
* @param oldPosition previous mark-delete position before the update
534+
*/
535+
private void notifyTheMarkDeletePositionChanged(Position oldPosition) {
546536
Position newMD = cursor.getMarkDeletedPosition();
547-
if (dispatcher != null && newMD.compareTo(oldMD) > 0) {
548-
dispatcher.markDeletePositionMoveForward();
537+
538+
// check if the mark delete position has changed since the last call
539+
if (newMD.compareTo(oldPosition) != 0) {
540+
updateLastMarkDeleteAdvancedTimestamp();
541+
handleReplicatedSubscriptionsUpdate(newMD);
542+
543+
if (dispatcher != null) {
544+
dispatcher.markDeletePositionMoveForward();
545+
}
546+
}
547+
}
548+
549+
/**
550+
* Checks the snapshot cache for a snapshot that corresponds to the given mark-delete position.
551+
* If a snapshot is found, it will notify the replicated subscription controller that the local subscription
552+
* has been updated.
553+
* This method is called when the mark-delete position is advanced or when a new snapshot is added to the cache.
554+
* When the new snapshot is added, it might be suitable for the current mark-delete position.
555+
* @param markDeletePosition the mark delete position to check for a snapshot
556+
*/
557+
private void handleReplicatedSubscriptionsUpdate(Position markDeletePosition) {
558+
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
559+
if (snapshotCache != null) {
560+
ReplicatedSubscriptionsSnapshot snapshot = snapshotCache.advancedMarkDeletePosition(markDeletePosition);
561+
if (snapshot != null) {
562+
topic.getReplicatedSubscriptionController()
563+
.ifPresent(c -> c.localSubscriptionUpdated(subName, snapshot));
564+
}
549565
}
550566
}
551567

@@ -1550,6 +1566,8 @@ public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapsho
15501566
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
15511567
if (snapshotCache != null) {
15521568
snapshotCache.addNewSnapshot(new ReplicatedSubscriptionsSnapshot().copyFrom(snapshot));
1569+
// check if the newly added snapshot can be used with the current mark delete position
1570+
handleReplicatedSubscriptionsUpdate(cursor.getMarkDeletedPosition());
15531571
}
15541572
}
15551573

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.service.persistent;
2020

21+
import java.util.Map;
2122
import java.util.NavigableMap;
2223
import java.util.TreeMap;
2324
import lombok.extern.slf4j.Slf4j;
@@ -65,9 +66,15 @@ public synchronized void addNewSnapshot(ReplicatedSubscriptionsSnapshot snapshot
6566
public synchronized ReplicatedSubscriptionsSnapshot advancedMarkDeletePosition(Position pos) {
6667
ReplicatedSubscriptionsSnapshot snapshot = null;
6768
while (!snapshots.isEmpty()) {
68-
Position first = snapshots.firstKey();
69+
Map.Entry<Position, ReplicatedSubscriptionsSnapshot> firstEntry =
70+
snapshots.firstEntry();
71+
Position first = firstEntry.getKey();
6972
if (first.compareTo(pos) > 0) {
7073
// Snapshot is associated which an higher position, so it cannot be used now
74+
if (log.isDebugEnabled()) {
75+
log.debug("[{}] Snapshot {} is associated with an higher position {} so it cannot be used for mark "
76+
+ "delete position {}", subscription, firstEntry.getValue(), first, pos);
77+
}
7178
break;
7279
} else {
7380
// This snapshot is potentially good. Continue the search for to see if there is a higher snapshot we

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java

Lines changed: 10 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -128,7 +128,8 @@ public void receivedReplicatedSubscriptionMarker(Position position, int markerTy
128128

129129
public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscriptionsSnapshot snapshot) {
130130
if (log.isDebugEnabled()) {
131-
log.debug("[{}][{}] Updating subscription to snapshot {}", topic, subscriptionName,
131+
log.debug("[{}][{}][{}] Updating subscription to snapshot {}",
132+
topic.getBrokerService().pulsar().getBrokerId(), topic, subscriptionName,
132133
snapshot.getClustersList().stream()
133134
.map(cmid -> String.format("%s -> %d:%d", cmid.getCluster(),
134135
cmid.getMessageId().getLedgerId(), cmid.getMessageId().getEntryId()))
@@ -157,7 +158,8 @@ private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest requ
157158
// message id.
158159
Position lastMsgId = topic.getLastPosition();
159160
if (log.isDebugEnabled()) {
160-
log.debug("[{}] Received snapshot request. Last msg id: {}", topic.getName(), lastMsgId);
161+
log.debug("[{}][{}] Received snapshot request. Last msg id: {}",
162+
topic.getBrokerService().pulsar().getBrokerId(), topic.getName(), lastMsgId);
161163
}
162164

163165
ByteBuf marker = Markers.newReplicatedSubscriptionsSnapshotResponse(
@@ -242,7 +244,8 @@ private void startNewSnapshot() {
242244
|| topic.getLastMaxReadPositionMovedForwardTimestamp() == 0) {
243245
// There was no message written since the last snapshot, we can skip creating a new snapshot
244246
if (log.isDebugEnabled()) {
245-
log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", topic.getName());
247+
log.debug("[{}][{}] There is no new data in topic. Skipping snapshot creation.",
248+
topic.getBrokerService().pulsar().getBrokerId(), topic.getName());
246249
}
247250
return;
248251
}
@@ -264,7 +267,8 @@ private void startNewSnapshot() {
264267
}
265268

266269
if (log.isDebugEnabled()) {
267-
log.debug("[{}] Starting snapshot creation.", topic.getName());
270+
log.debug("[{}][{}] Starting snapshot creation.", topic.getBrokerService().pulsar().getBrokerId(),
271+
topic.getName());
268272
}
269273

270274
pendingSnapshotsMetric.inc();
@@ -328,7 +332,8 @@ public void completed(Exception e, long ledgerId, long entryId) {
328332
// Nothing to do in case of publish errors since the retry logic is applied upstream after a snapshot is not
329333
// closed
330334
if (log.isDebugEnabled()) {
331-
log.debug("[{}] Published marker at {}:{}. Exception: {}", topic.getName(), ledgerId, entryId, e);
335+
log.debug("[{}][{}] Published marker at {}:{}. Exception: {}",
336+
topic.getBrokerService().pulsar().getBrokerId(), topic.getName(), ledgerId, entryId, e);
332337
}
333338

334339
this.positionOfLastLocalMarker = PositionFactory.create(ledgerId, entryId);

pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -130,14 +130,16 @@ public static String toJson(Object object) {
130130

131131
/**
132132
* Logs the topic stats and internal stats for the given topic.
133-
* @param logger logger to use
133+
*
134+
* @param logger logger to use
134135
* @param pulsarAdmin PulsarAdmin client to use
135-
* @param topic topic name
136+
* @param topic topic name
137+
* @param description
136138
*/
137-
public static void logTopicStats(Logger logger, PulsarAdmin pulsarAdmin, String topic) {
139+
public static void logTopicStats(Logger logger, PulsarAdmin pulsarAdmin, String topic, String description) {
138140
try {
139-
logger.info("[{}] stats: {}", topic, toJson(pulsarAdmin.topics().getStats(topic)));
140-
logger.info("[{}] internalStats: {}", topic,
141+
logger.info("[{}] {} stats: {}", topic, description, toJson(pulsarAdmin.topics().getStats(topic)));
142+
logger.info("[{}] {} internalStats: {}", topic, description,
141143
toJson(pulsarAdmin.topics().getInternalStats(topic, true)));
142144
} catch (PulsarAdminException e) {
143145
logger.warn("Failed to get stats for topic {}", topic, e);

pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -779,7 +779,11 @@ protected void assertOtelMetricLongSumValue(String metricName, int value) {
779779
}
780780

781781
protected void logTopicStats(String topic) {
782-
BrokerTestUtil.logTopicStats(log, admin, topic);
782+
logTopicStats(topic, "");
783+
}
784+
785+
protected void logTopicStats(String topic, String description) {
786+
BrokerTestUtil.logTopicStats(log, admin, topic, description);
783787
}
784788

785789
@DataProvider(name = "trueFalse")

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java

Lines changed: 22 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -106,11 +106,7 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
106106
String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
107107
String topicName = "persistent://" + namespace + "/mytopic";
108108
String subscriptionName = "cluster-subscription";
109-
// Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054
110-
// TODO: duplications shouldn't be allowed, change to "false" when fixing the issue
111-
boolean allowDuplicates = true;
112-
// this setting can be used to manually run the test with subscription replication disabled
113-
// it shows that subscription replication has no impact in behavior for this test case
109+
boolean allowDuplicates = false;
114110
boolean replicateSubscriptionState = true;
115111

116112
admin1.namespaces().createNamespace(namespace);
@@ -134,7 +130,8 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
134130

135131
Set<String> sentMessages = new LinkedHashSet<>();
136132

137-
// send messages in r1
133+
log.info("Send messages in r1");
134+
138135
{
139136
@Cleanup
140137
Producer<byte[]> producer = client1.newProducer().topic(topicName)
@@ -144,32 +141,41 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
144141
int numMessages = 6;
145142
for (int i = 0; i < numMessages; i++) {
146143
String body = "message" + i;
147-
producer.send(body.getBytes(StandardCharsets.UTF_8));
144+
MessageId messageId = producer.send(body.getBytes(StandardCharsets.UTF_8));
145+
log.info("Sent message: {} with msgId: {}", body, messageId);
148146
sentMessages.add(body);
147+
if (i == 2) {
148+
// wait for subscription snapshot to be created
149+
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
150+
}
149151
}
150152
}
151153

152154
Set<String> receivedMessages = new LinkedHashSet<>();
153155

156+
log.info("Consuming 3 messages in r1");
157+
154158
// consume 3 messages in r1
155159
try (Consumer<byte[]> consumer1 = client1.newConsumer()
156160
.topic(topicName)
161+
.receiverQueueSize(2)
157162
.subscriptionName(subscriptionName)
158163
.replicateSubscriptionState(replicateSubscriptionState)
159164
.subscribe()) {
160165
readMessages(consumer1, receivedMessages, 3, allowDuplicates);
166+
log.info("Waiting after reading 3 messages in r1.");
167+
Thread.sleep(config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
161168
}
162169

163-
// wait for subscription to be replicated
164-
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
165-
166-
// consume remaining messages in r2
170+
log.info("Consume remaining messages in r2");
167171
try (Consumer<byte[]> consumer2 = client2.newConsumer()
168172
.topic(topicName)
169173
.subscriptionName(subscriptionName)
170174
.replicateSubscriptionState(replicateSubscriptionState)
171175
.subscribe()) {
172176
readMessages(consumer2, receivedMessages, -1, allowDuplicates);
177+
} finally {
178+
printStats(topicName);
173179
}
174180

175181
// assert that all messages have been received
@@ -188,6 +194,11 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
188194
histogramPoint -> histogramPoint.hasSumGreaterThan(0.0))));
189195
}
190196

197+
private void printStats(String topicName) throws PulsarAdminException {
198+
BrokerTestUtil.logTopicStats(log, admin1, topicName, "admin1");
199+
BrokerTestUtil.logTopicStats(log, admin2, topicName, "admin2");
200+
}
201+
191202
/**
192203
* Tests replicated subscriptions across two regions and can read successful.
193204
*/

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -128,7 +128,7 @@ public abstract class ReplicatorTestBase extends TestRetrySupport {
128128
protected final String cluster2 = "r2";
129129
protected final String cluster3 = "r3";
130130
protected final String cluster4 = "r4";
131-
protected String loadManagerClassName;
131+
protected String loadManagerClassName = "org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl";
132132

133133
protected String getLoadManagerClassName() {
134134
return loadManagerClassName;

0 commit comments

Comments
 (0)