Post

openGauss列存压缩源码分析

openGauss列存压缩源码分析

在openGauss 3.0.0数据库中,相对于行存以页为单元进行压缩,列存以CU为单元具有天然的压缩优势。

在openGauss中有三种压缩级别:LOW, MIDDLE, HIGH。指定的压缩等级越高,则数据的压缩率越高。除此之外还可以选择不开启压缩。

1
2
3
4
5
6
typedef enum OptCompress {
    COMPRESS_NO = 0,
    COMPRESS_LOW,
    COMPRESS_MIDDLE,
    COMPRESS_HIGH,
} OptCompress;

对于压缩算法,一共有:

1
2
3
4
5
6
7
8
9
10
11
12
#define CU_DeltaCompressed 0x0001
#define CU_DicEncode 0x0002
// CU_CompressExtend is used for extended compression.
// For compression added afterwards, it can be defined as CU_CompressExtend + 0x0001~0x0008
#define CU_CompressExtend 0x0004    // Used for extended compression
#define CU_Delta2Compressed 0x0005  // CU_Delta2Compressed equals CU_CompressExtend plus 0x0001
#define CU_XORCompressed 0x0006     // CU_XORCompressed equals CU_CompressExtend plus 0x0002
#define CU_RLECompressed 0x0008
#define CU_LzCompressed 0x0010
#define CU_ZlibCompressed 0x0020
#define CU_BitpackCompressed 0x0040
#define CU_IntLikeCompressed 0x0080

对于一个CU的压缩,代码在CU::Compress(int valCount, int16 compress_modes, int align_size)函数里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void CU::Compress(int valCount, int16 compress_modes, int align_size)
{
    errno_t rc;

    // Step1: 初始化,为compress_buffer分配内存,原数据大小 + NULL值位图大小 + 头部大小,压缩后大小不会超过分配的compress_buffer大小
    m_compressedBufSize = CUAlignUtils::AlignCuSize(m_srcDataSize + m_bpNullRawSize + sizeof(CU), align_size);
    m_compressedBuf = (char*)CStoreMemAlloc::Palloc(m_compressedBufSize, !m_inCUCache);

    int16 headerLen = GetCUHeaderSize();
    char* buf = m_compressedBuf + headerLen;

    // Step 2: 填充CU的NULL值 bitmap 位图
    buf = CompressNullBitmapIfNeed(buf);

    // Step 3: 压缩CU内的数据
    bool compressed = false;
    if (COMPRESS_NO != heaprel_get_compression_from_modes(compress_modes))
        compressed = CompressData(buf, valCount, compress_modes, align_size);

    // case 1: 用户定义数据不需要被加密
    // case 2: 用户定义了加密,但加密后数据大小比未加密数据还大,所以直接使用原来的数据
    if (compressed == false) {
        rc = memcpy_s(buf, m_srcDataSize, m_srcData, m_srcDataSize);
        securec_check(rc, "\0", "\0");
        m_cuSizeExcludePadding = headerLen + m_bpNullCompressedSize + m_srcDataSize;
        m_cuSize = CUAlignUtils::AlignCuSize(m_cuSizeExcludePadding, align_size);
        PADDING_CU(buf + m_srcDataSize, m_cuSize - m_cuSizeExcludePadding);
    }

    // 压缩后进行加密
    CUDataEncrypt(buf);

    // Step 4: 填充 compress_buffer 的头部
    FillCompressBufHeader();

    m_cache_compressed = true;

    // Step 5; 释放原数据buf
    FreeSrcBuf();
}

实际的数据压缩发生在CU::CompressData(_out_ char* outBuf, _in_ int nVals, _in_ int16 compress_modes, int align_size)函数里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
bool CU::CompressData(_out_ char* outBuf, _in_ int nVals, _in_ int16 compress_modes, int align_size)
{
    int compressOutSize = 0;
    bool beDelta2Compressed = false;
    bool beXORCompressed = false;

    // 获取压缩级别
    int8 compression = heaprel_get_compression_from_modes(compress_modes);

    // 压缩结果输出到outBuf
    CompressionArg2 output = {0};
    output.buf = outBuf;
    output.sz = (m_compressedBuf + m_compressedBufSize) - outBuf;

    // 压缩的输入,就是CU数据
    CompressionArg1 input = {0};
    input.sz = m_srcDataSize;
    input.buf = m_srcData;
    input.mode = compress_modes;

    // 为当前CU的数据设置 compression filter
    compression_options* ref_filter = (compression_options*)m_tmpinfo->m_options;

    // 如果允许tsdb,并且属性是timestamp或者float类型:
    // tsdb: 时序数据库
    if (g_instance.attr.attr_common.enable_tsdb && (ATT_IS_TIMESTAMP(m_atttypid) || ATT_IS_FLOAT(m_atttypid))) {
        SequenceCodec sequenceCoder(m_eachValSize, m_atttypid);     // 由SequenceCodec类完成
        compressOutSize = sequenceCoder.compress(input, output);
        if (ATT_IS_TIMESTAMP(m_atttypid)) {
            beDelta2Compressed = true;  // timestamp则使用了Delta2压缩
        } else if (ATT_IS_FLOAT(m_atttypid)) {
            beXORCompressed = true;     // float则使用了XOR压缩
        }
    }
    // 如果压缩结果小于0或者没有用Delta2和XOR压缩,说明要么上一步没有成功压缩,要么不是timestamp和float类型
    if (compressOutSize < 0 || (!beDelta2Compressed && !beXORCompressed)) {
        output = {0};
        output.buf = outBuf;
        output.sz = (m_compressedBuf + m_compressedBufSize) - outBuf;

        if (m_infoMode & CU_IntLikeCompressed) {   // 是类整型
            if (ATT_IS_CHAR_TYPE(m_atttypid)) {     // 属性是char类型的
                IntegerCoder intCoder(8);            // 由IntegerCoder类完成

                /* set min/max value */
                if (m_tmpinfo->m_valid_minmax) {
                    intCoder.SetMinMaxVal(m_tmpinfo->m_min_value, m_tmpinfo->m_max_value);
                }
                // 告诉IntegerCoder是否使用RLE压缩
                intCoder.m_adopt_rle = ref_filter->m_adopt_rle;
                compressOutSize = intCoder.Compress(input, output);
            } else if (ATT_IS_NUMERIC_TYPE(m_atttypid)) {       // 属性是numeric类型的
                if (compression > COMPRESS_LOW) {               // 压缩级别大于LOW,是MIDDLE或者HIGH
                    /// numeric data type compression.
                    /// lz4/zlib is used directly.              // 直接使用lz4/zlib压缩
                    input.buildGlobalDict = false;
                    input.useGlobalDict = false;
                    input.globalDict = NULL;
                    input.useDict = false;
                    input.numVals = HasNullValue() ? (nVals - CountNullValuesBefore(nVals)) : nVals;

                    StringCoder strCoder;                       // 由StringCoder类完成
                    compressOutSize = strCoder.Compress(input, output);
                }
            } else {
                // for future, another type             // 未来的其他类型
            }
        } else if (m_eachValSize > 0 && m_eachValSize <= 8) {           // 类型大小在(0, 8]范围
            IntegerCoder intCoder(m_eachValSize);                       // 由IntegerCoder完成
            /* set min/max value */
            if (m_tmpinfo->m_valid_minmax) {
                intCoder.SetMinMaxVal(m_tmpinfo->m_min_value, m_tmpinfo->m_max_value);
            }
            // 告诉IntegerCoder是否使用RLE压缩
            intCoder.m_adopt_rle = ref_filter->m_adopt_rle;         
            compressOutSize = intCoder.Compress(input, output);     // 由IntegerCoder类完成
        } else {
            // FUTURE CASE: complete global dictionary              // 未来会支持的:全局字典压缩
            Assert(-1 == m_eachValSize || m_eachValSize > 8);       // 类型大小大于8,或者为-1(变长字符)
            input.buildGlobalDict = false;
            input.useGlobalDict = false;
            input.globalDict = NULL;

            // 类型大小大于8的直接使用lz4/zlib,不用字典方法

            // 类型大小为-1的,说明是变长类型,如果压缩级别不是LOW则使用字典压缩
            input.useDict = (m_eachValSize > 8) ? false : (COMPRESS_LOW != compression);

            // values的数量是除掉了空值的
            input.numVals = HasNullValue() ? (nVals - CountNullValuesBefore(nVals)) : nVals;

            // 由StringCoder类完成
            StringCoder strCoder;
            // 提示StringCoder类是否使用RLE和字典压缩
            strCoder.m_adopt_rle = ref_filter->m_adopt_rle;
            strCoder.m_adopt_dict = ref_filter->m_adopt_dict;
            compressOutSize = strCoder.Compress(input, output);
        }
    }

    if (compressOutSize > 0) {
        // 压缩成功,计算CU大小,设置相关压缩信息
        Assert((uint32)compressOutSize < m_srcDataSize);
        Assert((0 == (output.modes & CU_INFOMASK2)) && (0 != (output.modes & CU_INFOMASK1)));
        m_infoMode |= (output.modes & CU_INFOMASK1);

        m_cuSizeExcludePadding = (outBuf - m_compressedBuf) + compressOutSize;
        m_cuSize = CUAlignUtils::AlignCuSize(m_cuSizeExcludePadding, align_size);
        Assert(m_cuSize <= m_compressedBufSize);
        PADDING_CU(m_compressedBuf + m_cuSizeExcludePadding, m_cuSize - m_cuSizeExcludePadding);

        if (!ref_filter->m_sampling_fihished) {
            /* sample and set adopted compression methods */
            ref_filter->set_common_flags(output.modes);
        }

        return true;
    }

    return false;
}

