未验证 提交 a5ca6d0c 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #14942 from taosdata/feat/checkTimeUnSynced

feat: add time unsynced check
...@@ -525,6 +525,7 @@ typedef struct { ...@@ -525,6 +525,7 @@ typedef struct {
int8_t superUser; int8_t superUser;
int8_t connType; int8_t connType;
SEpSet epSet; SEpSet epSet;
int32_t svrTimestamp;
char sVer[TSDB_VERSION_LEN]; char sVer[TSDB_VERSION_LEN];
char sDetailVer[128]; char sDetailVer[128];
} SConnectRsp; } SConnectRsp;
...@@ -2233,6 +2234,7 @@ typedef struct { ...@@ -2233,6 +2234,7 @@ typedef struct {
typedef struct { typedef struct {
int64_t reqId; int64_t reqId;
int64_t rspId; int64_t rspId;
int32_t svrTimestamp;
SArray* rsps; // SArray<SClientHbRsp> SArray* rsps; // SArray<SClientHbRsp>
} SClientHbBatchRsp; } SClientHbBatchRsp;
......
...@@ -73,6 +73,7 @@ int32_t* taosGetErrno(); ...@@ -73,6 +73,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031) #define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031)
#define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0032) #define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0032)
#define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0033) #define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0033)
#define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0034)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
......
...@@ -286,7 +286,7 @@ static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) { ...@@ -286,7 +286,7 @@ static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) {
extern SAppInfo appInfo; extern SAppInfo appInfo;
extern int32_t clientReqRefPool; extern int32_t clientReqRefPool;
extern int32_t clientConnRefPool; extern int32_t clientConnRefPool;
extern void* tscQhandle; extern int32_t timestampDeltaLimit;
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType); __async_send_cb_fn_t getMsgRspHandle(int32_t msgType);
......
...@@ -35,6 +35,8 @@ SAppInfo appInfo; ...@@ -35,6 +35,8 @@ SAppInfo appInfo;
int32_t clientReqRefPool = -1; int32_t clientReqRefPool = -1;
int32_t clientConnRefPool = -1; int32_t clientConnRefPool = -1;
int32_t timestampDeltaLimit = 900; // s
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
volatile int32_t tscInitRes = 0; volatile int32_t tscInitRes = 0;
...@@ -181,7 +183,7 @@ void destroyTscObj(void *pObj) { ...@@ -181,7 +183,7 @@ void destroyTscObj(void *pObj) {
destroyAllRequests(pTscObj->pRequests); destroyAllRequests(pTscObj->pRequests);
taosHashCleanup(pTscObj->pRequests); taosHashCleanup(pTscObj->pRequests);
schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter); schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj, tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
pTscObj->pAppInfo->numOfConns); pTscObj->pAppInfo->numOfConns);
......
...@@ -70,7 +70,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog ...@@ -70,7 +70,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
if (NULL == vgInfo) { if (NULL == vgInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
vgInfo->vgVersion = rsp->vgVersion; vgInfo->vgVersion = rsp->vgVersion;
vgInfo->hashMethod = rsp->hashMethod; vgInfo->hashMethod = rsp->hashMethod;
vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
...@@ -156,18 +156,18 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { ...@@ -156,18 +156,18 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid); STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid);
if (NULL == pTscObj) { if (NULL == pTscObj) {
tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid); tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid);
} else { } else {
if (pRsp->query->totalDnodes > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pRsp->query->epSet)) { if (pRsp->query->totalDnodes > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pRsp->query->epSet)) {
SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet; SEpSet *pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
SEp* pOrigEp = &pOrig->eps[pOrig->inUse]; SEp *pOrigEp = &pOrig->eps[pOrig->inUse];
SEp* pNewEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse]; SEp *pNewEp = &pRsp->query->epSet.eps[pRsp->query->epSet.inUse];
tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb", tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in hb", pOrig->inUse, pOrig->numOfEps,
pOrig->inUse, pOrig->numOfEps, pOrigEp->fqdn, pOrigEp->port, pOrigEp->fqdn, pOrigEp->port, pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps, pNewEp->fqdn,
pRsp->query->epSet.inUse, pRsp->query->epSet.numOfEps, pNewEp->fqdn, pNewEp->port); pNewEp->port);
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet); updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
} }
pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes; pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes; pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
pTscObj->connId = pRsp->query->connId; pTscObj->connId = pRsp->query->connId;
...@@ -263,13 +263,20 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { ...@@ -263,13 +263,20 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
} }
static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
static int32_t emptyRspNum = 0; static int32_t emptyRspNum = 0;
char *key = (char *)param; char *key = (char *)param;
SClientHbBatchRsp pRsp = {0}; SClientHbBatchRsp pRsp = {0};
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp); tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
} }
int32_t now = taosGetTimestampSec();
int32_t delta = abs(now - pRsp.svrTimestamp);
if (delta > timestampDeltaLimit) {
code = TSDB_CODE_TIME_UNSYNCED;
tscError("time diff: %ds is too big", delta);
}
int32_t rspNum = taosArrayGetSize(pRsp.rsps); int32_t rspNum = taosArrayGetSize(pRsp.rsps);
taosThreadMutexLock(&appInfo.mutex); taosThreadMutexLock(&appInfo.mutex);
...@@ -286,7 +293,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { ...@@ -286,7 +293,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
if (code != 0) { if (code != 0) {
(*pInst)->onlineDnodes = 0; (*pInst)->onlineDnodes = ((*pInst)->totalDnodes ? 0 : -1);
} }
if (rspNum) { if (rspNum) {
...@@ -373,7 +380,7 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { ...@@ -373,7 +380,7 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
releaseTscObj(connKey->tscRid); releaseTscObj(connKey->tscRid);
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
hbBasic->connId = pTscObj->connId; hbBasic->connId = pTscObj->connId;
int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0; int32_t numOfQueries = pTscObj->pRequests ? taosHashGetSize(pTscObj->pRequests) : 0;
...@@ -392,7 +399,6 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { ...@@ -392,7 +399,6 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
int32_t code = hbBuildQueryDesc(hbBasic, pTscObj); int32_t code = hbBuildQueryDesc(hbBasic, pTscObj);
if (code) { if (code) {
releaseTscObj(connKey->tscRid); releaseTscObj(connKey->tscRid);
...@@ -436,13 +442,12 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S ...@@ -436,13 +442,12 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
if (NULL == req->info) { if (NULL == req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
} }
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
SDbVgVersion *dbs = NULL; SDbVgVersion *dbs = NULL;
uint32_t dbNum = 0; uint32_t dbNum = 0;
...@@ -483,8 +488,8 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl ...@@ -483,8 +488,8 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
SSTableVersion *stbs = NULL; SSTableVersion *stbs = NULL;
uint32_t stbNum = 0; uint32_t stbNum = 0;
int32_t code = 0; int32_t code = 0;
code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum); code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
...@@ -521,20 +526,19 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC ...@@ -521,20 +526,19 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
} }
int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
SAppHbReq* pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
if (NULL != pApp) { if (NULL != pApp) {
memcpy(&req->app, pApp, sizeof(*pApp)); memcpy(&req->app, pApp, sizeof(*pApp));
} else { } else {
memset(&req->app.summary, 0, sizeof(req->app.summary)); memset(&req->app.summary, 0, sizeof(req->app.summary));
req->app.pid = taosGetPId(); req->app.pid = taosGetPId();
req->app.appId = clientHbMgr.appId; req->app.appId = clientHbMgr.appId;
taosGetAppName(req->app.name, NULL); taosGetAppName(req->app.name, NULL);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
int64_t *clusterId = (int64_t *)param; int64_t *clusterId = (int64_t *)param;
struct SCatalog *pCatalog = NULL; struct SCatalog *pCatalog = NULL;
...@@ -602,7 +606,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -602,7 +606,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
continue; continue;
} }
//hbClearClientHbReq(pOneReq); // hbClearClientHbReq(pOneReq);
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
} }
...@@ -615,11 +619,9 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -615,11 +619,9 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
return pBatchReq; return pBatchReq;
} }
void hbThreadFuncUnexpectedStopped(void) { void hbThreadFuncUnexpectedStopped(void) { atomic_store_8(&clientHbMgr.threadStop, 2); }
atomic_store_8(&clientHbMgr.threadStop, 2);
}
void hbMergeSummary(SAppClusterSummary* dst, SAppClusterSummary* src) { void hbMergeSummary(SAppClusterSummary *dst, SAppClusterSummary *src) {
dst->numOfInsertsReq += src->numOfInsertsReq; dst->numOfInsertsReq += src->numOfInsertsReq;
dst->numOfInsertRows += src->numOfInsertRows; dst->numOfInsertRows += src->numOfInsertRows;
dst->insertElapsedTime += src->insertElapsedTime; dst->insertElapsedTime += src->insertElapsedTime;
...@@ -633,7 +635,7 @@ void hbMergeSummary(SAppClusterSummary* dst, SAppClusterSummary* src) { ...@@ -633,7 +635,7 @@ void hbMergeSummary(SAppClusterSummary* dst, SAppClusterSummary* src) {
int32_t hbGatherAppInfo(void) { int32_t hbGatherAppInfo(void) {
SAppHbReq req = {0}; SAppHbReq req = {0};
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
if (sz > 0) { if (sz > 0) {
req.pid = taosGetPId(); req.pid = taosGetPId();
req.appId = clientHbMgr.appId; req.appId = clientHbMgr.appId;
...@@ -641,11 +643,11 @@ int32_t hbGatherAppInfo(void) { ...@@ -641,11 +643,11 @@ int32_t hbGatherAppInfo(void) {
} }
taosHashClear(clientHbMgr.appSummary); taosHashClear(clientHbMgr.appSummary);
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < sz; ++i) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId; uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
SAppHbReq* pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId)); SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
if (NULL == pApp) { if (NULL == pApp) {
memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary)); memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
req.startTime = pAppHbMgr->startTime; req.startTime = pAppHbMgr->startTime;
...@@ -654,7 +656,7 @@ int32_t hbGatherAppInfo(void) { ...@@ -654,7 +656,7 @@ int32_t hbGatherAppInfo(void) {
if (pAppHbMgr->startTime < pApp->startTime) { if (pAppHbMgr->startTime < pApp->startTime) {
pApp->startTime = pAppHbMgr->startTime; pApp->startTime = pAppHbMgr->startTime;
} }
hbMergeSummary(&pApp->summary, &pAppHbMgr->pAppInstInfo->summary); hbMergeSummary(&pApp->summary, &pAppHbMgr->pAppInstInfo->summary);
} }
} }
...@@ -662,7 +664,6 @@ int32_t hbGatherAppInfo(void) { ...@@ -662,7 +664,6 @@ int32_t hbGatherAppInfo(void) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void *hbThreadFunc(void *param) { static void *hbThreadFunc(void *param) {
setThreadName("hb"); setThreadName("hb");
#ifdef WINDOWS #ifdef WINDOWS
...@@ -681,7 +682,7 @@ static void *hbThreadFunc(void *param) { ...@@ -681,7 +682,7 @@ static void *hbThreadFunc(void *param) {
if (sz > 0) { if (sz > 0) {
hbGatherAppInfo(); hbGatherAppInfo();
} }
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
...@@ -698,7 +699,7 @@ static void *hbThreadFunc(void *param) { ...@@ -698,7 +699,7 @@ static void *hbThreadFunc(void *param) {
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq); tFreeClientHbBatchReq(pReq);
//hbClearReqInfo(pAppHbMgr); // hbClearReqInfo(pAppHbMgr);
break; break;
} }
...@@ -708,7 +709,7 @@ static void *hbThreadFunc(void *param) { ...@@ -708,7 +709,7 @@ static void *hbThreadFunc(void *param) {
if (pInfo == NULL) { if (pInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq); tFreeClientHbBatchReq(pReq);
//hbClearReqInfo(pAppHbMgr); // hbClearReqInfo(pAppHbMgr);
taosMemoryFree(buf); taosMemoryFree(buf);
break; break;
} }
...@@ -725,7 +726,7 @@ static void *hbThreadFunc(void *param) { ...@@ -725,7 +726,7 @@ static void *hbThreadFunc(void *param) {
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
tFreeClientHbBatchReq(pReq); tFreeClientHbBatchReq(pReq);
//hbClearReqInfo(pAppHbMgr); // hbClearReqInfo(pAppHbMgr);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
} }
...@@ -759,7 +760,7 @@ static void hbStopThread() { ...@@ -759,7 +760,7 @@ static void hbStopThread() {
return; return;
} }
taosThreadJoin(clientHbMgr.thread, NULL); taosThreadJoin(clientHbMgr.thread, NULL);
tscDebug("hb thread stopped"); tscDebug("hb thread stopped");
} }
...@@ -808,7 +809,7 @@ void hbFreeAppHbMgr(SAppHbMgr *pTarget) { ...@@ -808,7 +809,7 @@ void hbFreeAppHbMgr(SAppHbMgr *pTarget) {
} }
taosHashCleanup(pTarget->activeInfo); taosHashCleanup(pTarget->activeInfo);
pTarget->activeInfo = NULL; pTarget->activeInfo = NULL;
taosMemoryFree(pTarget->key); taosMemoryFree(pTarget->key);
taosMemoryFree(pTarget); taosMemoryFree(pTarget);
} }
...@@ -843,7 +844,7 @@ int hbMgrInit() { ...@@ -843,7 +844,7 @@ int hbMgrInit() {
clientHbMgr.appId = tGenIdPI64(); clientHbMgr.appId = tGenIdPI64();
tscDebug("app %" PRIx64 " initialized", clientHbMgr.appId); tscDebug("app %" PRIx64 " initialized", clientHbMgr.appId);
clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *)); clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
taosThreadMutexInit(&clientHbMgr.lock, NULL); taosThreadMutexInit(&clientHbMgr.lock, NULL);
...@@ -881,7 +882,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clust ...@@ -881,7 +882,7 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clust
SClientHbReq hbReq = {0}; SClientHbReq hbReq = {0};
hbReq.connKey = connKey; hbReq.connKey = connKey;
hbReq.clusterId = clusterId; hbReq.clusterId = clusterId;
//hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); // hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
...@@ -920,4 +921,3 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { ...@@ -920,4 +921,3 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
} }
...@@ -52,6 +52,18 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -52,6 +52,18 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
SConnectRsp connectRsp = {0}; SConnectRsp connectRsp = {0};
tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp); tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp);
int32_t now = taosGetTimestampSec();
int32_t delta = abs(now - connectRsp.svrTimestamp);
if (delta > timestampDeltaLimit) {
code = TSDB_CODE_TIME_UNSYNCED;
tscError("time diff:%ds is too big", delta);
taosMemoryFree(pMsg->pData);
setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem);
return code;
}
/*assert(connectRsp.epSet.numOfEps > 0);*/ /*assert(connectRsp.epSet.numOfEps > 0);*/
if (connectRsp.epSet.numOfEps == 0) { if (connectRsp.epSet.numOfEps == 0) {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
......
...@@ -453,6 +453,7 @@ int32_t tSerializeSClientHbBatchRsp(void *buf, int32_t bufLen, const SClientHbBa ...@@ -453,6 +453,7 @@ int32_t tSerializeSClientHbBatchRsp(void *buf, int32_t bufLen, const SClientHbBa
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pBatchRsp->reqId) < 0) return -1; if (tEncodeI64(&encoder, pBatchRsp->reqId) < 0) return -1;
if (tEncodeI64(&encoder, pBatchRsp->rspId) < 0) return -1; if (tEncodeI64(&encoder, pBatchRsp->rspId) < 0) return -1;
if (tEncodeI32(&encoder, pBatchRsp->svrTimestamp) < 0) return -1;
int32_t rspNum = taosArrayGetSize(pBatchRsp->rsps); int32_t rspNum = taosArrayGetSize(pBatchRsp->rsps);
if (tEncodeI32(&encoder, rspNum) < 0) return -1; if (tEncodeI32(&encoder, rspNum) < 0) return -1;
...@@ -474,6 +475,7 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR ...@@ -474,6 +475,7 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pBatchRsp->reqId) < 0) return -1; if (tDecodeI64(&decoder, &pBatchRsp->reqId) < 0) return -1;
if (tDecodeI64(&decoder, &pBatchRsp->rspId) < 0) return -1; if (tDecodeI64(&decoder, &pBatchRsp->rspId) < 0) return -1;
if (tDecodeI32(&decoder, &pBatchRsp->svrTimestamp) < 0) return -1;
int32_t rspNum = 0; int32_t rspNum = 0;
if (tDecodeI32(&decoder, &rspNum) < 0) return -1; if (tDecodeI32(&decoder, &rspNum) < 0) return -1;
...@@ -3613,6 +3615,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { ...@@ -3613,6 +3615,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1; if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->connType) < 0) return -1; if (tEncodeI8(&encoder, pRsp->connType) < 0) return -1;
if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1; if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->svrTimestamp) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->sVer) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sVer) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1; if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -3634,6 +3637,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { ...@@ -3634,6 +3637,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->connType) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->connType) < 0) return -1;
if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1; if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->svrTimestamp) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->sVer) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->sVer) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->sDetailVer) < 0) return -1; if (tDecodeCStrTo(&decoder, pRsp->sDetailVer) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
......
...@@ -15,10 +15,10 @@ ...@@ -15,10 +15,10 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndProfile.h" #include "mndProfile.h"
#include "mndPrivilege.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndPrivilege.h"
#include "mndQnode.h" #include "mndQnode.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
...@@ -274,6 +274,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { ...@@ -274,6 +274,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
connectRsp.connId = pConn->id; connectRsp.connId = pConn->id;
connectRsp.connType = connReq.connType; connectRsp.connType = connReq.connType;
connectRsp.dnodeNum = mndGetDnodeSize(pMnode); connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
connectRsp.svrTimestamp = taosGetTimestampSec();
strcpy(connectRsp.sVer, version); strcpy(connectRsp.sVer, version);
snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
...@@ -623,6 +624,7 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) { ...@@ -623,6 +624,7 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
} }
SClientHbBatchRsp batchRsp = {0}; SClientHbBatchRsp batchRsp = {0};
batchRsp.svrTimestamp = taosGetTimestampSec();
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
int32_t sz = taosArrayGetSize(batchReq.reqs); int32_t sz = taosArrayGetSize(batchReq.reqs);
......
...@@ -382,6 +382,15 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -382,6 +382,15 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
SConnectRsp connectRsp = {0}; SConnectRsp connectRsp = {0};
tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
int32_t now = taosGetTimestampSec();
int32_t delta = abs(now - connectRsp.svrTimestamp);
if (delta > 900) {
msgInfo->code = TSDB_CODE_TIME_UNSYNCED;
goto _return;
}
if (connectRsp.epSet.numOfEps == 0) { if (connectRsp.epSet.numOfEps == 0) {
msgInfo->code = TSDB_CODE_MND_APP_ERROR; msgInfo->code = TSDB_CODE_MND_APP_ERROR;
goto _return; goto _return;
......
...@@ -516,13 +516,14 @@ static void idxCacheMakeRoomForWrite(IndexCache* cache) { ...@@ -516,13 +516,14 @@ static void idxCacheMakeRoomForWrite(IndexCache* cache) {
idxCacheRef(cache); idxCacheRef(cache);
cache->imm = cache->mem; cache->imm = cache->mem;
cache->mem = idxInternalCacheCreate(cache->type); cache->mem = idxInternalCacheCreate(cache->type);
cache->mem->pCache = cache; cache->mem->pCache = cache;
cache->occupiedMem = 0; cache->occupiedMem = 0;
if (quit == false) { if (quit == false) {
atomic_store_32(&cache->merging, 1); atomic_store_32(&cache->merging, 1);
} }
// sched to merge // 1. sched to merge
// unref cache in bgwork // 2. unref cache in bgwork
idxCacheSchedToMerge(cache, quit); idxCacheSchedToMerge(cache, quit);
} }
} }
......
...@@ -1042,7 +1042,7 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -1042,7 +1042,7 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STraceId* trace = &pMsg->msg.info.traceId; STraceId* trace = &pMsg->msg.info.traceId;
char tbuf[256] = {0}; char tbuf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, tbuf); EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
tGTrace("%s retry on next node, use %s, retryCnt:%d, limit:%d", transLabel(pThrd->pTransInst), tbuf, tGDebug("%s retry on next node, use %s, retryCnt:%d, limit:%d", transLabel(pThrd->pTransInst), tbuf,
pCtx->retryCnt + 1, pCtx->retryLimit); pCtx->retryCnt + 1, pCtx->retryLimit);
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
...@@ -1134,11 +1134,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1134,11 +1134,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (hasEpSet) { if (hasEpSet) {
char tbuf[256] = {0}; char tbuf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, tbuf); EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); tGDebug("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
} }
if (pCtx->pSem != NULL) { if (pCtx->pSem != NULL) {
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); tGDebug("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (pCtx->pRsp == NULL) { if (pCtx->pRsp == NULL) {
tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn); tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
} else { } else {
...@@ -1147,7 +1147,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { ...@@ -1147,7 +1147,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
tsem_post(pCtx->pSem); tsem_post(pCtx->pSem);
pCtx->pRsp = NULL; pCtx->pRsp = NULL;
} else { } else {
tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn); tGDebug("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (retry == false && hasEpSet == true) { if (retry == false && hasEpSet == true) {
pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet); pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
} else { } else {
...@@ -1257,7 +1257,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra ...@@ -1257,7 +1257,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
cliMsg->refId = (int64_t)shandle; cliMsg->refId = (int64_t)shandle;
STraceId* trace = &pReq->info.traceId; STraceId* trace = &pReq->info.traceId;
tGTrace("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0); ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
...@@ -1297,7 +1297,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM ...@@ -1297,7 +1297,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
cliMsg->refId = (int64_t)shandle; cliMsg->refId = (int64_t)shandle;
STraceId* trace = &pReq->info.traceId; STraceId* trace = &pReq->info.traceId;
tGTrace("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
transAsyncSend(pThrd->asyncPool, &(cliMsg->q)); transAsyncSend(pThrd->asyncPool, &(cliMsg->q));
......
...@@ -1020,7 +1020,7 @@ void transRefSrvHandle(void* handle) { ...@@ -1020,7 +1020,7 @@ void transRefSrvHandle(void* handle) {
return; return;
} }
int ref = T_REF_INC((SSvrConn*)handle); int ref = T_REF_INC((SSvrConn*)handle);
tDebug("conn %p ref count:%d", handle, ref); tTrace("conn %p ref count:%d", handle, ref);
} }
void transUnrefSrvHandle(void* handle) { void transUnrefSrvHandle(void* handle) {
...@@ -1028,7 +1028,7 @@ void transUnrefSrvHandle(void* handle) { ...@@ -1028,7 +1028,7 @@ void transUnrefSrvHandle(void* handle) {
return; return;
} }
int ref = T_REF_DEC((SSvrConn*)handle); int ref = T_REF_DEC((SSvrConn*)handle);
tDebug("conn %p ref count:%d", handle, ref); tTrace("conn %p ref count:%d", handle, ref);
if (ref == 0) { if (ref == 0) {
destroyConn((SSvrConn*)handle, true); destroyConn((SSvrConn*)handle, true);
} }
......
...@@ -78,6 +78,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp for ...@@ -78,6 +78,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp for
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk") TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found") TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found")
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Unsynced time")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册