未验证 提交 652f5f61 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2179 from taosdata/hotfix/crash

[TD-549] fix crash while vread return TSDB_CODE_NOT_READY errno
...@@ -192,13 +192,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { ...@@ -192,13 +192,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
if (code == TSDB_CODE_VND_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_VND_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_VND_ACTION_NEED_REPROCESSED) { if (code == TSDB_CODE_VND_ACTION_NEED_REPROCESSED) {
dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead); dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead);
code = TSDB_CODE_SUCCESS;
} }
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle, .handle = pRead->rpcMsg.handle,
.pCont = pRead->rspRet.rsp, .pCont = pRead->rspRet.rsp,
.contLen = pRead->rspRet.len, .contLen = pRead->rspRet.len,
.code = pRead->rspRet.code, .code = code,
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
...@@ -216,7 +217,7 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -216,7 +217,7 @@ static void *dnodeProcessReadQueue(void *param) {
break; break;
} }
dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); dTrace("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet); int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet);
dnodeSendRpcReadRsp(pVnode, pReadMsg, code); dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
taosFreeQitem(pReadMsg); taosFreeQitem(pReadMsg);
......
...@@ -216,7 +216,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -216,7 +216,7 @@ static void *dnodeProcessWriteQueue(void *param) {
pHead->msgType = pWrite->rpcMsg.msgType; pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0; pHead->version = 0;
pHead->len = pWrite->contLen; pHead->len = pWrite->contLen;
dTrace("%p, msg:%s will be processed", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); dTrace("%p, msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]);
} else { } else {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
} }
......
...@@ -30,7 +30,6 @@ typedef enum _VN_STATUS { ...@@ -30,7 +30,6 @@ typedef enum _VN_STATUS {
typedef struct { typedef struct {
int len; int len;
int code;
void *rsp; void *rsp;
void *qhandle; //used by query and retrieve msg void *qhandle; //used by query and retrieve msg
} SRspRet; } SRspRet;
......
...@@ -39,15 +39,21 @@ void vnodeInitReadFp(void) { ...@@ -39,15 +39,21 @@ void vnodeInitReadFp(void) {
int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) { int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) {
SVnodeObj *pVnode = (SVnodeObj *)param; SVnodeObj *pVnode = (SVnodeObj *)param;
if (vnodeProcessReadMsgFp[msgType] == NULL) if (vnodeProcessReadMsgFp[msgType] == NULL) {
vTrace("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
return TSDB_CODE_VND_MSG_NOT_PROCESSED; return TSDB_CODE_VND_MSG_NOT_PROCESSED;
}
if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) {
vTrace("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status);
return TSDB_CODE_VND_INVALID_VGROUP_ID; return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
// TODO: Later, let slave to support query // TODO: Later, let slave to support query
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vTrace("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
return TSDB_CODE_RPC_NOT_READY; return TSDB_CODE_RPC_NOT_READY;
}
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret); return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret);
} }
...@@ -60,11 +66,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont ...@@ -60,11 +66,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
qinfo_t pQInfo = NULL; qinfo_t pQInfo = NULL;
if (contLen != 0) { if (contLen != 0) {
pRet->code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->qhandle = htobe64((uint64_t) (pQInfo)); pRsp->qhandle = htobe64((uint64_t) (pQInfo));
pRsp->code = pRet->code; pRsp->code = code;
pRet->len = sizeof(SQueryTableRsp); pRet->len = sizeof(SQueryTableRsp);
pRet->rsp = pRsp; pRet->rsp = pRsp;
...@@ -74,9 +80,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont ...@@ -74,9 +80,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
assert(pCont != NULL); assert(pCont != NULL);
pQInfo = pCont; pQInfo = pCont;
code = TSDB_CODE_VND_ACTION_IN_PROGRESS; code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
vTrace("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, pQInfo);
} }
if (pQInfo != NULL) { if (pQInfo != NULL) {
vTrace("vgId:%d, QInfo:%p, do qTableQuery", pVnode->vgId, pQInfo);
qTableQuery(pQInfo); // do execute query qTableQuery(pQInfo); // do execute query
} }
...@@ -88,18 +96,16 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont ...@@ -88,18 +96,16 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
void *pQInfo = (void*) htobe64(pRetrieve->qhandle); void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
int32_t code = TSDB_CODE_SUCCESS;
vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo); vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo);
pRet->code = qRetrieveQueryResultInfo(pQInfo); int32_t code = qRetrieveQueryResultInfo(pQInfo);
if (pRet->code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
//TODO //TODO
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
} else { } else {
// todo check code and handle error in build result set // todo check code and handle error in build result set
pRet->code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
if (qHasMoreResultsToRetrieve(pQInfo)) { if (qHasMoreResultsToRetrieve(pQInfo)) {
pRet->qhandle = pQInfo; pRet->qhandle = pQInfo;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册