diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index ba92ed238b02489bb09d45fb65a0b0071f6fac9a..6805e4d501e5128fb967bfd2e868a9751d17b10b 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -189,12 +189,15 @@ void destroyTscObj(void *pObj) { SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType}; hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey); - int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); + destroyAllRequests(pTscObj->pRequests); + taosHashCleanup(pTscObj->pRequests); + schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter); tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj, pTscObj->pAppInfo->numOfConns); + int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); if (0 == connNum) { destroyAppInst(pTscObj->pAppInfo); } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index e2d75d39e34c9077591101e636255027e4b672d7..2a9d113108a565694091de4c3ea0c78431c50b57 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -671,8 +671,7 @@ static void *hbThreadFunc(void *param) { } #endif while (1) { - int8_t threadStop = atomic_val_compare_exchange_8(&clientHbMgr.threadStop, 1, 2); - if (1 == threadStop) { + if (1 == clientHbMgr.threadStop) { break; } @@ -760,9 +759,7 @@ static void hbStopThread() { return; } - while (2 != atomic_load_8(&clientHbMgr.threadStop)) { - taosUsleep(10); - } + taosThreadJoin(clientHbMgr.thread, NULL); tscDebug("hb thread stopped"); } diff --git a/source/common/src/systable.c b/source/common/src/systable.c index ba8a8e122049ea567e4d8f04396094b68326f3b0..08997bcaf8ba4f745db9f7d13b131cad47db901b 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -123,6 +123,9 @@ static const SSysDbTableSchema userStbsSchema[] = { {.name = "tags", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "last_update", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "table_comment", .bytes = TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "watermark", .bytes = 64 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "max_delay", .bytes = 64 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "rollup", .bytes = 128 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, }; static const SSysDbTableSchema streamSchema[] = { @@ -146,8 +149,8 @@ static const SSysDbTableSchema userTblsSchema[] = { {.name = "uid", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "ttl", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "table_comment", .bytes = TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "type", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "table_comment", .bytes = TSDB_TB_COMMENT_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "type", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, }; static const SSysDbTableSchema userTblDistSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 1c234cf2802cb51e1480d7241b3ac7602c2ef565..1e576250285b88aaab8f4aad134bbb240e2646f3 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -2109,7 +2109,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables - pColInfo = taosArrayGet(pBlock->pDataBlock, cols); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); if (pStb->commentLen > 0) { char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(comment, pStb->comment); @@ -2122,6 +2122,34 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc colDataAppendNULL(pColInfo, numOfRows); } + char watermark[64 + VARSTR_HEADER_SIZE] = {0}; + sprintf(varDataVal(watermark), "%" PRId64 "a,%" PRId64 "a", pStb->watermark[0], pStb->watermark[1]); + varDataSetLen(watermark, strlen(varDataVal(watermark))); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)watermark, false); + + char maxDelay[64 + VARSTR_HEADER_SIZE] = {0}; + sprintf(varDataVal(maxDelay), "%" PRId64 "a,%" PRId64 "a", pStb->maxdelay[0], pStb->maxdelay[1]); + varDataSetLen(maxDelay, strlen(varDataVal(maxDelay))); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)maxDelay, false); + + char rollup[128 + VARSTR_HEADER_SIZE] = {0}; + int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs); + for (int32_t i = 0; i < rollupNum; ++i) { + char *funcName = taosArrayGet(pStb->pFuncs, i); + if (i) { + strcat(varDataVal(rollup), ", "); + } + strcat(varDataVal(rollup), funcName); + } + varDataSetLen(rollup, strlen(varDataVal(rollup))); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)rollup, false); + numOfRows++; sdbRelease(pSdb, pStb); } diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 3a7ad4a2d613c22eabbc134acaae1b59175004a1..ca50702894a2a473099fc336c385d01588354173 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1293,6 +1293,7 @@ void catalogDestroy(void) { if (!taosCheckCurrentInDll()) { ctgClearCacheEnqueue(NULL, true, true, true); + taosThreadJoin(gCtgMgmt.updateThread, NULL); } taosHashCleanup(gCtgMgmt.pCluster);