Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][meta] Fix ephemeral handling of ZK nodes and fix MockZooKeeper ephemeral and ZK stat handling #23988

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
ee22969
[fix][metadata] Fix ephemeral status of ZK nodes
lhotari Feb 14, 2025
c45c6d6
Run pulsar-metadata tests with MockZooKeeper
lhotari Feb 14, 2025
cd5610a
Improve test names
lhotari Feb 14, 2025
244a65d
Fix Stat handling in MockZooKeeper
lhotari Feb 14, 2025
d9ec2d1
Cleanup watchers tied to a particular ZK session in MockZooKeeper
lhotari Feb 14, 2025
153289e
Improve MockZNode handling
lhotari Feb 14, 2025
69ca2df
Pass the overridden session id to the executor thread
lhotari Feb 14, 2025
d60b9bc
Don't use direct executor with MockZooKeeper since it could cause ord…
lhotari Feb 14, 2025
2298de6
Ensure that connection strings don't change during a single execution
lhotari Feb 14, 2025
0f5c46f
Improve checkContainers so that it's only called for ZK
lhotari Feb 14, 2025
701dfad
Skip testDeleteUnusedDirectories for MockZooKeeper
lhotari Feb 14, 2025
13ac174
Improve assertions in getChildren test
lhotari Feb 14, 2025
368993b
Fix MockZooKeeper's getChildren implementation
lhotari Feb 14, 2025
ea760b4
Consistently unlock before calling callbacks
lhotari Feb 14, 2025
4989e21
Restore separate mockZooKeeperGlobal instance
lhotari Feb 14, 2025
9cbd80f
Show test name for BrokerServiceLookupTest in IntelliJ
lhotari Feb 14, 2025
ee670b3
Optimize MockZooKeeper getChildren
lhotari Feb 15, 2025
53e587e
Improve BaseMetadataStoreTest: reduce duplication
lhotari Feb 15, 2025
aee6696
Use zkImpl so that provider name shows up in IntelliJ test runner
lhotari Feb 15, 2025
38f4b77
Also test MockZooKeeper with zk-only tests
lhotari Feb 15, 2025
89283e0
Allow filtering providers with TEST_METADATA_PROVIDERS env variable
lhotari Feb 15, 2025
4d833fe
Get rid of reflection in MockZooKeeper
lhotari Feb 15, 2025
cb32580
Handle any exception in multi ops
lhotari Feb 15, 2025
9d1c994
Fix checkstyle
lhotari Feb 15, 2025
85e217c
Log errors in MockZooKeeper multi
lhotari Feb 15, 2025
6e2175b
Fix error handling in MockZooKeeper multi
lhotari Feb 15, 2025
4a5a492
Consistently trigger watches in executor
lhotari Feb 15, 2025
5bd681a
Improve error logging in MockZooKeeper multi
lhotari Feb 15, 2025
4c27282
Always use a single threaded executor
lhotari Feb 15, 2025
be6a424
Replace locks in MockZooKeeper with single threaded execution
lhotari Feb 15, 2025
ccf1371
Improve solution to run pulsar-metadata tests since specific providers
lhotari Feb 15, 2025
3994410
Shutdown outside of executor thread
lhotari Feb 15, 2025
becc221
Fix hasChildren in MockZooKeeper
lhotari Feb 15, 2025
cfc2467
Pass path in NoNodeException
lhotari Feb 15, 2025
18ab5f3
Add logging to see what happens
lhotari Feb 15, 2025
4033fc3
Increase timeout to make the test pass on MockZooKeeper
lhotari Feb 15, 2025
02d01e3
Reduce logging in MockZooKeeper for multiops
lhotari Feb 15, 2025
3852360
Disable invalid test BrokerServiceLookupTest.testLookupConnectionNotC…
lhotari Feb 15, 2025
28bb928
Optimize MockZooKeeper getChildren
lhotari Feb 15, 2025
7d833f8
Fix some NPEs at startup when LeaderElectionService isn't yet available
lhotari Feb 17, 2025
87fad55
Add better exception message
lhotari Feb 17, 2025
6db5a4d
Fix exceptions caused by invalid bundle keys
lhotari Feb 17, 2025
5212fa2
Allow proper shutdown of BrokerService in BrokerServiceLookupTest.tes…
lhotari Feb 17, 2025
f598d95
Refactor restarting broker with different config
lhotari Feb 17, 2025
358d057
Improve MockZooKeeper shutdown
lhotari Feb 17, 2025
c3f24b1
Remove unnecessary parameter
lhotari Feb 17, 2025
9b58676
Reject tasks if stopped
lhotari Feb 17, 2025
70f6321
Improve exception message
lhotari Feb 17, 2025
2c45034
Fix shutdown
lhotari Feb 17, 2025
a631efa
Avoid unnecessary broker restarts in BrokerServiceLookupTest
lhotari Feb 17, 2025
2c7b042
Properly implement reusing TestZKServer in another context
lhotari Feb 17, 2025
e219043
Remove unnecessary synchronization
lhotari Feb 17, 2025
aa6274f
Fix parent handling
lhotari Feb 17, 2025
d0a0505
Refactor
lhotari Feb 17, 2025
4ceecad
Fix pulsar-metadata test resource leaks
lhotari Feb 17, 2025
06e745b
Fix one more leak
lhotari Feb 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -285,15 +285,15 @@ public static CompletableFuture<Set<String>> applyNamespacePoliciesAsync(
public static String getBundleRangeFromBundleName(String bundleName) {
// the bundle format is property/cluster/namespace/0x00000000_0xFFFFFFFF
int pos = bundleName.lastIndexOf("/");
checkArgument(pos != -1);
checkArgument(pos != -1, "Invalid bundle name format: %s", bundleName);
return bundleName.substring(pos + 1);
}

// From a full bundle name, extract the namespace name.
public static String getNamespaceNameFromBundleName(String bundleName) {
// the bundle format is property/cluster/namespace/0x00000000_0xFFFFFFFF
int pos = bundleName.lastIndexOf('/');
checkArgument(pos != -1);
checkArgument(pos != -1, "Invalid bundle name format: %s", bundleName);
return bundleName.substring(0, pos);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ private void updateBundleData() {
for (String bundle : bundleData.keySet()) {
if (!activeBundles.contains(bundle)){
bundleData.remove(bundle);
if (pulsar.getLeaderElectionService().isLeader()){
if (pulsar.getLeaderElectionService() != null && pulsar.getLeaderElectionService().isLeader()){
deleteBundleDataFromMetadataStore(bundle);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,7 @@ protected static boolean isLeaderBroker(PulsarService pulsar) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return true;
}
return pulsar.getLeaderElectionService().isLeader();
return pulsar.getLeaderElectionService() != null && pulsar.getLeaderElectionService().isLeader();
}

public void validateTenantOperation(String tenant, TenantOperation operation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,11 @@ public int size() {

public void validateBundle(NamespaceBundle nsBundle) throws Exception {
int idx = Arrays.binarySearch(partitions, nsBundle.getLowerEndpoint());
checkArgument(idx >= 0, "Cannot find bundle in the bundles list");
checkArgument(nsBundle.getUpperEndpoint().equals(bundles.get(idx).getUpperEndpoint()),
"Invalid upper boundary for bundle");
checkArgument(idx >= 0, "Cannot find bundle %s in the bundles list", nsBundle);
NamespaceBundle foundBundle = bundles.get(idx);
Long upperEndpoint = foundBundle.getUpperEndpoint();
checkArgument(nsBundle.getUpperEndpoint().equals(upperEndpoint),
"Invalid upper boundary for bundle %s. Expected upper boundary of %s", nsBundle, foundBundle);
}

public NamespaceBundle getFullBundle() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ public static String getTlsFileForClient(String name) {

private final List<AutoCloseable> closeables = new ArrayList<>();

// Set to true in test's constructor to use a real Zookeeper (TestZKServer)
protected boolean useTestZookeeper;

public MockedPulsarServiceBaseTest() {
resetConfig();
}
Expand Down Expand Up @@ -363,7 +366,14 @@ protected void afterPulsarStart(PulsarService pulsar) throws Exception {
* @throws Exception if an error occurs
*/
protected void restartBroker() throws Exception {
restartBroker(null);
}

protected void restartBroker(Consumer<ServiceConfiguration> configurationChanger) throws Exception {
stopBroker();
if (configurationChanger != null) {
configurationChanger.accept(conf);
}
startBroker();
if (pulsarClient == null) {
pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
Expand Down Expand Up @@ -461,7 +471,6 @@ protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfig
PulsarTestContext.Builder builder = PulsarTestContext.builder()
.spyByDefault()
.config(conf)
.withMockZookeeper(true)
.pulsarServiceCustomizer(pulsarService -> {
try {
beforePulsarStart(pulsarService);
Expand All @@ -470,9 +479,25 @@ protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfig
}
})
.brokerServiceCustomizer(this::customizeNewBrokerService);
configureMetadataStores(builder);
return builder;
}

/**
* Configures the metadata stores for the PulsarTestContext.Builder instance.
* Set useTestZookeeper to true in the test's constructor to use TestZKServer which is a real ZooKeeper
* implementation.
*
* @param builder the PulsarTestContext.Builder instance to configure
*/
protected void configureMetadataStores(PulsarTestContext.Builder builder) {
if (useTestZookeeper) {
builder.withTestZookeeper();
} else {
builder.withMockZookeeper(true);
}
}

protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf) throws Exception {
return createAdditionalPulsarTestContext(conf, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

package org.apache.pulsar.broker.testcontext;

import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand All @@ -37,6 +35,7 @@
import lombok.Builder;
import lombok.Getter;
import lombok.Singular;
import lombok.SneakyThrows;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
Expand Down Expand Up @@ -65,16 +64,19 @@
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.apache.pulsar.metadata.TestZKServer;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.MockZooKeeperSession;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.jetbrains.annotations.NotNull;
import org.mockito.Mockito;
import org.mockito.internal.util.MockUtil;
Expand Down Expand Up @@ -161,6 +163,10 @@ public class PulsarTestContext implements AutoCloseable {

private final MockZooKeeper mockZooKeeperGlobal;

private final TestZKServer testZKServer;

private final TestZKServer testZKServerGlobal;

private final SpyConfig spyConfig;

private final boolean startable;
Expand Down Expand Up @@ -409,6 +415,11 @@ public Builder reuseMockBookkeeperAndMetadataStores(PulsarTestContext otherConte
if (otherContext.getMockZooKeeperGlobal() != null) {
mockZooKeeperGlobal(otherContext.getMockZooKeeperGlobal());
}
} else if (otherContext.getTestZKServer() != null) {
testZKServer(otherContext.getTestZKServer());
if (otherContext.getTestZKServerGlobal() != null) {
testZKServerGlobal(otherContext.getTestZKServerGlobal());
}
} else {
localMetadataStore(NonClosingProxyHandler.createNonClosingProxy(otherContext.getLocalMetadataStore(),
MetadataStoreExtended.class
Expand Down Expand Up @@ -476,17 +487,56 @@ public Builder withMockZookeeper(boolean useSeparateGlobalZk) {
}

private MockZooKeeper createMockZooKeeper() throws Exception {
MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
List<ACL> dummyAclList = new ArrayList<>(0);
MockZooKeeper zk = MockZooKeeper.newInstance();
initializeZookeeper(zk);
registerCloseable(zk::shutdown);
return zk;
}

private static void initializeZookeeper(ZooKeeper zk) throws KeeperException, InterruptedException {
ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000,
"".getBytes(StandardCharsets.UTF_8), dummyAclList, CreateMode.PERSISTENT);
"".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(StandardCharsets.UTF_8), dummyAclList,
zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}

registerCloseable(zk::shutdown);
return zk;
/**
* Configure this PulsarTestContext to use a test ZooKeeper instance which is
* shared for both the local and configuration metadata stores.
*
* @return the builder
*/
public Builder withTestZookeeper() {
return withTestZookeeper(false);
}

/**
* Configure this PulsarTestContext to use a test ZooKeeper instance.
*
* @param useSeparateGlobalZk if true, the global (configuration) zookeeper will be a separate instance
* @return the builder
*/
public Builder withTestZookeeper(boolean useSeparateGlobalZk) {
try {
testZKServer(createTestZookeeper());
if (useSeparateGlobalZk) {
testZKServerGlobal(createTestZookeeper());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return this;
}

private TestZKServer createTestZookeeper() throws Exception {
TestZKServer testZKServer = new TestZKServer();
try (ZooKeeper zkc = new ZooKeeper(testZKServer.getConnectionString(), 5000, event -> {
})) {
initializeZookeeper(zkc);
}
registerCloseable(testZKServer);
return testZKServer;
}

/**
Expand Down Expand Up @@ -676,6 +726,20 @@ private void initializeCommonPulsarServices(SpyConfig spyConfig) {
configurationMetadataStore(mockZookeeperMetadataStore);
}
}
} else if (super.testZKServer != null) {
MetadataStoreExtended testZookeeperMetadataStore =
createTestZookeeperMetadataStore(super.testZKServer, MetadataStoreConfig.METADATA_STORE);
if (super.localMetadataStore == null) {
localMetadataStore(testZookeeperMetadataStore);
}
if (super.configurationMetadataStore == null) {
if (super.testZKServerGlobal != null) {
configurationMetadataStore(createTestZookeeperMetadataStore(super.testZKServerGlobal,
MetadataStoreConfig.CONFIGURATION_METADATA_STORE));
} else {
configurationMetadataStore(testZookeeperMetadataStore);
}
}
} else {
try {
MetadataStoreExtended store = MetadataStoreFactoryImpl.createExtended("memory:local",
Expand Down Expand Up @@ -720,6 +784,17 @@ private MetadataStoreExtended createMockZookeeperMetadataStore(MockZooKeeper moc
return nonClosingProxy;
}

@SneakyThrows
private MetadataStoreExtended createTestZookeeperMetadataStore(TestZKServer zkServer,
String metadataStoreName) {
MetadataStoreExtended store = MetadataStoreExtended.create("zk:" + zkServer.getConnectionString(),
MetadataStoreConfig.builder().metadataStoreName(metadataStoreName).build());
registerCloseable(store);
MetadataStoreExtended nonClosingProxy =
NonClosingProxyHandler.createNonClosingProxy(store, MetadataStoreExtended.class);
return nonClosingProxy;
}

protected abstract void initializePulsarServices(SpyConfig spyConfig, Builder builder);
}

Expand Down
Loading
Loading