From 2a139821b944a1cc7d99a2a81ebf40550618bae9 Mon Sep 17 00:00:00 2001 From: slguan Date: Tue, 6 Aug 2019 15:13:20 +0800 Subject: [PATCH] fix issue #279 --- src/client/src/tscServer.c | 28 +++++++++++++++++++++------- src/client/src/tscSystem.c | 4 ++-- src/system/inc/vnodeUtil.h | 2 +- src/system/src/vnodeImport.c | 4 ++-- src/system/src/vnodeMeter.c | 2 +- src/system/src/vnodeShell.c | 8 ++++---- src/system/src/vnodeStream.c | 2 +- src/system/src/vnodeUtil.c | 4 ++-- 8 files changed, 34 insertions(+), 20 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9f58fe1390..df176314e5 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -40,7 +40,7 @@ uint32_t tsServerIp; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql); int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); -void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql); +void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql, char* buf); void tscProcessActivityTimer(void *handle, void *tmrId); int tscKeepConn[TSDB_SQL_MAX] = {0}; @@ -206,10 +206,24 @@ int tscSendMsgToServer(SSqlObj *pSql) { } if (pSql->thandle) { + /* + * the total length of message + * rpc header + actual message body + digest + * + * the pSql object may be released automatically during insert procedure, in which the access of + * message body by using "if (pHeader->msgType & 1)" may cause the segment fault. + * + */ + int32_t totalMsgLen = pSql->cmd.payloadLen + tsRpcHeadSize + sizeof(STaosDigest); + + // the memory will be released by taosProcessResponse, so no memory leak here + char* buf = malloc(totalMsgLen); + memcpy(buf, pSql->cmd.payload, totalMsgLen); + tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]); - char *pStart = taosBuildReqHeader(pSql->thandle, pSql->cmd.msgType, pSql->cmd.payload); + char *pStart = taosBuildReqHeader(pSql->thandle, pSql->cmd.msgType, buf); if (pStart) { - if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql); + if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, buf); int ret = taosSendMsgToPeerH(pSql->thandle, pStart, pSql->cmd.payloadLen, pSql); if (ret >= 0) code = 0; tscTrace("%p send msg ret:%d code:%d sig:%p", pSql, ret, code, pSql->signature); @@ -1007,12 +1021,12 @@ int tscBuildRetrieveMsg(SSqlObj *pSql) { return msgLen; } -void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql) { +void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char* buf) { SShellSubmitMsg *pShellMsg; char * pMsg; SMeterMeta * pMeterMeta = pSql->cmd.pMeterMeta; - pMsg = pSql->cmd.payload + tsRpcHeadSize; + pMsg = buf + tsRpcHeadSize; pShellMsg = (SShellSubmitMsg *)pMsg; pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode); @@ -1042,9 +1056,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql) { return msgLen; } -void tscUpdateVnodeInQueryMsg(SSqlObj *pSql) { +void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char* buf) { SSqlCmd *pCmd = &pSql->cmd; - char * pStart = pCmd->payload + tsRpcHeadSize; + char * pStart = buf + tsRpcHeadSize; SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart; diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 34dea0be9e..381bd79bf8 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -110,7 +110,7 @@ void taos_init_imp() { rpcInit.numOfChanns = tscNumOfThreads; rpcInit.sessionsPerChann = tsMaxVnodeConnections / tscNumOfThreads; rpcInit.idMgmt = TAOS_ID_FREE; - rpcInit.noFree = 1; + rpcInit.noFree = 0; rpcInit.connType = TAOS_CONN_UDP; rpcInit.qhandle = tscQhandle; pVnodeConn = taosOpenRpc(&rpcInit); @@ -131,7 +131,7 @@ void taos_init_imp() { rpcInit.numOfChanns = 1; rpcInit.sessionsPerChann = tsMaxMgmtConnections; rpcInit.idMgmt = TAOS_ID_FREE; - rpcInit.noFree = 1; + rpcInit.noFree = 0; rpcInit.connType = TAOS_CONN_UDP; rpcInit.qhandle = tscQhandle; pTscMgmtConn = taosOpenRpc(&rpcInit); diff --git a/src/system/inc/vnodeUtil.h b/src/system/inc/vnodeUtil.h index 2f619c85aa..b7efcfaa38 100644 --- a/src/system/inc/vnodeUtil.h +++ b/src/system/inc/vnodeUtil.h @@ -75,7 +75,7 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterSidExtInfo **pSid void vnodeDecQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterObj **pMeterObjList, int32_t numOfInc); -int32_t vnodeTransferMeterState(SMeterObj* pMeterObj, int32_t state); +int32_t vnodeSetMeterState(SMeterObj* pMeterObj, int32_t state); void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state); bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state); void vnodeSetMeterDeleting(SMeterObj* pMeterObj); diff --git a/src/system/src/vnodeImport.c b/src/system/src/vnodeImport.c index d2493a5830..92ecaa7e01 100644 --- a/src/system/src/vnodeImport.c +++ b/src/system/src/vnodeImport.c @@ -285,7 +285,7 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { pImport->retry++; //slow query will block the import operation - int32_t state = vnodeTransferMeterState(pObj, TSDB_METER_STATE_IMPORTING); + int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_IMPORTING); if (state >= TSDB_METER_STATE_DELETING) { dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d", pObj->vnode, pObj->sid, pObj->meterId, state); @@ -887,7 +887,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) { vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); - vnodeTransferMeterState(pObj, TSDB_METER_STATE_INSERT); + vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported); if (pShell) { diff --git a/src/system/src/vnodeMeter.c b/src/system/src/vnodeMeter.c index b8961e98ec..62b6b64e22 100644 --- a/src/system/src/vnodeMeter.c +++ b/src/system/src/vnodeMeter.c @@ -676,7 +676,7 @@ void vnodeUpdateMeter(void *param, void *tmrId) { return; } - int32_t state = vnodeTransferMeterState(pObj, TSDB_METER_STATE_UPDATING); + int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_UPDATING); if (state >= TSDB_METER_STATE_DELETING) { dError("vid:%d sid:%d id:%s, meter is deleted, failed to update, state:%d", pObj->vnode, pObj->sid, pObj->meterId, state); diff --git a/src/system/src/vnodeShell.c b/src/system/src/vnodeShell.c index 03ddf522ce..fda92d5d4c 100644 --- a/src/system/src/vnodeShell.c +++ b/src/system/src/vnodeShell.c @@ -183,8 +183,8 @@ void vnodeCloseShellVnode(int vnode) { * 1. The msg, as well as SRpcConn may be in the task queue, free it immediate will cause crash * 2. Free connection may cause *(SRpcConn*)pObj->thandle to be invalid to access. */ - dTrace("vid:%d, delay 5sec to free resources", vnode); - taosTmrStart(vnodeDelayedFreeResource, 5000, v, vnodeTmrCtrl); + dTrace("vid:%d, delay 500ms to free resources", vnode); + taosTmrStart(vnodeDelayedFreeResource, 500, v, vnodeTmrCtrl); } void vnodeCleanUpShell() { @@ -508,9 +508,9 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { int32_t state = TSDB_METER_STATE_READY; if (pSubmit->import) { - state = vnodeTransferMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); + state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); } else { - state = vnodeTransferMeterState(pMeterObj, TSDB_METER_STATE_INSERT); + state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_INSERT); } if (state == TSDB_METER_STATE_READY) { diff --git a/src/system/src/vnodeStream.c b/src/system/src/vnodeStream.c index d7b7f62bae..a8dcff231d 100644 --- a/src/system/src/vnodeStream.c +++ b/src/system/src/vnodeStream.c @@ -55,7 +55,7 @@ void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { int32_t numOfPoints = 0; - int32_t state = vnodeTransferMeterState(pObj, TSDB_METER_STATE_INSERT); + int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); if (state == TSDB_METER_STATE_READY) { vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints); vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); diff --git a/src/system/src/vnodeUtil.c b/src/system/src/vnodeUtil.c index f97b7cb00b..f9d1966378 100644 --- a/src/system/src/vnodeUtil.c +++ b/src/system/src/vnodeUtil.c @@ -586,7 +586,7 @@ void vnodeUpdateQueryColumnIndex(SQuery* pQuery, SMeterObj* pMeterObj) { } } -int32_t vnodeTransferMeterState(SMeterObj* pMeterObj, int32_t state) { +int32_t vnodeSetMeterState(SMeterObj* pMeterObj, int32_t state) { return __sync_val_compare_and_swap(&pMeterObj->state, TSDB_METER_STATE_READY, state); } @@ -619,7 +619,7 @@ bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid) { return true; } - int32_t prev = vnodeTransferMeterState(pObj, TSDB_METER_STATE_DELETING); + int32_t prev = vnodeSetMeterState(pObj, TSDB_METER_STATE_DELETING); /* * if the meter is not in ready/deleting state, it must be in insert/import/update, -- GitLab