diff --git a/include/common/trow.h b/include/common/trow.h index 706dde2bec8d294e7ea5009184dd48523386746d..af1a86a0a10968e05d44ee40f52f0a6923e7a739 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -651,6 +651,8 @@ static int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { TD_ROW_SET_INFO(pBuilder->pBuf, 0); TD_ROW_SET_TYPE(pBuilder->pBuf, pBuilder->rowType); + TASSERT(pBuilder->nBitmaps > 0 && pBuilder->flen > 0); + uint32_t len = 0; switch (pBuilder->rowType) { case TD_ROW_TP: @@ -1165,6 +1167,18 @@ static FORCE_INLINE int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, in return TSDB_CODE_SUCCESS; } +/** + * @brief + * + * @param pRow + * @param colId + * @param colType + * @param flen + * @param offset + * @param colIdx start from 0 + * @param pVal + * @return FORCE_INLINE + */ static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset, col_id_t colIdx, SCellVal *pVal) { if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { @@ -1172,10 +1186,20 @@ static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t return true; } void *pBitmap = tdGetBitmapAddrTp(pRow, flen); - tdGetTpRowValOfCol(pVal, pRow, pBitmap, colType, offset - sizeof(TSKEY), colIdx - 1); + tdGetTpRowValOfCol(pVal, pRow, pBitmap, colType, offset - sizeof(TSKEY), colIdx); return true; } +/** + * @brief + * + * @param pRow + * @param colId + * @param offset + * @param colIdx start from 0 + * @param pVal + * @return FORCE_INLINE + */ static FORCE_INLINE bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t offset, col_id_t colIdx, SCellVal *pVal) { if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { @@ -1183,7 +1207,7 @@ static FORCE_INLINE bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t o return true; } void *pBitmap = tdGetBitmapAddrKv(pRow, tdRowGetNCols(pRow)); - tdGetKvRowValOfCol(pVal, pRow, pBitmap, offset, colIdx - 1); + tdGetKvRowValOfCol(pVal, pRow, pBitmap, offset, colIdx); return true; } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index c5b477343db6cd4ffe299db6913a6a3298d199b7..6c5e006671541dec1ab2059d90b9471358e6df80 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -617,6 +617,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_FUNC_FUNTION_ERROR TAOS_DEF_ERROR_CODE(0, 0x2800) #define TSDB_CODE_FUNC_FUNTION_PARA_NUM TAOS_DEF_ERROR_CODE(0, 0x2801) #define TSDB_CODE_FUNC_FUNTION_PARA_TYPE TAOS_DEF_ERROR_CODE(0, 0x2802) +#define TSDB_CODE_FUNC_FUNTION_PARA_VALUE TAOS_DEF_ERROR_CODE(0, 0x2803) #ifdef __cplusplus } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index a6678b2ec0881e74f1fa4c18723285258b1bf6c8..6159da9cb161e6604cbdd4f504114bf4b831b1d3 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -630,10 +630,29 @@ void appHbMgrCleanup(void) { int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); for (int i = 0; i < sz; i++) { SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i); + + void *pIter = taosHashIterate(pTarget->activeInfo, NULL); + while (pIter != NULL) { + SClientHbReq *pOneReq = pIter; + hbFreeReq(pOneReq); + taosHashCleanup(pOneReq->info); + pIter = taosHashIterate(pTarget->activeInfo, pIter); + } taosHashCleanup(pTarget->activeInfo); pTarget->activeInfo = NULL; + + + pIter = taosHashIterate(pTarget->connInfo, NULL); + while (pIter != NULL) { + SHbConnInfo *info = pIter; + taosMemoryFree(info->param); + pIter = taosHashIterate(pTarget->connInfo, pIter); + } taosHashCleanup(pTarget->connInfo); pTarget->connInfo = NULL; + + taosMemoryFree(pTarget->key); + taosMemoryFree(pTarget); } } @@ -716,12 +735,23 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in } void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) { - int32_t code = 0; - code = taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); - code = taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); - if (code) { + SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + if (pReq) { + hbFreeReq(pReq); + taosHashCleanup(pReq->info); + taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + } + + SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); + if (info) { + taosMemoryFree(info->param); + taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey)); + } + + if (NULL == pReq || NULL == info) { return; } + atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 2ad34d156176c0992175ed1683cc8d9cd65a6552..4686d97ec881aab75455c9e7e44a61bd02a8fcf1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1561,12 +1561,19 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit if (isChosenRowDataRow) { colId = pSchema->columns[chosen_itr].colId; offset = pSchema->columns[chosen_itr].offset; - tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr, &sVal); + // TODO: use STSRowIter + tdSTpRowGetVal(row, colId, pSchema->columns[chosen_itr].type, pSchema->flen, offset, chosen_itr - 1, &sVal); } else { - SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr); - colId = pColIdx->colId; - offset = pColIdx->offset; - tdSKvRowGetVal(row, colId, offset, chosen_itr, &sVal); + // TODO: use STSRowIter + if (chosen_itr == 0) { + colId = PRIMARYKEY_TIMESTAMP_COL_ID; + tdSKvRowGetVal(row, PRIMARYKEY_TIMESTAMP_COL_ID, -1, -1, &sVal); + } else { + SKvRowIdx* pColIdx = tdKvRowColIdxAt(row, chosen_itr - 1); + colId = pColIdx->colId; + offset = pColIdx->offset; + tdSKvRowGetVal(row, colId, offset, chosen_itr - 1, &sVal); + } } if (colId == pColInfo->info.colId) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 230ccf6aceef66b699066da6182edb7355424ffa..4c9ed7876924143f9734f573abcfc05e44520413 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -490,6 +490,7 @@ typedef struct SGroupbyOperatorInfo { SExprInfo* pScalarExprInfo; int32_t numOfScalarExpr; // the number of scalar expression in group operator SqlFunctionCtx* pScalarFuncCtx; + int32_t* rowCellInfoOffset; // offset value for each row result cell info } SGroupbyOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 87ecdde3ab811108170cbc3f5a423bac3a6b3125..cd129b54fbd237dd2a2b6b2d50c4066276aa2967 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1173,6 +1173,9 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* setPseudoOutputColInfo(pResult, pCtx, pPseudoList); pResult->info.groupId = pSrcBlock->info.groupId; + // if the source equals to the destination, it is to create a new column as the result of scalar function or some operators. + bool createNewColModel = (pResult == pSrcBlock); + int32_t numOfRows = 0; for (int32_t k = 0; k < numOfOutput; ++k) { @@ -1181,7 +1184,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); - if (pResult->info.rows > 0) { + if (pResult->info.rows > 0 && !createNewColModel) { colDataMergeCol(pColInfoData, pResult->info.rows, pfCtx->input.pData[0], pfCtx->input.numOfRows); } else { colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows); @@ -1191,7 +1194,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) { SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); - int32_t offset = pResult->info.rows; + int32_t offset = createNewColModel? 0: pResult->info.rows; for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) { colDataAppend(pColInfoData, i + offset, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].param.nType), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType); } @@ -1207,7 +1210,8 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* SScalarParam dest = {.columnData = &idata}; scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); - colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows); + int32_t startOffset = createNewColModel? 0:pResult->info.rows; + colDataMergeCol(pResColData, startOffset, &idata, dest.numOfRows); numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); @@ -1224,7 +1228,7 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pfCtx->fpSet.init(&pCtx[k], pResInfo); pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); - pfCtx->offset = pResult->info.rows; // set the start offset + pfCtx->offset = createNewColModel? 0:pResult->info.rows; // set the start offset // set the timestamp(_rowts) output buffer if (taosArrayGetSize(pPseudoList) > 0) { @@ -1242,7 +1246,9 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* SScalarParam dest = {.columnData = &idata}; scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); - colDataMergeCol(pResColData, pResult->info.rows, &idata, dest.numOfRows); + + int32_t startOffset = createNewColModel? 0:pResult->info.rows; + colDataMergeCol(pResColData, startOffset, &idata, dest.numOfRows); numOfRows = dest.numOfRows; taosArrayDestroy(pBlockList); @@ -1252,7 +1258,9 @@ void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* } } - pResult->info.rows += numOfRows; + if (!createNewColModel) { + pResult->info.rows += numOfRows; + } } void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs, diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 8018a8dd31022a01cf51f3f68a010ae9ec37c83b..d610880e304826cf9bafa5910125849ecbbb10b7 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -341,7 +341,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pInfo->pScalarExprInfo = pScalarExprInfo; pInfo->numOfScalarExpr = numOfScalarExpr; - pInfo->pScalarFuncCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); + pInfo->pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset); int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 726f9d0993ee7e60292ebb1e84769ec21ffcbb21..c2629b9bf40f280742a5e24d1fb2315cdac43bb2 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -28,11 +28,15 @@ static int32_t buildFuncErrMsg(char* pErrBuf, int32_t len, int32_t errCode, cons } static int32_t invaildFuncParaNumErrMsg(char* pErrBuf, int32_t len, const char* pFuncName) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_NUM, "Invalid number of arguments : %s", pFuncName); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_NUM, "Invalid number of parameters : %s", pFuncName); } static int32_t invaildFuncParaTypeErrMsg(char* pErrBuf, int32_t len, const char* pFuncName) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_TYPE, "Inconsistent datatypes : %s", pFuncName); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_TYPE, "Invalid datatypes : %s", pFuncName); +} + +static int32_t invaildFuncParaValueErrMsg(char* pErrBuf, int32_t len, const char* pFuncName) { + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_PARA_VALUE, "Invalid parameter value : %s", pFuncName); } // There is only one parameter of numeric type, and the return type is parameter type @@ -318,10 +322,15 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; // The function return type has been set during syntax parsing uint8_t para2Type = pFunc->node.resType.type; - if ((TSDB_DATA_TYPE_JSON == para1Type || TSDB_DATA_TYPE_BLOB == para1Type || TSDB_DATA_TYPE_MEDIUMBLOB == para1Type) || - (TSDB_DATA_TYPE_JSON == para2Type || TSDB_DATA_TYPE_BLOB == para2Type || TSDB_DATA_TYPE_MEDIUMBLOB == para2Type)) { + if (para2Type != TSDB_DATA_TYPE_BIGINT && para2Type != TSDB_DATA_TYPE_UBIGINT && + para2Type != TSDB_DATA_TYPE_VARCHAR && para2Type != TSDB_DATA_TYPE_NCHAR && + para2Type != TSDB_DATA_TYPE_TIMESTAMP) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } + int32_t para2Bytes = pFunc->node.resType.bytes; + if (para2Bytes <= 0) { //non-positive value or overflow + return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 7699219f52671b600ffc36ed5736dd4b5634e2eb..3d796515a0ec937b65885d211619130604adf7b1 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -901,7 +901,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { + if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { memcpy(buf, data, bytes); *(TSKEY*)(buf + bytes) = cts; // DO_UPDATE_TAG_COLUMNS(pCtx, ts); @@ -919,7 +919,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) { + if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) < cts) { memcpy(buf, data, bytes); *(TSKEY*)(buf + bytes) = cts; pResInfo->numOfRes = 1; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 1e8b732b6a4d52546ae52ad1e66a184232803a6e..da759bc15ab23f248d38f3cde9b1658bc1e0fea9 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -888,10 +888,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { TSKEY tsKey = TD_ROW_KEY(row); - if (checkTimestamp(pDataBlocks, (const char*)&tsKey) != TSDB_CODE_SUCCESS) { - buildSyntaxErrMsg(&pCxt->msg, "client time/server time can not be mixed up", sToken.z); - return TSDB_CODE_TSC_INVALID_TIME_STAMP; - } + checkTimestamp(pDataBlocks, (const char*)&tsKey); } } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 02c6006f60d08e2aab5361948b36817d99f446ed..5e8eb805e3f688d331c4b54132a97d0b7f9f9640 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -659,27 +659,22 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int16_t inputType = pInput[0].columnData->info.type; int16_t outputType = pOutput[0].columnData->info.type; - if (outputType != TSDB_DATA_TYPE_BIGINT && outputType != TSDB_DATA_TYPE_UBIGINT && - outputType != TSDB_DATA_TYPE_VARCHAR && outputType != TSDB_DATA_TYPE_NCHAR && - outputType != TSDB_DATA_TYPE_TIMESTAMP) { - return TSDB_CODE_FAILED; - } int64_t outputLen = pOutput[0].columnData->info.bytes; - char *input = NULL; + if (IS_VAR_DATA_TYPE(outputType)) { + int32_t factor = (TSDB_DATA_TYPE_NCHAR == outputType) ? TSDB_NCHAR_SIZE : 1; + outputLen = outputLen * factor + VARSTR_HEADER_SIZE; + } + char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1); char *output = outputBuf; - if (IS_VAR_DATA_TYPE(inputType)) { - input = pInput[0].columnData->pData + pInput[0].columnData->varmeta.offset[0]; - } else { - input = pInput[0].columnData->pData; - } for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { if (colDataIsNull_s(pInput[0].columnData, i)) { colDataAppendNULL(pOutput->columnData, i); continue; } + char *input = colDataGetData(pInput[0].columnData, i); switch(outputType) { case TSDB_DATA_TYPE_BIGINT: { @@ -736,7 +731,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } else if (inputType == TSDB_DATA_TYPE_BINARY) { int32_t len = sprintf(varDataVal(output), "%.*s", (int32_t)(outputLen - VARSTR_HEADER_SIZE), varDataVal(input)); varDataSetLen(output, len); - } else if (inputType == TSDB_DATA_TYPE_BINARY || inputType == TSDB_DATA_TYPE_NCHAR) { + } else if (inputType == TSDB_DATA_TYPE_NCHAR) { //not support return TSDB_CODE_FAILED; } else { @@ -789,11 +784,6 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } colDataAppend(pOutput->columnData, i, output, false); - if (IS_VAR_DATA_TYPE(inputType)) { - input += varDataTLen(input); - } else { - input += tDataTypes[inputType].bytes; - } if (IS_VAR_DATA_TYPE(outputType)) { output += varDataTLen(output); } else { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 6cc435fee4b2be4dad426cd6c052c24919c8943c..c0b3ae7055d6b0d72784c40f7345476dc5126de7 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -2749,4 +2749,15 @@ void schedulerDestroy(void) { taosCloseRef(schMgmt.jobRef); schMgmt.jobRef = 0; } + + if (schMgmt.hbConnections) { + void *pIter = taosHashIterate(schMgmt.hbConnections, NULL); + while (pIter != NULL) { + SSchHbTrans *hb = pIter; + schFreeRpcCtx(&hb->rpcCtx); + pIter = taosHashIterate(schMgmt.hbConnections, pIter); + } + taosHashCleanup(schMgmt.hbConnections); + schMgmt.hbConnections = NULL; + } } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9dc12a3dece380e15f6ee410047793de77c4984b..8eb1a3ee7d2fe5cf844c66be3452283568946abe 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -129,6 +129,15 @@ static void transDestroyConnCtx(STransConnCtx* ctx); static SCliThrdObj* createThrdObj(); static void destroyThrdObj(SCliThrdObj* pThrd); +static void cliWalkCb(uv_handle_t* handle, void* arg); + +#define CLI_RELEASE_UV(loop) \ + do { \ + uv_walk(loop, cliWalkCb, NULL); \ + uv_run(loop, UV_RUN_DEFAULT); \ + uv_loop_close(loop); \ + } while (0); + // snprintf may cause performance problem #define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \ do { \ @@ -212,8 +221,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd); } \ } while (0) -#define CONN_NO_PERSIST_BY_APP(conn) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) -#define CONN_RELEASE_BY_SERVER(conn) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) +#define CONN_NO_PERSIST_BY_APP(conn) \ + (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) +#define CONN_RELEASE_BY_SERVER(conn) \ + (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) #define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) @@ -288,8 +299,9 @@ void cliHandleResp(SCliConn* conn) { tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } - tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType), - taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); + tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, + TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), + taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); conn->secured = pHead->secured; @@ -355,10 +367,12 @@ void cliHandleExcept(SCliConn* pConn) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); - tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, TMSG_INFO(transMsg.msgType)); + tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, + TMSG_INFO(transMsg.msgType)); if (transMsg.ahandle == NULL) { transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType)); - tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle); + tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, + transMsg.ahandle); } } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; @@ -631,8 +645,9 @@ void cliSend(SCliConn* pConn) { pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), - taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, + TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); if (pHead->persist == 1) { CONN_SET_PERSIST_BY_APP(pConn); @@ -671,9 +686,11 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { destroyCmsg(pMsg); destroyConnPool(pThrd->pool); uv_timer_stop(&pThrd->timer); + uv_walk(pThrd->loop, cliWalkCb, NULL); + pThrd->quit = true; - uv_stop(pThrd->loop); + // uv_stop(pThrd->loop); } static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = pMsg->msg.handle; @@ -786,7 +803,6 @@ static void* cliWorkThread(void* arg) { SCliThrdObj* pThrd = (SCliThrdObj*)arg; setThreadName("trans-cli-work"); uv_run(pThrd->loop, UV_RUN_DEFAULT); - return NULL; } @@ -851,8 +867,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { if (pThrd == NULL) { return; } - uv_stop(pThrd->loop); taosThreadJoin(pThrd->thread, NULL); + CLI_RELEASE_UV(pThrd->loop); taosThreadMutexDestroy(&pThrd->msgMtx); transDestroyAsyncPool(pThrd->asyncPool); @@ -874,6 +890,11 @@ void cliSendQuit(SCliThrdObj* thrd) { msg->type = Quit; transSendAsync(thrd->asyncPool, &msg->q); } +void cliWalkCb(uv_handle_t* handle, void* arg) { + if (!uv_is_closing(handle)) { + uv_close(handle, NULL); + } +} int cliRBChoseIdx(STrans* pTransInst) { int64_t index = pTransInst->index; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 03b97b6fa1a948768c74316f66c1e7eaaa670986..9e53811fd381a26b7fd35997041f63069a505761 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -195,7 +195,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) void transDestroyAsyncPool(SAsyncPool* pool) { for (int i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); - uv_close((uv_handle_t*)async, NULL); + // uv_close((uv_handle_t*)async, NULL); SAsyncItem* item = async->data; taosThreadMutexDestroy(&item->mtx); taosMemoryFree(item); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 50ce0bddcd96ba82045c4775823944ef49a0e325..158d599bdfb183c4c1c5d1f4942d2d0898d2fbda 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -93,27 +93,6 @@ typedef struct SServerObj { static const char* notify = "a"; -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - conn->status = ConnRelease; \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - tTrace("server conn %p received release request", conn); \ - \ - STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ - SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ - srvMsg->msg = tmsg; \ - srvMsg->type = Release; \ - srvMsg->pConn = conn; \ - if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ - return; \ - } \ - uvStartSendRespInternal(srvMsg); \ - return; \ - } \ - } while (0) - static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); @@ -125,11 +104,8 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) static void uvWorkerAsyncCb(uv_async_t* handle); static void uvAcceptAsyncCb(uv_async_t* handle); static void uvShutDownCb(uv_shutdown_t* req, int status); - -static void uvFreeCb(uv_handle_t* handle) { - // - taosMemoryFree(handle); -} +static void uvWalkCb(uv_handle_t* handle, void* arg); +static void uvFreeCb(uv_handle_t* handle); static void uvStartSendRespInternal(SSrvMsg* smsg); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); @@ -146,7 +122,8 @@ static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd); -static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, uvHandleRegister}; +static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, + uvHandleRegister}; static void uvDestroyConn(uv_handle_t* handle); @@ -158,6 +135,34 @@ static void* transAcceptThread(void* arg); static bool addHandleToWorkloop(void* arg); static bool addHandleToAcceptloop(void* arg); +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + tTrace("server conn %p received release request", conn); \ + \ + STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ + SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ + srvMsg->msg = tmsg; \ + srvMsg->type = Release; \ + srvMsg->pConn = conn; \ + if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ + return; \ + } \ + uvStartSendRespInternal(srvMsg); \ + return; \ + } \ + } while (0) + +#define SRV_RELEASE_UV(loop) \ + do { \ + uv_walk(loop, uvWalkCb, NULL); \ + uv_run(loop, UV_RUN_DEFAULT); \ + uv_loop_close(loop); \ + } while (0); + void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SSrvConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; @@ -209,12 +214,13 @@ static void uvHandleReq(SSrvConn* pConn) { } if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); - tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen); + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), + taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), + ntohs(pConn->locaddr.sin_port), transMsg.contLen); } else { - tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, TMSG_INFO(transMsg.msgType), - taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), - transMsg.contLen, pHead->noResp); + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, + TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp); // no ref here } @@ -354,8 +360,9 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { char* msg = (char*)pHead; int32_t len = transMsgLenFromCont(pMsg->contLen); - tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), + taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), + ntohs(pConn->locaddr.sin_port)); pHead->msgLen = htonl(len); wb->base = msg; @@ -367,7 +374,7 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) { uvPrepareSendData(smsg, &wb); SSrvConn* pConn = smsg->pConn; - uv_timer_stop(&pConn->pTimer); + // uv_timer_stop(&pConn->pTimer); uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); } static void uvStartSendResp(SSrvMsg* smsg) { @@ -436,36 +443,17 @@ void uvWorkerAsyncCb(uv_async_t* handle) { static void uvWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { uv_close(handle, NULL); - // uv_unref(handle); - tDebug("handle: %p -----test----", handle); } } -#define MAKE_VALGRIND_HAPPY(loop) \ - do { \ - uv_walk(loop, uvWalkCb, NULL); \ - uv_run(loop, UV_RUN_DEFAULT); \ - uv_loop_close(loop); \ - } while (0); +static void uvFreeCb(uv_handle_t* handle) { + // + taosMemoryFree(handle); +} static void uvAcceptAsyncCb(uv_async_t* async) { SServerObj* srv = async->data; tDebug("close server port %d", srv->port); uv_walk(srv->loop, uvWalkCb, NULL); - // uv_close((uv_handle_t*)async, NULL); - // uv_close((uv_handle_t*)&srv->server, NULL); - // uv_stop(srv->loop); - // uv_print_all_handles(srv->loop, stderr); - // int ref = uv_loop_alive(srv->loop); - // assert(ref == 0); - // tError("active size %d", ref); - // uv_stop(srv->loop); - // uv_run(srv->loop, UV_RUN_DEFAULT); - // fprintf(stderr, "------------------------------------"); - // uv_print_all_handles(srv->loop, stderr); - - // int ret = uv_loop_close(srv->loop); - // tError("(loop)->active_reqs.count: %d, ret: %d", (srv->loop)->active_reqs.count, ret); - // assert(ret == 0); } static void uvShutDownCb(uv_shutdown_t* req, int status) { @@ -532,8 +520,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { pConn->pTransInst = pThrd->pTransInst; /* init conn timer*/ - uv_timer_init(pThrd->loop, &pConn->pTimer); - pConn->pTimer.data = pConn; + // uv_timer_init(pThrd->loop, &pConn->pTimer); + // pConn->pTimer.data = pConn; pConn->hostThrd = pThrd; @@ -677,17 +665,17 @@ static void uvDestroyConn(uv_handle_t* handle) { SWorkThrdObj* thrd = conn->hostThrd; tDebug("server conn %p destroy", conn); - uv_timer_stop(&conn->pTimer); + // uv_timer_stop(&conn->pTimer); transQueueDestroy(&conn->srvMsgs); QUEUE_REMOVE(&conn->queue); taosMemoryFree(conn->pTcp); - // taosMemoryFree(conn); + taosMemoryFree(conn); if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { tTrace("work thread quit"); - // uv_walk(thrd->loop, uvWalkCb, NULL); - uv_loop_close(thrd->loop); - uv_stop(thrd->loop); + uv_walk(thrd->loop, uvWalkCb, NULL); + // uv_loop_close(thrd->loop); + // uv_stop(thrd->loop); } } @@ -749,9 +737,9 @@ End: void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { thrd->quit = true; if (QUEUE_IS_EMPTY(&thrd->conn)) { - // uv_walk(thrd->loop, uvWalkCb, NULL); - uv_loop_close(thrd->loop); - uv_stop(thrd->loop); + uv_walk(thrd->loop, uvWalkCb, NULL); + // uv_loop_close(thrd->loop); + // uv_stop(thrd->loop); } else { destroyAllConn(thrd); } @@ -802,7 +790,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { return; } taosThreadJoin(pThrd->thread, NULL); - // MAKE_VALGRIND_HAPPY(pThrd->loop); + SRV_RELEASE_UV(pThrd->loop); transDestroyAsyncPool(pThrd->asyncPool); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); @@ -822,7 +810,7 @@ void transCloseServer(void* arg) { uv_async_send(srv->pAcceptAsync); taosThreadJoin(srv->thread, NULL); - MAKE_VALGRIND_HAPPY(srv->loop); + SRV_RELEASE_UV(srv->loop); for (int i = 0; i < srv->numOfThreads; i++) { sendQuitToWorkThrd(srv->pThreadObj[i]);