diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index ab15e851e76e1c7ad29a81a7cd1874a9e89d82ed..431c9116ccec57d90a1dbe2405845f6c26a5fef6 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -244,6 +244,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TDB_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0613) //"No available disk") #define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) //"TSDB messed message") #define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) //"TSDB invalid tag value") +#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) //"TSDB no cache last row data") // query #define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) //"Invalid handle") diff --git a/src/tsdb/inc/tsdbCommitQueue.h b/src/tsdb/inc/tsdbCommitQueue.h index 6342c036b77317f6d954bf63c32674f5fbe66de9..b690e3bdc25d86acf7e5b9d580470a80f3f4316f 100644 --- a/src/tsdb/inc/tsdbCommitQueue.h +++ b/src/tsdb/inc/tsdbCommitQueue.h @@ -16,7 +16,7 @@ #ifndef _TD_TSDB_COMMIT_QUEUE_H_ #define _TD_TSDB_COMMIT_QUEUE_H_ -typedef enum { COMMIT_REQ, COMPACT_REQ } TSDB_REQ_T; +typedef enum { COMMIT_REQ, COMPACT_REQ,COMMIT_CONFIG_REQ } TSDB_REQ_T; int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req); diff --git a/src/tsdb/inc/tsdbMemTable.h b/src/tsdb/inc/tsdbMemTable.h index 6046274af40b855ac25dba6e220bf3bcfdc1dcca..babb7024b2500f3b8418fa5404bb593c768d8f1f 100644 --- a/src/tsdb/inc/tsdbMemTable.h +++ b/src/tsdb/inc/tsdbMemTable.h @@ -66,6 +66,7 @@ int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot, SArray* pAT void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot); void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); int tsdbAsyncCommit(STsdbRepo* pRepo); +int tsdbSyncCommitConfig(STsdbRepo* pRepo); int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); void* tsdbCommitData(STsdbRepo* pRepo); diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 049c1bdb6ea37121a202a931faa17a4ea36cf6dc..7cf88826319b0b4ad8a7d128b19264d2b67434d0 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -78,7 +78,6 @@ struct STsdbRepo { bool config_changed; // config changed flag pthread_mutex_t save_mutex; // protect save config - uint8_t hasCachedLastRow; uint8_t hasCachedLastColumn; STsdbAppH appH; diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index bb844e8e83f532e2c933ae35063460ec59129ee3..e25014bc1e8f2456eece3d517096cfee66886800 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -180,15 +180,14 @@ static void *tsdbLoopCommit(void *arg) { req = ((SReq *)pNode->data)->req; pRepo = ((SReq *)pNode->data)->pRepo; - // check if need to apply new config - if (pRepo->config_changed) { - tsdbApplyRepoConfig(pRepo); - } - if (req == COMMIT_REQ) { tsdbCommitData(pRepo); } else if (req == COMPACT_REQ) { tsdbCompactImpl(pRepo); + } else if (req == COMMIT_CONFIG_REQ) { + ASSERT(pRepo->config_changed); + tsdbApplyRepoConfig(pRepo); + tsem_post(&(pRepo->readyToCommit)); } else { ASSERT(0); } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index afbedd5b2fd231606902db104916e4ff4f10ba67..bb02e0128337dbd0436e3c8b83894097b1c21c4c 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -270,8 +270,8 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { pthread_mutex_unlock(&repo->save_mutex); - // schedule a commit msg then the new config will be applied immediatly - tsdbAsyncCommit(repo); + // schedule a commit msg and wait for the new config applied + tsdbSyncCommitConfig(repo); return 0; #if 0 @@ -553,7 +553,6 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { return NULL; } pRepo->config_changed = false; - atomic_store_8(&pRepo->hasCachedLastRow, 0); atomic_store_8(&pRepo->hasCachedLastColumn, 0); code = tsem_init(&(pRepo->readyToCommit), 0, 1); @@ -857,9 +856,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { } tsdbDestroyReadH(&readh); - if (CACHE_LAST_ROW(pCfg)) { - atomic_store_8(&pRepo->hasCachedLastRow, 1); - } + if (CACHE_LAST_NULL_COLUMN(pCfg)) { atomic_store_8(&pRepo->hasCachedLastColumn, 1); } @@ -900,20 +897,16 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { // if close last option,need to free data if (need_free_last_row || need_free_last_col) { - if (need_free_last_row) { - atomic_store_8(&pRepo->hasCachedLastRow, 0); - } if (need_free_last_col) { atomic_store_8(&pRepo->hasCachedLastColumn, 0); } tsdbInfo("free cache last data since cacheLast option changed"); - for (int i = 1; i < maxTableIdx; i++) { + for (int i = 1; i <= maxTableIdx; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; if (need_free_last_row) { taosTZfree(pTable->lastRow); pTable->lastRow = NULL; - pTable->lastKey = TSKEY_INITIAL_VAL; } if (need_free_last_col) { tsdbFreeLastColumns(pTable); @@ -983,9 +976,6 @@ int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { tsdbDestroyReadH(&readh); - if (cacheLastRow) { - atomic_store_8(&pRepo->hasCachedLastRow, 1); - } if (cacheLastCol) { atomic_store_8(&pRepo->hasCachedLastColumn, 1); } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index bee5600af730cda0d51c10dc969b070f7cd0252e..9d8b1ca7f2889f40b696f04a608dd166adf6eac6 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -271,10 +271,34 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { return ptr; } +int tsdbSyncCommitConfig(STsdbRepo* pRepo) { + ASSERT(pRepo->config_changed == true); + tsem_wait(&(pRepo->readyToCommit)); + + if (pRepo->code != TSDB_CODE_SUCCESS) { + tsdbWarn("vgId:%d try to commit config when TSDB not in good state: %s", REPO_ID(pRepo), tstrerror(terrno)); + } + + if (tsdbLockRepo(pRepo) < 0) return -1; + tsdbScheduleCommit(pRepo, COMMIT_CONFIG_REQ); + if (tsdbUnlockRepo(pRepo) < 0) return -1; + + tsem_wait(&(pRepo->readyToCommit)); + tsem_post(&(pRepo->readyToCommit)); + + if (pRepo->code != TSDB_CODE_SUCCESS) { + terrno = pRepo->code; + return -1; + } + + terrno = TSDB_CODE_SUCCESS; + return 0; +} + int tsdbAsyncCommit(STsdbRepo *pRepo) { tsem_wait(&(pRepo->readyToCommit)); - //ASSERT(pRepo->imem == NULL); + ASSERT(pRepo->imem == NULL); if (pRepo->mem == NULL) { tsem_post(&(pRepo->readyToCommit)); return 0; @@ -1015,7 +1039,6 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow taosTZfree(pTable->lastRow); TSDB_WLOCK_TABLE(pTable); pTable->lastRow = NULL; - pTable->lastKey = TSKEY_INITIAL_VAL; TSDB_WUNLOCK_TABLE(pTable); } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 1545d44395d6baca6f33c850e35796c96af8d52f..c333294179f215890f06ae8d30bf6316f379f875 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -2469,7 +2469,6 @@ static bool loadCachedLastRow(STsdbQueryHandle* pQueryHandle) { if (ret != TSDB_CODE_SUCCESS) { return false; } - copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj, NULL); tfree(pRow); @@ -2860,24 +2859,29 @@ bool tsdbGetExternalRow(TsdbQueryHandleT pHandle) { } /* - * 1. no data at all (pTable->lastKey = TSKEY_INITIAL_VAL), just return TSKEY_INITIAL_VAL - * 2. has data but not loaded, just return lastKey but not set pRes - * 3. has data and loaded, return lastKey and set pRes + * if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW + * else set pRes and return TSDB_CODE_SUCCESS and save lastKey */ int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) { + int32_t code = TSDB_CODE_SUCCESS; + TSDB_RLOCK_TABLE(pTable); - *lastKey = pTable->lastKey; - if ((*lastKey) != TSKEY_INITIAL_VAL && pTable->lastRow) { + if (!pTable->lastRow) { + code = TSDB_CODE_TDB_NO_CACHE_LAST_ROW; + goto out; + } + + if (pRes) { *pRes = tdDataRowDup(pTable->lastRow); if (*pRes == NULL) { - TSDB_RUNLOCK_TABLE(pTable); - return TSDB_CODE_TDB_OUT_OF_MEMORY; + code = TSDB_CODE_TDB_OUT_OF_MEMORY; } } +out: TSDB_RUNLOCK_TABLE(pTable); - return TSDB_CODE_SUCCESS; + return code; } bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle) { @@ -2887,7 +2891,6 @@ bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle) { int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList) { assert(pQueryHandle != NULL && groupList != NULL); - SDataRow pRow = NULL; TSKEY key = TSKEY_INITIAL_VAL; SArray* group = taosArrayGetP(groupList->pGroupList, 0); @@ -2898,7 +2901,7 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g int32_t code = 0; if (((STable*)pInfo->pTable)->lastRow) { - code = tsdbGetCachedLastRow(pInfo->pTable, &pRow, &key); + code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key); if (code != TSDB_CODE_SUCCESS) { pQueryHandle->cachelastrow = 0; } else { @@ -2913,7 +2916,6 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g pQueryHandle->activeIndex = -1; // start from -1 } - tfree(pRow); return code; } diff --git a/tests/pytest/query/last_row_cache.py b/tests/pytest/query/last_row_cache.py index a0e81477096e9c846e109ae71020b40e47b39a84..f116d0254e75983bdd5d8af119c4b4ac1737376a 100644 --- a/tests/pytest/query/last_row_cache.py +++ b/tests/pytest/query/last_row_cache.py @@ -25,7 +25,7 @@ class TDTestCase: self.tables = 10 self.rows = 20 - self.columns = 50 + self.columns = 5 self.perfix = 't' self.ts = 1601481600000 @@ -34,7 +34,7 @@ class TDTestCase: sql = "create table st(ts timestamp, " for i in range(self.columns - 1): sql += "c%d int, " % (i + 1) - sql += "c50 int) tags(t1 int)" + sql += "c5 int) tags(t1 int)" tdSql.execute(sql) for i in range(self.tables):