提交 f55f133b 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/develop' into feature/os

......@@ -43,6 +43,14 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub);
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
static int32_t getWaitingTimeInterval(int32_t count) {
int32_t initial = 100; // 100 ms by default
if (count <= 1) {
return 0;
}
return initial * (2<<(count - 2));
}
static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);
......@@ -275,6 +283,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
(rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL ||
rpcMsg->code == TSDB_CODE_APP_NOT_READY ||
rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE)) {
tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
......@@ -287,6 +296,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if (pSql->retry > pSql->maxRetry) {
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
} else {
// wait for a little bit moment and then retry
if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
int32_t duration = getWaitingTimeInterval(pSql->retry);
taosMsleep(duration);
}
rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name);
// if there is an error occurring, proceed to the following error handling procedure.
......
......@@ -165,6 +165,13 @@ int32_t dnodeInitMgmtTimer() {
return TSDB_CODE_SUCCESS;
}
void dnodeSendStatusMsgToMnode() {
if (tsDnodeTmr != NULL && tsStatusTimer != NULL) {
dInfo("force send status msg to mnode");
taosTmrReset(dnodeSendStatusMsg, 3, NULL, tsDnodeTmr, &tsStatusTimer);
}
}
void dnodeCleanupMgmtTimer() {
if (tsStatusTimer != NULL) {
taosTmrStopA(&tsStatusTimer);
......
......@@ -93,7 +93,7 @@ static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (pMsg->pCont == NULL) return;
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
rspMsg.code = TSDB_CODE_RPC_NOT_READY;
rspMsg.code = TSDB_CODE_APP_NOT_READY;
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
dDebug("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
......
......@@ -119,7 +119,7 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
dError("RPC %p, shell msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
rpcMsg.code = TSDB_CODE_RPC_NOT_READY;
rpcMsg.code = TSDB_CODE_APP_NOT_READY;
rpcSendResponse(&rpcMsg);
rpcFreeCont(pMsg->pCont);
return;
......@@ -144,7 +144,7 @@ void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
if (code != TSDB_CODE_RPC_NOT_READY) return code;
if (code != TSDB_CODE_APP_NOT_READY) return code;
SDMAuthMsg *pMsg = rpcMallocCont(sizeof(SDMAuthMsg));
tstrncpy(pMsg->user, user, sizeof(pMsg->user));
......
......@@ -65,6 +65,8 @@ void dnodeSendRpcMnodeWriteRsp(void *pMsg, int32_t code);
void dnodeReprocessMnodeWriteMsg(void *pMsg);
void dnodeDelayReprocessMnodeWriteMsg(void *pMsg);
void dnodeSendStatusMsgToMnode();
#ifdef __cplusplus
}
#endif
......
......@@ -65,6 +65,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_SESSION_ID, 0, 0x0010, "Invalid se
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_MSG_TYPE, 0, 0x0011, "Invalid message type")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_RESPONSE_TYPE, 0, 0x0012, "Invalid response type")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, 0, 0x0013, "Invalid timestamp")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, 0, 0x0014, "Database not ready")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_COM_OPS_NOT_SUPPORT, 0, 0x0100, "Operation not supported")
......@@ -184,7 +185,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_DISK_PERMISSIONS, 0, 0x0506, "No write p
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "Missing data file")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "Out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "Unexpected generic error in vnode")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_STATUS, 0, 0x0510, "Database not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "Database suspended")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "Write operation denied")
......
......@@ -581,8 +581,8 @@ void mnodeDropAllUsers(SAcctObj *pAcct) {
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (!sdbIsMaster()) {
*secret = 0;
mDebug("user:%s, failed to auth user, reason:%s", user, tstrerror(TSDB_CODE_RPC_NOT_READY));
return TSDB_CODE_RPC_NOT_READY;
mDebug("user:%s, failed to auth user, reason:%s", user, tstrerror(TSDB_CODE_APP_NOT_READY));
return TSDB_CODE_APP_NOT_READY;
}
SUserObj *pUser = mnodeGetUser(user);
......
......@@ -1120,7 +1120,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
pContext->epSet.port[i] = htons(pContext->epSet.port[i]);
rpcSendReqToServer(pRpc, pContext);
rpcFreeCont(rpcMsg.pCont);
} else if (pHead->code == TSDB_CODE_RPC_NOT_READY) {
} else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY) {
pContext->code = pHead->code;
rpcProcessConnError(pContext, NULL);
rpcFreeCont(rpcMsg.pCont);
......
......@@ -30,6 +30,7 @@
#include "vnode.h"
#include "vnodeInt.h"
#include "query.h"
#include "dnode.h"
#define TSDB_VNODE_VERSION_CONTENT_LEN 31
......@@ -361,6 +362,7 @@ void vnodeRelease(void *pVnodeRaw) {
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId);
taosMvDir(tsVnodeBakDir, rootDir);
taosRemoveDir(rootDir);
dnodeSendStatusMsgToMnode();
}
tsem_destroy(&pVnode->sem);
......@@ -390,7 +392,7 @@ void *vnodeAcquireRqueue(int32_t vgId) {
if (pVnode == NULL) return NULL;
if (pVnode->status == TAOS_VN_STATUS_RESET) {
terrno = TSDB_CODE_VND_INVALID_STATUS;
terrno = TSDB_CODE_APP_NOT_READY;
vInfo("vgId:%d, status is in reset", vgId);
vnodeRelease(pVnode);
return NULL;
......@@ -404,7 +406,7 @@ void *vnodeAcquireWqueue(int32_t vgId) {
if (pVnode == NULL) return NULL;
if (pVnode->status == TAOS_VN_STATUS_RESET) {
terrno = TSDB_CODE_VND_INVALID_STATUS;
terrno = TSDB_CODE_APP_NOT_READY;
vInfo("vgId:%d, status is in reset", vgId);
vnodeRelease(pVnode);
return NULL;
......@@ -547,6 +549,7 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
SVnodeObj *pVnode = ahandle;
vInfo("vgId:%d, sync role changed from %d to %d", pVnode->vgId, pVnode->role, role);
pVnode->role = role;
dnodeSendStatusMsgToMnode();
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
cqStart(pVnode->cq);
......
......@@ -49,18 +49,18 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status);
return TSDB_CODE_VND_INVALID_STATUS;
return TSDB_CODE_APP_NOT_READY;
}
// tsdb may be in reset state
if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY;
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
return TSDB_CODE_RPC_NOT_READY;
return TSDB_CODE_APP_NOT_READY;
// TODO: Later, let slave to support query
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
return TSDB_CODE_RPC_NOT_READY;
return TSDB_CODE_APP_NOT_READY;
}
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
......
......@@ -60,19 +60,19 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
}
// tsdb may be in reset state
if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY;
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
return TSDB_CODE_RPC_NOT_READY;
return TSDB_CODE_APP_NOT_READY;
if (pHead->version == 0) { // from client or CQ
if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status);
return TSDB_CODE_VND_INVALID_STATUS; // it may be in deleting or closing state
return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state
}
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role);
return TSDB_CODE_RPC_NOT_READY;
return TSDB_CODE_APP_NOT_READY;
}
// assign version
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册