diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 641d235dd10ab7c815955d7df813d000ec2a61d9..700a4d9daf0985e2608f365149be99ea3e24a877 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -223,8 +223,8 @@ typedef struct SRequestObj { SArray* tableList; SQueryExecMetric metric; SRequestSendRecvBody body; - bool stableQuery; // todo refactor - bool validateOnly; // todo refactor + bool stableQuery; // todo refactor + bool validateOnly; // todo refactor bool killed; uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog @@ -325,7 +325,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); -int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, SRequestObj** pRequest); +int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, + SRequestObj** pRequest); void taos_close_internal(void* taos); @@ -359,9 +360,6 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clie int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx bool qnodeRequired(SRequestObj* pRequest); -void initTscQhandle(); -void cleanupTscQhandle(); - #ifdef __cplusplus } #endif diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 6805e4d501e5128fb967bfd2e868a9751d17b10b..207ac01a2c46eb50123ce6e108c617d74560de74 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -35,22 +35,10 @@ SAppInfo appInfo; int32_t clientReqRefPool = -1; int32_t clientConnRefPool = -1; -void *tscQhandle = NULL; - static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; volatile int32_t tscInitRes = 0; -void initTscQhandle() { - // init handle - tscQhandle = taosInitScheduler(4096, 5, "tsc"); -} - -void cleanupTscQhandle() { - // destroy handle - taosCleanUpScheduler(tscQhandle); -} - -static int32_t registerRequest(SRequestObj *pRequest, STscObj* pTscObj) { +static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) { // connection has been released already, abort creating request. pRequest->self = taosAddRef(clientReqRefPool, pRequest); @@ -72,7 +60,7 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj* pTscObj) { static void deregisterRequest(SRequestObj *pRequest) { assert(pRequest != NULL); - STscObj * pTscObj = pRequest->pTscObj; + STscObj *pTscObj = pRequest->pTscObj; SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1); @@ -97,7 +85,8 @@ void closeTransporter(SAppInstInfo *pAppInfo) { static bool clientRpcRfp(int32_t code, tmsg_t msgType) { if (NEED_REDIRECT_ERROR(code)) { - if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || + msgType == TDMT_SCH_MERGE_FETCH) { return false; } return true; @@ -251,7 +240,7 @@ void *createRequest(uint64_t connId, int32_t type) { return NULL; } - STscObj* pTscObj = acquireTscObj(connId); + STscObj *pTscObj = acquireTscObj(connId); if (pTscObj == NULL) { terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; @@ -348,7 +337,6 @@ void taos_init_imp(void) { // In the APIs of other program language, taos_cleanup is not available yet. // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. atexit(taos_cleanup); - initTscQhandle(); errno = TSDB_CODE_SUCCESS; taosSeedRand(taosGetTimestampSec()); @@ -407,7 +395,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) { return 0; } - SConfig * pCfg = taosGetCfg(); + SConfig *pCfg = taosGetCfg(); SConfigItem *pItem = NULL; switch (option) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 3605f49a5ce5e0ef00adbe1d3159225dd3be6049..32c11983c2696a5f5b6542a999f7358f31db96c1 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1274,8 +1274,8 @@ typedef struct SchedArg { SEpSet* pEpset; } SchedArg; -void doProcessMsgFromServer(SSchedMsg* schedMsg) { - SchedArg* arg = (SchedArg*)schedMsg->ahandle; +int32_t doProcessMsgFromServer(void* param) { + SchedArg* arg = (SchedArg*)param; SRpcMsg* pMsg = &arg->msg; SEpSet* pEpSet = arg->pEpset; @@ -1328,11 +1328,10 @@ void doProcessMsgFromServer(SSchedMsg* schedMsg) { rpcFreeCont(pMsg->pCont); destroySendMsgInfo(pSendInfo); taosMemoryFree(arg); + return TSDB_CODE_SUCCESS; } void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { - SSchedMsg schedMsg = {0}; - SEpSet* tEpSet = NULL; if (pEpSet != NULL) { tEpSet = taosMemoryCalloc(1, sizeof(SEpSet)); @@ -1343,9 +1342,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { arg->msg = *pMsg; arg->pEpset = tEpSet; - schedMsg.fp = doProcessMsgFromServer; - schedMsg.ahandle = arg; - taosScheduleTask(tscQhandle, &schedMsg); + taosAsyncExec(doProcessMsgFromServer, arg, NULL); } TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) { diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index d3fdfa7a39595569c1e2a7f819e3fc1d7671819d..73def5b9b188b394a9ba3e0e3816a7e0944e4653 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -72,7 +72,6 @@ void taos_cleanup(void) { catalogDestroy(); schedulerDestroy(); - cleanupTscQhandle(); rpcCleanup(); tscInfo("all local resources released"); taosCleanupCfg(); @@ -242,7 +241,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { #endif } else if (TD_RES_TMQ(res)) { - SMqRspObj * msg = ((SMqRspObj *)res); + SMqRspObj *msg = ((SMqRspObj *)res); SReqResultInfo *pResultInfo; if (msg->resIter == -1) { pResultInfo = tmqGetNextResInfo(res, true); @@ -418,7 +417,7 @@ int taos_affected_rows(TAOS_RES *res) { return 0; } - SRequestObj * pRequest = (SRequestObj *)res; + SRequestObj *pRequest = (SRequestObj *)res; SReqResultInfo *pResInfo = &pRequest->body.resInfo; return pResInfo->numOfRows; } @@ -601,7 +600,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { } SReqResultInfo *pResInfo = tscGetCurResInfo(res); - TAOS_FIELD * pField = &pResInfo->userFields[columnIndex]; + TAOS_FIELD *pField = &pResInfo->userFields[columnIndex]; if (!IS_VAR_DATA_TYPE(pField->type)) { return 0; } @@ -645,8 +644,8 @@ const char *taos_get_server_info(TAOS *taos) { typedef struct SqlParseWrapper { SParseContext *pCtx; SCatalogReq catalogReq; - SRequestObj * pRequest; - SQuery * pQuery; + SRequestObj *pRequest; + SQuery *pQuery; } SqlParseWrapper; static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) { @@ -665,8 +664,8 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) { void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { SqlParseWrapper *pWrapper = (SqlParseWrapper *)param; - SQuery * pQuery = pWrapper->pQuery; - SRequestObj * pRequest = pWrapper->pRequest; + SQuery *pQuery = pWrapper->pQuery; + SRequestObj *pRequest = pWrapper->pRequest; if (code == TSDB_CODE_SUCCESS) { code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); @@ -684,7 +683,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { destorySqlParseWrapper(pWrapper); - tscDebug("0x%"PRIx64" analysis semantics completed, start async query, reqId:0x%"PRIx64, pRequest->self, pRequest->requestId); + tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self, + pRequest->requestId); launchAsyncQuery(pRequest, pQuery, pResultMeta); } else { destorySqlParseWrapper(pWrapper); @@ -705,7 +705,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { } void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { - int64_t connId = *(int64_t*)taos; + int64_t connId = *(int64_t *)taos; taosAsyncQueryImpl(connId, sql, fp, param, false); } @@ -739,7 +739,7 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { SParseContext *pCxt = NULL; - STscObj * pTscObj = pRequest->pTscObj; + STscObj *pTscObj = pRequest->pTscObj; int32_t code = 0; if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { @@ -874,9 +874,9 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { SSchedulerReq req = { - .syncReq = false, - .fetchFp = fetchCallback, - .cbParam = pRequest, + .syncReq = false, + .fetchFp = fetchCallback, + .cbParam = pRequest, }; schedulerFetchRows(pRequest->body.queryJob, &req); @@ -886,7 +886,7 @@ void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { ASSERT(res != NULL && fp != NULL); ASSERT(TD_RES_QUERY(res)); - SRequestObj *pRequest = res; + SRequestObj *pRequest = res; SReqResultInfo *pResultInfo = &pRequest->body.resInfo; // set the current block is all consumed @@ -928,7 +928,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { int64_t connId = *(int64_t *)taos; const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list int32_t code = 0; - SRequestObj * pRequest = NULL; + SRequestObj *pRequest = NULL; SCatalogReq catalogReq = {0}; if (NULL == tableNameList) { @@ -950,7 +950,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { goto _return; } - STscObj* pTscObj = pRequest->pTscObj; + STscObj *pTscObj = pRequest->pTscObj; code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta); if (code) { goto _return; @@ -972,7 +972,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { goto _return; } - SSyncQueryParam* pParam = pRequest->body.param; + SSyncQueryParam *pParam = pRequest->body.param; tsem_wait(&pParam->sem); _return: diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index dcccbb17c98472d1b1a13d3eea9a255be0763494..94bd5dd7871fa7fa636bfd9c8aa6ff90097ff3e1 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -179,7 +179,6 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { if (code != 0) { terrno = code; if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash); - taosMemoryFreeClear(output.dbVgroup); tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr()); } else if (output.dbVgroup && output.dbVgroup->vgHash) { @@ -189,12 +188,14 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { if (code1 != TSDB_CODE_SUCCESS) { tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tstrerror(code1)); - taosMemoryFreeClear(output.dbVgroup); } else { catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup); + output.dbVgroup = NULL; } } + taosMemoryFreeClear(output.dbVgroup); + tFreeSUsedbRsp(&usedbRsp); char db[TSDB_DB_NAME_LEN] = {0}; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 417c014b6e1c00047dab1bf4d243714f36766377..5c09c7663fa3d1fc1674d4b3361c99021d8b993c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -22,7 +22,6 @@ typedef struct SLastrowReader { SVnode* pVnode; STSchema* pSchema; uint64_t uid; - // int32_t* pSlotIds; char** transferBuf; // todo remove it soon int32_t numOfCols; int32_t type; @@ -31,6 +30,7 @@ typedef struct SLastrowReader { } SLastrowReader; static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) { + ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock)); int32_t numOfRows = pBlock->info.rows; SColVal colVal = {0}; @@ -40,15 +40,17 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade if (slotIds[i] == -1) { colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false); } else { - tTSRowGetVal(pRow, pReader->pSchema, slotIds[i], &colVal); + int32_t slotId = slotIds[i]; + + tTSRowGetVal(pRow, pReader->pSchema, slotId, &colVal); if (IS_VAR_DATA_TYPE(colVal.type)) { if (colVal.isNull || colVal.isNone) { colDataAppendNULL(pColInfoData, numOfRows); } else { - varDataSetLen(pReader->transferBuf[i], colVal.value.nData); - memcpy(varDataVal(pReader->transferBuf[i]), colVal.value.pData, colVal.value.nData); - colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[i], false); + varDataSetLen(pReader->transferBuf[slotId], colVal.value.nData); + memcpy(varDataVal(pReader->transferBuf[slotId]), colVal.value.pData, colVal.value.nData); + colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false); } } else { colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull || colVal.isNone); @@ -68,7 +70,6 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, p->type = type; p->pVnode = pVnode; p->numOfCols = numOfCols; - p->transferBuf = taosMemoryCalloc(p->numOfCols, POINTER_BYTES); if (taosArrayGetSize(pTableIdList) == 0) { *pReader = p; @@ -79,7 +80,8 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1); p->pTableList = pTableIdList; - for (int32_t i = 0; i < p->numOfCols; ++i) { + p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES); + for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) { if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) { p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes); } @@ -92,10 +94,11 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t tsdbLastrowReaderClose(void* pReader) { SLastrowReader* p = pReader; - for (int32_t i = 0; i < p->numOfCols; ++i) { + for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) { taosMemoryFreeClear(p->transferBuf[i]); } + taosMemoryFree(p->pSchema); taosMemoryFree(p->transferBuf); taosMemoryFree(pReader); return TSDB_CODE_SUCCESS; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index dc7c54fdc9d1070ceda26677eca1d5dbbd92911a..e86a14a30f52d69714552ed1f3f2ac99e9969a9f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -63,10 +63,10 @@ typedef struct SBlockLoadSuppInfo { } SBlockLoadSuppInfo; typedef struct SFilesetIter { - int32_t numOfFiles; // number of total files - int32_t index; // current accessed index in the list - SArray* pFileList; // data file list - int32_t order; + int32_t numOfFiles; // number of total files + int32_t index; // current accessed index in the list + SArray* pFileList; // data file list + int32_t order; } SFilesetIter; typedef struct SFileDataBlockInfo { @@ -302,7 +302,10 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) { STimeWindow win = {0}; while (1) { - /*if (pReader->pFileReader != NULL) tsdbDataFReaderClose(&pReader->pFileReader);*/ +// if (pReader->pFileReader != NULL) { +// tsdbDataFReaderClose(&pReader->pFileReader); +// } + pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index); int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset); @@ -2434,6 +2437,7 @@ static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowInd SVersionRange* pVerRange, int32_t step) { while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) { if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) { + rowIndex += step; continue; } @@ -2832,7 +2836,10 @@ void tsdbReaderClose(STsdbReader* pReader) { destroyBlockScanInfo(pReader->status.pTableMap); blockDataDestroy(pReader->pResBlock); - if (pReader->pFileReader != NULL) tsdbDataFReaderClose(&pReader->pFileReader); + if (pReader->pFileReader != NULL) { + tsdbDataFReaderClose(&pReader->pFileReader); + } + #if 0 // if (pReader->status.pTableScanInfo != NULL) { // pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo); @@ -3020,6 +3027,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { memset(pReader->suppInfo.plist, 0, POINTER_BYTES); pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; + tsdbDataFReaderClose(&pReader->pFileReader); // todo set the correct numOfTables int32_t numOfTables = 1; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index dce7adfea9aba3a6cb04e773de7bbf33ae633852..fb9f588bae5c1a5ab14a25a6dbb0e47e831c1e4c 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -642,6 +642,7 @@ void ctgFreeSTableIndex(void *info); void ctgClearSubTaskRes(SCtgSubRes *pRes); void ctgFreeQNode(SCtgQNode *node); void ctgClearHandle(SCatalog* pCtg); +void ctgFreeTbCacheImpl(SCtgTbCache *pCache); extern SCatalogMgmt gCtgMgmt; diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 77c1e5b8b119d349e9fb83b8490123a4846c9cb6..499ce7727676381ad28ea0fb672d768e88ac7907 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -647,6 +647,8 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { CTG_RET(TSDB_CODE_OUT_OF_MEMORY); } + bool syncOp = operation->syncOp; + char* opName = gCtgCacheOperation[operation->opId].name; if (operation->syncOp) { tsem_init(&operation->rspSem, 0, 0); } @@ -664,14 +666,14 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { gCtgMgmt.queue.tail = node; CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); + ctgDebug("action [%s] added into queue", opName); + CTG_QUEUE_INC(); CTG_RT_STAT_INC(numOfOpEnqueue, 1); tsem_post(&gCtgMgmt.queue.reqSem); - ctgDebug("action [%s] added into queue", gCtgCacheOperation[operation->opId].name); - - if (operation->syncOp) { + if (syncOp) { tsem_wait(&operation->rspSem); taosMemoryFree(operation); } @@ -840,6 +842,7 @@ _return: ctgFreeVgInfo(dbInfo); taosMemoryFreeClear(op->data); + taosMemoryFreeClear(op); CTG_RET(code); } @@ -852,7 +855,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg)); - CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } char *p = strchr(output->dbFName, '.'); @@ -871,6 +874,11 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy _return: + if (output) { + taosMemoryFree(output->tbMeta); + taosMemoryFree(output); + } + taosMemoryFreeClear(msg); CTG_RET(code); @@ -1753,6 +1761,16 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) { CTG_CACHE_STAT_DEC(numOfStb, 1); } + SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName)); + if (NULL == pTbCache) { + ctgDebug("stb %s already not in cache", msg->stbName); + goto _return; + } + + CTG_LOCK(CTG_WRITE, &pTbCache->metaLock); + ctgFreeTbCacheImpl(pTbCache); + CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock); + if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) { ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid); } else { @@ -1780,14 +1798,24 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) { SCtgDBCache *dbCache = NULL; ctgGetDBCache(pCtg, msg->dbFName, &dbCache); if (NULL == dbCache) { - return TSDB_CODE_SUCCESS; + goto _return; } if (dbCache->dbId != msg->dbId) { ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%"PRIx64", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId, msg->dbFName, msg->tbName); - return TSDB_CODE_SUCCESS; + goto _return; } + SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, strlen(msg->tbName)); + if (NULL == pTbCache) { + ctgDebug("tb %s already not in cache", msg->tbName); + goto _return; + } + + CTG_LOCK(CTG_WRITE, &pTbCache->metaLock); + ctgFreeTbCacheImpl(pTbCache); + CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock); + if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) { ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); @@ -2063,6 +2091,8 @@ void* ctgUpdateThreadFunc(void* param) { if (operation->syncOp) { tsem_post(&operation->rspSem); + } else { + taosMemoryFreeClear(operation); } CTG_RT_STAT_INC(numOfOpDequeue, 1); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 59ad00952733623007f3ce0b5fca34c88fedf829..8e0a5b7de3753baaf08d4628032f40dcf51c711e 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -261,6 +261,8 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { _return: + taosMemoryFree(pMsg->pData); + if (pJob) { taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId); } diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index ad73fe40d2e9606d10bf2b7fc0c207a0c53674e8..a996a70973a18caa8573af224f0bcaa03938e3a3 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -152,6 +152,7 @@ void ctgFreeStbMetaCache(SCtgDBCache *dbCache) { } void ctgFreeTbCacheImpl(SCtgTbCache *pCache) { + qDebug("tbMeta freed, p:%p", pCache->pMeta); taosMemoryFreeClear(pCache->pMeta); if (pCache->pIndex) { taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo); @@ -831,6 +832,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) if (output->tbMeta) { int32_t metaSize = CTG_META_SIZE(output->tbMeta); (*pOutput)->tbMeta = taosMemoryMalloc(metaSize); + qDebug("tbMeta cloned, size:%d, p:%p", metaSize, (*pOutput)->tbMeta); if (NULL == (*pOutput)->tbMeta) { qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput)); taosMemoryFreeClear(*pOutput); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a735edafab74e6056cefcb2d055944d827e99146..f1b46aa69b4ce21616e696be702335a579927da4 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2217,7 +2217,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "last_row", .type = FUNCTION_TYPE_LAST_ROW, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 0dbf476837f046af5f7a4a65a27c5efaa19937e4..ccf28bfd78f9332d538a99077caf0d8561359c29 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -80,8 +80,10 @@ typedef struct STopBotRes { } STopBotRes; typedef struct SFirstLastRes { - bool hasResult; - bool isNull; // used for last_row function only + bool hasResult; + // used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So, + // this attribute is required + bool isNull; int32_t bytes; char buf[]; } SFirstLastRes; @@ -2948,7 +2950,7 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); - colDataAppend(pCol, pBlock->info.rows, pRes->buf, pResInfo->isNullRes); + colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull||pResInfo->isNullRes); // handle selectivity STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY)); setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows); @@ -5988,24 +5990,28 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) { int32_t type = pInputCol->info.type; int32_t bytes = pInputCol->info.bytes; + pInfo->bytes = bytes; + // last_row function does not ignore the null value for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { - if (pInputCol->hasNull && colDataIsNull_s(pInputCol, i)) { - continue; - } - numOfElems++; char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { - if (IS_VAR_DATA_TYPE(type)) { - bytes = varDataTLen(data); - pInfo->bytes = bytes; + + if (colDataIsNull_s(pInputCol, i)) { + pInfo->isNull = true; + } else { + if (IS_VAR_DATA_TYPE(type)) { + bytes = varDataTLen(data); + pInfo->bytes = bytes; + } + + memcpy(pInfo->buf, data, bytes); } - memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; pInfo->hasResult = true; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index b4e217ef7482d44d6ebca0509c3043fec175e72b..eeb44c4f82db68b445e944ed1b4870686187f740 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -282,7 +282,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t } *str = '"'; - int32_t length = taosUcs4ToMbs((TdUcs4 *)buf, bufSize, str + 1); + int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str + 1); if (length <= 0) { return TSDB_CODE_TSC_INVALID_VALUE; } @@ -310,15 +310,15 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t return TSDB_CODE_TSC_INVALID_VALUE; } - if(len) *len = n; + if (len) *len = n; return TSDB_CODE_SUCCESS; } char* parseTagDatatoJson(void* p) { - char* string = NULL; + char* string = NULL; SArray* pTagVals = NULL; - cJSON* json = NULL; + cJSON* json = NULL; if (tTagToValArray((const STag*)p, &pTagVals) != 0) { goto end; } @@ -327,7 +327,7 @@ char* parseTagDatatoJson(void* p) { if (nCols == 0) { goto end; } - char tagJsonKey[256] = {0}; + char tagJsonKey[256] = {0}; json = cJSON_CreateObject(); if (json == NULL) { goto end; @@ -390,7 +390,7 @@ char* parseTagDatatoJson(void* p) { end: cJSON_Delete(json); taosArrayDestroy(pTagVals); - if(string == NULL){ + if (string == NULL) { string = strdup(TSDB_DATA_NULL_STR_L); } return string; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 6bb8db593c13c28802ff6d63ab48f3da54cd0c66..71d123b7998d1c9759758fad33ff3ab411223d5a 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -41,6 +41,8 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) { if (pTask->execNodes) { taosHashCleanup(pTask->execNodes); } + + taosMemoryFree(pTask->profile.execTime); } int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum) { diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 465677de806a055bf78eef4eb4114966d382137f..50d4c04a9354b29fdb0c2b9ca3d9eb7b4b819b5a 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -170,6 +170,7 @@ # --- valgrind ./test.sh -f tsim/valgrind/checkError1.sim ./test.sh -f tsim/valgrind/checkError2.sim +./test.sh -f tsim/valgrind/checkError3.sim # --- vnode # ./test.sh -f tsim/vnode/replica3_basic.sim diff --git a/tests/system-test/7-tmq/dataFromTsdbNWal.py b/tests/system-test/7-tmq/dataFromTsdbNWal.py new file mode 100644 index 0000000000000000000000000000000000000000..a55fbbfd1879bbefc3ef039405255b50ea05435f --- /dev/null +++ b/tests/system-test/7-tmq/dataFromTsdbNWal.py @@ -0,0 +1,254 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 1 + self.ctbNum = 100 + self.rowsPerTbl = 10000 + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 1000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 10, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("restart taosd to ensure that the data falls into the disk") + # tdDnodes.stop(1) + # tdDnodes.start(1) + tdSql.query("flush database %s"%(paraDict['dbName'])) + return + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + paraDict['batchNum'] = 100 + paraDict['startTs'] = paraDict['startTs'] + self.rowsPerTbl + + topicNameList = ['topic1'] + expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from stb with filter") + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + # tdSql.query(queryString) + # expectRowsList.append(tdSql.getRows()) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + # after start consume, continue insert some data + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + # + tdSql.query(queryString) + expectRowsList.append(tdSql.getRows()) + + tdLog.info("wait the consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + if expectRowsList[0] != resultList[0]: + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) + tdLog.exit("%d tmq consume rows error!"%consumerId) + + tmqCom.checkFileContent(consumerId, queryString) + + time.sleep(10) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def tmqCase2(self): + tdLog.printNoPrefix("======== test case 2: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic1'] + expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from stb with filter") + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + tdSql.query(queryString) + expectRowsList.append(tdSql.getRows()) + totalRowsInserted = expectRowsList[0] + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 1 + expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3) + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor 0") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]): + tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted)) + tdLog.exit("%d tmq consume rows error!"%consumerId) + + firstConsumeRows = resultList[0] + + # reinit consume info, and start tmq_sim, then check consume result + tmqCom.initConsumerTable() + consumerId = 2 + expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3) + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor 1") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + actConsumeTotalRows = firstConsumeRows + resultList[0] + + if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows): + tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0])) + tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) + tdLog.exit("%d tmq consume rows error!"%consumerId) + + time.sleep(10) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 2 end ...... ") + + def run(self): + tdSql.prepare() + self.prepareTestEnv() + self.tmqCase1() + # self.tmqCase2() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqUpdate.py b/tests/system-test/7-tmq/tmqUpdate.py new file mode 100644 index 0000000000000000000000000000000000000000..8b511790eb83f5efe6ebb871429f59c7b2a7988e --- /dev/null +++ b/tests/system-test/7-tmq/tmqUpdate.py @@ -0,0 +1,244 @@ + +import taos +import sys +import time +import socket +import os +import threading +from enum import Enum + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 4 + self.ctbNum = 500 + self.rowsPerTbl = 1000 + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, + 'batchNum': 400, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + # tdLog.info("restart taosd to ensure that the data falls into the disk") + # tdSql.query("flush database %s"%(paraDict['dbName'])) + return + + # 自动建表完成数据插入,启动消费 + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, + 'batchNum': 400, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 5, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + # update to half tables + paraDict['ctbNum'] = int(self.ctbNum / 2) + tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + paraDict['ctbNum'] = self.ctbNum + consumerId = 0 + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 3) + topicList = topicFromStb1 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + tdLog.info("insert process end, and start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() + + tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted)) + if totalConsumeRows != expectrowcnt: + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicFromStb1) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def tmqCase2(self): + tdLog.printNoPrefix("======== test case 2: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, + 'batchNum': 1000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 5, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdLog.info("restart taosd to ensure that the data falls into the disk") + tdSql.query("flush database %s"%(paraDict['dbName'])) + + # update to half tables + paraDict['ctbNum'] = int(self.ctbNum / 2) + tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+paraDict['ctbNum']) + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+paraDict['ctbNum']) + + tmqCom.initConsumerTable() + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + paraDict['ctbNum'] = self.ctbNum + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 * 2 + topicList = topicFromStb1 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + tdLog.info("insert process end, and start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() + + tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) + + if totalConsumeRows != totalRowsInserted: + tdLog.exit("tmq consume rows error!") + + tmqCom.checkFileContent(consumerId, queryString) + + tdSql.query("drop topic %s"%topicFromStb1) + + tdLog.printNoPrefix("======== test case 2 end ...... ") + + def run(self): + tdSql.prepare() + self.prepareTestEnv() + self.tmqCase1() + self.tmqCase2() + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqUpdate1.py b/tests/system-test/7-tmq/tmqUpdate1.py new file mode 100644 index 0000000000000000000000000000000000000000..5f11090385c6abe27b8dc9fa10f9b44e691a2cee --- /dev/null +++ b/tests/system-test/7-tmq/tmqUpdate1.py @@ -0,0 +1,175 @@ + +import taos +import sys +import time +import socket +import os +import threading +from enum import Enum + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 4 + self.ctbNum = 1000 + self.rowsPerTbl = 1000 + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, + 'batchNum': 400, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + paraDict['ctbNum'] = int(self.ctbNum / 2) + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + # tdLog.info("restart taosd to ensure that the data falls into the disk") + # tdSql.query("flush database %s"%(paraDict['dbName'])) + return + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 1000, + 'rowsPerTbl': 1000, + 'batchNum': 1000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 5, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdLog.info("restart taosd to ensure that the data falls into the disk") + tdSql.query("flush database %s"%(paraDict['dbName'])) + + # update to half tables + paraDict['ctbNum'] = int(self.ctbNum / 4) + tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+paraDict['ctbNum']) + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']+paraDict['ctbNum']) + + tmqCom.initConsumerTable() + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + paraDict['ctbNum'] = self.ctbNum + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1) + topicList = topicFromStb1 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt + paraDict["rowsPerTbl"] * paraDict["ctbNum"],topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + paraDict['ctbNum'] = int(self.ctbNum / 2) + paraDict['ctbStartIdx'] += paraDict['ctbNum'] + _ = tmqCom.asyncInsertDataByInterlace(paraDict) + time.sleep(3) + pthread = tmqCom.asyncInsertDataByInterlace(paraDict) + pthread.join() + + tdLog.info("insert process end, and start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() + + tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) + + if totalConsumeRows <= totalRowsInserted or totalConsumeRows != expectrowcnt: + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicFromStb1) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + + def run(self): + tdSql.prepare() + self.prepareTestEnv() + self.tmqCase1() + # self.tmqCase2() + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tools/taosws-rs b/tools/taosws-rs index 6dccac192a2ae7dd78718ab926201aab5419327a..7a94ffab45f08e16f09b3f430fe75d717054adb6 160000 --- a/tools/taosws-rs +++ b/tools/taosws-rs @@ -1 +1 @@ -Subproject commit 6dccac192a2ae7dd78718ab926201aab5419327a +Subproject commit 7a94ffab45f08e16f09b3f430fe75d717054adb6