diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 9f58fe13906873af73c244c210c0a03ebaa33705..df176314e5cb1371dd5465c8a24ae4e7b77e74ab 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -40,7 +40,7 @@ uint32_t tsServerIp; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql); int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); -void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql); +void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql, char* buf); void tscProcessActivityTimer(void *handle, void *tmrId); int tscKeepConn[TSDB_SQL_MAX] = {0}; @@ -206,10 +206,24 @@ int tscSendMsgToServer(SSqlObj *pSql) { } if (pSql->thandle) { + /* + * the total length of message + * rpc header + actual message body + digest + * + * the pSql object may be released automatically during insert procedure, in which the access of + * message body by using "if (pHeader->msgType & 1)" may cause the segment fault. + * + */ + int32_t totalMsgLen = pSql->cmd.payloadLen + tsRpcHeadSize + sizeof(STaosDigest); + + // the memory will be released by taosProcessResponse, so no memory leak here + char* buf = malloc(totalMsgLen); + memcpy(buf, pSql->cmd.payload, totalMsgLen); + tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]); - char *pStart = taosBuildReqHeader(pSql->thandle, pSql->cmd.msgType, pSql->cmd.payload); + char *pStart = taosBuildReqHeader(pSql->thandle, pSql->cmd.msgType, buf); if (pStart) { - if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql); + if (tscUpdateVnodeMsg[pSql->cmd.command]) (*tscUpdateVnodeMsg[pSql->cmd.command])(pSql, buf); int ret = taosSendMsgToPeerH(pSql->thandle, pStart, pSql->cmd.payloadLen, pSql); if (ret >= 0) code = 0; tscTrace("%p send msg ret:%d code:%d sig:%p", pSql, ret, code, pSql->signature); @@ -1007,12 +1021,12 @@ int tscBuildRetrieveMsg(SSqlObj *pSql) { return msgLen; } -void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql) { +void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char* buf) { SShellSubmitMsg *pShellMsg; char * pMsg; SMeterMeta * pMeterMeta = pSql->cmd.pMeterMeta; - pMsg = pSql->cmd.payload + tsRpcHeadSize; + pMsg = buf + tsRpcHeadSize; pShellMsg = (SShellSubmitMsg *)pMsg; pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode); @@ -1042,9 +1056,9 @@ int tscBuildSubmitMsg(SSqlObj *pSql) { return msgLen; } -void tscUpdateVnodeInQueryMsg(SSqlObj *pSql) { +void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char* buf) { SSqlCmd *pCmd = &pSql->cmd; - char * pStart = pCmd->payload + tsRpcHeadSize; + char * pStart = buf + tsRpcHeadSize; SQueryMeterMsg *pQueryMsg = (SQueryMeterMsg *)pStart; diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 34dea0be9e835ae7225d8fe0f886b78a2a827413..381bd79bf81afd19c4c5e545d8b75173f898a6cd 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -110,7 +110,7 @@ void taos_init_imp() { rpcInit.numOfChanns = tscNumOfThreads; rpcInit.sessionsPerChann = tsMaxVnodeConnections / tscNumOfThreads; rpcInit.idMgmt = TAOS_ID_FREE; - rpcInit.noFree = 1; + rpcInit.noFree = 0; rpcInit.connType = TAOS_CONN_UDP; rpcInit.qhandle = tscQhandle; pVnodeConn = taosOpenRpc(&rpcInit); @@ -131,7 +131,7 @@ void taos_init_imp() { rpcInit.numOfChanns = 1; rpcInit.sessionsPerChann = tsMaxMgmtConnections; rpcInit.idMgmt = TAOS_ID_FREE; - rpcInit.noFree = 1; + rpcInit.noFree = 0; rpcInit.connType = TAOS_CONN_UDP; rpcInit.qhandle = tscQhandle; pTscMgmtConn = taosOpenRpc(&rpcInit); diff --git a/src/system/inc/vnodeUtil.h b/src/system/inc/vnodeUtil.h index 2f619c85aa83bffd6b30c4f2f0d6be4c4d3eda2f..b7efcfaa38870b4eaee009d6eccfd6cb75e7d307 100644 --- a/src/system/inc/vnodeUtil.h +++ b/src/system/inc/vnodeUtil.h @@ -75,7 +75,7 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterSidExtInfo **pSid void vnodeDecQueryRefCount(SQueryMeterMsg *pQueryMsg, SMeterObj **pMeterObjList, int32_t numOfInc); -int32_t vnodeTransferMeterState(SMeterObj* pMeterObj, int32_t state); +int32_t vnodeSetMeterState(SMeterObj* pMeterObj, int32_t state); void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state); bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state); void vnodeSetMeterDeleting(SMeterObj* pMeterObj); diff --git a/src/system/src/vnodeImport.c b/src/system/src/vnodeImport.c index d2493a5830674c29a9d76916126663622a5e5543..92ecaa7e0120bb36b9b21f574af95cef4003987b 100644 --- a/src/system/src/vnodeImport.c +++ b/src/system/src/vnodeImport.c @@ -285,7 +285,7 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { pImport->retry++; //slow query will block the import operation - int32_t state = vnodeTransferMeterState(pObj, TSDB_METER_STATE_IMPORTING); + int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_IMPORTING); if (state >= TSDB_METER_STATE_DELETING) { dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d", pObj->vnode, pObj->sid, pObj->meterId, state); @@ -887,7 +887,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) { vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); - vnodeTransferMeterState(pObj, TSDB_METER_STATE_INSERT); + vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported); if (pShell) { diff --git a/src/system/src/vnodeMeter.c b/src/system/src/vnodeMeter.c index b8961e98ec4d75b5aafcbbd96529784ed92ae98b..62b6b64e22407defa861e94a3bb25d87cfcca646 100644 --- a/src/system/src/vnodeMeter.c +++ b/src/system/src/vnodeMeter.c @@ -676,7 +676,7 @@ void vnodeUpdateMeter(void *param, void *tmrId) { return; } - int32_t state = vnodeTransferMeterState(pObj, TSDB_METER_STATE_UPDATING); + int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_UPDATING); if (state >= TSDB_METER_STATE_DELETING) { dError("vid:%d sid:%d id:%s, meter is deleted, failed to update, state:%d", pObj->vnode, pObj->sid, pObj->meterId, state); diff --git a/src/system/src/vnodeShell.c b/src/system/src/vnodeShell.c index 03ddf522cea5eabf60151a6aefdec88fa02045d9..fda92d5d4c02dce2b066d79831eb573c3c7e72c2 100644 --- a/src/system/src/vnodeShell.c +++ b/src/system/src/vnodeShell.c @@ -183,8 +183,8 @@ void vnodeCloseShellVnode(int vnode) { * 1. The msg, as well as SRpcConn may be in the task queue, free it immediate will cause crash * 2. Free connection may cause *(SRpcConn*)pObj->thandle to be invalid to access. */ - dTrace("vid:%d, delay 5sec to free resources", vnode); - taosTmrStart(vnodeDelayedFreeResource, 5000, v, vnodeTmrCtrl); + dTrace("vid:%d, delay 500ms to free resources", vnode); + taosTmrStart(vnodeDelayedFreeResource, 500, v, vnodeTmrCtrl); } void vnodeCleanUpShell() { @@ -508,9 +508,9 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { int32_t state = TSDB_METER_STATE_READY; if (pSubmit->import) { - state = vnodeTransferMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); + state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); } else { - state = vnodeTransferMeterState(pMeterObj, TSDB_METER_STATE_INSERT); + state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_INSERT); } if (state == TSDB_METER_STATE_READY) { diff --git a/src/system/src/vnodeStream.c b/src/system/src/vnodeStream.c index d7b7f62baefcc2247a0c6007c2459266ea95ec8c..a8dcff231d6d12e8bceed83e37b0fd51592f67ca 100644 --- a/src/system/src/vnodeStream.c +++ b/src/system/src/vnodeStream.c @@ -55,7 +55,7 @@ void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { int32_t numOfPoints = 0; - int32_t state = vnodeTransferMeterState(pObj, TSDB_METER_STATE_INSERT); + int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); if (state == TSDB_METER_STATE_READY) { vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints); vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); diff --git a/src/system/src/vnodeUtil.c b/src/system/src/vnodeUtil.c index f97b7cb00bc4809e73f02fcdeca7dec990f13607..f9d1966378df2934461329d515d9a6292258dbf2 100644 --- a/src/system/src/vnodeUtil.c +++ b/src/system/src/vnodeUtil.c @@ -586,7 +586,7 @@ void vnodeUpdateQueryColumnIndex(SQuery* pQuery, SMeterObj* pMeterObj) { } } -int32_t vnodeTransferMeterState(SMeterObj* pMeterObj, int32_t state) { +int32_t vnodeSetMeterState(SMeterObj* pMeterObj, int32_t state) { return __sync_val_compare_and_swap(&pMeterObj->state, TSDB_METER_STATE_READY, state); } @@ -619,7 +619,7 @@ bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid) { return true; } - int32_t prev = vnodeTransferMeterState(pObj, TSDB_METER_STATE_DELETING); + int32_t prev = vnodeSetMeterState(pObj, TSDB_METER_STATE_DELETING); /* * if the meter is not in ready/deleting state, it must be in insert/import/update,