提交 51538a17 编写于 作者: H Hongze Cheng

more work

上级 e7d177a4
......@@ -392,10 +392,10 @@ tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX] = {
getStatics_i64},
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", 0, 0, tsCompressFloat, tsDecompressFloat, getStatics_f},
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", 0, 0, tsCompressDouble, tsDecompressDouble, getStatics_d},
{TSDB_DATA_TYPE_VARCHAR, 6, 0, "VARCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_bin},
{TSDB_DATA_TYPE_VARCHAR, 6, 1, "VARCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_bin},
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", INT64_MIN, INT64_MAX, tsCompressTimestamp,
tsDecompressTimestamp, getStatics_i64},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
{TSDB_DATA_TYPE_NCHAR, 5, 1, "NCHAR", 0, 0, tsCompressString, tsDecompressString, getStatics_nchr},
{TSDB_DATA_TYPE_UTINYINT, 16, CHAR_BYTES, "TINYINT UNSIGNED", 0, UINT8_MAX, tsCompressTinyint, tsDecompressTinyint,
getStatics_u8},
{TSDB_DATA_TYPE_USMALLINT, 17, SHORT_BYTES, "SMALLINT UNSIGNED", 0, UINT16_MAX, tsCompressSmallint,
......
......@@ -47,6 +47,7 @@ typedef struct SBlockL SBlockL;
typedef struct SColData SColData;
typedef struct SBlockDataHdr SBlockDataHdr;
typedef struct SBlockData SBlockData;
typedef struct SDiskData SDiskData;
typedef struct SDelFile SDelFile;
typedef struct SHeadFile SHeadFile;
typedef struct SDataFile SDataFile;
......@@ -149,6 +150,11 @@ SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx);
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData);
int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData);
// SDiskData
int32_t tDiskDataInit(SDiskData *pDiskData);
void tDiskDataClear(SDiskData *pDiskData);
int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg);
int32_t tDiskToBlockData(SDiskData *pDiskData, SBlockData *pBlockData);
// SDelIdx
int32_t tPutDelIdx(uint8_t *p, void *ph);
int32_t tGetDelIdx(uint8_t *p, void *ph);
......@@ -392,38 +398,35 @@ struct SMapData {
};
typedef struct {
int16_t cid;
int8_t type;
int8_t smaOn;
int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE
int32_t szOrigin; // original column value size (only save for variant data type)
int32_t szBitmap; // bitmap size
int32_t szOffset; // size of offset, only for variant-length data type
int32_t szValue; // compressed column value size
int32_t offset;
int16_t cid;
int8_t type;
int8_t smaOn;
int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE
int32_t szOrigin; // original column value size (only save for variant data type)
int32_t szBitmap; // bitmap size
int32_t szOffset; // size of offset, only for variant-length data type
int32_t szValue; // compressed column value size
int32_t offset;
uint8_t **ppData;
} SBlockCol;
typedef struct {
int32_t nRow;
int8_t cmprAlg;
int64_t offset; // block data offset
int32_t szBlockCol; // SBlockCol size
int32_t szVersion; // VERSION size
int32_t szTSKEY; // TSKEY size
int32_t szBlock; // total block size
int64_t sOffset; // sma offset
int32_t nSma; // sma size
int64_t offset; // block data offset
int32_t szBlock;
int32_t szKey;
} SSubBlock;
struct SBlock {
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t minVersion;
int64_t maxVersion;
int64_t minVer;
int64_t maxVer;
int32_t nRow;
int8_t hasDup;
int8_t nSubBlock;
SSubBlock aSubBlock[TSDB_MAX_SUBBLOCKS];
int64_t sOffset; // sma offset
int32_t nSma; // sma size
};
struct SBlockL {
......@@ -433,13 +436,6 @@ struct SBlockL {
int64_t minVer;
int64_t maxVer;
int32_t nRow;
int64_t offset;
int8_t cmprAlg;
int32_t szBlockCol;
int32_t szUid;
int32_t szVer;
int32_t szTSKEY;
int32_t szBlock;
};
struct SColData {
......@@ -502,13 +498,17 @@ struct SDelIdx {
int64_t size;
};
#pragma pack(push, 1)
struct SBlockDataHdr {
uint32_t delimiter;
int32_t nRow;
int64_t suid;
int64_t uid;
int32_t szUid;
int32_t szVer;
int32_t szKey;
int32_t szBlkCol;
int8_t cmprAlg;
};
#pragma pack(pop)
struct SDelFile {
volatile int32_t nRef;
......@@ -596,6 +596,24 @@ struct STsdbReadSnap {
STsdbFS fs;
};
struct SDiskData {
int8_t cmprAlg;
int32_t nRow;
int64_t suid;
int64_t uid;
int32_t szUid;
int32_t szVer;
int32_t szKey;
uint8_t *pUid;
uint8_t *pVer;
uint8_t *pKey;
SArray *aBlockCol; // SArray<SBlockCol>
int32_t nBuf;
SArray *aBuf; // SArray<uint8_t*>
uint8_t *pBuf;
};
// ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1;
......
......@@ -630,7 +630,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
}
// 2. version range check
if (block.minVersion > pReader->verRange.maxVer || block.maxVersion < pReader->verRange.minVer) {
if (block.minVer > pReader->verRange.maxVer || block.maxVer < pReader->verRange.minVer) {
continue;
}
......@@ -766,7 +766,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);
pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
......@@ -798,7 +798,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);
pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
return TSDB_CODE_SUCCESS;
......@@ -1011,8 +1011,8 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) {
return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
(pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
(pVerRange->minVer > pBlock->minVersion && pVerRange->minVer <= pBlock->maxVersion) ||
(pVerRange->maxVer < pBlock->maxVersion && pVerRange->maxVer >= pBlock->minVersion);
(pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
(pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
}
static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
......@@ -1092,8 +1092,8 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock)
}
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVersion >= pVerRange->minVer) &&
(pBlock->minVersion <= pVerRange->maxVer);
return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVer >= pVerRange->minVer) &&
(pBlock->minVer <= pVerRange->maxVer);
}
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) {
......@@ -1102,11 +1102,11 @@ static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, cons
for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += 1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
if (p->version >= pBlock->minVersion) {
if (p->version >= pBlock->minVer) {
return true;
}
} else if (p->ts < pBlock->minKey.ts) { // p->ts < pBlock->minKey.ts
if (p->version >= pBlock->minVersion) {
if (p->version >= pBlock->minVer) {
if (i < num - 1) {
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
if (i + 1 == num - 1) { // pnext is the last point
......@@ -1114,7 +1114,7 @@ static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, cons
return true;
}
} else {
if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVersion) {
if (pnext->ts >= pBlock->minKey.ts && pnext->version >= pBlock->minVer) {
return true;
}
}
......@@ -2512,9 +2512,7 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
return metaGetIvtIdx(pMeta);
}
uint64_t getReaderMaxVersion(STsdbReader *pReader) {
return pReader->verRange.maxVer;
}
uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
/**
* @brief Get all suids since suid
......
......@@ -1909,8 +1909,8 @@ static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SBlock *pBlock) {
pBlock->maxKey = key;
}
pBlock->minVersion = TMIN(pBlock->minVersion, key.version);
pBlock->maxVersion = TMAX(pBlock->maxVersion, key.version);
pBlock->minVer = TMIN(pBlock->minVer, key.version);
pBlock->maxVer = TMAX(pBlock->maxVer, key.version);
}
pBlock->nRow += pBlockData->nRow;
}
......@@ -2310,9 +2310,13 @@ int32_t tsdbWriteLastBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock
pBlockL->szBlock = pBlockL->szBlockCol + sizeof(TSCKSUM) + nBuf1;
pWriter->fLast.size += pBlockL->szBlock;
tFree(pBuf1);
tFree(pBuf2);
return code;
_err:
tFree(pBuf1);
tFree(pBuf2);
return code;
}
......@@ -2426,4 +2430,4 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
_err:
tsdbError("vgId:%d, tsdb DFileSet copy failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code;
}
\ No newline at end of file
}
......@@ -227,8 +227,8 @@ int32_t tPutBlock(uint8_t *p, void *ph) {
n += tPutTSDBKEY(p ? p + n : p, &pBlock->minKey);
n += tPutTSDBKEY(p ? p + n : p, &pBlock->maxKey);
n += tPutI64v(p ? p + n : p, pBlock->minVersion);
n += tPutI64v(p ? p + n : p, pBlock->maxVersion);
n += tPutI64v(p ? p + n : p, pBlock->minVer);
n += tPutI64v(p ? p + n : p, pBlock->maxVer);
n += tPutI32v(p ? p + n : p, pBlock->nRow);
n += tPutI8(p ? p + n : p, pBlock->hasDup);
n += tPutI8(p ? p + n : p, pBlock->nSubBlock);
......@@ -253,8 +253,8 @@ int32_t tGetBlock(uint8_t *p, void *ph) {
n += tGetTSDBKEY(p + n, &pBlock->minKey);
n += tGetTSDBKEY(p + n, &pBlock->maxKey);
n += tGetI64v(p + n, &pBlock->minVersion);
n += tGetI64v(p + n, &pBlock->maxVersion);
n += tGetI64v(p + n, &pBlock->minVer);
n += tGetI64v(p + n, &pBlock->maxVer);
n += tGetI32v(p + n, &pBlock->nRow);
n += tGetI8(p + n, &pBlock->hasDup);
n += tGetI8(p + n, &pBlock->nSubBlock);
......@@ -1509,6 +1509,206 @@ int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData) {
return n;
}
// SDiskData ==============================
static int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut,
int32_t *szOut, uint8_t **ppBuf) {
int32_t code = 0;
ASSERT(szIn > 0 && ppOut);
if (cmprAlg == NO_COMPRESSION) {
code = tRealloc(ppOut, nOut + szIn);
if (code) goto _exit;
memcpy(*ppOut + nOut, pIn, szIn);
*szOut = szIn;
} else {
int32_t size = szIn + COMP_OVERFLOW_BYTES;
code = tRealloc(ppOut, nOut + size);
if (code) goto _exit;
if (cmprAlg == TWO_STAGE_COMP) {
ASSERT(ppBuf);
code = tRealloc(ppBuf, size);
if (code) goto _exit;
}
*szOut =
tDataTypes[type].compFunc(pIn, szIn, szIn / tDataTypes[type].bytes, *ppOut + nOut, size, cmprAlg, *ppBuf, size);
if (*szOut <= 0) {
code = TSDB_CODE_COMPRESS_ERROR;
goto _exit;
}
}
_exit:
return code;
}
static int32_t tsdbCmprColData(SColData *pColData, int8_t cmprAlg, SBlockCol *pBlockCol, uint8_t **ppBuf) {
int32_t code = 0;
int32_t n = 0;
// bitmap
if (pColData->flag != HAS_VALUE) {
code = tsdbCmprData(pColData->pBitMap, BIT2_SIZE(pColData->nVal), TSDB_DATA_TYPE_TINYINT, cmprAlg,
pBlockCol->ppData, n, &pBlockCol->szBitmap, ppBuf);
if (code) goto _exit;
} else {
pBlockCol->szBitmap = 0;
}
n += pBlockCol->szBitmap;
// offset
if (IS_VAR_DATA_TYPE(pColData->type)) {
code = tsdbCmprData((uint8_t *)pColData->aOffset, sizeof(int32_t) * pColData->nVal, TSDB_DATA_TYPE_INT, cmprAlg,
pBlockCol->ppData, n, &pBlockCol->szOffset, ppBuf);
if (code) goto _exit;
} else {
pBlockCol->szOffset = 0;
}
n += pBlockCol->szOffset;
// value
if (pColData->flag != (HAS_NULL | HAS_NONE)) {
code = tsdbCmprData((uint8_t *)pColData->pData, pColData->nData, pColData->type, cmprAlg, pBlockCol->ppData, n,
&pBlockCol->szValue, ppBuf);
if (code) goto _exit;
} else {
pBlockCol->szValue = 0;
}
n += pBlockCol->szValue;
// checksum
n += sizeof(TSCKSUM);
taosCalcChecksumAppend(0, *ppBuf, n);
_exit:
return code;
}
static int32_t tsdbDecmprData() {
int32_t code = 0;
// TODO
return code;
}
int32_t tDiskDataInit(SDiskData *pDiskData) {
int32_t code = 0;
pDiskData->aBlockCol = taosArrayInit(0, sizeof(SBlockCol));
if (pDiskData->aBlockCol == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pDiskData->aBuf = taosArrayInit(0, sizeof(uint8_t *));
if (pDiskData->aBuf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
_exit:
return code;
}
void tDiskDataClear(SDiskData *pDiskData) {
taosArrayDestroy(pDiskData->aBlockCol);
for (int32_t i = 0; i < taosArrayGetSize(pDiskData->aBuf); i++) {
tFree((uint8_t *)taosArrayGet(pDiskData->aBuf, i));
}
taosArrayDestroy(pDiskData->aBuf);
}
static uint8_t **tDiskDataAllocBuf(SDiskData *pDiskData) {
if (pDiskData->nBuf >= taosArrayGetSize(pDiskData->aBuf)) {
uint8_t *p = NULL;
if (taosArrayPush(pDiskData->aBuf, &p) == NULL) {
return NULL;
}
}
ASSERT(pDiskData->nBuf < taosArrayGetSize(pDiskData->aBuf));
uint8_t **pp = taosArrayGet(pDiskData->aBuf, pDiskData->nBuf);
pDiskData->nBuf++;
return pp;
}
int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg) {
int32_t code = 0;
ASSERT(pBlockData->nRow > 0);
pDiskData->cmprAlg = cmprAlg;
pDiskData->nRow = pBlockData->nRow;
pDiskData->suid = pBlockData->suid;
pDiskData->uid = pBlockData->uid;
pDiskData->szUid = 0;
pDiskData->szVer = 0;
pDiskData->szKey = 0;
taosArrayClear(pDiskData->aBlockCol);
pDiskData->nBuf = 0;
// uid
if (pDiskData->uid == 0) {
code = tsdbCmprData((uint8_t *)pBlockData->aUid, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT, cmprAlg,
&pDiskData->pUid, &pDiskData->szUid, &pDiskData->pBuf);
if (code) goto _exit;
}
// version
code = tsdbCmprData((uint8_t *)pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow, TSDB_DATA_TYPE_BIGINT,
cmprAlg, &pDiskData->pVer, &pDiskData->szVer, &pDiskData->pBuf);
if (code) goto _exit;
// tskey
code = tsdbCmprData((uint8_t *)pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow, TSDB_DATA_TYPE_TIMESTAMP,
cmprAlg, &pDiskData->pKey, &pDiskData->szKey, &pDiskData->pBuf);
if (code) goto _exit;
// columns
int32_t offset = 0;
for (int32_t iColData = 0; iColData < taosArrayGetSize(pBlockData->aIdx); iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
if (pColData->flag == HAS_NONE) continue;
SBlockCol blockCol = {.cid = pColData->cid,
.type = pColData->type,
.smaOn = pColData->smaOn,
.flag = pColData->flag,
.szOrigin = pColData->nData};
if (pColData->flag != HAS_NULL) {
// alloc a buffer
blockCol.ppData = tDiskDataAllocBuf(pDiskData);
if (blockCol.ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// compress
code = tsdbCmprColData(pColData, cmprAlg, &blockCol);
if (code) goto _exit;
// update offset
blockCol.offset = offset;
offset = offset + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue;
}
if (taosArrayPush(pDiskData->aBlockCol, &blockCol) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
_exit:
return code;
}
int32_t tBlockToDiskData(SBlockData *pBlockData, SDiskData *pDiskData, int8_t cmprAlg);
int32_t tDiskToBlockData(SDiskData *pDiskData, SBlockData *pBlockData);
// ALGORITHM ==============================
void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) {
SColVal colVal;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册