diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 73409a950a848662afc87a14b9f4c96dec4a625d..ea4d48a2b14cd0766e8362ac2bd519b448f733b0 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG 4d02980 + GIT_TAG c64858f SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index cdf977bea328a0bdf35789f24edb0addce46e087..8680f93f8c9f0cb68a18f8dbcb9b4c0d03dd622d 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -194,6 +194,10 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { SUseDbRsp usedbRsp = {0}; tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp); + if(strlen(usedbRsp.db) == 0){ + return TSDB_CODE_MND_DB_NOT_EXIST; + } + SName name = {0}; tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 5fb23b045c92e27da10486bb54f9343bb23bc81d..8314601df3d7a8de1f37757832162c1c6cc3379b 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -603,22 +603,27 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { return 0; } if (mndAcquireRpc(pMsg->info.node) == 0) return 0; + + SMnode *pMnode = pMsg->info.node; + const char *role = syncGetMyRoleStr(pMnode->syncMgmt.sync); + bool restored = syncIsRestoreFinish(pMnode->syncMgmt.sync); if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER) { + mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, + pMnode->stopped, restored, role); return -1; } - SEpSet epSet = {0}; - SMnode *pMnode = pMsg->info.node; + const STraceId *trace = &pMsg->info.traceId; + SEpSet epSet = {0}; mndGetMnodeEpSet(pMnode, &epSet); - const STraceId *trace = &pMsg->info.traceId; mDebug( - "msg:%p, failed to check mnode state since %s, mnode restored:%d stopped:%d, sync restored:%d role:%s type:%s " - "numOfEps:%d inUse:%d", - pMsg, terrstr(), pMnode->restored, pMnode->stopped, syncIsRestoreFinish(pMnode->syncMgmt.sync), - syncGetMyRoleStr(pMnode->syncMgmt.sync), TMSG_INFO(pMsg->msgType), epSet.numOfEps, epSet.inUse); + "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d " + "role:%s, redirect numOfEps:%d inUse:%d", + pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, restored, role, epSet.numOfEps, + epSet.inUse); if (epSet.numOfEps > 0) { for (int32_t i = 0; i < epSet.numOfEps; ++i) { diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index c27241317c2f2afa05d4744bc8e4c6e1ceb104f9..f25d436aee08ce9578ad86cc9dfc3bb4ef47fa14 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -394,7 +394,6 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; - if (mndTransAppendNullLog(pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -478,7 +477,6 @@ static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeO static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; - int32_t numOfReplicas = 0; SDDropMnodeReq dropReq = {0}; SEpSet dropEpSet = {0}; @@ -505,9 +503,8 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { if (pObj == NULL) return 0; - if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) return -1; if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) return -1; - if (mndTransAppendNullLog(pTrans) != 0) return -1; + if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) return -1; return 0; } @@ -715,7 +712,8 @@ static void mndReloadSyncConfig(SMnode *pMnode) { SMnodeObj *pObj = NULL; ESdbStatus objStatus = 0; void *pIter = NULL; - bool hasUpdatingMnode = false; + int32_t updatingMnodes = 0; + int32_t readyMnodes = 0; SSyncCfg cfg = {.myIndex = -1}; while (1) { @@ -723,7 +721,11 @@ static void mndReloadSyncConfig(SMnode *pMnode) { if (pIter == NULL) break; if (objStatus == SDB_STATUS_CREATING || objStatus == SDB_STATUS_DROPPING) { mInfo("vgId:1, has updating mnode:%d, status:%s", pObj->id, sdbStatusName(objStatus)); - hasUpdatingMnode = true; + updatingMnodes++; + } + if (objStatus == SDB_STATUS_READY) { + mInfo("vgId:1, has ready mnode:%d, status:%s", pObj->id, sdbStatusName(objStatus)); + readyMnodes++; } if (objStatus == SDB_STATUS_READY || objStatus == SDB_STATUS_CREATING) { @@ -739,18 +741,24 @@ static void mndReloadSyncConfig(SMnode *pMnode) { sdbReleaseLock(pSdb, pObj, false); } - if (cfg.myIndex == -1) { - mInfo("vgId:1, mnode not reload since selfIndex is -1"); + if (readyMnodes <= 0 || updatingMnodes <= 0) { + mInfo("vgId:1, mnode sync not reconfig since readyMnodes:%d updatingMnodes:%d", readyMnodes, updatingMnodes); return; } - if (!mndGetRestored(pMnode)) { - mInfo("vgId:1, mnode not reload since restore not finished"); + if (cfg.myIndex == -1) { +#if 1 + mInfo("vgId:1, mnode sync not reconfig since selfIndex is -1"); +#else + // cannot reconfig because the leader may fail to elect after reboot + mInfo("vgId:1, mnode sync not reconfig since selfIndex is -1, do sync stop oper"); + syncStop(pMnode->syncMgmt.sync); +#endif return; } - if (hasUpdatingMnode) { - mInfo("vgId:1, start to reload mnode sync, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex); + if (updatingMnodes > 0) { + mInfo("vgId:1, mnode sync reconfig, replica:%d myIndex:%d", cfg.replicaNum, cfg.myIndex); for (int32_t i = 0; i < cfg.replicaNum; ++i) { SNodeInfo *pNode = &cfg.nodeInfo[i]; mInfo("vgId:1, index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort); diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index be93bd2f59da8c54c467ae90eadf6df593a087d4..a2373a55ce095150598e0825e4f7ce80efb1c8a5 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -403,6 +403,10 @@ const char *sdbStatusName(ESdbStatus status); void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper); int32_t sdbGetIdFromRaw(SSdb *pSdb, SSdbRaw *pRaw); +void sdbWriteLock(SSdb *pSdb, int32_t type); +void sdbReadLock(SSdb *pSdb, int32_t type); +void sdbUnLock(SSdb *pSdb, int32_t type); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 5393c42da33a45629d15f02df803135e853185d7..e73fd28e71258adc85c9a2822b8a6b810bafbcdc 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -181,3 +181,23 @@ void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, *index, *term, *config); #endif } + +void sdbWriteLock(SSdb *pSdb, int32_t type) { + TdThreadRwlock *pLock = &pSdb->locks[type]; + // mTrace("sdb table:%d start write lock:%p", type, pLock); + taosThreadRwlockWrlock(pLock); + // mTrace("sdb table:%d stop write lock:%p", type, pLock); +} + +void sdbReadLock(SSdb *pSdb, int32_t type) { + TdThreadRwlock *pLock = &pSdb->locks[type]; + // mTrace("sdb table:%d start read lock:%p", type, pLock); + taosThreadRwlockRdlock(pLock); + // mTrace("sdb table:%d stop read lock:%p", type, pLock); +} + +void sdbUnLock(SSdb *pSdb, int32_t type) { + TdThreadRwlock *pLock = &pSdb->locks[type]; + // mTrace("sdb table:%d unlock:%p", type, pLock); + taosThreadRwlockUnlock(pLock); +} diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 18dbea150b2fd38ef84d067cb73546baa95438cc..5eedcc545a526dc1bb657f0173c54a1f03440de2 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -363,9 +363,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { mInfo("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); - SHashObj *hash = pSdb->hashObjs[i]; - TdThreadRwlock *pLock = &pSdb->locks[i]; - taosThreadRwlockWrlock(pLock); + SHashObj *hash = pSdb->hashObjs[i]; + sdbWriteLock(pSdb, i); SSdbRow **ppRow = taosHashIterate(hash, NULL); while (ppRow != NULL) { @@ -410,7 +409,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { sdbFreeRaw(pRaw); ppRow = taosHashIterate(hash, ppRow); } - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, i); } if (code == 0) { diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 0fee0fbca17685e6ee7b2a106accf87ba0d2b056..85e937fcc34995451e9e6f9565de8b48d79aca44 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -133,12 +133,12 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) { } static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { - TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; - taosThreadRwlockWrlock(pLock); + int32_t type = pRow->type; + sdbWriteLock(pSdb, type); SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize); if (pOldRow != NULL) { - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); sdbFreeRow(pSdb, pRow, false); terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; return terrno; @@ -149,7 +149,7 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * sdbPrintOper(pSdb, pRow, "insert"); if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); sdbFreeRow(pSdb, pRow, false); terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; @@ -164,12 +164,12 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * taosHashRemove(hash, pRow->pObj, keySize); sdbFreeRow(pSdb, pRow, false); terrno = code; - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); return terrno; } } - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) { pSdb->maxId[pRow->type] = TMAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj)); @@ -183,26 +183,27 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * } static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) { - TdThreadRwlock *pLock = &pSdb->locks[pNewRow->type]; - taosThreadRwlockWrlock(pLock); + int32_t type = pNewRow->type; + sdbWriteLock(pSdb, type); SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize); if (ppOldRow == NULL || *ppOldRow == NULL) { - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize); } SSdbRow *pOldRow = *ppOldRow; pOldRow->status = pRaw->status; sdbPrintOper(pSdb, pOldRow, "update"); + sdbUnLock(pSdb, type); int32_t code = 0; - SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type]; + SdbUpdateFp updateFp = pSdb->updateFps[type]; if (updateFp != NULL) { code = (*updateFp)(pSdb, pOldRow->pObj, pNewRow->pObj); } - taosThreadRwlockUnlock(pLock); + // sdbUnLock(pSdb, type); sdbFreeRow(pSdb, pNewRow, false); pSdb->tableVer[pOldRow->type]++; @@ -210,12 +211,12 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * } static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { - TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; - taosThreadRwlockWrlock(pLock); + int32_t type = pRow->type; + sdbWriteLock(pSdb, type); SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize); if (ppOldRow == NULL || *ppOldRow == NULL) { - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); sdbFreeRow(pSdb, pRow, false); terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; return terrno; @@ -228,7 +229,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * taosHashRemove(hash, pOldRow->pObj, keySize); pSdb->tableVer[pOldRow->type]++; - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); sdbFreeRow(pSdb, pRow, false); @@ -282,12 +283,11 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { void *pRet = NULL; int32_t keySize = sdbGetkeySize(pSdb, type, pKey); - TdThreadRwlock *pLock = &pSdb->locks[type]; - taosThreadRwlockRdlock(pLock); + sdbReadLock(pSdb, type); SSdbRow **ppRow = taosHashGet(hash, pKey, keySize); if (ppRow == NULL || *ppRow == NULL) { - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; return NULL; } @@ -310,13 +310,13 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { break; } - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); return pRet; } static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) { - TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; - taosThreadRwlockWrlock(pLock); + int32_t type = pRow->type; + sdbWriteLock(pSdb, type); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); sdbPrintOper(pSdb, pRow, "check"); @@ -324,7 +324,7 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) { sdbFreeRow(pSdb, pRow, true); } - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); } void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) { @@ -333,9 +333,9 @@ void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) { SSdbRow *pRow = (SSdbRow *)((char *)pObj - sizeof(SSdbRow)); if (pRow->type >= SDB_MAX) return; - TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; + int32_t type = pRow->type; if (lock) { - taosThreadRwlockWrlock(pLock); + sdbWriteLock(pSdb, type); } int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); @@ -345,7 +345,7 @@ void sdbReleaseLock(SSdb *pSdb, void *pObj, bool lock) { } if (lock) { - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); } } @@ -357,8 +357,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return NULL; - TdThreadRwlock *pLock = &pSdb->locks[type]; - taosThreadRwlockRdlock(pLock); + sdbReadLock(pSdb, type); SSdbRow **ppRow = taosHashIterate(hash, pIter); while (ppRow != NULL) { @@ -373,7 +372,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { *ppObj = pRow->pObj; break; } - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); return ppRow; } @@ -384,9 +383,8 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return NULL; - TdThreadRwlock *pLock = &pSdb->locks[type]; if (lock) { - taosThreadRwlockRdlock(pLock); + sdbReadLock(pSdb, type); } SSdbRow **ppRow = taosHashIterate(hash, pIter); @@ -404,7 +402,7 @@ void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStat break; } if (lock) { - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); } return ppRow; @@ -416,18 +414,17 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) { SHashObj *hash = sdbGetHash(pSdb, pRow->type); if (hash == NULL) return; - TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; - taosThreadRwlockRdlock(pLock); + int32_t type = pRow->type; + sdbReadLock(pSdb, type); taosHashCancelIterate(hash, pIter); - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); } void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) { SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return; - TdThreadRwlock *pLock = &pSdb->locks[type]; - taosThreadRwlockRdlock(pLock); + sdbReadLock(pSdb, type); SSdbRow **ppRow = taosHashIterate(hash, NULL); while (ppRow != NULL) { @@ -443,17 +440,16 @@ void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2 ppRow = taosHashIterate(hash, ppRow); } - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); } int32_t sdbGetSize(SSdb *pSdb, ESdbType type) { SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return 0; - TdThreadRwlock *pLock = &pSdb->locks[type]; - taosThreadRwlockRdlock(pLock); + sdbReadLock(pSdb, type); int32_t size = taosHashGetSize(hash); - taosThreadRwlockUnlock(pLock); + sdbUnLock(pSdb, type); return size; } @@ -465,9 +461,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) { if (pSdb->keyTypes[type] != SDB_KEY_INT32) return -1; int32_t maxId = 0; - - TdThreadRwlock *pLock = &pSdb->locks[type]; - taosThreadRwlockRdlock(pLock); + sdbReadLock(pSdb, type); SSdbRow **ppRow = taosHashIterate(hash, NULL); while (ppRow != NULL) { @@ -477,8 +471,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) { ppRow = taosHashIterate(hash, ppRow); } - taosThreadRwlockUnlock(pLock); - + sdbUnLock(pSdb, type); maxId = TMAX(maxId, pSdb->maxId[type]); return maxId + 1; } diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 631ef09d4bc0e1db0835123bfb53e37b1ee7be72..efd0220a7ec30a4e8615ebecd6f3847ab858332d 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -1059,7 +1059,7 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { if (param->val == NULL) { metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode)); - return -1; + goto END; } else { if (IS_VAR_DATA_TYPE(param->type)) { tagData = varDataVal(param->val); @@ -1111,27 +1111,25 @@ int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { break; } } - if (p->suid != pKey->suid) { + if (p == NULL || p->suid != pKey->suid) { break; } first = false; - if (p != NULL) { - int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type); - if (cmp == 0) { - // match - tb_uid_t tuid = 0; - if (IS_VAR_DATA_TYPE(pKey->type)) { - tuid = *(tb_uid_t *)(p->data + varDataTLen(p->data)); - } else { - tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes); - } - taosArrayPush(pUids, &tuid); - } else if (cmp == 1) { - // not match but should continue to iter + int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type); + if (cmp == 0) { + // match + tb_uid_t tuid = 0; + if (IS_VAR_DATA_TYPE(pKey->type)) { + tuid = *(tb_uid_t *)(p->data + varDataTLen(p->data)); } else { - // not match and no more result - break; + tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes); } + taosArrayPush(pUids, &tuid); + } else if (cmp == 1) { + // not match but should continue to iter + } else { + // not match and no more result + break; } valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur); if (valid < 0) { diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 2b7982d381828b6ccb458b7a11d905cf3fc13508..20e0fcdb5bf384c872e427e1875e1d4b5fe62d96 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -116,9 +116,10 @@ static int metaSaveJsonVarToIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const indexMultiTermAdd(terms, term); } } - taosArrayDestroy(pTagVals); indexJsonPut(pMeta->pTagIvtIdx, terms, tuid); indexMultiTermDestroy(terms); + + taosArrayDestroy(pTagVals); #endif return 0; } @@ -159,6 +160,7 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche memcpy(val, (uint16_t *)&len, VARSTR_HEADER_SIZE); type = TSDB_DATA_TYPE_VARCHAR; term = indexTermCreate(suid, DEL_VALUE, type, key, nKey, val, len); + taosMemoryFree(val); } else if (pTagVal->nData == 0) { term = indexTermCreate(suid, DEL_VALUE, TSDB_DATA_TYPE_VARCHAR, key, nKey, pTagVal->pData, 0); } @@ -177,6 +179,7 @@ int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, const SSche } indexJsonPut(pMeta->pTagIvtIdx, terms, tuid); indexMultiTermDestroy(terms); + taosArrayDestroy(pTagVals); #endif return 0; } diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 9605528ad6ae150fd88f512cdf5344b81d486a99..4e7c0a0ef79d7f3f9a862894fd4ed3d5bc845a0c 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -138,7 +138,7 @@ void idxReleaseRef(int64_t ref); #define IDX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \ do { \ uint8_t oldTy = ty; \ - ty = (ty >> 4) | exTy; \ + ty = ((ty >> 4) & 0xFF) | exTy; \ ty = (ty << 4) | oldTy; \ } while (0) diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index cb671cfff99e67ca9098f48d39d20448a54a1d19..76dc84ae42a3d38bea29b030be12a240bd3f510d 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -139,7 +139,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { END: if (idx != NULL) { - indexClose(idx); + indexDestroy(idx); } *index = NULL; return ret; diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 72f693f7e518f00a7a1982598f7d97bca949056c..0bb454571adebae9695ae07cf7ff6af4efd9e95f 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -538,7 +538,7 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) { idxCacheRef(pCache); // encode data CacheTerm* ct = taosMemoryCalloc(1, sizeof(CacheTerm)); - if (cache == NULL) { + if (ct == NULL) { return -1; } // set up key @@ -730,15 +730,17 @@ static int32_t idxCacheJsonTermCompare(const void* l, const void* r) { return cmp; } static MemTable* idxInternalCacheCreate(int8_t type) { - int ttype = IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : TSDB_DATA_TYPE_BINARY; + // int ttype = IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : + // TSDB_DATA_TYPE_BINARY; + int ttype = TSDB_DATA_TYPE_BINARY; int32_t (*cmpFn)(const void* l, const void* r) = IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? idxCacheJsonTermCompare : idxCacheTermCompare; MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable)); idxMemRef(tbl); - if (ttype == TSDB_DATA_TYPE_BINARY || ttype == TSDB_DATA_TYPE_NCHAR) { - tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, ttype, MAX_INDEX_KEY_LEN, cmpFn, SL_ALLOW_DUP_KEY, idxCacheTermGet); - } + // if (ttype == TSDB_DATA_TYPE_BINARY || ttype == TSDB_DATA_TYPE_NCHAR) { + tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, ttype, MAX_INDEX_KEY_LEN, cmpFn, SL_ALLOW_DUP_KEY, idxCacheTermGet); + //} return tbl; } diff --git a/source/libs/index/src/indexComm.c b/source/libs/index/src/indexComm.c index ecf91360734e37f0060aeb7758e5c4c5d57d4972..25036996fcf24ce4283d8269e6164b62efa95f6b 100644 --- a/source/libs/index/src/indexComm.c +++ b/source/libs/index/src/indexComm.c @@ -427,6 +427,7 @@ int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) { *dst = taosMemoryCalloc(1, bufSize + 1); idxInt2str(*(uint64_t*)src, *dst, 1); tlen = strlen(*dst); + break; case TSDB_DATA_TYPE_FLOAT: *dst = taosMemoryCalloc(1, bufSize + 1); sprintf(*dst, "%.9lf", *(float*)src); diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index 6d81499dbcf13cec4143ef79f3dd6735f70253c6..075408f1b34954c215cc9fc11fbc08b384014001 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -231,7 +231,9 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) { SIF_ERR_RET(sifGetValueFromNode(node, ¶m->condValue)); param->colId = -1; param->colValType = (uint8_t)(vn->node.resType.type); - memcpy(param->colName, vn->literal, strlen(vn->literal)); + if (strlen(vn->literal) <= sizeof(param->colName)) { + memcpy(param->colName, vn->literal, strlen(vn->literal)); + } break; } case QUERY_NODE_COLUMN: { @@ -400,54 +402,52 @@ static FORCE_INLINE FilterFunc sifGetFilterFunc(EIndexQueryType type, bool *reve static void sifSetFltParam(SIFParam *left, SIFParam *right, SDataTypeBuf *typedata, SMetaFltParam *param) { int8_t ltype = left->colValType, rtype = right->colValType; if (ltype == TSDB_DATA_TYPE_FLOAT) { - float f; + float f = 0; SIF_DATA_CONVERT(rtype, right->condValue, f); typedata->f = f; param->val = &typedata->f; } else if (ltype == TSDB_DATA_TYPE_DOUBLE) { - double d; + double d = 0; SIF_DATA_CONVERT(rtype, right->condValue, d); typedata->d = d; param->val = &typedata->d; } else if (ltype == TSDB_DATA_TYPE_BIGINT) { - int64_t i64; + int64_t i64 = 0; SIF_DATA_CONVERT(rtype, right->condValue, i64); typedata->i64 = i64; param->val = &typedata->i64; } else if (ltype == TSDB_DATA_TYPE_INT) { - int32_t i32; + int32_t i32 = 0; SIF_DATA_CONVERT(rtype, right->condValue, i32); typedata->i32 = i32; param->val = &typedata->i32; } else if (ltype == TSDB_DATA_TYPE_SMALLINT) { - int16_t i16; - + int16_t i16 = 0; SIF_DATA_CONVERT(rtype, right->condValue, i16); typedata->i16 = i16; param->val = &typedata->i16; } else if (ltype == TSDB_DATA_TYPE_TINYINT) { - int8_t i8; + int8_t i8 = 0; SIF_DATA_CONVERT(rtype, right->condValue, i8) typedata->i8 = i8; param->val = &typedata->i8; } else if (ltype == TSDB_DATA_TYPE_UBIGINT) { - uint64_t u64; + uint64_t u64 = 0; SIF_DATA_CONVERT(rtype, right->condValue, u64); typedata->u64 = u64; param->val = &typedata->u64; - } else if (ltype == TSDB_DATA_TYPE_UINT) { - uint32_t u32; + uint32_t u32 = 0; SIF_DATA_CONVERT(rtype, right->condValue, u32); typedata->u32 = u32; param->val = &typedata->u32; } else if (ltype == TSDB_DATA_TYPE_USMALLINT) { - uint16_t u16; + uint16_t u16 = 0; SIF_DATA_CONVERT(rtype, right->condValue, u16); typedata->u16 = u16; param->val = &typedata->u16; } else if (ltype == TSDB_DATA_TYPE_UTINYINT) { - uint8_t u8; + uint8_t u8 = 0; SIF_DATA_CONVERT(rtype, right->condValue, u8); typedata->u8 = u8; param->val = &typedata->u8; @@ -663,7 +663,7 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) { // ugly code, refactor later if (nParam > 1 && params[1].status == SFLT_NOT_INDEX) { output->status = SFLT_NOT_INDEX; - return code; + goto _return; } SIF_ERR_JRET(sifGetOperFn(node->opType, &operFn, &output->status)); } diff --git a/source/libs/index/src/indexFst.c b/source/libs/index/src/indexFst.c index 01dffa782d93a5218a988d2c156f85662e5cd0fb..aed2ec3ee321011d9777b437ae9453296b6bbc62 100644 --- a/source/libs/index/src/indexFst.c +++ b/source/libs/index/src/indexFst.c @@ -338,7 +338,7 @@ uint8_t fstStateCommInput(FstState* s, bool* null) { return v; } // 0 indicate that common_input is None - return v == 0 ? 0 : COMMON_INPUT(v); + return COMMON_INPUT(v); } // input_len diff --git a/source/libs/index/src/indexFstFile.c b/source/libs/index/src/indexFstFile.c index 7021fdfae33fc89289e4506fcf40fbfef9601505..33960ad8c233bdbae32bb7847bdddaf26439d132 100644 --- a/source/libs/index/src/indexFstFile.c +++ b/source/libs/index/src/indexFstFile.c @@ -72,7 +72,8 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of if (offset >= ctx->file.size) return 0; do { - char key[128] = {0}; + char key[1024] = {0}; + assert(strlen(ctx->file.buf) + 1 + 64 < sizeof(key)); idxGenLRUKey(key, ctx->file.buf, blkId); LRUHandle* h = taosLRUCacheLookup(ctx->lru, key, strlen(key)); @@ -99,6 +100,7 @@ static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t of assert(blk->nread <= kBlockSize); if (blk->nread < kBlockSize && blk->nread < len) { + taosMemoryFree(blk); break; } @@ -150,7 +152,7 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int if (ctx->type == TFILE) { // ugly code, refactor later ctx->file.readOnly = readOnly; - memcpy(ctx->file.buf, path, strlen(path)); + memcpy(ctx->file.buf, path, sizeof(ctx->file.buf)); if (readOnly == false) { ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); taosFtruncateFile(ctx->file.pFile, 0); diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index e746bb43034db6fee9fd1f27ae3cdc5979830b47..4fdf6d9e57309897fef1a3b378966c8b667f8fbe 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -506,7 +506,9 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c tfh.suid = suid; tfh.version = version; tfh.colType = colType; - memcpy(tfh.colName, colName, strlen(colName)); + if (strlen(colName) <= sizeof(tfh.colName)) { + memcpy(tfh.colName, colName, strlen(colName)); + } return tfileWriterCreate(wcx, &tfh); } @@ -580,8 +582,14 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { if (cap < ttsz) { cap = ttsz; - buf = (char*)taosMemoryRealloc(buf, cap); + char* t = (char*)taosMemoryRealloc(buf, cap); + if (t == NULL) { + taosMemoryFree(buf); + return -1; + } + buf = t; } + char* p = buf; tfileSerialTableIdsToBuf(p, v->tableId); tw->ctx->write(tw->ctx, buf, ttsz); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index d6110d9fcd617ed242633e4115def2a526fc7b0d..ccae1f54be3938f45c36ebee24ef4d4a305cb3ce 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -124,8 +124,8 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order) { EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { - *((bool*)pContext) = - (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType); + *((bool*)pContext) = + (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType); return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD; } return DEAL_RES_CONTINUE; @@ -1520,11 +1520,15 @@ static bool partTagsHasIndefRowsSelectFunc(SNodeList* pFuncs) { return false; } -static int32_t partTagsRewriteGroupTagsToFuncs(SNodeList* pGroupTags, SNodeList* pAggFuncs) { +static int32_t partTagsRewriteGroupTagsToFuncs(SNodeList* pGroupTags, int32_t start, SNodeList* pAggFuncs) { bool hasIndefRowsSelectFunc = partTagsHasIndefRowsSelectFunc(pAggFuncs); int32_t code = TSDB_CODE_SUCCESS; + int32_t index = 0; SNode* pNode = NULL; FOREACH(pNode, pGroupTags) { + if (index++ < start) { + continue; + } if (hasIndefRowsSelectFunc) { code = nodesListStrictAppend(pAggFuncs, partTagsCreateWrapperFunc("_select_value", pNode)); } else { @@ -1559,20 +1563,35 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub } } else { SAggLogicNode* pAgg = (SAggLogicNode*)pNode; + int32_t start = -1; SNode* pGroupKey = NULL; FOREACH(pGroupKey, pAgg->pGroupKeys) { - code = nodesListMakeStrictAppend( - &pScan->pGroupTags, nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0))); + SNode* pGroupExpr = nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0); + if (NULL != pScan->pGroupTags) { + SNode* pGroupTag = NULL; + FOREACH(pGroupTag, pScan->pGroupTags) { + if (nodesEqualNode(pGroupTag, pGroupExpr)) { + continue; + } + } + } + if (start < 0) { + start = LIST_LENGTH(pScan->pGroupTags); + } + code = nodesListMakeStrictAppend(&pScan->pGroupTags, nodesCloneNode(pGroupExpr)); if (TSDB_CODE_SUCCESS != code) { break; } } NODES_DESTORY_LIST(pAgg->pGroupKeys); - code = partTagsRewriteGroupTagsToFuncs(pScan->pGroupTags, pAgg->pAggFuncs); + if (TSDB_CODE_SUCCESS == code && start >= 0) { + code = partTagsRewriteGroupTagsToFuncs(pScan->pGroupTags, start, pAgg->pAggFuncs); + } } if (TSDB_CODE_SUCCESS == code) { code = partTagsOptRebuildTbanme(pScan->pGroupTags); } + pCxt->optimized = true; return code; } diff --git a/source/libs/planner/test/planPartByTest.cpp b/source/libs/planner/test/planPartByTest.cpp index 48ab1e1ac2cc5cf9f8b836b9d7940df047019f6e..3d0467dffee8bb518acf73dab29efb330c17c14d 100644 --- a/source/libs/planner/test/planPartByTest.cpp +++ b/source/libs/planner/test/planPartByTest.cpp @@ -61,6 +61,8 @@ TEST_F(PlanPartitionByTest, withGroupBy) { run("SELECT COUNT(*) FROM t1 PARTITION BY c1 GROUP BY c2"); run("SELECT TBNAME, c1 FROM st1 PARTITION BY TBNAME GROUP BY c1"); + + run("SELECT COUNT(*) FROM t1 PARTITION BY TBNAME GROUP BY TBNAME"); } TEST_F(PlanPartitionByTest, withTimeLineFunc) { diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 375f2e573032166a3a61f8748b3db7dd36053098..3f13249ce6b47c748685ca59421e1f9a42231e94 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -96,6 +96,19 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { syncNodeCandidate2Leader(pSyncNode); pSyncNode->pVotesGranted->toLeader = true; return ret; + } + + if (pSyncNode->replicaNum == 1) { + // only myself, to leader + voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode); + votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode); + + pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum); + + syncNodeCandidate2Leader(pSyncNode); + pSyncNode->pVotesGranted->toLeader = true; + return ret; + } switch (pSyncNode->pRaftCfg->snapshotStrategy) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 907eb21f4c84a4b26169ef81beb9d272d1a14bce..83bd7a5f384ed76fc4d46519bd49e4eadb6a3bd3 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1891,8 +1891,26 @@ inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) { return b1; } +static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) { + if (pOldCfg->replicaNum != pNewCfg->replicaNum) return true; + if (pOldCfg->myIndex != pNewCfg->myIndex) return true; + for (int32_t i = 0; i < pOldCfg->replicaNum; ++i) { + const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i]; + const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i]; + if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true; + if (pOldInfo->nodePort != pNewInfo->nodePort) return true; + } + + return false; +} + void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) { SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg; + if (!syncIsConfigChanged(&oldConfig, pNewConfig)) { + sInfo("vgId:1, sync not reconfig since not changed"); + return; + } + pSyncNode->pRaftCfg->cfg = *pNewConfig; pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex; @@ -2264,7 +2282,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); - ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted)); + //ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted)); syncNodeBecomeLeader(pSyncNode, "candidate to leader"); syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode); diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index c1d8dbef62f2111fa1194bd66e8e5949a7b3303a..2399891f6d99ad50e50f3a2b7add37f9f71e2ead 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -146,7 +146,7 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const if (nread < 0) { uError("http-report recv error:%s", uv_err_name(nread)); } else { - uTrace("http-report succ to recv %d bytes", nread); + uTrace("http-report succ to recv %d bytes", (int32_t)nread); } uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index c82af0d0e9b68a3606718bdef47bb783c7289c54..237a4b6059227bfe435cc36601580f06f8e86f62 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -54,11 +54,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->retry = pInit->rfp; pRpc->startTimer = pInit->tfp; - if (pInit->connType == TAOS_CONN_SERVER) { - pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; - } else { - pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; - } + pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; uint32_t ip = 0; if (pInit->connType == TAOS_CONN_SERVER) { @@ -79,7 +75,7 @@ void* rpcOpen(const SRpcInit* pInit) { } pRpc->parent = pInit->parent; if (pInit->user) { - memcpy(pRpc->user, pInit->user, strlen(pInit->user)); + memcpy(pRpc->user, pInit->user, TSDB_UNI_LEN); } int64_t refId = transAddExHandle(transGetInstMgt(), pRpc); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 359b9824cf437159dafa8ed76f93e0a6231cba98..48b3097449695af53f74147b32bb77238e44235b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -267,11 +267,12 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { #define EPSET_GET_SIZE(epSet) (epSet)->numOfEps #define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn) #define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port) -#define EPSET_FORWARD_INUSE(epSet) \ - do { \ - if ((epSet)->numOfEps != 0) { \ - (epSet)->inUse = (++((epSet)->inUse)) % ((epSet)->numOfEps); \ - } \ +#define EPSET_FORWARD_INUSE(epSet) \ + do { \ + if ((epSet)->numOfEps != 0) { \ + ++((epSet)->inUse); \ + (epSet)->inUse = ((epSet)->inUse) % ((epSet)->numOfEps); \ + } \ } while (0) #define EPSET_DEBUG_STR(epSet, tbuf) \ @@ -503,6 +504,7 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { SConnList list = {0}; taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + if (plist == NULL) return NULL; QUEUE_INIT(&plist->conns); } @@ -1157,7 +1159,7 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj)); STrans* pTransInst = shandle; - memcpy(cli->label, label, strlen(label)); + memcpy(cli->label, label, TSDB_LABEL_LEN); cli->numOfThreads = numOfThreads; cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*)); @@ -1611,8 +1613,8 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { SCvtAddr cvtAddr = {0}; if (ip != NULL && fqdn != NULL) { - memcpy(cvtAddr.ip, ip, strlen(ip)); - memcpy(cvtAddr.fqdn, fqdn, strlen(fqdn)); + if (strlen(ip) <= sizeof(cvtAddr.ip)) memcpy(cvtAddr.ip, ip, strlen(ip)); + if (strlen(fqdn) <= sizeof(cvtAddr.fqdn)) memcpy(cvtAddr.fqdn, fqdn, strlen(fqdn)); cvtAddr.cvt = true; } for (int i = 0; i < pTransInst->numOfThreads; i++) { diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index bfeef248cdce61bba62d8cc5dcfd175b09559448..fc943d4d76f983aed8c36efd72f851e9b8f3baa4 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -590,7 +590,9 @@ TdSocketPtr taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t taosCloseSocket(&pSocket); return NULL; } else { - taosKeepTcpAlive(pSocket); + if (taosKeepTcpAlive(pSocket) == -1) { + return NULL; + } } return pSocket; @@ -1059,18 +1061,22 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) { } #if defined(WINDOWS) if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&timeout, sizeof(timeout))) { + taosCloseSocketNoCheck1(fd); return -1; } #elif defined(_TD_DARWIN_64) uint32_t conn_timeout_ms = timeout * 1000; if (0 != setsockopt(fd, IPPROTO_TCP, TCP_CONNECTIONTIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) { + taosCloseSocketNoCheck1(fd); return -1; } #else // Linux like systems uint32_t conn_timeout_ms = timeout * 1000; if (0 != setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) { + taosCloseSocketNoCheck1(fd); return -1; } #endif + return (int)fd; } diff --git a/source/util/src/tsched.c b/source/util/src/tsched.c index 9cf9e2c4316a850ac31b15aaab8a864deb4d32eb..f524680331dcfdc61005c3e3b60b679463c5a1b9 100644 --- a/source/util/src/tsched.c +++ b/source/util/src/tsched.c @@ -38,6 +38,7 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab if (pSched->queue == NULL) { uError("%s: no enough memory for queue", label); taosCleanUpScheduler(pSched); + taosMemoryFree(pSched); return NULL; }