From 42df82b0be1d0ed027e05062f36c438e3bf32308 Mon Sep 17 00:00:00 2001 From: lzyy2024 <2972013149@qq.com> Date: Tue, 4 Feb 2025 12:37:50 +0800 Subject: [PATCH 1/4] all --- be/src/vec/functions/function_compress.cpp | 242 ++++++++++++++++++ .../vec/functions/simple_function_factory.h | 2 + .../doris/catalog/BuiltinScalarFunctions.java | 6 +- .../functions/scalar/Compress.java | 69 +++++ .../functions/scalar/Uncompress.java | 69 +++++ .../visitor/ScalarFunctionVisitor.java | 10 + .../test_compress_uncompress.out | 38 +++ .../test_compress_uncompress.groovy | 139 ++++++++++ 8 files changed, 574 insertions(+), 1 deletion(-) create mode 100644 be/src/vec/functions/function_compress.cpp create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Compress.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Uncompress.java create mode 100644 regression-test/data/query_p0/sql_functions/string_functions/test_compress_uncompress.out create mode 100644 regression-test/suites/query_p0/sql_functions/string_functions/test_compress_uncompress.groovy diff --git a/be/src/vec/functions/function_compress.cpp b/be/src/vec/functions/function_compress.cpp new file mode 100644 index 00000000000000..0f10af4b8ae66f --- /dev/null +++ b/be/src/vec/functions/function_compress.cpp @@ -0,0 +1,242 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "util/block_compression.h" +#include "util/faststring.h" +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" +#include "vec/core/block.h" +#include "vec/core/column_numbers.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/functions/function.h" +#include "vec/functions/simple_function_factory.h" + +namespace doris { +class FunctionContext; +} // namespace doris + +namespace doris::vectorized { + +class FunctionCompress : public IFunction { + static constexpr std::array HEX_ITOC = {'0', '1', '2', '3', '4', '5', '6', '7', + '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; + +public: + static constexpr auto name = "compress"; + static FunctionPtr create() { return std::make_shared(); } + + String get_name() const override { return name; } + + size_t get_number_of_arguments() const override { return 1; } + + DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { + return std::make_shared(); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + uint32_t result, size_t input_rows_count) const override { + // Get the compression algorithm object + BlockCompressionCodec* compression_codec; + RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZLIB, + &compression_codec)); + + const auto& arg_column = + assert_cast(*block.get_by_position(arguments[0]).column); + auto result_column = ColumnString::create(); + + auto& arg_data = arg_column.get_chars(); + auto& arg_offset = arg_column.get_offsets(); + const char* arg_begin = reinterpret_cast(arg_data.data()); + + auto& col_data = result_column->get_chars(); + auto& col_offset = result_column->get_offsets(); + col_offset.resize(input_rows_count); + + faststring compressed_str; + Slice data; + + // When the original string is large, the result is roughly this value + size_t total = arg_offset[input_rows_count - 1]; + col_data.reserve(total / 1000); + + for (size_t row = 0; row < input_rows_count; row++) { + size_t length = arg_offset[row] - arg_offset[row - 1]; + data = Slice(arg_begin + arg_offset[row - 1], length); + + size_t idx = col_data.size(); + if (!length) { // data is '' + col_data.resize(col_data.size() + 2); + col_data[idx] = '0', col_data[idx + 1] = 'x'; + col_offset[row] = col_offset[row - 1] + 2; + continue; + } + + // Z_MEM_ERROR and Z_BUF_ERROR are already handled in compress, making sure st is always Z_OK + auto st = compression_codec->compress(data, &compressed_str); + col_data.resize(col_data.size() + compressed_str.size()); + + // first ten digits represent the length of the uncompressed string + col_data[idx] = '0', col_data[idx + 1] = 'x'; + for (size_t i = 0; i < 4; i++) { + unsigned char byte = (length >> (i * 8)) & 0xFF; + col_data[idx + 2 + i * 2] = HEX_ITOC[byte >> 4]; // higher four + col_data[idx + 3 + i * 2] = HEX_ITOC[byte & 0x0F]; + } + idx += 10; + + // The length of compress_str is not known in advance, so it cannot be compressed directly into col_data + unsigned char* src = compressed_str.data(); + for (size_t i = 0; i < compressed_str.size(); idx++, i++, src++) { + col_data[idx] = *src; + } + col_offset[row] = col_offset[row - 1] + 10 + compressed_str.size(); + } + + block.replace_by_position(result, std::move(result_column)); + return Status::OK(); + } +}; + +class FunctionUncompress : public IFunction { +public: + static constexpr auto name = "uncompress"; + static FunctionPtr create() { return std::make_shared(); } + + String get_name() const override { return name; } + + size_t get_number_of_arguments() const override { return 1; } + + DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { + return make_nullable(std::make_shared()); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + uint32_t result, size_t input_rows_count) const override { + // Get the compression algorithm object + BlockCompressionCodec* compression_codec; + RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZLIB, + &compression_codec)); + + const auto& arg_column = + assert_cast(*block.get_by_position(arguments[0]).column); + + auto& arg_data = arg_column.get_chars(); + auto& arg_offset = arg_column.get_offsets(); + const char* arg_begin = reinterpret_cast(arg_data.data()); + + auto result_column = ColumnString::create(); + auto& col_data = result_column->get_chars(); + auto& col_offset = result_column->get_offsets(); + col_offset.resize(input_rows_count); + + auto null_column = ColumnUInt8::create(input_rows_count); + auto& null_map = null_column->get_data(); + + std::string uncompressed; + Slice data; + Slice uncompressed_slice; + + size_t total = arg_offset[input_rows_count - 1]; + col_data.reserve(total * 1000); + + for (size_t row = 0; row < input_rows_count; row++) { + null_map[row] = false; + data = Slice(arg_begin + arg_offset[row - 1], arg_offset[row] - arg_offset[row - 1]); + size_t data_length = arg_offset[row] - arg_offset[row - 1]; + + if (data_length == 0) { // The original data is '' + col_offset[row] = col_offset[row - 1]; + continue; + } + + bool illegal = false; + // The first ten digits are "0x" and length, followed by hexadecimal, each two digits is a byte + if (data_length < 10) { + illegal = true; + } else { + if (data[0] != '0' || data[1] != 'x') { + illegal = true; + } + for (size_t i = 2; i <= 9; i++) { + if (!std::isxdigit(data[i])) { + illegal = true; + } + } + } + + if (illegal) { // The top ten don't fit the rules + col_offset[row] = col_offset[row - 1]; + null_map[row] = true; + continue; + } + + unsigned int length = 0; + for (size_t i = 2; i <= 9; i += 2) { + unsigned char byte; + std::from_chars(data.data + i, data.data + i + 2, byte, 16); + length += (byte << (8 * (i / 2 - 1))); //Little Endian : 0x01000000 -> 1 + } + + size_t idx = col_data.size(); + col_data.resize(col_data.size() + length); + uncompressed_slice = Slice(col_data.data() + idx, length); + + Slice compressed_data(data.data + 10, data.size - 10); + auto st = compression_codec->decompress(compressed_data, &uncompressed_slice); + + if (!st.ok()) { // is not a legal compressed string + col_data.resize(col_data.size() - length); // remove compressed_data + col_offset[row] = col_offset[row - 1]; + null_map[row] = true; + continue; + } + col_offset[row] = col_offset[row - 1] + length; + } + + block.replace_by_position( + result, ColumnNullable::create(std::move(result_column), std::move(null_column))); + return Status::OK(); + } +}; + +void register_function_compress(SimpleFunctionFactory& factory) { + factory.register_function(); + factory.register_function(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/functions/simple_function_factory.h b/be/src/vec/functions/simple_function_factory.h index 98f2917d163e31..46eca0cb419294 100644 --- a/be/src/vec/functions/simple_function_factory.h +++ b/be/src/vec/functions/simple_function_factory.h @@ -110,6 +110,7 @@ void register_function_ip(SimpleFunctionFactory& factory); void register_function_multi_match(SimpleFunctionFactory& factory); void register_function_split_by_regexp(SimpleFunctionFactory& factory); void register_function_assert_true(SimpleFunctionFactory& factory); +void register_function_compress(SimpleFunctionFactory& factory); void register_function_bit_test(SimpleFunctionFactory& factory); class SimpleFunctionFactory { @@ -301,6 +302,7 @@ class SimpleFunctionFactory { register_function_split_by_regexp(instance); register_function_assert_true(instance); register_function_bit_test(instance); + register_function_compress(instance); }); return instance; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java index b173383ff0c6ab..233128015cc089 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinScalarFunctions.java @@ -119,6 +119,7 @@ import org.apache.doris.nereids.trees.expressions.functions.scalar.Char; import org.apache.doris.nereids.trees.expressions.functions.scalar.CharacterLength; import org.apache.doris.nereids.trees.expressions.functions.scalar.Coalesce; +import org.apache.doris.nereids.trees.expressions.functions.scalar.Compress; import org.apache.doris.nereids.trees.expressions.functions.scalar.Concat; import org.apache.doris.nereids.trees.expressions.functions.scalar.ConcatWs; import org.apache.doris.nereids.trees.expressions.functions.scalar.ConnectionId; @@ -449,6 +450,7 @@ import org.apache.doris.nereids.trees.expressions.functions.scalar.Trim; import org.apache.doris.nereids.trees.expressions.functions.scalar.TrimIn; import org.apache.doris.nereids.trees.expressions.functions.scalar.Truncate; +import org.apache.doris.nereids.trees.expressions.functions.scalar.Uncompress; import org.apache.doris.nereids.trees.expressions.functions.scalar.Unhex; import org.apache.doris.nereids.trees.expressions.functions.scalar.UnixTimestamp; import org.apache.doris.nereids.trees.expressions.functions.scalar.Upper; @@ -974,7 +976,9 @@ public class BuiltinScalarFunctions implements FunctionHelper { scalar(YearsSub.class, "years_sub"), scalar(MultiMatch.class, "multi_match"), scalar(SessionUser.class, "session_user"), - scalar(LastQueryId.class, "last_query_id")); + scalar(LastQueryId.class, "last_query_id"), + scalar(Compress.class, "compress"), + scalar(Uncompress.class, "uncompress")); public static final BuiltinScalarFunctions INSTANCE = new BuiltinScalarFunctions(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Compress.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Compress.java new file mode 100644 index 00000000000000..9422d72bca793a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Compress.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.scalar; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature; +import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable; +import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.StringType; +import org.apache.doris.nereids.types.VarcharType; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * ScalarFunction 'compress'. + */ +public class Compress extends ScalarFunction + implements UnaryExpression, ExplicitlyCastableSignature, PropagateNullable { + + public static final List SIGNATURES = ImmutableList.of( + FunctionSignature.ret(VarcharType.SYSTEM_DEFAULT).args(VarcharType.SYSTEM_DEFAULT), + FunctionSignature.ret(StringType.INSTANCE).args(StringType.INSTANCE)); + + /** + * constructor with 1 argument. + */ + public Compress(Expression arg) { + super("compress", arg); + } + + /** + * withChildren. + */ + @Override + public Compress withChildren(List children) { + Preconditions.checkArgument(children.size() == 1); + return new Compress(children.get(0)); + } + + @Override + public List getSignatures() { + return SIGNATURES; + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitCompress(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Uncompress.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Uncompress.java new file mode 100644 index 00000000000000..8726963f486fce --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/Uncompress.java @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.expressions.functions.scalar; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.functions.AlwaysNullable; +import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature; +import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.StringType; +import org.apache.doris.nereids.types.VarcharType; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * ScalarFunction 'uncompress'. + */ +public class Uncompress extends ScalarFunction + implements UnaryExpression, ExplicitlyCastableSignature, AlwaysNullable { + + public static final List SIGNATURES = ImmutableList.of( + FunctionSignature.ret(VarcharType.SYSTEM_DEFAULT).args(VarcharType.SYSTEM_DEFAULT), + FunctionSignature.ret(StringType.INSTANCE).args(StringType.INSTANCE)); + + /** + * constructor with 1 argument. + */ + public Uncompress(Expression arg) { + super("uncompress", arg); + } + + /** + * withChildren. + */ + @Override + public Uncompress withChildren(List children) { + Preconditions.checkArgument(children.size() == 1); + return new Uncompress(children.get(0)); + } + + @Override + public List getSignatures() { + return SIGNATURES; + } + + @Override + public R accept(ExpressionVisitor visitor, C context) { + return visitor.visitUncompress(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java index 1a41ba4f23eb97..4d24a57d64c0f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ScalarFunctionVisitor.java @@ -126,6 +126,7 @@ import org.apache.doris.nereids.trees.expressions.functions.scalar.Char; import org.apache.doris.nereids.trees.expressions.functions.scalar.CharacterLength; import org.apache.doris.nereids.trees.expressions.functions.scalar.Coalesce; +import org.apache.doris.nereids.trees.expressions.functions.scalar.Compress; import org.apache.doris.nereids.trees.expressions.functions.scalar.Concat; import org.apache.doris.nereids.trees.expressions.functions.scalar.ConcatWs; import org.apache.doris.nereids.trees.expressions.functions.scalar.ConnectionId; @@ -446,6 +447,7 @@ import org.apache.doris.nereids.trees.expressions.functions.scalar.Trim; import org.apache.doris.nereids.trees.expressions.functions.scalar.TrimIn; import org.apache.doris.nereids.trees.expressions.functions.scalar.Truncate; +import org.apache.doris.nereids.trees.expressions.functions.scalar.Uncompress; import org.apache.doris.nereids.trees.expressions.functions.scalar.Unhex; import org.apache.doris.nereids.trees.expressions.functions.scalar.UnixTimestamp; import org.apache.doris.nereids.trees.expressions.functions.scalar.Upper; @@ -2328,4 +2330,12 @@ default R visitMultiMatch(MultiMatch multiMatch, C context) { default R visitLastQueryId(LastQueryId queryId, C context) { return visitScalarFunction(queryId, context); } + + default R visitCompress(Compress compress, C context) { + return visitScalarFunction(compress, context); + } + + default R visitUncompress(Uncompress uncompress, C context) { + return visitScalarFunction(uncompress, context); + } } diff --git a/regression-test/data/query_p0/sql_functions/string_functions/test_compress_uncompress.out b/regression-test/data/query_p0/sql_functions/string_functions/test_compress_uncompress.out new file mode 100644 index 00000000000000..bbcea601408ec8 --- /dev/null +++ b/regression-test/data/query_p0/sql_functions/string_functions/test_compress_uncompress.out @@ -0,0 +1,38 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !restore_original_data -- +1 Hello, world! +2 Doris测试中文字符 +4 \N +5 \N +6 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + +-- !uncompress_null_input -- +3 \N + +-- !uncompress_invalid_data -- +5 \N + +-- !compress_empty_string -- +4 \N + +-- !compress_repeated_string -- +6 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + +-- !compress_single_char -- +x + +-- !compress_null_text -- +\N + +-- !compress_large_repeated_string -- +aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa + +-- !compress_empty_string_direct -- +\N + +-- !compress_string_direct -- +Hello, world! + +-- !compress_numeric_direct -- +12345 + diff --git a/regression-test/suites/query_p0/sql_functions/string_functions/test_compress_uncompress.groovy b/regression-test/suites/query_p0/sql_functions/string_functions/test_compress_uncompress.groovy new file mode 100644 index 00000000000000..9c4df7b1ec97be --- /dev/null +++ b/regression-test/suites/query_p0/sql_functions/string_functions/test_compress_uncompress.groovy @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_compress_uncompress") { + // Drop the existing table + sql "DROP TABLE IF EXISTS test_compression" + + // Create the test table + sql """ + CREATE TABLE test_compression ( + k0 INT, -- Primary key + text_col STRING, -- String column for input data + binary_col STRING -- Binary column for compressed data + ) + DISTRIBUTED BY HASH(k0) + PROPERTIES ( + "replication_num" = "1" + ); + """ + + // Insert test data with various cases + sql """ + INSERT INTO test_compression VALUES + (1, 'Hello, world!', COMPRESS('Hello, world!')), -- Plain string + (2, 'Doris测试中文字符', COMPRESS('Doris测试中文字符')), -- Chinese characters + (3, NULL, NULL), -- Null values + (4, '', COMPRESS('')), -- Empty string + (5, NULL, 'invalid_compressed_data'), -- Invalid binary data + (6, REPEAT('a', 50), COMPRESS(REPEAT('a', 50))); -- Short repeated string + """ + + // Test 1: Verify that UNCOMPRESS can correctly restore the original data + order_qt_restore_original_data """ + SELECT + k0, + UNCOMPRESS(binary_col) AS decompressed_data + FROM test_compression + WHERE binary_col IS NOT NULL + ORDER BY k0; + """ + + // Test 2: Verify that UNCOMPRESS returns NULL for NULL input + order_qt_uncompress_null_input """ + SELECT + k0, + UNCOMPRESS(binary_col) AS decompressed_data + FROM test_compression + WHERE binary_col IS NULL + ORDER BY k0; + """ + + // Test 3: Verify that UNCOMPRESS handles invalid binary data gracefully + order_qt_uncompress_invalid_data """ + SELECT + k0, + UNCOMPRESS(binary_col) AS decompressed_data + FROM test_compression + WHERE k0 = 5 + ORDER BY k0; + """ + + // Test 4: Verify that COMPRESS and UNCOMPRESS work correctly with empty strings + order_qt_compress_empty_string """ + SELECT + k0, + UNCOMPRESS(binary_col) AS decompressed_data + FROM test_compression + WHERE k0 = 4 + ORDER BY k0; + """ + + // Test 5: Verify that COMPRESS and UNCOMPRESS work correctly with repeated strings + order_qt_compress_repeated_string """ + SELECT + k0, + UNCOMPRESS(binary_col) AS decompressed_data + FROM test_compression + WHERE k0 = 6 + ORDER BY k0; + """ + + // Additional tests using SELECT UNCOMPRESS(COMPRESS()) directly + + // Test 6: Verify that COMPRESS and UNCOMPRESS work with a single character string + order_qt_compress_single_char """ + SELECT + UNCOMPRESS(COMPRESS('x')) AS decompressed_data + LIMIT 1; + """ + + // Test 7: Verify that COMPRESS handles NULL text values correctly + order_qt_compress_null_text """ + SELECT + UNCOMPRESS(COMPRESS(NULL)) AS decompressed_data + LIMIT 1; + """ + + // Test 8: Verify that COMPRESS and UNCOMPRESS work with long repeated strings + order_qt_compress_large_repeated_string """ + SELECT + UNCOMPRESS(COMPRESS(REPEAT('a', 100))) AS decompressed_data + LIMIT 1; + """ + + // Test 9: Verify that COMPRESS and UNCOMPRESS work with an empty string + order_qt_compress_empty_string_direct """ + SELECT + UNCOMPRESS(COMPRESS('')) AS decompressed_data + LIMIT 1; + """ + + // Test 10: Verify that COMPRESS and UNCOMPRESS work with the string 'Hello, world!' + order_qt_compress_string_direct """ + SELECT + UNCOMPRESS(COMPRESS('Hello, world!')) AS decompressed_data + LIMIT 1; + """ + + // Test 11: Verify that COMPRESS and UNCOMPRESS work with a numeric value + order_qt_compress_numeric_direct """ + SELECT + UNCOMPRESS(COMPRESS('12345')) AS decompressed_data + LIMIT 1; + """ +} From 52d222d5db3f76a84e5406c1f11294bb31156192 Mon Sep 17 00:00:00 2001 From: lzyy2024 <2972013149@qq.com> Date: Tue, 4 Feb 2025 22:41:13 +0800 Subject: [PATCH 2/4] debug --- be/src/vec/functions/function_compress.cpp | 6 ++---- .../string_functions/test_compress_uncompress.out | 6 +++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/be/src/vec/functions/function_compress.cpp b/be/src/vec/functions/function_compress.cpp index 0f10af4b8ae66f..9cd221e99cf195 100644 --- a/be/src/vec/functions/function_compress.cpp +++ b/be/src/vec/functions/function_compress.cpp @@ -100,15 +100,13 @@ class FunctionCompress : public IFunction { size_t idx = col_data.size(); if (!length) { // data is '' - col_data.resize(col_data.size() + 2); - col_data[idx] = '0', col_data[idx + 1] = 'x'; - col_offset[row] = col_offset[row - 1] + 2; + col_offset[row] = col_offset[row - 1]; continue; } // Z_MEM_ERROR and Z_BUF_ERROR are already handled in compress, making sure st is always Z_OK auto st = compression_codec->compress(data, &compressed_str); - col_data.resize(col_data.size() + compressed_str.size()); + col_data.resize(col_data.size() + 10 + compressed_str.size()); // first ten digits represent the length of the uncompressed string col_data[idx] = '0', col_data[idx + 1] = 'x'; diff --git a/regression-test/data/query_p0/sql_functions/string_functions/test_compress_uncompress.out b/regression-test/data/query_p0/sql_functions/string_functions/test_compress_uncompress.out index bbcea601408ec8..be60951c955392 100644 --- a/regression-test/data/query_p0/sql_functions/string_functions/test_compress_uncompress.out +++ b/regression-test/data/query_p0/sql_functions/string_functions/test_compress_uncompress.out @@ -2,7 +2,7 @@ -- !restore_original_data -- 1 Hello, world! 2 Doris测试中文字符 -4 \N +4 5 \N 6 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa @@ -13,7 +13,7 @@ 5 \N -- !compress_empty_string -- -4 \N +4 -- !compress_repeated_string -- 6 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa @@ -28,7 +28,7 @@ x aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa -- !compress_empty_string_direct -- -\N + -- !compress_string_direct -- Hello, world! From 12facae2117299c27cfdc2dd328db4dabed78428 Mon Sep 17 00:00:00 2001 From: lzyy2024 <2972013149@qq.com> Date: Wed, 5 Feb 2025 01:04:50 +0800 Subject: [PATCH 3/4] debug --- be/src/vec/functions/function_compress.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/vec/functions/function_compress.cpp b/be/src/vec/functions/function_compress.cpp index 9cd221e99cf195..55d11ca4eb727d 100644 --- a/be/src/vec/functions/function_compress.cpp +++ b/be/src/vec/functions/function_compress.cpp @@ -205,7 +205,7 @@ class FunctionUncompress : public IFunction { unsigned int length = 0; for (size_t i = 2; i <= 9; i += 2) { - unsigned char byte; + unsigned char byte = 0; std::from_chars(data.data + i, data.data + i + 2, byte, 16); length += (byte << (8 * (i / 2 - 1))); //Little Endian : 0x01000000 -> 1 } From c342b3f574b8d17b32c536ba5f2dac60186868be Mon Sep 17 00:00:00 2001 From: lzyy2024 <2972013149@qq.com> Date: Wed, 5 Feb 2025 18:03:48 +0800 Subject: [PATCH 4/4] directly sava length --- be/src/vec/functions/function_compress.cpp | 63 ++++++---------------- 1 file changed, 16 insertions(+), 47 deletions(-) diff --git a/be/src/vec/functions/function_compress.cpp b/be/src/vec/functions/function_compress.cpp index 55d11ca4eb727d..4c175a5fd44379 100644 --- a/be/src/vec/functions/function_compress.cpp +++ b/be/src/vec/functions/function_compress.cpp @@ -53,9 +53,6 @@ class FunctionContext; namespace doris::vectorized { class FunctionCompress : public IFunction { - static constexpr std::array HEX_ITOC = {'0', '1', '2', '3', '4', '5', '6', '7', - '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; - public: static constexpr auto name = "compress"; static FunctionPtr create() { return std::make_shared(); } @@ -95,7 +92,7 @@ class FunctionCompress : public IFunction { col_data.reserve(total / 1000); for (size_t row = 0; row < input_rows_count; row++) { - size_t length = arg_offset[row] - arg_offset[row - 1]; + uint32_t length = arg_offset[row] - arg_offset[row - 1]; data = Slice(arg_begin + arg_offset[row - 1], length); size_t idx = col_data.size(); @@ -106,16 +103,10 @@ class FunctionCompress : public IFunction { // Z_MEM_ERROR and Z_BUF_ERROR are already handled in compress, making sure st is always Z_OK auto st = compression_codec->compress(data, &compressed_str); - col_data.resize(col_data.size() + 10 + compressed_str.size()); - - // first ten digits represent the length of the uncompressed string - col_data[idx] = '0', col_data[idx + 1] = 'x'; - for (size_t i = 0; i < 4; i++) { - unsigned char byte = (length >> (i * 8)) & 0xFF; - col_data[idx + 2 + i * 2] = HEX_ITOC[byte >> 4]; // higher four - col_data[idx + 3 + i * 2] = HEX_ITOC[byte & 0x0F]; - } - idx += 10; + col_data.resize(col_data.size() + 4 + compressed_str.size()); + + std::memcpy(col_data.data() + idx, &length, sizeof(length)); + idx += 4; // The length of compress_str is not known in advance, so it cannot be compressed directly into col_data unsigned char* src = compressed_str.data(); @@ -182,48 +173,26 @@ class FunctionUncompress : public IFunction { continue; } - bool illegal = false; - // The first ten digits are "0x" and length, followed by hexadecimal, each two digits is a byte - if (data_length < 10) { - illegal = true; - } else { - if (data[0] != '0' || data[1] != 'x') { - illegal = true; - } - for (size_t i = 2; i <= 9; i++) { - if (!std::isxdigit(data[i])) { - illegal = true; - } - } - } - - if (illegal) { // The top ten don't fit the rules - col_offset[row] = col_offset[row - 1]; - null_map[row] = true; - continue; - } - - unsigned int length = 0; - for (size_t i = 2; i <= 9; i += 2) { - unsigned char byte = 0; - std::from_chars(data.data + i, data.data + i + 2, byte, 16); - length += (byte << (8 * (i / 2 - 1))); //Little Endian : 0x01000000 -> 1 - } + union { + char bytes[4]; + uint32_t value; + } length; + std::memcpy(length.bytes, data.data, 4); size_t idx = col_data.size(); - col_data.resize(col_data.size() + length); - uncompressed_slice = Slice(col_data.data() + idx, length); + col_data.resize(col_data.size() + length.value); + uncompressed_slice = Slice(col_data.data() + idx, length.value); - Slice compressed_data(data.data + 10, data.size - 10); + Slice compressed_data(data.data + 4, data.size - 4); auto st = compression_codec->decompress(compressed_data, &uncompressed_slice); - if (!st.ok()) { // is not a legal compressed string - col_data.resize(col_data.size() - length); // remove compressed_data + if (!st.ok()) { // is not a legal compressed string + col_data.resize(col_data.size() - length.value); // remove compressed_data col_offset[row] = col_offset[row - 1]; null_map[row] = true; continue; } - col_offset[row] = col_offset[row - 1] + length; + col_offset[row] = col_offset[row - 1] + length.value; } block.replace_by_position(