提交 915e9a8a 编写于 作者: H Hongze Cheng

adjust more code

上级 7765f542
...@@ -602,18 +602,23 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, ...@@ -602,18 +602,23 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
pBlockData->nColData = nColData - 1; pBlockData->nColData = nColData - 1;
for (int32_t iColData = 1; iColData < nColData; ++iColData) { for (int32_t iColData = 1; iColData < nColData; ++iColData) {
// todo
} }
// loop to add each row to the skiplist // loop to add each row to the skiplist
TSDBKEY key = {.version = version};
SMemSkipListNode *pos[SL_MAX_LEVEL]; SMemSkipListNode *pos[SL_MAX_LEVEL];
TSDBROW tRow = {.type = TSDBROW_COL_FMT, .pBlockData = pBlockData}; TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0);
TSDBKEY key = {.version = version, .ts = pBlockData->aTSKEY[0]};
TSDBROW lRow; // last row
// first row
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD); tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0); if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
if (code) goto _exit; pTbData->minKey = TMIN(pTbData->minKey, key.ts);
lRow = tRow;
tRow.iRow++; // remain row
++tRow.iRow;
if (tRow.iRow < pBlockData->nRow) { if (tRow.iRow < pBlockData->nRow) {
for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) { for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel); pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
...@@ -626,13 +631,32 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, ...@@ -626,13 +631,32 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS); tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
} }
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0); if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
if (code) goto _exit; lRow = tRow;
++tRow.iRow;
}
}
if (key.ts >= pTbData->maxKey) {
pTbData->maxKey = key.ts;
tRow.iRow++; // if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && pLastRow != NULL) {
// tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, pLastRow, true);
// }
} }
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
// tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, pLastRow, pMemTable->pTsdb);
} }
// SMemTable
pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
pMemTable->nRow += pBlockData->nRow;
if (affectedRows) *affectedRows = pBlockData->nRow;
_exit: _exit:
return code; return code;
} }
...@@ -650,9 +674,8 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, ...@@ -650,9 +674,8 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
SRow *pLastRow = NULL; SRow *pLastRow = NULL;
// backward put first data // backward put first data
tRow.pTSRow = aRow[iRow]; tRow.pTSRow = aRow[iRow++];
key.ts = aRow[iRow]->ts; key.ts = tRow.pTSRow->ts;
iRow++;
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD); tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0); code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
if (code) goto _exit; if (code) goto _exit;
...@@ -668,7 +691,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, ...@@ -668,7 +691,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
} }
while (iRow < nRow) { while (iRow < nRow) {
tRow.pTSRow = aRow[iRow]; tRow.pTSRow = aRow[iRow++];
key.ts = tRow.pTSRow->ts; key.ts = tRow.pTSRow->ts;
if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) { if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
...@@ -685,9 +708,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, ...@@ -685,9 +708,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
} }
if (key.ts >= pTbData->maxKey) { if (key.ts >= pTbData->maxKey) {
if (key.ts > pTbData->maxKey) {
pTbData->maxKey = key.ts; pTbData->maxKey = key.ts;
}
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && pLastRow != NULL) { if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && pLastRow != NULL) {
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, pLastRow, true); tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, pLastRow, true);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册