对于SequenceCodec类的压缩,会先判断类型,如果是timestamp则使用Delta2压缩,是float类型则使用XOR压缩。然后还会判断压缩级别,如果是MIDDLE或HIGH则还需要分别使用lz4和zlib进行压缩。

对于IntegerCoder类的压缩,会先进行Delta压缩,如果使用RLE的话再使用RLE压缩,然后对压缩级别为MIDDLE和HIGH的情况分别使用lz4和zlib压缩。

对于StringCoder类的压缩,会先判断是否使用字典压缩,如果使用则先进行字典压缩,然后对数字部分使用IntegerCoder进行压缩。如果字典压缩失败或者没有允许使用字典压缩,则直接对于LOW和MIDDLE级别使用lz4压缩,对于HIGH级别使用zlib压缩。


可以看到整个流程为: 先判断是否允许了tsdb(时序数据库)并且类型为timestamp或者float类型,如果是,则压缩是由SequenceCodec类完成的,timestamp则使用了Delta2压缩,float则使用了XOR压缩。如果压缩级别为MIDDLE或HIGH,则还需要进行lz4或zlib压缩。 对于timestamp和float类型:

 LOWMIDDLEHIGH
timestampDelta2Delta2 + lz4Delta2 + zlib
floatXORXOR + lz4XOR + zlib

如果不允许tsdb或者上一步没有成功压缩,则进入以下的流程:

如果是IntLike(类整型):char类型则由IntegerCoder类完成,先使用Delta压缩,如果开启了RLE则使用RLE压缩,如果压缩级别为MIDDLE或HIGH,则还需要进行lz4或zlib压缩。

 LOWMIDDLEHIGH
charDelta + RLE(可选)Delta + RLE(可选) + lz4Delta + RLE(可选) + zlib

numeric类型则要判断压缩级别,如果级别为LOW,则不压缩numeric类型。当级别为MIDDLE或HIGH时,直接使用lz4/zlib压缩,由StringCoder类完成。

 LOWMIDDLEHIGH
numeric不压缩lz4zlib

对于不是IntLike(类整型)的情况,如果类型大小在(0, 8]范围,则由IntegerCoder类完成,并使用RLE压缩,如果级别为MIDDLE或HIGH还需要分别使用lz4和zlib压缩。

 LOWMIDDLEHIGH
长度不大于8的定长字符Delta + RLE(可选)Delta + RLE(可选) + lz4Delta + RLE(可选) + zlib

类型大小大于8的直接使用lz4/zlib,不使用字典压缩:

 LOWMIDDLEHIGH
长度大于8的定长字符lz4lz4zlib

类型大小为-1的,说明是变长类型,如果压缩级别为LOW则不使用字典,直接使用lz4。如果压缩级别为MIDDLE或HIGH,尝试用字典方法,然后对数字部分使用IntegerCodr进行压缩:

 LOWMIDDLEHIGH
变长字符lz4字典 + Delta + RLE(可选) + lz4字典 + Delta + RLE(可选) + zlib

综上:

 LOWMIDDLEHIGH
timestampDelta2Delta2 + lz4Delta2 + zlib
floatXORXOR + lz4XOR + zlib
charDelta + RLE(可选)Delta + RLE(可选) + lz4Delta + RLE(可选) + zlib
numeric不压缩lz4zlib
长度不大于8的定长字符Delta + RLE(可选)Delta + RLE(可选) + lz4Delta + RLE(可选) + zlib
长度大于8的定长字符lz4lz4zlib
变长字符lz4字典 + Delta + RLE(可选) + lz4字典 + Delta + RLE(可选) + zlib