未验证 提交 a89293b9 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #3154 from taosdata/hotfix/ready

TD-1184 add error code
...@@ -43,6 +43,14 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts); ...@@ -43,6 +43,14 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub); void tscSaveSubscriptionProgress(void* sub);
static int32_t minMsgSize() { return tsRpcHeadSize + 100; } static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
static int32_t getWaitingTimeInterval(int32_t count) {
int32_t initial = 100; // 100 ms by default
if (count <= 1) {
return 0;
}
return initial * (2<<(count - 2));
}
static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);
...@@ -275,6 +283,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -275,6 +283,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
(rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
rpcMsg->code == TSDB_CODE_APP_NOT_READY ||
rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) { rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry); tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
...@@ -287,6 +296,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -287,6 +296,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if (pSql->retry > pSql->maxRetry) { if (pSql->retry > pSql->maxRetry) {
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
} else { } else {
// wait for a little bit moment and then retry
if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
int32_t duration = getWaitingTimeInterval(pSql->retry);
taosMsleep(duration);
}
rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name); rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);
// if there is an error occurring, proceed to the following error handling procedure. // if there is an error occurring, proceed to the following error handling procedure.
......
...@@ -165,6 +165,13 @@ int32_t dnodeInitMgmtTimer() { ...@@ -165,6 +165,13 @@ int32_t dnodeInitMgmtTimer() {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void dnodeSendStatusMsgToMnode() {
if (tsDnodeTmr != NULL && tsStatusTimer != NULL) {
dInfo("force send status msg to mnode");
taosTmrReset(dnodeSendStatusMsg, 3, NULL, tsDnodeTmr, &tsStatusTimer);
}
}
void dnodeCleanupMgmtTimer() { void dnodeCleanupMgmtTimer() {
if (tsStatusTimer != NULL) { if (tsStatusTimer != NULL) {
taosTmrStopA(&tsStatusTimer); taosTmrStopA(&tsStatusTimer);
......
...@@ -93,7 +93,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -93,7 +93,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (pMsg->pCont == NULL) return; if (pMsg->pCont == NULL) return;
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
rspMsg.code = TSDB_CODE_RPC_NOT_READY; rspMsg.code = TSDB_CODE_APP_NOT_READY;
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
dDebug("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]); dDebug("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
......
...@@ -119,7 +119,7 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -119,7 +119,7 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
dError("RPC %p, shell msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, shell msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
rpcMsg.code = TSDB_CODE_RPC_NOT_READY; rpcMsg.code = TSDB_CODE_APP_NOT_READY;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
return; return;
...@@ -144,7 +144,7 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { ...@@ -144,7 +144,7 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
if (code != TSDB_CODE_RPC_NOT_READY) return code; if (code != TSDB_CODE_APP_NOT_READY) return code;
SDMAuthMsg *pMsg = rpcMallocCont(sizeof(SDMAuthMsg)); SDMAuthMsg *pMsg = rpcMallocCont(sizeof(SDMAuthMsg));
tstrncpy(pMsg->user, user, sizeof(pMsg->user)); tstrncpy(pMsg->user, user, sizeof(pMsg->user));
......
...@@ -65,6 +65,8 @@ void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code); ...@@ -65,6 +65,8 @@ void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code);
void dnodeReprocessMnodeWriteMsg(void *pMsg); void dnodeReprocessMnodeWriteMsg(void *pMsg);
void dnodeDelayReprocessMnodeWriteMsg(void *pMsg); void dnodeDelayReprocessMnodeWriteMsg(void *pMsg);
void dnodeSendStatusMsgToMnode();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -65,6 +65,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_SESSION_ID, 0, 0x0010, "Invalid se ...@@ -65,6 +65,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_SESSION_ID, 0, 0x0010, "Invalid se
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_MSG_TYPE, 0, 0x0011, "Invalid message type") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_MSG_TYPE, 0, 0x0011, "Invalid message type")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_RESPONSE_TYPE, 0, 0x0012, "Invalid response type") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_RESPONSE_TYPE, 0, 0x0012, "Invalid response type")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, 0, 0x0013, "Invalid timestamp") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, 0, 0x0013, "Invalid timestamp")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, 0, 0x0014, "Database not ready")
//common & util //common & util
TAOS_DEFINE_ERROR(TSDB_CODE_COM_OPS_NOT_SUPPORT, 0, 0x0100, "Operation not supported") TAOS_DEFINE_ERROR(TSDB_CODE_COM_OPS_NOT_SUPPORT, 0, 0x0100, "Operation not supported")
...@@ -184,7 +185,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_DISK_PERMISSIONS, 0, 0x0506, "No write p ...@@ -184,7 +185,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_DISK_PERMISSIONS, 0, 0x0506, "No write p
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "Missing data file") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "Missing data file")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "Out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "Out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected generic error in vnode") TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected generic error in vnode")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_STATUS, 0, 0x0510, "Database not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied")
......
...@@ -581,8 +581,8 @@ void mnodeDropAllUsers(SAcctObj *pAcct) { ...@@ -581,8 +581,8 @@ void mnodeDropAllUsers(SAcctObj *pAcct) {
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (!sdbIsMaster()) { if (!sdbIsMaster()) {
*secret = 0; *secret = 0;
mDebug("user:%s, failed to auth user, reason:%s", user, tstrerror(TSDB_CODE_RPC_NOT_READY)); mDebug("user:%s, failed to auth user, reason:%s", user, tstrerror(TSDB_CODE_APP_NOT_READY));
return TSDB_CODE_RPC_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
SUserObj *pUser = mnodeGetUser(user); SUserObj *pUser = mnodeGetUser(user);
......
...@@ -1120,7 +1120,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -1120,7 +1120,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
pContext->epSet.port[i] = htons(pContext->epSet.port[i]); pContext->epSet.port[i] = htons(pContext->epSet.port[i]);
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
} else if (pHead->code == TSDB_CODE_RPC_NOT_READY) { } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY) {
pContext->code = pHead->code; pContext->code = pHead->code;
rpcProcessConnError(pContext, NULL); rpcProcessConnError(pContext, NULL);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "vnode.h" #include "vnode.h"
#include "vnodeInt.h" #include "vnodeInt.h"
#include "query.h" #include "query.h"
#include "dnode.h"
#define TSDB_VNODE_VERSION_CONTENT_LEN 31 #define TSDB_VNODE_VERSION_CONTENT_LEN 31
...@@ -361,6 +362,7 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -361,6 +362,7 @@ void vnodeRelease(void *pVnodeRaw) {
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId); sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId);
taosMvDir(tsVnodeBakDir, rootDir); taosMvDir(tsVnodeBakDir, rootDir);
taosRemoveDir(rootDir); taosRemoveDir(rootDir);
dnodeSendStatusMsgToMnode();
} }
tsem_destroy(&pVnode->sem); tsem_destroy(&pVnode->sem);
...@@ -390,7 +392,7 @@ void *vnodeAcquireRqueue(int32_t vgId) { ...@@ -390,7 +392,7 @@ void *vnodeAcquireRqueue(int32_t vgId) {
if (pVnode == NULL) return NULL; if (pVnode == NULL) return NULL;
if (pVnode->status == TAOS_VN_STATUS_RESET) { if (pVnode->status == TAOS_VN_STATUS_RESET) {
terrno = TSDB_CODE_VND_INVALID_STATUS; terrno = TSDB_CODE_APP_NOT_READY;
vInfo("vgId:%d, status is in reset", vgId); vInfo("vgId:%d, status is in reset", vgId);
vnodeRelease(pVnode); vnodeRelease(pVnode);
return NULL; return NULL;
...@@ -404,7 +406,7 @@ void *vnodeAcquireWqueue(int32_t vgId) { ...@@ -404,7 +406,7 @@ void *vnodeAcquireWqueue(int32_t vgId) {
if (pVnode == NULL) return NULL; if (pVnode == NULL) return NULL;
if (pVnode->status == TAOS_VN_STATUS_RESET) { if (pVnode->status == TAOS_VN_STATUS_RESET) {
terrno = TSDB_CODE_VND_INVALID_STATUS; terrno = TSDB_CODE_APP_NOT_READY;
vInfo("vgId:%d, status is in reset", vgId); vInfo("vgId:%d, status is in reset", vgId);
vnodeRelease(pVnode); vnodeRelease(pVnode);
return NULL; return NULL;
...@@ -547,6 +549,7 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { ...@@ -547,6 +549,7 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = ahandle;
vInfo("vgId:%d, sync role changed from %d to %d", pVnode->vgId, pVnode->role, role); vInfo("vgId:%d, sync role changed from %d to %d", pVnode->vgId, pVnode->role, role);
pVnode->role = role; pVnode->role = role;
dnodeSendStatusMsgToMnode();
if (pVnode->role == TAOS_SYNC_ROLE_MASTER) if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
cqStart(pVnode->cq); cqStart(pVnode->cq);
......
...@@ -49,18 +49,18 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { ...@@ -49,18 +49,18 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
if (pVnode->status != TAOS_VN_STATUS_READY) { if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status); vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status);
return TSDB_CODE_VND_INVALID_STATUS; return TSDB_CODE_APP_NOT_READY;
} }
// tsdb may be in reset state // tsdb may be in reset state
if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY; if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
if (pVnode->status == TAOS_VN_STATUS_CLOSING) if (pVnode->status == TAOS_VN_STATUS_CLOSING)
return TSDB_CODE_RPC_NOT_READY; return TSDB_CODE_APP_NOT_READY;
// TODO: Later, let slave to support query // TODO: Later, let slave to support query
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
return TSDB_CODE_RPC_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
......
...@@ -60,19 +60,19 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -60,19 +60,19 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
} }
// tsdb may be in reset state // tsdb may be in reset state
if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY; if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
if (pVnode->status == TAOS_VN_STATUS_CLOSING) if (pVnode->status == TAOS_VN_STATUS_CLOSING)
return TSDB_CODE_RPC_NOT_READY; return TSDB_CODE_APP_NOT_READY;
if (pHead->version == 0) { // from client or CQ if (pHead->version == 0) { // from client or CQ
if (pVnode->status != TAOS_VN_STATUS_READY) { if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status); vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status);
return TSDB_CODE_VND_INVALID_STATUS; // it may be in deleting or closing state return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state
} }
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role); vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role);
return TSDB_CODE_RPC_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
// assign version // assign version
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册