未验证 提交 ebdc1a32 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #20996 from taosdata/fix/TD-23623

 enh(cache/rocks): base for rocks put/get 
...@@ -120,8 +120,14 @@ ELSE () ...@@ -120,8 +120,14 @@ ELSE ()
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3 -Wformat=0") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3 -Wformat=0")
MESSAGE(STATUS "Compile with Address Sanitizer!") MESSAGE(STATUS "Compile with Address Sanitizer!")
ELSE () ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") MESSAGE(STATUS "XXXXXXXXXXXXXX Clang/AppleClang" ${TD_DARWIN})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") IF (${TD_DARWIN})
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-y2k")
ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
ENDIF ()
ENDIF () ENDIF ()
# disable all assert # disable all assert
......
...@@ -109,7 +109,7 @@ option( ...@@ -109,7 +109,7 @@ option(
option( option(
BUILD_WITH_ROCKSDB BUILD_WITH_ROCKSDB
"If build with rocksdb" "If build with rocksdb"
OFF ON
) )
option( option(
......
...@@ -223,17 +223,31 @@ endif(${BUILD_WITH_LEVELDB}) ...@@ -223,17 +223,31 @@ endif(${BUILD_WITH_LEVELDB})
# rocksdb # rocksdb
# To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev # To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev
if(${BUILD_WITH_ROCKSDB}) if(${BUILD_WITH_ROCKSDB})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized") #SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized")
option(WITH_TESTS "" OFF) option(WITH_TESTS "" OFF)
option(WITH_BENCHMARK_TOOLS "" OFF) option(WITH_BENCHMARK_TOOLS "" OFF)
option(WITH_TOOLS "" OFF) option(WITH_TOOLS "" OFF)
option(WITH_LIBURING "" OFF) option(WITH_LIBURING "" OFF)
option(WITH_IOSTATS_CONTEXT "" OFF)
option(WITH_PERF_CONTEXT "" OFF)
option(FAIL_ON_WARNINGS "" OFF)
#option(WITH_JEMALLOC "" ON)
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
IF (${TD_WINDOWS})
option(WITH_MD_LIBRARY "build with MD" OFF)
set(SYSTEM_LIBS ${SYSTEM_LIBS} shlwapi.lib rpcrt4.lib)
endif(${TD_WINDOWS})
add_subdirectory(rocksdb EXCLUDE_FROM_ALL) add_subdirectory(rocksdb EXCLUDE_FROM_ALL)
target_include_directories( target_include_directories(
rocksdb rocksdb
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/rocksdb/include> PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/rocksdb/include>
) )
IF (${TD_DARWIN})
target_compile_options(
rocksdb
PRIVATE -Wno-unused-private-field
)
endif(${TD_DARWIN})
endif(${BUILD_WITH_ROCKSDB}) endif(${BUILD_WITH_ROCKSDB})
# lucene # lucene
......
message("contrib test/rocksdb:" ${BUILD_DEPENDENCY_TESTS})
add_executable(rocksdbTest "") add_executable(rocksdbTest "")
target_sources(rocksdbTest target_sources(rocksdbTest
PRIVATE PRIVATE
"${CMAKE_CURRENT_SOURCE_DIR}/main.c" "${CMAKE_CURRENT_SOURCE_DIR}/main.c"
) )
target_link_libraries(rocksdbTest rocksdb) target_link_libraries(rocksdbTest rocksdb)
\ No newline at end of file
...@@ -25,10 +25,12 @@ int main(int argc, char const *argv[]) { ...@@ -25,10 +25,12 @@ int main(int argc, char const *argv[]) {
// Read // Read
rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create(); rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
rocksdb_readoptions_set_snapshot(readoptions, rocksdb_create_snapshot(db)); //rocksdb_readoptions_set_snapshot(readoptions, rocksdb_create_snapshot(db));
char buf[256] = {0};
size_t vallen = 0; size_t vallen = 0;
char * val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err); char * val = rocksdb_get(db, readoptions, "key", 3, &vallen, &err);
printf("val:%s\n", val); snprintf(buf, vallen+5, "val:%s", val);
printf("%ld %ld %s\n", strlen(val), vallen, buf);
// Update // Update
// rocksdb_put(db, writeoptions, "key", 3, "eulav", 5, &err); // rocksdb_put(db, writeoptions, "key", 3, "eulav", 5, &err);
...@@ -43,4 +45,4 @@ int main(int argc, char const *argv[]) { ...@@ -43,4 +45,4 @@ int main(int argc, char const *argv[]) {
rocksdb_close(db); rocksdb_close(db);
return 0; return 0;
} }
\ No newline at end of file
...@@ -1219,6 +1219,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) { ...@@ -1219,6 +1219,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
if (terrno == TSDB_CODE_DUP_KEY) { if (terrno == TSDB_CODE_DUP_KEY) {
taosHashCleanup(kvHash);
return terrno; return terrno;
} }
} }
...@@ -1292,12 +1293,12 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { ...@@ -1292,12 +1293,12 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat, uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat,
info->lineNum); info->lineNum);
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags); smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
if (terrno == TSDB_CODE_DUP_KEY) { if (terrno == TSDB_CODE_DUP_KEY) {
return terrno; return terrno;
} }
smlInsertMeta(meta->colHash, meta->cols, elements->colArray); smlInsertMeta(meta->colHash, meta->cols, elements->colArray);
taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
} }
} }
uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
......
...@@ -755,7 +755,7 @@ SColVal *tRowIterNext(SRowIter *pIter) { ...@@ -755,7 +755,7 @@ SColVal *tRowIterNext(SRowIter *pIter) {
} }
if (pIter->pRow->flag == HAS_NULL) { if (pIter->pRow->flag == HAS_NULL) {
pIter->cv = COL_VAL_NULL(pTColumn->type, pTColumn->colId); pIter->cv = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
goto _exit; goto _exit;
} }
...@@ -2439,7 +2439,7 @@ _exit: ...@@ -2439,7 +2439,7 @@ _exit:
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap, int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap,
char *data) { char *data) {
int32_t code = 0; int32_t code = 0;
if(data == NULL){ if (data == NULL) {
for (int32_t i = 0; i < nRows; ++i) { for (int32_t i = 0; i < nRows; ++i) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
} }
...@@ -2453,8 +2453,9 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt ...@@ -2453,8 +2453,9 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0); code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
if (code) goto _exit; if (code) goto _exit;
} else { } else {
if(ASSERT(varDataTLen(data + offset) <= bytes)){ if (ASSERT(varDataTLen(data + offset) <= bytes)) {
uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset), bytes); uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset),
bytes);
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
goto _exit; goto _exit;
} }
......
...@@ -84,6 +84,7 @@ target_include_directories( ...@@ -84,6 +84,7 @@ target_include_directories(
PUBLIC "inc" PUBLIC "inc"
PUBLIC "src/inc" PUBLIC "src/inc"
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
) )
target_link_libraries( target_link_libraries(
vnode vnode
...@@ -100,6 +101,7 @@ target_link_libraries( ...@@ -100,6 +101,7 @@ target_link_libraries(
# PUBLIC bdb # PUBLIC bdb
# PUBLIC scalar # PUBLIC scalar
PUBLIC rocksdb
PUBLIC transport PUBLIC transport
PUBLIC stream PUBLIC stream
PUBLIC index PUBLIC index
......
...@@ -182,7 +182,7 @@ int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t n ...@@ -182,7 +182,7 @@ int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t n
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly); SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly);
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr); void tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
void tsdbReaderClose(STsdbReader *pReader); void tsdbReaderClose(STsdbReader *pReader);
int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext); int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
...@@ -196,8 +196,9 @@ void *tsdbGetIvtIdx(SMeta *pMeta); ...@@ -196,8 +196,9 @@ void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader); uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void **pReader, const char *idstr); SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader); void *tsdbCacherowsReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
......
...@@ -343,6 +343,16 @@ struct STsdbFS { ...@@ -343,6 +343,16 @@ struct STsdbFS {
SArray *aDFileSet; // SArray<SDFileSet> SArray *aDFileSet; // SArray<SDFileSet>
}; };
typedef struct {
rocksdb_t *db;
rocksdb_options_t *options;
rocksdb_flushoptions_t *flushoptions;
rocksdb_writeoptions_t *writeoptions;
rocksdb_readoptions_t *readoptions;
rocksdb_writebatch_t *writebatch;
TdThreadMutex rMutex;
} SRocksCache;
struct STsdb { struct STsdb {
char *path; char *path;
SVnode *pVnode; SVnode *pVnode;
...@@ -355,6 +365,7 @@ struct STsdb { ...@@ -355,6 +365,7 @@ struct STsdb {
TdThreadMutex lruMutex; TdThreadMutex lruMutex;
SLRUCache *biCache; SLRUCache *biCache;
TdThreadMutex biMutex; TdThreadMutex biMutex;
SRocksCache rCache;
}; };
struct TSDBKEY { struct TSDBKEY {
...@@ -777,6 +788,8 @@ typedef struct SCacheRowsReader { ...@@ -777,6 +788,8 @@ typedef struct SCacheRowsReader {
uint64_t suid; uint64_t suid;
char **transferBuf; // todo remove it soon char **transferBuf; // todo remove it soon
int32_t numOfCols; int32_t numOfCols;
SArray *pCidList;
int32_t *pSlotIds;
int32_t type; int32_t type;
int32_t tableIndex; // currently returned result tables int32_t tableIndex; // currently returned result tables
STableKeyInfo *pTableList; // table id list STableKeyInfo *pTableList; // table id list
...@@ -796,6 +809,10 @@ typedef struct { ...@@ -796,6 +809,10 @@ typedef struct {
int32_t tsdbOpenCache(STsdb *pTsdb); int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype);
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup);
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h); int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "executor.h" #include "executor.h"
#include "filter.h" #include "filter.h"
#include "qworker.h" #include "qworker.h"
#include "rocksdb/c.h"
#include "sync.h" #include "sync.h"
#include "tRealloc.h" #include "tRealloc.h"
#include "tchecksum.h" #include "tchecksum.h"
...@@ -177,6 +178,7 @@ int tsdbClose(STsdb** pTsdb); ...@@ -177,6 +178,7 @@ int tsdbClose(STsdb** pTsdb);
int32_t tsdbBegin(STsdb* pTsdb); int32_t tsdbBegin(STsdb* pTsdb);
int32_t tsdbPrepareCommit(STsdb* pTsdb); int32_t tsdbPrepareCommit(STsdb* pTsdb);
int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
int32_t tsdbCacheCommit(STsdb* pTsdb);
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
int32_t tsdbFinishCommit(STsdb* pTsdb); int32_t tsdbFinishCommit(STsdb* pTsdb);
int32_t tsdbRollbackCommit(STsdb* pTsdb); int32_t tsdbRollbackCommit(STsdb* pTsdb);
...@@ -193,9 +195,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode); ...@@ -193,9 +195,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode);
void tqClose(STQ*); void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,
int32_t type); int32_t type);
int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
......
...@@ -21,47 +21,30 @@ ...@@ -21,47 +21,30 @@
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t)) #define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
void** pRes, const char* idStr) { const int32_t* dstSlotIds, void** pRes, const char* idStr) {
int32_t numOfRows = pBlock->info.rows; int32_t numOfRows = pBlock->info.rows;
bool allNullRow = true;
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
bool allNullRow = true;
for (int32_t i = 0; i < pReader->numOfCols; ++i) { for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]);
int32_t slotId = slotIds[i];
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
if (slotIds[i] == -1) { // the primary timestamp p->ts = pColVal->ts;
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
p->ts = pColVal->ts; allNullRow = p->isNull & allNullRow;
p->bytes = TSDB_KEYSIZE;
*(int64_t*)p->buf = pColVal->ts;
allNullRow = false;
} else {
int32_t slotId = slotIds[i];
// add check for null value, caused by the modification of table schema (new column added).
if (slotId >= taosArrayGetSize(pRow)) {
p->ts = 0;
p->isNull = true;
colDataSetNULL(pColInfoData, numOfRows);
continue;
}
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
p->ts = pColVal->ts; if (!p->isNull) {
p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
allNullRow = p->isNull & allNullRow; varDataSetLen(p->buf, pColVal->colVal.value.nData);
if (!p->isNull) { memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size
varDataSetLen(p->buf, pColVal->colVal.value.nData); } else {
memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size p->bytes = pReader->pSchema->columns[slotId].bytes;
} else {
memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
p->bytes = pReader->pSchema->columns[slotId].bytes;
}
} }
} }
...@@ -74,36 +57,31 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p ...@@ -74,36 +57,31 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
pBlock->info.rows += allNullRow ? 0 : 1; pBlock->info.rows += allNullRow ? 0 : 1;
} else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) { } else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
for (int32_t i = 0; i < pReader->numOfCols; ++i) { for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
if (slotIds[i] == -1) { int32_t slotId = slotIds[i];
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
colDataSetVal(pColInfoData, numOfRows, (const char*)&pColVal->ts, false); SColVal* pVal = &pColVal->colVal;
} else {
int32_t slotId = slotIds[i];
// add check for null value, caused by the modification of table schema (new column added).
if (slotId >= taosArrayGetSize(pRow)) {
colDataSetNULL(pColInfoData, numOfRows);
continue;
}
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
SColVal* pVal = &pColVal->colVal;
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { if (COL_VAL_IS_NONE(&pColVal->colVal)) {
if (!COL_VAL_IS_VALUE(&pColVal->colVal)) { continue;
colDataSetNULL(pColInfoData, numOfRows); }
} else { allNullRow = false;
varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData); if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData); if (!COL_VAL_IS_VALUE(&pColVal->colVal)) {
colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false); colDataSetNULL(pColInfoData, numOfRows);
}
} else { } else {
colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal)); varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData);
memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData);
colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
} }
} else {
colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
} }
} }
pBlock->info.rows += 1; pBlock->info.rows += allNullRow ? 0 : 1;
} else { } else {
tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr); tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
...@@ -143,7 +121,7 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id ...@@ -143,7 +121,7 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id
} }
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void** pReader, const char* idstr) { SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr) {
*pReader = NULL; *pReader = NULL;
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
if (p == NULL) { if (p == NULL) {
...@@ -155,6 +133,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, ...@@ -155,6 +133,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
p->pTsdb = p->pVnode->pTsdb; p->pTsdb = p->pVnode->pTsdb;
p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}; p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
p->numOfCols = numOfCols; p->numOfCols = numOfCols;
p->pCidList = pCidList;
p->pSlotIds = pSlotIds;
p->suid = suid; p->suid = suid;
if (numOfTables == 0) { if (numOfTables == 0) {
...@@ -226,32 +206,9 @@ void* tsdbCacherowsReaderClose(void* pReader) { ...@@ -226,32 +206,9 @@ void* tsdbCacherowsReaderClose(void* pReader) {
return NULL; return NULL;
} }
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow,
LRUHandle** h) {
int32_t code = TSDB_CODE_SUCCESS;
*pRow = NULL;
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
code = tsdbCacheGetLastrowH(lruCache, uid, pr, h);
} else {
code = tsdbCacheGetLastH(lruCache, uid, pr, h);
}
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// no data in the table of Uid
if (*h != NULL) {
*pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
}
return code;
}
static void freeItem(void* pItem) { static void freeItem(void* pItem) {
SLastCol* pCol = (SLastCol*)pItem; SLastCol* pCol = (SLastCol*)pItem;
if (IS_VAR_DATA_TYPE(pCol->colVal.type)) { if (IS_VAR_DATA_TYPE(pCol->colVal.type) && pCol->colVal.value.pData) {
taosMemoryFree(pCol->colVal.value.pData); taosMemoryFree(pCol->colVal.value.pData);
} }
} }
...@@ -277,19 +234,17 @@ static int32_t tsdbCacheQueryReseek(void* pQHandle) { ...@@ -277,19 +234,17 @@ static int32_t tsdbCacheQueryReseek(void* pQHandle) {
} }
} }
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, const int32_t* dstSlotIds,
SArray* pTableUidList) {
if (pReader == NULL || pResBlock == NULL) { if (pReader == NULL || pResBlock == NULL) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
SCacheRowsReader* pr = pReader; SCacheRowsReader* pr = pReader;
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS; SArray* pRow = taosArrayInit(TARRAY_SIZE(pr->pCidList), sizeof(SLastCol));
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache; bool hasRes = false;
LRUHandle* h = NULL; SArray* pLastCols = NULL;
SArray* pRow = NULL;
bool hasRes = false;
SArray* pLastCols = NULL;
void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
if (pRes == NULL) { if (pRes == NULL) {
...@@ -298,20 +253,22 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -298,20 +253,22 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
} }
for (int32_t j = 0; j < pr->numOfCols; ++j) { for (int32_t j = 0; j < pr->numOfCols; ++j) {
pRes[j] = taosMemoryCalloc( pRes[j] =
1, sizeof(SFirstLastRes) + pr->pSchema->columns[-1 == slotIds[j] ? 0 : slotIds[j]].bytes + VARSTR_HEADER_SIZE); taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[/*-1 == slotIds[j] ? 0 : */ slotIds[j]].bytes +
VARSTR_HEADER_SIZE);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
p->ts = INT64_MIN; p->ts = INT64_MIN;
} }
pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol)); pLastCols = taosArrayInit(pr->numOfCols, sizeof(SLastCol));
if (pLastCols == NULL) { if (pLastCols == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _end; goto _end;
} }
for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) { for (int32_t i = 0; i < pr->numOfCols; ++i) {
struct STColumn* pCol = &pr->pSchema->columns[i]; int32_t slotId = slotIds[i];
struct STColumn* pCol = &pr->pSchema->columns[slotId];
SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL}; SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
...@@ -328,6 +285,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -328,6 +285,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
pr->pDataFReader = NULL; pr->pDataFReader = NULL;
pr->pDataFReaderLast = NULL; pr->pDataFReaderLast = NULL;
int32_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
// retrieve the only one last row of all tables in the uid list. // retrieve the only one last row of all tables in the uid list.
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
...@@ -335,16 +294,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -335,16 +294,14 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
for (int32_t i = 0; i < pr->numOfTables; ++i) { for (int32_t i = 0; i < pr->numOfTables; ++i) {
STableKeyInfo* pKeyInfo = &pr->pTableList[i]; STableKeyInfo* pKeyInfo = &pr->pTableList[i];
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
if (code != TSDB_CODE_SUCCESS) { if (TARRAY_SIZE(pRow) <= 0) {
goto _end; taosArrayClearEx(pRow, freeItem);
}
if (h == NULL) {
continue; continue;
} }
if (taosArrayGetSize(pRow) <= 0) { SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
tsdbCacheRelease(lruCache, h); if (COL_VAL_IS_NONE(&pColVal->colVal)) {
taosArrayClearEx(pRow, freeItem);
continue; continue;
} }
...@@ -352,47 +309,34 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -352,47 +309,34 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
bool hasNotNullRow = true; bool hasNotNullRow = true;
int64_t singleTableLastTs = INT64_MAX; int64_t singleTableLastTs = INT64_MAX;
for (int32_t k = 0; k < pr->numOfCols; ++k) { for (int32_t k = 0; k < pr->numOfCols; ++k) {
int32_t slotId = slotIds[k]; SLastCol* p = taosArrayGet(pLastCols, k);
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k);
if (slotId == -1) { // the primary timestamp
SLastCol* p = taosArrayGet(pLastCols, 0);
SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, 0);
if (pCol->ts > p->ts) {
hasRes = true;
p->ts = pCol->ts;
p->colVal = pCol->colVal;
singleTableLastTs = pCol->ts;
}
} else {
SLastCol* p = taosArrayGet(pLastCols, slotId);
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
if (pColVal->ts > p->ts) {
if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
if (!COL_VAL_IS_VALUE(&p->colVal)) {
hasNotNullRow = false;
}
continue;
}
hasRes = true; if (pColVal->ts > p->ts) {
p->ts = pColVal->ts; if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) { if (!COL_VAL_IS_VALUE(&p->colVal)) {
singleTableLastTs = pColVal->ts; hasNotNullRow = false;
} }
continue;
}
if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) { hasRes = true;
p->colVal = pColVal->colVal; p->ts = pColVal->ts;
} else { if (pColVal->ts < singleTableLastTs && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
if (COL_VAL_IS_VALUE(&pColVal->colVal)) { singleTableLastTs = pColVal->ts;
memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData); }
}
if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
p->colVal.value.nData = pColVal->colVal.value.nData; p->colVal = pColVal->colVal;
p->colVal.type = pColVal->colVal.type; } else {
p->colVal.flag = pColVal->colVal.flag; if (COL_VAL_IS_VALUE(&pColVal->colVal)) {
p->colVal.cid = pColVal->colVal.cid; memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
} }
p->colVal.value.nData = pColVal->colVal.value.nData;
p->colVal.type = pColVal->colVal.type;
p->colVal.flag = pColVal->colVal.flag;
p->colVal.cid = pColVal->colVal.cid;
} }
} }
} }
...@@ -414,33 +358,31 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -414,33 +358,31 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
taosArraySet(pTableUidList, 0, &pKeyInfo->uid); taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
} }
tsdbCacheRelease(lruCache, h); taosArrayClearEx(pRow, freeItem);
} }
if (hasRes) { if (hasRes) {
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr); saveOneRow(pLastCols, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
} }
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) { for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
STableKeyInfo* pKeyInfo = &pr->pTableList[i]; STableKeyInfo* pKeyInfo = &pr->pTableList[i];
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
if (h == NULL) { tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, pRow, pr, ltype);
if (TARRAY_SIZE(pRow) <= 0) {
taosArrayClearEx(pRow, freeItem);
continue; continue;
} }
if (taosArrayGetSize(pRow) <= 0) { SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
tsdbCacheRelease(lruCache, h); if (COL_VAL_IS_NONE(&pColVal->colVal)) {
taosArrayClearEx(pRow, freeItem);
continue; continue;
} }
saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr); saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
// TODO reset the pRes taosArrayClearEx(pRow, freeItem);
taosArrayPush(pTableUidList, &pKeyInfo->uid); taosArrayPush(pTableUidList, &pKeyInfo->uid);
tsdbCacheRelease(lruCache, h);
pr->tableIndex += 1; pr->tableIndex += 1;
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
...@@ -466,6 +408,7 @@ _end: ...@@ -466,6 +408,7 @@ _end:
} }
taosMemoryFree(pRes); taosMemoryFree(pRes);
taosArrayDestroyEx(pRow, freeItem);
taosArrayDestroyEx(pLastCols, freeItem); taosArrayDestroyEx(pLastCols, freeItem);
return code; return code;
} }
...@@ -140,7 +140,6 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid ...@@ -140,7 +140,6 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
SMemTable *pMemTable = pTsdb->mem; SMemTable *pMemTable = pTsdb->mem;
STbData *pTbData = NULL; STbData *pTbData = NULL;
SVBufPool *pPool = pTsdb->pVnode->inUse; SVBufPool *pPool = pTsdb->pVnode->inUse;
TSDBKEY lastKey = {.version = version, .ts = eKey};
// check if table exists // check if table exists
SMetaInfo info; SMetaInfo info;
...@@ -181,7 +180,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid ...@@ -181,7 +180,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
pMemTable->nDel++; pMemTable->nDel++;
pMemTable->minVer = TMIN(pMemTable->minVer, version); pMemTable->minVer = TMIN(pMemTable->minVer, version);
pMemTable->maxVer = TMIN(pMemTable->maxVer, version); pMemTable->maxVer = TMIN(pMemTable->maxVer, version);
/*
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) { if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey); tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);
} }
...@@ -189,6 +188,10 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid ...@@ -189,6 +188,10 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) { if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey); tsdbCacheDeleteLast(pTsdb->lruCache, pTbData->uid, eKey);
} }
*/
if (eKey >= pTbData->maxKey && sKey <= pTbData->maxKey) {
tsdbCacheDel(pTsdb, suid, uid, sKey, eKey);
}
tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64 tsdbTrace("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
" at version %" PRId64, " at version %" PRId64,
...@@ -284,8 +287,8 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) { ...@@ -284,8 +287,8 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
int64_t tsdbCountTbDataRows(STbData *pTbData) { int64_t tsdbCountTbDataRows(STbData *pTbData) {
SMemSkipListNode *pNode = pTbData->sl.pHead; SMemSkipListNode *pNode = pTbData->sl.pHead;
int64_t rowsNum = 0; int64_t rowsNum = 0;
while (NULL != pNode) { while (NULL != pNode) {
pNode = SL_GET_NODE_FORWARD(pNode, 0); pNode = SL_GET_NODE_FORWARD(pNode, 0);
if (pNode == pTbData->sl.pTail) { if (pNode == pTbData->sl.pTail) {
...@@ -298,17 +301,17 @@ int64_t tsdbCountTbDataRows(STbData *pTbData) { ...@@ -298,17 +301,17 @@ int64_t tsdbCountTbDataRows(STbData *pTbData) {
return rowsNum; return rowsNum;
} }
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum) { void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj *pTableMap, int64_t *rowsNum) {
taosRLockLatch(&pMemTable->latch); taosRLockLatch(&pMemTable->latch);
for (int32_t i = 0; i < pMemTable->nBucket; ++i) { for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
STbData *pTbData = pMemTable->aBucket[i]; STbData *pTbData = pMemTable->aBucket[i];
while (pTbData) { while (pTbData) {
void* p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid)); void *p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
if (p == NULL) { if (p == NULL) {
pTbData = pTbData->next; pTbData = pTbData->next;
continue; continue;
} }
*rowsNum += tsdbCountTbDataRows(pTbData); *rowsNum += tsdbCountTbDataRows(pTbData);
pTbData = pTbData->next; pTbData = pTbData->next;
} }
...@@ -668,15 +671,8 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, ...@@ -668,15 +671,8 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
if (key.ts >= pTbData->maxKey) { if (key.ts >= pTbData->maxKey) {
pTbData->maxKey = key.ts; pTbData->maxKey = key.ts;
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config)) {
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, &lRow, true);
}
}
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb);
} }
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
// SMemTable // SMemTable
pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey); pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
...@@ -736,15 +732,8 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, ...@@ -736,15 +732,8 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
if (key.ts >= pTbData->maxKey) { if (key.ts >= pTbData->maxKey) {
pTbData->maxKey = key.ts; pTbData->maxKey = key.ts;
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config)) {
tsdbCacheInsertLastrow(pMemTable->pTsdb->lruCache, pMemTable->pTsdb, pTbData->uid, &lRow, true);
}
}
if (TSDB_CACHE_LAST(pMemTable->pTsdb->pVnode->config)) {
tsdbCacheInsertLast(pMemTable->pTsdb->lruCache, pTbData->uid, &lRow, pMemTable->pTsdb);
} }
tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow);
// SMemTable // SMemTable
pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey); pMemTable->minKey = TMIN(pMemTable->minKey, pTbData->minKey);
......
...@@ -144,8 +144,8 @@ _exit: ...@@ -144,8 +144,8 @@ _exit:
} }
int vnodeShouldCommit(SVnode *pVnode, bool atExit) { int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
bool diskAvail = osDataSpaceAvailable(); bool diskAvail = osDataSpaceAvailable();
bool needCommit = false; bool needCommit = false;
taosThreadMutexLock(&pVnode->mutex); taosThreadMutexLock(&pVnode->mutex);
if (pVnode->inUse && diskAvail) { if (pVnode->inUse && diskAvail) {
...@@ -439,6 +439,9 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { ...@@ -439,6 +439,9 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
code = tsdbCommit(pVnode->pTsdb, pInfo); code = tsdbCommit(pVnode->pTsdb, pInfo);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCacheCommit(pVnode->pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
if (VND_IS_RSMA(pVnode)) { if (VND_IS_RSMA(pVnode)) {
code = smaCommit(pVnode->pSma, pInfo); code = smaCommit(pVnode->pSma, pInfo);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
......
...@@ -31,18 +31,21 @@ typedef struct SCacheRowsScanInfo { ...@@ -31,18 +31,21 @@ typedef struct SCacheRowsScanInfo {
void* pLastrowReader; void* pLastrowReader;
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
int32_t* pSlotIds; int32_t* pSlotIds;
int32_t* pDstSlotIds;
SExprSupp pseudoExprSup; SExprSupp pseudoExprSup;
int32_t retrieveType; int32_t retrieveType;
int32_t currentGroupIndex; int32_t currentGroupIndex;
SSDataBlock* pBufferredRes; SSDataBlock* pBufferredRes;
SArray* pUidList; SArray* pUidList;
SArray* pCidList;
int32_t indexOfBufferedRes; int32_t indexOfBufferedRes;
STableListInfo* pTableList; STableListInfo* pTableList;
} SCacheRowsScanInfo; } SCacheRowsScanInfo;
static SSDataBlock* doScanCache(SOperatorInfo* pOperator); static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
static void destroyCacheScanOperator(void* param); static void destroyCacheScanOperator(void* param);
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
int32_t** pDstSlotIds);
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo); static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) #define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
...@@ -71,9 +74,16 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -71,9 +74,16 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
goto _error; goto _error;
} }
SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t));
for (int i = 0; i < TARRAY_SIZE(pInfo->matchInfo.pList); ++i) {
SColMatchItem* pColInfo = taosArrayGet(pInfo->matchInfo.pList, i);
taosArrayPush(pCidList, &pColInfo->colId);
}
pInfo->pCidList = pCidList;
removeRedundantTsCol(pScanNode, &pInfo->matchInfo); removeRedundantTsCol(pScanNode, &pInfo->matchInfo);
code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds); code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds, &pInfo->pDstSlotIds);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -91,8 +101,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -91,8 +101,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
uint64_t suid = tableListGetSuid(pTableListInfo); uint64_t suid = tableListGetSuid(pTableListInfo);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, taosArrayGetSize(pInfo->matchInfo.pList), pCidList, pInfo->pSlotIds, suid,
pTaskInfo->id.str); &pInfo->pLastrowReader, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -160,8 +170,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -160,8 +170,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
blockDataCleanup(pInfo->pBufferredRes); blockDataCleanup(pInfo->pBufferredRes);
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);
int32_t code = int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds,
tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList); pInfo->pDstSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
...@@ -227,8 +237,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -227,8 +237,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
} }
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid,
pTaskInfo->id.str); &pInfo->pLastrowReader, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pInfo->currentGroupIndex += 1; pInfo->currentGroupIndex += 1;
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);
...@@ -237,7 +247,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -237,7 +247,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);
code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
...@@ -280,6 +291,8 @@ void destroyCacheScanOperator(void* param) { ...@@ -280,6 +291,8 @@ void destroyCacheScanOperator(void* param) {
blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pRes);
blockDataDestroy(pInfo->pBufferredRes); blockDataDestroy(pInfo->pBufferredRes);
taosMemoryFree(pInfo->pSlotIds); taosMemoryFree(pInfo->pSlotIds);
taosMemoryFree(pInfo->pDstSlotIds);
taosArrayDestroy(pInfo->pCidList);
taosArrayDestroy(pInfo->pUidList); taosArrayDestroy(pInfo->pUidList);
taosArrayDestroy(pInfo->matchInfo.pList); taosArrayDestroy(pInfo->matchInfo.pList);
tableListDestroy(pInfo->pTableList); tableListDestroy(pInfo->pTableList);
...@@ -292,7 +305,8 @@ void destroyCacheScanOperator(void* param) { ...@@ -292,7 +305,8 @@ void destroyCacheScanOperator(void* param) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds) { int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
int32_t** pDstSlotIds) {
size_t numOfCols = taosArrayGetSize(pColMatchInfo); size_t numOfCols = taosArrayGetSize(pColMatchInfo);
*pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t)); *pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
...@@ -300,18 +314,25 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask ...@@ -300,18 +314,25 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
*pDstSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
if (*pDstSlotIds == NULL) {
taosMemoryFree(*pSlotIds);
return TSDB_CODE_OUT_OF_MEMORY;
}
SSchemaWrapper* pWrapper = pTaskInfo->schemaInfo.sw; SSchemaWrapper* pWrapper = pTaskInfo->schemaInfo.sw;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i); SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
for (int32_t j = 0; j < pWrapper->nCols; ++j) { for (int32_t j = 0; j < pWrapper->nCols; ++j) {
if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { /* if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
(*pSlotIds)[pColMatch->dstSlotId] = -1; (*pSlotIds)[pColMatch->dstSlotId] = -1;
break; break;
} }*/
if (pColMatch->colId == pWrapper->pSchema[j].colId) { if (pColMatch->colId == pWrapper->pSchema[j].colId) {
(*pSlotIds)[pColMatch->dstSlotId] = j; (*pSlotIds)[i] = j;
(*pDstSlotIds)[i] = pColMatch->dstSlotId;
break; break;
} }
} }
......
...@@ -134,7 +134,7 @@ ...@@ -134,7 +134,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py #,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_stable.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_stable.py
......
...@@ -68,7 +68,7 @@ docker run \ ...@@ -68,7 +68,7 @@ docker run \
-v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \ -v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \ -v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \ -v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1" --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=0;make -j || exit 1"
# -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ # -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
if [[ -d ${WORKDIR}/debugNoSan ]] ;then if [[ -d ${WORKDIR}/debugNoSan ]] ;then
...@@ -97,7 +97,7 @@ docker run \ ...@@ -97,7 +97,7 @@ docker run \
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \ -v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \ -v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
-v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1 " --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=0;make -j || exit 1 "
mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册