提交 a8b91991 编写于 作者: H Hongze Cheng

more code

上级 0929326c
......@@ -32,6 +32,12 @@ extern "C" {
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \
if (CODE) { \
LINO = __LINE__; \
goto LABEL; \
}
typedef struct TSDBROW TSDBROW;
typedef struct TABLEID TABLEID;
typedef struct TSDBKEY TSDBKEY;
......@@ -65,6 +71,8 @@ typedef struct SSmaInfo SSmaInfo;
typedef struct SBlockCol SBlockCol;
typedef struct SVersionRange SVersionRange;
typedef struct SLDataIter SLDataIter;
typedef struct SDiskCol SDiskCol;
typedef struct SDiskData SDiskData;
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
#define TSDB_MAX_SUBBLOCKS 8
......@@ -652,6 +660,21 @@ typedef struct {
STSchema *pTSchema;
} SSkmInfo;
struct SDiskCol {
SBlockCol bCol;
const uint8_t *pBit;
const uint8_t *pOff;
const uint8_t *pVal;
};
struct SDiskData {
SDiskDataHdr hdr;
const uint8_t *pUid;
const uint8_t *pVer;
const uint8_t *pKey;
SArray *aDiskCol; // SArray<SDiskCol>
};
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo, const char *idStr);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
......
......@@ -17,8 +17,6 @@
typedef struct SDiskDataBuilder SDiskDataBuilder;
typedef struct SDiskColBuilder SDiskColBuilder;
typedef struct SDiskCol SDiskCol;
typedef struct SDiskData SDiskData;
struct SDiskColBuilder {
int16_t cid;
......@@ -45,21 +43,6 @@ struct SDiskDataBuilder {
uint8_t *aBuf[2];
};
struct SDiskCol {
SBlockCol bCol;
const uint8_t *pBit;
const uint8_t *pOff;
const uint8_t *pVal;
};
struct SDiskData {
SDiskDataHdr hdr;
const uint8_t *pUid;
const uint8_t *pVer;
const uint8_t *pKey;
SArray *aDiskCol; // SArray<SDiskCol>
};
// SDiskColBuilder ================================================
static int32_t tDiskColInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type, uint8_t cmprAlg) {
int32_t code = 0;
......@@ -147,9 +130,13 @@ static int32_t tDiskColAddValue(SDiskColBuilder *pBuilder, SColVal *pColVal) {
code = tCompress(pBuilder->pOffC, &pBuilder->offset, sizeof(int32_t));
if (code) goto _exit;
pBuilder->offset += pColVal->value.nData;
code = tCompress(pBuilder->pValC, pColVal->value.pData, pColVal->value.nData);
if (code) goto _exit;
} else {
code = tCompress(pBuilder->pValC, &pColVal->value.val, tDataTypes[pColVal->type].bytes);
if (code) goto _exit;
}
code = tCompress(pBuilder->pValC, pColVal->value.pData, pColVal->value.nData /*TODO*/);
if (code) goto _exit;
_exit:
return code;
......
......@@ -607,6 +607,93 @@ _err:
return code;
}
int32_t tsdbWriteDiskData(SDataFWriter *pWriter, SDiskData *pDiskData, SBlockInfo *pBlkInfo) {
int32_t code = 0;
int32_t lino = 0;
STsdbFD *pFD = pWriter->pDataFD; // todo
int64_t offset = pWriter->fData.size;
// hdr
int32_t n = tPutDiskDataHdr(NULL, &pDiskData->hdr);
code = tRealloc(&pWriter->aBuf[0], n);
TSDB_CHECK_CODE(code, lino, _exit);
tPutDiskDataHdr(pWriter->aBuf[0], &pDiskData->hdr);
code = tsdbWriteFile(pFD, offset, pWriter->aBuf[0], n);
TSDB_CHECK_CODE(code, lino, _exit);
offset += n;
// uid + ver + key
if (pDiskData->hdr.szUid) {
code = tsdbWriteFile(pFD, offset, pDiskData->pUid, pDiskData->hdr.szUid);
TSDB_CHECK_CODE(code, lino, _exit);
offset += pDiskData->hdr.szUid;
}
code = tsdbWriteFile(pFD, offset, pDiskData->pVer, pDiskData->hdr.szVer);
TSDB_CHECK_CODE(code, lino, _exit);
offset += pDiskData->hdr.szVer;
code = tsdbWriteFile(pFD, offset, pDiskData->pKey, pDiskData->hdr.szKey);
TSDB_CHECK_CODE(code, lino, _exit);
offset += pDiskData->hdr.szKey;
// SBlockCol
if (pDiskData->hdr.szBlkCol) {
code = tRealloc(&pWriter->aBuf[0], pDiskData->hdr.szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
n = 0;
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
n += tPutBlockCol(pWriter->aBuf[0] + n, &pDiskCol->bCol);
}
ASSERT(n == pDiskData->hdr.szBlkCol);
code = tsdbWriteFile(pFD, offset, pWriter->aBuf[0], pDiskData->hdr.szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
offset += pDiskData->hdr.szBlkCol;
}
// pData
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pDiskData->aDiskCol); iDiskCol++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pDiskData->aDiskCol, iDiskCol);
if (pDiskCol->bCol.flag == HAS_NULL) continue;
if (pDiskCol->bCol.szBitmap) {
code = tsdbWriteFile(pFD, offset, pDiskCol->pBit, pDiskCol->bCol.szBitmap);
TSDB_CHECK_CODE(code, lino, _exit);
offset += pDiskCol->bCol.szBitmap;
}
if (pDiskCol->bCol.szOffset) {
code = tsdbWriteFile(pFD, offset, pDiskCol->pOff, pDiskCol->bCol.szOffset);
TSDB_CHECK_CODE(code, lino, _exit);
offset += pDiskCol->bCol.szOffset;
}
if (pDiskCol->bCol.szValue) {
code = tsdbWriteFile(pFD, offset, pDiskCol->pVal, pDiskCol->bCol.szValue);
TSDB_CHECK_CODE(code, lino, _exit);
offset += pDiskCol->bCol.szValue;
}
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbDebug("vgId:%d %s", TD_VID(pWriter->pTsdb->pVnode), __func__);
}
return code;
}
int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) {
int32_t code = 0;
int64_t n;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册