提交 d28ee044 编写于 作者: S slguan

fix issue #514

上级 23f07aa1
...@@ -35,6 +35,7 @@ extern "C" { ...@@ -35,6 +35,7 @@ extern "C" {
#define TAOS_ID_REALLOCATE 2 #define TAOS_ID_REALLOCATE 2
#define taosSendMsgToPeer(x, y, z) taosSendMsgToPeerH(x, y, z, NULL) #define taosSendMsgToPeer(x, y, z) taosSendMsgToPeerH(x, y, z, NULL)
#define taosOpenRpcChann(x, y, z) taosOpenRpcChannWithQ(x, y, z, NULL)
#define taosBuildReqMsg(x, y) taosBuildReqMsgWithSize(x, y, 512) #define taosBuildReqMsg(x, y) taosBuildReqMsgWithSize(x, y, 512)
#define taosBuildRspMsg(x, y) taosBuildRspMsgWithSize(x, y, 512) #define taosBuildRspMsg(x, y) taosBuildRspMsgWithSize(x, y, 512)
...@@ -79,7 +80,7 @@ void *taosOpenRpc(SRpcInit *pRpc); ...@@ -79,7 +80,7 @@ void *taosOpenRpc(SRpcInit *pRpc);
void taosCloseRpc(void *); void taosCloseRpc(void *);
int taosOpenRpcChann(void *handle, int cid, int sessions); int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle);
void taosCloseRpcChann(void *handle, int cid); void taosCloseRpcChann(void *handle, int cid);
......
...@@ -87,6 +87,7 @@ typedef struct { ...@@ -87,6 +87,7 @@ typedef struct {
typedef struct { typedef struct {
int sessions; int sessions;
void * qhandle; // for scheduler
SRpcConn * connList; SRpcConn * connList;
void * idPool; void * idPool;
void * tmrCtrl; void * tmrCtrl;
...@@ -340,7 +341,7 @@ void *taosOpenRpc(SRpcInit *pRpc) { ...@@ -340,7 +341,7 @@ void *taosOpenRpc(SRpcInit *pRpc) {
return pServer; return pServer;
} }
int taosOpenRpcChann(void *handle, int cid, int sessions) { int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle) {
STaosRpc * pServer = (STaosRpc *)handle; STaosRpc * pServer = (STaosRpc *)handle;
SRpcChann *pChann; SRpcChann *pChann;
...@@ -384,6 +385,8 @@ int taosOpenRpcChann(void *handle, int cid, int sessions) { ...@@ -384,6 +385,8 @@ int taosOpenRpcChann(void *handle, int cid, int sessions) {
pthread_mutex_init(&pChann->mutex, NULL); pthread_mutex_init(&pChann->mutex, NULL);
pChann->sessions = sessions; pChann->sessions = sessions;
pChann->qhandle = qhandle ? qhandle : pServer->qhandle;
return 0; return 0;
} }
...@@ -986,7 +989,7 @@ void taosProcessIdleTimer(void *param, void *tmrId) { ...@@ -986,7 +989,7 @@ void taosProcessIdleTimer(void *param, void *tmrId) {
schedMsg.msg = NULL; schedMsg.msg = NULL;
schedMsg.ahandle = pConn->ahandle; schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn; schedMsg.thandle = pConn;
taosScheduleTask(pServer->qhandle, &schedMsg); taosScheduleTask(pChann->qhandle, &schedMsg);
} }
pthread_mutex_unlock(&pChann->mutex); pthread_mutex_unlock(&pChann->mutex);
...@@ -1002,12 +1005,14 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port, ...@@ -1002,12 +1005,14 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port,
char pReply[128]; char pReply[128];
SSchedMsg schedMsg; SSchedMsg schedMsg;
int chann, sid; int chann, sid;
SRpcChann * pChann = NULL;
tDump(data, dataLen); tDump(data, dataLen);
if (ip == 0 && taosCloseConn[pServer->type]) { if (ip == 0 && taosCloseConn[pServer->type]) {
// it means the connection is broken // it means the connection is broken
if (pConn) { if (pConn) {
pChann = pServer->channList + pConn->chann;
tTrace("%s cid:%d sid:%d id:%s, underlying link is gone pConn:%p", pServer->label, pConn->chann, pConn->sid, tTrace("%s cid:%d sid:%d id:%s, underlying link is gone pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, pConn); pConn->meterId, pConn);
pConn->rspReceived = 1; pConn->rspReceived = 1;
...@@ -1016,7 +1021,7 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port, ...@@ -1016,7 +1021,7 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port,
schedMsg.msg = NULL; schedMsg.msg = NULL;
schedMsg.ahandle = pConn->ahandle; schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn; schedMsg.thandle = pConn;
taosScheduleTask(pServer->qhandle, &schedMsg); taosScheduleTask(pChann->qhandle, &schedMsg);
} }
tfree(data); tfree(data);
return NULL; return NULL;
...@@ -1070,6 +1075,7 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port, ...@@ -1070,6 +1075,7 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port,
// internal communication is based on TAOS protocol, a trick here to make it efficient // internal communication is based on TAOS protocol, a trick here to make it efficient
pHeader->msgLen = msgLen - (int)sizeof(STaosHeader) + (int)sizeof(SIntMsg); pHeader->msgLen = msgLen - (int)sizeof(STaosHeader) + (int)sizeof(SIntMsg);
if (pHeader->spi) pHeader->msgLen -= sizeof(STaosDigest);
if ((pHeader->msgType & 1) == 0 && (pHeader->content[0] == TSDB_CODE_SESSION_ALREADY_EXIST)) { if ((pHeader->msgType & 1) == 0 && (pHeader->content[0] == TSDB_CODE_SESSION_ALREADY_EXIST)) {
schedMsg.msg = NULL; // connection shall be closed schedMsg.msg = NULL; // connection shall be closed
...@@ -1084,10 +1090,11 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port, ...@@ -1084,10 +1090,11 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, short port,
pConn->pTimer); pConn->pTimer);
} }
pChann = pServer->channList + pConn->chann;
schedMsg.fp = taosProcessSchedMsg; schedMsg.fp = taosProcessSchedMsg;
schedMsg.ahandle = pConn->ahandle; schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn; schedMsg.thandle = pConn;
taosScheduleTask(pServer->qhandle, &schedMsg); taosScheduleTask(pChann->qhandle, &schedMsg);
} }
return pConn; return pConn;
...@@ -1274,7 +1281,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) { ...@@ -1274,7 +1281,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) {
schedMsg.msg = NULL; schedMsg.msg = NULL;
schedMsg.ahandle = pConn->ahandle; schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn; schedMsg.thandle = pConn;
taosScheduleTask(pServer->qhandle, &schedMsg); taosScheduleTask(pChann->qhandle, &schedMsg);
} }
} }
} }
...@@ -1342,7 +1349,7 @@ void taosStopRpcConn(void *thandle) { ...@@ -1342,7 +1349,7 @@ void taosStopRpcConn(void *thandle) {
schedMsg.thandle = pConn; schedMsg.thandle = pConn;
pthread_mutex_unlock(&pChann->mutex); pthread_mutex_unlock(&pChann->mutex);
taosScheduleTask(pServer->qhandle, &schedMsg); taosScheduleTask(pChann->qhandle, &schedMsg);
} else { } else {
pthread_mutex_unlock(&pChann->mutex); pthread_mutex_unlock(&pChann->mutex);
taosCloseRpcConn(pConn); taosCloseRpcConn(pConn);
......
...@@ -324,11 +324,12 @@ typedef struct { ...@@ -324,11 +324,12 @@ typedef struct {
// internal globals // internal globals
extern int tsMeterSizeOnFile; extern int tsMeterSizeOnFile;
extern uint32_t tsRebootTime; extern uint32_t tsRebootTime;
extern void * rpcQhandle; extern void ** rpcQhandle;
extern void * dmQhandle; extern void * dmQhandle;
extern void * queryQhandle; extern void * queryQhandle;
extern int tsMaxVnode; extern int tsMaxVnode;
extern int tsOpenVnodes; extern int tsOpenVnodes;
extern int tsMaxVnode;
extern SVnodeObj *vnodeList; extern SVnodeObj *vnodeList;
extern void * vnodeTmrCtrl; extern void * vnodeTmrCtrl;
......
...@@ -39,7 +39,6 @@ void * mgmtStatisticTimer = NULL; ...@@ -39,7 +39,6 @@ void * mgmtStatisticTimer = NULL;
void * mgmtStatusTimer = NULL; void * mgmtStatusTimer = NULL;
int mgmtShellConns = 0; int mgmtShellConns = 0;
extern void *pShellConn; extern void *pShellConn;
extern void *rpcQhandle;
void mgmtCleanUpSystem() { void mgmtCleanUpSystem() {
mTrace("mgmt is running, clean it up"); mTrace("mgmt is running, clean it up");
......
...@@ -1863,11 +1863,20 @@ int vnodeCheckNewHeaderFile(int fd, SVnodeObj *pVnode) { ...@@ -1863,11 +1863,20 @@ int vnodeCheckNewHeaderFile(int fd, SVnodeObj *pVnode) {
} }
if (read(fd, (void *)pBlocks, expectedSize) != expectedSize) { if (read(fd, (void *)pBlocks, expectedSize) != expectedSize) {
dError("failed to read block part");
goto _broken_exit; goto _broken_exit;
} }
if (!taosCheckChecksumWhole((uint8_t *)pBlocks, expectedSize)) { if (!taosCheckChecksumWhole((uint8_t *)pBlocks, expectedSize)) {
dError("block part is broken");
goto _broken_exit; goto _broken_exit;
} }
for (int i = 0; i < compInfo.numOfBlocks; i++) {
if (pBlocks[i].last && i != compInfo.numOfBlocks-1) {
dError("last block in middle, block:%d", i);
goto _broken_exit;
}
}
} }
_correct_exit: _correct_exit:
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include "vnodeUtil.h" #include "vnodeUtil.h"
#pragma GCC diagnostic ignored "-Wint-conversion" #pragma GCC diagnostic ignored "-Wint-conversion"
extern int tsMaxQueues;
void * pShellServer = NULL; void * pShellServer = NULL;
SShellObj **shellList = NULL; SShellObj **shellList = NULL;
...@@ -128,7 +129,7 @@ int vnodeInitShell() { ...@@ -128,7 +129,7 @@ int vnodeInitShell() {
rpcInit.idMgmt = TAOS_ID_FREE; rpcInit.idMgmt = TAOS_ID_FREE;
rpcInit.connType = TAOS_CONN_UDPS; rpcInit.connType = TAOS_CONN_UDPS;
rpcInit.idleTime = tsShellActivityTimer * 1200; rpcInit.idleTime = tsShellActivityTimer * 1200;
rpcInit.qhandle = rpcQhandle; rpcInit.qhandle = rpcQhandle[0];
rpcInit.efp = vnodeSendVpeerCfgMsg; rpcInit.efp = vnodeSendVpeerCfgMsg;
pShellServer = taosOpenRpc(&rpcInit); pShellServer = taosOpenRpc(&rpcInit);
...@@ -155,7 +156,7 @@ int vnodeOpenShellVnode(int vnode) { ...@@ -155,7 +156,7 @@ int vnodeOpenShellVnode(int vnode) {
memset(shellList[vnode], 0, size); memset(shellList[vnode], 0, size);
taosOpenRpcChann(pShellServer, vnode, sessions); taosOpenRpcChannWithQ(pShellServer, vnode, sessions, rpcQhandle[(vnode+1)%tsMaxQueues]);
return 0; return 0;
} }
......
...@@ -29,9 +29,10 @@ ...@@ -29,9 +29,10 @@
// internal global, not configurable // internal global, not configurable
void * vnodeTmrCtrl; void * vnodeTmrCtrl;
void * rpcQhandle; void ** rpcQhandle;
void * dmQhandle; void * dmQhandle;
void * queryQhandle; void * queryQhandle;
int tsMaxQueues;
uint32_t tsRebootTime; uint32_t tsRebootTime;
int vnodeInitSystem() { int vnodeInitSystem() {
...@@ -41,9 +42,12 @@ int vnodeInitSystem() { ...@@ -41,9 +42,12 @@ int vnodeInitSystem() {
if (numOfThreads < 1) numOfThreads = 1; if (numOfThreads < 1) numOfThreads = 1;
queryQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, numOfThreads, "query"); queryQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, numOfThreads, "query");
numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0; tsMaxQueues = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0;
if (numOfThreads < 1) numOfThreads = 1; if (tsMaxQueues < 1) tsMaxQueues = 1;
rpcQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, numOfThreads, "dnode");
rpcQhandle = malloc(tsMaxQueues*sizeof(void *));
for (int i = 0; i < tsMaxQueues; i++)
rpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode");
dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt"); dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册