diff --git a/.travis.yml b/.travis.yml index 74ed80da3fb92d5400c37569bc24c8c5d63f8ff6..5cff3bc72b612a62e50048a76453d3b6ac6c4dce 100644 --- a/.travis.yml +++ b/.travis.yml @@ -41,12 +41,29 @@ addons: branch_pattern: coverity_scan before_script: - - mkdir build - - cd build + - mkdir debug + - cd debug script: - cmake .. - - cmake --build . + - cmake --build . || exit $? + - |- + case $TRAVIS_OS_NAME in + linux) + cd ../tests/script + sudo ./test.sh 2>&1 | tee out.txt + cat out.txt + grep success out.txt + total_success=`grep success out.txt | wc -l` + echo "Total $total_success success" + grep failed out.txt + total_failed=`grep failed out.txt | wc -l` + echo "Total $total_failed failed" + if [ "$total_failed" -ne "0" ]; then + exit $total_failed + fi + ;; + esac # # Build Matrix @@ -58,6 +75,7 @@ matrix: packages: - build-essential - cmake + - net-tools # - os: osx # addons: diff --git a/CMakeLists.txt b/CMakeLists.txt index 7443f1a9110e68d57f85f6241a76bb9f7f473aea..5a9a42cc3627d18af7fd48064b35f5abd6b15601 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,6 +3,8 @@ PROJECT(TDengine) SET(TD_CLUSTER FALSE) SET(TD_ACCOUNT FALSE) +SET(TD_VPEER FALSE) +SET(TD_MPEER FALSE) SET(TD_GRANT FALSE) SET(TD_COVER FALSE) SET(TD_PAGMODE_LITE FALSE) diff --git a/cmake/platform.inc b/cmake/platform.inc index 0d53f0cc43750dea1091875551163119e807616b..6087b6f16f61273dd1dcb69dd6aca207335a96a8 100755 --- a/cmake/platform.inc +++ b/cmake/platform.inc @@ -107,12 +107,12 @@ IF (TD_LINUX_64) SET(RELEASE_FLAGS "-O0") IF (NOT TD_ARM) IF (${CMAKE_CXX_COMPILER_ID} MATCHES "Clang") - SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") + SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -Wno-missing-braces -fPIC -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") ELSE () - SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g3 -gdwarf-2 -malign-stringops -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") + SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -malign-double -g3 -gdwarf-2 -malign-stringops -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") ENDIF () ELSE () - SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") + SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") ENDIF () ADD_DEFINITIONS(-DLINUX) ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT) @@ -128,7 +128,7 @@ IF (TD_LINUX_64) ENDIF () SET(DEBUG_FLAGS "-O0 -DDEBUG") SET(RELEASE_FLAGS "-O0") - SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -g -fsigned-char -munaligned-access -fpack-struct=8 -latomic -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") + SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -munaligned-access -fpack-struct=8 -latomic -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") ADD_DEFINITIONS(-DLINUX) ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT) ADD_DEFINITIONS(-DUSE_LIBICONV) @@ -141,7 +141,7 @@ IF (TD_LINUX_64) ELSEIF (TD_WINDOWS_64) SET(CMAKE_GENERATOR "NMake Makefiles" CACHE INTERNAL "" FORCE) IF (NOT TD_GODLL) - SET(COMMON_FLAGS "/nologo /WX- /Oi /Oy- /Gm- /EHsc /MT /GS /Gy /fp:precise /Zc:wchar_t /Zc:forScope /Gd /errorReport:prompt /analyze-") + SET(COMMON_FLAGS "/nologo /WX /Oi /Oy- /Gm- /EHsc /MT /GS /Gy /fp:precise /Zc:wchar_t /Zc:forScope /Gd /errorReport:prompt /analyze-") SET(DEBUG_FLAGS "/Zi /W3 /GL") SET(RELEASE_FLAGS "/W0 /GL") ENDIF () @@ -151,7 +151,7 @@ IF (TD_LINUX_64) ADD_DEFINITIONS(-DPTW32_BUILD) ADD_DEFINITIONS(-D_MBCS -D_CRT_SECURE_NO_DEPRECATE -D_CRT_NONSTDC_NO_DEPRECATE) ELSEIF (TD_DARWIN_64) - SET(COMMON_FLAGS "-std=gnu99 -Wall -fPIC -malign-double -g -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") + SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -Wno-missing-braces -fPIC -g -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") SET(DEBUG_FLAGS "-O0 -DDEBUG") SET(RELEASE_FLAGS "-O0") ADD_DEFINITIONS(-DDARWIN) @@ -159,4 +159,4 @@ IF (TD_LINUX_64) ELSE () MESSAGE(FATAL_ERROR "The current platform is not support yet, stop compile") EXIT () - ENDIF () \ No newline at end of file + ENDIF () diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 9657912716fe019b8d92abd4fc4232cf3a133301..dad32e58da1dcea4b6850f2aaeca50e538e1c14d 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -213,8 +213,6 @@ typedef struct SDataBlockList { int32_t idx; uint32_t nSize; uint32_t nAlloc; - char * userParam; /* user assigned parameters for async query */ - void * udfp; /* user defined function pointer, used in async model */ STableDataBlocks **pData; } SDataBlockList; @@ -451,7 +449,6 @@ void tscCloseTscObj(STscObj *pObj); void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen); -void tscProcessMultiVnodesInsert(SSqlObj *pSql); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); void tscKillMetricQuery(SSqlObj *pSql); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 36cd42332b0d84b09798138080da33a856f26a21..62888234019cf2fe9e77fe476b731e6918499d7a 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -342,8 +342,8 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) { (*pSql->fp)(pSql->param, taosres, code); if (shouldFree) { - tscFreeSqlObj(pSql); tscTrace("%p Async sql is automatically freed in async res", pSql); + tscFreeSqlObj(pSql); } } diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 6f58d3bef328210f67e4aae4b47acd780fbc6cd3..518b3680b4a5c4013f7ab1acd82e02b6c741f911 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -941,6 +941,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) { sql = sToken.z; } code = tscGetTableMeta(pSql, pTableMetaInfo); + + if (pSql->asyncTblPos == NULL) { + assert(code == TSDB_CODE_ACTION_IN_PROGRESS); + } } int32_t len = cend - cstart + 1; @@ -1064,8 +1068,8 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) { /* - * For async insert, after get the metermeta from server, the sql string will not be - * parsed using the new metermeta to avoid the overhead cause by get metermeta data information. + * For async insert, after get the table meta from server, the sql string will not be + * parsed using the new table meta to avoid the overhead cause by get table meta data information. * And during the getMeterMetaCallback function, the sql string will be parsed from the * interrupted position. */ diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index 00c8d776190fc2cff077fd9be87e8ac966f4d5cd..739ae7848e6222ee650031649192593991299544 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -292,7 +292,6 @@ void tscKillConnection(STscObj *pObj) { pthread_mutex_unlock(&pObj->mutex); - taos_close(pObj); - tscTrace("connection:%p is killed", pObj); + taos_close(pObj); } diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index b487d5d6db98db9ce5522824ca33094ae17cfa21..90e1bd4c1da1b763937b226ffa048b5dae3548ca 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -210,7 +210,7 @@ char* tsGetTagsValue(STableMeta* pTableMeta) { } // todo refactor -static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) { +__attribute__ ((unused))static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) { for (int32_t i = 0; i < num; ++i) { while (*input != 0 && *input++ != delim) { }; @@ -218,7 +218,7 @@ static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) { return input; } -static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) { +__attribute__ ((unused)) static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) { size_t len = 0; while (*src != delimiter && *src != 0) { *dst++ = *src++; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a2af7ea16b21263b561f00d2bb6fd7bc8ccfcca2..5ea6941fffcc3403d89b21ed2bb97a6747edc8d9 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -343,8 +343,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { (*pSql->fp)(pSql->param, taosres, rpcMsg->code); if (shouldFree) { - tscFreeSqlObj(pSql); tscTrace("%p Async sql is automatically freed", pSql); + tscFreeSqlObj(pSql); } } @@ -1787,6 +1787,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { size = tscEstimateHeartBeatMsgLength(pSql); if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { + pthread_mutex_unlock(&pObj->mutex); tscError("%p failed to malloc for heartbeat msg", pSql); return -1; } @@ -1835,7 +1836,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { } for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { - pMetaMsg->vpeerDesc[i].vnode = htonl(pMetaMsg->vpeerDesc[i].vnode); + pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId); } SSchema* pSchema = pMetaMsg->schema; @@ -2399,8 +2400,8 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { pTableMetaInfo->pTableMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pTableMetaInfo->name); if (pTableMetaInfo->pTableMeta != NULL) { STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); - tscTrace("%p retrieve tableMeta from cache, the number of columns:%d, numOfTags:%d", pSql, tinfo.numOfColumns, - tinfo.numOfTags); + tscTrace("%p retrieve table Meta from cache, the number of columns:%d, numOfTags:%d, %p", pSql, tinfo.numOfColumns, + tinfo.numOfTags, pTableMetaInfo->pTableMeta); return TSDB_CODE_SUCCESS; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index f6c1fee633308260da4c8c9ce8c5e327c67338fb..199b4150e81eb52ebe4914b36d4ba9a64a51e44e 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -757,8 +757,8 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp); if (tscShouldFreeAsyncSqlObj(pSql)) { - tscFreeSqlObj(pSql); tscTrace("%p Async SqlObj is freed by app", pSql); + tscFreeSqlObj(pSql); } else { if (keepCmd) { tscFreeSqlResult(pSql); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 4fadad5021b5841f482f365f9a6d07ea6777ad9b..f586db3d08d5decfa3458b789d6dd4eb856e84ef 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -582,10 +582,12 @@ void taos_close_stream(TAOS_STREAM *handle) { tscRemoveFromStreamList(pStream, pSql); taosTmrStopA(&(pStream->pTimer)); + + tscTrace("%p stream:%p is closed", pSql, pStream); + tscFreeSqlObj(pSql); pStream->pSql = NULL; - tscTrace("%p stream:%p is closed", pSql, pStream); tfree(pStream); } } diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index fd457a12dd26ac8bba28890db971b311f04b12a3..b7d01941d699067212a9037c7d9d7b4ee2ba667d 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -108,7 +108,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* if (pSql == NULL) { terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; tscError("failed to allocate SSqlObj for subscription"); - goto failed; + goto _pSql_failed; } pSql->signature = pSql; @@ -137,13 +137,11 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* return pSub; failed: - if (sqlstr != NULL) { - free(sqlstr); - } - if (pSql != NULL) { - free(pSql); - } - free(pSub); + tfree(sqlstr); + +_pSql_failed: + tfree(pSql); + tfree(pSub); return NULL; } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index b93ec0b6d381520c8d693f78f72f39685e13d329..3660c822d717569abe7964751ab2df73fa728c6a 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1381,6 +1381,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR } else { // all data has been retrieved to client tscAllDataRetrievedFromDnode(trsupport, pSql); } + pthread_mutex_unlock(&trsupport->queryMutex); } static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) { @@ -1454,7 +1455,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql); if (pNew == NULL) { tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d", - trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vnode : -1, trsupport->subqueryIndex); + trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vgId : -1, trsupport->subqueryIndex); pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; @@ -1470,7 +1471,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort if (vnodeInfo != NULL) { tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql, - vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, + vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId, trsupport->subqueryIndex, pState->code); } else { tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql, @@ -1481,7 +1482,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { } else { // success, proceed to retrieve data from dnode if (vnodeInfo != NULL) { tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, - vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, + vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId, trsupport->subqueryIndex); } else { tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, diff --git a/src/common/src/name.c b/src/common/src/name.c index b0411d742c6c7a41d0c4b8ae3561c31aac2b2342..a605b98a1466ab99632d30db56e9f96081fda5e6 100644 --- a/src/common/src/name.c +++ b/src/common/src/name.c @@ -6,7 +6,7 @@ #include "ttokendef.h" // todo refactor -static FORCE_INLINE const char* skipSegments(const char* input, char delim, int32_t num) { +__attribute__((unused)) static FORCE_INLINE const char* skipSegments(const char* input, char delim, int32_t num) { for (int32_t i = 0; i < num; ++i) { while (*input != 0 && *input++ != delim) { }; @@ -14,7 +14,7 @@ static FORCE_INLINE const char* skipSegments(const char* input, char delim, int3 return input; } -static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) { +__attribute__((unused)) static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) { size_t len = 0; while (*src != delimiter && *src != 0) { *dst++ = *src++; diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index f0be3f9044c0b2d8278192ea512c265a77a12517..4bd89e238e7d1e08d9a0f4b223b72f5ed1dee369 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -6,6 +6,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) + INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/tsdb/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc) @@ -25,6 +26,10 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) IF (TD_CLUSTER) TARGET_LINK_LIBRARIES(taosd cluster) ENDIF () + + IF (TD_VPEER) + TARGET_LINK_LIBRARIES(taosd balance) + ENDIF () SET(PREPARE_ENV_CMD "prepare_env_cmd") SET(PREPARE_ENV_TARGET "prepare_env_target") diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index ee2143ff6d1d8c2ba2dddcb173506a27f4603530..e2de9bf5860b7e6d4f759c868eefb51a71018b5a 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -28,9 +28,11 @@ #include "dnodeRead.h" #include "dnodeShell.h" #include "dnodeWrite.h" +#include "mgmtGrant.h" static int32_t dnodeInitSystem(); static int32_t dnodeInitStorage(); +extern void grantParseParameter(); static void dnodeCleanupStorage(); static void dnodeCleanUpSystem(); static void dnodeSetRunStatus(SDnodeRunStatus status); @@ -77,7 +79,7 @@ int32_t main(int32_t argc, char *argv[]) { } /* Set termination handler. */ - struct sigaction act; + struct sigaction act = {0}; act.sa_flags = SA_SIGINFO; act.sa_sigaction = signal_handler; sigaction(SIGTERM, &act, NULL); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 8e523eaf46701981a5469acfc19135a3fc4013d4..5f1e7a7a9431f4e6a16b7f61981a42aa36042fcf 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -64,7 +64,7 @@ int32_t dnodeInitMgmt() { dError("failed to init dnode timer"); return -1; } - + int32_t code = dnodeOpenVnodes(); if (code != TSDB_CODE_SUCCESS) { return -1; @@ -104,7 +104,7 @@ void dnodeMgmt(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -static int dnodeGetVnodeList(int32_t vnodeList[]) { +static int32_t dnodeGetVnodeList(int32_t vnodeList[]) { DIR *dir = opendir(tsVnodeDir); if (dir == NULL) { return TSDB_CODE_NO_WRITE_ACCESS; @@ -129,47 +129,59 @@ static int dnodeGetVnodeList(int32_t vnodeList[]) { } static int32_t dnodeOpenVnodes() { - char vnodeDir[TSDB_FILENAME_LEN * 3]; - int failed = 0; + char vnodeDir[TSDB_FILENAME_LEN * 3]; + int32_t failed = 0; - int32_t *vnodeList = (int32_t *) malloc(sizeof(int32_t) * 10000); - int numOfVnodes = dnodeGetVnodeList(vnodeList); - - for (int i=0; ipCont; - pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); - pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); - pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); - pCreate->cfg.commitLog = pCreate->cfg.commitLog; - + pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); + pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); + pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize); + pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile); + pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1); + pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2); + pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep); + pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime); + pCreate->cfg.rowsInFileBlock = htonl(pCreate->cfg.rowsInFileBlock); + pCreate->cfg.blocksPerTable = htons(pCreate->cfg.blocksPerTable); + pCreate->cfg.cacheNumOfBlocks.totalBlocks = htonl(pCreate->cfg.cacheNumOfBlocks.totalBlocks); + + for (int32_t j = 0; j < pCreate->cfg.replications; ++j) { + pCreate->vpeerDesc[j].vgId = htonl(pCreate->vpeerDesc[j].vgId); + pCreate->vpeerDesc[j].dnodeId = htonl(pCreate->vpeerDesc[j].dnodeId); + pCreate->vpeerDesc[j].ip = htonl(pCreate->vpeerDesc[j].ip); + } + return vnodeCreate(pCreate); } static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { - SMDDropVnodeMsg *pDrop = rpcMsg->pCont; pDrop->vgId = htonl(pDrop->vgId); @@ -177,7 +189,6 @@ static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) { } static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { - SMDCreateVnodeMsg *pCreate = rpcMsg->pCont; pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions); @@ -206,7 +217,7 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { static void dnodeSendStatusMsg(void *handle, void *tmrId) { if (tsDnodeTmr == NULL) { - dError("dnode timer is already released"); + dError("dnode timer is already released"); return; } diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 682aee4c0b3f613d920b29a8add684604b1b9347..d4365dae102cb6645cd38abe73a88a0ad7969a40 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -15,38 +15,25 @@ #define _DEFAULT_SOURCE #include "os.h" - #include "taoserror.h" #include "taosmsg.h" #include "tlog.h" #include "tqueue.h" #include "trpc.h" - #include "twal.h" #include "dnodeMgmt.h" #include "dnodeRead.h" -#include "queryExecutor.h" #include "vnode.h" typedef struct { - int32_t code; - int32_t count; - int32_t numOfVnodes; -} SRpcContext; - -typedef struct { - void *pCont; - int32_t contLen; - SRpcMsg rpcMsg; - SRpcContext *pRpcContext; // RPC message context + SRspRet rspRet; + void *pCont; + int32_t contLen; + SRpcMsg rpcMsg; } SReadMsg; static void *dnodeProcessReadQueue(void *param); -static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead); static void dnodeHandleIdleReadWorker(); -static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg); -static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg); -static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(void *pVnode, SReadMsg *pNode); // module global variable static taos_qset readQset; @@ -55,14 +42,11 @@ static int32_t maxThreads; static int32_t minThreads; int32_t dnodeInitRead() { - dnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessQueryMsg; - dnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeProcessRetrieveMsg; - readQset = taosOpenQset(); minThreads = 3; - maxThreads = tsNumOfCores*tsNumOfThreadsPerCore; - if (maxThreads <= minThreads*2) maxThreads = 2*minThreads; + maxThreads = tsNumOfCores * tsNumOfThreadsPerCore; + if (maxThreads <= minThreads * 2) maxThreads = 2 * minThreads; dPrint("dnode read is opened"); return 0; @@ -77,21 +61,21 @@ void dnodeRead(SRpcMsg *pMsg) { int32_t queuedMsgNum = 0; int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; - SRpcContext *pRpcContext = NULL; void *pVnode; dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle); - if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { - queuedMsgNum = 0; - } - while (leftLen > 0) { SMsgHead *pHead = (SMsgHead *) pCont; pHead->vgId = htonl(pHead->vgId); pHead->contLen = htonl(pHead->contLen); - pVnode = vnodeGetVnode(pHead->vgId); + if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { + pVnode = vnodeGetVnode(pHead->vgId); + } else { + pVnode = vnodeAccquireVnode(pHead->vgId); + } + if (pVnode == NULL) { leftLen -= pHead->contLen; pCont -= pHead->contLen; @@ -104,7 +88,6 @@ void dnodeRead(SRpcMsg *pMsg) { pRead->rpcMsg = *pMsg; pRead->pCont = pCont; pRead->contLen = pHead->contLen; - pRead->pRpcContext = pRpcContext; taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); @@ -155,6 +138,34 @@ void dnodeFreeRqueue(void *rqueue) { // dynamically adjust the number of threads } +static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) { + SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); + pRead->rpcMsg = pMsg->rpcMsg; + pRead->pCont = qhandle; + pRead->contLen = 0; + pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; + + taos_queue queue = vnodeGetRqueue(pVnode); + taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); +} + +void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { + if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; + if (code == TSDB_CODE_ACTION_NEED_REPROCESSED) { + dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead); + } + + SRpcMsg rpcRsp = { + .handle = pRead->rpcMsg.handle, + .pCont = pRead->rspRet.rsp, + .contLen = pRead->rspRet.len, + .code = pRead->rspRet.code, + }; + + rpcSendResponse(&rpcRsp); + rpcFreeCont(pRead->rpcMsg.pCont); +} + static void *dnodeProcessReadQueue(void *param) { taos_qset qset = (taos_qset)param; SReadMsg *pReadMsg; @@ -167,13 +178,8 @@ static void *dnodeProcessReadQueue(void *param) { continue; } - terrno = 0; - if (dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) { - (*dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) (pVnode, pReadMsg); - } else { - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - } - + int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet); + dnodeSendRpcReadRsp(pVnode, pReadMsg, code); taosFreeQitem(pReadMsg); } @@ -192,118 +198,3 @@ static void dnodeHandleIdleReadWorker() { } } -static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) { - SRpcContext *pRpcContext = pRead->pRpcContext; - int32_t code = 0; - - if (pRpcContext) { - if (terrno) { - if (pRpcContext->code == 0) pRpcContext->code = terrno; - } - - int32_t count = atomic_add_fetch_32(&pRpcContext->count, 1); - if (count < pRpcContext->numOfVnodes) { - // not over yet, multiple vnodes - return; - } - - // over, result can be merged now - code = pRpcContext->code; - } else { - code = terrno; - } - - //TODO: query handle is returned by dnodeProcessQueryMsg - if (0) { - SRpcMsg rsp; - rsp.handle = pRead->rpcMsg.handle; - rsp.code = code; - rsp.pCont = NULL; - rpcSendResponse(&rsp); - } - - rpcFreeCont(pRead->rpcMsg.pCont); // free the received message -} - -static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) { - - SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); - pRead->rpcMsg = pMsg->rpcMsg; - pRead->pCont = qhandle; - pRead->contLen = 0; - pRead->pRpcContext = pMsg->pRpcContext; - pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; - - taos_queue queue = vnodeGetRqueue(pVnode); - taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); -} - -static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) { - SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont; - - SQInfo* pQInfo = NULL; - if (pMsg->contLen != 0) { - void* tsdb = vnodeGetTsdb(pVnode); - int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo); - - SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); - pRsp->code = code; - pRsp->qhandle = htobe64((uint64_t) (pQInfo)); - - SRpcMsg rpcRsp = { - .handle = pMsg->rpcMsg.handle, - .pCont = pRsp, - .contLen = sizeof(SQueryTableRsp), - .code = code, - .msgType = 0 - }; - - rpcSendResponse(&rpcRsp); - dTrace("dnode query msg disposed, thandle:%p", pMsg->rpcMsg.handle); - vnodeRelease(pVnode); - } else { - pQInfo = pMsg->pCont; - } - - qTableQuery(pQInfo); // do execute query -} - -static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { - SRetrieveTableMsg *pRetrieve = pMsg->pCont; - void *pQInfo = (void*) htobe64(pRetrieve->qhandle); - - dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId); - int32_t contLen = 0; - - SRetrieveTableRsp *pRsp = NULL; - - int32_t code = qRetrieveQueryResultInfo(pQInfo); - if (code != TSDB_CODE_SUCCESS) { - contLen = sizeof(SRetrieveTableRsp); - - pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen); - memset(pRsp, 0, sizeof(SRetrieveTableRsp)); - } else { - // todo check code and handle error in build result set - code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen); - - if (qHasMoreResultsToRetrieve(pQInfo)) { - dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg); - } else { // no further execution invoked, release the ref to vnode - dnodeProcessReadResult(pVnode, pMsg); - //vnodeRelease(pVnode); - } - } - - SRpcMsg rpcRsp = (SRpcMsg) { - .handle = pMsg->rpcMsg.handle, - .pCont = pRsp, - .contLen = contLen, - .code = code, - .msgType = 0 - }; - - rpcSendResponse(&rpcRsp); - dTrace("dnode retrieve msg disposed, thandle:%p", pMsg->rpcMsg.handle); - vnodeRelease(pVnode); -} diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 951d43c34b88a2da659aa7429f37defb50837fb8..3681ef22c488a61825c97d6c6be29c079d9d3a73 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -37,7 +37,7 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeRead; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeRead; - int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; + int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0); if (numOfThreads < 1) { numOfThreads = 1; diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index aee14ed48414aa68590e246f8563806cd1df6a20..2e0ec5f95b1f00ae5ba60b978e7b00136bbf8efd 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -52,7 +52,6 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker); SWriteWorkerPool wWorkerPool; int32_t dnodeInitWrite() { - wWorkerPool.max = tsNumOfCores; wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max); if (wWorkerPool.writeWorker == NULL) return -1; @@ -71,7 +70,7 @@ void dnodeCleanupWrite() { } void dnodeWrite(SRpcMsg *pMsg) { - char *pCont = (char *) pMsg->pCont; + char *pCont = (char *)pMsg->pCont; if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) { SMsgDesc *pDesc = (SMsgDesc *)pCont; @@ -80,16 +79,16 @@ void dnodeWrite(SRpcMsg *pMsg) { } SMsgHead *pHead = (SMsgHead *) pCont; - pHead->vgId = htonl(pHead->vgId); - pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + pHead->contLen = htonl(pHead->contLen); taos_queue queue = vnodeGetWqueue(pHead->vgId); if (queue) { // put message into queue SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg)); - pWrite->rpcMsg = *pMsg; - pWrite->pCont = pCont; - pWrite->contLen = pHead->contLen; + pWrite->rpcMsg = *pMsg; + pWrite->pCont = pCont; + pWrite->contLen = pHead->contLen; taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite); } else { @@ -227,4 +226,3 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { pthread_exit(NULL); } } - diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 2fbceb06dfccf5ebac4d7c461698248bbfa043ef..7c59a5a4b64653fdb4c4386059ada4c5574cf153 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -98,7 +98,6 @@ typedef struct { typedef struct { int32_t dnodeId; - int32_t vnode; uint32_t privateIp; uint32_t publicIp; } SVnodeGid; @@ -106,10 +105,10 @@ typedef struct { typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; -} STableInfo; +} STableObj; typedef struct SSuperTableObj { - STableInfo info; + STableObj info; uint64_t uid; int64_t createdTime; int32_t sversion; @@ -124,7 +123,7 @@ typedef struct SSuperTableObj { } SSuperTableObj; typedef struct { - STableInfo info; + STableObj info; uint64_t uid; int64_t createdTime; int32_t sversion; //used by normal table @@ -255,7 +254,7 @@ typedef struct { SUserObj *pUser; SDbObj *pDb; SVgObj *pVgroup; - STableInfo *pTable; + STableObj *pTable; } SQueuedMsg; int32_t mgmtInitSystem(); diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 54899678ccccfc4b4ce4b3664d177a9f87ef89e0..0624af45c39e52c648ddfbc7576c0c87798c4c6c 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -47,6 +47,7 @@ static STaosError errors[] = { // rpc TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, 0, 1, "action in progress") +TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_NEED_REPROCESSED, 0, 3, "action need to be reprocessed") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, 0, 4, "message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_ALREADY_PROCESSED, 0, 5, "message already processed") TAOS_DEFINE_ERROR(TSDB_CODE_REDIRECT, 0, 6, "redirect") @@ -156,28 +157,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CANCELLED, 0, 110, "query cancelled TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_IE, 0, 111, "invalid ie") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 112, "invalid value") -// TAOS_DEFINE_ERROR(TSDB_CODE_SYNC_REQUIRED, 0, 99, "sync required") -// TAOS_DEFINE_ERROR(TSDB_CODE_UNSYNCED, 0, 100, "unsyned") -// TAOS_DEFINE_ERROR(TSDB_CODE_DATA_ALREADY_IMPORTED, 0, 75, "data already imported") -// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_COMMIT_LOG, 0, 109, "invalid commit log") // commit log init failed -// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VNODE_STATUS, 0, 116, "invalid vnode status") -// TAOS_DEFINE_ERROR(TSDB_CODE_TIMESTAMP_OUT_OF_RANGE, 0, 105, "timestamp out of range") -// TAOS_DEFINE_ERROR(TSDB_CODE_DUPLICATE_TAGS, 0, 112, "duplicate tags") // tags value for join not unique -// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SUBMIT_MSG, 0, 113, "invalid submit message") -// TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_LOCK_RESOURCES, 0, 117, "failed to lock resources") -// TAOS_DEFINE_ERROR(TSDB_CODE_FILE_BLOCK_TS_DISORDERED, 0, 108, "file block ts disordered") // time stamp in file block is disordered -// TAOS_DEFINE_ERROR(TSDB_CODE_BATCH_SIZE_TOO_BIG, 0, 104, "batch size too big") -// TAOS_DEFINE_ERROR(TSDB_CODE_WRONG_SCHEMA, 0, 53, "wrong schema") -// TAOS_DEFINE_ERROR(TSDB_CODE_NO_QSUMMARY, 0, 68, "no qsummery") -// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_METER_ID, 0, 27, "invalid meter id") -// TAOS_DEFINE_ERROR(TSDB_CODE_METRICMETA_EXPIRED, 0, 63, "metricmeta expired") // local cached metric-meta expired causes error in metric query -// TAOS_DEFINE_ERROR(TSDB_CODE_SESSION_ALREADY_EXIST, 0, 67, "session already exist") -// TAOS_DEFINE_ERROR(TSDB_CODE_SESSION_NOT_READY, 0, 103, "session not ready") // table NOT in ready state -// TAOS_DEFINE_ERROR(TSDB_CODE_DATA_OVERFLOW, 0, 82, "data overflow") -// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_TRANS_NOT_FINISHED, 0, 17, "action transaction not finished") -// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_NOT_ONLINE, 0, 18, "action not online") -// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_SEND_FAILD, 0, 19, "action send failed") -// TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_SESSION, 0, 20, "not active session") +// others +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 120, "invalid file format") + #ifdef TAOS_ERROR_C }; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index d4fd452952a93a6151b6fe6e4888627135f2e180..3fae17170a5083d9f29afa390b5a954b1eb9121c 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -241,7 +241,8 @@ typedef struct SSchema { } SSchema; typedef struct { - int32_t vnode; // the index of vnode + int32_t vgId; + int32_t dnodeId; uint32_t ip; } SVnodeDesc; @@ -752,12 +753,12 @@ typedef struct { typedef struct { int32_t numOfQueries; - SQueryDesc qdesc[]; + SQueryDesc *qdesc; } SQqueryList; typedef struct { int32_t numOfStreams; - SStreamDesc sdesc[]; + SStreamDesc *sdesc; } SStreamList; typedef struct { diff --git a/src/inc/tsync.h b/src/inc/tsync.h new file mode 100644 index 0000000000000000000000000000000000000000..39c116c9cbaf49b943a02c1e61c64ad3b507412b --- /dev/null +++ b/src/inc/tsync.h @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_SYNC_H +#define TDENGINE_SYNC_H + +#ifdef __cplusplus +extern "C" { +#endif + +#define TAOS_SYNC_MAX_REPLICA 5 + +typedef enum _TAOS_SYNC_ROLE { + TAOS_SYNC_ROLE_OFFLINE, + TAOS_SYNC_ROLE_UNSYNCED, + TAOS_SYNC_ROLE_SLAVE, + TAOS_SYNC_ROLE_MASTER, +} ESyncRole; + +typedef enum _TAOS_SYNC_STATUS { + TAOS_SYNC_STATUS_INIT, + TAOS_SYNC_STATUS_START, + TAOS_SYNC_STATUS_FILE, + TAOS_SYNC_STATUS_CACHE, +} ESyncStatus; + +typedef struct { + uint32_t nodeId; // node ID assigned by TDengine + uint32_t nodeIp; // node IP address + char name[TSDB_FILENAME_LEN]; // external node name +} SNodeInfo; + +typedef struct { + uint32_t arbitratorIp; // arbitrator IP address + int8_t quorum; // number of confirms required, >=1 + int8_t replica; // number of replications, >=1 + SNodeInfo nodeInfo[TAOS_SYNC_MAX_REPLICA]; +} SSyncCfg; + +typedef struct { + int selfIndex; + uint32_t nodeId[TAOS_SYNC_MAX_REPLICA]; + int role[TAOS_SYNC_MAX_REPLICA]; +} SNodesRole; + +typedef struct { + char label[20]; // for debug purpose + char path[128]; // path to the file + int8_t replica; // number of replications, >=1 + int8_t quorum; // number of confirms required, >=1 + int32_t vgId; // vgroup ID + void *ahandle; // handle provided by APP + uint64_t version; // initial version + uint32_t arbitratorIp; + SNodeInfo nodeInfo[TAOS_SYNC_MAX_REPLICA]; + + // if name is null, get the file from index or after, used by master + // if name is provided, get the named file at the specified index, used by unsynced node + // it returns the file magic number and size, if file not there, magic shall be 0. + uint32_t (*getFileInfo)(char *name, int *index, int *size); + + // get the wal file from index or after + // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file + int (*getWalInfo)(char *name, int *index); + + // when a forward pkt is received, call this to handle data + int (*writeToCache)(void *ahandle, SWalHead *, int type); + + // when forward is confirmed by peer, master call this API to notify app + void (*confirmForward)(void *ahandle, void *mhandle, int32_t code); + + // when role is changed, call this to notify app + void (*notifyRole)(void *ahandle, int8_t role); +} SSyncInfo; + +typedef void* tsync_h; + +tsync_h syncStart(SSyncInfo *); +void syncStop(tsync_h shandle); +int syncReconfig(tsync_h shandle, SSyncInfo *); +int syncForwardToPeer(tsync_h shandle, SWalHead *pHead, void *mhandle); +void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code); +void syncRecover(tsync_h shandle); // recover from other nodes: +int syncGetNodesRole(tsync_h shandle, SNodesRole *); + +extern char *syncRole[]; + +extern int tsMaxSyncNum; +extern int tsSyncTcpThreads; +extern int tsMaxWatchFiles; +extern short tsSyncPort; +extern int tsMaxFwdInfo; + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_SYNC_H diff --git a/src/inc/twal.h b/src/inc/twal.h index bac5f87215c45b834141bcc8fa1a1ece54e58ab8..3648f5ae2923bb3ca8962181e67c8bd4bdd9a266 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -33,6 +33,11 @@ typedef struct { char cont[]; } SWalHead; +typedef struct { + int8_t commitLog; // commitLog + int8_t wals; // number of WAL files; +} SWalCfg; + typedef void* twal_h; // WAL HANDLE twal_h walOpen(char *path, int max, int level); diff --git a/src/inc/vnode.h b/src/inc/vnode.h index b459c2c5625c0424f4df0094eb2a3bc8e97a6e7e..3097343a48a31d97f6792c14964214a2d4380563 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -22,7 +22,9 @@ extern "C" { typedef struct { int len; + int code; void *rsp; + void *qhandle; //used by query and retrieve msg } SRspRet; int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); @@ -31,7 +33,8 @@ int32_t vnodeOpen(int32_t vgId, char *rootDir); int32_t vnodeClose(int32_t vgId); void vnodeRelease(void *pVnode); -void* vnodeGetVnode(int32_t vgId); +void* vnodeAccquireVnode(int32_t vgId); // add refcount +void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged void* vnodeGetRqueue(void *); void* vnodeGetWqueue(int32_t vgId); @@ -41,6 +44,8 @@ void* vnodeGetTsdb(void *pVnode); int32_t vnodeProcessWrite(void *pVnode, int qtype, SWalHead *pHead, void *item); void vnodeBuildStatusMsg(void * param); +int32_t vnodeProcessRead(void *pVnode, int msgType, void *pCont, int32_t contLen, SRspRet *ret); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtBalance.h b/src/mnode/inc/mgmtBalance.h index 401074d1713b6ebe1ce95c3c882eaaf1499a85fe..05f2ed94a7f77a0c5314fe008106de62ff7b07c8 100644 --- a/src/mnode/inc/mgmtBalance.h +++ b/src/mnode/inc/mgmtBalance.h @@ -23,7 +23,7 @@ extern "C" { int32_t mgmtInitBalance(); void mgmtCleanupBalance(); -void mgmtStartBalanceTimer(int32_t afterMs) ; +void mgmtBalanceNotify() ; int32_t mgmtAllocVnodes(SVgObj *pVgroup); #ifdef __cplusplus diff --git a/src/mnode/inc/mgmtGrant.h b/src/mnode/inc/mgmtGrant.h index c9e018d9b519f355f098b4dcb352b08a2a439e69..92b20532c4da9de54e8e37110350ece85521f6e1 100644 --- a/src/mnode/inc/mgmtGrant.h +++ b/src/mnode/inc/mgmtGrant.h @@ -14,7 +14,7 @@ */ #ifndef TDENGINE_MGMT_GRANT_H -#define TDENGINE_MGMT_GTANT_H +#define TDENGINE_MGMT_GRANT_H #ifdef __cplusplus "C" { diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index ddbbfb4a70200ab560cb33db1a9a2d897c152993..4d3e0f6b4356b633fb2ef8547a669ea872ccbd87 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -27,7 +27,7 @@ extern "C" { int32_t mgmtInitTables(); void mgmtCleanUpTables(); -STableInfo* mgmtGetTable(char* tableId); +STableObj* mgmtGetTable(char* tableId); void mgmtIncTableRef(void *pTable); void mgmtDecTableRef(void *pTable); void mgmtDropAllChildTables(SDbObj *pDropDb); diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index 3a7ae8dcde4f9cfecf5367fd8faddf917921339a..e697d70d5808f98c87652089f08570bfcee9b7c8 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -18,11 +18,35 @@ #include "mgmtBalance.h" #include "mgmtDnode.h" -int32_t mgmtInitBalance() { return 0; } -void mgmtCleanupBalance() {} -void mgmtStartBalanceTimer(int32_t afterMs) {} +extern int32_t balanceInit(); +extern void balanceCleanUp(); +extern void balanceNotify(); +extern int32_t balanceAllocVnodes(SVgObj *pVgroup); + +int32_t mgmtInitBalance() { +#ifdef _VPEER + return balanceInit(); +#else + return 0; +#endif +} + +void mgmtCleanupBalance() { +#ifdef _VPEER + balanceCleanUp(); +#endif +} + +void mgmtBalanceNotify() { +#ifdef _VPEER + balanceNotify(); +#endif +} int32_t mgmtAllocVnodes(SVgObj *pVgroup) { +#ifdef _VPEER + return balanceAllocVnodes(pVgroup); +#else void * pNode = NULL; SDnodeObj *pDnode = NULL; SDnodeObj *pSelDnode = NULL; @@ -53,4 +77,5 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) { mTrace("dnode:%d, alloc one vnode to vgroup, openVnodes:%d", pSelDnode->dnodeId, pSelDnode->openVnodes); return TSDB_CODE_SUCCESS; +#endif } diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index f7d8675977f997e09af1a55b7fa0e74d99632ec9..ada0bce2e99881a7b91c17ccdbdf00384cdc1cc5 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -41,7 +41,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi extern int32_t clusterInit(); extern void clusterCleanUp(); extern int32_t clusterGetDnodesNum(); -extern void * clusterGetNextDnode(void *pNode, void **pDnode); +extern void * clusterGetNextDnode(void *pNode, SDnodeObj **pDnode); extern void clusterIncDnodeRef(SDnodeObj *pDnode); extern void clusterDecDnodeRef(SDnodeObj *pDnode); extern SDnodeObj* clusterGetDnode(int32_t dnodeId); @@ -251,7 +251,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { if (pDnode->status != TSDB_DN_STATUS_READY) { mTrace("dnode:%d, from offline to online", pDnode->dnodeId); pDnode->status = TSDB_DN_STATUS_READY; - mgmtStartBalanceTimer(200); + mgmtBalanceNotify(); } mgmtDecDnodeRef(pDnode); diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 06cb9dca5cd68e56cd2dd8d16388b57792feb7f8..de68baf40fe4ab1b789c473c24009fb4a70e4e08 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -695,6 +695,7 @@ int32_t sdbUpdateRow(SSdbOperDesc *pOper) { int32_t total_size = sizeof(SRowHead) + pTable->maxRowSize + sizeof(TSCKSUM); SRowHead *rowHead = (SRowHead *)calloc(1, total_size); if (rowHead == NULL) { + pthread_mutex_unlock(&pTable->mutex); sdbError("table:%s, failed to allocate row head memory", pTable->tableName); return -1; } diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index cf20c47646f9c5e9d9a4779e830fd879e171a0f8..d6d7a6afc0cc649cee27ad45ef3483f816af7a61 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -486,8 +486,8 @@ static void *mgmtGetSuperTable(char *tableId) { return sdbGetRow(tsSuperTableSdb, tableId); } -STableInfo *mgmtGetTable(char *tableId) { - STableInfo *tableInfo = sdbGetRow(tsSuperTableSdb, tableId); +STableObj *mgmtGetTable(char *tableId) { + STableObj *tableInfo = sdbGetRow(tsSuperTableSdb, tableId); if (tableInfo != NULL) { return tableInfo; } @@ -501,7 +501,7 @@ STableInfo *mgmtGetTable(char *tableId) { } void mgmtIncTableRef(void *p1) { - STableInfo *pTable = (STableInfo *)p1; + STableObj *pTable = (STableObj *)p1; if (pTable->type == TSDB_SUPER_TABLE) { sdbIncRef(tsSuperTableSdb, pTable); } else { @@ -512,7 +512,7 @@ void mgmtIncTableRef(void *p1) { void mgmtDecTableRef(void *p1) { if (p1 == NULL) return; - STableInfo *pTable = (STableInfo *)p1; + STableObj *pTable = (STableObj *)p1; if (pTable->type == TSDB_SUPER_TABLE) { sdbDecRef(tsSuperTableSdb, pTable); } else { @@ -1302,7 +1302,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { return; } - pMsg->pTable = (STableInfo *)mgmtDoCreateChildTable(pCreate, pVgroup, sid); + pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid); if (pMsg->pTable == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; @@ -1512,7 +1512,8 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { } else { pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp; } - pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); + pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId); + pMeta->vpeerDesc[i].dnodeId = htonl(pVgroup->vnodeGid[i].dnodeId); } pMeta->numOfVpeers = pVgroup->numOfVnodes; @@ -1640,7 +1641,7 @@ static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_ } SChildTableObj *pTable = pVgroup->tableList[sid]; - mgmtIncTableRef((STableInfo *)pTable); + mgmtIncTableRef((STableObj *)pTable); mgmtDecVgroupRef(pVgroup); return pTable; } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 87f3872b0aaacba15c49a5e3678d94993518ad8a..8ca23dc98aee1b5d338cec8fe5cfb9c399c425c6 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -32,7 +32,7 @@ #include "mgmtVgroup.h" void *tsVgroupSdb = NULL; -static int32_t tsVgUpdateSize = 0; +int32_t tsVgUpdateSize = 0; static int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -93,11 +93,12 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId); - pVgroup->vnodeGid[i].privateIp = pDnode->privateIp; - pVgroup->vnodeGid[i].publicIp = pDnode->publicIp; - pVgroup->vnodeGid[i].vnode = pVgroup->vgId; - atomic_add_fetch_32(&pDnode->openVnodes, 1); - mgmtDecDnodeRef(pDnode); + if (pDnode != NULL) { + pVgroup->vnodeGid[i].privateIp = pDnode->privateIp; + pVgroup->vnodeGid[i].publicIp = pDnode->publicIp; + atomic_add_fetch_32(&pDnode->openVnodes, 1); + mgmtDecDnodeRef(pDnode); + } } mgmtAddVgroupIntoDb(pVgroup); @@ -236,7 +237,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) { mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - mPrint("vgroup:%d, index:%d, dnode:%d vnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].vnode); + mPrint("vgroup:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId); } pMsg->ahandle = pVgroup; @@ -292,7 +293,7 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { int32_t maxReplica = 0; SVgObj *pVgroup = NULL; - STableInfo *pTable = NULL; + STableObj *pTable = NULL; if (pShow->payloadLen > 0 ) { pTable = mgmtGetTable(pShow->payload); if (NULL == pTable || pTable->type == TSDB_SUPER_TABLE) { @@ -312,27 +313,21 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { } for (int32_t i = 0; i < maxReplica; ++i) { - pShow->bytes[cols] = 16; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "ip"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; - strcpy(pSchema[cols].name, "vnode"); + strcpy(pSchema[cols].name, "dnode"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; - pShow->bytes[cols] = 9; + pShow->bytes[cols] = 16; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "vnode status"); + strcpy(pSchema[cols].name, "ip"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; - pShow->bytes[cols] = 16; + pShow->bytes[cols] = 9; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "public ip"); + strcpy(pSchema[cols].name, "vstatus"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; } @@ -416,13 +411,13 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo cols++; for (int32_t i = 0; i < maxReplica; ++i) { - tinet_ntoa(ipstr, pVgroup->vnodeGid[i].privateIp); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, ipstr); + *(int16_t *) pWrite = pVgroup->vnodeGid[i].dnodeId; cols++; + tinet_ntoa(ipstr, pVgroup->vnodeGid[i].privateIp); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *) pWrite = pVgroup->vnodeGid[i].vnode; + strcpy(pWrite, ipstr); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -433,11 +428,6 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo strcpy(pWrite, "null"); } cols++; - - tinet_ntoa(ipstr, pVgroup->vnodeGid[i].publicIp); - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, ipstr); - cols++; } numOfRows++; @@ -490,15 +480,15 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2); pCfg->daysToKeep = htonl(pCfg->daysToKeep); pCfg->commitTime = htonl(pCfg->commitTime); - pCfg->commitLog = pCfg->commitLog; - pCfg->blocksPerTable = htons(pCfg->blocksPerTable); - pCfg->replications = (char) pVgroup->numOfVnodes; pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock); - + pCfg->blocksPerTable = htons(pCfg->blocksPerTable); + pCfg->replications = (int8_t) pVgroup->numOfVnodes; + SVnodeDesc *vpeerDesc = pVnode->vpeerDesc; for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) { - vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode); - vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp); + vpeerDesc[j].vgId = htonl(pVgroup->vgId); + vpeerDesc[j].dnodeId = htonl(pVgroup->vnodeGid[j].dnodeId); + vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp); } return pVnode; diff --git a/src/query/CMakeLists.txt b/src/query/CMakeLists.txt index a6462f98558d163b05d3b8b1cbc2cad23ae7cae4..1bbd06780482eb691f06297aed7e4c6ac6863f9a 100644 --- a/src/query/CMakeLists.txt +++ b/src/query/CMakeLists.txt @@ -14,4 +14,6 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) TARGET_LINK_LIBRARIES(query tsdb tutil m rt) ENDIF () -ADD_SUBDIRECTORY(tests) \ No newline at end of file +ADD_SUBDIRECTORY(tests) +SET_SOURCE_FILES_PROPERTIES(src/sql.c PROPERTIES COMPILE_FLAGS -w) + diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 5adce04efa44d7e85bc9ccf482dd21e0deff20b5..8c8c1a3a12f2392441abdd4ca673b45641cf1e22 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -198,6 +198,12 @@ typedef struct SQInfo { */ int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo); +/** + * destroy the query info struct + * @param pQInfo + */ +void qDestroyQueryInfo(SQInfo* pQInfo); + /** * query on single table * @param pReadMsg diff --git a/src/query/inc/tlosertree.h b/src/query/inc/tlosertree.h index fb64fd2ee43528192f4eff0c0f8e9fbd526a6354..197d27a76126097f23cfc47a227957301f5b2c5c 100644 --- a/src/query/inc/tlosertree.h +++ b/src/query/inc/tlosertree.h @@ -38,7 +38,7 @@ typedef struct SLoserTreeInfo { SLoserTreeNode *pNode; } SLoserTreeInfo; -uint8_t tLoserTreeCreate(SLoserTreeInfo **pTree, int32_t numOfEntries, void *param, __merge_compare_fn_t compareFn); +uint32_t tLoserTreeCreate(SLoserTreeInfo **pTree, int32_t numOfEntries, void *param, __merge_compare_fn_t compareFn); void tLoserTreeInit(SLoserTreeInfo *pTree); diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 4c4014996e9a9dda4356b8d0aa1e593a8d70e702..f0aa13ee3d2de60c5eb15126d3c6fff8e1aa3c73 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -1566,6 +1566,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { } destroyResultBuf(pRuntimeEnv->pResultBuf); + tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); } @@ -2238,27 +2239,6 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi pQuery->pSelectExpr[columnIndex].resBytes * realRowId; } -void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) { - if (pQInfo == NULL) { - return; - } - - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - - teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); - - // tSidSetDestroy(&pQInfo->pSidSet); - - if (pQInfo->pTableDataInfo != NULL) { - // size_t num = taosHashGetSize(pQInfo->pTableIdList); - for (int32_t j = 0; j < 0; ++j) { - destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols); - } - } - - tfree(pQInfo->pTableDataInfo); -} - int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { @@ -2266,14 +2246,10 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { pQuery->window.ekey, pQuery->order.order); sem_post(&pQInfo->dataReady); - // pQInfo->over = 1; - return TSDB_CODE_SUCCESS; } pQuery->status = 0; - - pQuery->rec = (SResultRec){0}; pQuery->rec = (SResultRec){0}; changeExecuteScanOrder(pQuery, true); @@ -4202,8 +4178,9 @@ int32_t doInitializeQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTable } pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols); + taosArrayDestroy(cols); + pRuntimeEnv->pQuery = pQuery; - pRuntimeEnv->pTSBuf = param; pRuntimeEnv->cur.vnodeIndex = -1; if (param != NULL) { @@ -5169,7 +5146,8 @@ static void singleTableQueryImpl(SQInfo* pQInfo) { if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query is killed", pQInfo); } else { - dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.size); + dTrace("QInfo:%p query task completed, %" PRId64 " rows will returned, total:%" PRId64 " rows", pQInfo, pQuery->rec.size, + pQuery->rec.total); } sem_post(&pQInfo->dataReady); @@ -5444,13 +5422,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, *tagCond = calloc(1, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE); memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE); } - - dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, numOfTagCols:%d, " - "timestamp order:%d, tags order:%d, tags order col:%d, numOfOutputCols:%d, numOfCols:%d, interval:%" PRId64 - ", fillType:%d, comptslen:%d, limit:%" PRId64 ", offset:%" PRId64, + + dTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, ts order:%d, " + "outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, - pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->orderType, - pQueryMsg->orderByIdx, pQueryMsg->numOfOutputCols, + pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->numOfOutputCols, pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->interpoType, pQueryMsg->tsLen, pQueryMsg->limit, pQueryMsg->offset); @@ -5731,7 +5707,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou SArray *pTableIdList) { SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); if (pQInfo == NULL) { - goto _clean_memory; + goto _clean_pQInfo_memory; } SQuery *pQuery = calloc(1, sizeof(SQuery)); @@ -5841,7 +5817,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou vnodeParametersSafetyCheck(pQuery); - dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo); + dTrace("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo); return pQInfo; _clean_memory: @@ -5860,6 +5836,7 @@ _clean_memory: tfree(pExprs); tfree(pGroupbyExpr); +_clean_pQInfo_memory: tfree(pQInfo); return NULL; @@ -5927,7 +5904,7 @@ static void freeQInfo(SQInfo *pQInfo) { SQuery* pQuery = pQInfo->runtimeEnv.pQuery; setQueryKilled(pQInfo); - dTrace("QInfo:%p start to free SQInfo", pQInfo); + dTrace("QInfo:%p start to free QInfo", pQInfo); for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { tfree(pQuery->sdata[col]); } @@ -5941,7 +5918,16 @@ static void freeQInfo(SQInfo *pQInfo) { // } sem_destroy(&(pQInfo->dataReady)); - vnodeQueryFreeQInfoEx(pQInfo); + teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); + + if (pQInfo->pTableDataInfo != NULL) { + // size_t num = taosHashGetSize(pQInfo->pTableIdList); + for (int32_t j = 0; j < 0; ++j) { + destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols); + } + } + + tfree(pQInfo->pTableDataInfo); for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i]; @@ -5974,6 +5960,8 @@ static void freeQInfo(SQInfo *pQInfo) { tfree(pQuery->pGroupbyExpr); tfree(pQuery); + taosArrayDestroy(pQInfo->pTableIdList); + dTrace("QInfo:%p QInfo is freed", pQInfo); // destroy signature, in order to avoid the query process pass the object safety check @@ -6120,6 +6108,11 @@ _query_over: return TSDB_CODE_SUCCESS; } +void qDestroyQueryInfo(SQInfo* pQInfo) { + dTrace("QInfo:%p query completed", pQInfo); + freeQInfo(pQInfo); +} + void qTableQuery(SQInfo *pQInfo) { if (pQInfo == NULL || pQInfo->signature != pQInfo) { dTrace("%p freed abort query", pQInfo); @@ -6133,6 +6126,9 @@ void qTableQuery(SQInfo *pQInfo) { dTrace("QInfo:%p query task is launched", pQInfo); +// sem_post(&pQInfo->dataReady); +// pQInfo->runtimeEnv.pQuery->status = QUERY_OVER; + int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); if (numOfTables == 1) { singleTableQueryImpl(pQInfo); @@ -6151,18 +6147,14 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); - if (pQInfo->code == TSDB_CODE_SUCCESS) { - return TSDB_CODE_QUERY_CANCELLED; - } else { // in case of not TSDB_CODE_SUCCESS, return the code to client - return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code); - } + return pQInfo->code; } sem_wait(&pQInfo->dataReady); dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, pQuery->rowSize, pQuery->rec.size, pQInfo->code); - return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code); + return pQInfo->code; } bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) { @@ -6207,12 +6199,12 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c if (pQuery->rec.size > 0 && code == TSDB_CODE_SUCCESS) { code = doDumpQueryResult(pQInfo, (*pRsp)->data); } else { + setQueryStatus(pQuery, QUERY_OVER); code = pQInfo->code; } if (isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { (*pRsp)->completed = 1; // notify no more result to client - freeQInfo(pQInfo); } return code; @@ -6222,4 +6214,4 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c // vnodeDecRefCount(pObj->qhandle); // pObj->qhandle = NULL; // } -} \ No newline at end of file +} diff --git a/src/query/src/tlosertree.c b/src/query/src/tlosertree.c index 4fe68970b9504d81cdbc779425c9439d32ff597c..e6e45ed8d0ea3d56a90eea8ef90057146790374a 100644 --- a/src/query/src/tlosertree.c +++ b/src/query/src/tlosertree.c @@ -40,7 +40,7 @@ void tLoserTreeDisplay(SLoserTreeInfo* pTree) { printf("\n"); } -uint8_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* param, __merge_compare_fn_t compareFn) { +uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* param, __merge_compare_fn_t compareFn) { int32_t totalEntries = numOfEntries << 1; *pTree = (SLoserTreeInfo*)calloc(1, sizeof(SLoserTreeInfo) + sizeof(SLoserTreeNode) * totalEntries); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 6f86c2dd7cf78d5cf3ff54b6daf9c360cb6d5137..52d05dc626f59804cfa8926164d7143e2d6bd6a8 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -331,6 +331,7 @@ void *rpcReallocCont(void *ptr, int contLen) { char *start = ((char *)ptr) - sizeof(SRpcReqContext) - sizeof(SRpcHead); if (contLen == 0 ) { free(start); + return NULL; } int size = contLen + RPC_MSG_OVERHEAD; diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 8bfec97032478251b0c83078c1559ecdb6a27e4d..47b358c052eada4c11f10f821cffb5ee482e491e 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -472,7 +472,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { } int32_t ref = T_REF_INC(ptNode); - pTrace("%p add data ref in cache, refcnt:%d", ptNode, ref) + pTrace("%p acquired by data in cache, refcnt:%d", ptNode, ref) // the data if referenced by at least one object, so the reference count must be greater than the value of 2. assert(ref >= 2); @@ -516,7 +516,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { *data = NULL; int16_t ref = T_REF_DEC(pNode); - pTrace("%p is released, refcnt:%d", pNode, ref); + pTrace("%p data released, refcnt:%d", pNode, ref); if (_remove) { __cache_wr_lock(pCacheObj); diff --git a/src/util/src/tidpool.c b/src/util/src/tidpool.c index 5ed6b8cf232eb1fb774af75a9738f6a403cda93c..04ae1542676af9908617796e51faa2f8bf38e36a 100644 --- a/src/util/src/tidpool.c +++ b/src/util/src/tidpool.c @@ -126,7 +126,7 @@ int taosUpdateIdPool(id_pool_t *handle, int maxId) { return -1; } - int *idList = calloc(maxId, sizeof(bool)); + bool *idList = calloc(maxId, sizeof(bool)); if (idList == NULL) { return -1; } @@ -137,7 +137,7 @@ int taosUpdateIdPool(id_pool_t *handle, int maxId) { pIdPool->numOfFree += (maxId - pIdPool->maxId); pIdPool->maxId = maxId; - int *oldIdList = pIdPool->freeList; + bool *oldIdList = pIdPool->freeList; pIdPool->freeList = idList; free(oldIdList); diff --git a/src/util/src/tlog.c b/src/util/src/tlog.c index 21818e572f3fc49a2841c2f494362e2a7103f9f0..52aa07d968dd66af6267fe35fb9fa101fbba3621 100644 --- a/src/util/src/tlog.c +++ b/src/util/src/tlog.c @@ -347,7 +347,7 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...) va_start(argpointer, format); int writeLen = vsnprintf(buffer + len, MAX_LOGLINE_CONTENT_SIZE, format, argpointer); if (writeLen <= 0) { - char tmp[MAX_LOGLINE_DUMP_BUFFER_SIZE]; + char tmp[MAX_LOGLINE_DUMP_BUFFER_SIZE] = {0}; writeLen = vsnprintf(tmp, MAX_LOGLINE_DUMP_CONTENT_SIZE, format, argpointer); strncpy(buffer + len, tmp, MAX_LOGLINE_CONTENT_SIZE); len += MAX_LOGLINE_CONTENT_SIZE; diff --git a/src/util/src/tnote.c b/src/util/src/tnote.c index 7a133590d2a450d8e8b688bc63515c0ad9e81912..9e0f6597aee26c0271f355c0e82cefabd565f50f 100644 --- a/src/util/src/tnote.c +++ b/src/util/src/tnote.c @@ -77,7 +77,7 @@ void taosUnLockNote(int fd, taosNoteInfo * pNote) void *taosThreadToOpenNewNote(void *param) { - char name[NOTE_FILE_NAME_LEN]; + char name[NOTE_FILE_NAME_LEN * 2]; taosNoteInfo * pNote = (taosNoteInfo *)param; pNote->taosNoteFlag ^= 1; @@ -170,7 +170,7 @@ void taosGetNoteName(char *fn, taosNoteInfo * pNote) int taosOpenNoteWithMaxLines(char *fn, int maxLines, int maxNoteNum, taosNoteInfo * pNote) { - char name[NOTE_FILE_NAME_LEN] = "\0"; + char name[NOTE_FILE_NAME_LEN * 2] = "\0"; struct stat notestat0, notestat1; int size; diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 3af7bfd2f2ae3623bce8bedcf5add573639f88f9..0ef99b724488ad32958ce5cbcc86b3df90dc9336 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -18,7 +18,7 @@ #include "tskiplist.h" #include "tutil.h" -static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level) { // record link count in each level +__attribute__ ((unused)) static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level) { // record link count in each level #if SKIP_LIST_RECORD_PERFORMANCE for (int32_t i = 0; i < level; ++i) { pSkipList->state.nLevelNodeCnt[i]++; @@ -26,7 +26,7 @@ static FORCE_INLINE void recordNodeEachLevel(SSkipList *pSkipList, int32_t level #endif } -static FORCE_INLINE void removeNodeEachLevel(SSkipList *pSkipList, int32_t level) { +__attribute__ ((unused)) static FORCE_INLINE void removeNodeEachLevel(SSkipList *pSkipList, int32_t level) { #if SKIP_LIST_RECORD_PERFORMANCE for (int32_t i = 0; i < level; ++i) { pSkipList->state.nLevelNodeCnt[i]--; diff --git a/src/vnode/main/inc/vnodeInt.h b/src/vnode/main/inc/vnodeInt.h index 5ee1d5c18b658ebb333ed24436eaaa5738abdf84..4d078869c441e6a827fb0270af6af5cc3d07c42c 100644 --- a/src/vnode/main/inc/vnodeInt.h +++ b/src/vnode/main/inc/vnodeInt.h @@ -20,6 +20,9 @@ extern "C" { #endif +#include "tsync.h" +#include "twal.h" + typedef enum _VN_STATUS { VN_STATUS_INIT, VN_STATUS_CREATING, @@ -41,10 +44,14 @@ typedef struct { void *sync; void *events; void *cq; // continuous query + STsdbCfg tsdbCfg; + SSyncCfg syncCfg; + SWalCfg walCfg; } SVnodeObj; int vnodeWriteToQueue(void *param, SWalHead *pHead, int type); void vnodeInitWriteFp(void); +void vnodeInitReadFp(void); #ifdef __cplusplus } diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index ea1859729e55ab4ca35e0aa86461164afa7feb51..13210e496f0ac8d8490a6b83c86742e0fb505b53 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -29,19 +29,21 @@ #include "vnode.h" #include "vnodeInt.h" -static void *tsDnodeVnodesHash; -static void vnodeCleanUp(SVnodeObj *pVnode); -static void vnodeBuildVloadMsg(char *pNode, void * param); -static int vnodeWALCallback(void *arg); - -static int tsOpennedVnodes; +static void *tsDnodeVnodesHash; +static void vnodeCleanUp(SVnodeObj *pVnode); +static void vnodeBuildVloadMsg(char *pNode, void * param); +static int vnodeWALCallback(void *arg); +static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); +static int32_t vnodeReadCfg(SVnodeObj *pVnode); + +static int32_t tsOpennedVnodes; static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; static void vnodeInit() { - vnodeInitWriteFp(); + vnodeInitReadFp(); - tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj), taosHashInt); + tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj *), taosHashInt); if (tsDnodeVnodesHash == NULL) { dError("failed to init vnode list"); } @@ -51,22 +53,12 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { int32_t code; pthread_once(&vnodeModuleInit, vnodeInit); - SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId); + SVnodeObj *pTemp = (SVnodeObj *)taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId); if (pTemp != NULL) { dPrint("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp); return TSDB_CODE_SUCCESS; - } - - STsdbCfg tsdbCfg = {0}; - tsdbCfg.precision = pVnodeCfg->cfg.precision; - tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId; - tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions; - tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; - tsdbCfg.minRowsPerFileBlock = -1; - tsdbCfg.maxRowsPerFileBlock = -1; - tsdbCfg.keep = -1; - tsdbCfg.maxCacheSize = -1; + } char rootDir[TSDB_FILENAME_LEN] = {0}; sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId); @@ -81,12 +73,28 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { } } + code = vnodeSaveCfg(pVnodeCfg); + if (code != TSDB_CODE_SUCCESS) { + dError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code)); + return code; + } + + STsdbCfg tsdbCfg = {0}; + tsdbCfg.precision = pVnodeCfg->cfg.precision; + tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId; + tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions; + tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; + tsdbCfg.minRowsPerFileBlock = -1; + tsdbCfg.maxRowsPerFileBlock = -1; + tsdbCfg.keep = -1; + tsdbCfg.maxCacheSize = -1; + char tsdbDir[TSDB_FILENAME_LEN] = {0}; sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL); - if (code <0) { + if (code != TSDB_CODE_SUCCESS) { dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno)); - return code; + return terrno; } dPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.commitLog); @@ -96,8 +104,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { } int32_t vnodeDrop(int32_t vgId) { - - SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId); + SVnodeObj *pVnode = *(SVnodeObj **)taosGetIntHashData(tsDnodeVnodesHash, vgId); if (pVnode == NULL) { dTrace("vgId:%d, failed to drop, vgId not exist", vgId); return TSDB_CODE_INVALID_VGROUP_ID; @@ -114,18 +121,25 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { char temp[TSDB_FILENAME_LEN]; pthread_once(&vnodeModuleInit, vnodeInit); - SVnodeObj vnodeObj = {0}; - vnodeObj.vgId = vnode; - vnodeObj.status = VN_STATUS_INIT; - vnodeObj.refCount = 1; - vnodeObj.version = 0; - SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj)); + SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1); + pVnode->vgId = vnode; + pVnode->status = VN_STATUS_INIT; + pVnode->refCount = 1; + pVnode->version = 0; + taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode)); + + int32_t code = vnodeReadCfg(pVnode); + if (code != TSDB_CODE_SUCCESS) { + dError("pVnode:%p vgId:%d, failed to read cfg file", pVnode, pVnode->vgId); + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + return code; + } pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode); sprintf(temp, "%s/wal", rootDir); - pVnode->wal = walOpen(temp, 3, tsCommitLog); + pVnode->wal = walOpen(temp, pVnode->walCfg.wals, pVnode->walCfg.commitLog); pVnode->sync = NULL; pVnode->events = NULL; pVnode->cq = NULL; @@ -155,7 +169,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { int32_t vnodeClose(int32_t vgId) { - SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId); + SVnodeObj *pVnode = *(SVnodeObj **)taosGetIntHashData(tsDnodeVnodesHash, vgId); if (pVnode == NULL) return 0; dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId); @@ -188,6 +202,7 @@ void vnodeRelease(void *pVnodeRaw) { } dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId); + free(pVnode); tsOpennedVnodes--; if (tsOpennedVnodes <= 0) { @@ -198,12 +213,19 @@ void vnodeRelease(void *pVnodeRaw) { } void *vnodeGetVnode(int32_t vgId) { - SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId); + SVnodeObj *pVnode = *(SVnodeObj **) taosGetIntHashData(tsDnodeVnodesHash, vgId); if (pVnode == NULL) { terrno = TSDB_CODE_INVALID_VGROUP_ID; return NULL; } + return pVnode; +} + +void *vnodeAccquireVnode(int32_t vgId) { + SVnodeObj *pVnode = vnodeGetVnode(vgId); + if (pVnode == NULL) return pVnode; + atomic_add_fetch_32(&pVnode->refCount, 1); dTrace("pVnode:%p vgId:%d, get vnode, refCount:%d", pVnode, pVnode->vgId, pVnode->refCount); @@ -215,7 +237,7 @@ void *vnodeGetRqueue(void *pVnode) { } void *vnodeGetWqueue(int32_t vgId) { - SVnodeObj *pVnode = vnodeGetVnode(vgId); + SVnodeObj *pVnode = vnodeAccquireVnode(vgId); if (pVnode == NULL) return NULL; return pVnode->wqueue; } @@ -234,7 +256,7 @@ void vnodeBuildStatusMsg(void *param) { } static void vnodeBuildVloadMsg(char *pNode, void * param) { - SVnodeObj *pVnode = (SVnodeObj *) pNode; + SVnodeObj *pVnode = *(SVnodeObj **) pNode; if (pVnode->status == VN_STATUS_DELETING) return; SDMStatusMsg *pStatus = param; @@ -247,8 +269,9 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) { } static void vnodeCleanUp(SVnodeObj *pVnode) { - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + //syncStop(pVnode->sync); tsdbCloseRepo(pVnode->tsdb); walClose(pVnode->wal); @@ -256,7 +279,94 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { vnodeRelease(pVnode); } +// TODO: this is a simple implement static int vnodeWALCallback(void *arg) { SVnodeObj *pVnode = arg; return walRenew(pVnode->wal); -} \ No newline at end of file +} + +static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { + char cfgFile[TSDB_FILENAME_LEN * 2] = {0}; + sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnodeCfg->cfg.vgId); + + FILE *fp = fopen(cfgFile, "w"); + if (!fp) return errno; + + fprintf(fp, "commitLog %d\n", pVnodeCfg->cfg.commitLog); + fprintf(fp, "wals %d\n", 3); + fprintf(fp, "arbitratorIp %d\n", pVnodeCfg->vpeerDesc[0].ip); + fprintf(fp, "quorum %d\n", 1); + fprintf(fp, "replica %d\n", pVnodeCfg->cfg.replications); + for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) { + fprintf(fp, "index%d nodeId %d nodeIp %u name n%d\n", i, pVnodeCfg->vpeerDesc[i].dnodeId, pVnodeCfg->vpeerDesc[i].ip, pVnodeCfg->vpeerDesc[i].dnodeId); + } + + fclose(fp); + dTrace("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId); + + return TSDB_CODE_SUCCESS; +} + +// TODO: this is a simple implement +static int32_t vnodeReadCfg(SVnodeObj *pVnode) { + char option[5][16] = {0}; + char cfgFile[TSDB_FILENAME_LEN * 2] = {0}; + sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnode->vgId); + + FILE *fp = fopen(cfgFile, "r"); + if (!fp) return errno; + + int32_t commitLog = -1; + int32_t num = fscanf(fp, "%s %d", option[0], &commitLog); + if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; + if (strcmp(option[0], "commitLog") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (commitLog == -1) return TSDB_CODE_INVALID_FILE_FORMAT; + pVnode->walCfg.commitLog = (int8_t)commitLog; + + int32_t wals = -1; + num = fscanf(fp, "%s %d", option[0], &wals); + if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; + if (strcmp(option[0], "wals") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (wals == -1) return TSDB_CODE_INVALID_FILE_FORMAT; + pVnode->walCfg.wals = (int8_t)wals; + + int32_t arbitratorIp = -1; + num = fscanf(fp, "%s %u", option[0], &arbitratorIp); + if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; + if (strcmp(option[0], "arbitratorIp") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (arbitratorIp == -1) return TSDB_CODE_INVALID_FILE_FORMAT; + pVnode->syncCfg.arbitratorIp = arbitratorIp; + + int32_t quorum = -1; + num = fscanf(fp, "%s %d", option[0], &quorum); + if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; + if (strcmp(option[0], "quorum") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (quorum == -1) return TSDB_CODE_INVALID_FILE_FORMAT; + pVnode->syncCfg.quorum = (int8_t)quorum; + + int32_t replica = -1; + num = fscanf(fp, "%s %d", option[0], &replica); + if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; + if (strcmp(option[0], "replica") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (replica == -1) return TSDB_CODE_INVALID_FILE_FORMAT; + pVnode->syncCfg.replica = (int8_t)replica; + + for (int32_t i = 0; i < replica; ++i) { + int32_t dnodeId = -1; + uint32_t dnodeIp = -1; + num = fscanf(fp, "%s %s %d %s %u %s %s", option[0], option[1], &dnodeId, option[2], &dnodeIp, option[3], pVnode->syncCfg.nodeInfo[i].name); + if (num != 7) return TSDB_CODE_INVALID_FILE_FORMAT; + if (strcmp(option[1], "nodeId") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (strcmp(option[2], "nodeIp") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (strcmp(option[3], "name") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (dnodeId == -1) return TSDB_CODE_INVALID_FILE_FORMAT; + if (dnodeIp == -1) return TSDB_CODE_INVALID_FILE_FORMAT; + pVnode->syncCfg.nodeInfo[i].nodeId = dnodeId; + pVnode->syncCfg.nodeInfo[i].nodeIp = dnodeIp; + } + + fclose(fp); + dTrace("pVnode:%p vgId:%d, read vnode cfg successed", pVnode, pVnode->vgId); + + return TSDB_CODE_SUCCESS; +} diff --git a/src/vnode/main/src/vnodeRead.c b/src/vnode/main/src/vnodeRead.c new file mode 100644 index 0000000000000000000000000000000000000000..929a30fbcdd87f10bcfdea5bb94a0a9cbda97b69 --- /dev/null +++ b/src/vnode/main/src/vnodeRead.c @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taosmsg.h" +#include "taoserror.h" +#include "tlog.h" +#include "tqueue.h" +#include "trpc.h" +#include "tsdb.h" +#include "twal.h" +#include "dataformat.h" +#include "vnode.h" +#include "vnodeInt.h" +#include "queryExecutor.h" + +static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet); +static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); +static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); + +void vnodeInitReadFp(void) { + vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; + vnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = vnodeProcessRetrieveMsg; +} + +int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) { + SVnodeObj *pVnode = (SVnodeObj *)param; + + if (vnodeProcessReadMsgFp[msgType] == NULL) + return TSDB_CODE_MSG_NOT_PROCESSED; + + if (pVnode->status == VN_STATUS_DELETING || pVnode->status == VN_STATUS_CLOSING) + return TSDB_CODE_NOT_ACTIVE_VNODE; + + return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret); +} + +static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { + SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont; + memset(pRet, 0, sizeof(SRspRet)); + + int32_t code = TSDB_CODE_SUCCESS; + + SQInfo* pQInfo = NULL; + if (contLen != 0) { + void* tsdb = vnodeGetTsdb(pVnode); + pRet->code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo); + + SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); + pRsp->qhandle = htobe64((uint64_t) (pQInfo)); + pRsp->code = pRet->code; + + pRet->len = sizeof(SQueryTableRsp); + pRet->rsp = pRsp; + + dTrace("pVnode:%p vgId:%d QInfo:%p, dnode query msg disposed", pVnode, pVnode->vgId, pQInfo); + } else { + pQInfo = pCont; + code = TSDB_CODE_ACTION_IN_PROGRESS; + } + + qTableQuery(pQInfo); // do execute query + + return code; +} + +static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { + SRetrieveTableMsg *pRetrieve = pCont; + void *pQInfo = (void*) htobe64(pRetrieve->qhandle); + memset(pRet, 0, sizeof(SRspRet)); + + int32_t code = TSDB_CODE_SUCCESS; + + dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is received", pVnode, pVnode->vgId, pQInfo); + + pRet->code = qRetrieveQueryResultInfo(pQInfo); + if (pRet->code != TSDB_CODE_SUCCESS) { + //TODO + pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); + } else { + // todo check code and handle error in build result set + pRet->code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); + + if (qHasMoreResultsToRetrieve(pQInfo)) { + pRet->qhandle = pQInfo; + code = TSDB_CODE_ACTION_NEED_REPROCESSED; + } else { + // no further execution invoked, release the ref to vnode + qDestroyQueryInfo(pQInfo); + vnodeRelease(pVnode); + } + } + + dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is disposed", pVnode, pVnode->vgId, pQInfo); + return code; +} diff --git a/src/vnode/tsdb/inc/tsdb.h b/src/vnode/tsdb/inc/tsdb.h index b20711df1dc4b4a515dad83105bc9977588f5483..b2673413ae9eb66247c9f4ab7c68baac694fcbfb 100644 --- a/src/vnode/tsdb/inc/tsdb.h +++ b/src/vnode/tsdb/inc/tsdb.h @@ -337,6 +337,12 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle); */ SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCond, size_t len); +/** + * clean up the query handle + * @param queryHandle + */ +void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle); + #ifdef __cplusplus } #endif diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 1596cd54d3b1524b4c091a31d839999b282621b7..da602df42461fa5076617d65b3fe60b80ba56238 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -367,14 +367,16 @@ int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) { SSubmitMsgIter msgIter; tsdbInitSubmitMsgIter(pMsg, &msgIter); - SSubmitBlk *pBlock; + SSubmitBlk *pBlock = NULL; + int32_t code = TSDB_CODE_SUCCESS; + while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) { - if (tsdbInsertDataToTable(repo, pBlock) < 0) { - return -1; + if ((code = tsdbInsertDataToTable(repo, pBlock)) != TSDB_CODE_SUCCESS) { + return code; } } - return 0; + return code; } /** @@ -654,7 +656,7 @@ static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) { } static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo) { - char fname[128]; + char fname[260]; if (pRepo == NULL) return 0; char *dirName = calloc(1, strlen(pRepo->rootDir) + strlen("tsdb") + 2); if (dirName == NULL) { @@ -735,7 +737,9 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid}; STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId); - if (pTable == NULL) return -1; + if (pTable == NULL) { + return TSDB_CODE_INVALID_TABLE_ID; + } SSubmitBlkIter blkIter; SDataRow row; @@ -747,7 +751,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { } } - return 0; + return TSDB_CODE_SUCCESS; } static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) { diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index ed1229905005793719503d766e532aa43c8bd257..9d8780d0f4aaa74d8788e54c9ee78b791b0208d3 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -49,9 +49,8 @@ typedef struct SQueryFilePos { } SQueryFilePos; typedef struct SDataBlockLoadInfo { - int32_t fileListIndex; - int32_t fileId; - int32_t slotIdx; + SFileGroup* fileGroup; + int32_t slot; int32_t sid; SArray *pLoadedCols; } SDataBlockLoadInfo; @@ -190,10 +189,9 @@ static void initQueryFileInfoFD(SQueryFilesInfo *pVnodeFilesInfo) { } static void vnodeInitDataBlockLoadInfo(SDataBlockLoadInfo *pBlockLoadInfo) { - pBlockLoadInfo->slotIdx = -1; - pBlockLoadInfo->fileId = -1; + pBlockLoadInfo->slot = -1; pBlockLoadInfo->sid = -1; - pBlockLoadInfo->fileListIndex = -1; + pBlockLoadInfo->fileGroup = NULL; } static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) { @@ -202,76 +200,6 @@ static void vnodeInitCompBlockLoadInfo(SLoadCompBlockInfo *pCompBlockLoadInfo) { pCompBlockLoadInfo->fileListIndex = -1; } -static int fileOrderComparFn(const void *p1, const void *p2) { - SHeaderFileInfo *pInfo1 = (SHeaderFileInfo *)p1; - SHeaderFileInfo *pInfo2 = (SHeaderFileInfo *)p2; - - if (pInfo1->fileId == pInfo2->fileId) { - return 0; - } - - return (pInfo1->fileId > pInfo2->fileId) ? 1 : -1; -} - -void vnodeRecordAllFiles(int32_t vnodeId, SQueryFilesInfo *pVnodeFilesInfo) { - char suffix[] = ".head"; - pVnodeFilesInfo->pFileInfo = taosArrayInit(4, sizeof(int32_t)); - - struct dirent *pEntry = NULL; - pVnodeFilesInfo->vnodeId = vnodeId; - char* tsDirectory = ""; - - sprintf(pVnodeFilesInfo->dbFilePathPrefix, "%s/vnode%d/db/", tsDirectory, vnodeId); - DIR *pDir = opendir(pVnodeFilesInfo->dbFilePathPrefix); - if (pDir == NULL) { - // dError("QInfo:%p failed to open directory:%s, %s", pQInfo, pVnodeFilesInfo->dbFilePathPrefix, - // strerror(errno)); - return; - } - - while ((pEntry = readdir(pDir)) != NULL) { - if ((pEntry->d_name[0] == '.' && pEntry->d_name[1] == '\0') || (strcmp(pEntry->d_name, "..") == 0)) { - continue; - } - - if (pEntry->d_type & DT_DIR) { - continue; - } - - size_t len = strlen(pEntry->d_name); - if (strcasecmp(&pEntry->d_name[len - 5], suffix) != 0) { - continue; - } - - int32_t vid = 0; - int32_t fid = 0; - sscanf(pEntry->d_name, "v%df%d", &vid, &fid); - if (vid != vnodeId) { /* ignore error files */ - // dError("QInfo:%p error data file:%s in vid:%d, ignore", pQInfo, pEntry->d_name, vnodeId); - continue; - } - -// int32_t firstFid = pVnode->fileId - pVnode->numOfFiles + 1; -// if (fid > pVnode->fileId || fid < firstFid) { -// dError("QInfo:%p error data file:%s in vid:%d, fid:%d, fid range:%d-%d", pQInfo, pEntry->d_name, vnodeId, -// fid, firstFid, pVnode->fileId); -// continue; -// } - - assert(fid >= 0 && vid >= 0); - taosArrayPush(pVnodeFilesInfo->pFileInfo, &fid); - } - - closedir(pDir); - - // dTrace("QInfo:%p find %d data files in %s to be checked", pQInfo, pVnodeFilesInfo->numOfFiles, - // pVnodeFilesInfo->dbFilePathPrefix); - - // order the files information according their names */ - size_t numOfFiles = taosArrayGetSize(pVnodeFilesInfo->pFileInfo); - qsort(pVnodeFilesInfo->pFileInfo->pData, numOfFiles, sizeof(SHeaderFileInfo), fileOrderComparFn); -} - tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, SArray *idList, SArray *pColumnInfo) { // todo 1. filter not exist table @@ -282,7 +210,6 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond pQueryHandle->window = pCond->twindow; pQueryHandle->pTsdb = tsdb; - pQueryHandle->pColumns = pColumnInfo; pQueryHandle->loadDataAfterSeek = false; pQueryHandle->isFirstSlot = true; @@ -331,9 +258,6 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond vnodeInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); vnodeInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo); - int32_t vnodeId = 1; - vnodeRecordAllFiles(vnodeId, &pQueryHandle->vnodeFileInfo); - return (tsdb_query_handle_t)pQueryHandle; } @@ -468,6 +392,7 @@ static bool loadQualifiedDataFromFileBlock(STsdbQueryHandle *pQueryHandle) { } } + taosArrayDestroy(sa); return pQueryHandle->realNumOfRows > 0; } @@ -513,10 +438,10 @@ bool moveToNextBlock(STsdbQueryHandle *pQueryHandle, int32_t step) { return true; } } else { // check data in cache + pQueryHandle->cur.fid = -1; return hasMoreDataInCacheForSingleModel(pQueryHandle); } - } else { - // next block in the same file + } else { // next block in the same file cur->slot += step; SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot]; @@ -601,9 +526,11 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCo if (QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) { endPos = blockInfo.size - 1; pQueryHandle->realNumOfRows = endPos - cur->pos + 1; + pCheckInfo->lastKey = blockInfo.window.ekey + 1; } else if (!QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) { endPos = 0; pQueryHandle->realNumOfRows = cur->pos + 1; + pCheckInfo->lastKey = blockInfo.window.ekey - 1; } else { endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, pQueryHandle->order); @@ -614,6 +541,8 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCo } else { pQueryHandle->realNumOfRows = endPos - cur->pos; } + + pCheckInfo->lastKey = ((int64_t*)(pCols->cols[0].pData))[endPos] + 1; } else { if (endPos > cur->pos) { pQueryHandle->realNumOfRows = 0; @@ -621,6 +550,8 @@ static void filterDataInDataBlock(STsdbQueryHandle *pQueryHandle, SDataCols* pCo } else { pQueryHandle->realNumOfRows = cur->pos - endPos; } + + assert(0); } } @@ -751,7 +682,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf int32_t tid = pCheckInfo->tableId.tid; while (pCheckInfo->pFileGroup != NULL) { - if ((fid = getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup)) < 0) { + if (getFileCompInfo(pCheckInfo, pCheckInfo->pFileGroup) != TSDB_CODE_SUCCESS) { break; } @@ -761,7 +692,6 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf pCheckInfo->pFileGroup->fileId, tid); pCheckInfo->pFileGroup = tsdbGetFileGroupNext(&pCheckInfo->fileIter); - continue; } @@ -790,7 +720,7 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf assert(index >= 0 && index < pCheckInfo->compIndex[tid].numOfSuperBlocks); // load first data block into memory failed, caused by disk block error - bool blockLoaded = false; + bool blockLoaded = false; SArray *sa = getDefaultLoadColumns(pQueryHandle, true); // todo no need to loaded at all @@ -810,27 +740,41 @@ static bool getQualifiedDataBlock(STsdbQueryHandle *pQueryHandle, STableCheckInf pFile->fd = open(pFile->fname, O_RDONLY); } - // if (tsdbLoadDataBlock(pFile, &pCheckInfo->pCompInfo->blocks[cur->slot], 1, - // pCheckInfo->pDataCols, data) == 0) { - // blockLoaded = true; - // } + if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) { + SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; + pBlockLoadInfo->fileGroup = pCheckInfo->pFileGroup; + pBlockLoadInfo->slot = pQueryHandle->cur.slot; + pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid; + + blockLoaded = true; + } // dError("QInfo:%p fileId:%d total numOfBlks:%d blockId:%d load into memory failed due to error in disk files", // GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->numOfBlocks, blkIdx); // failed to load data from disk, abort current query if (blockLoaded == false) { + taosArrayDestroy(sa); + tfree(data); + return false; } // todo search qualified points in blk, according to primary key (timestamp) column SDataCols* pDataCols = pCheckInfo->pDataCols; + + TSKEY* d = (TSKEY*) pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData; + assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast); + cur->pos = binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, key, pQueryHandle->order); cur->fid = pCheckInfo->pFileGroup->fileId; assert(cur->pos >= 0 && cur->fid >= 0 && cur->slot >= 0); filterDataInDataBlock(pQueryHandle, pCheckInfo->pDataCols, sa); + + taosArrayDestroy(sa); + tfree(data); return pQueryHandle->realNumOfRows > 0; } @@ -838,8 +782,6 @@ static bool hasMoreDataInFileForSingleTableModel(STsdbQueryHandle* pHandle) { assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1); STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb); -// SQueryFilePos* cur = &pHandle->cur; - STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); if (!pCheckInfo->checkFirstFileBlock) { @@ -952,7 +894,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { rows = pHandle->realNumOfRows; skey = *(TSKEY*) pColInfoEx->pData; - ekey = *(TSKEY*) pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1); + ekey = *(TSKEY*) ((char*)pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1)); } } else { if (pTable->mem != NULL) { @@ -990,6 +932,10 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; if (pHandle->cur.fid < 0) { + + + + return pHandle->pColumns; } else { STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex); @@ -1001,10 +947,17 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList return pHandle->pColumns; } else { SArray *sa = getDefaultLoadColumns(pHandle, true); - - doLoadDataFromFileBlock(pHandle); - filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa); - return pHandle->pColumns; + + // data block has been loaded, todo extract method + SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo; + if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->sid == pCheckInfo->pTableObj->tableId.tid) { + return pHandle->pColumns; + } else { + doLoadDataFromFileBlock(pHandle); + filterDataInDataBlock(pHandle, pCheckInfo->pDataCols, sa); + + return pHandle->pColumns; + } } } } @@ -1351,3 +1304,36 @@ SArray *tsdbQueryTableList(tsdb_repo_t* tsdb, int64_t uid, const wchar_t *pTagCo return result; } } + +void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) { + STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) queryHandle; + + size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); + for(int32_t i = 0; i < size; ++i) { + STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); + tSkipListDestroyIter(pTableCheckInfo->iter); + + if (pTableCheckInfo->pDataCols != NULL) { + tfree(pTableCheckInfo->pDataCols->buf); + } + + tfree(pTableCheckInfo->pDataCols); + + tfree(pTableCheckInfo->pCompInfo); + tfree(pTableCheckInfo->compIndex); + } + + taosArrayDestroy(pQueryHandle->pTableCheckInfo); + + size_t cols = taosArrayGetSize(pQueryHandle->pColumns); + for(int32_t i = 0; i < cols; ++i) { + SColumnInfoEx *pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + tfree(pColInfo->pData); + } + + taosArrayDestroy(pQueryHandle->pColumns); + + tfree(pQueryHandle->unzipBuffer); + tfree(pQueryHandle->secondaryUnzipBuffer); + tfree(pQueryHandle); +} diff --git a/src/vnode/wal/src/walMain.c b/src/vnode/wal/src/walMain.c index 9708b0d9dc31429412aff292567175983cd20f32..504e37027989f7b16ef7fc09f33e8e0de014cc73 100644 --- a/src/vnode/wal/src/walMain.c +++ b/src/vnode/wal/src/walMain.c @@ -80,7 +80,8 @@ void *walOpen(char *path, int max, int level) { } void walClose(void *handle) { - + if (handle == NULL) return; + SWal *pWal = (SWal *)handle; close(pWal->fd); @@ -125,7 +126,7 @@ int walRenew(twal_h handle) { if (pWal->num > pWal->max) { // remove the oldest wal file - char name[TSDB_FILENAME_LEN]; + char name[TSDB_FILENAME_LEN * 3]; sprintf(name, "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max); if (remove(name) <0) { wError("wal:%s, failed to remove(%s)", name, strerror(errno)); @@ -150,7 +151,7 @@ int walWrite(void *handle, SWalHead *pHead) { if (pWal->level == TAOS_WAL_NOLOG) return 0; pHead->signature = walSignature; - taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWal)); + taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); int contLen = pHead->len + sizeof(SWalHead); if(write(pWal->fd, pHead, contLen) != contLen) { @@ -177,7 +178,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, SWalHead *, in uint32_t maxId = 0, minId = -1, index =0; int plen = strlen(walPrefix); - char opath[TSDB_FILENAME_LEN]; + char opath[TSDB_FILENAME_LEN+5]; sprintf(opath, "%s/old", pWal->path); // is there old directory? @@ -272,7 +273,7 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SW break; } - if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { + if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { wWarn("wal:%s, cksum is messed up, skip the rest of file", name); break; } @@ -294,8 +295,8 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SW int walHandleExistingFiles(char *path) { int code = 0; - char oname[TSDB_FILENAME_LEN]; - char nname[TSDB_FILENAME_LEN]; + char oname[TSDB_FILENAME_LEN * 3]; + char nname[TSDB_FILENAME_LEN * 3]; char opath[TSDB_FILENAME_LEN]; sprintf(opath, "%s/old", path); @@ -336,7 +337,7 @@ int walHandleExistingFiles(char *path) { static int walRemoveWalFiles(char *path) { int plen = strlen(walPrefix); - char name[TSDB_FILENAME_LEN]; + char name[TSDB_FILENAME_LEN * 3]; int code = 0; if (access(path, F_OK) != 0) return 0; diff --git a/tests/examples/c/demo.c b/tests/examples/c/demo.c index 853a949162e10ae64677734d77ad7ffeec2eb3bb..e06dbedfca3778d191f0d49cc145d69bdbcdbe8a 100644 --- a/tests/examples/c/demo.c +++ b/tests/examples/c/demo.c @@ -24,17 +24,45 @@ void taosMsleep(int mseconds); +static int32_t doQuery(TAOS* taos, const char* sql) { + int32_t code = taos_query(taos, sql); + if (code != 0) { + printf("failed to execute query, reason:%s\n", taos_errstr(taos)); + return -1; + } + + TAOS_RES* res = taos_use_result(taos); + TAOS_ROW row = NULL; + char buf[512] = {0}; + + int32_t numOfFields = taos_num_fields(res); + TAOS_FIELD* pFields = taos_fetch_fields(res); + + while((row = taos_fetch_row(res)) != NULL) { + taos_print_row(buf, row, pFields, numOfFields); + printf("%s\n", buf); + memset(buf, 0, 512); + } + + taos_free_result(res); + + return 0; +} + int main(int argc, char *argv[]) { TAOS * taos; char qstr[1024]; TAOS_RES *result; + // connect to server if (argc < 2) { printf("please input server-ip \n"); return 0; } + taos_options(TSDB_OPTION_CONFIGDIR, "~/sec/cfg"); + // init TAOS taos_init(); @@ -45,6 +73,22 @@ int main(int argc, char *argv[]) { } printf("success to connect to server\n"); + doQuery(taos, "create database if not exists test"); + doQuery(taos, "use test"); + doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:3', 3);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:4', 4);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:5', 5);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:6', 6);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:7', 7);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:8', 8);"); + doQuery(taos, "insert into tm0 values('2020-1-1 1:1:9', 9);"); + doQuery(taos, "select * from tm0;"); + + taos_close(taos); + return 0; taos_query(taos, "drop database demo"); if (taos_query(taos, "create database demo") != 0) { @@ -53,8 +97,10 @@ int main(int argc, char *argv[]) { } printf("success to create database\n"); + taos_query(taos, "use demo"); + // create table if (taos_query(taos, "create table m1 (ts timestamp, speed int)") != 0) { printf("failed to create table, reason:%s\n", taos_errstr(taos)); @@ -62,9 +108,11 @@ int main(int argc, char *argv[]) { } printf("success to create table\n"); + // sleep for one second to make sure table is created on data node // taosMsleep(1000); + // insert 10 records int i = 0; for (i = 0; i < 10; ++i) { @@ -76,6 +124,7 @@ int main(int argc, char *argv[]) { } printf("success to insert rows, total %d rows\n", i); + // query the records sprintf(qstr, "SELECT * FROM m1"); if (taos_query(taos, qstr) != 0) { @@ -83,12 +132,16 @@ int main(int argc, char *argv[]) { exit(1); } + result = taos_use_result(taos); + if (result == NULL) { printf("failed to get result, reason:%s\n", taos_errstr(taos)); exit(1); } + +// TAOS_ROW row; TAOS_ROW row; int rows = 0; @@ -96,6 +149,7 @@ int main(int argc, char *argv[]) { TAOS_FIELD *fields = taos_fetch_fields(result); char temp[256]; + printf("select * from table, result:\n"); // fetch the records row by row while ((row = taos_fetch_row(result))) { diff --git a/tests/script/basicSuite.sim b/tests/script/basicSuite.sim index d16e85323bc714f2a64587ef4d84330644bc40fd..d5892d8682e8ea0e7a074077a942cfa049cc0910 100644 --- a/tests/script/basicSuite.sim +++ b/tests/script/basicSuite.sim @@ -1,17 +1,8 @@ ################################# -run general/user/basic1.sim - -run general/show/dnodes.sim - -run general/db/basic1.sim -run general/db/basic2.sim -run general/db/basic3.sim -run general/db/basic4.sim -run general/db/basic5.sim run general/table/basic1.sim -run general/table/basic2.sim -run general/table/basic3.sim +#run general/table/basic2.sim +#run general/table/basic3.sim ################################## diff --git a/tests/script/general/table/basic1.sim b/tests/script/general/table/basic1.sim index 9a05fb6d67e45a18d2a3919117a2bae542d19777..fb924ac453ddd9bb1cb4526d3069add8bfac76b9 100644 --- a/tests/script/general/table/basic1.sim +++ b/tests/script/general/table/basic1.sim @@ -1,5 +1,6 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1 +system ifconfig system sh/exec.sh -n dnode1 -s start sql connect @@ -43,9 +44,9 @@ print $data10 $data11 $data22 print $data20 $data11 $data22 print =============== insert data -sql insert into d1.n1 values(now, 1) -sql insert into d1.n1 values(now, 2) -sql insert into d1.n1 values(now, 3) +sql insert into d1.n1 values(now+1s, 1) +sql insert into d1.n1 values(now+2s, 2) +sql insert into d1.n1 values(now+3s, 3) print =============== query data sql select * from d1.n1 @@ -69,7 +70,3 @@ if $data21 != 3 then return -1 endi -sql drop database d1 - -system sh/exec.sh -n dnode1 -s stop -x SIGINT - diff --git a/tests/script/general/table/basic2.sim b/tests/script/general/table/basic2.sim index 18b98f4a3fc7c0c5a2f3b34c6cd954dd70d49440..7701ca1c1fb5575d0fc6613b948b2bdbcd7a38fa 100644 --- a/tests/script/general/table/basic2.sim +++ b/tests/script/general/table/basic2.sim @@ -64,6 +64,3 @@ if $data21 != 3 then return -1 endi -sql drop database d1 - -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/general/table/basic3.sim b/tests/script/general/table/basic3.sim index 599d0116532fad30cb1f95808046e27acde33cb0..6558617384f7ef68b99ec10b9cf0e7e241c302d1 100644 --- a/tests/script/general/table/basic3.sim +++ b/tests/script/general/table/basic3.sim @@ -43,9 +43,9 @@ print $data10 $data11 $data22 print $data20 $data11 $data22 print =============== insert data -sql insert into db.n1 values(now, 1) -sql insert into db.n1 values(now, 2) -sql insert into db.n1 values(now, 3) +sql insert into db.n1 values(now+1s, 1) +sql insert into db.n1 values(now+2s, 2) +sql insert into db.n1 values(now+3s, 3) print =============== query data sql select * from db.n1 diff --git a/tests/script/tmp/dnode2.sim b/tests/script/tmp/dnode2.sim index a44840475925ff3289261688bd0a70d0ee0fd06b..6d9a844fb6562ff2af26d52c2de387cb52d0dd2a 100644 --- a/tests/script/tmp/dnode2.sim +++ b/tests/script/tmp/dnode2.sim @@ -1,4 +1,6 @@ system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1 +system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2 system sh/exec_up.sh -n dnode1 -s start system sh/exec_up.sh -n dnode2 -s start sql connect \ No newline at end of file diff --git a/tests/tsim/src/cJSON.c b/tests/tsim/src/cJSON.c index b2e2f47bdac787b82ca0b93e50e69009e8a5f64c..ecf0e05b42cf5a6f83e98d95f72b10dd5360714e 100644 --- a/tests/tsim/src/cJSON.c +++ b/tests/tsim/src/cJSON.c @@ -290,7 +290,7 @@ loop_end: input_buffer->offset += (size_t)(after_end - number_c_string); - strncpy(item->numberstring, number_c_string, 12); + strncpy(item->numberstring, (const char *)number_c_string, 12); return true; } diff --git a/tests/tsim/src/simExe.c b/tests/tsim/src/simExe.c index d844f3b7860ea51b4a69715ad7d5c476896fbffd..912f10ba2a42001b5a970c5da57586d572953cdd 100644 --- a/tests/tsim/src/simExe.c +++ b/tests/tsim/src/simExe.c @@ -576,6 +576,7 @@ bool simCreateRestFulConnect(SScript *script, char *user, char *pass) { bool simCreateNativeConnect(SScript *script, char *user, char *pass) { simCloseTaosdConnect(script); void *taos = NULL; + taosMsleep(2000); for (int attempt = 0; attempt < 10; ++attempt) { taos = taos_connect(NULL, user, pass, NULL, tsMnodeShellPort); if (taos == NULL) {