提交 f51c8643 编写于 作者: H Hongze Cheng

more code

上级 b9a248d7
......@@ -15,8 +15,9 @@
#include "tsdb.h"
typedef struct SDiskData SDiskData;
typedef struct SDiskCol SDiskCol;
typedef struct SDiskDataBuilder SDiskDataBuilder;
typedef struct SDiskCol SDiskCol;
typedef struct SDiskData SDiskData;
struct SDiskCol {
int16_t cid;
......@@ -70,6 +71,34 @@ static int32_t tDiskColClear(SDiskCol *pDiskCol) {
return code;
}
static int32_t tDiskColToBinary(SDiskCol *pDiskCol, const uint8_t **ppData, int32_t *nData) {
int32_t code = 0;
ASSERT(pDiskCol->flag && pDiskCol->flag != HAS_NONE);
if (pDiskCol->flag == HAS_NULL) {
return code;
}
// bitmap (todo)
if (pDiskCol->flag != HAS_VALUE) {
}
// offset (todo)
if (IS_VAR_DATA_TYPE(pDiskCol->type)) {
code = tCompGen(pDiskCol->pOffC, NULL /* todo */, NULL /* todo */);
if (code) return code;
}
// value (todo)
if (pDiskCol->flag != (HAS_NULL | HAS_NONE)) {
code = tCompGen(pDiskCol->pValC, NULL /* todo */, NULL /* todo */);
if (code) return code;
}
return code;
}
static int32_t tDiskColAddValue(SDiskCol *pDiskCol, SColVal *pColVal) {
int32_t code = 0;
......@@ -338,8 +367,8 @@ static int32_t (*tDiskColAddValImpl[])(SDiskCol *pDiskCol, SColVal *pColVal) = {
tDiskColAddVal7, // HAS_VALUE|HAS_NULL|HAS_NONE
};
// SDiskData ================================================
struct SDiskData {
// SDiskDataBuilder ================================================
struct SDiskDataBuilder {
int64_t suid;
int64_t uid;
int32_t nRow;
......@@ -352,121 +381,121 @@ struct SDiskData {
uint8_t *aBuf[2];
};
int32_t tDiskDataInit(SDiskData *pDiskData, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg) {
int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg) {
int32_t code = 0;
pDiskData->suid = pId->suid;
pDiskData->uid = pId->uid;
pDiskData->nRow = 0;
pDiskData->cmprAlg = cmprAlg;
pBuilder->suid = pId->suid;
pBuilder->uid = pId->uid;
pBuilder->nRow = 0;
pBuilder->cmprAlg = cmprAlg;
if (pDiskData->pUidC == NULL) {
code = tCompressorCreate(&pDiskData->pUidC);
if (pBuilder->pUidC == NULL) {
code = tCompressorCreate(&pBuilder->pUidC);
if (code) return code;
}
code = tCompressorReset(pDiskData->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg);
code = tCompressorReset(pBuilder->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg);
if (code) return code;
if (pDiskData->pVerC == NULL) {
code = tCompressorCreate(&pDiskData->pVerC);
if (pBuilder->pVerC == NULL) {
code = tCompressorCreate(&pBuilder->pVerC);
if (code) return code;
}
code = tCompressorReset(pDiskData->pVerC, TSDB_DATA_TYPE_BIGINT, cmprAlg);
code = tCompressorReset(pBuilder->pVerC, TSDB_DATA_TYPE_BIGINT, cmprAlg);
if (code) return code;
if (pDiskData->pKeyC == NULL) {
code = tCompressorCreate(&pDiskData->pKeyC);
if (pBuilder->pKeyC == NULL) {
code = tCompressorCreate(&pBuilder->pKeyC);
if (code) return code;
}
code = tCompressorReset(pDiskData->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg);
code = tCompressorReset(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg);
if (code) return code;
if (pDiskData->aDiskCol == NULL) {
pDiskData->aDiskCol = taosArrayInit(pTSchema->numOfCols - 1, sizeof(SDiskCol));
if (pDiskData->aDiskCol == NULL) {
if (pBuilder->aDiskCol == NULL) {
pBuilder->aDiskCol = taosArrayInit(pTSchema->numOfCols - 1, sizeof(SDiskCol));
if (pBuilder->aDiskCol == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
}
pDiskData->nDiskCol = 0;
pBuilder->nDiskCol = 0;
for (int32_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
STColumn *pTColumn = &pTSchema->columns[iCol];
if (pDiskData->nDiskCol >= taosArrayGetSize(pDiskData->aDiskCol)) {
if (pBuilder->nDiskCol >= taosArrayGetSize(pBuilder->aDiskCol)) {
SDiskCol dc = (SDiskCol){0};
if (taosArrayPush(pDiskData->aDiskCol, &dc) == NULL) {
if (taosArrayPush(pBuilder->aDiskCol, &dc) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
}
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, pDiskData->nDiskCol);
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, pBuilder->nDiskCol);
code = tDiskColInit(pDiskCol, pTColumn->colId, pTColumn->type, cmprAlg);
if (code) return code;
pDiskData->nDiskCol++;
pBuilder->nDiskCol++;
}
return code;
}
int32_t tDiskDataDestroy(SDiskData *pDiskData) {
int32_t tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder) {
int32_t code = 0;
if (pDiskData->pUidC) tCompressorDestroy(pDiskData->pUidC);
if (pDiskData->pVerC) tCompressorDestroy(pDiskData->pVerC);
if (pDiskData->pKeyC) tCompressorDestroy(pDiskData->pKeyC);
if (pBuilder->pUidC) tCompressorDestroy(pBuilder->pUidC);
if (pBuilder->pVerC) tCompressorDestroy(pBuilder->pVerC);
if (pBuilder->pKeyC) tCompressorDestroy(pBuilder->pKeyC);
if (pDiskData->aDiskCol) {
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
if (pBuilder->aDiskCol) {
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pBuilder->aDiskCol); iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol);
tDiskColClear(pDiskCol);
}
taosArrayDestroy(pDiskData->aDiskCol);
taosArrayDestroy(pBuilder->aDiskCol);
}
for (int32_t iBuf = 0; iBuf < sizeof(pDiskData->aBuf) / sizeof(pDiskData->aBuf[0]); iBuf++) {
tFree(pDiskData->aBuf[iBuf]);
for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) {
tFree(pBuilder->aBuf[iBuf]);
}
return code;
}
int32_t tDiskDataAddRow(SDiskData *pDiskData, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId) {
int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId) {
int32_t code = 0;
ASSERT(pId->suid == pDiskData->suid);
ASSERT(pId->suid == pBuilder->suid);
// uid
if (pDiskData->uid && pDiskData->uid != pId->uid) {
for (int32_t iRow = 0; iRow < pDiskData->nRow; iRow++) {
code = tCompress(pDiskData->pUidC, &pDiskData->uid, sizeof(int64_t));
if (pBuilder->uid && pBuilder->uid != pId->uid) {
for (int32_t iRow = 0; iRow < pBuilder->nRow; iRow++) {
code = tCompress(pBuilder->pUidC, &pBuilder->uid, sizeof(int64_t));
if (code) goto _exit;
}
pDiskData->uid = 0;
pBuilder->uid = 0;
}
if (pDiskData->uid == 0) {
code = tCompress(pDiskData->pUidC, &pId->uid, sizeof(int64_t));
if (pBuilder->uid == 0) {
code = tCompress(pBuilder->pUidC, &pId->uid, sizeof(int64_t));
if (code) goto _exit;
}
// version
int64_t version = TSDBROW_VERSION(pRow);
code = tCompress(pDiskData->pVerC, &version, sizeof(int64_t));
code = tCompress(pBuilder->pVerC, &version, sizeof(int64_t));
if (code) goto _exit;
// TSKEY
TSKEY ts = TSDBROW_TS(pRow);
code = tCompress(pDiskData->pKeyC, &ts, sizeof(int64_t));
code = tCompress(pBuilder->pKeyC, &ts, sizeof(int64_t));
if (code) goto _exit;
SRowIter iter = {0};
tRowIterInit(&iter, pRow, pTSchema);
SColVal *pColVal = tRowIterNext(&iter);
for (int32_t iDiskCol = 0; iDiskCol < pDiskData->nDiskCol; iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol);
while (pColVal && pColVal->cid < pDiskCol->cid) {
pColVal = tRowIterNext(&iter);
......@@ -482,53 +511,53 @@ int32_t tDiskDataAddRow(SDiskData *pDiskData, TSDBROW *pRow, STSchema *pTSchema,
pColVal = tRowIterNext(&iter);
}
}
pDiskData->nRow++;
pBuilder->nRow++;
_exit:
return code;
}
int32_t tDiskDataToBinary(SDiskData *pDiskData, const uint8_t **ppData, int32_t *nData) {
int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) {
int32_t code = 0;
ASSERT(pDiskData->nRow);
ASSERT(pBuilder->nRow);
SDiskDataHdr hdr = {.delimiter = TSDB_FILE_DLMT,
.fmtVer = 0,
.suid = pDiskData->suid,
.uid = pDiskData->uid,
.suid = pBuilder->suid,
.uid = pBuilder->uid,
.szUid = 0,
.szVer = 0,
.szKey = 0,
.szBlkCol = 0,
.nRow = pDiskData->nRow,
.cmprAlg = pDiskData->cmprAlg};
.nRow = pBuilder->nRow,
.cmprAlg = pBuilder->cmprAlg};
// UID
const uint8_t *pUid = NULL;
if (pDiskData->uid == 0) {
code = tCompGen(pDiskData->pUidC, &pUid, &hdr.szUid);
if (pBuilder->uid == 0) {
code = tCompGen(pBuilder->pUidC, &pUid, &hdr.szUid);
if (code) return code;
}
// VERSION
const uint8_t *pVer = NULL;
code = tCompGen(pDiskData->pVerC, &pVer, &hdr.szVer);
code = tCompGen(pBuilder->pVerC, &pVer, &hdr.szVer);
if (code) return code;
// TSKEY
const uint8_t *pKey = NULL;
code = tCompGen(pDiskData->pKeyC, &pKey, &hdr.szKey);
code = tCompGen(pBuilder->pKeyC, &pKey, &hdr.szKey);
if (code) return code;
int32_t offset = 0;
for (int32_t iDiskCol = 0; iDiskCol < pDiskData->nDiskCol; iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol);
if (pDiskCol->flag == HAS_NONE) continue;
// code = tDiskColToBinary(pDiskCol, );
// if (code) return code;
code = tDiskColToBinary(pDiskCol, NULL, NULL);
if (code) return code;
SBlockCol bCol = {.cid = pDiskCol->cid,
.type = pDiskCol->type,
......@@ -544,30 +573,51 @@ int32_t tDiskDataToBinary(SDiskData *pDiskData, const uint8_t **ppData, int32_t
offset = offset + bCol.szBitmap + bCol.szOffset + bCol.szValue;
}
#if 0
*nData = tPutDiskDataHdr(NULL, &hdr) + hdr.szUid + hdr.szVer + hdr.szKey + hdr.szBlkCol + offset;
code = tRealloc(&pDiskData->aBuf[0], *nData);
code = tRealloc(&pBuilder->aBuf[0], *nData);
if (code) return code;
*ppData = pDiskData->aBuf[0];
*ppData = pBuilder->aBuf[0];
int32_t n = 0;
n += tPutDiskDataHdr(pDiskData->aBuf[0] + n, &hdr);
n += tPutDiskDataHdr(pBuilder->aBuf[0] + n, &hdr);
if (hdr.szUid) {
memcpy(pDiskData->aBuf[0] + n, pUid, hdr.szUid);
memcpy(pBuilder->aBuf[0] + n, pUid, hdr.szUid);
n += hdr.szUid;
}
memcpy(pDiskData->aBuf[0] + n, pVer, hdr.szVer);
memcpy(pBuilder->aBuf[0] + n, pVer, hdr.szVer);
n += hdr.szVer;
memcpy(pDiskData->aBuf[0] + n, pKey, hdr.szKey);
memcpy(pBuilder->aBuf[0] + n, pKey, hdr.szKey);
n += hdr.szKey;
for (int32_t iDiskCol = 0; iDiskCol < pDiskData->nDiskCol; iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
n += tPutBlockCol(pDiskData->aBuf[0] + n, NULL /*pDiskCol->bCol (todo) */);
for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol);
n += tPutBlockCol(pBuilder->aBuf[0] + n, NULL /*pDiskCol->bCol (todo) */);
}
for (int32_t iDiskCol = 0; iDiskCol < pDiskData->nDiskCol; iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol);
// memcpy(pDiskData->aBuf[0] + n, NULL, );
// n += 0;
}
#endif
return code;
}
// SDiskData ================================================
struct SDiskData {
SDiskDataHdr hdr;
const uint8_t *pUid;
const uint8_t *pVer;
const uint8_t *pKey;
SArray *aBlockCol;
SArray *aColData;
};
int32_t tDiskDataDestroy(SDiskData *pDiskData) {
int32_t code = 0;
if (pDiskData->aBlockCol) taosArrayDestroy(pDiskData->aBlockCol);
if (pDiskData->aColData) taosArrayDestroy(pDiskData->aColData);
return code;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册