diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 4c24aa5938b93..8fd8252152898 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -439,8 +439,8 @@ private void internalStorePut(OpPut opPut) { future.completeExceptionally(getException(Code.BADVERSION, opPut.getPath())); } else { // The z-node does not exist, let's create it first - put(opPut.getPath(), opPut.getData(), Optional.of(-1L)).thenAccept( - s -> future.complete(s)) + put(opPut.getPath(), opPut.getData(), Optional.of(-1L), opPut.getOptions()) + .thenAccept(s -> future.complete(s)) .exceptionally(ex -> { if (ex.getCause() instanceof BadVersionException) { // The z-node exist now, let's overwrite it diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java index 9a38cdbcd2f85..a4c937611fd3f 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java @@ -66,4 +66,38 @@ public void sequentialKeys(String provider, Supplier urlSupplier) throws assertNotEquals(seq1, seq2); assertTrue(n1 < n2); } + + @Test(dataProvider = "impl") + public void testPersistentOrEphemeralPut(String provider, Supplier urlSupplier) throws Exception { + final String key1 = newKey(); + MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build()); + store.put(key1, "value-1".getBytes(), Optional.empty(), EnumSet.noneOf(CreateOption.class)).join(); + var value = store.get(key1).join().get(); + assertEquals(value.getValue(), "value-1".getBytes()); + // assertFalse(value.getStat().isEphemeral()); // Todo : fix zkStat.getEphemeralOwner() != 0 from test zk + assertTrue(value.getStat().isFirstVersion()); + var version = value.getStat().getVersion(); + + store.put(key1, "value-2".getBytes(), Optional.empty(), EnumSet.noneOf(CreateOption.class)).join(); + value = store.get(key1).join().get(); + assertEquals(value.getValue(), "value-2".getBytes()); + //assertFalse(value.getStat().isEphemeral()); // Todo : fix zkStat.getEphemeralOwner() != 0 from test zk + assertEquals(value.getStat().getVersion(), version + 1); + + final String key2 = newKey(); + store.put(key2, "value-4".getBytes(), Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join(); + value = store.get(key2).join().get(); + assertEquals(value.getValue(), "value-4".getBytes()); + assertTrue(value.getStat().isEphemeral()); + assertTrue(value.getStat().isFirstVersion()); + version = value.getStat().getVersion(); + + + store.put(key2, "value-5".getBytes(), Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join(); + value = store.get(key2).join().get(); + assertEquals(value.getValue(), "value-5".getBytes()); + assertTrue(value.getStat().isEphemeral()); + assertEquals(value.getStat().getVersion(), version + 1); + } + }