diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c5e9a1275610ea61ca96f343ff2000902f7290df..caf67ee3a99cf054a7b6add325fbd1af7b5e8b3f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -914,6 +914,7 @@ typedef struct { int32_t numOfRetensions; SArray* pRetensions; int8_t schemaless; + int16_t sstTrigger; } SDbCfgRsp; int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0f57da2b804c7c58411ee7ffa2cbb1fc4d68b3e8..1c52d7ea5df4904938201ee379c7b1068b25c0a0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -370,7 +370,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeSStreamTask(SStreamTask* pTask); static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { - if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { + int8_t type = pItem->type; + if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit2* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit2*)pItem); if (pSubmitClone == NULL) { qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask); @@ -382,19 +383,19 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem pSubmitClone->submit.msgStr, pSubmitClone->submit.msgLen, pSubmitClone->submit.ver); taosWriteQitem(pTask->inputQueue->queue, pSubmitClone); // qStreamInput(pTask->exec.executor, pSubmitClone); - } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE || - pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { + } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || + type == STREAM_INPUT__REF_DATA_BLOCK) { taosWriteQitem(pTask->inputQueue->queue, pItem); // qStreamInput(pTask->exec.executor, pItem); - } else if (pItem->type == STREAM_INPUT__CHECKPOINT) { + } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); // qStreamInput(pTask->exec.executor, pItem); - } else if (pItem->type == STREAM_INPUT__GET_RES) { + } else if (type == STREAM_INPUT__GET_RES) { taosWriteQitem(pTask->inputQueue->queue, pItem); // qStreamInput(pTask->exec.executor, pItem); } - if (pItem->type != STREAM_INPUT__GET_RES && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { + if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); } diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 4c23c1f5575434cc971a49a042df4ce1720b73ef..defafce30eb14e8c2cf6aaa4408199340a9d2adf 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -193,7 +193,7 @@ typedef struct SSyncLogStore { SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore); SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore); - int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); + int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forcSync); int32_t (*syncLogGetEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry); int32_t (*syncLogTruncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index ab51a64686ee7c719a89d35df9129f14f988f1de..fbab1ee08b847544c3711d73697636f502c2ce25 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -400,45 +400,6 @@ void destroyRequest(SRequestObj *pRequest) { removeRequest(pRequest->self); } -void taosClientCrash(int signum, void *sigInfo, void *context) { - taosIgnSignal(SIGTERM); - taosIgnSignal(SIGHUP); - taosIgnSignal(SIGINT); - taosIgnSignal(SIGBREAK); - -#if !defined(WINDOWS) - taosIgnSignal(SIGBUS); -#endif - taosIgnSignal(SIGABRT); - taosIgnSignal(SIGFPE); - taosIgnSignal(SIGSEGV); - - char *pMsg = NULL; - const char *flags = "UTL FATAL "; - ELogLevel level = DEBUG_FATAL; - int32_t dflag = 255; - int64_t msgLen= -1; - - if (tsEnableCrashReport) { - if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) { - taosPrintLog(flags, level, dflag, "failed to generate crash json msg"); - goto _return; - } else { - msgLen = strlen(pMsg); - } - } - -_return: - - taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo); - -#ifdef _TD_DARWIN_64 - exit(signum); -#elif defined(WINDOWS) - exit(signum); -#endif -} - void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); } static void *tscCrashReportThreadFp(void *param) { @@ -535,15 +496,26 @@ void tscStopCrashReport() { } } -static void tscSetSignalHandle() { -#if !defined(WINDOWS) - taosSetSignal(SIGBUS, taosClientCrash); -#endif - taosSetSignal(SIGABRT, taosClientCrash); - taosSetSignal(SIGFPE, taosClientCrash); - taosSetSignal(SIGSEGV, taosClientCrash); + +void tscWriteCrashInfo(int signum, void *sigInfo, void *context) { + char *pMsg = NULL; + const char *flags = "UTL FATAL "; + ELogLevel level = DEBUG_FATAL; + int32_t dflag = 255; + int64_t msgLen= -1; + + if (tsEnableCrashReport) { + if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) { + taosPrintLog(flags, level, dflag, "failed to generate crash json msg"); + } else { + msgLen = strlen(pMsg); + } + } + + taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo); } + void taos_init_imp(void) { // In the APIs of other program language, taos_cleanup is not available yet. // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. @@ -567,8 +539,6 @@ void taos_init_imp(void) { return; } - tscSetSignalHandle(); - initQueryModuleMsgHandle(); if (taosConvInit() != 0) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a119408a08cc8040266fad7619dc2145da8a6894..2c53fe40800badfd17d308d4aca094729edf1d3d 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1239,7 +1239,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t int64_t transporterId = 0; asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); - + tsem_wait(&pRequest->body.rspSem); if (pRequest->code != TSDB_CODE_SUCCESS) { const char* errorMsg = diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 1179f4d23a024c5657e6cc082143a919bcb7683c..7b04603f95c3880cb75b5a48cfe607aa05eba9f0 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -528,9 +528,8 @@ void taos_stop_query(TAOS_RES *res) { SRequestObj *pRequest = (SRequestObj *)res; pRequest->killed = true; - int32_t numOfFields = taos_num_fields(pRequest); // It is not a query, no need to stop. - if (numOfFields == 0) { + if (NULL == pRequest->pQuery || QUERY_EXEC_MODE_SCHEDULE != pRequest->pQuery->execMode) { tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId); return; } diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 3fd6fed4fceb19f52026ab19ca1733b1b0a86a2e..72a00ef471fa0010efc7666c00b2a5e020509ab2 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -179,7 +179,7 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { } string = cJSON_PrintUnformatted(json); - end: +end: cJSON_Delete(json); tFreeSMAltertbReq(&req); return string; @@ -200,7 +200,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) { } string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE); - _err: +_err: tDecoderClear(&coder); return string; } @@ -220,7 +220,7 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) { } string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); - _err: +_err: tDecoderClear(&coder); return string; } @@ -302,7 +302,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { cJSON_AddItemToArray(tags, tag); } - end: +end: cJSON_AddItemToObject(json, "tags", tags); taosArrayDestroy(pTagVals); } @@ -360,7 +360,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { } } - _exit: +_exit: for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; taosMemoryFreeClear(pCreateReq->comment); @@ -373,7 +373,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { } static char* processAutoCreateTable(STaosxRsp* rsp) { - if(rsp->createTableNum <= 0){ + if (rsp->createTableNum <= 0) { uError("WriteRaw:processAutoCreateTable rsp->createTableNum <= 0"); goto _exit; } @@ -392,14 +392,14 @@ static char* processAutoCreateTable(STaosxRsp* rsp) { goto _exit; } - if(pCreateReq[iReq].type != TSDB_CHILD_TABLE){ + if (pCreateReq[iReq].type != TSDB_CHILD_TABLE) { uError("WriteRaw:processAutoCreateTable pCreateReq[iReq].type != TSDB_CHILD_TABLE"); goto _exit; } } string = buildCreateCTableJson(pCreateReq, rsp->createTableNum); - _exit: +_exit: for (int i = 0; i < rsp->createTableNum; i++) { tDecoderClear(&decoder[i]); taosMemoryFreeClear(pCreateReq[i].comment); @@ -500,7 +500,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) { char* buf = NULL; if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) { - if(!tTagIsJson(vAlterTbReq.pTagVal)){ + if (!tTagIsJson(vAlterTbReq.pTagVal)) { uError("processAlterTable isJson false"); goto _exit; } @@ -524,7 +524,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) { } string = cJSON_PrintUnformatted(json); - _exit: +_exit: cJSON_Delete(json); tDecoderClear(&decoder); return string; @@ -557,12 +557,12 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) { string = cJSON_PrintUnformatted(json); - _exit: +_exit: cJSON_Delete(json); tDecoderClear(&decoder); return string; } -static char* processDeleteTable(SMqMetaRsp* metaRsp){ +static char* processDeleteTable(SMqMetaRsp* metaRsp) { SDeleteRes req = {0}; SDecoder coder = {0}; int32_t code = TSDB_CODE_SUCCESS; @@ -596,7 +596,7 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp){ string = cJSON_PrintUnformatted(json); - _exit: +_exit: cJSON_Delete(json); tDecoderClear(&coder); return string; @@ -638,7 +638,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) { string = cJSON_PrintUnformatted(json); - _exit: +_exit: cJSON_Delete(json); tDecoderClear(&decoder); return string; @@ -726,7 +726,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; taosMemoryFree(pCmdMsg.pMsg); - end: +end: destroyRequest(pRequest); tFreeSMCreateStbReq(&pReq); tDecoderClear(&coder); @@ -796,7 +796,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; taosMemoryFree(pCmdMsg.pMsg); - end: +end: destroyRequest(pRequest); tDecoderClear(&coder); return code; @@ -857,9 +857,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); // loop to create table @@ -939,7 +939,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; - end: +end: for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; taosMemoryFreeClear(pCreateReq->comment); @@ -1009,9 +1009,9 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); // loop to create table for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { @@ -1063,7 +1063,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { } code = pRequest->code; - end: +end: taosHashCleanup(pVgroupHashmap); destroyRequest(pRequest); tDecoderClear(&coder); @@ -1131,7 +1131,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { } taos_free_result(res); - end: +end: tDecoderClear(&coder); return code; } @@ -1178,9 +1178,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { } SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; SVgroupInfo pInfo = {0}; SName pName = {0}; @@ -1239,7 +1239,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { code = handleAlterTbExecRes(pRes->res, pCatalog); } } - end: +end: taosArrayDestroy(pArray); if (pVgData) taosMemoryFreeClear(pVgData->pData); taosMemoryFreeClear(pVgData); @@ -1402,7 +1402,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; - end: +end: taosMemoryFreeClear(pTableMeta); qDestroyQuery(pQuery); destroyRequest(pRequest); @@ -1521,7 +1521,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; - end: +end: tDeleteSMqDataRsp(&rspObj.rsp); tDecoderClear(&decoder); qDestroyQuery(pQuery); @@ -1619,7 +1619,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } - if(pCreateReq.type != TSDB_CHILD_TABLE){ + if (pCreateReq.type != TSDB_CHILD_TABLE) { uError("WriteRaw:pCreateReq.type != TSDB_CHILD_TABLE. table name: %s", tbName); code = TSDB_CODE_TSC_INVALID_VALUE; goto end; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 20bb265e783b5149c0364ad7e0b567b6a5ca678f..95f74da8034eb70fff4c09802e55e6f754b79323 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2821,8 +2821,8 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) { if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1; } if (tEncodeI8(&encoder, pRsp->schemaless) < 0) return -1; + if (tEncodeI16(&encoder, pRsp->sstTrigger) < 0) return -1; tEndEncode(&encoder); - int32_t tlen = encoder.pos; tEncoderClear(&encoder); return tlen; @@ -2873,6 +2873,7 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) { } } if (tDecodeI8(&decoder, &pRsp->schemaless) < 0) return -1; + if (tDecodeI16(&decoder, &pRsp->sstTrigger) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 7e5c29d56f11211b9475834e8c4a666547056398..bdfda14a32772644a476b6ad6cbdc7d0e3989795 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -889,7 +889,7 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) { cfgRsp.numOfRetensions = pDb->cfg.numOfRetensions; cfgRsp.pRetensions = pDb->cfg.pRetensions; cfgRsp.schemaless = pDb->cfg.schemaless; - + cfgRsp.sstTrigger = pDb->cfg.sstTrigger; int32_t contLen = tSerializeSDbCfgRsp(NULL, 0, &cfgRsp); void *pRsp = rpcMallocCont(contLen); if (pRsp == NULL) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 3f3e287bb9f6b450ad9470d9935a76c65cc03e19..7d368c76c55d61af9305acb23d180cefdc936396 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -153,6 +153,8 @@ typedef struct SMTbCursor SMTbCursor; SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType); +int32_t metaTbCursorPrev(SMTbCursor *pTbCur); + #endif // tsdb diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 4b280a32f1f5915876dbeb8fd0ff4b6beaa4c658..fc41d507a8936eb5e35db53508b74c6dd03d2374 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -310,7 +310,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) { } } -int metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) { +int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) { int ret; void *pBuf; STbCfg tbCfg; @@ -334,6 +334,30 @@ int metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) { return 0; } +int32_t metaTbCursorPrev(SMTbCursor *pTbCur) { + int ret; + void *pBuf; + STbCfg tbCfg; + + for (;;) { + ret = tdbTbcPrev(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen); + if (ret < 0) { + return -1; + } + + tDecoderClear(&pTbCur->mr.coder); + + metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey); + if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) { + continue; + } + + break; + } + + return 0; +} + SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) { void *pData = NULL; int nData = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 6a5acecfc3226753108863e3537f281028c8a34f..a2de1bdf4e1251d4f4276603da1d490a9c214692 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -268,7 +268,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } taosThreadMutexLock(&pr->readerMutex); - tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap); + code = tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap); + if (code != TSDB_CODE_SUCCESS) { + goto _end; + } pr->pDataFReader = NULL; pr->pDataFReaderLast = NULL; @@ -279,7 +282,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _end; } if (h == NULL) { @@ -352,7 +355,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 STableKeyInfo* pKeyInfo = &pr->pTableList[i]; code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _end; } if (h == NULL) { diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 7dc839773f11b6bb823375f35281977e94b74f8b..51fdc69a95cc0a935a6b6e2abd390d5927384a50 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -458,9 +458,8 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe taosMemoryFree(pHeadF); } } else { - nRef = pHeadF->nRef; - *pHeadF = *pSetNew->pHeadF; - pHeadF->nRef = nRef; + ASSERT(pHeadF->offset == pSetNew->pHeadF->offset); + ASSERT(pHeadF->size == pSetNew->pHeadF->size); } // data @@ -481,9 +480,7 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe taosMemoryFree(pDataF); } } else { - nRef = pDataF->nRef; - *pDataF = *pSetNew->pDataF; - pDataF->nRef = nRef; + pDataF->size = pSetNew->pDataF->size; } // sma @@ -504,9 +501,7 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe taosMemoryFree(pSmaF); } } else { - nRef = pSmaF->nRef; - *pSmaF = *pSetNew->pSmaF; - pSmaF->nRef = nRef; + pSmaF->size = pSetNew->pSmaF->size; } // stt diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index d8c379f476943d64de5c93d9643b04ebfa832322..c967f8d0a5a0caa6086df7f23f7c8020ad9f3a4e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1776,12 +1776,15 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } if (minKey == k.ts) { + STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + if (pSchema == NULL) { + return terrno; + } if (init) { - tsdbRowMerge(&merge, pRow); + tsdbRowMergerAdd(&merge, pRow, pSchema); } else { init = true; - STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); - int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema); + int32_t code = tsdbRowMergerInit(&merge, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2882,7 +2885,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { if (pResBlock->info.rows > 0) { tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 - " rows:%d, elapsed time:%.2f ms %s", + " rows:%d, elapsed time:%.2f ms %s", pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } @@ -2932,7 +2935,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { if (pResBlock->info.rows > 0) { tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 - " rows:%d, elapsed time:%.2f ms %s", + " rows:%d, elapsed time:%.2f ms %s", pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index f4ac4b7a975b9639d865ef7bd1a9631edff7fa37..25fafff25a0e56a5ad70f7c3208a3b773f0c854e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -732,6 +732,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal); if (key.version > pMerger->version) { +#if 0 if (!COL_VAL_IS_NONE(pColVal)) { if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) { SColVal *tColVal = taosArrayGet(pMerger->pArray, iCol); @@ -747,6 +748,28 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) taosArraySet(pMerger->pArray, iCol, pColVal); } } +#endif + if (!COL_VAL_IS_NONE(pColVal)) { + if (IS_VAR_DATA_TYPE(pColVal->type)) { + SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol); + if (!COL_VAL_IS_NULL(pColVal)) { + code = tRealloc(&pTColVal->value.pData, pColVal->value.nData); + if (code) return code; + + pTColVal->value.nData = pColVal->value.nData; + if (pTColVal->value.nData) { + memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData); + } + pTColVal->flag = 0; + } else { + tFree(pTColVal->value.pData); + pTColVal->value.pData = NULL; + taosArraySet(pMerger->pArray, iCol, pColVal); + } + } else { + taosArraySet(pMerger->pArray, iCol, pColVal); + } + } } else if (key.version < pMerger->version) { SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol); if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 836ce87fbb7bb0620567be05030527c4ca7ac101..7ee7a24f97146ce91b1294d21a81e8cf163b2993 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -805,6 +805,7 @@ int32_t ctgMakeVgArray(SDBVgInfo* dbInfo); int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb); int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName); void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache); +void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache); extern SCatalogMgmt gCtgMgmt; extern SCtgDebug gCTGDebug; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index c7af0411bea055b5cf6afb13941735ce3c104eaa..f9a218835ea77d4a20aa1b7ac8086187c995c561 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -598,10 +598,16 @@ int32_t ctgGetCachedTbVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInf CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, pVgroup)); + ctgRUnlockVgInfo(dbCache); + SCtgTbMetaCtx ctx = {0}; ctx.pName = (SName*)pTableName; ctx.flag = CTG_FLAG_UNKNOWN_STB; - CTG_ERR_JRET(ctgCopyTbMeta(pCtg, &ctx, &dbCache, &tbCache, pTableMeta, db)); + code = ctgCopyTbMeta(pCtg, &ctx, &dbCache, &tbCache, pTableMeta, db); + + ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); + + CTG_RET(code); _return: diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 438128203ef10867f433693940ab4d62e0c65ebd..325d6e0e46c61e398e3766b931fe6141abd710c4 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -999,6 +999,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq)); ctgReleaseVgInfoToCache(pCtg, dbCache); + dbCache = NULL; } else { SBuildUseDBInput input = {0}; @@ -1168,6 +1169,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq)); ctgReleaseVgInfoToCache(pCtg, dbCache); + dbCache = NULL; } else { SBuildUseDBInput input = {0}; diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index c266cc1df9fc0ab5bd61574f519920df327630c9..6e4077eae05aa8cead28111f13f9600bb4e3244d 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -2118,7 +2118,7 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) { _return: - if (dbCache) { + if (code == TSDB_CODE_SUCCESS && dbCache) { ctgWUnlockVgInfo(dbCache); } diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index c87f6953eb5fbb2515b2db54ab1e85854111eeb2..6eef1ded695accbcf204e004c4d00af3987d8d91 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -281,10 +281,10 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, S len += sprintf( buf2 + VARSTR_HEADER_SIZE, "CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm " - "WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d " + "WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d STT_TRIGGER %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d " "WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d", dbFName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile, - pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, + pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->sstTrigger, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages, pCfg->pageSize, prec, pCfg->replications, pCfg->walLevel, pCfg->numOfVgroups, 1 == pCfg->numOfStables); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6ec60ee85722d6342074cbc7e2e87fa41452b943..c599b479e61b4c30a46713e4549983a6fd448c19 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -24,12 +24,16 @@ static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT; int32_t exchangeObjRefPool = -1; -static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); } static void cleanupRefPool() { int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0); taosCloseRef(ref); } +static void initRefPool() { + exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); + atexit(cleanupRefPool); +} + static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { ASSERT(pOperator != NULL); if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { @@ -448,7 +452,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; taosThreadOnce(&initPoolOnce, initRefPool); - atexit(cleanupRefPool); qDebug("start to create subplan task, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 3bc00b79b1aea8bd8dc6d586ed8598b3b5672e4a..fb122b077fc21477b18c5db83b28d0caa294e605 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -593,8 +593,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf int32_t pageId = 0; pPage = getNewBufPage(pInfo->pBuf, &pageId); - taosArrayPush(p->pPageList, &pageId); + if (pPage == NULL) { + return pPage; + } + taosArrayPush(p->pPageList, &pageId); *(int32_t*)pPage = 0; } else { int32_t* curId = taosArrayGetLast(p->pPageList); @@ -612,6 +615,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf // add a new page for current group int32_t pageId = 0; pPage = getNewBufPage(pInfo->pBuf, &pageId); + if (pPage == NULL) { + qError("failed to get new buffer, code:%s", tstrerror(terrno)); + return NULL; + } + taosArrayPush(p->pPageList, &pageId); memset(pPage, 0, getBufPageSize(pInfo->pBuf)); } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index ddcaaf2c72778d1a8488ee353722bef2e337fb42..2b78f265fb4d6f2a1e41350f2f649582f9b84213 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -66,7 +66,7 @@ typedef struct SSysTableScanInfo { int64_t numOfBlocks; // extract basic running information. SLoadRemoteDataInfo loadInfo; - int32_t tbnameSlotId; + int32_t tbnameSlotId; } SSysTableScanInfo; typedef struct { @@ -81,10 +81,10 @@ typedef struct MergeIndex { } MergeIndex; typedef struct SBlockDistInfo { - SSDataBlock* pResBlock; - STsdbReader* pHandle; - SReadHandle readHandle; - uint64_t uid; // table uid + SSDataBlock* pResBlock; + STsdbReader* pHandle; + SReadHandle readHandle; + uint64_t uid; // table uid } SBlockDistInfo; static int32_t sysChkFilter__Comm(SNode* pNode); @@ -129,20 +129,20 @@ static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time" static char* SYSTABLE_SPECIAL_COL[] = {"db_name", "vgroup_id"}; -static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity); -static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName); -static void destroySysScanOperator(void* param); -static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code); -static SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo); +static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity); +static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName); +static void destroySysScanOperator(void* param); +static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code); +static SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo); static __optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse); static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrSuperTable, SMetaReader* smrChildTable, const char* dbname, const char* tableName, int32_t* pNumOfRows, const SSDataBlock* dataBlock); -static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo, const char* dbname, - int32_t* pNumOfRows, const SSDataBlock* dataBlock, - char* tName, SSchemaWrapper* schemaRow, char* tableType); +static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo, const char* dbname, int32_t* pNumOfRows, + const SSDataBlock* dataBlock, char* tName, SSchemaWrapper* schemaRow, + char* tableType); static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock, SFilterInfo* pFilterInfo); @@ -204,11 +204,11 @@ int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) { if (func == NULL) return -1; SMetaFltParam param = {.suid = 0, - .cid = 0, - .type = TSDB_DATA_TYPE_VARCHAR, - .val = pVal->datum.p, - .reverse = reverse, - .filterFunc = func}; + .cid = 0, + .type = TSDB_DATA_TYPE_VARCHAR, + .val = pVal->datum.p, + .reverse = reverse, + .filterFunc = func}; return -1; } @@ -223,11 +223,11 @@ int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) { if (func == NULL) return -1; SMetaFltParam param = {.suid = 0, - .cid = 0, - .type = TSDB_DATA_TYPE_BIGINT, - .val = &pVal->datum.i, - .reverse = reverse, - .filterFunc = func}; + .cid = 0, + .type = TSDB_DATA_TYPE_BIGINT, + .val = &pVal->datum.i, + .reverse = reverse, + .filterFunc = func}; int32_t ret = metaFilterCreateTime(pMeta, ¶m, result); return ret; @@ -355,9 +355,9 @@ static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt); static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableScanInfo* pInfo, const char* name, SExecTaskInfo* pTaskInfo); void extractTbnameSlotId(SSysTableScanInfo* pInfo, const SScanPhysiNode* pScanNode); -static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, - const char* name, SSDataBlock* pBlock); -__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) { +static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name, + SSDataBlock* pBlock); +__optSysFilter optSysGetFilterFunc(int32_t ctype, bool* reverse) { if (ctype == OP_TYPE_LOWER_EQUAL || ctype == OP_TYPE_LOWER_THAN) { *reverse = true; } @@ -479,13 +479,13 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { } } - char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; - SSchemaWrapper *schemaRow = NULL; - if(smrTable.me.type == TSDB_SUPER_TABLE){ - schemaRow = &smrTable.me.stbEntry.schemaRow; + char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + SSchemaWrapper* schemaRow = NULL; + if (smrTable.me.type == TSDB_SUPER_TABLE) { + schemaRow = &smrTable.me.stbEntry.schemaRow; STR_TO_VARSTR(typeName, "CHILD_TABLE"); - }else if(smrTable.me.type == TSDB_NORMAL_TABLE){ - schemaRow = &smrTable.me.ntbEntry.schemaRow; + } else if (smrTable.me.type == TSDB_NORMAL_TABLE) { + schemaRow = &smrTable.me.ntbEntry.schemaRow; STR_TO_VARSTR(typeName, "NORMAL_TABLE"); } @@ -507,50 +507,50 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta); } - SHashObj *stableSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + SHashObj* stableSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashSetFreeFp(stableSchema, tDeleteSSchemaWrapperForHash); while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0) { char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - SSchemaWrapper *schemaRow = NULL; + SSchemaWrapper* schemaRow = NULL; - if(pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE){ + if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) { qDebug("sysTableScanUserCols cursor get super table"); - void *schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t)); - if(schema == NULL){ - SSchemaWrapper *schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow); + void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t)); + if (schema == NULL) { + SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow); taosHashPut(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES); } continue; - }else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) { + } else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) { qDebug("sysTableScanUserCols cursor get child table"); STR_TO_VARSTR(typeName, "CHILD_TABLE"); STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name); int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; - void *schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t)); - if(schema != NULL){ - schemaRow = *(SSchemaWrapper **)schema; - }else{ + void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t)); + if (schema != NULL) { + schemaRow = *(SSchemaWrapper**)schema; + } else { tDecoderClear(&pInfo->pCur->mr.coder); int code = metaGetTableEntryByUid(&pInfo->pCur->mr, suid); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by metaGetTableEntryByName, therefore, return directly - qError("sysTableScanUserCols get meta by suid:%"PRId64 " error, code:%d", suid, code); + qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d", suid, code); blockDataDestroy(dataBlock); pInfo->loadInfo.totalRows = 0; taosHashCleanup(stableSchema); return NULL; } - schemaRow = &pInfo->pCur->mr.me.stbEntry.schemaRow; + schemaRow = &pInfo->pCur->mr.me.stbEntry.schemaRow; } - }else if(pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE){ + } else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) { qDebug("sysTableScanUserCols cursor get normal table"); - schemaRow = &pInfo->pCur->mr.me.ntbEntry.schemaRow; + schemaRow = &pInfo->pCur->mr.me.ntbEntry.schemaRow; STR_TO_VARSTR(typeName, "NORMAL_TABLE"); STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name); - }else{ + } else { qDebug("sysTableScanUserCols cursor get invalid table"); continue; } @@ -665,6 +665,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta); } + bool blockFull = false; while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_SUPER_TABLE)) == 0) { if (pInfo->pCur->mr.me.type != TSDB_CHILD_TABLE) { continue; @@ -686,17 +687,25 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, terrno); } - sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows, dataBlock); + if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) { + metaTbCursorPrev(pInfo->pCur); + blockFull = true; + } else { + sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows, + dataBlock); + } metaReaderClear(&smrSuperTable); - if (numOfRows >= pOperator->resultInfo.capacity) { + if (blockFull || numOfRows >= pOperator->resultInfo.capacity) { relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo); numOfRows = 0; if (pInfo->pRes->info.rows > 0) { break; } + + blockFull = false; } } @@ -902,10 +911,10 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, return TSDB_CODE_SUCCESS; } -static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo, const char* dbname, - int32_t* pNumOfRows, const SSDataBlock* dataBlock, char* tName, - SSchemaWrapper* schemaRow, char* tableType) { - if(schemaRow == NULL){ +static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo, const char* dbname, int32_t* pNumOfRows, + const SSDataBlock* dataBlock, char* tName, SSchemaWrapper* schemaRow, + char* tableType) { + if (schemaRow == NULL) { qError("sysTableUserColsFillOneTableCols schemaRow is NULL"); return TSDB_CODE_SUCCESS; } @@ -941,9 +950,8 @@ static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo, colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)", (int32_t)(schemaRow->pSchema[i].bytes - VARSTR_HEADER_SIZE)); } else if (colType == TSDB_DATA_TYPE_NCHAR) { - colTypeLen += sprintf( - varDataVal(colTypeStr) + colTypeLen, "(%d)", - (int32_t)((schemaRow->pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); + colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)", + (int32_t)((schemaRow->pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); } varDataSetLen(colTypeStr, colTypeLen); colDataAppend(pColInfoData, numOfRows, (char*)colTypeStr, false); @@ -1550,9 +1558,9 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { if (pInfo->showRewrite) { getDBNameFromCondition(pInfo->pCondition, dbName); sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName); - }else if(strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0){ + } else if (strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0) { getDBNameFromCondition(pInfo->pCondition, dbName); - if(dbName[0]) sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName); + if (dbName[0]) sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName); sysTableIsCondOnOneTable(pInfo->pCondition, pInfo->req.filterTb); } @@ -1573,12 +1581,12 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { return sysTableScanFillTbName(pOperator, pInfo, name, pBlock); } -static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, - const char* name, SSDataBlock* pBlock) { +static SSDataBlock* sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScanInfo* pInfo, const char* name, + SSDataBlock* pBlock) { if (pBlock != NULL) { if (pInfo->tbnameSlotId != -1) { SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId); - char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0}; + char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0}; memcpy(varDataVal(varTbName), name, strlen(name)); varDataSetLen(varTbName, strlen(name)); for (int i = 0; i < pBlock->info.rows; ++i) { @@ -1669,7 +1677,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo) { - int32_t code = TDB_CODE_SUCCESS; + int32_t code = TDB_CODE_SUCCESS; SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -1717,10 +1725,11 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL); + pOperator->fpSet = + createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL); return pOperator; - _error: +_error: if (pInfo != NULL) { destroySysScanOperator(pInfo); } @@ -1757,7 +1766,7 @@ void destroySysScanOperator(void* param) { const char* name = tNameGetTableName(&pInfo->name); if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || - strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0|| pInfo->pCur != NULL) { + strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0 || pInfo->pCur != NULL) { metaCloseTbCursor(pInfo->pCur); pInfo->pCur = NULL; } @@ -2165,7 +2174,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { // make the valgrind happy that all memory buffer has been initialized already. if (slotId != 0) { SColumnInfoData* p1 = taosArrayGet(pBlock->pDataBlock, 0); - int64_t v = 0; + int64_t v = 0; colDataAppendInt64(p1, 0, &v); } @@ -2175,10 +2184,10 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { } static void destroyBlockDistScanOperatorInfo(void* param) { - SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param; - blockDataDestroy(pDistInfo->pResBlock); - tsdbReaderClose(pDistInfo->pHandle); - taosMemoryFreeClear(param); + SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param; + blockDataDestroy(pDistInfo->pResBlock); + tsdbReaderClose(pDistInfo->pHandle); + taosMemoryFreeClear(param); } static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) { @@ -2250,8 +2259,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, optrDefaultBufFn, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, + optrDefaultBufFn, NULL); return pOperator; _error: diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 224537c1f110f9d3d629320d031baf1dc6e7d108..0c491addd58a48823514ef21f6e51969827a4270 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3053,14 +3053,12 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, if (pHandle->currentPage == -1) { pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } pPage->num = sizeof(SFilePage); } else { pPage = getBufPage(pHandle->pBuf, pHandle->currentPage); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } if (pPage->num + length > getBufPageSize(pHandle->pBuf)) { @@ -3068,7 +3066,6 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, releaseBufPage(pHandle->pBuf, pPage); pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } pPage->num = sizeof(SFilePage); @@ -3115,7 +3112,6 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf if (pHandle->pBuf != NULL) { SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } memcpy(pPage->data + pPos->offset, pBuf, length); diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 6577858ee693257b4ddf02f3ca1ab1d1234e40a6..97fe94b513f86f2a20f4c4e42b965c67840cc043 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -50,8 +50,8 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) if (pg == NULL) { return NULL; } - memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); + memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); offset += (int32_t)(pg->num * pMemBucket->bytes); } @@ -116,7 +116,7 @@ int32_t findOnlyResult(tMemBucket *pMemBucket, double *result) { int32_t *pageId = taosArrayGet(list, 0); SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId); if (pPage == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } ASSERT(pPage->num == 1); @@ -283,7 +283,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, return NULL; } - int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", tsTempDir); + int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 1024, "1", tsTempDir); if (ret != 0) { tMemBucketDestroy(pBucket); return NULL; @@ -395,7 +395,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId); if (pSlot->info.data == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } pSlot->info.pageId = pageId; taosArrayPush(pPageIdList, &pageId); @@ -489,8 +489,9 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction // data in buffer and file are merged together to be processed. SFilePage *buffer = loadDataFromFilePage(pMemBucket, i); if (buffer == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } + int32_t currentIdx = count - num; char *thisVal = buffer->data + pMemBucket->bytes * currentIdx; @@ -536,7 +537,7 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction int32_t *pageId = taosArrayGet(list, f); SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId); if (pg == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } int32_t code = tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 66b8e48eb1a41d5669e008d3296359f208e0a19f..787ef7501da7e467e67975d20563ad7f875ecced 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -228,9 +228,14 @@ typedef struct SQWorkerMgmt { case QW_PHASE_POST_FETCH: \ ctx->inFetch = 0; \ break; \ - default: \ + case QW_PHASE_PRE_QUERY: \ + case QW_PHASE_POST_QUERY: \ + case QW_PHASE_PRE_CQUERY: \ + case QW_PHASE_POST_CQUERY: \ atomic_store_8(&(ctx)->phase, _value); \ break; \ + default: \ + break; \ } \ } while (0) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 2f4e8001df1aeca484b6898f1ae6a94d82af44c1..2f712e6eba50a6dd3e8c4768397eaf57ec505bff 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -550,7 +550,9 @@ _return: if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); - QW_SET_PHASE(ctx, phase); + if (QW_PHASE_POST_CQUERY != phase) { + QW_SET_PHASE(ctx, phase); + } QW_UNLOCK(QW_WRITE, &ctx->lock); qwReleaseTaskCtx(mgmt, ctx); @@ -757,7 +759,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_LOCK(QW_WRITE, &ctx->lock); if (qComplete || (queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code) { // Note: query is not running anymore - QW_SET_PHASE(ctx, 0); + QW_SET_PHASE(ctx, QW_PHASE_POST_CQUERY); QW_UNLOCK(QW_WRITE, &ctx->lock); break; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 6f77769decdf2649b66e5cac0c8bbf823cce85e1..2f991288ffd0201be79ed3392befcd5da669294e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -207,6 +207,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { if (ppTask) { SStreamTask* pTask = *ppTask; taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); + tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); /*if (pTask->timer) { * taosTmrStop(pTask->timer);*/ /*pTask->timer = NULL;*/ diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index e77a8d4be333e1267aafb94c7da71e5a9651b385..948d2f53e7510d52a574c7c977615178fd080c3d 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -357,7 +357,7 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ASSERT(pAppendEntry->index == appendIndex); // append - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry, false); if (code != 0) { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "ignore, append error, append-index:%" PRId64, appendIndex); @@ -398,7 +398,7 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } // append - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry, false); if (code != 0) { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, append error, append-index:%" PRId64, appendIndex); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ac377911eb25510d0a1f8a0dd3453dc1c2b21ff8..11067d43aa13f928006e5f3ebd67c9dda3cb452e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2488,7 +2488,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) { LRUHandle* h = NULL; if (ths->state == TAOS_SYNC_STATE_LEADER) { - int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); + int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false); if (code != 0) { sError("append noop error"); return -1; @@ -2731,7 +2731,7 @@ int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRe if (ths->state == TAOS_SYNC_STATE_LEADER) { // append entry - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false); if (code != 0) { if (ths->replicaNum == 1) { if (h) { diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index b61fc2e90dc8cadd8265346a0bbe38e7a533238c..1d2b0b57fed915ff5a2731f936b91a72140c6142 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -364,7 +364,11 @@ _out: return ret; } -int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { +static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replicaNum) { + return (replicaNum > 1) && (pEntry->originalRpcType == TDMT_VND_COMMIT); +} + +int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) { ASSERT(pEntry->index >= 0); SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore); if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) { @@ -374,7 +378,8 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { lastVer = pLogStore->syncLogLastIndex(pLogStore); ASSERT(pEntry->index == lastVer + 1); - if (pLogStore->syncLogAppendEntry(pLogStore, pEntry) < 0) { + bool doFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum); + if (pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync) < 0) { sError("failed to append sync log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index, pEntry->term); return -1; @@ -436,7 +441,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p (void)syncNodeReplicateWithoutLock(pNode); // persist - if (syncLogStorePersist(pLogStore, pEntry) < 0) { + if (syncLogStorePersist(pLogStore, pNode, pEntry) < 0) { sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index); goto _out; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index ca6d3c314fbeb63fe2eafa3b93bcdc08f936da3e..e6569d99741f762c593c557751e800f11b6c150e 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -23,7 +23,7 @@ // public function static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex); -static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); +static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forceSync); static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index); static int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); @@ -192,9 +192,7 @@ SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) { return SYNC_TERM_INVALID; } -static inline bool raftLogForceSync(SSyncRaftEntry* pEntry) { return (pEntry->originalRpcType == TDMT_VND_COMMIT); } - -static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { +static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forceSync) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -221,7 +219,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ASSERT(pEntry->index == index); - bool forceSync = raftLogForceSync(pEntry); walFsync(pWal, forceSync); sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index b915e2964fce236a70108887a78fb71288266535..9ba8468438246bfc1d88191cfb0b0f2d45c6a002 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -840,7 +840,11 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) { uuid_generate(uuid); // it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null uuid_unparse_lower(uuid, buf); - memcpy(uid, buf, uidlen); + int n = snprintf(uid, uidlen, "%.*s", (int)sizeof(buf), buf); // though less performance, much safer + if (n >= uidlen) { + // target buffer is too small + return -1; + } return 0; #else int len = 0; diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 6920925e5f82b16ba6d213ffbe03d6fa40d21d06..c083ce2f7f577349eea7ced1bcb325f8ecacdf99 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -20,7 +20,10 @@ // todo refactor API SArray* taosArrayInit(size_t size, size_t elemSize) { - assert(elemSize > 0); + if (elemSize == 0) { + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } if (size < TARRAY_MIN_SIZE) { size = TARRAY_MIN_SIZE; @@ -96,8 +99,6 @@ void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles) { } void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) { - assert(pArray); - size_t size = pArray->size; if (size <= 1) { return; @@ -136,8 +137,6 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp) } void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) { - assert(pArray); - size_t size = pArray->size; if (size <= 1) { return; @@ -197,11 +196,10 @@ void* taosArrayReserve(SArray* pArray, int32_t num) { } void* taosArrayPop(SArray* pArray) { - assert(pArray != NULL); - if (pArray->size == 0) { return NULL; } + pArray->size -= 1; return TARRAY_GET_ELEM(pArray, pArray->size); } @@ -210,16 +208,21 @@ void* taosArrayGet(const SArray* pArray, size_t index) { if (NULL == pArray) { return NULL; } - assert(index < pArray->size); + + if (index >= pArray->size) { + uError("index is out of range, current:%"PRIzu" max:%d", index, pArray->capacity); + return NULL; + } + return TARRAY_GET_ELEM(pArray, index); } void* taosArrayGetP(const SArray* pArray, size_t index) { - assert(index < pArray->size); - - void* d = TARRAY_GET_ELEM(pArray, index); - - return *(void**)d; + void** p = taosArrayGet(pArray, index); + if (p == NULL) { + return NULL; + } + return *p; } void* taosArrayGetLast(const SArray* pArray) { return TARRAY_GET_ELEM(pArray, pArray->size - 1); } @@ -312,9 +315,12 @@ void taosArrayRemoveBatch(SArray* pArray, size_t index, size_t num, FDelete fp) } SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) { - assert(src != NULL && elemSize > 0); - SArray* pDst = taosArrayInit(size, elemSize); + if (elemSize <= 0) { + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + SArray* pDst = taosArrayInit(size, elemSize); memcpy(pDst->pData, src, elemSize * size); pDst->size = size; @@ -322,8 +328,6 @@ SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) { } SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) { - assert(pSrc != NULL); - if (pSrc->size == 0) { // empty array list return taosArrayInit(8, pSrc->elemSize); } @@ -415,14 +419,10 @@ void taosArrayDestroyEx(SArray* pArray, FDelete fp) { } void taosArraySort(SArray* pArray, __compar_fn_t compar) { - ASSERT(pArray != NULL && compar != NULL); taosSort(pArray->pData, pArray->size, pArray->elemSize, compar); } void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t comparFn, int32_t flags) { - assert(pArray != NULL && comparFn != NULL); - assert(key != NULL); - return taosbsearch(key, pArray->pData, pArray->size, pArray->elemSize, comparFn, flags); } diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 34ad9ae6bc39753c897d932993c917848dba64ee..62f074db5b9e28ffe71ca7ec3f4506b619ad20b1 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -897,6 +897,7 @@ void taosLogCrashInfo(char* nodeType, char* pMsg, int64_t msgLen, int signum, vo pFile = taosOpenFile(filepath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); taosPrintLog(flags, level, dflag, "failed to open file:%s since %s", filepath, terrstr()); goto _return; } diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 87b44b2d1337ab157e4499f5165abf0868069bc5..7c60862c56b8544dc815723aa5427dc604a0fff5 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -5,7 +5,10 @@ #include "thash.h" #include "tlog.h" -#define GET_DATA_PAYLOAD(_p) ((char*)(_p)->pData + POINTER_BYTES) +#define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES) +#define BUF_PAGE_IN_MEM(_p) ((_p)->pData != NULL) +#define CLEAR_BUF_PAGE_IN_MEM_FLAG(_p) ((_p)->pData = NULL) +#define HAS_DATA_IN_DISK(_p) ((_p)->offset >= 0) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) typedef struct SPageDiskInfo { @@ -14,7 +17,7 @@ typedef struct SPageDiskInfo { } SPageDiskInfo, SFreeListItem; struct SPageInfo { - SListNode* pn; // point to list node struct + SListNode* pn; // point to list node struct. it is NULL when the page is evicted from the in-memory buffer void* pData; int64_t offset; int32_t pageId; @@ -89,7 +92,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskba return data; } -static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { +static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) { if (pBuf->pFree == NULL) { return pBuf->nextPos; } else { @@ -112,10 +115,6 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { } } -static void setPageNotInBuf(SPageInfo* pPageInfo) { pPageInfo->pData = NULL; } - -static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); } - /** * +--------------------------+-------------------+--------------+ * | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes| @@ -124,23 +123,31 @@ static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize * @param pg * @return */ -static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { - ASSERT(!pg->used && pg->pData != NULL); + +static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); } + +static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { + if (pg->pData == NULL || pg->used) { + uError("invalid params in paged buffer process when flushing buf to disk, %s", pBuf->id); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } int32_t size = pBuf->pageSize; char* t = NULL; - if (pg->offset == -1 || pg->dirty) { - void* payload = GET_DATA_PAYLOAD(pg); + if ((!HAS_DATA_IN_DISK(pg)) || pg->dirty) { + void* payload = GET_PAYLOAD_DATA(pg); t = doCompressData(payload, pBuf->pageSize, &size, pBuf); - ASSERTS(size >= 0, "size is negative"); + if (size < 0) { + uError("failed to compress data when flushing data to disk, %s", pBuf->id); + return NULL; + } } // this page is flushed to disk for the first time if (pg->dirty) { - if (pg->offset == -1) { - ASSERTS(pg->dirty == true, "pg->dirty is false"); - - pg->offset = allocatePositionInFile(pBuf, size); + if (!HAS_DATA_IN_DISK(pg)) { + pg->offset = allocateNewPositionInFile(pBuf, size); pBuf->nextPos += size; int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET); @@ -155,6 +162,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { return NULL; } + // extend the file size if (pBuf->fileSize < pg->offset + size) { pBuf->fileSize = pg->offset + size; } @@ -169,7 +177,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { taosArrayPush(pBuf->pFree, &dinfo); // 2. allocate new position, and update the info - pg->offset = allocatePositionInFile(pBuf, size); + pg->offset = allocateNewPositionInFile(pBuf, size); pBuf->nextPos += size; } @@ -197,20 +205,19 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { size = pg->length; } - ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1)); - char* pDataBuf = pg->pData; memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize)); + #ifdef BUF_PAGE_DEBUG uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, pg->offset); #endif + pg->length = size; // on disk size return pDataBuf; } -static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { +static char* flushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { int32_t ret = TSDB_CODE_SUCCESS; - ASSERT(((int64_t)pBuf->numOfPages * pBuf->pageSize) == pBuf->totalBufSize && pBuf->numOfPages >= pBuf->inMemPages); if (pBuf->pFile == NULL) { if ((ret = createDiskFile(pBuf)) != TSDB_CODE_SUCCESS) { @@ -219,22 +226,27 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { } } - char* p = doFlushPageToDisk(pBuf, pg); - setPageNotInBuf(pg); - pg->dirty = false; + char* p = doFlushBufPage(pBuf, pg); + CLEAR_BUF_PAGE_IN_MEM_FLAG(pg); + pg->dirty = false; return p; } // load file block data in disk static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { + if (pg->offset < 0 || pg->length <= 0) { + uError("failed to load buf page from disk, offset:%"PRId64", length:%d, %s", pg->offset, pg->length, pBuf->id); + return TSDB_CODE_INVALID_PARA; + } + int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET); if (ret == -1) { ret = TAOS_SYSTEM_ERROR(errno); return ret; } - void* pPage = (void*)GET_DATA_PAYLOAD(pg); + void* pPage = (void*)GET_PAYLOAD_DATA(pg); ret = (int32_t)taosReadFile(pBuf->pFile, pPage, pg->length); if (ret != pg->length) { ret = TAOS_SYSTEM_ERROR(errno); @@ -249,10 +261,14 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { return 0; } -static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t pageId) { +static SPageInfo* registerNewPageInfo(SDiskbasedBuf* pBuf, int32_t pageId) { pBuf->numOfPages += 1; SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo)); + if (ppi == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } ppi->pageId = pageId; ppi->pData = NULL; @@ -272,46 +288,33 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { SListNode* pn = NULL; while ((pn = tdListNext(&iter)) != NULL) { SPageInfo* pageInfo = *(SPageInfo**)pn->data; - ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn); + + SPageInfo* p = *(SPageInfo**)(pageInfo->pData); + ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn && p == pageInfo); if (!pageInfo->used) { - // printf("%d is chosen\n", pageInfo->pageId); break; - } else { - // printf("page %d is used, dirty:%d\n", pageInfo->pageId, pageInfo->dirty); } } return pn; } -static char* evacOneDataPage(SDiskbasedBuf* pBuf) { - char* bufPage = NULL; +static char* evictBufPage(SDiskbasedBuf* pBuf) { SListNode* pn = getEldestUnrefedPage(pBuf); - terrno = 0; - - // all pages are referenced by user, try to allocate new space - if (pn == NULL) { - int32_t prev = pBuf->inMemPages; - - // increase by 50% of previous mem pages - pBuf->inMemPages = (int32_t)(pBuf->inMemPages * 1.5f); - - // qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pBuf, prev, - // pBuf->inMemPages, pBuf->pageSize); - } else { - tdListPopNode(pBuf->lruList, pn); + if (pn == NULL) { // no available buffer pages now, return. + return NULL; + } - SPageInfo* d = *(SPageInfo**)pn->data; - ASSERTS(d->pn == pn, "d->pn not equal pn"); + terrno = 0; + tdListPopNode(pBuf->lruList, pn); - d->pn = NULL; - taosMemoryFreeClear(pn); + SPageInfo* d = *(SPageInfo**)pn->data; - bufPage = flushPageToDisk(pBuf, d); - } + d->pn = NULL; + taosMemoryFreeClear(pn); - return bufPage; + return flushBufPage(pBuf, d); } static void lruListPushFront(SList* pList, SPageInfo* pi) { @@ -338,13 +341,12 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem SDiskbasedBuf* pPBuf = *pBuf; if (pPBuf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + goto _error; } pPBuf->pageSize = pagesize; pPBuf->numOfPages = 0; // all pages are in buffer in the first place pPBuf->totalBufSize = 0; - pPBuf->inMemPages = inMemBufSize / pagesize; // maximum allowed pages, it is a soft limit. pPBuf->allocateId = -1; pPBuf->pFile = NULL; pPBuf->id = strdup(id); @@ -353,33 +355,69 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem pPBuf->freePgList = tdListNew(POINTER_BYTES); // at least more than 2 pages must be in memory - ASSERT(inMemBufSize >= pagesize * 2); + if (inMemBufSize < pagesize * 2) { + inMemBufSize = pagesize * 2; + } + pPBuf->inMemPages = inMemBufSize / pagesize; // maximum allowed pages, it is a soft limit. pPBuf->lruList = tdListNew(POINTER_BYTES); + if (pPBuf->lruList == NULL) { + goto _error; + } // init id hash table _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES); + if (pPBuf->pIdList == NULL) { + goto _error; + } pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2); // EXTRA BYTES + if (pPBuf->assistBuf == NULL) { + goto _error; + } + pPBuf->all = taosHashInit(10, fn, true, false); - pPBuf->prefix = (char*) dir; + if (pPBuf->all == NULL) { + goto _error; + } + pPBuf->prefix = (char*) dir; pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); // qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, - // pPBuf->pageSize, - // pPBuf->inMemPages, pPBuf->path); + // pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path); return TSDB_CODE_SUCCESS; + _error: + destroyDiskbasedBuf(pPBuf); + return TSDB_CODE_OUT_OF_MEMORY; +} + +static char* doExtractPage(SDiskbasedBuf* pBuf) { + char* availablePage = NULL; + if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { + availablePage = evictBufPage(pBuf); + if (availablePage == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + uWarn("no available buf pages, current:%d, max:%d", listNEles(pBuf->lruList), pBuf->inMemPages) + } + } else { + availablePage = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased. + if (availablePage == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + } + } + + return availablePage; } void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { pBuf->statis.getPages += 1; - char* availablePage = NULL; - if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { - availablePage = evacOneDataPage(pBuf); + char* availablePage = doExtractPage(pBuf); + if (availablePage == NULL) { + return NULL; } SPageInfo* pi = NULL; @@ -394,7 +432,10 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { *pageId = (++pBuf->allocateId); // register page id info - pi = registerPage(pBuf, *pageId); + pi = registerNewPageInfo(pBuf, *pageId); + if (pi == NULL) { + return NULL; + } // add to hash map taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES); @@ -402,63 +443,62 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { } // add to LRU list - ASSERT(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0); lruListPushFront(pBuf->lruList, pi); - - // allocate buf - if (availablePage == NULL) { - pi->pData = - taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased. - } else { - pi->pData = availablePage; - } + pi->pData = availablePage; ((void**)pi->pData)[0] = pi; #ifdef BUF_PAGE_DEBUG uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%" PRId64, pi->pData, pi->pageId, pi->offset); #endif - return (void*)(GET_DATA_PAYLOAD(pi)); + + return (void*)(GET_PAYLOAD_DATA(pi)); } void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { - ASSERT(pBuf != NULL && id >= 0); + if (id < 0) { + terrno = TSDB_CODE_INVALID_PARA; + uError("invalid page id:%d, %s", id, pBuf->id); + return NULL; + } + pBuf->statis.getPages += 1; SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t)); - ASSERT(pi != NULL && *pi != NULL); + if (pi == NULL || *pi == NULL) { + uError("failed to locate the buffer page:%d, %s", id, pBuf->id); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } - if ((*pi)->pData != NULL) { // it is in memory + if (BUF_PAGE_IN_MEM(*pi)) { // it is in memory // no need to update the LRU list if only one page exists if (pBuf->numOfPages == 1) { (*pi)->used = true; - return (void*)(GET_DATA_PAYLOAD(*pi)); + return (void*)(GET_PAYLOAD_DATA(*pi)); } SPageInfo** pInfo = (SPageInfo**)((*pi)->pn->data); - ASSERT(*pInfo == *pi); + if (*pInfo != *pi) { + uError("inconsistently data in paged buffer, pInfo:%p, pi:%p, %s", *pInfo, *pi, pBuf->id); + return NULL; + } lruListMoveToFront(pBuf->lruList, (*pi)); (*pi)->used = true; + #ifdef BUF_PAGE_DEBUG uDebug("page_getBufPage1 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset); #endif - return (void*)(GET_DATA_PAYLOAD(*pi)); + return (void*)(GET_PAYLOAD_DATA(*pi)); } else { // not in memory - ASSERT((*pi)->pData == NULL && (*pi)->pn == NULL && + ASSERT((!BUF_PAGE_IN_MEM(*pi)) && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1))); - char* availablePage = NULL; - if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { - availablePage = evacOneDataPage(pBuf); - if (availablePage == NULL) { - return NULL; - } - } + (*pi)->pData = doExtractPage(pBuf); - if (availablePage == NULL) { - (*pi)->pData = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); - } else { - (*pi)->pData = availablePage; + // failed to evict buffer page, return with error code. + if ((*pi)->pData == NULL) { + return NULL; } // set the ptr to the new SPageInfo @@ -468,23 +508,25 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { (*pi)->used = true; // some data has been flushed to disk, and needs to be loaded into buffer again. - if ((*pi)->length > 0 && (*pi)->offset >= 0) { + if (HAS_DATA_IN_DISK(*pi)) { int32_t code = loadPageFromDisk(pBuf, *pi); if (code != 0) { + terrno = code; return NULL; } } #ifdef BUF_PAGE_DEBUG uDebug("page_getBufPage2 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset); #endif - return (void*)(GET_DATA_PAYLOAD(*pi)); + return (void*)(GET_PAYLOAD_DATA(*pi)); } } void releaseBufPage(SDiskbasedBuf* pBuf, void* page) { - if (ASSERTS(pBuf != NULL && page != NULL, "pBuf or page is NULL")) { + if (page == NULL) { return; } + SPageInfo* ppi = getPageInfoFromPayload(page); releaseBufPageInfo(pBuf, ppi); } @@ -493,7 +535,13 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { #ifdef BUF_PAGE_DEBUG uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%" PRId64, pi->pageId, pi->used, pi->offset); #endif - if (ASSERTS(pi->pData != NULL, "pi->pData is NULL")) { + + if (pi == NULL) { + return; + } + + if (pi->pData == NULL) { + uError("pi->pData (page data) is null"); return; } @@ -504,7 +552,6 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; } SArray* getDataBufPagesIdList(SDiskbasedBuf* pBuf) { - ASSERT(pBuf != NULL); return pBuf->pIdList; } @@ -582,7 +629,6 @@ SPageInfo* getLastPageInfo(SArray* pList) { } int32_t getPageId(const SPageInfo* pPgInfo) { - ASSERT(pPgInfo != NULL); return pPgInfo->pageId; } diff --git a/tests/develop-test/2-query/show_create_db.py b/tests/develop-test/2-query/show_create_db.py new file mode 100644 index 0000000000000000000000000000000000000000..e5a79074ef2a1b9881f230b3eba2011edca517af --- /dev/null +++ b/tests/develop-test/2-query/show_create_db.py @@ -0,0 +1,82 @@ +import sys +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import tdDnodes +from math import inf + +class TDTestCase: + def caseDescription(self): + ''' + case1: [TD-11204]Difference improvement that can ignore negative + ''' + return + + def init(self, conn, logSql, replicaVer=1): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), False) + self._conn = conn + + def restartTaosd(self, index=1, dbname="db"): + tdDnodes.stop(index) + tdDnodes.startWithoutSleep(index) + tdSql.execute(f"use scd") + + def run(self): + print("running {}".format(__file__)) + tdSql.execute("drop database if exists scd") + tdSql.execute("create database if not exists scd") + tdSql.execute('use scd') + tdSql.execute('create table stb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(10), c9 nchar(10), c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) TAGS(t1 int, t2 binary(10), t3 double);') + + tdSql.execute("create table tb1 using stb1 tags(1,'1',1.0);") + + tdSql.execute("create table tb2 using stb1 tags(2,'2',2.0);") + + tdSql.execute("create table tb3 using stb1 tags(3,'3',3.0);") + + tdSql.execute('create database scd2 stt_trigger 3;') + + tdSql.execute('create database scd4 stt_trigger 13;') + + tdSql.query('show create database scd;') + tdSql.checkRows(1) + tdSql.checkData(0, 0, 'scd') + tdSql.checkData(0, 1, "CREATE DATABASE `scd` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 1 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0") + + tdSql.query('show create database scd2;') + tdSql.checkRows(1) + tdSql.checkData(0, 0, 'scd2') + tdSql.checkData(0, 1, "CREATE DATABASE `scd2` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 3 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0") + + tdSql.query('show create database scd4') + tdSql.checkRows(1) + tdSql.checkData(0, 0, 'scd4') + tdSql.checkData(0, 1, "CREATE DATABASE `scd4` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 13 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0") + + + self.restartTaosd(1, dbname='scd') + + tdSql.query('show create database scd;') + tdSql.checkRows(1) + tdSql.checkData(0, 0, 'scd') + tdSql.checkData(0, 1, "CREATE DATABASE `scd` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 1 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0") + + tdSql.query('show create database scd2;') + tdSql.checkRows(1) + tdSql.checkData(0, 0, 'scd2') + tdSql.checkData(0, 1, "CREATE DATABASE `scd2` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 3 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0") + + tdSql.query('show create database scd4') + tdSql.checkRows(1) + tdSql.checkData(0, 0, 'scd4') + tdSql.checkData(0, 1, "CREATE DATABASE `scd4` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 13 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0") + + + tdSql.execute('drop database scd') + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index c28d0745c4cafa192aa39b0ae96668c4e323f652..94d87c705cfaa07028ed64caf30ae668ad245cf3 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1060,6 +1060,7 @@ #develop test ,,n,develop-test,python3 ./test.py -f 2-query/table_count_scan.py +,,n,develop-test,python3 ./test.py -f 2-query/show_create_db.py ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/custom_col_tag.py ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/default_json.py diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index 88dada44accf588357314843d2a7d09964c896c0..0903095dc98dcf3652546859003fa65fd88e1569 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -2828,7 +2828,7 @@ void runAll(TAOS *taos) { printf("%s Begin\n", gCaseCtrl.caseCatalog); runCaseList(taos); -#if 0 +#if 1 strcpy(gCaseCtrl.caseCatalog, "Micro DB precision Test"); printf("%s Begin\n", gCaseCtrl.caseCatalog); gCaseCtrl.precision = TIME_PRECISION_MICRO; diff --git a/tests/script/tsim/query/sys_tbname.sim b/tests/script/tsim/query/sys_tbname.sim index 9b16d982026b1648da938d9f74b4c0ee52f989c4..7b3953129a81d6fa56b093c8b048b2071b6fbe70 100644 --- a/tests/script/tsim/query/sys_tbname.sim +++ b/tests/script/tsim/query/sys_tbname.sim @@ -86,4 +86,23 @@ if $data00 != @ins_tags@ then return -1 endi +sql create stable stb(ts timestamp, f int) tags(t1 int, t2 int, t3 int, t4 int, t5 int); + +$i = 0 +$tbNum = 1000 +$tbPrefix = stb_tb +while $i < $tbNum + $tb = $tbPrefix . $i + sql create table $tb using stb tags( $i , $i , $i , $i , $i ) + + $i = $i + 1 +endw + +sql select tag_value from information_schema.ins_tags where stable_name='stb'; +if $rows != 5000 then + print $rows + return -1 +endi + + #system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tools/shell/inc/shellInt.h b/tools/shell/inc/shellInt.h index 6b3bc56dc767712b3607075aa07fafb85530d39b..1fe09f586356554d017bfbb572dab84ab16baf52 100644 --- a/tools/shell/inc/shellInt.h +++ b/tools/shell/inc/shellInt.h @@ -148,5 +148,6 @@ void shellRunSingleCommandWebsocketImp(char *command); // shellMain.c extern SShellObj shell; +extern void tscWriteCrashInfo(int signum, void *sigInfo, void *context); #endif /*_TD_SHELL_INT_H_*/ diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 4a9cf8343120654899a9080e4f6e219685c88cdf..1f79cfcc0455d5aa911b154bce9817e264da9d03 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -1137,10 +1137,8 @@ int32_t shellExecute() { taosSetSignal(SIGTERM, shellQueryInterruptHandler); taosSetSignal(SIGHUP, shellQueryInterruptHandler); - taosSetSignal(SIGABRT, shellQueryInterruptHandler); - taosSetSignal(SIGINT, shellQueryInterruptHandler); - + #ifdef WEBSOCKET if (!shell.args.restful && !shell.args.cloud) { #endif diff --git a/tools/shell/src/shellMain.c b/tools/shell/src/shellMain.c index fa3c0f2585ad6d6ef14c3dda35fb64024a521a02..22b8e89959593e9da18124908fedac31bd6b45ed 100644 --- a/tools/shell/src/shellMain.c +++ b/tools/shell/src/shellMain.c @@ -19,6 +19,29 @@ SShellObj shell = {0}; + +void shellCrashHandler(int signum, void *sigInfo, void *context) { + taosIgnSignal(SIGTERM); + taosIgnSignal(SIGHUP); + taosIgnSignal(SIGINT); + taosIgnSignal(SIGBREAK); + +#if !defined(WINDOWS) + taosIgnSignal(SIGBUS); +#endif + taosIgnSignal(SIGABRT); + taosIgnSignal(SIGFPE); + taosIgnSignal(SIGSEGV); + + tscWriteCrashInfo(signum, sigInfo, context); + +#ifdef _TD_DARWIN_64 + exit(signum); +#elif defined(WINDOWS) + exit(signum); +#endif +} + int main(int argc, char *argv[]) { shell.exit = false; #ifdef WEBSOCKET @@ -26,6 +49,13 @@ int main(int argc, char *argv[]) { shell.args.cloud = true; #endif +#if !defined(WINDOWS) + taosSetSignal(SIGBUS, shellCrashHandler); +#endif + taosSetSignal(SIGABRT, shellCrashHandler); + taosSetSignal(SIGFPE, shellCrashHandler); + taosSetSignal(SIGSEGV, shellCrashHandler); + if (shellCheckIntSize() != 0) { return -1; }