diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index acf08bd47e3cf9ae756fae10aab6ed9a4b23f34c..60f8d28c12e9d8c5f7d9ba2f2104923eaaf92af0 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -205,6 +205,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "commit vnode", NULL, NULL) TD_NEW_MSG_SEG(TDMT_STREAM_MSG) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 9d7cfc055297118a1fafb7b30eab9c438e4cc40b..a8b58227498c63b0322745d4f5e672cf048749fe 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -250,8 +250,15 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx, uint8_t **ppBuf); int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(SLRUCache *pCache); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row); + +// bug api, deprecated, USE H version int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow); -int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid); + +int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); +int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); +int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); + +int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey); // structs ======================= typedef struct { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 2f6901cb244a15e28f71bdd367aa4676e86b6176..0cd555c5ac2262c218c50c5a2adde0ccbcaa2863 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -51,6 +51,44 @@ static void getTableCacheKey(tb_uid_t uid, const char *cacheType, char *key, int static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value) { taosMemoryFree(value); } +static int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { + int32_t code = 0; + char key[32] = {0}; + int keyLen = 0; + + getTableCacheKey(uid, "lr", key, &keyLen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); + if (h) { + STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h); + if (pRow->ts <= eKey) { + taosLRUCacheRelease(pCache, h, true); + } else { + taosLRUCacheRelease(pCache, h, false); + } + + // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); + } + + return code; +} + +static int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { + int32_t code = 0; + char key[32] = {0}; + int keyLen = 0; + + getTableCacheKey(uid, "l", key, &keyLen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); + if (h) { + // clear last cache anyway, no matter where eKey ends. + taosLRUCacheRelease(pCache, h, true); + + // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); + } + + return code; +} + int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) { int32_t code = 0; STSRow *cacheRow = NULL; @@ -65,7 +103,7 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) { if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) { tdRowCpy(cacheRow, row); } else { - tsdbCacheDeleteLastrow(pCache, uid); + tsdbCacheDeleteLastrow(pCache, uid, TSKEY_MAX); tsdbCacheInsertLastrow(pCache, uid, row); } } @@ -97,7 +135,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row) { if (TD_ROW_LEN(row) <= TD_ROW_LEN(cacheRow)) { tdRowCpy(cacheRow, row); } else { - tsdbCacheDeleteLastrow(pCache, uid); + tsdbCacheDeleteLast(pCache, uid, TSKEY_MAX); tsdbCacheInsertLastrow(pCache, uid, row); } } @@ -581,6 +619,10 @@ typedef struct TsdbNextRowState { static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { int32_t code = 0; + STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); + int16_t nCol = pTSchema->numOfCols; + SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal)); + tb_uid_t suid = getTableSuidByUid(uid, pTsdb); STbData *pMem = NULL; @@ -597,6 +639,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); + SDelIdx delIdx; + SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); if (pDelFile) { SDelFReader *pDelFReader; @@ -604,7 +648,6 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL); if (code) goto _err; - SDelIdx delIdx; code = getTableDelIdx(pDelFReader, suid, uid, &delIdx); if (code) goto _err; @@ -612,6 +655,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { if (code) goto _err; tsdbDelFReaderClose(pDelFReader); + } else { + code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline); + if (code) goto _err; } int iSkyline = taosArrayGetSize(pSkyline) - 1; @@ -644,7 +690,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { input[1].next = true; } - STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); + int16_t nilColCount = nCol - 1; // count of null & none cols + int iCol = 0; // index of first nil col index from left to right + bool setICol = false; do { for (int i = 0; i < 3; ++i) { @@ -667,15 +715,17 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { TSDBROW *max[3] = {0}; int iMax[3] = {-1, -1, -1}; int nMax = 0; + TSKEY maxKey = TSKEY_MIN; + for (int i = 0; i < 3; ++i) { - if (input[i].pRow != NULL) { + if (!input[i].stop && input[i].pRow != NULL) { TSDBKEY key = TSDBROW_KEY(input[i].pRow); - TSDBKEY maxKey = TSDBROW_KEY(max[nMax]); // merging & deduplicating on client side - if (maxKey.ts <= key.ts) { - if (maxKey.ts < key.ts) { + if (maxKey <= key.ts) { + if (maxKey < key.ts) { nMax = 0; + maxKey = key.ts; } iMax[nMax] = i; @@ -686,6 +736,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { // delete detection TSDBROW *merge[3] = {0}; + int iMerge[3] = {-1, -1, -1}; int nMerge = 0; for (int i = 0; i < nMax; ++i) { TSDBKEY maxKey = TSDBROW_KEY(max[i]); @@ -693,6 +744,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { // bool deleted = false; bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline); if (!deleted) { + iMerge[nMerge] = i; merge[nMerge++] = max[i]; } @@ -716,6 +768,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { tRowMergerClear(&merger); } } + } while (*ppRow == NULL); taosMemoryFreeClear(pTSchema); @@ -727,6 +780,245 @@ _err: return code; } +static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { + int32_t code = 0; + + STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); + int16_t nCol = pTSchema->numOfCols; + SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal)); + + tb_uid_t suid = getTableSuidByUid(uid, pTsdb); + + STbData *pMem = NULL; + if (pTsdb->mem) { + tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem); + } + + STbData *pIMem = NULL; + if (pTsdb->imem) { + tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem); + } + + *ppRow = NULL; + + SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); + + SDelIdx delIdx; + + SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState); + if (pDelFile) { + SDelFReader *pDelFReader; + + code = tsdbDelFReaderOpen(&pDelFReader, pDelFile, pTsdb, NULL); + if (code) goto _err; + + code = getTableDelIdx(pDelFReader, suid, uid, &delIdx); + if (code) goto _err; + + code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline); + if (code) goto _err; + + tsdbDelFReaderClose(pDelFReader); + } else { + code = getTableDelSkyline(pMem, pIMem, NULL, NULL, pSkyline); + if (code) goto _err; + } + + int iSkyline = taosArrayGetSize(pSkyline) - 1; + + SBlockIdx idx = {.suid = suid, .uid = uid}; + + SFSNextRowIter fsState = {0}; + fsState.state = SFSNEXTROW_FS; + fsState.pTsdb = pTsdb; + fsState.pBlockIdxExp = &idx; + + SMemNextRowIter memState = {0}; + SMemNextRowIter imemState = {0}; + TSDBROW memRow, imemRow, fsRow; + + TsdbNextRowState input[3] = {{&memRow, true, false, &memState, getNextRowFromMem}, + {&imemRow, true, false, &imemState, getNextRowFromMem}, + {&fsRow, false, true, &fsState, getNextRowFromFS}}; + + if (pMem) { + memState.pMem = pMem; + memState.state = SMEMNEXTROW_ENTER; + input[0].stop = false; + input[0].next = true; + } + if (pIMem) { + imemState.pMem = pIMem; + imemState.state = SMEMNEXTROW_ENTER; + input[1].stop = false; + input[1].next = true; + } + + int16_t nilColCount = nCol - 1; // count of null & none cols + int iCol = 0; // index of first nil col index from left to right + bool setICol = false; + + do { + for (int i = 0; i < 3; ++i) { + if (input[i].next && !input[i].stop) { + code = input[i].nextRowFn(input[i].iter, &input[i].pRow); + if (code) goto _err; + + if (input[i].pRow == NULL) { + input[i].stop = true; + input[i].next = false; + } + } + } + + if (input[0].stop && input[1].stop && input[2].stop) { + break; + } + + // select maxpoint(s) from mem, imem, fs + TSDBROW *max[3] = {0}; + int iMax[3] = {-1, -1, -1}; + int nMax = 0; + TSKEY maxKey = TSKEY_MIN; + + for (int i = 0; i < 3; ++i) { + if (!input[i].stop && input[i].pRow != NULL) { + TSDBKEY key = TSDBROW_KEY(input[i].pRow); + + // merging & deduplicating on client side + if (maxKey <= key.ts) { + if (maxKey < key.ts) { + nMax = 0; + maxKey = key.ts; + } + + iMax[nMax] = i; + max[nMax++] = input[i].pRow; + } + } + } + + // delete detection + TSDBROW *merge[3] = {0}; + int iMerge[3] = {-1, -1, -1}; + int nMerge = 0; + for (int i = 0; i < nMax; ++i) { + TSDBKEY maxKey = TSDBROW_KEY(max[i]); + + // bool deleted = false; + bool deleted = tsdbKeyDeleted(&maxKey, pSkyline, &iSkyline); + if (!deleted) { + iMerge[nMerge] = i; + merge[nMerge++] = max[i]; + } + + input[iMax[i]].next = deleted; + } + + // merge if nMerge > 1 + if (nMerge > 0) { + if (nMerge == 1) { + code = tsRowFromTsdbRow(pTSchema, merge[nMerge - 1], ppRow); + if (code) goto _err; + } else { + // merge 2 or 3 rows + SRowMerger merger = {0}; + + tRowMergerInit(&merger, merge[0], pTSchema); + for (int i = 1; i < nMerge; ++i) { + tRowMerge(&merger, merge[i]); + } + tRowMergerGetRow(&merger, ppRow); + tRowMergerClear(&merger); + } + } + + if (iCol == 0) { + STColumn *pTColumn = &pTSchema->columns[0]; + SColVal *pColVal = &(SColVal){0}; + + *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = maxKey}); + + if (taosArrayPush(pColArray, pColVal) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + ++iCol; + + setICol = false; + for (int16_t i = iCol; iCol < nCol; ++i) { + // tsdbRowGetColVal(*ppRow, pTSchema, i, pColVal); + if (taosArrayPush(pColArray, pColVal) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + if (pColVal->isNull || pColVal->isNone) { + for (int j = 0; j < nMerge; ++j) { + SColVal jColVal = {0}; + tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal); + if (jColVal.isNull || jColVal.isNone) { + input[iMerge[j]].next = true; + } + } + if (!setICol) { + iCol = i; + setICol = true; + } + } else { + --nilColCount; + } + } + + continue; + } + + setICol = false; + for (int16_t i = iCol; i < nCol; ++i) { + SColVal colVal = {0}; + tTSRowGetVal(*ppRow, pTSchema, i, &colVal); + + SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, i); + + if (!colVal.isNone && !colVal.isNull) { + if (tColVal->isNull || tColVal->isNone) { + taosArraySet(pColArray, i, &colVal); + --nilColCount; + } + } else { + if (tColVal->isNull || tColVal->isNone && !setICol) { + iCol = i; + setICol = true; + + for (int j = 0; j < nMerge; ++j) { + SColVal jColVal = {0}; + tsdbRowGetColVal(merge[j], pTSchema, i, &jColVal); + if (jColVal.isNull || jColVal.isNone) { + input[iMerge[j]].next = true; + } + } + } + } + } + } while (nilColCount > 0); + + // if () new ts row from pColArray if non empty + if (taosArrayGetSize(pColArray) == nCol) { + code = tdSTSRowNew(pColArray, pTSchema, ppRow); + if (code) goto _err; + } + taosArrayDestroy(pColArray); + taosMemoryFreeClear(pTSchema); + + return code; +_err: + taosArrayDestroy(pColArray); + taosMemoryFreeClear(pTSchema); + tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { int32_t code = 0; char key[32] = {0}; @@ -749,9 +1041,11 @@ int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRo *ppRow = (STSRow *)taosLRUCacheValue(pCache, h); } + // taosLRUCacheRelease(pCache, h, true); + return code; } - +#if 0 int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { int32_t code = 0; char key[32] = {0}; @@ -763,7 +1057,7 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow * *ppRow = (STSRow *)taosLRUCacheValue(pCache, h); } else { STSRow *pRow = NULL; - // code = mergeLast(uid, pTsdb, &pRow); + code = mergeLast(uid, pTsdb, &pRow); // if table's empty or error, return code of -1 if (code < 0 || pRow == NULL) { return -1; @@ -774,10 +1068,71 @@ int32_t tsdbCacheGetLast(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow * *ppRow = (STSRow *)taosLRUCacheValue(pCache, h); } + // taosLRUCacheRelease(pCache, h, true); + + return code; +} +#endif +int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) { + int32_t code = 0; + char key[32] = {0}; + int keyLen = 0; + + getTableCacheKey(uid, "lr", key, &keyLen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); + if (h) { + //*ppRow = (STSRow *)taosLRUCacheValue(pCache, h); + } else { + STSRow *pRow = NULL; + code = mergeLastRow(uid, pTsdb, &pRow); + // if table's empty or error, return code of -1 + if (code < 0 || pRow == NULL) { + *handle = NULL; + return 0; + } + + tsdbCacheInsertLastrow(pCache, uid, pRow); + h = taosLRUCacheLookup(pCache, key, keyLen); + //*ppRow = (STSRow *)taosLRUCacheValue(pCache, h); + } + + *handle = h; + // taosLRUCacheRelease(pCache, h, true); + return code; } -int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) { +int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **handle) { + int32_t code = 0; + char key[32] = {0}; + int keyLen = 0; + + getTableCacheKey(uid, "l", key, &keyLen); + LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); + if (h) { + //*ppRow = (STSRow *)taosLRUCacheValue(pCache, h); + + } else { + STSRow *pRow = NULL; + code = mergeLast(uid, pTsdb, &pRow); + // if table's empty or error, return code of -1 + if (code < 0 || pRow == NULL) { + *handle = NULL; + return 0; + } + + tsdbCacheInsertLast(pCache, uid, pRow); + h = taosLRUCacheLookup(pCache, key, keyLen); + //*ppRow = (STSRow *)taosLRUCacheValue(pCache, h); + } + + *handle = h; + // taosLRUCacheRelease(pCache, h, true); + + return code; +} + +int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { int32_t code = 0; char key[32] = {0}; int keyLen = 0; @@ -785,10 +1140,32 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid) { getTableCacheKey(uid, "lr", key, &keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); if (h) { + STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h); + if (pRow->ts <= eKey) { + taosLRUCacheRelease(pCache, h, true); + } else { + taosLRUCacheRelease(pCache, h, false); + } + + // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); + } + + getTableCacheKey(uid, "l", key, &keyLen); + h = taosLRUCacheLookup(pCache, key, keyLen); + if (h) { + // clear last cache anyway, no matter where eKey ends. taosLRUCacheRelease(pCache, h, true); - // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t - // keyLen); + + // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); } return code; } + +int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h) { + int32_t code = 0; + + taosLRUCacheRelease(pCache, h, false); + + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index e3eb015c759f45dca83f9e1a4dcada8e11c64d1d..e00c165f99261b287327a795807d257fc5651b2f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -22,15 +22,15 @@ typedef struct SLastrowReader { SVnode* pVnode; STSchema* pSchema; uint64_t uid; -// int32_t* pSlotIds; - char** transferBuf; // todo remove it soon - int32_t numOfCols; - int32_t type; - int32_t tableIndex; // currently returned result tables - SArray* pTableList; // table id list + // int32_t* pSlotIds; + char** transferBuf; // todo remove it soon + int32_t numOfCols; + int32_t type; + int32_t tableIndex; // currently returned result tables + SArray* pTableList; // table id list } SLastrowReader; -static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t *slotIds) { +static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) { int32_t numOfRows = pBlock->info.rows; size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -60,21 +60,21 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade pBlock->info.rows += 1; } - -int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols, void** pReader) { +int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols, + void** pReader) { SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader)); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - p->type = type; - p->pVnode = pVnode; - p->numOfCols = numOfCols; + p->type = type; + p->pVnode = pVnode; + p->numOfCols = numOfCols; p->transferBuf = taosMemoryCalloc(p->numOfCols, POINTER_BYTES); STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0); - p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1); - p->pTableList = pTableIdList; + p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1); + p->pTableList = pTableIdList; #if 0 for(int32_t i = 0; i < p->numOfCols; ++i) { for(int32_t j = 0; j < p->pSchema->numOfCols; ++j) { @@ -101,7 +101,7 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t tsdbLastrowReaderClose(void* pReader) { SLastrowReader* p = pReader; - for(int32_t i = 0; i < p->numOfCols; ++i) { + for (int32_t i = 0; i < p->numOfCols; ++i) { taosMemoryFreeClear(p->transferBuf[i]); } @@ -117,8 +117,9 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t SLastrowReader* pr = pReader; - STSRow* pRow = NULL; - size_t numOfTables = taosArrayGetSize(pr->pTableList); + LRUHandle* h = NULL; + STSRow* pRow = NULL; + size_t numOfTables = taosArrayGetSize(pr->pTableList); // retrieve the only one last row of all tables in the uid list. if (pr->type == LASTROW_RETRIEVE_TYPE_SINGLE) { @@ -126,16 +127,18 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t bool internalResult = false; for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); - - int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); + + /* int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); */ + int32_t code = tsdbCacheGetLastrowH(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h); if (code != TSDB_CODE_SUCCESS) { return code; } - if (pRow == NULL) { + if (h == NULL) { continue; } + pRow = (STSRow*)taosLRUCacheValue(pr->pVnode->pTsdb->lruCache, h); if (pRow->ts > lastKey) { // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already // appended or not. @@ -147,23 +150,29 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t internalResult = true; lastKey = pRow->ts; } + + tsdbCacheRelease(pr->pVnode->pTsdb->lruCache, h); } } else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); - int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); + /* int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); */ + int32_t code = tsdbCacheGetLastrowH(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h); if (code != TSDB_CODE_SUCCESS) { return code; } // no data in the table of Uid - if (pRow == NULL) { + if (h == NULL) { continue; } + pRow = (STSRow*)taosLRUCacheValue(pr->pVnode->pTsdb->lruCache, h); saveOneRow(pRow, pResBlock, pr, slotIds); + tsdbCacheRelease(pr->pVnode->pTsdb->lruCache, h); + pr->tableIndex += 1; if (pResBlock->info.rows >= pResBlock->info.capacity) { return TSDB_CODE_SUCCESS; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 537fa5b866116af4e16b90090bc641d5fa12e12b..ba1fddf9565eb7028288431943e3e2e99f2aacb6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -180,7 +180,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid pMemTable->nDel++; if (tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) { - tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid); + tsdbCacheDelete(pTsdb->lruCache, pTbData->uid, eKey); } tsdbError("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 062e91a6a63412392f10037fa2a984b69d4b2028..bd4f4d1ca82d56442aebcb555b5f2c290f185fc3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -984,8 +984,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI } } else { SColVal cv = {0}; - - SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i]); + SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i] - 1); for (int32_t j = 0; j < pBlockData->nRow; ++j) { tColDataGetValue(pData, j, &cv); colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull); @@ -994,7 +993,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI } pReader->pResBlock->info.rows = pBlockData->nRow; - setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData); + setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData); /* int32_t ret = tsdbLoadBlockDataCols(&(pReader->rhelper), pBlock, pCheckInfo->pCompInfo, colIds, @@ -2283,7 +2282,7 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) } static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) { - return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) /*&& (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer)*/; + return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer); } static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) { @@ -2616,7 +2615,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return code; } -static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) { +static int32_t buildBlockFromBufferSeqentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; while(1) { @@ -2644,7 +2643,7 @@ static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) { } } -static int32_t loadDataInFiles(STsdbReader* pReader) { +static int32_t buildBlockFromFiles(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SFileSetIter* pFIter = &pStatus->fileIter; @@ -3018,7 +3017,7 @@ int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) { // // check if the query range overlaps with the file data block // bool exists = true; -// int32_t code = loadDataInFiles(pTsdbReadHandle, &exists); +// int32_t code = buildBlockFromFiles(pTsdbReadHandle, &exists); // if (code != TSDB_CODE_SUCCESS) { // pTsdbReadHandle->checkFiles = false; // return false; @@ -3306,7 +3305,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) { if (pStatus->loadFromFile) { - int32_t code = loadDataInFiles(pReader); + int32_t code = buildBlockFromFiles(pReader); if (code != TSDB_CODE_SUCCESS) { return false; } @@ -3314,11 +3313,11 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { if (pBlock->info.rows > 0) { return true; } else { - buildInmemBlockSeqentially(pReader); + buildBlockFromBufferSeqentially(pReader); return pBlock->info.rows > 0; } } else { // no data in files, let's try the buffer - buildInmemBlockSeqentially(pReader); + buildBlockFromBufferSeqentially(pReader); return pBlock->info.rows > 0; } } else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) { @@ -3334,7 +3333,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { // if (pReader->checkFiles) { // // check if the query range overlaps with the file data block // bool exists = true; - // int32_t code = loadDataInFiles(pReader, &exists); + // int32_t code = buildBlockFromFiles(pReader, &exists); // if (code != TSDB_CODE_SUCCESS) { // pReader->activeIndex = 0; // pReader->checkFiles = false; @@ -3454,7 +3453,16 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); int32_t code = tBlockDataInit(&pStatus->fileBlockData); - doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + + code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } return pReader->pResBlock->pDataBlock; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index ef9d7c0424031431d74aeafdaf44f59952cbff9a..4f07dfd497b7dbab35e805bd801e3fb1a5fb5386 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -738,99 +738,136 @@ _err: return code; } -int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol, - SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) { - int32_t code = 0; - TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; - uint8_t *pBuf1 = NULL; - uint8_t *pBuf2 = NULL; - - ASSERT(nCol == 0 || aColId[0] != PRIMARYKEY_TIMESTAMP_COL_ID); +static int32_t tsdbReadColDataImpl(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock, + int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, + uint8_t **ppBuf2) { + TdFilePtr pFD = pBlock->last ? pReader->pLastFD : pReader->pDataFD; + SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; + int32_t code = 0; + int64_t offset; + int64_t size; + int64_t n; - if (!ppBuf1) ppBuf1 = &pBuf1; - if (!ppBuf2) ppBuf2 = &pBuf2; + tBlockDataReset(pBlockData); + pBlockData->nRow = pSubBlock->nRow; - for (int32_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - SSubBlock *pSubBlock = &pBlock->aSubBlock[iSubBlock]; - int64_t offset; - int64_t size; - int64_t n; + // TSDBKEY + offset = pSubBlock->offset + sizeof(SBlockDataHdr); + size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM); + code = tsdbRealloc(ppBuf1, size); + if (code) goto _err; - tBlockDataReset(pBlockData); - pBlockData->nRow = pSubBlock->nRow; + n = taosLSeekFile(pFD, offset, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } - // TSDBKEY - offset = pSubBlock->offset + sizeof(SBlockDataHdr); - size = pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM); - code = tsdbRealloc(ppBuf1, size); - if (code) goto _err; + n = taosReadFile(pFD, *ppBuf1, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } - n = taosLSeekFile(pFD, offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2); + if (code) goto _err; - n = taosReadFile(pFD, *ppBuf1, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } + // OTHER + SBlockCol blockCol; + SBlockCol *pBlockCol = &blockCol; + SColData *pColData; + for (int32_t iCol = 0; iCol < nCol; iCol++) { + int16_t cid = aColId[iCol]; - code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2); - if (code) goto _err; + if (tMapDataSearch(&pSubBlock->mBlockCol, &(SBlockCol){.cid = cid}, tGetBlockCol, tBlockColCmprFn, pBlockCol) == + 0) { + code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData); + if (code) goto _err; - // OTHER - SBlockCol blockCol; - SBlockCol *pBlockCol = &blockCol; - SColData *pColData; - for (int32_t iCol = 0; iCol < nCol; iCol++) { - int16_t cid = aColId[iCol]; + tColDataReset(pColData, pBlockCol->cid, pBlockCol->type); + if (pBlockCol->flag == HAS_NULL) { + for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) { + code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type)); + if (code) goto _err; + } + } else { + offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->vsize + pSubBlock->ksize + pBlockCol->offset; + size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); - if (tMapDataSearch(&pSubBlock->mBlockCol, &(SBlockCol){.cid = cid}, tGetBlockCol, tBlockColCmprFn, pBlockCol) == - 0) { - code = tBlockDataAddColData(pBlockData, taosArrayGetSize(pBlockData->aColDataP), &pColData); + code = tsdbRealloc(ppBuf1, size); if (code) goto _err; - tColDataReset(pColData, pBlockCol->cid, pBlockCol->type); - if (pBlockCol->flag == HAS_NULL) { - for (int32_t iRow = 0; iRow < pSubBlock->nRow; iRow++) { - code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type)); - if (code) goto _err; - } - } else { - offset = pSubBlock->offset + sizeof(SBlockDataHdr) + pSubBlock->vsize + pSubBlock->ksize + pBlockCol->offset; - size = pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); - - code = tsdbRealloc(ppBuf1, size); - if (code) goto _err; + // seek + n = taosLSeekFile(pFD, offset, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } - // seek - n = taosLSeekFile(pFD, offset, SEEK_SET); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // read - n = taosReadFile(pFD, *ppBuf1, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } else if (n < size) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2); - if (code) goto _err; + // read + n = taosReadFile(pFD, *ppBuf1, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; } + + code = tsdbRecoverColData(pBlockData, pSubBlock, pBlockCol, pColData, *ppBuf1, ppBuf2); + if (code) goto _err; } } } + return code; + +_err: + return code; +} + +int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol, + SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) { + int32_t code = 0; + uint8_t *pBuf1 = NULL; + uint8_t *pBuf2 = NULL; + + ASSERT(nCol == 0 || aColId[0] != PRIMARYKEY_TIMESTAMP_COL_ID); + + if (!ppBuf1) ppBuf1 = &pBuf1; + if (!ppBuf2) ppBuf2 = &pBuf2; + + code = tsdbReadColDataImpl(pReader, pBlockIdx, pBlock, 0, aColId, nCol, pBlockData, ppBuf1, ppBuf2); + if (code) goto _err; + + if (pBlock->nSubBlock > 1) { + SBlockData *pBlockData1 = &(SBlockData){0}; + SBlockData *pBlockData2 = &(SBlockData){0}; + + for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { + code = tsdbReadColDataImpl(pReader, pBlockIdx, pBlock, iSubBlock, aColId, nCol, pBlockData1, ppBuf1, ppBuf2); + if (code) goto _err; + + code = tBlockDataCopy(pBlockData, pBlockData2); + if (code) { + tBlockDataClear(pBlockData1); + tBlockDataClear(pBlockData2); + goto _err; + } + + code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); + if (code) { + tBlockDataClear(pBlockData1); + tBlockDataClear(pBlockData2); + goto _err; + } + } + + tBlockDataClear(pBlockData1); + tBlockDataClear(pBlockData2); + } tsdbFree(pBuf1); tsdbFree(pBuf2); @@ -876,39 +913,11 @@ static int32_t tsdbReadSubBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, goto _err; } - // // check - // p = *ppBuf1; - // SBlockDataHdr *pHdr = (SBlockDataHdr *)p; - // ASSERT(pHdr->delimiter == TSDB_FILE_DLMT); - // ASSERT(pHdr->suid == pBlockIdx->suid); - // ASSERT(pHdr->uid == pBlockIdx->uid); - // p += sizeof(*pHdr); - - // if (!taosCheckChecksumWhole(p, pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM))) { - // code = TSDB_CODE_FILE_CORRUPTED; - // goto _err; - // } - // p += (pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM)); - - // for (int32_t iBlockCol = 0; iBlockCol < pSubBlock->mBlockCol.nItem; iBlockCol++) { - // tMapDataGetItemByIdx(&pSubBlock->mBlockCol, iBlockCol, pBlockCol, tGetBlockCol); - - // ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); - - // if (pBlockCol->flag == HAS_NULL) continue; - - // if (!taosCheckChecksumWhole(p, pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM))) { - // code = TSDB_CODE_FILE_CORRUPTED; - // goto _err; - // } - // p = p + pBlockCol->bsize + pBlockCol->csize + sizeof(TSCKSUM); - // } - // recover pBlockData->nRow = pSubBlock->nRow; p = *ppBuf1 + sizeof(SBlockDataHdr); - code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, *ppBuf1, ppBuf2); + code = tsdbRecoverBlockDataKey(pBlockData, pSubBlock, p, ppBuf2); if (code) goto _err; p = p + pSubBlock->vsize + pSubBlock->ksize + sizeof(TSCKSUM); @@ -964,7 +973,7 @@ int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *p SBlockData *pBlockData2 = &(SBlockData){0}; for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData, ppBuf1, ppBuf2); + code = tsdbReadSubBlockData(pReader, pBlockIdx, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2); if (code) { tBlockDataClear(pBlockData1); tBlockDataClear(pBlockData2); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 489d6d749ae5439dcb66af96070bee429bf3ead5..d43bf3664ed6b602b12f5c2e6bda2d6dbf6fc09c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1031,20 +1031,24 @@ static int32_t tColDataUpdateOffset(SColData *pColData) { ASSERT(pColData->nVal > 0); ASSERT(pColData->flag); + ASSERT(IS_VAR_DATA_TYPE(pColData->type)); - if (IS_VAR_DATA_TYPE(pColData->type) && (pColData->flag & HAS_VALUE)) { + if ((pColData->flag & HAS_VALUE)) { code = tsdbRealloc((uint8_t **)&pColData->aOffset, sizeof(int32_t) * pColData->nVal); if (code) goto _exit; int32_t offset = 0; for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) { - uint8_t v = GET_BIT2(pColData->pBitMap, iVal); - if (v == 0 || v == 1) { - pColData->aOffset[iVal] = -1; - } else { - pColData->aOffset[iVal] = offset; - offset += tGetValue(pColData->pData + offset, &value, pColData->type); + if (pColData->flag != HAS_VALUE) { + uint8_t v = GET_BIT2(pColData->pBitMap, iVal); + if (v == 0 || v == 1) { + pColData->aOffset[iVal] = -1; + continue; + } } + + pColData->aOffset[iVal] = offset; + offset += tGetValue(pColData->pData + offset, &value, pColData->type); } ASSERT(offset == pColData->nData); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 34b596e34a36d11378749770d6af3c84d4696c8f..60a13ee34c0ea4900a3dd99828b58483da2fcaad 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -218,6 +218,8 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp break; case TDMT_VND_ALTER_CONFIG: break; + case TDMT_VND_COMMIT: + goto _do_commit; default: ASSERT(0); break; @@ -232,6 +234,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp // commit if need if (vnodeShouldCommit(pVnode)) { + _do_commit: vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); // commit current change vnodeCommit(pVnode); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index f3e1eb47e8fa7b8494fc4dbeed71bce0d6ba4a1f..b5aad7589a2777b683f197e92661517a32eb661c 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -107,7 +107,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); -int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo); +int32_t getTableList(void* metaHandle, void* vnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo); SArray* createSortInfo(SNodeList* pNodeList); SArray* extractPartitionColInfo(SNodeList* pNodeList); SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 221ebbd3acda3d8a4c25d8040e2217d8a834673d..74debd1531366a07cfaf4d8d57aa6fd9caa1b87c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -283,7 +283,7 @@ static bool isTableOk(STableKeyInfo* info, SNode *pTagCond, SMeta *metaHandle){ return result; } -int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) { +int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) { int32_t code = TSDB_CODE_SUCCESS; pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo)); if(pListInfo->pTableList == NULL) return TSDB_CODE_OUT_OF_MEMORY; @@ -303,7 +303,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo //code = doFilterTag(pTagIndexCond, &metaArg, res); code = TSDB_CODE_INDEX_REBUILDING; if (code == TSDB_CODE_INDEX_REBUILDING) { - code = vnodeGetAllTableList(metaHandle, tableUid, pListInfo->pTableList); + code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList); } else if (code != TSDB_CODE_SUCCESS) { qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid); taosArrayDestroy(res); @@ -319,7 +319,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo } taosArrayDestroy(res); } else { - code = vnodeGetAllTableList(metaHandle, tableUid, pListInfo->pTableList); + code = vnodeGetAllTableList(pVnode, tableUid, pListInfo->pTableList); } if(pTagCond){ diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 570dea474ae9c34c64c769af052e149c50e9884d..496cfbf2769d58a0ca49ff759023c6e0c51c9798 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4121,7 +4121,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; - int32_t code = getTableList(pHandle->meta, pScanPhyNode, pTableListInfo); + int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = terrno; return NULL; @@ -4133,7 +4133,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); if (pBlockNode->tableType == TSDB_SUPER_TABLE) { - int32_t code = vnodeGetAllTableList(pHandle->meta, pBlockNode->uid, pTableListInfo->pTableList); + int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = terrno; return NULL; @@ -4183,7 +4183,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); if (pScanNode->tableType == TSDB_SUPER_TABLE) { - code = vnodeGetAllTableList(pHandle->meta, pScanNode->uid, pTableListInfo->pTableList); + code = vnodeGetAllTableList(pHandle->vnode, pScanNode->uid, pTableListInfo->pTableList); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = terrno; return NULL; @@ -4399,7 +4399,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { } STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, const char* idstr) { - int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo); + int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index bb3f4e403d50009bd76f7ab97740101509c73001..a467dd2a530ea4195496d336a4a107b6e7b5e6af 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2033,7 +2033,7 @@ typedef struct STableMergeScanInfo { int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) { - int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo); + int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 7d44d41b558c3eee9faf238e5f76fb70bf91f579..b3c51c8f88244194b77d971b33aaa5de9118daf5 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -545,6 +545,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { return 0; } +// TODO consider the page meta size int32_t getProperSortPageSize(size_t rowSize) { uint32_t defaultPageSize = 4096;