提交 2a139821 编写于 作者: S slguan

fix issue #279

上级 169338ea
...@@ -40,7 +40,7 @@ uint32_t tsServerIp; ...@@ -40,7 +40,7 @@ uint32_t tsServerIp;
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql); int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql);
int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql);
void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql); void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql, char* buf);
void tscProcessActivityTimer(void *handle, void *tmrId); void tscProcessActivityTimer(void *handle, void *tmrId);
int tscKeepConn[TSDB_SQL_MAX] = {0}; int tscKeepConn[TSDB_SQL_MAX] = {0};
...@@ -206,10 +206,24 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -206,10 +206,24 @@ int tscSendMsgToServer(SSqlObj *pSql) {
} }
if (pSql->thandle) { if (pSql->thandle) {
/*
* the total length of message
* rpc header + actual message body + digest
*
* the pSql object may be released automatically during insert procedure, in which the access of
* message body by using "if (pHeader->msgType & 1)" may cause the segment fault.
*
*/
int32_t totalMsgLen = pSql->cmd.payloadLen + tsRpcHeadSize + sizeof(STaosDigest);
// the memory will be released by taosProcessResponse, so no memory leak here
char* buf = malloc(totalMsgLen);
memcpy(buf, pSql->cmd.payload, totalMsgLen);
tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]); tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]);
char *pStart = taosBuildReqHeader(pSql->thandle, pSql->cmd.msgType, pSql->cmd.payload); char *pStart = taosBuildReqHeader(pSql->thandle, pSql->cmd.msgType, buf);
if (pStart) { if (pStart) {
if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql); if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, buf);
int ret = taosSendMsgToPeerH(pSql->thandle, pStart, pSql->cmd.payloadLen, pSql); int ret = taosSendMsgToPeerH(pSql->thandle, pStart, pSql->cmd.payloadLen, pSql);
if (ret >= 0) code = 0; if (ret >= 0) code = 0;
tscTrace("%p send msg ret:%d code:%d sig:%p", pSql, ret, code, pSql->signature); tscTrace("%p send msg ret:%d code:%d sig:%p", pSql, ret, code, pSql->signature);
...@@ -1007,12 +1021,12 @@ int tscBuildRetrieveMsg(SSqlObj *pSql) { ...@@ -1007,12 +1021,12 @@ int tscBuildRetrieveMsg(SSqlObj *pSql) {
return msgLen; return msgLen;
} }
void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql) { void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char* buf) {
SShellSubmitMsg *pShellMsg; SShellSubmitMsg *pShellMsg;
char * pMsg; char * pMsg;
SMeterMeta * pMeterMeta = pSql->cmd.pMeterMeta; SMeterMeta * pMeterMeta = pSql->cmd.pMeterMeta;
pMsg = pSql->cmd.payload + tsRpcHeadSize; pMsg = buf + tsRpcHeadSize;
pShellMsg = (SShellSubmitMsg *)pMsg; pShellMsg = (SShellSubmitMsg *)pMsg;
pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode); pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode);
...@@ -1042,9 +1056,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql) { ...@@ -1042,9 +1056,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql) {
return msgLen; return msgLen;
} }
void tscUpdateVnodeInQueryMsg(SSqlObj *pSql) { void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char* buf) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
char * pStart = pCmd->payload + tsRpcHeadSize; char * pStart = buf + tsRpcHeadSize;
SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart; SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart;
......
...@@ -110,7 +110,7 @@ void taos_init_imp() { ...@@ -110,7 +110,7 @@ void taos_init_imp() {
rpcInit.numOfChanns = tscNumOfThreads; rpcInit.numOfChanns = tscNumOfThreads;
rpcInit.sessionsPerChann = tsMaxVnodeConnections / tscNumOfThreads; rpcInit.sessionsPerChann = tsMaxVnodeConnections / tscNumOfThreads;
rpcInit.idMgmt = TAOS_ID_FREE; rpcInit.idMgmt = TAOS_ID_FREE;
rpcInit.noFree = 1; rpcInit.noFree = 0;
rpcInit.connType = TAOS_CONN_UDP; rpcInit.connType = TAOS_CONN_UDP;
rpcInit.qhandle = tscQhandle; rpcInit.qhandle = tscQhandle;
pVnodeConn = taosOpenRpc(&rpcInit); pVnodeConn = taosOpenRpc(&rpcInit);
...@@ -131,7 +131,7 @@ void taos_init_imp() { ...@@ -131,7 +131,7 @@ void taos_init_imp() {
rpcInit.numOfChanns = 1; rpcInit.numOfChanns = 1;
rpcInit.sessionsPerChann = tsMaxMgmtConnections; rpcInit.sessionsPerChann = tsMaxMgmtConnections;
rpcInit.idMgmt = TAOS_ID_FREE; rpcInit.idMgmt = TAOS_ID_FREE;
rpcInit.noFree = 1; rpcInit.noFree = 0;
rpcInit.connType = TAOS_CONN_UDP; rpcInit.connType = TAOS_CONN_UDP;
rpcInit.qhandle = tscQhandle; rpcInit.qhandle = tscQhandle;
pTscMgmtConn = taosOpenRpc(&rpcInit); pTscMgmtConn = taosOpenRpc(&rpcInit);
......
...@@ -75,7 +75,7 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterSidExtInfo **pSid ...@@ -75,7 +75,7 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterSidExtInfo **pSid
void vnodeDecQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterObj **pMeterObjList, int32_t numOfInc); void vnodeDecQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterObj **pMeterObjList, int32_t numOfInc);
int32_t vnodeTransferMeterState(SMeterObj* pMeterObj, int32_t state); int32_t vnodeSetMeterState(SMeterObj* pMeterObj, int32_t state);
void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state); void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state);
bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state); bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state);
void vnodeSetMeterDeleting(SMeterObj* pMeterObj); void vnodeSetMeterDeleting(SMeterObj* pMeterObj);
......
...@@ -285,7 +285,7 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { ...@@ -285,7 +285,7 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
pImport->retry++; pImport->retry++;
//slow query will block the import operation //slow query will block the import operation
int32_t state = vnodeTransferMeterState(pObj, TSDB_METER_STATE_IMPORTING); int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_IMPORTING);
if (state >= TSDB_METER_STATE_DELETING) { if (state >= TSDB_METER_STATE_DELETING) {
dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d", dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d",
pObj->vnode, pObj->sid, pObj->meterId, state); pObj->vnode, pObj->sid, pObj->meterId, state);
...@@ -887,7 +887,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -887,7 +887,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) { if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) {
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
vnodeTransferMeterState(pObj, TSDB_METER_STATE_INSERT); vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT);
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported); code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported);
if (pShell) { if (pShell) {
......
...@@ -676,7 +676,7 @@ void vnodeUpdateMeter(void *param, void *tmrId) { ...@@ -676,7 +676,7 @@ void vnodeUpdateMeter(void *param, void *tmrId) {
return; return;
} }
int32_t state = vnodeTransferMeterState(pObj, TSDB_METER_STATE_UPDATING); int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_UPDATING);
if (state >= TSDB_METER_STATE_DELETING) { if (state >= TSDB_METER_STATE_DELETING) {
dError("vid:%d sid:%d id:%s, meter is deleted, failed to update, state:%d", dError("vid:%d sid:%d id:%s, meter is deleted, failed to update, state:%d",
pObj->vnode, pObj->sid, pObj->meterId, state); pObj->vnode, pObj->sid, pObj->meterId, state);
......
...@@ -183,8 +183,8 @@ void vnodeCloseShellVnode(int vnode) { ...@@ -183,8 +183,8 @@ void vnodeCloseShellVnode(int vnode) {
* 1. The msg, as well as SRpcConn may be in the task queue, free it immediate will cause crash * 1. The msg, as well as SRpcConn may be in the task queue, free it immediate will cause crash
* 2. Free connection may cause *(SRpcConn*)pObj->thandle to be invalid to access. * 2. Free connection may cause *(SRpcConn*)pObj->thandle to be invalid to access.
*/ */
dTrace("vid:%d, delay 5sec to free resources", vnode); dTrace("vid:%d, delay 500ms to free resources", vnode);
taosTmrStart(vnodeDelayedFreeResource, 5000, v, vnodeTmrCtrl); taosTmrStart(vnodeDelayedFreeResource, 500, v, vnodeTmrCtrl);
} }
void vnodeCleanUpShell() { void vnodeCleanUpShell() {
...@@ -508,9 +508,9 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -508,9 +508,9 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
int32_t state = TSDB_METER_STATE_READY; int32_t state = TSDB_METER_STATE_READY;
if (pSubmit->import) { if (pSubmit->import) {
state = vnodeTransferMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING);
} else { } else {
state = vnodeTransferMeterState(pMeterObj, TSDB_METER_STATE_INSERT); state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_INSERT);
} }
if (state == TSDB_METER_STATE_READY) { if (state == TSDB_METER_STATE_READY) {
......
...@@ -55,7 +55,7 @@ void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -55,7 +55,7 @@ void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
int32_t numOfPoints = 0; int32_t numOfPoints = 0;
int32_t state = vnodeTransferMeterState(pObj, TSDB_METER_STATE_INSERT); int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT);
if (state == TSDB_METER_STATE_READY) { if (state == TSDB_METER_STATE_READY) {
vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints); vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints);
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
......
...@@ -586,7 +586,7 @@ void vnodeUpdateQueryColumnIndex(SQuery* pQuery, SMeterObj* pMeterObj) { ...@@ -586,7 +586,7 @@ void vnodeUpdateQueryColumnIndex(SQuery* pQuery, SMeterObj* pMeterObj) {
} }
} }
int32_t vnodeTransferMeterState(SMeterObj* pMeterObj, int32_t state) { int32_t vnodeSetMeterState(SMeterObj* pMeterObj, int32_t state) {
return __sync_val_compare_and_swap(&pMeterObj->state, TSDB_METER_STATE_READY, state); return __sync_val_compare_and_swap(&pMeterObj->state, TSDB_METER_STATE_READY, state);
} }
...@@ -619,7 +619,7 @@ bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid) { ...@@ -619,7 +619,7 @@ bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid) {
return true; return true;
} }
int32_t prev = vnodeTransferMeterState(pObj, TSDB_METER_STATE_DELETING); int32_t prev = vnodeSetMeterState(pObj, TSDB_METER_STATE_DELETING);
/* /*
* if the meter is not in ready/deleting state, it must be in insert/import/update, * if the meter is not in ready/deleting state, it must be in insert/import/update,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册