diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 0785dd8b53392ee36e439c1a381479d1a09041d1..b5fa1ea15983d30c76125d5dba94ebbf2aa4d989 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -43,6 +43,14 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts); void tscSaveSubscriptionProgress(void* sub); static int32_t minMsgSize() { return tsRpcHeadSize + 100; } +static int32_t getWaitingTimeInterval(int32_t count) { + int32_t initial = 100; // 100 ms by default + if (count <= 1) { + return 0; + } + + return initial * (2<<(count - 2)); +} static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); @@ -275,6 +283,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID || rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || + rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) { tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry); @@ -287,6 +296,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if (pSql->retry > pSql->maxRetry) { tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); } else { + // wait for a little bit moment and then retry + if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { + int32_t duration = getWaitingTimeInterval(pSql->retry); + taosMsleep(duration); + } + rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name); // if there is an error occurring, proceed to the following error handling procedure. diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 4b06626753b73fc63487bf699280953115ed75a7..c1071f9698a4f136681f38a3c10132775219cf07 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -165,6 +165,13 @@ int32_t dnodeInitMgmtTimer() { return TSDB_CODE_SUCCESS; } +void dnodeSendStatusMsgToMnode() { + if (tsDnodeTmr != NULL && tsStatusTimer != NULL) { + dInfo("force send status msg to mnode"); + taosTmrReset(dnodeSendStatusMsg, 3, NULL, tsDnodeTmr, &tsStatusTimer); + } +} + void dnodeCleanupMgmtTimer() { if (tsStatusTimer != NULL) { taosTmrStopA(&tsStatusTimer); diff --git a/src/dnode/src/dnodePeer.c b/src/dnode/src/dnodePeer.c index b27f56a871bf4b0eecf841b625c6251810edbfce..c09d7422396b4525945ae0bb1f32d77d82ac9903 100644 --- a/src/dnode/src/dnodePeer.c +++ b/src/dnode/src/dnodePeer.c @@ -93,7 +93,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { if (pMsg->pCont == NULL) return; if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { - rspMsg.code = TSDB_CODE_RPC_NOT_READY; + rspMsg.code = TSDB_CODE_APP_NOT_READY; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); dDebug("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]); diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index f9d137bb99505047ed23fee5a58b57a014789e7f..5daf61670655486ec629fb715d2b33ed6f32d4f2 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -119,7 +119,7 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { dError("RPC %p, shell msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]); - rpcMsg.code = TSDB_CODE_RPC_NOT_READY; + rpcMsg.code = TSDB_CODE_APP_NOT_READY; rpcSendResponse(&rpcMsg); rpcFreeCont(pMsg->pCont); return; @@ -144,7 +144,7 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); - if (code != TSDB_CODE_RPC_NOT_READY) return code; + if (code != TSDB_CODE_APP_NOT_READY) return code; SDMAuthMsg *pMsg = rpcMallocCont(sizeof(SDMAuthMsg)); tstrncpy(pMsg->user, user, sizeof(pMsg->user)); diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 5a059c93a627b9c57085db4ad30058393f8791b0..093ce93205646313a5f0ca3bec4d121ad8e20776 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -65,6 +65,8 @@ void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code); void dnodeReprocessMnodeWriteMsg(void *pMsg); void dnodeDelayReprocessMnodeWriteMsg(void *pMsg); +void dnodeSendStatusMsgToMnode(); + #ifdef __cplusplus } #endif diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 9af4cee28ae2eb11ab24e987cecbf787e7e8fe50..57c2b322faee5f56a5b8e1f1fe38a597a63a0b9e 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -65,6 +65,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_SESSION_ID, 0, 0x0010, "Invalid se TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_MSG_TYPE, 0, 0x0011, "Invalid message type") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_RESPONSE_TYPE, 0, 0x0012, "Invalid response type") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, 0, 0x0013, "Invalid timestamp") +TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, 0, 0x0014, "Database not ready") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_COM_OPS_NOT_SUPPORT, 0, 0x0100, "Operation not supported") @@ -184,7 +185,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_DISK_PERMISSIONS, 0, 0x0506, "No write p TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "Missing data file") TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "Out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected generic error in vnode") -TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_STATUS, 0, 0x0510, "Database not ready") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied") diff --git a/src/mnode/src/mnodeUser.c b/src/mnode/src/mnodeUser.c index 6e63a8f2de5f6d8af5d1fe295f4ae9fa774a0395..f4cb1a9ef3d435cd044df03fd96835043b73ff71 100644 --- a/src/mnode/src/mnodeUser.c +++ b/src/mnode/src/mnodeUser.c @@ -581,8 +581,8 @@ void mnodeDropAllUsers(SAcctObj *pAcct) { int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { if (!sdbIsMaster()) { *secret = 0; - mDebug("user:%s, failed to auth user, reason:%s", user, tstrerror(TSDB_CODE_RPC_NOT_READY)); - return TSDB_CODE_RPC_NOT_READY; + mDebug("user:%s, failed to auth user, reason:%s", user, tstrerror(TSDB_CODE_APP_NOT_READY)); + return TSDB_CODE_APP_NOT_READY; } SUserObj *pUser = mnodeGetUser(user); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index d0c57a34d0551639c3f689d39f34e38ff88836b1..777e4f824040f675d026f0785217f2b47f669105 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -1120,7 +1120,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { pContext->epSet.port[i] = htons(pContext->epSet.port[i]); rpcSendReqToServer(pRpc, pContext); rpcFreeCont(rpcMsg.pCont); - } else if (pHead->code == TSDB_CODE_RPC_NOT_READY) { + } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY) { pContext->code = pHead->code; rpcProcessConnError(pContext, NULL); rpcFreeCont(rpcMsg.pCont); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 0a5e292f6de359fea2ca22825a95cfb058302023..e5536bfaafd44c5e043c45bcdb1c3840e06e86ab 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -30,6 +30,7 @@ #include "vnode.h" #include "vnodeInt.h" #include "query.h" +#include "dnode.h" #define TSDB_VNODE_VERSION_CONTENT_LEN 31 @@ -361,6 +362,7 @@ void vnodeRelease(void *pVnodeRaw) { sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId); taosMvDir(tsVnodeBakDir, rootDir); taosRemoveDir(rootDir); + dnodeSendStatusMsgToMnode(); } tsem_destroy(&pVnode->sem); @@ -390,7 +392,7 @@ void *vnodeAcquireRqueue(int32_t vgId) { if (pVnode == NULL) return NULL; if (pVnode->status == TAOS_VN_STATUS_RESET) { - terrno = TSDB_CODE_VND_INVALID_STATUS; + terrno = TSDB_CODE_APP_NOT_READY; vInfo("vgId:%d, status is in reset", vgId); vnodeRelease(pVnode); return NULL; @@ -404,7 +406,7 @@ void *vnodeAcquireWqueue(int32_t vgId) { if (pVnode == NULL) return NULL; if (pVnode->status == TAOS_VN_STATUS_RESET) { - terrno = TSDB_CODE_VND_INVALID_STATUS; + terrno = TSDB_CODE_APP_NOT_READY; vInfo("vgId:%d, status is in reset", vgId); vnodeRelease(pVnode); return NULL; @@ -547,6 +549,7 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { SVnodeObj *pVnode = ahandle; vInfo("vgId:%d, sync role changed from %d to %d", pVnode->vgId, pVnode->role, role); pVnode->role = role; + dnodeSendStatusMsgToMnode(); if (pVnode->role == TAOS_SYNC_ROLE_MASTER) cqStart(pVnode->cq); diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index c41b24579469d93f6864727a919327f782f87dbb..11315f5321f674fa11faeef98d28a1776f42468c 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -49,18 +49,18 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { if (pVnode->status != TAOS_VN_STATUS_READY) { vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status); - return TSDB_CODE_VND_INVALID_STATUS; + return TSDB_CODE_APP_NOT_READY; } // tsdb may be in reset state - if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY; + if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY; if (pVnode->status == TAOS_VN_STATUS_CLOSING) - return TSDB_CODE_RPC_NOT_READY; + return TSDB_CODE_APP_NOT_READY; // TODO: Later, let slave to support query if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); - return TSDB_CODE_RPC_NOT_READY; + return TSDB_CODE_APP_NOT_READY; } return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 900ff1fbbacc034b20bc0a04df5758e595bb3f32..1a9b05ed34eb787ba6594c99f2fdf7c3f3b81945 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -60,19 +60,19 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { } // tsdb may be in reset state - if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY; + if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY; if (pVnode->status == TAOS_VN_STATUS_CLOSING) - return TSDB_CODE_RPC_NOT_READY; + return TSDB_CODE_APP_NOT_READY; if (pHead->version == 0) { // from client or CQ if (pVnode->status != TAOS_VN_STATUS_READY) { vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status); - return TSDB_CODE_VND_INVALID_STATUS; // it may be in deleting or closing state + return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state } if (pVnode->role != TAOS_SYNC_ROLE_MASTER) { vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role); - return TSDB_CODE_RPC_NOT_READY; + return TSDB_CODE_APP_NOT_READY; } // assign version