diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 18be516ed59f16f27b5f2dcf6bc3a8f00f5daab8..56982e8a4764427a0fd6d4a00164f69b3d6100c6 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -35,6 +35,7 @@ extern "C" { #define TAOS_ID_REALLOCATE 2 #define taosSendMsgToPeer(x, y, z) taosSendMsgToPeerH(x, y, z, NULL) +#define taosOpenRpcChann(x, y, z) taosOpenRpcChannWithQ(x, y, z, NULL) #define taosBuildReqMsg(x, y) taosBuildReqMsgWithSize(x, y, 512) #define taosBuildRspMsg(x, y) taosBuildRspMsgWithSize(x, y, 512) @@ -79,7 +80,7 @@ void *taosOpenRpc(SRpcInit *pRpc); void taosCloseRpc(void *); -int taosOpenRpcChann(void *handle, int cid, int sessions); +int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle); void taosCloseRpcChann(void *handle, int cid); diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index c37103128e17b54bf53d85896ab367dad1884a75..4204db394a952f4ca4fd11e60d20075e6d916a73 100644 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -87,6 +87,7 @@ typedef struct { typedef struct { int sessions; + void * qhandle; // for scheduler SRpcConn * connList; void * idPool; void * tmrCtrl; @@ -340,7 +341,7 @@ void *taosOpenRpc(SRpcInit *pRpc) { return pServer; } -int taosOpenRpcChann(void *handle, int cid, int sessions) { +int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle) { STaosRpc * pServer = (STaosRpc *)handle; SRpcChann *pChann; @@ -384,6 +385,8 @@ int taosOpenRpcChann(void *handle, int cid, int sessions) { pthread_mutex_init(&pChann->mutex, NULL); pChann->sessions = sessions; + pChann->qhandle = qhandle ? qhandle : pServer->qhandle; + return 0; } @@ -986,7 +989,7 @@ void taosProcessIdleTimer(void *param, void *tmrId) { schedMsg.msg = NULL; schedMsg.ahandle = pConn->ahandle; schedMsg.thandle = pConn; - taosScheduleTask(pServer->qhandle, &schedMsg); + taosScheduleTask(pChann->qhandle, &schedMsg); } pthread_mutex_unlock(&pChann->mutex); @@ -1002,12 +1005,14 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port, char pReply[128]; SSchedMsg schedMsg; int chann, sid; + SRpcChann * pChann = NULL; tDump(data, dataLen); if (ip == 0 && taosCloseConn[pServer->type]) { // it means the connection is broken if (pConn) { + pChann = pServer->channList + pConn->chann; tTrace("%s cid:%d sid:%d id:%s, underlying link is gone pConn:%p", pServer->label, pConn->chann, pConn->sid, pConn->meterId, pConn); pConn->rspReceived = 1; @@ -1016,7 +1021,7 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port, schedMsg.msg = NULL; schedMsg.ahandle = pConn->ahandle; schedMsg.thandle = pConn; - taosScheduleTask(pServer->qhandle, &schedMsg); + taosScheduleTask(pChann->qhandle, &schedMsg); } tfree(data); return NULL; @@ -1070,6 +1075,7 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port, // internal communication is based on TAOS protocol, a trick here to make it efficient pHeader->msgLen = msgLen - (int)sizeof(STaosHeader) + (int)sizeof(SIntMsg); + if (pHeader->spi) pHeader->msgLen -= sizeof(STaosDigest); if ((pHeader->msgType & 1) == 0 && (pHeader->content[0] == TSDB_CODE_SESSION_ALREADY_EXIST)) { schedMsg.msg = NULL; // connection shall be closed @@ -1084,10 +1090,11 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port, pConn->pTimer); } + pChann = pServer->channList + pConn->chann; schedMsg.fp = taosProcessSchedMsg; schedMsg.ahandle = pConn->ahandle; schedMsg.thandle = pConn; - taosScheduleTask(pServer->qhandle, &schedMsg); + taosScheduleTask(pChann->qhandle, &schedMsg); } return pConn; @@ -1274,7 +1281,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) { schedMsg.msg = NULL; schedMsg.ahandle = pConn->ahandle; schedMsg.thandle = pConn; - taosScheduleTask(pServer->qhandle, &schedMsg); + taosScheduleTask(pChann->qhandle, &schedMsg); } } } @@ -1342,7 +1349,7 @@ void taosStopRpcConn(void *thandle) { schedMsg.thandle = pConn; pthread_mutex_unlock(&pChann->mutex); - taosScheduleTask(pServer->qhandle, &schedMsg); + taosScheduleTask(pChann->qhandle, &schedMsg); } else { pthread_mutex_unlock(&pChann->mutex); taosCloseRpcConn(pConn); diff --git a/src/system/inc/vnode.h b/src/system/inc/vnode.h index 139ec8d373709f5b55199c758e0adad87cab943e..2fbee7eb2f89fba7d76659ce76611c45c6cddf0d 100644 --- a/src/system/inc/vnode.h +++ b/src/system/inc/vnode.h @@ -324,11 +324,12 @@ typedef struct { // internal globals extern int tsMeterSizeOnFile; extern uint32_t tsRebootTime; -extern void * rpcQhandle; +extern void ** rpcQhandle; extern void * dmQhandle; extern void * queryQhandle; extern int tsMaxVnode; extern int tsOpenVnodes; +extern int tsMaxVnode; extern SVnodeObj *vnodeList; extern void * vnodeTmrCtrl; diff --git a/src/system/src/mgmtSystem.c b/src/system/src/mgmtSystem.c index 0aed50229ee1d57215d0c5bead48846150b44015..acaa82c831f31801c76574b1911bc0e56008c0a3 100644 --- a/src/system/src/mgmtSystem.c +++ b/src/system/src/mgmtSystem.c @@ -39,7 +39,6 @@ void * mgmtStatisticTimer = NULL; void * mgmtStatusTimer = NULL; int mgmtShellConns = 0; extern void *pShellConn; -extern void *rpcQhandle; void mgmtCleanUpSystem() { mTrace("mgmt is running, clean it up"); diff --git a/src/system/src/vnodeFile.c b/src/system/src/vnodeFile.c index 44f6202fe9f9f5975550c7609ee1f48e6af12f3c..05343f4ce41be491b9a6b5e7c6572be9bd61bee8 100644 --- a/src/system/src/vnodeFile.c +++ b/src/system/src/vnodeFile.c @@ -1863,11 +1863,20 @@ int vnodeCheckNewHeaderFile(int fd, SVnodeObj *pVnode) { } if (read(fd, (void *)pBlocks, expectedSize) != expectedSize) { + dError("failed to read block part"); goto _broken_exit; } if (!taosCheckChecksumWhole((uint8_t *)pBlocks, expectedSize)) { + dError("block part is broken"); goto _broken_exit; } + + for (int i = 0; i < compInfo.numOfBlocks; i++) { + if (pBlocks[i].last && i != compInfo.numOfBlocks-1) { + dError("last block in middle, block:%d", i); + goto _broken_exit; + } + } } _correct_exit: diff --git a/src/system/src/vnodeShell.c b/src/system/src/vnodeShell.c index d193c51ac1bb1e4990a156c200b065f077fcd697..33a4f5f36cb6ae97987d1f1d6d459832dde2c9d5 100644 --- a/src/system/src/vnodeShell.c +++ b/src/system/src/vnodeShell.c @@ -31,6 +31,7 @@ #include "vnodeUtil.h" #pragma GCC diagnostic ignored "-Wint-conversion" +extern int tsMaxQueues; void * pShellServer = NULL; SShellObj **shellList = NULL; @@ -128,7 +129,7 @@ int vnodeInitShell() { rpcInit.idMgmt = TAOS_ID_FREE; rpcInit.connType = TAOS_CONN_UDPS; rpcInit.idleTime = tsShellActivityTimer * 1200; - rpcInit.qhandle = rpcQhandle; + rpcInit.qhandle = rpcQhandle[0]; rpcInit.efp = vnodeSendVpeerCfgMsg; pShellServer = taosOpenRpc(&rpcInit); @@ -155,7 +156,7 @@ int vnodeOpenShellVnode(int vnode) { memset(shellList[vnode], 0, size); - taosOpenRpcChann(pShellServer, vnode, sessions); + taosOpenRpcChannWithQ(pShellServer, vnode, sessions, rpcQhandle[(vnode+1)%tsMaxQueues]); return 0; } diff --git a/src/system/src/vnodeSystem.c b/src/system/src/vnodeSystem.c index e74e172af0c4720caf455f15f1b038d2bd9bc576..ef0c9d2d75eebd0743fc0393d47a48919644d5ca 100644 --- a/src/system/src/vnodeSystem.c +++ b/src/system/src/vnodeSystem.c @@ -29,9 +29,10 @@ // internal global, not configurable void * vnodeTmrCtrl; -void * rpcQhandle; +void ** rpcQhandle; void * dmQhandle; void * queryQhandle; +int tsMaxQueues; uint32_t tsRebootTime; int vnodeInitSystem() { @@ -41,9 +42,12 @@ int vnodeInitSystem() { if (numOfThreads < 1) numOfThreads = 1; queryQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, numOfThreads, "query"); - numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0; - if (numOfThreads < 1) numOfThreads = 1; - rpcQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, numOfThreads, "dnode"); + tsMaxQueues = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0; + if (tsMaxQueues < 1) tsMaxQueues = 1; + + rpcQhandle = malloc(tsMaxQueues*sizeof(void *)); + for (int i = 0; i < tsMaxQueues; i++) + rpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode"); dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt");