diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 7a57b0a2c0857cc71e3882733746ee8f8420d70c..d4365dae102cb6645cd38abe73a88a0ad7969a40 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -15,38 +15,25 @@ #define _DEFAULT_SOURCE #include "os.h" - #include "taoserror.h" #include "taosmsg.h" #include "tlog.h" #include "tqueue.h" #include "trpc.h" - #include "twal.h" #include "dnodeMgmt.h" #include "dnodeRead.h" -#include "queryExecutor.h" #include "vnode.h" typedef struct { - int32_t code; - int32_t count; - int32_t numOfVnodes; -} SRpcContext; - -typedef struct { - void *pCont; - int32_t contLen; - SRpcMsg rpcMsg; - SRpcContext *pRpcContext; // RPC message context + SRspRet rspRet; + void *pCont; + int32_t contLen; + SRpcMsg rpcMsg; } SReadMsg; static void *dnodeProcessReadQueue(void *param); -static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead); static void dnodeHandleIdleReadWorker(); -static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg); -static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg); -static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(void *pVnode, SReadMsg *pNode); // module global variable static taos_qset readQset; @@ -55,14 +42,11 @@ static int32_t maxThreads; static int32_t minThreads; int32_t dnodeInitRead() { - dnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessQueryMsg; - dnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = dnodeProcessRetrieveMsg; - readQset = taosOpenQset(); minThreads = 3; - maxThreads = tsNumOfCores*tsNumOfThreadsPerCore; - if (maxThreads <= minThreads*2) maxThreads = 2*minThreads; + maxThreads = tsNumOfCores * tsNumOfThreadsPerCore; + if (maxThreads <= minThreads * 2) maxThreads = 2 * minThreads; dPrint("dnode read is opened"); return 0; @@ -77,7 +61,6 @@ void dnodeRead(SRpcMsg *pMsg) { int32_t queuedMsgNum = 0; int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; - SRpcContext *pRpcContext = NULL; void *pVnode; dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle); @@ -105,7 +88,6 @@ void dnodeRead(SRpcMsg *pMsg) { pRead->rpcMsg = *pMsg; pRead->pCont = pCont; pRead->contLen = pHead->contLen; - pRead->pRpcContext = pRpcContext; taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); @@ -156,6 +138,34 @@ void dnodeFreeRqueue(void *rqueue) { // dynamically adjust the number of threads } +static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) { + SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); + pRead->rpcMsg = pMsg->rpcMsg; + pRead->pCont = qhandle; + pRead->contLen = 0; + pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; + + taos_queue queue = vnodeGetRqueue(pVnode); + taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); +} + +void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { + if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; + if (code == TSDB_CODE_ACTION_NEED_REPROCESSED) { + dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead); + } + + SRpcMsg rpcRsp = { + .handle = pRead->rpcMsg.handle, + .pCont = pRead->rspRet.rsp, + .contLen = pRead->rspRet.len, + .code = pRead->rspRet.code, + }; + + rpcSendResponse(&rpcRsp); + rpcFreeCont(pRead->rpcMsg.pCont); +} + static void *dnodeProcessReadQueue(void *param) { taos_qset qset = (taos_qset)param; SReadMsg *pReadMsg; @@ -168,13 +178,8 @@ static void *dnodeProcessReadQueue(void *param) { continue; } - terrno = 0; - if (dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) { - (*dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) (pVnode, pReadMsg); - } else { - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - } - + int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet); + dnodeSendRpcReadRsp(pVnode, pReadMsg, code); taosFreeQitem(pReadMsg); } @@ -193,118 +198,3 @@ static void dnodeHandleIdleReadWorker() { } } -UNUSED_FUNC -static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) { - SRpcContext *pRpcContext = pRead->pRpcContext; - int32_t code = 0; - - if (pRpcContext) { - if (terrno) { - if (pRpcContext->code == 0) pRpcContext->code = terrno; - } - - int32_t count = atomic_add_fetch_32(&pRpcContext->count, 1); - if (count < pRpcContext->numOfVnodes) { - // not over yet, multiple vnodes - return; - } - - // over, result can be merged now - code = pRpcContext->code; - } else { - code = terrno; - } - - //TODO: query handle is returned by dnodeProcessQueryMsg - if (0) { - SRpcMsg rsp; - rsp.handle = pRead->rpcMsg.handle; - rsp.code = code; - rsp.pCont = NULL; - rpcSendResponse(&rsp); - } - - rpcFreeCont(pRead->rpcMsg.pCont); // free the received message -} - -static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) { - - SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); - pRead->rpcMsg = pMsg->rpcMsg; - pRead->pCont = qhandle; - pRead->contLen = 0; - pRead->pRpcContext = pMsg->pRpcContext; - pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; - - taos_queue queue = vnodeGetRqueue(pVnode); - taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); -} - -static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) { - SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont; - - SQInfo* pQInfo = NULL; - if (pMsg->contLen != 0) { - void* tsdb = vnodeGetTsdb(pVnode); - int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo); - - SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); - pRsp->code = code; - pRsp->qhandle = htobe64((uint64_t) (pQInfo)); - - SRpcMsg rpcRsp = { - .handle = pMsg->rpcMsg.handle, - .pCont = pRsp, - .contLen = sizeof(SQueryTableRsp), - .code = code, - .msgType = 0 - }; - - rpcSendResponse(&rpcRsp); - dTrace("dnode query msg disposed, thandle:%p", pMsg->rpcMsg.handle); - } else { - pQInfo = pMsg->pCont; - } - - qTableQuery(pQInfo); // do execute query -} - -static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { - SRetrieveTableMsg *pRetrieve = pMsg->pCont; - void *pQInfo = (void*) htobe64(pRetrieve->qhandle); - - dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId); - int32_t contLen = 0; - - SRetrieveTableRsp *pRsp = NULL; - - int32_t code = qRetrieveQueryResultInfo(pQInfo); - if (code != TSDB_CODE_SUCCESS) { - contLen = sizeof(SRetrieveTableRsp); - - pRsp = (SRetrieveTableRsp *)rpcMallocCont(contLen); - memset(pRsp, 0, sizeof(SRetrieveTableRsp)); - } else { - // todo check code and handle error in build result set - code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen); - - if (qHasMoreResultsToRetrieve(pQInfo)) { - dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg); - } else { // no further execution invoked, release the ref to vnode - qDestroyQueryInfo(pQInfo); -// dnodeProcessReadResult(pVnode, pMsg); - vnodeRelease(pVnode); - } - } - - SRpcMsg rpcRsp = (SRpcMsg) { - .handle = pMsg->rpcMsg.handle, - .pCont = pRsp, - .contLen = contLen, - .code = code, - .msgType = 0 - }; - - rpcSendResponse(&rpcRsp); - dTrace("dnode retrieve msg disposed, thandle:%p", pMsg->rpcMsg.handle); -} diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 841c5b658f1d37e31cc620872bf59f9790cb3c23..0624af45c39e52c648ddfbc7576c0c87798c4c6c 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -47,6 +47,7 @@ static STaosError errors[] = { // rpc TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_IN_PROGRESS, 0, 1, "action in progress") +TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_NEED_REPROCESSED, 0, 3, "action need to be reprocessed") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, 0, 4, "message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_ALREADY_PROCESSED, 0, 5, "message already processed") TAOS_DEFINE_ERROR(TSDB_CODE_REDIRECT, 0, 6, "redirect") diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 9861d1a2ff29a9167aed1a2a7480980a1d16b061..3097343a48a31d97f6792c14964214a2d4380563 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -22,7 +22,9 @@ extern "C" { typedef struct { int len; + int code; void *rsp; + void *qhandle; //used by query and retrieve msg } SRspRet; int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); @@ -42,6 +44,8 @@ void* vnodeGetTsdb(void *pVnode); int32_t vnodeProcessWrite(void *pVnode, int qtype, SWalHead *pHead, void *item); void vnodeBuildStatusMsg(void * param); +int32_t vnodeProcessRead(void *pVnode, int msgType, void *pCont, int32_t contLen, SRspRet *ret); + #ifdef __cplusplus } #endif diff --git a/src/vnode/main/inc/vnodeInt.h b/src/vnode/main/inc/vnodeInt.h index 7e6caf6168a7ad758f4eb4e2c4858b4db2d639f5..561b6ae61ff8ae85c1bd6b0c47e53b1d80c6ba99 100644 --- a/src/vnode/main/inc/vnodeInt.h +++ b/src/vnode/main/inc/vnodeInt.h @@ -47,6 +47,7 @@ typedef struct { int vnodeWriteToQueue(void *param, SWalHead *pHead, int type); void vnodeInitWriteFp(void); +void vnodeInitReadFp(void); #ifdef __cplusplus } diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index d0352da3b1a349a118a4b8a4d97de4fddf65b61b..3a70ec0d7d024774bf4cb4af962164c57b1add37 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -41,6 +41,7 @@ static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; static void vnodeInit() { vnodeInitWriteFp(); + vnodeInitReadFp(); tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj *), taosHashInt); if (tsDnodeVnodesHash == NULL) { diff --git a/src/vnode/main/src/vnodeRead.c b/src/vnode/main/src/vnodeRead.c new file mode 100644 index 0000000000000000000000000000000000000000..929a30fbcdd87f10bcfdea5bb94a0a9cbda97b69 --- /dev/null +++ b/src/vnode/main/src/vnodeRead.c @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taosmsg.h" +#include "taoserror.h" +#include "tlog.h" +#include "tqueue.h" +#include "trpc.h" +#include "tsdb.h" +#include "twal.h" +#include "dataformat.h" +#include "vnode.h" +#include "vnodeInt.h" +#include "queryExecutor.h" + +static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet); +static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); +static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet); + +void vnodeInitReadFp(void) { + vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; + vnodeProcessReadMsgFp[TSDB_MSG_TYPE_RETRIEVE] = vnodeProcessRetrieveMsg; +} + +int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) { + SVnodeObj *pVnode = (SVnodeObj *)param; + + if (vnodeProcessReadMsgFp[msgType] == NULL) + return TSDB_CODE_MSG_NOT_PROCESSED; + + if (pVnode->status == VN_STATUS_DELETING || pVnode->status == VN_STATUS_CLOSING) + return TSDB_CODE_NOT_ACTIVE_VNODE; + + return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret); +} + +static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { + SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont; + memset(pRet, 0, sizeof(SRspRet)); + + int32_t code = TSDB_CODE_SUCCESS; + + SQInfo* pQInfo = NULL; + if (contLen != 0) { + void* tsdb = vnodeGetTsdb(pVnode); + pRet->code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo); + + SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); + pRsp->qhandle = htobe64((uint64_t) (pQInfo)); + pRsp->code = pRet->code; + + pRet->len = sizeof(SQueryTableRsp); + pRet->rsp = pRsp; + + dTrace("pVnode:%p vgId:%d QInfo:%p, dnode query msg disposed", pVnode, pVnode->vgId, pQInfo); + } else { + pQInfo = pCont; + code = TSDB_CODE_ACTION_IN_PROGRESS; + } + + qTableQuery(pQInfo); // do execute query + + return code; +} + +static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) { + SRetrieveTableMsg *pRetrieve = pCont; + void *pQInfo = (void*) htobe64(pRetrieve->qhandle); + memset(pRet, 0, sizeof(SRspRet)); + + int32_t code = TSDB_CODE_SUCCESS; + + dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is received", pVnode, pVnode->vgId, pQInfo); + + pRet->code = qRetrieveQueryResultInfo(pQInfo); + if (pRet->code != TSDB_CODE_SUCCESS) { + //TODO + pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); + } else { + // todo check code and handle error in build result set + pRet->code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); + + if (qHasMoreResultsToRetrieve(pQInfo)) { + pRet->qhandle = pQInfo; + code = TSDB_CODE_ACTION_NEED_REPROCESSED; + } else { + // no further execution invoked, release the ref to vnode + qDestroyQueryInfo(pQInfo); + vnodeRelease(pVnode); + } + } + + dTrace("pVnode:%p vgId:%d QInfo:%p, retrieve msg is disposed", pVnode, pVnode->vgId, pQInfo); + return code; +}