diff --git a/cmake/cmake.define b/cmake/cmake.define index 5b65738c700bd7fa862da6284881ada17f245302..dcb4ce2702b9bf96f01e55affcc50ec2c4dd696a 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -120,8 +120,14 @@ ELSE () SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3 -Wformat=0") MESSAGE(STATUS "Compile with Address Sanitizer!") ELSE () - SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") + MESSAGE(STATUS "XXXXXXXXXXXXXX Clang/AppleClang" ${TD_DARWIN}) + IF (${TD_DARWIN}) + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-y2k") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-y2k") + ELSE () + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") + ENDIF () ENDIF () # disable all assert diff --git a/cmake/cmake.options b/cmake/cmake.options index 60ff00affc01408b084a8993441e4fe7052f4977..4ec9d18e08580a735598b647ef65188b46fdbc61 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -109,7 +109,7 @@ option( option( BUILD_WITH_ROCKSDB "If build with rocksdb" - OFF + ON ) option( diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 8f943133820bfa45a963ad9c36c48ddf60756a98..942e77dacbd8b1a4d7db2bc61a2204bdcf436750 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -223,17 +223,31 @@ endif(${BUILD_WITH_LEVELDB}) # rocksdb # To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev if(${BUILD_WITH_ROCKSDB}) - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized") + #SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized") option(WITH_TESTS "" OFF) option(WITH_BENCHMARK_TOOLS "" OFF) option(WITH_TOOLS "" OFF) option(WITH_LIBURING "" OFF) + option(WITH_IOSTATS_CONTEXT "" OFF) + option(WITH_PERF_CONTEXT "" OFF) + option(FAIL_ON_WARNINGS "" OFF) + #option(WITH_JEMALLOC "" ON) option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) + IF (${TD_WINDOWS}) + option(WITH_MD_LIBRARY "build with MD" OFF) + set(SYSTEM_LIBS ${SYSTEM_LIBS} shlwapi.lib rpcrt4.lib) + endif(${TD_WINDOWS}) add_subdirectory(rocksdb EXCLUDE_FROM_ALL) target_include_directories( rocksdb PUBLIC $ ) + IF (${TD_DARWIN}) + target_compile_options( + rocksdb + PRIVATE -Wno-unused-private-field + ) + endif(${TD_DARWIN}) endif(${BUILD_WITH_ROCKSDB}) # lucene diff --git a/contrib/test/rocksdb/CMakeLists.txt b/contrib/test/rocksdb/CMakeLists.txt index b500e0a66158ba33d52d351f4e34462b27771892..5ae1b0395b5c2a4b323c07de19d1298579cfcc3f 100644 --- a/contrib/test/rocksdb/CMakeLists.txt +++ b/contrib/test/rocksdb/CMakeLists.txt @@ -1,6 +1,8 @@ +message("contrib test/rocksdb:" ${BUILD_DEPENDENCY_TESTS}) + add_executable(rocksdbTest "") target_sources(rocksdbTest PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/main.c" ) -target_link_libraries(rocksdbTest rocksdb) \ No newline at end of file +target_link_libraries(rocksdbTest rocksdb) diff --git a/contrib/test/rocksdb/main.c b/contrib/test/rocksdb/main.c index d1cbd373da1f8af09b6f46fbfbeb9da27dd43678..09435790cc3f1b0a05b0fe3db73e708243cfaa8f 100644 --- a/contrib/test/rocksdb/main.c +++ b/contrib/test/rocksdb/main.c @@ -25,10 +25,12 @@ int main(int argc, char const *argv[]) { // Read rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); - rocksdb_readoptions_set_snapshot(readoptions, rocksdb_create_snapshot(db)); +//rocksdb_readoptions_set_snapshot(readoptions, rocksdb_create_snapshot(db)); + char buf[256] = {0}; size_t vallen = 0; char * val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err); - printf("val:%s\n", val); + snprintf(buf, vallen+5, "val:%s", val); + printf("%ld %ld %s\n", strlen(val), vallen, buf); // Update // rocksdb_put(db, writeoptions, "key", 3, "eulav", 5, &err); @@ -43,4 +45,4 @@ int main(int argc, char const *argv[]) { rocksdb_close(db); return 0; -} \ No newline at end of file +} diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index cf1c6cc434739190734af155b95a5d78cab366e4..d12e0b5c197584e4800fe577bc40ebd44384865d 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1219,6 +1219,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) { SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); if (terrno == TSDB_CODE_DUP_KEY) { + taosHashCleanup(kvHash); return terrno; } } @@ -1292,12 +1293,12 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); + taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES); smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags); if (terrno == TSDB_CODE_DUP_KEY) { return terrno; } smlInsertMeta(meta->colHash, meta->cols, elements->colArray); - taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES); } } uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 796c5cf8ff5990b438a48dc6fa54037fe44807b4..62f837f3b6f4bad95fd5a035e6094bff076ec686 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -755,7 +755,7 @@ SColVal *tRowIterNext(SRowIter *pIter) { } if (pIter->pRow->flag == HAS_NULL) { - pIter->cv = COL_VAL_NULL(pTColumn->type, pTColumn->colId); + pIter->cv = COL_VAL_NULL(pTColumn->colId, pTColumn->type); goto _exit; } @@ -2439,7 +2439,7 @@ _exit: int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap, char *data) { int32_t code = 0; - if(data == NULL){ + if (data == NULL) { for (int32_t i = 0; i < nRows; ++i) { code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); } @@ -2453,8 +2453,9 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0); if (code) goto _exit; } else { - if(ASSERT(varDataTLen(data + offset) <= bytes)){ - uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset), bytes); + if (ASSERT(varDataTLen(data + offset) <= bytes)) { + uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset), + bytes); code = TSDB_CODE_INVALID_PARA; goto _exit; } diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index c713d1e247a8cd358fbe0c8a4a529f884e8810b6..8c0e5c35cd46e83a3aa3ba124fbf4adafbe607f5 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -84,6 +84,7 @@ target_include_directories( PUBLIC "inc" PUBLIC "src/inc" PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" + PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include" ) target_link_libraries( vnode @@ -100,6 +101,7 @@ target_link_libraries( # PUBLIC bdb # PUBLIC scalar + PUBLIC rocksdb PUBLIC transport PUBLIC stream PUBLIC index diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 0a54da9b5318dcf94173e29d4167a0a0b3dc0f46..34536d453c9ed527b84edd36577c61f26c67d21f 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -182,7 +182,7 @@ int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t n int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly); -void tsdbReaderSetId(STsdbReader* pReader, const char* idstr); +void tsdbReaderSetId(STsdbReader *pReader, const char *idstr); void tsdbReaderClose(STsdbReader *pReader); int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); @@ -196,8 +196,9 @@ void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t getReaderMaxVersion(STsdbReader *pReader); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, - uint64_t suid, void **pReader, const char *idstr); -int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); + SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr); +int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, + SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 2a85b191a42839a580da6ff8b65ee4e9c65430b5..95bce321964b432182aaeef556bec8e52509e31e 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -343,6 +343,16 @@ struct STsdbFS { SArray *aDFileSet; // SArray }; +typedef struct { + rocksdb_t *db; + rocksdb_options_t *options; + rocksdb_flushoptions_t *flushoptions; + rocksdb_writeoptions_t *writeoptions; + rocksdb_readoptions_t *readoptions; + rocksdb_writebatch_t *writebatch; + TdThreadMutex rMutex; +} SRocksCache; + struct STsdb { char *path; SVnode *pVnode; @@ -355,6 +365,7 @@ struct STsdb { TdThreadMutex lruMutex; SLRUCache *biCache; TdThreadMutex biMutex; + SRocksCache rCache; }; struct TSDBKEY { @@ -777,6 +788,8 @@ typedef struct SCacheRowsReader { uint64_t suid; char **transferBuf; // todo remove it soon int32_t numOfCols; + SArray *pCidList; + int32_t *pSlotIds; int32_t type; int32_t tableIndex; // currently returned result tables STableKeyInfo *pTableList; // table id list @@ -796,6 +809,10 @@ typedef struct { int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); +int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row); +int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype); +int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); + int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup); int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 70b997aba2cadb4e9088c11cb5115a4079382ea4..94e5f253bfb73323f14c649f5356b7ed6df70786 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -19,6 +19,7 @@ #include "executor.h" #include "filter.h" #include "qworker.h" +#include "rocksdb/c.h" #include "sync.h" #include "tRealloc.h" #include "tchecksum.h" @@ -177,6 +178,7 @@ int tsdbClose(STsdb** pTsdb); int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbPrepareCommit(STsdb* pTsdb); int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); +int32_t tsdbCacheCommit(STsdb* pTsdb); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbFinishCommit(STsdb* pTsdb); int32_t tsdbRollbackCommit(STsdb* pTsdb); @@ -193,9 +195,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, - int32_t type); + int32_t type); int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); -int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. +int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 3c7edd931baf1fd2f8796368e4ac76bbb398e636..df7b940f61cd830a1425a81dea00d14b0fca1cfc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -12,7 +12,6 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ - #include "tsdb.h" static int32_t tsdbOpenBICache(STsdb *pTsdb) { @@ -47,6 +46,502 @@ static void tsdbCloseBICache(STsdb *pTsdb) { } } +#define ROCKS_KEY_LEN 64 + +static void tsdbGetRocksPath(STsdb *pTsdb, char *path) { + SVnode *pVnode = pTsdb->pVnode; + if (pVnode->pTfs) { + if (path) { + snprintf(path, TSDB_FILENAME_LEN, "%s%s%s%scache.rdb", tfsGetPrimaryPath(pTsdb->pVnode->pTfs), TD_DIRSEP, + pTsdb->path, TD_DIRSEP); + } + } else { + if (path) { + snprintf(path, TSDB_FILENAME_LEN, "%s%scache.rdb", pTsdb->path, TD_DIRSEP); + } + } +} + +static int32_t tsdbOpenRocksCache(STsdb *pTsdb) { + int32_t code = 0; + + rocksdb_options_t *options = rocksdb_options_create(); + if (NULL == options) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + rocksdb_options_set_create_if_missing(options, 1); + // rocksdb_options_set_inplace_update_support(options, 1); + // rocksdb_options_set_allow_concurrent_memtable_write(options, 0); + + rocksdb_writeoptions_t *writeoptions = rocksdb_writeoptions_create(); + if (NULL == writeoptions) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err2; + } + // rocksdb_writeoptions_disable_WAL(writeoptions, 1); + + rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); + if (NULL == readoptions) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err2; + } + + char *err = NULL; + char cachePath[TSDB_FILENAME_LEN] = {0}; + tsdbGetRocksPath(pTsdb, cachePath); + + rocksdb_t *db = rocksdb_open(options, cachePath, &err); + if (NULL == db) { + code = -1; + goto _err3; + } + + rocksdb_flushoptions_t *flushoptions = rocksdb_flushoptions_create(); + if (NULL == flushoptions) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err4; + } + + rocksdb_writebatch_t *writebatch = rocksdb_writebatch_create(); + + pTsdb->rCache.writebatch = writebatch; + pTsdb->rCache.options = options; + pTsdb->rCache.writeoptions = writeoptions; + pTsdb->rCache.readoptions = readoptions; + pTsdb->rCache.flushoptions = flushoptions; + pTsdb->rCache.db = db; + + taosThreadMutexInit(&pTsdb->rCache.rMutex, NULL); + + return code; + +_err4: + rocksdb_readoptions_destroy(readoptions); +_err3: + rocksdb_writeoptions_destroy(writeoptions); +_err2: + rocksdb_options_destroy(options); +_err: + return code; +} + +static void tsdbCloseRocksCache(STsdb *pTsdb) { + rocksdb_close(pTsdb->rCache.db); + rocksdb_flushoptions_destroy(pTsdb->rCache.flushoptions); + rocksdb_writebatch_destroy(pTsdb->rCache.writebatch); + rocksdb_readoptions_destroy(pTsdb->rCache.readoptions); + rocksdb_writeoptions_destroy(pTsdb->rCache.writeoptions); + rocksdb_options_destroy(pTsdb->rCache.options); + taosThreadMutexDestroy(&pTsdb->rCache.rMutex); +} + +int32_t tsdbCacheCommit(STsdb *pTsdb) { + int32_t code = 0; + char *err = NULL; + + rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + code = -1; + } + + return code; +} + +SLastCol *tsdbCacheDeserialize(char const *value) { + if (!value) { + return NULL; + } + + SLastCol *pLastCol = (SLastCol *)value; + SColVal *pColVal = &pLastCol->colVal; + if (IS_VAR_DATA_TYPE(pColVal->type)) { + if (pColVal->value.nData > 0) { + pColVal->value.pData = (char *)value + sizeof(*pLastCol); + } else { + pColVal->value.pData = NULL; + } + } + + return pLastCol; +} + +void tsdbCacheSerialize(SLastCol *pLastCol, char **value, size_t *size) { + SColVal *pColVal = &pLastCol->colVal; + size_t length = sizeof(*pLastCol); + if (IS_VAR_DATA_TYPE(pColVal->type)) { + length += pColVal->value.nData; + } + *value = taosMemoryMalloc(length); + + *(SLastCol *)(*value) = *pLastCol; + if (IS_VAR_DATA_TYPE(pColVal->type)) { + uint8_t *pVal = pColVal->value.pData; + pColVal->value.pData = *value + sizeof(*pLastCol); + if (pColVal->value.nData > 0) { + memcpy(pColVal->value.pData, pVal, pColVal->value.nData); + } else { + pColVal->value.pData = NULL; + } + } + *size = length; +} + +static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, char const *lstring) { + SLastCol *pLastCol = NULL; + + char *err = NULL; + size_t vlen = 0; + char key[ROCKS_KEY_LEN]; + size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, cid, lstring); + char *value = NULL; + value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, key, klen, &vlen, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + } + + pLastCol = tsdbCacheDeserialize(value); + + return pLastCol; +} + +int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { + int32_t code = 0; + + // 1, fetch schema + STSchema *pTSchema = NULL; + int32_t sver = TSDBROW_SVERSION(pRow); + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + + // 2, iterate col values into array + SArray *aColVal = taosArrayInit(32, sizeof(SColVal)); + + STSDBRowIter iter = {0}; + tsdbRowIterOpen(&iter, pRow, pTSchema); + + for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { + /* + if (IS_VAR_DATA_TYPE(pColVal->type)) { + uint8_t *pVal = pColVal->value.pData; + + pColVal->value.pData = NULL; + code = tRealloc(&pColVal->value.pData, pColVal->value.nData); + if (code) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + if (pColVal->value.nData) { + memcpy(pColVal->value.pData, pVal, pColVal->value.nData); + } + } + */ + taosArrayPush(aColVal, pColVal); + } + + tsdbRowClose(&iter); + + // 3, build keys & multi get from rocks + int num_keys = TARRAY_SIZE(aColVal); + char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); + size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); + for (int i = 0; i < num_keys; ++i) { + SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); + int16_t cid = pColVal->cid; + + char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); + int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, cid); + if (last_key_len >= ROCKS_KEY_LEN) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); + } + int lr_key_len = snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, cid); + if (lr_key_len >= ROCKS_KEY_LEN) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); + } + keys_list[i] = keys; + keys_list[num_keys + i] = keys + ROCKS_KEY_LEN; + keys_list_sizes[i] = last_key_len; + keys_list_sizes[num_keys + i] = lr_key_len; + } + char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); + size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); + char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *)); + taosThreadMutexLock(&pTsdb->rCache.rMutex); + rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, + keys_list_sizes, values_list, values_list_sizes, errs); + for (int i = 0; i < num_keys; ++i) { + taosMemoryFree(keys_list[i]); + } + for (int i = 0; i < num_keys * 2; ++i) { + rocksdb_free(errs[i]); + } + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(errs); + + TSKEY keyTs = TSDBROW_TS(pRow); + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + for (int i = 0; i < num_keys; ++i) { + SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); + if (COL_VAL_IS_VALUE(pColVal)) { + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + + if (NULL == pLastCol || pLastCol->ts <= keyTs) { + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); + char key[ROCKS_KEY_LEN]; + size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pColVal->cid); + rocksdb_writebatch_put(wb, key, klen, value, vlen); + taosMemoryFree(value); + } + } + + if (!COL_VAL_IS_NONE(pColVal)) { + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); + + if (NULL == pLastCol || pLastCol->ts <= keyTs) { + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(&(SLastCol){.ts = keyTs, .colVal = *pColVal}, &value, &vlen); + char key[ROCKS_KEY_LEN]; + size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pColVal->cid); + rocksdb_writebatch_put(wb, key, klen, value, vlen); + taosMemoryFree(value); + } + } + + rocksdb_free(values_list[i]); + rocksdb_free(values_list[i + num_keys]); + } + taosMemoryFree(values_list); + taosMemoryFree(values_list_sizes); + + char *err = NULL; + rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + } + taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + rocksdb_writebatch_clear(wb); + +_exit: + taosArrayDestroy(aColVal); + taosMemoryFree(pTSchema); + return code; +} + +static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, + int nCols, int16_t *slotIds); + +static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, + int nCols, int16_t *slotIds); + +int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype) { + static char const *alstring[2] = {"last_row", "last"}; + char const *lstring = alstring[ltype]; + int32_t code = 0; + + SArray *pCidList = pr->pCidList; + int num_keys = TARRAY_SIZE(pCidList); + char **keys_list = taosMemoryCalloc(num_keys, sizeof(char *)); + size_t *keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); + for (int i = 0; i < num_keys; ++i) { + int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); + + char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); + int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, cid, lstring); + if (last_key_len >= ROCKS_KEY_LEN) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); + } + + keys_list[i] = keys; + keys_list_sizes[i] = last_key_len; + } + char **values_list = taosMemoryCalloc(num_keys, sizeof(char *)); + size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t)); + char **errs = taosMemoryCalloc(num_keys, sizeof(char *)); + rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list, + keys_list_sizes, values_list, values_list_sizes, errs); + for (int i = 0; i < num_keys; ++i) { + taosMemoryFree(keys_list[i]); + rocksdb_free(errs[i]); + } + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(errs); + + for (int i = 0; i < num_keys; ++i) { + bool freeCol = true; + SArray *pTmpColArray = NULL; + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); + SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; + if (pLastCol) { + SColVal *pColVal = &pLastCol->colVal; + if (IS_VAR_DATA_TYPE(pColVal->type)) { + uint8_t *pVal = pColVal->value.pData; + pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); + if (pColVal->value.nData) { + memcpy(pColVal->value.pData, pVal, pColVal->value.nData); + } + } + } else { + taosThreadMutexLock(&pTsdb->rCache.rMutex); + + pLastCol = tsdbCacheLookup(pTsdb, uid, cid, lstring); + if (!pLastCol) { + // recalc: load from tsdb + int16_t aCols[1] = {cid}; + int16_t slotIds[1] = {pr->pSlotIds[i]}; + pTmpColArray = NULL; + + if (ltype) { + mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); + } else { + mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); + } + + if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) { + pLastCol = taosArrayGet(pTmpColArray, 0); + freeCol = false; + } + + // still null, then make up a none col value + if (!pLastCol) { + pLastCol = &noneCol; + freeCol = false; + } + + // store result back to rocks cache + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + char *value = NULL; + size_t vlen = 0; + tsdbCacheSerialize(pLastCol, &value, &vlen); + char key[ROCKS_KEY_LEN]; + size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, pLastCol->colVal.cid, lstring); + rocksdb_writebatch_put(wb, key, klen, value, vlen); + + taosMemoryFree(value); + + char *err = NULL; + rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + } + } + + taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + } + + taosArrayPush(pLastArray, pLastCol); + + taosArrayDestroy(pTmpColArray); + if (freeCol) { + taosMemoryFree(pLastCol); + } + } + taosMemoryFree(values_list); + taosMemoryFree(values_list_sizes); + + return code; +} + +int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { + int32_t code = 0; + // 1, fetch schema + STSchema *pTSchema = NULL; + int32_t sver = -1; + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + + // 3, build keys & multi get from rocks + int num_keys = pTSchema->numOfCols; + char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); + size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); + for (int i = 0; i < num_keys; ++i) { + int16_t cid = pTSchema->columns[i].colId; + + char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN); + int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, cid); + if (last_key_len >= ROCKS_KEY_LEN) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); + } + int lr_key_len = snprintf(keys + ROCKS_KEY_LEN, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, cid); + if (lr_key_len >= ROCKS_KEY_LEN) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code)); + } + keys_list[i] = keys; + keys_list[num_keys + i] = keys + ROCKS_KEY_LEN; + keys_list_sizes[i] = last_key_len; + keys_list_sizes[num_keys + i] = lr_key_len; + } + char **values_list = taosMemoryCalloc(num_keys * 2, sizeof(char *)); + size_t *values_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t)); + char **errs = taosMemoryCalloc(num_keys * 2, sizeof(char *)); + taosThreadMutexLock(&pTsdb->rCache.rMutex); + rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys * 2, (const char *const *)keys_list, + keys_list_sizes, values_list, values_list_sizes, errs); + for (int i = 0; i < num_keys; ++i) { + taosMemoryFree(keys_list[i]); + } + for (int i = 0; i < num_keys * 2; ++i) { + rocksdb_free(errs[i]); + } + taosMemoryFree(keys_list); + taosMemoryFree(keys_list_sizes); + taosMemoryFree(errs); + + rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; + for (int i = 0; i < num_keys; ++i) { + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { + char key[ROCKS_KEY_LEN]; + size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last", uid, pLastCol->colVal.cid); + rocksdb_writebatch_delete(wb, key, klen); + } + + pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); + if (NULL != pLastCol && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { + char key[ROCKS_KEY_LEN]; + size_t klen = snprintf(key, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":last_row", uid, pLastCol->colVal.cid); + rocksdb_writebatch_delete(wb, key, klen); + } + + rocksdb_free(values_list[i]); + rocksdb_free(values_list[i + num_keys]); + } + taosMemoryFree(values_list); + taosMemoryFree(values_list_sizes); + + char *err = NULL; + rocksdb_write(pTsdb->rCache.db, pTsdb->rCache.writeoptions, wb, &err); + if (NULL != err) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err); + rocksdb_free(err); + } + taosThreadMutexUnlock(&pTsdb->rCache.rMutex); + rocksdb_writebatch_clear(wb); + +_exit: + taosMemoryFree(pTSchema); + + return code; +} + int32_t tsdbOpenCache(STsdb *pTsdb) { int32_t code = 0; SLRUCache *pCache = NULL; @@ -64,6 +559,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) { goto _err; } + code = tsdbOpenRocksCache(pTsdb); + if (code != TSDB_CODE_SUCCESS) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + taosLRUCacheSetStrictCapacity(pCache, false); taosThreadMutexInit(&pTsdb->lruMutex, NULL); @@ -84,6 +585,7 @@ void tsdbCloseCache(STsdb *pTsdb) { } tsdbCloseBICache(pTsdb); + tsdbCloseRocksCache(pTsdb); } static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) { @@ -170,7 +672,7 @@ int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { return code; } - +/* int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { int32_t code = 0; char key[32] = {0}; @@ -225,7 +727,7 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { return code; } - +*/ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup) { int32_t code = 0; STSRow *cacheRow = NULL; @@ -1423,6 +1925,21 @@ static int32_t initLastColArray(STSchema *pTSchema, SArray **ppColArray) { return TSDB_CODE_SUCCESS; } +static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) { + SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol)); + if (NULL == pColArray) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < nCols; ++i) { + int16_t slotId = slotIds[i]; + SLastCol col = {.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)}; + taosArrayPush(pColArray, &col); + } + *ppColArray = pColArray; + return TSDB_CODE_SUCCESS; +} + static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) { int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols; *ppDst = taosMemoryMalloc(len); @@ -1761,6 +2278,342 @@ _err: return code; } +static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, + int nCols, int16_t *slotIds) { + STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); + int16_t nLastCol = nCols; + int16_t noneCol = 0; + bool setNoneCol = false; + bool hasRow = false; + bool ignoreEarlierTs = false; + SArray *pColArray = NULL; + SColVal *pColVal = &(SColVal){0}; + + int32_t code = initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t)); + if (NULL == aColArray) { + taosArrayDestroy(pColArray); + + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int i = 0; i < nCols; ++i) { + taosArrayPush(aColArray, &aCols[i]); + } + + TSKEY lastRowTs = TSKEY_MAX; + + CacheNextRowIter iter = {0}; + nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader, + &pr->pDataFReaderLast, pr->lastTs); + + do { + TSDBROW *pRow = NULL; + nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)); + + if (!pRow) { + break; + } + + hasRow = true; + + int32_t sversion = TSDBROW_SVERSION(pRow); + if (sversion != -1) { + code = updateTSchema(sversion, pr, uid); + if (TSDB_CODE_SUCCESS != code) { + goto _err; + } + pTSchema = pr->pCurrSchema; + } + // int16_t nCol = pTSchema->numOfCols; + + TSKEY rowTs = TSDBROW_TS(pRow); + + if (lastRowTs == TSKEY_MAX) { + lastRowTs = rowTs; + + for (int16_t iCol = noneCol; iCol < nCols; ++iCol) { + if (iCol >= nLastCol) { + break; + } + SLastCol *pCol = taosArrayGet(pColArray, iCol); + if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) { + continue; + } + if (slotIds[iCol] == 0) { + STColumn *pTColumn = &pTSchema->columns[0]; + + *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = rowTs}); + taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal}); + continue; + } + tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); + + *pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal}; + if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { + pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); + if (pCol->colVal.value.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + + if (!COL_VAL_IS_VALUE(pColVal)) { + if (!setNoneCol) { + noneCol = iCol; + setNoneCol = true; + } + } else { + int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ); + if (aColIndex >= 0) { + taosArrayRemove(aColArray, aColIndex); + } + } + } + if (!setNoneCol) { + // done, goto return pColArray + break; + } else { + continue; + } + } + + // merge into pColArray + setNoneCol = false; + for (int16_t iCol = noneCol; iCol < nCols; ++iCol) { + if (iCol >= nLastCol) { + break; + } + // high version's column value + SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol); + if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) { + continue; + } + SColVal *tColVal = &lastColVal->colVal; + + tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); + if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) { + SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal}; + if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { + SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); + taosMemoryFree(pLastCol->colVal.value.pData); + + lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); + if (lastCol.colVal.value.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + + taosArraySet(pColArray, iCol, &lastCol); + int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ); + taosArrayRemove(aColArray, aColIndex); + } else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) { + noneCol = iCol; + setNoneCol = true; + } + } + } while (setNoneCol); + + if (!hasRow) { + if (ignoreEarlierTs) { + taosArrayDestroy(pColArray); + pColArray = NULL; + } else { + taosArrayClear(pColArray); + } + } + *ppLastArray = pColArray; + + nextRowIterClose(&iter); + taosArrayDestroy(aColArray); + + return code; + +_err: + nextRowIterClose(&iter); + // taosMemoryFreeClear(pTSchema); + *ppLastArray = NULL; + taosArrayDestroy(pColArray); + taosArrayDestroy(aColArray); + return code; +} + +static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, + int nCols, int16_t *slotIds) { + STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); + int16_t nLastCol = nCols; + int16_t noneCol = 0; + bool setNoneCol = false; + bool hasRow = false; + bool ignoreEarlierTs = false; + SArray *pColArray = NULL; + SColVal *pColVal = &(SColVal){0}; + + int32_t code = initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t)); + if (NULL == aColArray) { + taosArrayDestroy(pColArray); + + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int i = 0; i < nCols; ++i) { + taosArrayPush(aColArray, &aCols[i]); + } + + TSKEY lastRowTs = TSKEY_MAX; + + CacheNextRowIter iter = {0}; + nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader, + &pr->pDataFReaderLast, pr->lastTs); + + do { + TSDBROW *pRow = NULL; + nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)); + + if (!pRow) { + break; + } + + hasRow = true; + + int32_t sversion = TSDBROW_SVERSION(pRow); + if (sversion != -1) { + code = updateTSchema(sversion, pr, uid); + if (TSDB_CODE_SUCCESS != code) { + goto _err; + } + pTSchema = pr->pCurrSchema; + } + // int16_t nCol = pTSchema->numOfCols; + + TSKEY rowTs = TSDBROW_TS(pRow); + + if (lastRowTs == TSKEY_MAX) { + lastRowTs = rowTs; + + for (int16_t iCol = noneCol; iCol < nCols; ++iCol) { + if (iCol >= nLastCol) { + break; + } + SLastCol *pCol = taosArrayGet(pColArray, iCol); + if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) { + continue; + } + if (slotIds[iCol] == 0) { + STColumn *pTColumn = &pTSchema->columns[0]; + + *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = rowTs}); + taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal}); + continue; + } + tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); + + *pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal}; + if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { + pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); + if (pCol->colVal.value.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + + if (COL_VAL_IS_NONE(pColVal)) { + if (!setNoneCol) { + noneCol = iCol; + setNoneCol = true; + } + } else { + int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ); + if (aColIndex >= 0) { + taosArrayRemove(aColArray, aColIndex); + } + } + } + if (!setNoneCol) { + // done, goto return pColArray + break; + } else { + continue; + } + } + + // merge into pColArray + setNoneCol = false; + for (int16_t iCol = noneCol; iCol < nCols; ++iCol) { + if (iCol >= nLastCol) { + break; + } + // high version's column value + SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol); + if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) { + continue; + } + SColVal *tColVal = &lastColVal->colVal; + + tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); + if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { + SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal}; + if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { + SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); + taosMemoryFree(pLastCol->colVal.value.pData); + + lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); + if (lastCol.colVal.value.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + + taosArraySet(pColArray, iCol, &lastCol); + int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ); + taosArrayRemove(aColArray, aColIndex); + } else if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal) && !setNoneCol) { + noneCol = iCol; + setNoneCol = true; + } + } + } while (setNoneCol); + + if (!hasRow) { + if (ignoreEarlierTs) { + taosArrayDestroy(pColArray); + pColArray = NULL; + } else { + taosArrayClear(pColArray); + } + } + *ppLastArray = pColArray; + + nextRowIterClose(&iter); + taosArrayDestroy(aColArray); + + return code; + +_err: + nextRowIterClose(&iter); + + *ppLastArray = NULL; + taosArrayDestroy(pColArray); + taosArrayDestroy(aColArray); + return code; +} + int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) { int32_t code = 0; char key[32] = {0}; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index d6e819d84ad3d8a6131dcf15f09afdc8ff4b7f64..0d47940582a9fed54eee0104746c52965f44c0b8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -21,47 +21,30 @@ #define HASTYPE(_type, _t) (((_type) & (_t)) == (_t)) static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, - void** pRes, const char* idStr) { + const int32_t* dstSlotIds, void** pRes, const char* idStr) { int32_t numOfRows = pBlock->info.rows; + bool allNullRow = true; if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { - bool allNullRow = true; - for (int32_t i = 0; i < pReader->numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]); + int32_t slotId = slotIds[i]; + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); - if (slotIds[i] == -1) { // the primary timestamp - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); - p->ts = pColVal->ts; - p->bytes = TSDB_KEYSIZE; - *(int64_t*)p->buf = pColVal->ts; - allNullRow = false; - } else { - int32_t slotId = slotIds[i]; - // add check for null value, caused by the modification of table schema (new column added). - if (slotId >= taosArrayGetSize(pRow)) { - p->ts = 0; - p->isNull = true; - colDataSetNULL(pColInfoData, numOfRows); - continue; - } - - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); + p->ts = pColVal->ts; + p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); + allNullRow = p->isNull & allNullRow; - p->ts = pColVal->ts; - p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); - allNullRow = p->isNull & allNullRow; + if (!p->isNull) { + if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { + varDataSetLen(p->buf, pColVal->colVal.value.nData); - if (!p->isNull) { - if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { - varDataSetLen(p->buf, pColVal->colVal.value.nData); - memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); - p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size - } else { - memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes); - p->bytes = pReader->pSchema->columns[slotId].bytes; - } + memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); + p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size + } else { + memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes); + p->bytes = pReader->pSchema->columns[slotId].bytes; } } @@ -74,36 +57,31 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p pBlock->info.rows += allNullRow ? 0 : 1; } else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) { for (int32_t i = 0; i < pReader->numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]); - if (slotIds[i] == -1) { - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); - colDataSetVal(pColInfoData, numOfRows, (const char*)&pColVal->ts, false); - } else { - int32_t slotId = slotIds[i]; - // add check for null value, caused by the modification of table schema (new column added). - if (slotId >= taosArrayGetSize(pRow)) { - colDataSetNULL(pColInfoData, numOfRows); - continue; - } - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); - SColVal* pVal = &pColVal->colVal; + int32_t slotId = slotIds[i]; + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); + SColVal* pVal = &pColVal->colVal; - if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { - if (!COL_VAL_IS_VALUE(&pColVal->colVal)) { - colDataSetNULL(pColInfoData, numOfRows); - } else { - varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData); - memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData); - colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false); - } + if (COL_VAL_IS_NONE(&pColVal->colVal)) { + continue; + } + allNullRow = false; + if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { + if (!COL_VAL_IS_VALUE(&pColVal->colVal)) { + colDataSetNULL(pColInfoData, numOfRows); } else { - colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal)); + varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData); + + memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData); + colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false); } + } else { + colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal)); } } - pBlock->info.rows += 1; + pBlock->info.rows += allNullRow ? 0 : 1; } else { tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr); return TSDB_CODE_INVALID_PARA; @@ -143,7 +121,7 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id } int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, - uint64_t suid, void** pReader, const char* idstr) { + SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr) { *pReader = NULL; SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); if (p == NULL) { @@ -155,6 +133,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, p->pTsdb = p->pVnode->pTsdb; p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}; p->numOfCols = numOfCols; + p->pCidList = pCidList; + p->pSlotIds = pSlotIds; p->suid = suid; if (numOfTables == 0) { @@ -226,32 +206,9 @@ void* tsdbCacherowsReaderClose(void* pReader) { return NULL; } -static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow, - LRUHandle** h) { - int32_t code = TSDB_CODE_SUCCESS; - *pRow = NULL; - - if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) { - code = tsdbCacheGetLastrowH(lruCache, uid, pr, h); - } else { - code = tsdbCacheGetLastH(lruCache, uid, pr, h); - } - - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - // no data in the table of Uid - if (*h != NULL) { - *pRow = (SArray*)taosLRUCacheValue(lruCache, *h); - } - - return code; -} - static void freeItem(void* pItem) { SLastCol* pCol = (SLastCol*)pItem; - if (IS_VAR_DATA_TYPE(pCol->colVal.type)) { + if (IS_VAR_DATA_TYPE(pCol->colVal.type) && pCol->colVal.value.pData) { taosMemoryFree(pCol->colVal.value.pData); } } @@ -277,19 +234,17 @@ static int32_t tsdbCacheQueryReseek(void* pQHandle) { } } -int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { +int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, const int32_t* dstSlotIds, + SArray* pTableUidList) { if (pReader == NULL || pResBlock == NULL) { return TSDB_CODE_INVALID_PARA; } SCacheRowsReader* pr = pReader; - - int32_t code = TSDB_CODE_SUCCESS; - SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache; - LRUHandle* h = NULL; - SArray* pRow = NULL; - bool hasRes = false; - SArray* pLastCols = NULL; + int32_t code = TSDB_CODE_SUCCESS; + SArray* pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol)); + bool hasRes = false; + SArray* pLastCols = NULL; void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); if (pRes == NULL) { @@ -298,20 +253,22 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } for (int32_t j = 0; j < pr->numOfCols; ++j) { - pRes[j] = taosMemoryCalloc( - 1, sizeof(SFirstLastRes) + pr->pSchema->columns[-1 == slotIds[j] ? 0 : slotIds[j]].bytes + VARSTR_HEADER_SIZE); + pRes[j] = + taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[/*-1 == slotIds[j] ? 0 : */ slotIds[j]].bytes + + VARSTR_HEADER_SIZE); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); p->ts = INT64_MIN; } - pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol)); + pLastCols = taosArrayInit(pr->numOfCols, sizeof(SLastCol)); if (pLastCols == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _end; } - for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) { - struct STColumn* pCol = &pr->pSchema->columns[i]; + for (int32_t i = 0; i < pr->numOfCols; ++i) { + int32_t slotId = slotIds[i]; + struct STColumn* pCol = &pr->pSchema->columns[slotId]; SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL}; if (IS_VAR_DATA_TYPE(pCol->type)) { @@ -328,6 +285,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 pr->pDataFReader = NULL; pr->pDataFReaderLast = NULL; + int32_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3; + // retrieve the only one last row of all tables in the uid list. if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { int64_t st = taosGetTimestampUs(); @@ -335,16 +294,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 for (int32_t i = 0; i < pr->numOfTables; ++i) { STableKeyInfo* pKeyInfo = &pr->pTableList[i]; - code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); - if (code != TSDB_CODE_SUCCESS) { - goto _end; - } - - if (h == NULL) { + tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); + if (TARRAY_SIZE(pRow) <= 0) { + taosArrayClearEx(pRow, freeItem); continue; } - if (taosArrayGetSize(pRow) <= 0) { - tsdbCacheRelease(lruCache, h); + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); + if (COL_VAL_IS_NONE(&pColVal->colVal)) { + taosArrayClearEx(pRow, freeItem); continue; } @@ -352,47 +309,34 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 bool hasNotNullRow = true; int64_t singleTableLastTs = INT64_MAX; for (int32_t k = 0; k < pr->numOfCols; ++k) { - int32_t slotId = slotIds[k]; - - if (slotId == -1) { // the primary timestamp - SLastCol* p = taosArrayGet(pLastCols, 0); - SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, 0); - if (pCol->ts > p->ts) { - hasRes = true; - p->ts = pCol->ts; - p->colVal = pCol->colVal; - singleTableLastTs = pCol->ts; - } - } else { - SLastCol* p = taosArrayGet(pLastCols, slotId); - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); - - if (pColVal->ts > p->ts) { - if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) { - if (!COL_VAL_IS_VALUE(&p->colVal)) { - hasNotNullRow = false; - } - continue; - } + SLastCol* p = taosArrayGet(pLastCols, k); + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k); - hasRes = true; - p->ts = pColVal->ts; - if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) { - singleTableLastTs = pColVal->ts; + if (pColVal->ts > p->ts) { + if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) { + if (!COL_VAL_IS_VALUE(&p->colVal)) { + hasNotNullRow = false; } + continue; + } - if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) { - p->colVal = pColVal->colVal; - } else { - if (COL_VAL_IS_VALUE(&pColVal->colVal)) { - memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData); - } - - p->colVal.value.nData = pColVal->colVal.value.nData; - p->colVal.type = pColVal->colVal.type; - p->colVal.flag = pColVal->colVal.flag; - p->colVal.cid = pColVal->colVal.cid; + hasRes = true; + p->ts = pColVal->ts; + if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) { + singleTableLastTs = pColVal->ts; + } + + if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) { + p->colVal = pColVal->colVal; + } else { + if (COL_VAL_IS_VALUE(&pColVal->colVal)) { + memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData); } + + p->colVal.value.nData = pColVal->colVal.value.nData; + p->colVal.type = pColVal->colVal.type; + p->colVal.flag = pColVal->colVal.flag; + p->colVal.cid = pColVal->colVal.cid; } } } @@ -414,33 +358,31 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 taosArraySet(pTableUidList, 0, &pKeyInfo->uid); } - tsdbCacheRelease(lruCache, h); + taosArrayClearEx(pRow, freeItem); } if (hasRes) { - saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr); + saveOneRow(pLastCols, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr); } } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) { STableKeyInfo* pKeyInfo = &pr->pTableList[i]; - code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); - if (code != TSDB_CODE_SUCCESS) { - goto _end; - } - if (h == NULL) { + tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype); + if (TARRAY_SIZE(pRow) <= 0) { + taosArrayClearEx(pRow, freeItem); continue; } - if (taosArrayGetSize(pRow) <= 0) { - tsdbCacheRelease(lruCache, h); + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); + if (COL_VAL_IS_NONE(&pColVal->colVal)) { + taosArrayClearEx(pRow, freeItem); continue; } - saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr); - // TODO reset the pRes + saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr); + taosArrayClearEx(pRow, freeItem); taosArrayPush(pTableUidList, &pKeyInfo->uid); - tsdbCacheRelease(lruCache, h); pr->tableIndex += 1; if (pResBlock->info.rows >= pResBlock->info.capacity) { @@ -466,6 +408,7 @@ _end: } taosMemoryFree(pRes); + taosArrayDestroyEx(pRow, freeItem); taosArrayDestroyEx(pLastCols, freeItem); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index d0ff403bf70a96f114b0722bfb13f7ce035edfb4..b6141abcf933819e9c71f18c811b3e92d1d9080f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -140,7 +140,6 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid SMemTable *pMemTable = pTsdb->mem; STbData *pTbData = NULL; SVBufPool *pPool = pTsdb->pVnode->inUse; - TSDBKEY lastKey = {.version = version, .ts = eKey}; // check if table exists SMetaInfo info; @@ -181,7 +180,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid pMemTable->nDel++; pMemTable->minVer = TMIN(pMemTable->minVer, version); pMemTable->maxVer = TMIN(pMemTable->maxVer, version); - + /* if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) { tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey); } @@ -189,6 +188,10 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) { tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey); } + */ + if (eKey >= pTbData->maxKey && sKey <= pTbData->maxKey) { + tsdbCacheDel(pTsdb, suid, uid, sKey, eKey); + } tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 " at version %" PRId64, @@ -284,8 +287,8 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) { int64_t tsdbCountTbDataRows(STbData *pTbData) { SMemSkipListNode *pNode = pTbData->sl.pHead; - int64_t rowsNum = 0; - + int64_t rowsNum = 0; + while (NULL != pNode) { pNode = SL_GET_NODE_FORWARD(pNode, 0); if (pNode == pTbData->sl.pTail) { @@ -298,17 +301,17 @@ int64_t tsdbCountTbDataRows(STbData *pTbData) { return rowsNum; } -void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum) { +void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj *pTableMap, int64_t *rowsNum) { taosRLockLatch(&pMemTable->latch); for (int32_t i = 0; i < pMemTable->nBucket; ++i) { STbData *pTbData = pMemTable->aBucket[i]; while (pTbData) { - void* p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid)); + void *p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid)); if (p == NULL) { pTbData = pTbData->next; continue; } - + *rowsNum += tsdbCountTbDataRows(pTbData); pTbData = pTbData->next; } @@ -668,15 +671,8 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, if (key.ts >= pTbData->maxKey) { pTbData->maxKey = key.ts; - - if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, &lRow, true); - } - } - - if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb); } + tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); // SMemTable pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey); @@ -736,15 +732,8 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, if (key.ts >= pTbData->maxKey) { pTbData->maxKey = key.ts; - - if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, &lRow, true); - } - } - - if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb); } + tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); // SMemTable pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 847125018c5f15baa660dc2f53677e17e18e0d7f..74168591d21ef151b0885e15076dc0842b7ab445 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -144,8 +144,8 @@ _exit: } int vnodeShouldCommit(SVnode *pVnode, bool atExit) { - bool diskAvail = osDataSpaceAvailable(); - bool needCommit = false; + bool diskAvail = osDataSpaceAvailable(); + bool needCommit = false; taosThreadMutexLock(&pVnode->mutex); if (pVnode->inUse && diskAvail) { @@ -439,6 +439,9 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { code = tsdbCommit(pVnode->pTsdb, pInfo); TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbCacheCommit(pVnode->pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + if (VND_IS_RSMA(pVnode)) { code = smaCommit(pVnode->pSma, pInfo); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 61f024ebb70bcbbf2b2127b41d3ac5f22ec74693..925e3054505af7039a6c3d00672880aebca0ebce 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -31,18 +31,21 @@ typedef struct SCacheRowsScanInfo { void* pLastrowReader; SColMatchInfo matchInfo; int32_t* pSlotIds; + int32_t* pDstSlotIds; SExprSupp pseudoExprSup; int32_t retrieveType; int32_t currentGroupIndex; SSDataBlock* pBufferredRes; SArray* pUidList; + SArray* pCidList; int32_t indexOfBufferedRes; STableListInfo* pTableList; } SCacheRowsScanInfo; static SSDataBlock* doScanCache(SOperatorInfo* pOperator); static void destroyCacheScanOperator(void* param); -static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); +static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds, + int32_t** pDstSlotIds); static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo); #define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) @@ -71,9 +74,16 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe goto _error; } + SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t)); + for (int i = 0; i < TARRAY_SIZE(pInfo->matchInfo.pList); ++i) { + SColMatchItem* pColInfo = taosArrayGet(pInfo->matchInfo.pList, i); + taosArrayPush(pCidList, &pColInfo->colId); + } + pInfo->pCidList = pCidList; + removeRedundantTsCol(pScanNode, &pInfo->matchInfo); - code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds); + code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds, &pInfo->pDstSlotIds); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -91,8 +101,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe uint64_t suid = tableListGetSuid(pTableListInfo); code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, - taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, - pTaskInfo->id.str); + taosArrayGetSize(pInfo->matchInfo.pList), pCidList, pInfo->pSlotIds, suid, + &pInfo->pLastrowReader, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -160,8 +170,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { blockDataCleanup(pInfo->pBufferredRes); taosArrayClear(pInfo->pUidList); - int32_t code = - tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList); + int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, + pInfo->pDstSlotIds, pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -227,8 +237,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, - taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, - pTaskInfo->id.str); + taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid, + &pInfo->pLastrowReader, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { pInfo->currentGroupIndex += 1; taosArrayClear(pInfo->pUidList); @@ -237,7 +247,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { taosArrayClear(pInfo->pUidList); - code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); + code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds, + pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -280,6 +291,8 @@ void destroyCacheScanOperator(void* param) { blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pBufferredRes); taosMemoryFree(pInfo->pSlotIds); + taosMemoryFree(pInfo->pDstSlotIds); + taosArrayDestroy(pInfo->pCidList); taosArrayDestroy(pInfo->pUidList); taosArrayDestroy(pInfo->matchInfo.pList); tableListDestroy(pInfo->pTableList); @@ -292,7 +305,8 @@ void destroyCacheScanOperator(void* param) { taosMemoryFreeClear(param); } -int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds) { +int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds, + int32_t** pDstSlotIds) { size_t numOfCols = taosArrayGetSize(pColMatchInfo); *pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t)); @@ -300,18 +314,25 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask return TSDB_CODE_OUT_OF_MEMORY; } + *pDstSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t)); + if (*pDstSlotIds == NULL) { + taosMemoryFree(*pSlotIds); + return TSDB_CODE_OUT_OF_MEMORY; + } + SSchemaWrapper* pWrapper = pTaskInfo->schemaInfo.sw; for (int32_t i = 0; i < numOfCols; ++i) { SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i); for (int32_t j = 0; j < pWrapper->nCols; ++j) { - if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + /* if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { (*pSlotIds)[pColMatch->dstSlotId] = -1; break; - } + }*/ if (pColMatch->colId == pWrapper->pSchema[j].colId) { - (*pSlotIds)[pColMatch->dstSlotId] = j; + (*pSlotIds)[i] = j; + (*pDstSlotIds)[i] = pColMatch->dstSlotId; break; } } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 260dfd486e0d10b772b0342cfb13123b1a40c67a..7b533f2cc4c53e61f19d4d19849d0d28da0144b7 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -134,7 +134,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py -,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py +#,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_stable.py diff --git a/tests/parallel_test/container_build.sh b/tests/parallel_test/container_build.sh index 80236cf604d63e4d9c8fdddfa81aeaf16fb64944..0fc29c241b24db24b92862b57a9e61278e2c6fe0 100755 --- a/tests/parallel_test/container_build.sh +++ b/tests/parallel_test/container_build.sh @@ -68,7 +68,7 @@ docker run \ -v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \ -v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \ -v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \ - --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1" + --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=0;make -j || exit 1" # -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ if [[ -d ${WORKDIR}/debugNoSan ]] ;then @@ -97,7 +97,7 @@ docker run \ -v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \ -v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \ -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ - --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1 " + --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=0;make -j || exit 1 " mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan