Skip to content

Commit

Permalink
all
Browse files Browse the repository at this point in the history
  • Loading branch information
lzyy2024 committed Jan 22, 2025
1 parent 8b37740 commit fa80a74
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 55 deletions.
184 changes: 155 additions & 29 deletions be/src/vec/functions/function_compress.cpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <glog/logging.h>

#include <cctype>
Expand Down Expand Up @@ -44,7 +28,6 @@
#include "vec/functions/function.h"
#include "vec/functions/simple_function_factory.h"


namespace doris {
class FunctionContext;
} // namespace doris
Expand All @@ -66,14 +49,16 @@ class FunctionCompress : public IFunction {

Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
// LOG(INFO) << "Executing FunctionCompress with " << input_rows_count
// << " rows."; // Log the number of rows being processed

// Get the compression algorithm object
BlockCompressionCodec* compression_codec;
RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZLIB,
&compression_codec));

const auto& arg_column =
assert_cast<const ColumnString&>(*block.get_by_position(arguments[0]).column);

auto result_column = ColumnString::create();

auto& col_data = result_column->get_chars();
Expand All @@ -90,16 +75,70 @@ class FunctionCompress : public IFunction {
const auto& str = arg_column.get_data_at(row);
data = Slice(str.data, str.size);

// Print the original string (before compression)
// LOG(INFO) << "Original string at row " << row << ": "
// << std::string(str.data, str.size);

auto st = compression_codec->compress(data, &compressed_str);

if (!st.ok()) {
// LOG(INFO) << "Compression failed at row " << row
// << ", skipping this row."; // Log failure
col_offset[row] = col_offset[row - 1];
null_map[row] = true;
continue;
}
col_data.resize(col_data.size() + compressed_str.size() + 1);
memcpy(col_data.data() + col_data.size(), compressed_str.data(), compressed_str.size());
col_offset[row] = col_offset[row - 1] + compressed_str.size();

size_t idx = col_data.size();
if (!str.size) { // null -> 0x
col_data.resize(col_data.size() + 2);
col_data[idx] = '0', col_data[idx + 1] = 'x';
col_offset[row] = col_offset[row - 1] + 2;
continue;
}

// first ten digits represent the length of the uncompressed string
int value = (int)str.size;
col_data.resize(col_data.size() + 10);
col_data[idx] = '0', col_data[idx + 1] = 'x';
for (int i = 0; i < 4; i++) {
unsigned char byte = (value >> (i * 8)) & 0xFF;
col_data[idx + 2 + i * 2] = "0123456789ABCDEF"[byte >> 4]; // 高4位
col_data[idx + 3 + i * 2] = "0123456789ABCDEF"[byte & 0x0F]; // 低4位
}
idx += 10;

col_data.resize(col_data.size() + 2 * compressed_str.size());
// memcpy(col_data.data() + col_data.size(), compressed_str.data(), compressed_str.size());

unsigned char* src = compressed_str.data();
{
auto transform = [](char ch) -> unsigned char {
char x;
if (ch < 10) {
x = ch + '0';
} else {
x = ch - 10 + 'A';
}
// LOG(INFO) << "transform" << (int)x << "->" << x;
return x;
};
for (int i = 0; i < compressed_str.size(); i++) {
col_data[idx] = transform(((*src) >> 4) & 0x0F);
col_data[idx + 1] = transform(*src & 0x0F);
LOG(INFO) << (unsigned int)(*src) << " -> " << (unsigned int)col_data[idx]
<< " and " << (unsigned int)col_data[idx + 1];
idx += 2;
src++;
}

// Print the compressed string (after compression)
// LOG(INFO) << "Compressed string at row " << row << ": "
// << std::string(reinterpret_cast<const char*>(col_data.data()));
col_offset[row] = col_offset[row - 1] + 10 + compressed_str.size() * 2;
}
}

block.replace_by_position(
result, ColumnNullable::create(std::move(result_column), std::move(null_column)));
return Status::OK();
Expand All @@ -121,6 +160,9 @@ class FunctionUncompress : public IFunction {

Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
LOG(INFO) << "Executing FunctionUncompress with " << input_rows_count
<< " rows."; // Log the number of rows being processed

// Get the compression algorithm object
BlockCompressionCodec* compression_codec;
RETURN_IF_ERROR(get_block_compression_codec(segment_v2::CompressionTypePB::ZLIB,
Expand All @@ -138,27 +180,111 @@ class FunctionUncompress : public IFunction {
auto& null_map = null_column->get_data();

std::string uncompressed;
uncompressed.resize(32);
Slice data;
Slice uncompressed_slice;
for (int row = 0; row < input_rows_count; row++) {
auto check = [](char x) {
if (x >= '0' && x <= '9') return true;
if (x >= 'a' && x <= 'f') return true;
if (x >= 'A' && x <= 'F') return true;
return false;
};
auto trans = [](char x) {
if (x >= '0' && x <= '9') {
return x - '0';
}
if (x >= 'A' && x <= 'F') {
return x - 'A' + 10;
}
return x - 'a' + 10;
};

null_map[row] = false;
const auto& str = arg_column.get_data_at(row);
data = Slice(str.data, str.size);

// Print the compressed string (before decompression)
// LOG(INFO) << "Compressed string at row " << row << ": " << data.to_string() << ' '
// << data.size;

int illegal = 0;
if ((int)str.size < 10 || (int)str.size % 2 == 1) {
// The first ten digits are "0x" and length, followed by hexadecimal, each two digits is a byte
illegal = 1;
} else {
if (data[0] != '0' || data[1] != 'x') {
LOG(INFO) << "illegal: "
<< "data[0] : " << data[0] << " data[1] : " << data[1];
illegal = 1;
}
for (int i = 2; i <= 9; i += 2) {
if (!check(data[i])) {
illegal = 1;
LOG(INFO) << "illegal at " << i << ' ' << data[i];
}
}
}

if (illegal) {
if ((int)data.size == 2 && data[0] == '0' && data[1] == 'x') { // ''
int idx = col_data.size();
col_data.resize(col_data.size() + 2);
col_data[idx] = '0', col_data[idx + 1] = 'x';
col_offset[row] = col_offset[row - 1] + 2;
continue;
}
// LOG(INFO) << "illegal! : "
// << "(int) str.size() < 10 :" << ((int)str.size < 10)
// << ", (int)str.size() % 2 == 1 :" << ((int)str.size % 2 == 1);
col_offset[row] = col_offset[row - 1];
null_map[row] = true;
continue;
}

unsigned int length = 0;
for (size_t i = 2; i <= 9; i += 2) {
unsigned char byte = trans(data[i]);
byte = (byte << 4) + trans(data[i + 1]);
length += (byte << (8 * (i / 2 - 1)));
}
// LOG(INFO) << "uncompressed string length: " << length;

uncompressed.resize(length);
uncompressed_slice = Slice(uncompressed);
auto st = compression_codec->decompress(data, &uncompressed_slice);

//Converts a hexadecimal readable string to a compressed byte stream
std::string s(((int)data.size - 10) / 2, ' '); // byte stream data.size >= 10
for (size_t i = 10, j = 0; i < data.size; i += 2, j++) {
s[j] = (trans(data[i]) << 4) + trans(data[i + 1]);
// LOG(INFO) << data[i] << "*16 + " << data[i + 1] << " -> " << (int)(s[j]);
// LOG(INFO) << j << ' ' << s.size();
}
Slice compressed_data(s);
auto st = compression_codec->decompress(compressed_data, &uncompressed_slice);

if (!st.ok()) {
// LOG(INFO) << "Decompression failed at row " << row
// << ", skipping this row."; // Log failure
col_offset[row] = col_offset[row - 1];
null_map[row] = true;
continue;
}
// auto* unc_data = reinterpret_cast<const char*>(uncompressed_slice.data);
// col_data.push_back_raw(unc_data, uncompressed_slice.size);
col_data.resize(col_data.size() + uncompressed_slice.size + 1);
memcpy(col_data.data() + col_data.size(), uncompressed_slice.data,
uncompressed_slice.size);
col_offset[row] = col_offset[row - 1] + uncompressed_slice.size;
// Print the uncompressed string(after decompression) ;
// LOG(INFO) << "Uncompressed string at row " << row << ": "
// << uncompressed_slice.to_string();

int idx = col_data.size();
col_data.resize(col_data.size() + 2 * uncompressed_slice.size + 2);
col_offset[row] = col_offset[row - 1] + 2 * uncompressed_slice.size + 2;

col_data[idx] = '0', col_data[idx + 1] = 'x';
for (int i = 0; i < uncompressed_slice.size; i++) {
unsigned char byte = uncompressed_slice[i];
col_data[idx + 2 + i * 2] = "0123456789ABCDEF"[byte >> 4]; // 高4位
col_data[idx + 3 + i * 2] = "0123456789ABCDEF"[byte & 0x0F]; // 低4位
}
}

block.replace_by_position(
result, ColumnNullable::create(std::move(result_column), std::move(null_column)));
return Status::OK();
Expand Down
44 changes: 22 additions & 22 deletions be/test/vec/function/function_string_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3345,22 +3345,22 @@ TEST(function_string_test, function_rpad_test) {

// 压缩多个不同的字符串
DataSet data_set = {
{{Null()}, Null()},
// 示例 1: 压缩普通字符串
{"Hello, world!", "0x0D000000789CF348CDC9C9D75128CF2FCA49510400205E048A"},
// 示例 2: 压缩空字符串
{"", "0x"},
// 示例 3: 压缩带特殊字符的字符串
{"String with special characters! @#$%^&*()",
"0x29000000789C0B2E29CACC4B5728CF2CC950282E484DCE4CCC5148CE482C4A4C2E492D2A565"
"4705056518D53D3D2D004003C2A0D81"},
// 示例 4: 压缩带前后空格的字符串
{" This is a string with leading and trailing spaces ",
"0x37000000789C15C7510A00101045D1ADDC45D9C00B3125C94CD93EEAFC1C2075731EE1B16D3"
"68E456754951FCD426CD9F8F1A55C1DB8054B12C9"}};

static_cast<void>(
check_function<DataTypeString, true>(func_name, input_types, data_set));
{{Null()}, Null()},
// 示例 1: 压缩普通字符串
{{std::string("Hello, world!")}, "0x0D000000789CF348CDC9C9D75128CF2FCA49510400205E048A"},
// 示例 2: 压缩空字符串
{{std::string("")}, "0x"},
// 示例 3: 压缩带特殊字符的字符串
{{std::string("String with special characters! @#$%^&*()")},
"0x29000000789C0B2E29CACC4B5728CF2CC950282E484DCE4CCC5148CE482C4A4C2E492D2A565
4705056518D53D3D2D004003C2A0D81 "},
// 示例 4: 压缩带前后空格的字符串
{{std::string(" This is a string with leading and trailing spaces "}),
"0x37000000789C15C7510A00101045D1ADDC45D9C00B3125C94CD93EEAFC1C2075731EE1B16D3
68E456754951FCD426CD9F8F1A55C1DB8054B12C9 "}};

static_cast<void>(
check_function<DataTypeString, true>(func_name, input_types, data_set));
}

{
Expand All @@ -3371,16 +3371,16 @@ TEST(function_string_test, function_rpad_test) {
DataSet data_set = {
{{Null()}, Null()},
// 示例 1: 解压 'Hello, world!' 的压缩结果
{"0x0D000000789CF348CDC9C9D75128CF2FCA49510400205E048A", "Hello, world!"},
{{std::string("0x0D000000789CF348CDC9C9D75128CF2FCA49510400205E048A")}, "Hello, world!"},
// 示例 2: 解压空字符串的压缩结果
{"0x", ""},
{{std::string("0x")}, ""},
// 示例 3: 解压带特殊字符的字符串
{"0x29000000789C0B2E29CACC4B5728CF2CC950282E484DCE4CCC5148CE482C4A4C2E492D2A565"
"4705056518D53D3D2D004003C2A0D81",
{{std::string("0x29000000789C0B2E29CACC4B5728CF2CC950282E484DCE4CCC5148CE482C4A4C2E492D2A565
4705056518D53D3D2D004003C2A0D81")},
"String with special characters! @#$%^&*()"},
// 示例 4: 解压带前后空格的字符串
{"0x37000000789C15C7510A00101045D1ADDC45D9C00B3125C94CD93EEAFC1C2075731EE1B16D3"
"68E456754951FCD426CD9F8F1A55C1DB8054B12C9",
{{std::string("0x37000000789C15C7510A00101045D1ADDC45D9C00B3125C94CD93EEAFC1C2075731EE1B16D3
68E456754951FCD426CD9F8F1A55C1DB8054B12C9")},
" This is a string with leading and trailing spaces "}};

static_cast<void>(
Expand Down
2 changes: 1 addition & 1 deletion bin/fe.pid
Original file line number Diff line number Diff line change
@@ -1 +1 @@
16858
1488441
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@
import org.apache.doris.nereids.trees.expressions.functions.scalar.YearsDiff;
import org.apache.doris.nereids.trees.expressions.functions.scalar.YearsSub;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Compress;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Unompress;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Uncompress;

import com.google.common.collect.ImmutableList;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.BinaryType;
import org.apache.doris.nereids.types.StringType;

import com.google.common.base.Preconditions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.StringType;
import org.apache.doris.nereids.types.BinaryType;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-- !compress_result
1 0x0D000000789CF348CDC9C9D75128CF2FCA49510400205E048A
2 0x17000000789C011700E8FF446F726973E6B58BE8AF95E4B8ADE69687E5AD97E7ACA6A70F0F02
3 \N
4 0x15000000789C011500EAFFE6B58BE8AF95E8A7A3E58E8BE7BCA9E8A18CE4B8BAA8250F3A
5 0x
6 0x
7 \N

-- !uncompress_result
1 Hello, world!
2 Doris测试中文字符
3 \N
4 测试解压缩行为
5
6
7 \N

-- !invalid_uncompress
1 \N
2 \N
3 \N
4 \N
5 \N
6 \N
7 \N
Loading

0 comments on commit fa80a74

Please sign in to comment.