From 35e8ad28117a9b1630381586540a3a201c54c0fb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 15 Jul 2022 14:51:20 +0800 Subject: [PATCH] feat: add time unsynced check --- include/common/tmsg.h | 2 + include/util/taoserror.h | 1 + source/client/inc/clientInt.h | 2 +- source/client/src/clientEnv.c | 4 +- source/client/src/clientHb.c | 82 ++++++++++++------------ source/client/src/clientImpl.c | 2 +- source/client/src/clientMsgHandler.c | 12 ++++ source/common/src/tmsg.c | 4 ++ source/dnode/mnode/impl/src/mndProfile.c | 4 +- source/libs/function/src/udfd.c | 9 +++ source/libs/index/src/indexCache.c | 5 +- source/libs/transport/src/transCli.c | 12 ++-- source/libs/transport/src/transSvr.c | 4 +- source/util/src/terror.c | 1 + 14 files changed, 89 insertions(+), 55 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0ab65e7978..cb8e1171e4 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -525,6 +525,7 @@ typedef struct { int8_t superUser; int8_t connType; SEpSet epSet; + int32_t svrTimestamp; char sVer[TSDB_VERSION_LEN]; char sDetailVer[128]; } SConnectRsp; @@ -2224,6 +2225,7 @@ typedef struct { typedef struct { int64_t reqId; int64_t rspId; + int32_t svrTimestamp; SArray* rsps; // SArray } SClientHbBatchRsp; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ce434612c3..24837aa35f 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -73,6 +73,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031) #define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0032) #define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0033) +#define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0034) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 700a4d9daf..3999eaccb2 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -285,7 +285,7 @@ static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) { extern SAppInfo appInfo; extern int32_t clientReqRefPool; extern int32_t clientConnRefPool; -extern void* tscQhandle; +extern int32_t timestampDeltaLimit; __async_send_cb_fn_t getMsgRspHandle(int32_t msgType); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 207ac01a2c..67e6b69233 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -35,6 +35,8 @@ SAppInfo appInfo; int32_t clientReqRefPool = -1; int32_t clientConnRefPool = -1; +int32_t timestampDeltaLimit = 900; // s + static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; @@ -181,7 +183,7 @@ void destroyTscObj(void *pObj) { destroyAllRequests(pTscObj->pRequests); taosHashCleanup(pTscObj->pRequests); - + schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter); tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj, pTscObj->pAppInfo->numOfConns); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 2a9d113108..b12806ee0f 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -70,7 +70,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog if (NULL == vgInfo) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - + vgInfo->vgVersion = rsp->vgVersion; vgInfo->hashMethod = rsp->hashMethod; vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); @@ -156,18 +156,18 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid); if (NULL == pTscObj) { tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid); - } else { + } else { if (pRsp->query->totalDnodes > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pRsp->query->epSet)) { - SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet; - SEp* pOrigEp = &pOrig->eps[pOrig->inUse]; - SEp* pNewEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse]; - tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb", - pOrig->inUse, pOrig->numOfEps, pOrigEp->fqdn, pOrigEp->port, - pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps, pNewEp->fqdn, pNewEp->port); - + SEpSet *pOrig = &pTscObj->pAppInfo->mgmtEp.epSet; + SEp *pOrigEp = &pOrig->eps[pOrig->inUse]; + SEp *pNewEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse]; + tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb", pOrig->inUse, pOrig->numOfEps, + pOrigEp->fqdn, pOrigEp->port, pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps, pNewEp->fqdn, + pNewEp->port); + updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet); } - + pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes; pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes; pTscObj->connId = pRsp->query->connId; @@ -263,13 +263,20 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { } static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { - static int32_t emptyRspNum = 0; + static int32_t emptyRspNum = 0; char *key = (char *)param; SClientHbBatchRsp pRsp = {0}; if (TSDB_CODE_SUCCESS == code) { tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp); } - + + int32_t now = taosGetTimestampSec(); + int32_t delta = abs(now - pRsp.svrTimestamp); + if (delta > timestampDeltaLimit) { + code = TSDB_CODE_TIME_UNSYNCED; + tscError("time diff: %ds is too big", delta); + } + int32_t rspNum = taosArrayGetSize(pRsp.rsps); taosThreadMutexLock(&appInfo.mutex); @@ -373,7 +380,7 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { releaseTscObj(connKey->tscRid); return TSDB_CODE_QRY_OUT_OF_MEMORY; } - + hbBasic->connId = pTscObj->connId; int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0; @@ -392,7 +399,6 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } - int32_t code = hbBuildQueryDesc(hbBasic, pTscObj); if (code) { releaseTscObj(connKey->tscRid); @@ -436,13 +442,12 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S if (NULL == req->info) { req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); } - + taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); return TSDB_CODE_SUCCESS; } - int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { SDbVgVersion *dbs = NULL; uint32_t dbNum = 0; @@ -483,8 +488,8 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { SSTableVersion *stbs = NULL; - uint32_t stbNum = 0; - int32_t code = 0; + uint32_t stbNum = 0; + int32_t code = 0; code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum); if (TSDB_CODE_SUCCESS != code) { @@ -521,20 +526,19 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC } int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { - SAppHbReq* pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); + SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); if (NULL != pApp) { memcpy(&req->app, pApp, sizeof(*pApp)); } else { memset(&req->app.summary, 0, sizeof(req->app.summary)); req->app.pid = taosGetPId(); req->app.appId = clientHbMgr.appId; - taosGetAppName(req->app.name, NULL); + taosGetAppName(req->app.name, NULL); } return TSDB_CODE_SUCCESS; } - int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { int64_t *clusterId = (int64_t *)param; struct SCatalog *pCatalog = NULL; @@ -602,7 +606,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { continue; } - //hbClearClientHbReq(pOneReq); + // hbClearClientHbReq(pOneReq); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); } @@ -615,11 +619,9 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { return pBatchReq; } -void hbThreadFuncUnexpectedStopped(void) { - atomic_store_8(&clientHbMgr.threadStop, 2); -} +void hbThreadFuncUnexpectedStopped(void) { atomic_store_8(&clientHbMgr.threadStop, 2); } -void hbMergeSummary(SAppClusterSummary* dst, SAppClusterSummary* src) { +void hbMergeSummary(SAppClusterSummary *dst, SAppClusterSummary *src) { dst->numOfInsertsReq += src->numOfInsertsReq; dst->numOfInsertRows += src->numOfInsertRows; dst->insertElapsedTime += src->insertElapsedTime; @@ -633,7 +635,7 @@ void hbMergeSummary(SAppClusterSummary* dst, SAppClusterSummary* src) { int32_t hbGatherAppInfo(void) { SAppHbReq req = {0}; - int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); + int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); if (sz > 0) { req.pid = taosGetPId(); req.appId = clientHbMgr.appId; @@ -641,11 +643,11 @@ int32_t hbGatherAppInfo(void) { } taosHashClear(clientHbMgr.appSummary); - + for (int32_t i = 0; i < sz; ++i) { SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); - uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; - SAppHbReq* pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); + uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; + SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); if (NULL == pApp) { memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary)); req.startTime = pAppHbMgr->startTime; @@ -654,7 +656,7 @@ int32_t hbGatherAppInfo(void) { if (pAppHbMgr->startTime < pApp->startTime) { pApp->startTime = pAppHbMgr->startTime; } - + hbMergeSummary(&pApp->summary, &pAppHbMgr->pAppInstInfo->summary); } } @@ -662,7 +664,6 @@ int32_t hbGatherAppInfo(void) { return TSDB_CODE_SUCCESS; } - static void *hbThreadFunc(void *param) { setThreadName("hb"); #ifdef WINDOWS @@ -681,7 +682,7 @@ static void *hbThreadFunc(void *param) { if (sz > 0) { hbGatherAppInfo(); } - + for (int i = 0; i < sz; i++) { SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); @@ -698,7 +699,7 @@ static void *hbThreadFunc(void *param) { if (buf == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tFreeClientHbBatchReq(pReq); - //hbClearReqInfo(pAppHbMgr); + // hbClearReqInfo(pAppHbMgr); break; } @@ -708,7 +709,7 @@ static void *hbThreadFunc(void *param) { if (pInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; tFreeClientHbBatchReq(pReq); - //hbClearReqInfo(pAppHbMgr); + // hbClearReqInfo(pAppHbMgr); taosMemoryFree(buf); break; } @@ -725,7 +726,7 @@ static void *hbThreadFunc(void *param) { SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); tFreeClientHbBatchReq(pReq); - //hbClearReqInfo(pAppHbMgr); + // hbClearReqInfo(pAppHbMgr); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); } @@ -759,7 +760,7 @@ static void hbStopThread() { return; } - taosThreadJoin(clientHbMgr.thread, NULL); + taosThreadJoin(clientHbMgr.thread, NULL); tscDebug("hb thread stopped"); } @@ -808,7 +809,7 @@ void hbFreeAppHbMgr(SAppHbMgr *pTarget) { } taosHashCleanup(pTarget->activeInfo); pTarget->activeInfo = NULL; - + taosMemoryFree(pTarget->key); taosMemoryFree(pTarget); } @@ -843,7 +844,7 @@ int hbMgrInit() { clientHbMgr.appId = tGenIdPI64(); tscDebug("app %" PRIx64 " initialized", clientHbMgr.appId); - + clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *)); taosThreadMutexInit(&clientHbMgr.lock, NULL); @@ -881,7 +882,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clust SClientHbReq hbReq = {0}; hbReq.connKey = connKey; hbReq.clusterId = clusterId; - //hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + // hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); @@ -920,4 +921,3 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } - diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 32c11983c2..940e6e8ebc 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1478,7 +1478,7 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU tsem_wait(&pParam->sem); } - if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) { + if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) { return NULL; } else { if (setupOneRowPtr) { diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 94bd5dd787..331ca86d78 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -52,6 +52,18 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { SConnectRsp connectRsp = {0}; tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp); + + int32_t now = taosGetTimestampSec(); + int32_t delta = abs(now - connectRsp.svrTimestamp); + if (delta > timestampDeltaLimit) { + code = TSDB_CODE_TIME_UNSYNCED; + tscError("time diff: %" PRId64 "ms is too big", delta); + taosMemoryFree(pMsg->pData); + setErrno(pRequest, code); + tsem_post(&pRequest->body.rspSem); + return code; + } + /*assert(connectRsp.epSet.numOfEps > 0);*/ if (connectRsp.epSet.numOfEps == 0) { taosMemoryFree(pMsg->pData); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index eca8d49b6a..058ef2e81d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -453,6 +453,7 @@ int32_t tSerializeSClientHbBatchRsp(void *buf, int32_t bufLen, const SClientHbBa if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI64(&encoder, pBatchRsp->reqId) < 0) return -1; if (tEncodeI64(&encoder, pBatchRsp->rspId) < 0) return -1; + if (tEncodeI32(&encoder, pBatchRsp->svrTimestamp) < 0) return -1; int32_t rspNum = taosArrayGetSize(pBatchRsp->rsps); if (tEncodeI32(&encoder, rspNum) < 0) return -1; @@ -474,6 +475,7 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI64(&decoder, &pBatchRsp->reqId) < 0) return -1; if (tDecodeI64(&decoder, &pBatchRsp->rspId) < 0) return -1; + if (tDecodeI32(&decoder, &pBatchRsp->svrTimestamp) < 0) return -1; int32_t rspNum = 0; if (tDecodeI32(&decoder, &rspNum) < 0) return -1; @@ -3613,6 +3615,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1; if (tEncodeI8(&encoder, pRsp->connType) < 0) return -1; if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->svrTimestamp) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sVer) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1; tEndEncode(&encoder); @@ -3634,6 +3637,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->connType) < 0) return -1; if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->svrTimestamp) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->sVer) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->sDetailVer) < 0) return -1; tEndDecode(&decoder); diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 38368b4ece..81bd887ad9 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -15,10 +15,10 @@ #define _DEFAULT_SOURCE #include "mndProfile.h" -#include "mndPrivilege.h" #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" +#include "mndPrivilege.h" #include "mndQnode.h" #include "mndShow.h" #include "mndStb.h" @@ -274,6 +274,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { connectRsp.connId = pConn->id; connectRsp.connType = connReq.connType; connectRsp.dnodeNum = mndGetDnodeSize(pMnode); + connectRsp.svrTimestamp = taosGetTimestampSec(); strcpy(connectRsp.sVer, version); snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, @@ -623,6 +624,7 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) { } SClientHbBatchRsp batchRsp = {0}; + batchRsp.svrTimestamp = taosGetTimestampMs(); batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); int32_t sz = taosArrayGetSize(batchReq.reqs); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index a412b589a9..e33c785917 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -382,6 +382,15 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { SConnectRsp connectRsp = {0}; tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); + + int32_t now = taosGetTimestampSec(); + int32_t delta = abs(now - connectRsp.svrTimestamp); + if (delta > 900) { + msgInfo->code = TSDB_CODE_TIME_UNSYNCED; + goto _return; + } + + if (connectRsp.epSet.numOfEps == 0) { msgInfo->code = TSDB_CODE_MND_APP_ERROR; goto _return; diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 05ce418037..bfc3047719 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -516,13 +516,14 @@ static void idxCacheMakeRoomForWrite(IndexCache* cache) { idxCacheRef(cache); cache->imm = cache->mem; cache->mem = idxInternalCacheCreate(cache->type); + cache->mem->pCache = cache; cache->occupiedMem = 0; if (quit == false) { atomic_store_32(&cache->merging, 1); } - // sched to merge - // unref cache in bgwork + // 1. sched to merge + // 2. unref cache in bgwork idxCacheSchedToMerge(cache, quit); } } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 40227f02cc..ff984e464b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1041,7 +1041,7 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { STraceId* trace = &pMsg->msg.info.traceId; char tbuf[256] = {0}; EPSET_DEBUG_STR(&pCtx->epSet, tbuf); - tGTrace("%s retry on next node, use %s, retryCnt:%d, limit:%d", transLabel(pThrd->pTransInst), tbuf, + tGDebug("%s retry on next node, use %s, retryCnt:%d, limit:%d", transLabel(pThrd->pTransInst), tbuf, pCtx->retryCnt + 1, pCtx->retryLimit); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); @@ -1133,11 +1133,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (hasEpSet) { char tbuf[256] = {0}; EPSET_DEBUG_STR(&pCtx->epSet, tbuf); - tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); + tGDebug("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); } if (pCtx->pSem != NULL) { - tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); + tGDebug("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (pCtx->pRsp == NULL) { tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn); } else { @@ -1146,7 +1146,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { tsem_post(pCtx->pSem); pCtx->pRsp = NULL; } else { - tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn); + tGDebug("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (retry == false && hasEpSet == true) { pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet); } else { @@ -1256,7 +1256,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra cliMsg->refId = (int64_t)shandle; STraceId* trace = &pReq->info.traceId; - tGTrace("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, + tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); @@ -1296,7 +1296,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM cliMsg->refId = (int64_t)shandle; STraceId* trace = &pReq->info.traceId; - tGTrace("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, + tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); transAsyncSend(pThrd->asyncPool, &(cliMsg->q)); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 68e12a1963..9a511adf9b 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1020,7 +1020,7 @@ void transRefSrvHandle(void* handle) { return; } int ref = T_REF_INC((SSvrConn*)handle); - tDebug("conn %p ref count:%d", handle, ref); + tTrace("conn %p ref count:%d", handle, ref); } void transUnrefSrvHandle(void* handle) { @@ -1028,7 +1028,7 @@ void transUnrefSrvHandle(void* handle) { return; } int ref = T_REF_DEC((SSvrConn*)handle); - tDebug("conn %p ref count:%d", handle, ref); + tTrace("conn %p ref count:%d", handle, ref); if (ref == 0) { destroyConn((SSvrConn*)handle, true); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index ef6697b3b5..8600c61587 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -78,6 +78,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp for TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error") TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk") TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found") +TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Unsynced time") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") -- GitLab