diff --git a/cmake/install.inc b/cmake/install.inc index 4b2d4828f8c2884752dbc12b29ce21ca7e0495a6..8418612d4c54d7f7643e190da0af81064197b5d1 100755 --- a/cmake/install.inc +++ b/cmake/install.inc @@ -32,7 +32,7 @@ ELSEIF (TD_WINDOWS) #INSTALL(TARGETS taos RUNTIME DESTINATION driver) #INSTALL(TARGETS shell RUNTIME DESTINATION .) IF (TD_MVN_INSTALLED) - INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.13-dist.jar DESTINATION connector/jdbc) + INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.14-dist.jar DESTINATION connector/jdbc) ENDIF () ELSEIF (TD_DARWIN) SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh") diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 594a4c39b55071aeb5f3010a435717fb321abe38..7921399330f876a217b8f14f4871d7f47cebec21 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -54,8 +54,8 @@ #define DO_UPDATE_TAG_COLUMNS(ctx, ts) \ do { \ - for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \ - SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[i]; \ + for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \ + SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \ if (__ctx->functionId == TSDB_FUNC_TS_DUMMY) { \ __ctx->tag.i64Key = (ts); \ __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ @@ -66,8 +66,8 @@ #define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \ do { \ - for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \ - SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[i]; \ + for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \ + SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \ aAggs[TSDB_FUNC_TAG].xFunction(__ctx); \ } \ } while (0); @@ -305,7 +305,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI } else if (functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_LAST) { *type = (int16_t)dataType; *bytes = (int16_t)dataBytes; - *interBytes = dataBytes; + *interBytes = (int16_t)(dataBytes + sizeof(SFirstLastInfo)); } else if (functionId == TSDB_FUNC_SPREAD) { *type = (int16_t)TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); @@ -1169,8 +1169,8 @@ static int32_t minmax_merge_impl(SQLFunctionCtx *pCtx, int32_t bytes, char *outp if ((*(int32_t *)output < v) ^ isMin) { *(int32_t *)output = v; - for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) { - SQLFunctionCtx *__ctx = pCtx->tagInfo.pTagCtxList[i]; + for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { + SQLFunctionCtx *__ctx = pCtx->tagInfo.pTagCtxList[j]; aAggs[TSDB_FUNC_TAG].xFunction(__ctx); } @@ -1679,16 +1679,35 @@ static void last_function_f(SQLFunctionCtx *pCtx, int32_t index) { if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } - - SET_VAL(pCtx, 1, 1); - memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes); - - TSKEY ts = pCtx->ptsList[index]; - DO_UPDATE_TAG_COLUMNS(pCtx, ts); - - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->hasResult = DATA_SET_FLAG; - pResInfo->complete = true; // set query completed + + // the scan order is not the required order, ignore it + if (pCtx->order != pCtx->param[0].i64Key) { + return; + } + + if (pCtx->order == TSDB_ORDER_DESC) { + SET_VAL(pCtx, 1, 1); + memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes); + + TSKEY ts = pCtx->ptsList[index]; + DO_UPDATE_TAG_COLUMNS(pCtx, ts); + + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + pResInfo->hasResult = DATA_SET_FLAG; + pResInfo->complete = true; // set query completed + } else { // in case of ascending order check, all data needs to be checked + SResultRowCellInfo* pResInfo = GET_RES_INFO(pCtx); + TSKEY ts = pCtx->ptsList[index]; + + char* buf = GET_ROWCELL_INTERBUF(pResInfo); + if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) { + pResInfo->hasResult = DATA_SET_FLAG; + memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes); + + *(TSKEY*)buf = ts; + DO_UPDATE_TAG_COLUMNS(pCtx, ts); + } + } } static void last_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t index) { @@ -1711,7 +1730,7 @@ static void last_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t ind static void last_dist_function(SQLFunctionCtx *pCtx) { /* - * 1. for scan data in asc order, no need to check data + * 1. for scan data is not the required order * 2. for data blocks that are not loaded, no need to check data */ if (pCtx->order != pCtx->param[0].i64Key) { @@ -2447,7 +2466,7 @@ static bool percentile_function_setup(SQLFunctionCtx *pCtx) { static void percentile_function(SQLFunctionCtx *pCtx) { int32_t notNullElems = 0; - SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); // the first stage, only acquire the min/max value @@ -2548,12 +2567,14 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { double v = pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i64Key : pCtx->param[0].dKey; SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - tMemBucket * pMemBucket = ((SPercentileInfo *)GET_ROWCELL_INTERBUF(pResInfo))->pMemBucket; - - if (pMemBucket->total > 0) { // check for null - *(double *)pCtx->aOutputBuf = getPercentile(pMemBucket, v); - } else { + SPercentileInfo* ppInfo = (SPercentileInfo *) GET_ROWCELL_INTERBUF(pResInfo); + + tMemBucket * pMemBucket = ppInfo->pMemBucket; + if (pMemBucket == NULL || pMemBucket->total == 0) { // check for null + assert(ppInfo->numOfElems == 0); setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); + } else { + *(double *)pCtx->aOutputBuf = getPercentile(pMemBucket, v); } tMemBucketDestroy(pMemBucket); diff --git a/src/connector/jdbc/CMakeLists.txt b/src/connector/jdbc/CMakeLists.txt index 701a39b20994c2201328c7fd85a7da9aa4ab44a5..0eb3eb21cec7d79fc5fec5f7b0400a0a5a39a137 100644 --- a/src/connector/jdbc/CMakeLists.txt +++ b/src/connector/jdbc/CMakeLists.txt @@ -8,7 +8,7 @@ IF (TD_MVN_INSTALLED) ADD_CUSTOM_COMMAND(OUTPUT ${JDBC_CMD_NAME} POST_BUILD COMMAND mvn -Dmaven.test.skip=true install -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml - COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.13-dist.jar ${LIBRARY_OUTPUT_PATH} + COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.14-dist.jar ${LIBRARY_OUTPUT_PATH} COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml COMMENT "build jdbc driver") ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME}) diff --git a/src/connector/jdbc/deploy-pom.xml b/src/connector/jdbc/deploy-pom.xml index 1dc2625e62769a38eaa709c7e5493dd926466924..4564bde81e450f9c44be52ef69918dc0b3ada26f 100755 --- a/src/connector/jdbc/deploy-pom.xml +++ b/src/connector/jdbc/deploy-pom.xml @@ -5,7 +5,7 @@ com.taosdata.jdbc taos-jdbcdriver - 2.0.13 + 2.0.14 jar JDBCDriver diff --git a/src/connector/jdbc/pom.xml b/src/connector/jdbc/pom.xml index 3d1f40243547081d5ada664e0edaf3e9184f3172..7e087ebd9b05709518d45b8096cbd2b051c4a214 100755 --- a/src/connector/jdbc/pom.xml +++ b/src/connector/jdbc/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.taosdata.jdbc taos-jdbcdriver - 2.0.13 + 2.0.14 jar JDBCDriver https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc diff --git a/src/dnode/inc/dnodeMInfos.h b/src/dnode/inc/dnodeMInfos.h index 9c3c85c47e2dbcc11c5b5a80fbf091bd93855149..2c3eef5d5d98e43db1ae783f9e7db2769fcf521e 100644 --- a/src/dnode/inc/dnodeMInfos.h +++ b/src/dnode/inc/dnodeMInfos.h @@ -24,9 +24,9 @@ extern "C" { int32_t dnodeInitMInfos(); void dnodeCleanupMInfos(); -void dnodeUpdateMInfos(SMnodeInfos *minfos); -void dnodeUpdateEpSetForPeer(SRpcEpSet *epSet); -void dnodeGetMInfos(SMnodeInfos *minfos); +void dnodeUpdateMInfos(SMInfos *pMinfos); +void dnodeUpdateEpSetForPeer(SRpcEpSet *pEpSet); +void dnodeGetMInfos(SMInfos *pMinfos); bool dnodeIsMasterEp(char *ep); #ifdef __cplusplus diff --git a/src/dnode/src/dnodeMInfos.c b/src/dnode/src/dnodeMInfos.c index cefe44aebe7f87803141ce3d75c45dca18463849..162de2243e71ba7ae4e74839753f98c1cb546237 100644 --- a/src/dnode/src/dnodeMInfos.c +++ b/src/dnode/src/dnodeMInfos.c @@ -22,12 +22,12 @@ #include "dnodeInt.h" #include "dnodeMInfos.h" -static SMnodeInfos tsMInfos; -static SRpcEpSet tsMEpSet; +static SMInfos tsMInfos; +static SRpcEpSet tsMEpSet; static pthread_mutex_t tsMInfosMutex; -static void dnodeResetMInfos(SMnodeInfos *minfos); -static void dnodePrintMInfos(SMnodeInfos *minfos); +static void dnodeResetMInfos(SMInfos *minfos); +static void dnodePrintMInfos(SMInfos *minfos); static int32_t dnodeReadMInfos(); static int32_t dnodeWriteMInfos(); @@ -44,14 +44,14 @@ int32_t dnodeInitMInfos() { void dnodeCleanupMInfos() { pthread_mutex_destroy(&tsMInfosMutex); } -void dnodeUpdateMInfos(SMnodeInfos *minfos) { - if (minfos->mnodeNum <= 0 || minfos->mnodeNum > 3) { - dError("invalid mnode infos, mnodeNum:%d", minfos->mnodeNum); +void dnodeUpdateMInfos(SMInfos *pMinfos) { + if (pMinfos->mnodeNum <= 0 || pMinfos->mnodeNum > 3) { + dError("invalid mnode infos, mnodeNum:%d", pMinfos->mnodeNum); return; } - for (int32_t i = 0; i < minfos->mnodeNum; ++i) { - SMnodeInfo *minfo = &minfos->mnodeInfos[i]; + for (int32_t i = 0; i < pMinfos->mnodeNum; ++i) { + SMInfo *minfo = &pMinfos->mnodeInfos[i]; minfo->mnodeId = htonl(minfo->mnodeId); if (minfo->mnodeId <= 0 || strlen(minfo->mnodeEp) <= 5) { dError("invalid mnode info:%d, mnodeId:%d mnodeEp:%s", i, minfo->mnodeId, minfo->mnodeEp); @@ -60,14 +60,14 @@ void dnodeUpdateMInfos(SMnodeInfos *minfos) { } pthread_mutex_lock(&tsMInfosMutex); - if (minfos->mnodeNum != tsMInfos.mnodeNum) { - dnodeResetMInfos(minfos); + if (pMinfos->mnodeNum != tsMInfos.mnodeNum) { + dnodeResetMInfos(pMinfos); dnodeWriteMInfos(); sdbUpdateAsync(); } else { - int32_t size = sizeof(SMnodeInfos); - if (memcmp(minfos, &tsMInfos, size) != 0) { - dnodeResetMInfos(minfos); + int32_t size = sizeof(SMInfos); + if (memcmp(pMinfos, &tsMInfos, size) != 0) { + dnodeResetMInfos(pMinfos); dnodeWriteMInfos(); sdbUpdateAsync(); } @@ -99,11 +99,11 @@ bool dnodeIsMasterEp(char *ep) { return isMaster; } -void dnodeGetMInfos(SMnodeInfos *minfos) { +void dnodeGetMInfos(SMInfos *pMinfos) { pthread_mutex_lock(&tsMInfosMutex); - memcpy(minfos, &tsMInfos, sizeof(SMnodeInfos)); + memcpy(pMinfos, &tsMInfos, sizeof(SMInfos)); for (int32_t i = 0; i < tsMInfos.mnodeNum; ++i) { - minfos->mnodeInfos[i].mnodeId = htonl(tsMInfos.mnodeInfos[i].mnodeId); + pMinfos->mnodeInfos[i].mnodeId = htonl(tsMInfos.mnodeInfos[i].mnodeId); } pthread_mutex_unlock(&tsMInfosMutex); } @@ -123,15 +123,15 @@ void dnodeGetEpSetForShell(SRpcEpSet *epSet) { pthread_mutex_unlock(&tsMInfosMutex); } -static void dnodePrintMInfos(SMnodeInfos *minfos) { - dInfo("print mnode infos, mnodeNum:%d inUse:%d", minfos->mnodeNum, minfos->inUse); - for (int32_t i = 0; i < minfos->mnodeNum; i++) { - dInfo("mnode index:%d, %s", minfos->mnodeInfos[i].mnodeId, minfos->mnodeInfos[i].mnodeEp); +static void dnodePrintMInfos(SMInfos *pMinfos) { + dInfo("print minfos, mnodeNum:%d inUse:%d", pMinfos->mnodeNum, pMinfos->inUse); + for (int32_t i = 0; i < pMinfos->mnodeNum; i++) { + dInfo("mnode index:%d, %s", pMinfos->mnodeInfos[i].mnodeId, pMinfos->mnodeInfos[i].mnodeEp); } } -static void dnodeResetMInfos(SMnodeInfos *minfos) { - if (minfos == NULL) { +static void dnodeResetMInfos(SMInfos *pMinfos) { + if (pMinfos == NULL) { tsMEpSet.numOfEps = 1; taosGetFqdnPortFromEp(tsFirst, tsMEpSet.fqdn[0], &tsMEpSet.port[0]); @@ -142,10 +142,10 @@ static void dnodeResetMInfos(SMnodeInfos *minfos) { return; } - if (minfos->mnodeNum == 0) return; + if (pMinfos->mnodeNum == 0) return; - int32_t size = sizeof(SMnodeInfos); - memcpy(&tsMInfos, minfos, size); + int32_t size = sizeof(SMInfos); + memcpy(&tsMInfos, pMinfos, size); tsMEpSet.inUse = tsMInfos.inUse; tsMEpSet.numOfEps = tsMInfos.mnodeNum; @@ -153,7 +153,7 @@ static void dnodeResetMInfos(SMnodeInfos *minfos) { taosGetFqdnPortFromEp(tsMInfos.mnodeInfos[i].mnodeEp, tsMEpSet.fqdn[i], &tsMEpSet.port[i]); } - dnodePrintMInfos(minfos); + dnodePrintMInfos(pMinfos); } static int32_t dnodeReadMInfos() { @@ -162,7 +162,7 @@ static int32_t dnodeReadMInfos() { char * content = calloc(1, maxLen + 1); cJSON * root = NULL; FILE * fp = NULL; - SMnodeInfos minfos = {0}; + SMInfos minfos = {0}; char file[TSDB_FILENAME_LEN + 20] = {0}; sprintf(file, "%s/mnodeEpSet.json", tsDnodeDir); @@ -241,7 +241,7 @@ PARSE_MINFOS_OVER: terrno = 0; for (int32_t i = 0; i < minfos.mnodeNum; ++i) { - SMnodeInfo *mInfo = &minfos.mnodeInfos[i]; + SMInfo *mInfo = &minfos.mnodeInfos[i]; dnodeUpdateEp(mInfo->mnodeId, mInfo->mnodeEp, NULL, NULL); } dnodeResetMInfos(&minfos); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 5c01f6471659f18c63d5eeeded8c0c5d73cd1867..15378c77c13b3ff423f1fb974c38bf89b5f2e0fa 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -472,8 +472,8 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { } SStatusRsp *pStatusRsp = pMsg->pCont; - SMnodeInfos *minfos = &pStatusRsp->mnodes; - dnodeUpdateMInfos(minfos); + SMInfos *pMinfos = &pStatusRsp->mnodes; + dnodeUpdateMInfos(pMinfos); SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg; pCfg->numOfVnodes = htonl(pCfg->numOfVnodes); diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index 7faa3c8913a7cc98cb7de8f33763d533100a7715..f664618f516c2cb36d116e3e4f47ed6daf28e042 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -147,8 +147,8 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) { } } -bool dnodeStartMnode(SMnodeInfos *minfos) { - SMnodeInfos *mnodes = minfos; +bool dnodeStartMnode(SMInfos *pMinfos) { + SMInfos *pMnodes = pMinfos; if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) { dDebug("mnode module is already started, module status:%d", tsModuleStatus); @@ -159,7 +159,7 @@ bool dnodeStartMnode(SMnodeInfos *minfos) { dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus); dnodeProcessModuleStatus(moduleStatus); - sdbUpdateSync(mnodes); + sdbUpdateSync(pMnodes); return true; } diff --git a/src/inc/dnode.h b/src/inc/dnode.h index eef4490800a4191c2dee55c450cf99b8381bb64d..1efaa4a24bc848c99ac265f07ad340f09914ca30 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -45,7 +45,7 @@ void dnodeGetEpSetForShell(SRpcEpSet *epSet); int32_t dnodeGetDnodeId(); void dnodeUpdateEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); bool dnodeCheckEpChanged(int32_t dnodeId, char *epstr); -bool dnodeStartMnode(SMnodeInfos *minfos); +bool dnodeStartMnode(SMInfos *pMinfos); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index b4d3bec958c09a421d6b7efbe6f989fb463fe549..e8e302924490eb0ebcccf10468c3469107fe6dfe 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -591,13 +591,13 @@ typedef struct { typedef struct { int32_t mnodeId; char mnodeEp[TSDB_EP_LEN]; -} SMnodeInfo; +} SMInfo; typedef struct { - int8_t inUse; - int8_t mnodeNum; - SMnodeInfo mnodeInfos[TSDB_MAX_REPLICA]; -} SMnodeInfos; + int8_t inUse; + int8_t mnodeNum; + SMInfo mnodeInfos[TSDB_MAX_REPLICA]; +} SMInfos; typedef struct { int32_t numOfMnodes; // tsNumOfMnodes @@ -632,7 +632,7 @@ typedef struct { } SStatusMsg; typedef struct { - SMnodeInfos mnodes; + SMInfos mnodes; SDnodeCfg dnodeCfg; SVgroupAccess vgAccess[]; } SStatusRsp; @@ -761,7 +761,7 @@ typedef struct { typedef struct { int32_t dnodeId; char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port - SMnodeInfos mnodes; + SMInfos mnodes; } SCreateMnodeMsg; typedef struct { diff --git a/src/mnode/inc/mnodeMnode.h b/src/mnode/inc/mnodeMnode.h index 10cbcebe220c17d697fe24ddb398832895a42c9c..93f2fa11ea95f22ca4addf12496b79cdaddbf290 100644 --- a/src/mnode/inc/mnodeMnode.h +++ b/src/mnode/inc/mnodeMnode.h @@ -48,7 +48,7 @@ void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet); char* mnodeGetMnodeMasterEp(); void mnodeGetMnodeInfos(void *mnodes); -void mnodeUpdateMnodeEpSet(); +void mnodeUpdateMnodeEpSet(SMInfos *pMnodes); #ifdef __cplusplus } diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index 90c4eac40a925ece5b6ab11d89df50ecaf927aed..31ea2da640ef3b20b84ca769a22637332d6ffe34 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -89,6 +89,7 @@ void* sdbGetTableByRid(int64_t rid); bool sdbIsMaster(); bool sdbIsServing(); void sdbUpdateMnodeRoles(); +int32_t sdbGetReplicaNum(); int32_t sdbInsertRow(SSdbRow *pRow); int32_t sdbDeleteRow(SSdbRow *pRow); diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index d20d51f82b3a86506610e157203b2beb05a4373d..68acae7dec0e5505e72733a638abcc142eeeb407 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -34,14 +34,14 @@ #include "mnodeUser.h" #include "mnodeVgroup.h" -int64_t tsMnodeRid = -1; -static void * tsMnodeSdb = NULL; -static int32_t tsMnodeUpdateSize = 0; -static SRpcEpSet tsMnodeEpSetForShell; -static SRpcEpSet tsMnodeEpSetForPeer; -static SMnodeInfos tsMnodeInfos; -static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); +int64_t tsMnodeRid = -1; +static void * tsMnodeSdb = NULL; +static int32_t tsMnodeUpdateSize = 0; +static SRpcEpSet tsMEpForShell; +static SRpcEpSet tsMEpForPeer; +static SMInfos tsMInfos; +static int32_t mnodeGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mnodeRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); #if defined(LINUX) static pthread_rwlock_t tsMnodeLock; @@ -127,7 +127,7 @@ static int32_t mnodeMnodeActionRestored() { mnodeCancelGetNextMnode(pIter); } - mnodeUpdateMnodeEpSet(); + mnodeUpdateMnodeEpSet(NULL); return TSDB_CODE_SUCCESS; } @@ -199,106 +199,130 @@ void mnodeCancelGetNextMnode(void *pIter) { sdbFreeIter(tsMnodeSdb, pIter); } -void mnodeUpdateMnodeEpSet() { - mInfo("update mnodes epSet, numOfEps:%d ", mnodeGetMnodesNum()); +void mnodeUpdateMnodeEpSet(SMInfos *pMinfos) { + bool set = false; + SMInfos mInfos = {0}; + mInfo("vgId:1, update mnodes epSet, numOfMnodes:%d pMinfos:%p", mnodeGetMnodesNum(), pMinfos); - mnodeMnodeWrLock(); - - memset(&tsMnodeEpSetForShell, 0, sizeof(SRpcEpSet)); - memset(&tsMnodeEpSetForPeer, 0, sizeof(SRpcEpSet)); - memset(&tsMnodeInfos, 0, sizeof(SMnodeInfos)); - - int32_t index = 0; - void * pIter = NULL; - while (1) { - SMnodeObj *pMnode = NULL; - pIter = mnodeGetNextMnode(pIter, &pMnode); - if (pMnode == NULL) break; - - SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); - if (pDnode != NULL) { - strcpy(tsMnodeEpSetForShell.fqdn[index], pDnode->dnodeFqdn); - tsMnodeEpSetForShell.port[index] = htons(pDnode->dnodePort); - mDebug("mnode:%d, for shell fqdn:%s %d", pDnode->dnodeId, tsMnodeEpSetForShell.fqdn[index], htons(tsMnodeEpSetForShell.port[index])); - - strcpy(tsMnodeEpSetForPeer.fqdn[index], pDnode->dnodeFqdn); - tsMnodeEpSetForPeer.port[index] = htons(pDnode->dnodePort + TSDB_PORT_DNODEDNODE); - mDebug("mnode:%d, for peer fqdn:%s %d", pDnode->dnodeId, tsMnodeEpSetForPeer.fqdn[index], htons(tsMnodeEpSetForPeer.port[index])); - - tsMnodeInfos.mnodeInfos[index].mnodeId = htonl(pMnode->mnodeId); - strcpy(tsMnodeInfos.mnodeInfos[index].mnodeEp, pDnode->dnodeEp); - - if (pMnode->role == TAOS_SYNC_ROLE_MASTER) { - tsMnodeEpSetForShell.inUse = index; - tsMnodeEpSetForPeer.inUse = index; - tsMnodeInfos.inUse = index; + if (pMinfos != NULL) { + set = true; + mInfos = *pMinfos; + } + else { + int32_t index = 0; + void * pIter = NULL; + while (1) { + SMnodeObj *pMnode = NULL; + pIter = mnodeGetNextMnode(pIter, &pMnode); + if (pMnode == NULL) break; + + SDnodeObj *pDnode = mnodeGetDnode(pMnode->mnodeId); + if (pDnode != NULL) { + set = true; + mInfos.mnodeInfos[index].mnodeId = pMnode->mnodeId; + strcpy(mInfos.mnodeInfos[index].mnodeEp, pDnode->dnodeEp); + if (pMnode->role == TAOS_SYNC_ROLE_MASTER) mInfos.inUse = index; + index++; + } else { + set = false; } - mInfo("mnode:%d, ep:%s %s", pDnode->dnodeId, pDnode->dnodeEp, pMnode->role == TAOS_SYNC_ROLE_MASTER ? "master" : ""); - index++; + mnodeDecDnodeRef(pDnode); + mnodeDecMnodeRef(pMnode); } - mnodeDecDnodeRef(pDnode); - mnodeDecMnodeRef(pMnode); + mInfos.mnodeNum = index; + if (mInfos.mnodeNum < sdbGetReplicaNum()) { + set = false; + mDebug("vgId:1, mnodes info not synced, current:%d syncCfgNum:%d", mInfos.mnodeNum, sdbGetReplicaNum()); + } } - tsMnodeInfos.mnodeNum = index; - tsMnodeEpSetForShell.numOfEps = index; - tsMnodeEpSetForPeer.numOfEps = index; + mnodeMnodeWrLock(); + + if (set) { + memset(&tsMEpForShell, 0, sizeof(SRpcEpSet)); + memset(&tsMEpForPeer, 0, sizeof(SRpcEpSet)); + memcpy(&tsMInfos, &mInfos, sizeof(SMInfos)); + tsMEpForShell.inUse = tsMInfos.inUse; + tsMEpForPeer.inUse = tsMInfos.inUse; + tsMEpForShell.numOfEps = tsMInfos.mnodeNum; + tsMEpForPeer.numOfEps = tsMInfos.mnodeNum; + + mInfo("vgId:1, mnodes epSet is set, num:%d inUse:%d", tsMInfos.mnodeNum, tsMInfos.inUse); + for (int index = 0; index < mInfos.mnodeNum; ++index) { + SMInfo *pInfo = &tsMInfos.mnodeInfos[index]; + taosGetFqdnPortFromEp(pInfo->mnodeEp, tsMEpForShell.fqdn[index], &tsMEpForShell.port[index]); + taosGetFqdnPortFromEp(pInfo->mnodeEp, tsMEpForPeer.fqdn[index], &tsMEpForPeer.port[index]); + tsMEpForPeer.port[index] = tsMEpForPeer.port[index] + TSDB_PORT_DNODEDNODE; + + mInfo("vgId:1, mnode:%d, fqdn:%s shell:%u peer:%u", pInfo->mnodeId, tsMEpForShell.fqdn[index], + tsMEpForShell.port[index], tsMEpForPeer.port[index]); + + tsMEpForShell.port[index] = htons(tsMEpForShell.port[index]); + tsMEpForPeer.port[index] = htons(tsMEpForPeer.port[index]); + pInfo->mnodeId = htonl(pInfo->mnodeId); + } + } else { + mInfo("vgId:1, mnodes epSet not set, num:%d inUse:%d", tsMInfos.mnodeNum, tsMInfos.inUse); + for (int index = 0; index < tsMInfos.mnodeNum; ++index) { + mInfo("vgId:1, index:%d, ep:%s:%u", index, tsMEpForShell.fqdn[index], htons(tsMEpForShell.port[index])); + } + } mnodeMnodeUnLock(); } void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) { mnodeMnodeRdLock(); - *epSet = tsMnodeEpSetForPeer; + *epSet = tsMEpForPeer; mnodeMnodeUnLock(); + mTrace("vgId:1, mnodes epSet for peer is returned, num:%d inUse:%d", tsMEpForPeer.numOfEps, tsMEpForPeer.inUse); for (int32_t i = 0; i < epSet->numOfEps; ++i) { if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) { epSet->inUse = (i + 1) % epSet->numOfEps; - mTrace("mnode:%d, for peer ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); + mTrace("vgId:1, mnode:%d, for peer ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); } else { - mTrace("mpeer:%d, for peer ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); + mTrace("vgId:1, mpeer:%d, for peer ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); } } } void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) { mnodeMnodeRdLock(); - *epSet = tsMnodeEpSetForShell; + *epSet = tsMEpForShell; mnodeMnodeUnLock(); + mTrace("vgId:1, mnodes epSet for shell is returned, num:%d inUse:%d", tsMEpForShell.numOfEps, tsMEpForShell.inUse); for (int32_t i = 0; i < epSet->numOfEps; ++i) { if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { epSet->inUse = (i + 1) % epSet->numOfEps; - mTrace("mnode:%d, for shell ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); + mTrace("vgId:1, mnode:%d, for shell ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); } else { - mTrace("mnode:%d, for shell ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); + mTrace("vgId:1, mnode:%d, for shell ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); } } } char* mnodeGetMnodeMasterEp() { - return tsMnodeInfos.mnodeInfos[tsMnodeInfos.inUse].mnodeEp; + return tsMInfos.mnodeInfos[tsMInfos.inUse].mnodeEp; } -void mnodeGetMnodeInfos(void *mnodeInfos) { +void mnodeGetMnodeInfos(void *pMinfos) { mnodeMnodeRdLock(); - *(SMnodeInfos *)mnodeInfos = tsMnodeInfos; + *(SMInfos *)pMinfos = tsMInfos; mnodeMnodeUnLock(); } static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) { - mDebug("dnode:%d, send create mnode msg to dnode %s", dnodeId, dnodeEp); - SCreateMnodeMsg *pCreate = rpcMallocCont(sizeof(SCreateMnodeMsg)); if (pCreate == NULL) { return TSDB_CODE_MND_OUT_OF_MEMORY; } else { pCreate->dnodeId = htonl(dnodeId); tstrncpy(pCreate->dnodeEp, dnodeEp, sizeof(pCreate->dnodeEp)); - pCreate->mnodes = tsMnodeInfos; + mnodeGetMnodeInfos(&pCreate->mnodes); bool found = false; for (int i = 0; i < pCreate->mnodes.mnodeNum; ++i) { if (pCreate->mnodes.mnodeInfos[i].mnodeId == htonl(dnodeId)) { @@ -312,6 +336,11 @@ static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) { } } + mDebug("dnode:%d, send create mnode msg to dnode %s, numOfMnodes:%d", dnodeId, dnodeEp, pCreate->mnodes.mnodeNum); + for (int32_t i = 0; i < pCreate->mnodes.mnodeNum; ++i) { + mDebug("index:%d, mnodeId:%d ep:%s", i, pCreate->mnodes.mnodeInfos[i].mnodeId, pCreate->mnodes.mnodeInfos[i].mnodeEp); + } + SRpcMsg rpcMsg = {0}; rpcMsg.pCont = pCreate; rpcMsg.contLen = sizeof(SCreateMnodeMsg); @@ -336,7 +365,7 @@ static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) { mError("failed to create mnode, reason:%s", tstrerror(code)); } else { mDebug("mnode is created successfully"); - mnodeUpdateMnodeEpSet(); + mnodeUpdateMnodeEpSet(NULL); sdbUpdateAsync(); } @@ -380,7 +409,7 @@ void mnodeDropMnodeLocal(int32_t dnodeId) { mnodeDecMnodeRef(pMnode); } - mnodeUpdateMnodeEpSet(); + mnodeUpdateMnodeEpSet(NULL); sdbUpdateAsync(); } @@ -400,7 +429,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) { sdbDecRef(tsMnodeSdb, pMnode); - mnodeUpdateMnodeEpSet(); + mnodeUpdateMnodeEpSet(NULL); sdbUpdateAsync(); return code; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 2ef758baf19b3f4196699553db97cc1f954794f7..6cc4e097350783a93300c46adbd5f68faf43e143 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -224,11 +224,13 @@ void sdbUpdateMnodeRoles() { sdbInfo("vgId:1, mnode:%d, role:%s", pMnode->mnodeId, syncRole[pMnode->role]); if (pMnode->mnodeId == dnodeGetDnodeId()) tsSdbMgmt.role = pMnode->role; mnodeDecMnodeRef(pMnode); + } else { + sdbDebug("vgId:1, mnode:%d not found", roles.nodeId[i]); } } mnodeUpdateClusterId(); - mnodeUpdateMnodeEpSet(); + mnodeUpdateMnodeEpSet(NULL); } static uint32_t sdbGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion) { @@ -308,18 +310,20 @@ void sdbUpdateAsync() { } void sdbUpdateSync(void *pMnodes) { - SMnodeInfos *mnodes = pMnodes; + SMInfos *pMinfos = pMnodes; if (!mnodeIsRunning()) { mDebug("vgId:1, mnode not start yet, update sync config later"); return; } - mDebug("vgId:1, update sync config in sync module, mnodes:%p", pMnodes); + mDebug("vgId:1, update sync config, pMnodes:%p", pMnodes); SSyncCfg syncCfg = {0}; int32_t index = 0; - if (mnodes == NULL) { + if (pMinfos == NULL) { + mDebug("vgId:1, mInfos not input, use mInfos in sdb, numOfMnodes:%d", syncCfg.replica); + void *pIter = NULL; while (1) { SMnodeObj *pMnode = NULL; @@ -339,16 +343,17 @@ void sdbUpdateSync(void *pMnodes) { mnodeDecMnodeRef(pMnode); } syncCfg.replica = index; - mDebug("vgId:1, mnodes info not input, use infos in sdb, numOfMnodes:%d", syncCfg.replica); } else { - for (index = 0; index < mnodes->mnodeNum; ++index) { - SMnodeInfo *node = &mnodes->mnodeInfos[index]; + mDebug("vgId:1, mInfos input, numOfMnodes:%d", pMinfos->mnodeNum); + + for (index = 0; index < pMinfos->mnodeNum; ++index) { + SMInfo *node = &pMinfos->mnodeInfos[index]; syncCfg.nodeInfo[index].nodeId = node->mnodeId; taosGetFqdnPortFromEp(node->mnodeEp, syncCfg.nodeInfo[index].nodeFqdn, &syncCfg.nodeInfo[index].nodePort); syncCfg.nodeInfo[index].nodePort += TSDB_PORT_SYNC; } syncCfg.replica = index; - mDebug("vgId:1, mnodes info input, numOfMnodes:%d", syncCfg.replica); + mnodeUpdateMnodeEpSet(pMnodes); } syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2; @@ -1103,3 +1108,7 @@ static void *sdbWorkerFp(void *pWorker) { return NULL; } + +int32_t sdbGetReplicaNum() { + return tsSdbMgmt.cfg.replica; +} \ No newline at end of file diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index adac532f2d4d9ad9f15459d8d406c196d0de0308..b73ca27ce977884638f7726e5c16916537234c2a 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -548,7 +548,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { pPeer->pSyncNode = pNode; pPeer->refCount = 1; - sInfo("%s, it is configured", pPeer->id); + sInfo("%s, it is configured, ep:%s:%u", pPeer->id, pPeer->fqdn, pPeer->port); int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { int32_t checkMs = 100 + (pNode->vgId * 10) % 100; @@ -1134,7 +1134,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL; if (pPeer == NULL) { - sError("vgId:%d, peer:%s not configured", pNode->vgId, firstPkt.fqdn); + sError("vgId:%d, peer:%s:%u not configured", pNode->vgId, firstPkt.fqdn, firstPkt.port); taosCloseSocket(connFd); // syncSendVpeerCfgMsg(sync); } else { diff --git a/tests/script/general/parser/groupby.sim b/tests/script/general/parser/groupby.sim index 19d9ae84cbc02e179c2ab060082507529b2bb608..fbe4345a21168fe6e22c947451d8ee80304311f9 100644 --- a/tests/script/general/parser/groupby.sim +++ b/tests/script/general/parser/groupby.sim @@ -606,6 +606,44 @@ sql insert into t1 values ('2020-03-27 04:21:16.000', 1)('2020-03-27 04:31:17.00 sql insert into t2 values ('2020-03-27 04:11:16.000', 1)('2020-03-27 04:11:17.000', 2) ('2020-03-27 04:11:18.000', 3) ('2020-03-27 04:11:19.000', 4) ; sql insert into t2 values ('2020-03-27 04:21:16.000', 1)('2020-03-27 04:31:17.000', 2) ('2020-03-27 04:51:18.000', 3) ('2020-03-27 05:10:19.000', 4) ; +print =================>td-2236 +sql select first(ts),last(ts) from t1 group by c; +if $rows != 4 then + return -1 +endi + +if $data00 != @20-03-27 04:11:16.000@ then + return -1 +endi + +if $data01 != @20-03-27 04:21:16.000@ then + return -1 +endi + +if $data10 != @20-03-27 04:11:17.000@ then + return -1 +endi + +if $data11 != @20-03-27 04:31:17.000@ then + return -1 +endi + +if $data20 != @20-03-27 04:11:18.000@ then + return -1 +endi + +if $data21 != @20-03-27 04:51:18.000@ then + return -1 +endi + +if $data30 != @20-03-27 04:11:19.000@ then + return -1 +endi + +if $data31 != @20-03-27 05:10:19.000@ then + return -1 +endi + #sql select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,t1,t2; #if $rows != 40 then # return -1