[go: up one dir, main page]

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick](branch-2.1) pick hive text write from master #40537

Merged
merged 7 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,8 @@ DEFINE_Validator(tablet_meta_serialize_size_limit,
[](const int64_t config) -> bool { return config < 1717986918; });

DEFINE_mInt64(pipeline_task_leakage_detect_period_secs, "60");
// Block sizes used for snappy / lz4 block compression. The matching declarations
// in config.h note that these exist to be compatible with Hadoop's block compression.
// 262144 == 256 * 1024; presumably a size in bytes — TODO confirm against the writer code.
DEFINE_mInt32(snappy_compression_block_size, "262144");
DEFINE_mInt32(lz4_compression_block_size, "262144");

DEFINE_mBool(enable_pipeline_task_leakage_detect, "false");

Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,9 @@ DECLARE_mBool(ignore_not_found_file_in_external_table);
DECLARE_mInt64(tablet_meta_serialize_size_limit);

DECLARE_mInt64(pipeline_task_leakage_detect_period_secs);
// Block sizes for snappy and lz4 compression.
// Configurable so that the output stays compatible with Hadoop's block compression.
DECLARE_mInt32(snappy_compression_block_size);
DECLARE_mInt32(lz4_compression_block_size);

DECLARE_mBool(enable_pipeline_task_leakage_detect);

Expand Down
50 changes: 50 additions & 0 deletions be/src/exec/decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ Status Decompressor::create_decompressor(CompressType type,
case CompressType::BZIP2:
decompressor->reset(new Bzip2Decompressor());
break;
case CompressType::ZSTD:
decompressor->reset(new ZstdDecompressor());
break;
case CompressType::LZ4FRAME:
decompressor->reset(new Lz4FrameDecompressor());
break;
Expand Down Expand Up @@ -86,6 +89,9 @@ Status Decompressor::create_decompressor(TFileCompressType::type type,
case TFileCompressType::BZ2:
compress_type = CompressType::BZIP2;
break;
case TFileCompressType::ZSTD:
compress_type = CompressType::ZSTD;
break;
case TFileCompressType::LZ4FRAME:
compress_type = CompressType::LZ4FRAME;
break;
Expand Down Expand Up @@ -300,6 +306,50 @@ std::string Bzip2Decompressor::debug_info() {
return ss.str();
}

// Releases the zstd streaming context held by this decompressor, if any.
ZstdDecompressor::~ZstdDecompressor() {
    // _zstd_strm stays null when init() never ran or failed to create the stream;
    // ZSTD_freeDStream accepts a null pointer, so the guard only makes that explicit.
    if (_zstd_strm != nullptr) {
        ZSTD_freeDStream(_zstd_strm);
    }
}

// Creates and initializes the zstd streaming decompression context.
//
// Returns:
//   - InternalError if ZSTD_createDStream() yields a null stream,
//   - InternalError carrying zstd's error name if ZSTD_initDStream() fails,
//   - OK otherwise.
Status ZstdDecompressor::init() {
    _zstd_strm = ZSTD_createDStream();
    if (_zstd_strm == nullptr) {
        return Status::InternalError("ZSTD_dctx creation error");
    }

    // ZSTD_initDStream reports failures through its size_t return value;
    // a non-error result is not needed here.
    const size_t ret = ZSTD_initDStream(_zstd_strm);
    if (ZSTD_isError(ret)) {
        return Status::InternalError("ZSTD_initDStream error: {}", ZSTD_getErrorName(ret));
    }
    return Status::OK();
}

// Feeds one chunk of compressed data to the zstd stream and writes whatever
// it can decode into the output buffer.
//
// Parameters:
//   input / input_len        compressed bytes available to consume.
//   input_bytes_read   [out] number of input bytes zstd consumed.
//   output / output_max_len  destination buffer and its capacity.
//   decompressed_len   [out] number of bytes written to output.
//   stream_end         [out] true when ZSTD_decompressStream returned 0.
//   more_input_bytes / more_output_bytes
//                            not written by this implementation.
//
// Returns InternalError with zstd's error name when decoding fails, OK otherwise.
// Note that input_bytes_read and decompressed_len are updated even on failure.
Status ZstdDecompressor::decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read,
                                    uint8_t* output, size_t output_max_len,
                                    size_t* decompressed_len, bool* stream_end,
                                    size_t* more_input_bytes, size_t* more_output_bytes) {
    // 1. set input and output
    ZSTD_inBuffer inputBuffer = {input, input_len, 0};
    ZSTD_outBuffer outputBuffer = {output, output_max_len, 0};

    // 2. decompress
    // The result is a size_t: 0 when a frame is fully decoded and flushed, an error
    // code (test with ZSTD_isError), or a positive hint. Keep it as size_t so the
    // error code is not narrowed before being handed back to the zstd helpers.
    const size_t ret = ZSTD_decompressStream(_zstd_strm, &outputBuffer, &inputBuffer);
    *input_bytes_read = inputBuffer.pos;
    *decompressed_len = outputBuffer.pos;

    if (ZSTD_isError(ret)) {
        return Status::InternalError("Failed to zstd decompress: {}", ZSTD_getErrorName(ret));
    }

    *stream_end = ret == 0;
    return Status::OK();
}

// Returns a fixed label identifying this decompressor; no per-instance state is included.
std::string ZstdDecompressor::debug_info() {
    return std::string("ZstdDecompressor.");
}

// Lz4Frame
// Lz4 version: 1.7.5
// define LZ4F_VERSION = 100
Expand Down
32 changes: 31 additions & 1 deletion be/src/exec/decompressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <stddef.h>
#include <stdint.h>
#include <zlib.h>
#include <zstd.h>

#include <memory>
#include <string>
Expand All @@ -34,7 +35,17 @@

namespace doris {

// NOTE(review): one-line form of CompressType without a ZSTD enumerator; presumably the
// pre-change side of this diff, shown next to the multi-line definition that follows it.
enum CompressType { UNCOMPRESSED, GZIP, DEFLATE, BZIP2, LZ4FRAME, LZOP, LZ4BLOCK, SNAPPYBLOCK };
// Compression formats that Decompressor::create_decompressor() can be asked for.
// The numeric values are written out explicitly; they match the implicit
// numbering of the declaration order, with ZSTD placed between BZIP2 and LZ4FRAME.
enum CompressType {
    UNCOMPRESSED = 0,
    GZIP = 1,
    DEFLATE = 2,
    BZIP2 = 3,
    ZSTD = 4,
    LZ4FRAME = 5,
    LZOP = 6,
    LZ4BLOCK = 7,
    SNAPPYBLOCK = 8
};

class Decompressor {
public:
Expand Down Expand Up @@ -126,6 +137,25 @@ class Bzip2Decompressor : public Decompressor {
bz_stream _bz_strm;
};

class ZstdDecompressor : public Decompressor {
public:
~ZstdDecompressor() override;

Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output,
size_t output_max_len, size_t* decompressed_len, bool* stream_end,
size_t* more_input_bytes, size_t* more_output_bytes) override;

std::string debug_info() override;

private:
friend class Decompressor;
ZstdDecompressor() : Decompressor(CompressType::ZSTD) {}
Status init() override;

private:
suxiaogang223 marked this conversation as resolved.
Show resolved Hide resolved
ZSTD_DStream* _zstd_strm {nullptr};
};

class Lz4FrameDecompressor : public Decompressor {
public:
~Lz4FrameDecompressor() override;
Expand Down
Loading
Loading