diff --git a/include/common/tglobal.h b/include/common/tglobal.h index ac75b8476299a8600ffa298e44b9f95b71640883..13e8454ac32f9d14ac0fa09aa9f7c3caf5f931ef 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -98,7 +98,8 @@ extern char *tsSvrCrashReportUri; // query buffer management extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing -extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node +extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node +extern int32_t tsCacheLazyLoadThreshold; // cost threshold for last/last_row loading cache as much as possible // query client extern int32_t tsQueryPolicy; @@ -145,10 +146,10 @@ extern char tsUdfdResFuncs[]; extern char tsUdfdLdLibPath[]; // schemaless -extern char tsSmlChildTableName[]; -extern char tsSmlTagName[]; -//extern bool tsSmlDataFormat; -//extern int32_t tsSmlBatchSize; +extern char tsSmlChildTableName[]; +extern char tsSmlTagName[]; +// extern bool tsSmlDataFormat; +// extern int32_t tsSmlBatchSize; // wal extern int64_t tsWalFsyncDataSizeLimit; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 644f423cb1961f5422ec695cbc83f76128095f4a..f7c9c77e789250368bd622dea7ebba481c9bd778 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1136,7 +1136,7 @@ typedef struct { int64_t numOfInsertSuccessReqs; int64_t numOfBatchInsertReqs; int64_t numOfBatchInsertSuccessReqs; - int32_t numOfCacheTables; + int32_t numOfCachedTables; } SVnodeLoad; typedef struct { diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 4aa47a381966c225da16a2f48a555833558522de..141504a7c4d93e4813cfa57472cc0c88e2d13225 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -233,6 +233,7 @@ static const SSysDbTableSchema vgroupsSchema[] = { {.name = "v4_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "v4_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "cacheload", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, + {.name = "cacheTables", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "tsma", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true}, // {.name = "compact_start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, }; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 8058f9fddd6ee29e2b217f059255190493093378..0b4c7b88d764c03f98ff1298a903d8bfbd0af94a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -154,6 +154,7 @@ char tsTagFilterCache = 0; // positive value (in MB) int32_t tsQueryBufferSize = -1; int64_t tsQueryBufferSizeBytes = -1; +int32_t tsCacheLazyLoadThreshold = 500; int32_t tsDiskCfgNum = 0; SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0}; @@ -497,6 +498,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, 0) != 0) return -1; + GRANT_CFG_ADD; return 0; } @@ -824,6 +827,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } + tsCacheLazyLoadThreshold = cfgGetItem(pCfg, "cacheLazyLoadThreshold")->i32; + tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; GRANT_CFG_GET; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9ad7c72bc0de1d90c16b3a37ef5f5186be46a21d..3c3f3a02609d6a84590f1226cc6cf2a36690e130 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1070,7 +1070,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tEncodeI64(&encoder, pload->totalStorage) < 0) return -1; if (tEncodeI64(&encoder, pload->compStorage) < 0) return -1; if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1; - if (tEncodeI64(&encoder, reserved) < 0) return -1; + if (tEncodeI32(&encoder, pload->numOfCachedTables) < 0) return -1; + if (tEncodeI32(&encoder, reserved) < 0) return -1; if (tEncodeI64(&encoder, reserved) < 0) return -1; if (tEncodeI64(&encoder, reserved) < 0) return -1; } @@ -1148,7 +1149,8 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tDecodeI64(&decoder, &vload.totalStorage) < 0) return -1; if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1; if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1; - if (tDecodeI64(&decoder, &reserved) < 0) return -1; + if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1; + if (tDecodeI32(&decoder, (int32_t*)&reserved) < 0) return -1; if (tDecodeI64(&decoder, &reserved) < 0) return -1; if (tDecodeI64(&decoder, &reserved) < 0) return -1; if (taosArrayPush(pReq->pVloads, &vload) == NULL) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index ebe96fd740babea9112b3038dec7bb4725e74545..dfc3b3fde85fe81049cc2842d6d7c6f4c5e4380e 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -359,6 +359,7 @@ typedef struct { int8_t replica; SVnodeGid vnodeGid[TSDB_MAX_REPLICA]; void* pTsma; + int32_t numOfCachedTables; } SVgObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 1d9db37a7d68403781b72b1d8c9d8b43951d0261..adeae45314e0fa5c773818e57b90929f2b213e18 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -412,6 +412,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { if (pVgroup != NULL) { if (pVload->syncState == TAOS_SYNC_STATE_LEADER) { pVgroup->cacheUsage = pVload->cacheUsage; + pVgroup->numOfCachedTables = pVload->numOfCachedTables; pVgroup->numOfTables = pVload->numOfTables; pVgroup->numOfTimeSeries = pVload->numOfTimeSeries; pVgroup->totalStorage = pVload->totalStorage; @@ -440,7 +441,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { if (roleChanged) { SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName); if (pDb != NULL && pDb->stateTs != curMs) { - mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name, pDb->stateTs, curMs); + mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name, + pDb->stateTs, curMs); pDb->stateTs = curMs; } mndReleaseDb(pMnode, pDb); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 84e8a9ec4384847173c744a9db8d542702b62306..b747755acca339a1f4b088bd513a84f9bee07fcd 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -803,6 +803,9 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p int32_t cacheUsage = (int32_t)pVgroup->cacheUsage; colDataSetVal(pColInfo, numOfRows, (const char *)&cacheUsage, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->numOfCachedTables, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->isTsma, false); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 2dec38240e95852695a1f4b524991c08755b03eb..0b38ce6d240dbda045bef53d585fd998a0116adb 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -706,6 +706,7 @@ typedef struct SMergeTree { bool destroyLoadInfo; SSttBlockLoadInfo *pLoadInfo; const char *idStr; + bool ignoreEarlierTs; } SMergeTree; typedef struct { @@ -751,6 +752,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead bool destroyLoadInfo, const char *idStr, bool strictTimeRange); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); +bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index e68b3589fc3e6d356bb995b2630adcff9afb5ec9..ab94a5e1c76a134f97e0e4985d7163b5053ded8f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -637,6 +637,11 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa state->pMergeTree = &state->mergeTree; bool hasVal = tMergeTreeNext(&state->mergeTree); if (!hasVal) { + if (tMergeTreeIgnoreEarlierTs(&state->mergeTree)) { + *pIgnoreEarlierTs = true; + *ppRow = NULL; + return code; + } state->state = SFSLASTNEXTROW_FILESET; goto _next_fileset; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 1d78622a61a73a8514f697a36bb43bd2024f008d..e4c23c295a1b34cfc8649520e79b3cde7ff055d0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -332,6 +332,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 // retrieve the only one last row of all tables in the uid list. if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { + int64_t st = taosGetTimestampUs(); for (int32_t i = 0; i < pr->numOfTables; ++i) { STableKeyInfo* pKeyInfo = &pr->pTableList[i]; @@ -407,8 +408,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } if (hasNotNullRow) { - pr->lastTs = minTs; - tsdbInfo("%p have cache read %d tables", pr, i + 1); + double cost = (taosGetTimestampUs() - st) / 1000.0; + if (cost > tsCacheLazyLoadThreshold) { + pr->lastTs = minTs; + } } } @@ -418,8 +421,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 if (hasRes) { saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr); } - - tsdbInfo("have cached %d tables", taosLRUCacheGetElems(lruCache)); } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) { STableKeyInfo* pKeyInfo = &pr->pTableList[i]; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index af4eb9626e05ddabc500fe9b410b8cd8d4a5cc3a..943b16116c20e18130dbe63fd0c44db0109f5e75 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -29,6 +29,7 @@ struct SLDataIter { STimeWindow timeWindow; SVersionRange verRange; SSttBlockLoadInfo *pBlockLoadInfo; + bool ignoreEarlierTs; }; SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, @@ -351,6 +352,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t if (backward && ((strictTimeRange && (*pIter)->pSttBlk->maxKey <= (*pIter)->timeWindow.skey) || (!strictTimeRange && (*pIter)->pSttBlk->maxKey < (*pIter)->timeWindow.skey))) { (*pIter)->pSttBlk = NULL; + (*pIter)->ignoreEarlierTs = true; } } @@ -581,6 +583,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead pMTree->pLoadInfo = pBlockLoadInfo; pMTree->destroyLoadInfo = destroyLoadInfo; + pMTree->ignoreEarlierTs = false; for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file struct SLDataIter *pIter = NULL; @@ -595,6 +598,9 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead taosArrayPush(pMTree->pIterList, &pIter); tMergeTreeAddIter(pMTree, pIter); } else { + if (!pMTree->ignoreEarlierTs) { + pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs; + } tLDataIterClose(pIter); } } @@ -608,6 +614,8 @@ _end: void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); } +bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) { return pMTree->ignoreEarlierTs; } + bool tMergeTreeNext(SMergeTree *pMTree) { int32_t code = TSDB_CODE_SUCCESS; if (pMTree->pIter) { diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index d7e9c72a914dbc1da9c95a99d5aceeaa37b91fac..303d2a9ca449de504a582c8472951a1bf6c6c683 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -382,7 +382,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->syncRestore = state.restored; pLoad->syncCanRead = state.canRead; pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); - pLoad->numOfCacheTables = tsdbCacheGetElems(pVnode); + pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode); pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta); pLoad->totalStorage = (int64_t)3 * 1073741824;