diff --git a/include/os/os.h b/include/os/os.h index 71966061a19a175d816010ff6425b4004b1f2223..d646ffe4fdc4a3118c0594b406031a6287256c54 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -51,6 +51,9 @@ extern "C" { #endif #else +#ifndef __func__ +#define __func__ __FUNCTION__ +#endif #include #include #ifndef TD_USE_WINSOCK diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 059bfdeeb322158f6af66340cfaf4fedb24601a1..462b068a738b5ca95fc77b8ca86186b9d37c3e1a 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -117,9 +117,13 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { SArray *pArray = taosArrayInit(innerSz, sizeof(void *)); for (int32_t j = 0; j < innerSz; j++) { SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) return -1; + if (pTask == NULL) { + taosArrayDestroy(pArray); + return -1; + } if (tDecodeSStreamTask(pDecoder, pTask) < 0) { taosMemoryFree(pTask); + taosArrayDestroy(pArray); return -1; } taosArrayPush(pArray, &pTask); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index c29541873e4fcc99323e3d61ab9f100187fe4d25..1a57a391b13d6c51abb95fb48f8e0e7bef0d52b8 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -180,6 +180,7 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) { return -1; } } + tdbFree(pKey); tdbTbcClose(pCur); return 0; } diff --git a/source/dnode/vnode/src/tq/tqOffsetSnapshot.c b/source/dnode/vnode/src/tq/tqOffsetSnapshot.c index 084959af6556263ed6b34045cef406bfbc661a47..b63ff8af1d623aa2017ce5768a80e2dbf783d5a4 100644 --- a/source/dnode/vnode/src/tq/tqOffsetSnapshot.c +++ b/source/dnode/vnode/src/tq/tqOffsetSnapshot.c @@ -54,7 +54,7 @@ int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) { char* fname = tqOffsetBuildFName(pReader->pTq->path, 0); TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); - if (pFile != NULL) { + if (pFile == NULL) { taosMemoryFree(fname); return 0; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 212a4aef54cfd25057a22b96c9b15a0d0b742f8d..f9e47f92fecaea8e7cbf5fa0b4cecaa2118c7971 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -767,6 +767,7 @@ typedef struct SStreamSessionAggOperatorInfo { SPhysiNode* pPhyNode; // create new child bool isFinal; bool ignoreExpiredData; + SHashObj* pGroupIdTbNameMap; } SStreamSessionAggOperatorInfo; typedef struct SStreamPartitionOperatorInfo { @@ -845,6 +846,7 @@ typedef struct SStreamStateAggOperatorInfo { void* pDelIterator; SArray* pChildren; // cache for children's result; bool ignoreExpiredData; + SHashObj* pGroupIdTbNameMap; } SStreamStateAggOperatorInfo; typedef struct SSortOperatorInfo { @@ -899,8 +901,12 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey); void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); + +void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, + SDiskbasedBuf* pBuf); void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); + int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2f191db74db0a3a509a021101d929ea8f8e9695d..8c1104f51988930776d8745fceb7f727289751eb 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1389,6 +1389,46 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS return 0; } +void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, + SDiskbasedBuf* pBuf) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSDataBlock* pBlock = pbInfo->pRes; + + // set output datablock version + pBlock->info.version = pTaskInfo->version; + + blockDataCleanup(pBlock); + if (!hasRemainResults(pGroupResInfo)) { + return; + } + + // clear the existed group id + pBlock->info.groupId = 0; + ASSERT(!pbInfo->mergeResultBlock); + doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); + if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + + char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t)); + if (tbname != NULL) { + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + } else { + pBlock->info.parTbName[0] = 0; + } + } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || + pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || + pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + + char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t)); + if (tbname != NULL) { + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + } else { + pBlock->info.parTbName[0] = 0; + } + } +} + void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 16d0c178e0f531613e7d65bb88e740e37a014a21..c968f827020999e177f2dba61886267789a3594a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3728,6 +3728,9 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->pPhyNode = pPhyNode; pInfo->ignoreExpiredData = pSessionNode->window.igExpired; + pInfo->pGroupIdTbNameMap = + taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); + pOperator->name = "StreamSessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; pOperator->blocking = true; @@ -4351,7 +4354,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); return pInfo->pDelRes; } - doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); + doBuildStreamResBlock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -4370,6 +4373,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv"); + if (pBlock->info.parTbName[0]) { + taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, + TSDB_TABLE_NAME_LEN); + /*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ + } + if (pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX, @@ -4451,7 +4460,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); return pInfo->pDelRes; } - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); + doBuildStreamResBlock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; } @@ -4482,7 +4491,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } { - doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); + doBuildStreamResBlock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); if (pBInfo->pRes->info.rows > 0) { printDataBlock(pBInfo->pRes, "semi session"); return pBInfo->pRes; @@ -4523,6 +4532,12 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } printDataBlock(pBlock, "semi session recv"); + if (pBlock->info.parTbName[0]) { + taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, + TSDB_TABLE_NAME_LEN); + /*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ + } + if (pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, START_TS_COLUMN_INDEX, pSup->numOfExprs, 0, pWins); @@ -4566,7 +4581,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); + doBuildStreamResBlock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); if (pBInfo->pRes->info.rows > 0) { printDataBlock(pBInfo->pRes, "semi session"); return pBInfo->pRes; @@ -4610,6 +4625,10 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); } + + pInfo->pGroupIdTbNameMap = + taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); + pOperator->operatorType = pPhyNode->type; if (numOfChild > 0) { pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*)); @@ -4910,7 +4929,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { printDataBlock(pInfo->pDelRes, "single state"); return pInfo->pDelRes; } - doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); + doBuildStreamResBlock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -4972,7 +4991,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { printDataBlock(pInfo->pDelRes, "single state"); return pInfo->pDelRes; } - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); + doBuildStreamResBlock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); printDataBlock(pBInfo->pRes, "single state"); return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; } @@ -5045,6 +5064,9 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pChildren = NULL; pInfo->ignoreExpiredData = pStateNode->window.igExpired; + pInfo->pGroupIdTbNameMap = + taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); + pOperator->name = "StreamStateAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; pOperator->blocking = true; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 4ead1147e47e8777a422cd8f63a74d4017a368a5..6060f8cd92f106dcb92a9aa971a26c0962bba37d 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -709,8 +709,15 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp int16_t outputType = GET_PARAM_TYPE(&pOutput[0]); int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]); - char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows + 1, 1); - char *output = outputBuf; + int32_t code = TSDB_CODE_SUCCESS; + char * convBuf = taosMemoryMalloc(inputLen); + char * output = taosMemoryCalloc(1, outputLen + TSDB_NCHAR_SIZE); + char buf[400] = {0}; + + if (convBuf == NULL || output == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { if (colDataIsNull_s(pInput[0].columnData, i)) { @@ -723,17 +730,18 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp switch(outputType) { case TSDB_DATA_TYPE_TINYINT: { if (inputType == TSDB_DATA_TYPE_BINARY) { - *(int8_t *)output = taosStr2Int8(varDataVal(input), NULL, 10); + memcpy(buf, varDataVal(input), varDataLen(input)); + buf[varDataLen(input)] = 0; + *(int8_t *)output = taosStr2Int8(buf, NULL, 10); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - char *newBuf = taosMemoryCalloc(1, inputLen); - int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); + int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf); if (len < 0) { - taosMemoryFree(newBuf); - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } - newBuf[len] = 0; - *(int8_t *)output = taosStr2Int8(newBuf, NULL, 10); - taosMemoryFree(newBuf); + + convBuf[len] = 0; + *(int8_t *)output = taosStr2Int8(convBuf, NULL, 10); } else { GET_TYPED_DATA(*(int8_t *)output, int8_t, inputType, input); } @@ -741,17 +749,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } case TSDB_DATA_TYPE_SMALLINT: { if (inputType == TSDB_DATA_TYPE_BINARY) { - *(int16_t *)output = taosStr2Int16(varDataVal(input), NULL, 10); + memcpy(buf, varDataVal(input), varDataLen(input)); + buf[varDataLen(input)] = 0; + *(int16_t *)output = taosStr2Int16(buf, NULL, 10); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - char *newBuf = taosMemoryCalloc(1, inputLen); - int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); + int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf); if (len < 0) { - taosMemoryFree(newBuf); - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } - newBuf[len] = 0; - *(int16_t *)output = taosStr2Int16(newBuf, NULL, 10); - taosMemoryFree(newBuf); + convBuf[len] = 0; + *(int16_t *)output = taosStr2Int16(convBuf, NULL, 10); } else { GET_TYPED_DATA(*(int16_t *)output, int16_t, inputType, input); } @@ -759,17 +767,18 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } case TSDB_DATA_TYPE_INT: { if (inputType == TSDB_DATA_TYPE_BINARY) { - *(int32_t *)output = taosStr2Int32(varDataVal(input), NULL, 10); + memcpy(buf, varDataVal(input), varDataLen(input)); + buf[varDataLen(input)] = 0; + *(int32_t *)output = taosStr2Int32(buf, NULL, 10); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - char *newBuf = taosMemoryCalloc(1, inputLen); - int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); + int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf); if (len < 0) { - taosMemoryFree(newBuf); - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } - newBuf[len] = 0; - *(int32_t *)output = taosStr2Int32(newBuf, NULL, 10); - taosMemoryFree(newBuf); + + convBuf[len] = 0; + *(int32_t *)output = taosStr2Int32(convBuf, NULL, 10); } else { GET_TYPED_DATA(*(int32_t *)output, int32_t, inputType, input); } @@ -777,17 +786,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } case TSDB_DATA_TYPE_BIGINT: { if (inputType == TSDB_DATA_TYPE_BINARY) { - *(int64_t *)output = taosStr2Int64(varDataVal(input), NULL, 10); + memcpy(buf, varDataVal(input), varDataLen(input)); + buf[varDataLen(input)] = 0; + *(int64_t *)output = taosStr2Int64(buf, NULL, 10); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - char *newBuf = taosMemoryCalloc(1, inputLen); - int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); + int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf); if (len < 0) { - taosMemoryFree(newBuf); - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } - newBuf[len] = 0; - *(int64_t *)output = taosStr2Int64(newBuf, NULL, 10); - taosMemoryFree(newBuf); + convBuf[len] = 0; + *(int64_t *)output = taosStr2Int64(convBuf, NULL, 10); } else { GET_TYPED_DATA(*(int64_t *)output, int64_t, inputType, input); } @@ -795,17 +804,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } case TSDB_DATA_TYPE_UTINYINT: { if (inputType == TSDB_DATA_TYPE_BINARY) { - *(uint8_t *)output = taosStr2UInt8(varDataVal(input), NULL, 10); + memcpy(buf, varDataVal(input), varDataLen(input)); + buf[varDataLen(input)] = 0; + *(uint8_t *)output = taosStr2UInt8(buf, NULL, 10); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - char *newBuf = taosMemoryCalloc(1, inputLen); - int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); + int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf); if (len < 0) { - taosMemoryFree(newBuf); - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } - newBuf[len] = 0; - *(uint8_t *)output = taosStr2UInt8(newBuf, NULL, 10); - taosMemoryFree(newBuf); + convBuf[len] = 0; + *(uint8_t *)output = taosStr2UInt8(convBuf, NULL, 10); } else { GET_TYPED_DATA(*(uint8_t *)output, uint8_t, inputType, input); } @@ -813,17 +822,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } case TSDB_DATA_TYPE_USMALLINT: { if (inputType == TSDB_DATA_TYPE_BINARY) { - *(uint16_t *)output = taosStr2UInt16(varDataVal(input), NULL, 10); + memcpy(buf, varDataVal(input), varDataLen(input)); + buf[varDataLen(input)] = 0; + *(uint16_t *)output = taosStr2UInt16(buf, NULL, 10); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - char *newBuf = taosMemoryCalloc(1, inputLen); - int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); + int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf); if (len < 0) { - taosMemoryFree(newBuf); - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } - newBuf[len] = 0; - *(uint16_t *)output = taosStr2UInt16(newBuf, NULL, 10); - taosMemoryFree(newBuf); + convBuf[len] = 0; + *(uint16_t *)output = taosStr2UInt16(convBuf, NULL, 10); } else { GET_TYPED_DATA(*(uint16_t *)output, uint16_t, inputType, input); } @@ -831,17 +840,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } case TSDB_DATA_TYPE_UINT: { if (inputType == TSDB_DATA_TYPE_BINARY) { - *(uint32_t *)output = taosStr2UInt32(varDataVal(input), NULL, 10); + memcpy(buf, varDataVal(input), varDataLen(input)); + buf[varDataLen(input)] = 0; + *(uint32_t *)output = taosStr2UInt32(buf, NULL, 10); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - char *newBuf = taosMemoryCalloc(1, inputLen); - int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); + int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf); if (len < 0) { - taosMemoryFree(newBuf); - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } - newBuf[len] = 0; - *(uint32_t *)output = taosStr2UInt32(newBuf, NULL, 10); - taosMemoryFree(newBuf); + convBuf[len] = 0; + *(uint32_t *)output = taosStr2UInt32(convBuf, NULL, 10); } else { GET_TYPED_DATA(*(uint32_t *)output, uint32_t, inputType, input); } @@ -849,17 +858,18 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } case TSDB_DATA_TYPE_UBIGINT: { if (inputType == TSDB_DATA_TYPE_BINARY) { - *(uint64_t *)output = taosStr2UInt64(varDataVal(input), NULL, 10); + memcpy(buf, varDataVal(input), varDataLen(input)); + buf[varDataLen(input)] = 0; + *(uint64_t *)output = taosStr2UInt64(buf, NULL, 10); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - char *newBuf = taosMemoryCalloc(1, inputLen); - int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); + int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf); if (len < 0) { - taosMemoryFree(newBuf); - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } - newBuf[len] = 0; - *(uint64_t *)output = taosStr2UInt64(newBuf, NULL, 10); - taosMemoryFree(newBuf); + + convBuf[len] = 0; + *(uint64_t *)output = taosStr2UInt64(convBuf, NULL, 10); } else { GET_TYPED_DATA(*(uint64_t *)output, uint64_t, inputType, input); } @@ -867,17 +877,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } case TSDB_DATA_TYPE_FLOAT: { if (inputType == TSDB_DATA_TYPE_BINARY) { - *(float *)output = taosStr2Float(varDataVal(input), NULL); + memcpy(buf, varDataVal(input), varDataLen(input)); + buf[varDataLen(input)] = 0; + *(float *)output = taosStr2Float(buf, NULL); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - char *newBuf = taosMemoryCalloc(1, inputLen); - int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); + int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf); if (len < 0) { - taosMemoryFree(newBuf); - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } - newBuf[len] = 0; - *(float *)output = taosStr2Float(newBuf, NULL); - taosMemoryFree(newBuf); + convBuf[len] = 0; + *(float *)output = taosStr2Float(convBuf, NULL); } else { GET_TYPED_DATA(*(float *)output, float, inputType, input); } @@ -885,17 +895,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } case TSDB_DATA_TYPE_DOUBLE: { if (inputType == TSDB_DATA_TYPE_BINARY) { - *(double *)output = taosStr2Double(varDataVal(input), NULL); + memcpy(buf, varDataVal(input), varDataLen(input)); + buf[varDataLen(input)] = 0; + *(double *)output = taosStr2Double(buf, NULL); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - char *newBuf = taosMemoryCalloc(1, inputLen); - int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); + int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf); if (len < 0) { - taosMemoryFree(newBuf); - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } - newBuf[len] = 0; - *(double *)output = taosStr2Double(newBuf, NULL); - taosMemoryFree(newBuf); + convBuf[len] = 0; + *(double *)output = taosStr2Double(convBuf, NULL); } else { GET_TYPED_DATA(*(double *)output, double, inputType, input); } @@ -903,17 +913,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } case TSDB_DATA_TYPE_BOOL: { if (inputType == TSDB_DATA_TYPE_BINARY) { - *(bool *)output = taosStr2Int8(varDataVal(input), NULL, 10); + memcpy(buf, varDataVal(input), varDataLen(input)); + buf[varDataLen(input)] = 0; + *(bool *)output = taosStr2Int8(buf, NULL, 10); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - char *newBuf = taosMemoryCalloc(1, inputLen); - int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); + int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf); if (len < 0) { - taosMemoryFree(newBuf); - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } - newBuf[len] = 0; - *(bool *)output = taosStr2Int8(newBuf, NULL, 10); - taosMemoryFree(newBuf); + convBuf[len] = 0; + *(bool *)output = taosStr2Int8(convBuf, NULL, 10); } else { GET_TYPED_DATA(*(bool *)output, bool, inputType, input); } @@ -937,29 +947,27 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } case TSDB_DATA_TYPE_BINARY: { if (inputType == TSDB_DATA_TYPE_BOOL) { + // NOTE: sprintf will append '\0' at the end of string int32_t len = sprintf(varDataVal(output), "%.*s", (int32_t)(outputLen - VARSTR_HEADER_SIZE), *(int8_t *)input ? "true" : "false"); varDataSetLen(output, len); } else if (inputType == TSDB_DATA_TYPE_BINARY) { int32_t len = TMIN(varDataLen(input), outputLen - VARSTR_HEADER_SIZE); - len = sprintf(varDataVal(output), "%.*s", len, varDataVal(input)); + memcpy(varDataVal(output), varDataVal(input), len); varDataSetLen(output, len); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - char *newBuf = taosMemoryCalloc(1, inputLen); - int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); + int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), convBuf); if (len < 0) { - taosMemoryFree(newBuf); - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } len = TMIN(len, outputLen - VARSTR_HEADER_SIZE); - memcpy(varDataVal(output), newBuf, len); + memcpy(varDataVal(output), convBuf, len); varDataSetLen(output, len); - taosMemoryFree(newBuf); } else { - char tmp[400] = {0}; - NUM_TO_STRING(inputType, input, sizeof(tmp), tmp); - int32_t len = (int32_t)strlen(tmp); + NUM_TO_STRING(inputType, input, sizeof(buf), buf); + int32_t len = (int32_t)strlen(buf); len = (outputLen - VARSTR_HEADER_SIZE) > len ? len : (outputLen - VARSTR_HEADER_SIZE); - memcpy(varDataVal(output), tmp, len); + memcpy(varDataVal(output), buf, len); varDataSetLen(output, len); } break; @@ -972,14 +980,17 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp len = sprintf(tmp, "%.*s", outputCharLen, *(int8_t *)input ? "true" : "false" ); bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len); if (!ret) { - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } + varDataSetLen(output, len); } else if (inputType == TSDB_DATA_TYPE_BINARY) { len = outputCharLen > varDataLen(input) ? varDataLen(input) : outputCharLen; bool ret = taosMbsToUcs4(input + VARSTR_HEADER_SIZE, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len); if (!ret) { - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } varDataSetLen(output, len); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { @@ -987,38 +998,39 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp memcpy(output, input, len + VARSTR_HEADER_SIZE); varDataSetLen(output, len); } else { - char tmp[400] = {0}; - NUM_TO_STRING(inputType, input, sizeof(tmp), tmp); - len = (int32_t)strlen(tmp); + NUM_TO_STRING(inputType, input, sizeof(buf), buf); + len = (int32_t)strlen(buf); len = outputCharLen > len ? len : outputCharLen; - bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len); + bool ret = taosMbsToUcs4(buf, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len); if (!ret) { - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } varDataSetLen(output, len); } + //for constant conversion, need to set proper length of pOutput description if (len < outputLen) { pOutput->columnData->info.bytes = len + VARSTR_HEADER_SIZE; } + break; } default: { - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + goto _end; } } colDataAppend(pOutput->columnData, i, output, false); - if (IS_VAR_DATA_TYPE(outputType)) { - output += varDataTLen(output); - } else { - output += tDataTypes[outputType].bytes; - } } pOutput->numOfRows = pInput->numOfRows; - taosMemoryFree(outputBuf); - return TSDB_CODE_SUCCESS; + + _end: + taosMemoryFree(output); + taosMemoryFree(convBuf); + return code; } int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { @@ -1400,8 +1412,6 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p } else if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_TIMESTAMP) { /* unix timestamp or ts column*/ GET_TYPED_DATA(timeVal[k], int64_t, type, input[k]); if (type == TSDB_DATA_TYPE_TIMESTAMP) { - int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : - (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); int64_t timeValSec = timeVal[k] / factor; if (timeValSec < 1000000000) { timeVal[k] = timeValSec; diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index f663f4363d7afe7e0c258eadb94554678e2ce0e3..a148c1e36bbdabb249f6dc64dc7c86c6934e641e 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -1485,7 +1485,7 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN * if (dropOfp) { int ret = 0; SPgno pgno = *(SPgno *)(pCell + nLocal - sizeof(SPgno)); - int nLeft = nPayload - nLocal + sizeof(SPgno); + int nLeft = nPayload - nLocal + sizeof(SPgno) + nHeader; SPage *ofp; int bytes; diff --git a/source/libs/tdb/test/tdbExOVFLTest.cpp b/source/libs/tdb/test/tdbExOVFLTest.cpp index 58ea6147ef0ec9e76513a0b3c66513bc9321053d..d98c271edb128d3ba4c8b465dcd0aeba287bd0d4 100644 --- a/source/libs/tdb/test/tdbExOVFLTest.cpp +++ b/source/libs/tdb/test/tdbExOVFLTest.cpp @@ -238,8 +238,8 @@ TEST(TdbOVFLPagesTest, TbGetTest) { } } -TEST(TdbOVFLPagesTest, DISABLED_TbDeleteTest) { - // TEST(TdbOVFLPagesTest, TbDeleteTest) { +// TEST(TdbOVFLPagesTest, DISABLED_TbDeleteTest) { +TEST(TdbOVFLPagesTest, TbDeleteTest) { int ret = 0; taosRemoveDir("tdb"); @@ -267,7 +267,8 @@ TEST(TdbOVFLPagesTest, DISABLED_TbDeleteTest) { tdbBegin(pEnv, &txn); // generate value payload - char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + // char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) + char val[((4083 - 4 - 3 - 2) + 1) * 2]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) int valLen = sizeof(val) / sizeof(val[0]); generateBigVal(val, valLen); @@ -340,8 +341,8 @@ tdbBegin(pEnv, &txn); tdbTxnClose(&txn); } -TEST(tdb_test, DISABLED_simple_insert1) { - // TEST(tdb_test, simple_insert1) { +// TEST(tdb_test, DISABLED_simple_insert1) { +TEST(tdb_test, simple_insert1) { int ret; TDB *pEnv; TTB *pDb; diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 8f97f78556916b43bf9a9dc5c29e8542c051d6da..233d6a87b86fee566763fdd1c8aec9262008ab42 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -150,7 +150,6 @@ int walCheckAndRepairMeta(SWal* pWal) { const char* idxPattern = "^[0-9]+.idx$"; regex_t logRegPattern; regex_t idxRegPattern; - SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo)); regcomp(&logRegPattern, logPattern, REG_EXTENDED); regcomp(&idxRegPattern, idxPattern, REG_EXTENDED); @@ -163,6 +162,8 @@ int walCheckAndRepairMeta(SWal* pWal) { return -1; } + SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo)); + // scan log files and build new meta TdDirEntryPtr pDirEntry; while ((pDirEntry = taosReadDir(pDir)) != NULL) { diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index c1e02b48aa047e1043a41820ed77b6626e090aa2..bfeef248cdce61bba62d8cc5dcfd175b09559448 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -267,21 +267,29 @@ int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void return -1; } #ifdef WINDOWS +#ifdef TCP_KEEPCNT if (level == SOL_SOCKET && optname == TCP_KEEPCNT) { return 0; } +#endif +#ifdef TCP_KEEPIDLE if (level == SOL_TCP && optname == TCP_KEEPIDLE) { return 0; } +#endif +#ifdef TCP_KEEPINTVL if (level == SOL_TCP && optname == TCP_KEEPINTVL) { return 0; } +#endif +#ifdef TCP_KEEPCNT if (level == SOL_TCP && optname == TCP_KEEPCNT) { return 0; } +#endif return setsockopt(pSocket->fd, level, optname, optval, optlen); #else @@ -601,26 +609,32 @@ int32_t taosKeepTcpAlive(TdSocketPtr pSocket) { #ifndef __APPLE__ // all fails on macosx +#ifdef TCP_KEEPCNT int32_t probes = 3; if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) { // printf("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno)); taosCloseSocket(&pSocket); return -1; } +#endif +#ifdef TCP_KEEPIDLE int32_t alivetime = 10; if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) { // printf("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno)); taosCloseSocket(&pSocket); return -1; } +#endif +#ifdef TCP_KEEPINTVL int32_t interval = 3; if (taosSetSockOpt(pSocket, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) { // printf("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno)); taosCloseSocket(&pSocket); return -1; } +#endif #endif // __APPLE__ int32_t nodelay = 1; diff --git a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py index 371d147efc38735b4b0efdc19dc9288edbca5030..9a94632a12ea7746cc2d2ff9a660f8c83dbdb9c6 100644 --- a/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py +++ b/tests/system-test/6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py @@ -132,7 +132,7 @@ class TDTestCase: tdSql.error("create mnode on dnode 2") tdSql.query("select * from information_schema.ins_dnodes;") print(tdSql.queryResult) - clusterComCheck.checkDnodes(dnodeNumbers) + clusterComCheck.checkDnodes(dnodeNumbers, 60) # create database and stable clusterComCreate.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], paraDict["vgroups"],paraDict['replica']) diff --git a/tests/system-test/6-cluster/clusterCommonCheck.py b/tests/system-test/6-cluster/clusterCommonCheck.py index c37e3541d4e7b70d484fd17030b9ce581993bb59..28c481083395c74c02238d380946bd00c0b74c0f 100644 --- a/tests/system-test/6-cluster/clusterCommonCheck.py +++ b/tests/system-test/6-cluster/clusterCommonCheck.py @@ -37,10 +37,10 @@ class ClusterComCheck: tdSql.init(conn.cursor()) # tdSql.init(conn.cursor(), logSql) # output sql.txt file - def checkDnodes(self,dnodeNumbers): + def checkDnodes(self,dnodeNumbers, timeout=30): count=0 # print(tdSql) - while count < 30: + while count < timeout: tdSql.query("select * from information_schema.ins_dnodes") # tdLog.debug(tdSql.queryResult) status=0 @@ -50,14 +50,14 @@ class ClusterComCheck: tdLog.info(status) if status == dnodeNumbers: - tdLog.success("it find cluster with %d dnodes and check that all cluster dnodes are ready within 30s! " %dnodeNumbers) + tdLog.success("it find cluster with %d dnodes and check that all cluster dnodes are ready within %ds! " % (dnodeNumbers, count)) return True count+=1 time.sleep(1) else: tdSql.query("select * from information_schema.ins_dnodes") tdLog.debug(tdSql.queryResult) - tdLog.exit("it find cluster with %d dnodes but check that there dnodes are not ready within 30s ! "%dnodeNumbers) + tdLog.exit("it find cluster with %d dnodes but check that there dnodes are not ready within %ds ! "% (dnodeNumbers, timeout)) def checkDbRows(self,dbNumbers): dbNumbers=int(dbNumbers)