Skip to content

Commit

Permalink
Merge branch 'master' into fix_incorrect_blockedConsumerOnUnackedMsgs
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari authored Feb 13, 2025
2 parents 022f493 + d3ea0ee commit 177c3b7
Show file tree
Hide file tree
Showing 106 changed files with 2,250 additions and 635 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/pulsar-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1494,15 +1494,21 @@ jobs:
restore-keys: |
owasp-dependency-check-data-
- name: Log warning when skipped
if: ${{ !steps.restore-owasp-dependency-check-data.outputs.cache-matched-key }}
run: |
echo "::warning::OWASP Dependency Check was skipped since the OWASP Dependency check data wasn't found in the cache. Run ci-owasp-dependency-check.yaml workflow to update the cache."
# Projects dependent on flume, hdfs, and hbase currently excluded from the scan.
- name: trigger dependency check
if: ${{ steps.restore-owasp-dependency-check-data.outputs.cache-matched-key }}
run: |
mvn -B -ntp verify -PskipDocker,skip-all,owasp-dependency-check -Dcheckstyle.skip=true -DskipTests \
-pl '!distribution/server,!distribution/io,!distribution/offloaders,!tiered-storage/file-system,!pulsar-io/flume,!pulsar-io/hbase,!pulsar-io/hdfs3,!pulsar-io/docs,!pulsar-io/jdbc/openmldb'
- name: Upload report
uses: actions/upload-artifact@v4
if: ${{ cancelled() || failure() }}
if: ${{ steps.restore-owasp-dependency-check-data.outputs.cache-matched-key && (cancelled() || failure()) }}
continue-on-error: true
with:
name: dependency report
Expand Down
2 changes: 1 addition & 1 deletion buildtools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<license-maven-plugin.version>4.1</license-maven-plugin.version>
<puppycrawl.checkstyle.version>10.14.2</puppycrawl.checkstyle.version>
<maven-checkstyle-plugin.version>3.1.2</maven-checkstyle-plugin.version>
<netty.version>4.1.117.Final</netty.version>
<netty.version>4.1.118.Final</netty.version>
<guice.version>4.2.3</guice.version>
<guava.version>32.1.2-jre</guava.version>
<ant.version>1.10.12</ant.version>
Expand Down
54 changes: 27 additions & 27 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -293,33 +293,33 @@ The Apache Software License, Version 2.0
- org.apache.commons-commons-lang3-3.11.jar
- org.apache.commons-commons-text-1.10.0.jar
* Netty
- io.netty-netty-buffer-4.1.117.Final.jar
- io.netty-netty-codec-4.1.117.Final.jar
- io.netty-netty-codec-dns-4.1.117.Final.jar
- io.netty-netty-codec-http-4.1.117.Final.jar
- io.netty-netty-codec-http2-4.1.117.Final.jar
- io.netty-netty-codec-socks-4.1.117.Final.jar
- io.netty-netty-codec-haproxy-4.1.117.Final.jar
- io.netty-netty-common-4.1.117.Final.jar
- io.netty-netty-handler-4.1.117.Final.jar
- io.netty-netty-handler-proxy-4.1.117.Final.jar
- io.netty-netty-resolver-4.1.117.Final.jar
- io.netty-netty-resolver-dns-4.1.117.Final.jar
- io.netty-netty-resolver-dns-classes-macos-4.1.117.Final.jar
- io.netty-netty-resolver-dns-native-macos-4.1.117.Final-osx-aarch_64.jar
- io.netty-netty-resolver-dns-native-macos-4.1.117.Final-osx-x86_64.jar
- io.netty-netty-transport-4.1.117.Final.jar
- io.netty-netty-transport-classes-epoll-4.1.117.Final.jar
- io.netty-netty-transport-native-epoll-4.1.117.Final-linux-aarch_64.jar
- io.netty-netty-transport-native-epoll-4.1.117.Final-linux-x86_64.jar
- io.netty-netty-transport-native-unix-common-4.1.117.Final.jar
- io.netty-netty-tcnative-boringssl-static-2.0.69.Final.jar
- io.netty-netty-tcnative-boringssl-static-2.0.69.Final-linux-aarch_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.69.Final-linux-x86_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.69.Final-osx-aarch_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.69.Final-osx-x86_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.69.Final-windows-x86_64.jar
- io.netty-netty-tcnative-classes-2.0.69.Final.jar
- io.netty-netty-buffer-4.1.118.Final.jar
- io.netty-netty-codec-4.1.118.Final.jar
- io.netty-netty-codec-dns-4.1.118.Final.jar
- io.netty-netty-codec-http-4.1.118.Final.jar
- io.netty-netty-codec-http2-4.1.118.Final.jar
- io.netty-netty-codec-socks-4.1.118.Final.jar
- io.netty-netty-codec-haproxy-4.1.118.Final.jar
- io.netty-netty-common-4.1.118.Final.jar
- io.netty-netty-handler-4.1.118.Final.jar
- io.netty-netty-handler-proxy-4.1.118.Final.jar
- io.netty-netty-resolver-4.1.118.Final.jar
- io.netty-netty-resolver-dns-4.1.118.Final.jar
- io.netty-netty-resolver-dns-classes-macos-4.1.118.Final.jar
- io.netty-netty-resolver-dns-native-macos-4.1.118.Final-osx-aarch_64.jar
- io.netty-netty-resolver-dns-native-macos-4.1.118.Final-osx-x86_64.jar
- io.netty-netty-transport-4.1.118.Final.jar
- io.netty-netty-transport-classes-epoll-4.1.118.Final.jar
- io.netty-netty-transport-native-epoll-4.1.118.Final-linux-aarch_64.jar
- io.netty-netty-transport-native-epoll-4.1.118.Final-linux-x86_64.jar
- io.netty-netty-transport-native-unix-common-4.1.118.Final.jar
- io.netty-netty-tcnative-boringssl-static-2.0.70.Final.jar
- io.netty-netty-tcnative-boringssl-static-2.0.70.Final-linux-aarch_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.70.Final-linux-x86_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.70.Final-osx-aarch_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.70.Final-osx-x86_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.70.Final-windows-x86_64.jar
- io.netty-netty-tcnative-classes-2.0.70.Final.jar
- io.netty.incubator-netty-incubator-transport-classes-io_uring-0.0.26.Final.jar
- io.netty.incubator-netty-incubator-transport-native-io_uring-0.0.26.Final-linux-x86_64.jar
- io.netty.incubator-netty-incubator-transport-native-io_uring-0.0.26.Final-linux-aarch_64.jar
Expand Down
52 changes: 26 additions & 26 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -347,35 +347,35 @@ The Apache Software License, Version 2.0
- commons-text-1.10.0.jar
- commons-compress-1.26.0.jar
* Netty
- netty-buffer-4.1.117.Final.jar
- netty-codec-4.1.117.Final.jar
- netty-codec-dns-4.1.117.Final.jar
- netty-codec-http-4.1.117.Final.jar
- netty-codec-socks-4.1.117.Final.jar
- netty-codec-haproxy-4.1.117.Final.jar
- netty-common-4.1.117.Final.jar
- netty-handler-4.1.117.Final.jar
- netty-handler-proxy-4.1.117.Final.jar
- netty-resolver-4.1.117.Final.jar
- netty-resolver-dns-4.1.117.Final.jar
- netty-transport-4.1.117.Final.jar
- netty-transport-classes-epoll-4.1.117.Final.jar
- netty-transport-native-epoll-4.1.117.Final-linux-aarch_64.jar
- netty-transport-native-epoll-4.1.117.Final-linux-x86_64.jar
- netty-transport-native-unix-common-4.1.117.Final.jar
- netty-tcnative-boringssl-static-2.0.69.Final.jar
- netty-tcnative-boringssl-static-2.0.69.Final-linux-aarch_64.jar
- netty-tcnative-boringssl-static-2.0.69.Final-linux-x86_64.jar
- netty-tcnative-boringssl-static-2.0.69.Final-osx-aarch_64.jar
- netty-tcnative-boringssl-static-2.0.69.Final-osx-x86_64.jar
- netty-tcnative-boringssl-static-2.0.69.Final-windows-x86_64.jar
- netty-tcnative-classes-2.0.69.Final.jar
- netty-buffer-4.1.118.Final.jar
- netty-codec-4.1.118.Final.jar
- netty-codec-dns-4.1.118.Final.jar
- netty-codec-http-4.1.118.Final.jar
- netty-codec-socks-4.1.118.Final.jar
- netty-codec-haproxy-4.1.118.Final.jar
- netty-common-4.1.118.Final.jar
- netty-handler-4.1.118.Final.jar
- netty-handler-proxy-4.1.118.Final.jar
- netty-resolver-4.1.118.Final.jar
- netty-resolver-dns-4.1.118.Final.jar
- netty-transport-4.1.118.Final.jar
- netty-transport-classes-epoll-4.1.118.Final.jar
- netty-transport-native-epoll-4.1.118.Final-linux-aarch_64.jar
- netty-transport-native-epoll-4.1.118.Final-linux-x86_64.jar
- netty-transport-native-unix-common-4.1.118.Final.jar
- netty-tcnative-boringssl-static-2.0.70.Final.jar
- netty-tcnative-boringssl-static-2.0.70.Final-linux-aarch_64.jar
- netty-tcnative-boringssl-static-2.0.70.Final-linux-x86_64.jar
- netty-tcnative-boringssl-static-2.0.70.Final-osx-aarch_64.jar
- netty-tcnative-boringssl-static-2.0.70.Final-osx-x86_64.jar
- netty-tcnative-boringssl-static-2.0.70.Final-windows-x86_64.jar
- netty-tcnative-classes-2.0.70.Final.jar
- netty-incubator-transport-classes-io_uring-0.0.26.Final.jar
- netty-incubator-transport-native-io_uring-0.0.26.Final-linux-aarch_64.jar
- netty-incubator-transport-native-io_uring-0.0.26.Final-linux-x86_64.jar
- netty-resolver-dns-classes-macos-4.1.117.Final.jar
- netty-resolver-dns-native-macos-4.1.117.Final-osx-aarch_64.jar
- netty-resolver-dns-native-macos-4.1.117.Final-osx-x86_64.jar
- netty-resolver-dns-classes-macos-4.1.118.Final.jar
- netty-resolver-dns-native-macos-4.1.118.Final-osx-aarch_64.jar
- netty-resolver-dns-native-macos-4.1.118.Final-osx-x86_64.jar
* Prometheus client
- simpleclient-0.16.0.jar
- simpleclient_log4j2-0.16.0.jar
Expand Down
2 changes: 1 addition & 1 deletion docker/pulsar/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
#

