Skip to content

Commit b034bc6

Browse files
committed
Skip replicated subscription update if the previous update was for the same snapshot id
- Reduces unnecessary updates - Move replicated subscription advance markdelete position logic to proper code location - the operation is asynchronous and should be handled after the markdelete position updating operation completes
1 parent 630c758 commit b034bc6

File tree

1 file changed

+22
-15
lines changed

1 file changed

+22
-15
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
120120
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
121121
}
122122

123+
private volatile String lastLocalSubscriptionUpdatedSnapshotId;
124+
123125
static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
124126
return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
125127
}
@@ -397,21 +399,6 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
397399
}
398400
}
399401

400-
if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
401-
this.updateLastMarkDeleteAdvancedTimestamp();
402-
403-
// Mark delete position advance
404-
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
405-
if (snapshotCache != null) {
406-
ReplicatedSubscriptionsSnapshot snapshot = snapshotCache
407-
.advancedMarkDeletePosition((PositionImpl) cursor.getMarkDeletedPosition());
408-
if (snapshot != null) {
409-
topic.getReplicatedSubscriptionController()
410-
.ifPresent(c -> c.localSubscriptionUpdated(subName, snapshot));
411-
}
412-
}
413-
}
414-
415402
if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
416403
// Notify all consumer that the end of topic was reached
417404
if (dispatcher != null) {
@@ -471,6 +458,26 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
471458
private void notifyTheMarkDeletePositionMoveForwardIfNeeded(Position oldPosition) {
472459
PositionImpl oldMD = (PositionImpl) oldPosition;
473460
PositionImpl newMD = (PositionImpl) cursor.getMarkDeletedPosition();
461+
462+
if (!newMD.equals(oldMD)) {
463+
this.updateLastMarkDeleteAdvancedTimestamp();
464+
465+
// Mark delete position advance
466+
ReplicatedSubscriptionSnapshotCache snapshotCache = this.replicatedSubscriptionSnapshotCache;
467+
if (snapshotCache != null) {
468+
ReplicatedSubscriptionsSnapshot snapshot = snapshotCache.advancedMarkDeletePosition(newMD);
469+
if (snapshot != null) {
470+
topic.getReplicatedSubscriptionController()
471+
.ifPresent(c -> {
472+
if (!snapshot.getSnapshotId().equals(lastLocalSubscriptionUpdatedSnapshotId)) {
473+
c.localSubscriptionUpdated(subName, snapshot);
474+
lastLocalSubscriptionUpdatedSnapshotId = snapshot.getSnapshotId();
475+
}
476+
});
477+
}
478+
}
479+
}
480+
474481
if (dispatcher != null && newMD.compareTo(oldMD) > 0) {
475482
dispatcher.markDeletePositionMoveForward();
476483
}

0 commit comments

Comments
 (0)