diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java index be945d988fb88..89dbf2be990b0 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java @@ -181,8 +181,13 @@ private CompletableFuture>> getBookiesThenFreshCache(Str @Override public CompletableFuture watchWritableBookies(RegistrationListener registrationListener) { writableBookiesWatchers.add(registrationListener); + // trigger all listeners in writableBookiesWatchers one by one. It aims to keep a sync way + // to make sure the previous listener has finished when a new listener is register. + // Though it would bring duplicate trigger listener problem, but since watchWritableBookies + // is only executed when bookieClient construct, the duplicate problem is acceptable. return getWritableBookies() - .thenAcceptAsync(registrationListener::onBookiesChanged, executor); + .thenAcceptAsync(bookies -> + writableBookiesWatchers.forEach(w -> w.onBookiesChanged(bookies)), executor); } @Override @@ -193,8 +198,13 @@ public void unwatchWritableBookies(RegistrationListener registrationListener) { @Override public CompletableFuture watchReadOnlyBookies(RegistrationListener registrationListener) { readOnlyBookiesWatchers.add(registrationListener); + // trigger all listeners in readOnlyBookiesWatchers one by one. It aims to keep a sync way + // to make sure the previous listener has finished when a new listener is register. + // Though it would bring duplicate trigger listener problem, but since watchReadOnlyBookies + // is only executed when bookieClient construct, the duplicate problem is acceptable. return getReadOnlyBookies() - .thenAcceptAsync(registrationListener::onBookiesChanged, executor); + .thenAcceptAsync(bookies -> + readOnlyBookiesWatchers.forEach(w -> w.onBookiesChanged(bookies)), executor); } @Override