ARG ALPINE_VERSION=3.20
ARG ALPINE_VERSION=3.21
ARG IMAGE_JDK_MAJOR_VERSION=21

# First create a stage with just the Pulsar tarball and scripts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -802,33 +802,41 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
buffer.retain();

// Jump to specific thread to avoid contention from writers writing from different threads
executor.execute(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
currentLedgerTimeoutTriggered);
internalAsyncAddEntry(addOperation);
});
final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
currentLedgerTimeoutTriggered);
var added = false;
try {
// Use synchronized to ensure if `addOperation` is added to queue and fails later, it will be the first
// element in `pendingAddEntries`.
synchronized (this) {
if (managedLedgerInterceptor != null) {
managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
}
final var state = STATE_UPDATER.get(this);
beforeAddEntryToQueue(state);
pendingAddEntries.add(addOperation);
added = true;
afterAddEntryToQueue(state, addOperation);
}
} catch (Throwable throwable) {
if (!added) {
addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable));
} // else: all elements of `pendingAddEntries` will fail in another thread
}
}

protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
if (!beforeAddEntry(addOperation)) {
return;
}
final State state = STATE_UPDATER.get(this);
protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
if (state.isFenced()) {
addOperation.failed(new ManagedLedgerFencedException());
return;
} else if (state == State.Terminated) {
addOperation.failed(new ManagedLedgerTerminatedException("Managed ledger was already terminated"));
return;
} else if (state == State.Closed) {
addOperation.failed(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
return;
} else if (state == State.WriteFailed) {
addOperation.failed(new ManagedLedgerAlreadyClosedException("Waiting to recover from failure"));
return;
throw new ManagedLedgerFencedException();
}
switch (state) {
case Terminated -> throw new ManagedLedgerTerminatedException("Managed ledger was already terminated");
case Closed -> throw new ManagedLedgerAlreadyClosedException("Managed ledger was already closed");
case WriteFailed -> throw new ManagedLedgerAlreadyClosedException("Waiting to recover from failure");
}
pendingAddEntries.add(addOperation);
}

protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
if (state == State.ClosingLedger || state == State.CreatingLedger) {
// We don't have a ready ledger to write into
// We are waiting for a new ledger to be created
Expand Down Expand Up @@ -1144,16 +1152,17 @@ public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cu
return cachedCursor;
}

NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, this, cursorName,
startCursorPosition, initialPosition, isReadCompacted);
cursor.setActive();

log.info("[{}] Opened new cursor: {}", name, cursor);
// The backlog of a non-durable cursor could be incorrect if the cursor is created before `internalTrimLedgers`
// and added to the managed ledger after `internalTrimLedgers`.
// For more details, see https://github.com/apache/pulsar/pull/23951.
synchronized (this) {
NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, this, cursorName,
startCursorPosition, initialPosition, isReadCompacted);
cursor.setActive();
log.info("[{}] Opened new cursor: {}", name, cursor);
addCursor(cursor);
return cursor;
}

return cursor;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,13 @@ public ManagedLedgerInfo parseManagedLedgerInfo(byte[] data) throws InvalidProto
try {
MLDataFormats.ManagedLedgerInfoMetadata metadata =
MLDataFormats.ManagedLedgerInfoMetadata.parseFrom(metadataBytes);
return ManagedLedgerInfo.parseFrom(getCompressionCodec(metadata.getCompressionType())
.decode(byteBuf, metadata.getUncompressedSize()).nioBuffer());
ByteBuf uncompressed = getCompressionCodec(metadata.getCompressionType())
.decode(byteBuf, metadata.getUncompressedSize());
try {
return ManagedLedgerInfo.parseFrom(uncompressed.nioBuffer());
} finally {
uncompressed.release();
}
} catch (Exception e) {
log.error("Failed to parse managedLedgerInfo metadata, "
+ "fall back to parse managedLedgerInfo directly.", e);
Expand All @@ -478,8 +483,13 @@ public ManagedCursorInfo parseManagedCursorInfo(byte[] data) throws InvalidProto
try {
MLDataFormats.ManagedCursorInfoMetadata metadata =
MLDataFormats.ManagedCursorInfoMetadata.parseFrom(metadataBytes);
return ManagedCursorInfo.parseFrom(getCompressionCodec(metadata.getCompressionType())
.decode(byteBuf, metadata.getUncompressedSize()).nioBuffer());
ByteBuf uncompressed = getCompressionCodec(metadata.getCompressionType())
.decode(byteBuf, metadata.getUncompressedSize());
try {
return ManagedCursorInfo.parseFrom(uncompressed.nioBuffer());
} finally {
uncompressed.release();
}
} catch (Exception e) {
log.error("Failed to parse ManagedCursorInfo metadata, "
+ "fall back to parse ManagedCursorInfo directly", e);
Expand All @@ -503,29 +513,23 @@ private byte[] compressManagedInfo(byte[] info, byte[] metadata, int metadataSer
if (compressionType == null || compressionType.equals(CompressionType.NONE)) {
return info;
}
ByteBuf metadataByteBuf = null;
ByteBuf encodeByteBuf = null;

CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
try {
metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(metadataSerializedSize + 6,
ByteBuf metadataByteBuf = PulsarByteBufAllocator.DEFAULT.buffer(metadataSerializedSize + 6,
metadataSerializedSize + 6);
metadataByteBuf.writeShort(MAGIC_MANAGED_INFO_METADATA);
metadataByteBuf.writeInt(metadataSerializedSize);
metadataByteBuf.writeBytes(metadata);
encodeByteBuf = getCompressionCodec(compressionType)
ByteBuf encodeByteBuf = getCompressionCodec(compressionType)
.encode(Unpooled.wrappedBuffer(info));
CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponent(true, metadataByteBuf);
compositeByteBuf.addComponent(true, encodeByteBuf);
byte[] dataBytes = new byte[compositeByteBuf.readableBytes()];
compositeByteBuf.readBytes(dataBytes);
return dataBytes;
} finally {
if (metadataByteBuf != null) {
metadataByteBuf.release();
}
if (encodeByteBuf != null) {
encodeByteBuf.release();
}
compositeByteBuf.release();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,25 +223,23 @@ private void initLastConfirmedEntry() {
}

@Override
protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
if (!beforeAddEntry(addOperation)) {
return;
}
protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
if (state != State.LedgerOpened) {
addOperation.failed(new ManagedLedgerException("Managed ledger is not opened"));
return;
throw new ManagedLedgerException("Managed ledger is not opened");
}
}

@Override
protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
if (addOperation.getCtx() == null || !(addOperation.getCtx() instanceof Position position)) {
addOperation.failed(new ManagedLedgerException("Illegal addOperation context object."));
return;
pendingAddEntries.poll();
throw new ManagedLedgerException("Illegal addOperation context object.");
}

if (log.isDebugEnabled()) {
log.debug("[{}] Add entry into shadow ledger lh={} entries={}, pos=({},{})",
name, currentLedger.getId(), currentLedgerEntries, position.getLedgerId(), position.getEntryId());
}
pendingAddEntries.add(addOperation);
if (position.getLedgerId() <= currentLedger.getId()) {
// Write into lastLedger
if (position.getLedgerId() == currentLedger.getId()) {
Expand Down
Loading

0 comments on commit 177c3b7

Please sign in to comment.