提交 fc50d941 编写于 作者: H Hongze Cheng

more coe

上级 16cd9db4
......@@ -70,6 +70,9 @@ typedef struct SDiskData SDiskData;
typedef struct SDiskDataBuilder SDiskDataBuilder;
typedef struct SBlkInfo SBlkInfo;
#define TSDBROW_ROW_FMT ((int8_t)0x0)
#define TSDBROW_COL_FMT ((int8_t)0x1)
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
#define TSDB_MAX_SUBBLOCKS 8
#define TSDB_FHDR_SIZE 512
......@@ -106,8 +109,9 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) {
#define TSDBROW_VERSION(ROW) (((ROW)->type == 0) ? (ROW)->version : (ROW)->pBlockData->aVersion[(ROW)->iRow])
#define TSDBROW_SVERSION(ROW) TD_ROW_SVER((ROW)->pTSRow)
#define TSDBROW_KEY(ROW) ((TSDBKEY){.version = TSDBROW_VERSION(ROW), .ts = TSDBROW_TS(ROW)})
#define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)})
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)})
#define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = TSDBROW_ROW_FMT, .version = (VERSION), .pTSRow = (TSROW)})
#define tsdbRowFromBlockData(BLOCKDATA, IROW) \
((TSDBROW){.type = TSDBROW_COL_FMT, .pBlockData = (BLOCKDATA), .iRow = (IROW)})
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
// int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
int32_t tsdbRowCmprFn(const void *p1, const void *p2);
......@@ -335,10 +339,13 @@ struct SVersionRange {
typedef struct SMemSkipListNode SMemSkipListNode;
struct SMemSkipListNode {
int8_t level;
int8_t flag; // TSDBROW_ROW_FMT for row format, TSDBROW_COL_FMT for col format
int32_t iRow;
int64_t version;
SRow *pTSRow;
void *pData;
SMemSkipListNode *forwards[0];
};
typedef struct SMemSkipList {
int64_t size;
uint32_t seed;
......@@ -376,7 +383,7 @@ struct SMemTable {
};
struct TSDBROW {
int8_t type; // 0 for row from tsRow, 1 for row from block data
int8_t type; // TSDBROW_ROW_FMT for row from tsRow, TSDBROW_COL_FMT for row from block data
union {
struct {
int64_t version;
......@@ -795,8 +802,13 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
}
pIter->pRow = &pIter->row;
pIter->pRow->version = pIter->pNode->version;
pIter->pRow->pTSRow = pIter->pNode->pTSRow;
if (pIter->pNode->flag == TSDBROW_ROW_FMT) {
pIter->row = tsdbRowFromTSRow(pIter->pNode->version, pIter->pNode->pData);
} else if (pIter->pNode->flag == TSDBROW_COL_FMT) {
pIter->row = tsdbRowFromBlockData(pIter->pNode->pData, pIter->pNode->iRow);
} else {
ASSERT(0);
}
return pIter->pRow;
}
......
......@@ -247,7 +247,6 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa
pIter->pTbData = pTbData;
pIter->backward = backward;
pIter->pRow = NULL;
pIter->row.type = 0;
if (pFrom == NULL) {
// create from head or tail
if (backward) {
......@@ -415,8 +414,13 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
pn = SL_NODE_BACKWARD(px, iLevel);
while (pn != pTbData->sl.pHead) {
if (pn->flag == TSDBROW_ROW_FMT) {
tKey.version = pn->version;
tKey.ts = pn->pTSRow->ts;
tKey.ts = ((SRow *)pn->pData)->ts;
} else if (pn->flag == TSDBROW_COL_FMT) {
tKey.version = ((SBlockData *)pn->pData)->aVersion[pn->iRow];
tKey.ts = ((SBlockData *)pn->pData)->aTSKEY[pn->iRow];
}
int32_t c = tsdbKeyCmprFn(&tKey, pKey);
if (c <= 0) {
......@@ -445,8 +449,13 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
pn = SL_NODE_FORWARD(px, iLevel);
while (pn != pTbData->sl.pTail) {
if (pn->flag == TSDBROW_ROW_FMT) {
tKey.version = pn->version;
tKey.ts = pn->pTSRow->ts;
tKey.ts = ((SRow *)pn->pData)->ts;
} else if (pn->flag == TSDBROW_COL_FMT) {
tKey.version = ((SBlockData *)pn->pData)->aVersion[pn->iRow];
tKey.ts = ((SBlockData *)pn->pData)->aTSKEY[pn->iRow];
}
int32_t c = tsdbKeyCmprFn(&tKey, pKey);
if (c >= 0) {
......@@ -473,29 +482,37 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
return level;
}
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, int64_t version, SRow *pRow,
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow,
int8_t forward) {
int32_t code = 0;
int8_t level;
SMemSkipListNode *pNode;
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
ASSERT(pPool != NULL);
// node
level = tsdbMemSkipListRandLevel(&pTbData->sl);
ASSERT(pPool != NULL);
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level));
if (pNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pNode->level = level;
pNode->version = version;
pNode->pTSRow = vnodeBufPoolMalloc(pPool, pRow->len);
if (NULL == pNode->pTSRow) {
pNode->flag = pRow->type;
if (pRow->type == TSDBROW_ROW_FMT) {
pNode->version = pRow->version;
pNode->pData = vnodeBufPoolMalloc(pPool, pRow->pTSRow->len);
if (NULL == pNode->pData) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
memcpy(pNode->pTSRow, pRow, pRow->len);
memcpy(pNode->pData, pRow->pTSRow, pRow->pTSRow->len);
} else if (pRow->type == TSDBROW_COL_FMT) {
pNode->iRow = pRow->iRow;
pNode->pData = pRow->pBlockData;
}
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
SMemSkipListNode *pn = pos[iLevel];
......@@ -589,13 +606,12 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
TSDBKEY key = {.version = version};
SMemSkipListNode *pos[SL_MAX_LEVEL];
int32_t iRow = 0;
TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow);
TSDBROW tRow = {.type = TSDBROW_COL_FMT, .pBlockData = pBlockData};
// TODO
tRow.iRow = iRow++;
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
code = tbDataDoPut(pMemTable, pTbData, pos, version, NULL, 0);
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
if (code) goto _exit;
++iRow;
if (iRow < pBlockData->nRow) {
for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
......@@ -607,9 +623,9 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
}
code = tbDataDoPut(pMemTable, pTbData, pos, version, NULL, 0);
tRow.iRow = ++iRow;
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
if (code) goto _exit;
++iRow;
}
}
......@@ -621,25 +637,25 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
SSubmitTbData *pSubmitTbData, int32_t *affectedRows) {
int32_t code = 0;
SRow **rows = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
TSDBKEY key = {.version = version};
SMemSkipListNode *pos[SL_MAX_LEVEL];
TSDBROW row = tsdbRowFromTSRow(version, NULL);
int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
int32_t iRow = 0;
SRow *pLastRow = NULL;
// backward put first data
row.pTSRow = rows[iRow];
key.ts = row.pTSRow->ts;
tRow.pTSRow = aRow[iRow];
key.ts = aRow[iRow]->ts;
iRow++;
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 0);
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
if (code) goto _exit;
pTbData->minKey = TMIN(pTbData->minKey, key.ts);
pLastRow = row.pTSRow;
pLastRow = tRow.pTSRow;
// forward put rest data
if (iRow < nRow) {
......@@ -648,17 +664,17 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
}
while (iRow < nRow) {
row.pTSRow = rows[iRow];
key.ts = row.pTSRow->ts;
tRow.pTSRow = aRow[iRow];
key.ts = tRow.pTSRow->ts;
if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
}
code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 1);
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1);
if (code) goto _exit;
pLastRow = row.pTSRow;
pLastRow = tRow.pTSRow;
iRow++;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册