diff --git a/examples/rust/wrapper.h b/examples/rust/wrapper.h new file mode 100644 index 0000000000000000000000000000000000000000..78857597a9dc0c9a0f35194339a0d132f788d592 --- /dev/null +++ b/examples/rust/wrapper.h @@ -0,0 +1 @@ +#include diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index 3dee59ab14a0ac780f18f731a5abb6dbe559b6fd..1da801be0d95cbe72d1f5fab2a84d2aee92d07c2 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -147,9 +147,9 @@ typedef struct { } SMonStbInfo; typedef struct { - int32_t expire_time; - int64_t timeseries_used; - int64_t timeseries_total; + uint32_t expire_time; + int64_t timeseries_used; + int64_t timeseries_total; } SMonGrantInfo; typedef struct { diff --git a/packaging/tools/com.taosdata.taoskeeper.plist b/packaging/tools/com.taosdata.taoskeeper.plist new file mode 100644 index 0000000000000000000000000000000000000000..338b5d8e79c4a6f529f70e85bdd2d8b64d67264f --- /dev/null +++ b/packaging/tools/com.taosdata.taoskeeper.plist @@ -0,0 +1,33 @@ + + + + + Label + com.tdengine.taoskeeper + ProgramArguments + + /usr/local/bin/taoskeeper + + ProcessType + Interactive + Disabled + + RunAtLoad + + LaunchOnlyOnce + + SessionCreate + + ExitTimeOut + 600 + KeepAlive + + SuccessfulExit + + AfterInitialDemand + + + Program + /usr/local/bin/taoskeeper + + \ No newline at end of file diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh index d71d5df47cad3cd9227dfa1ab9cf1d5bb5dbb059..29160238ce2b077fbb2c155533559f9962bd3554 100755 --- a/packaging/tools/makepkg.sh +++ b/packaging/tools/makepkg.sh @@ -53,7 +53,7 @@ if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then cd ${top_dir}/tools/taos-tools/packaging/deb [ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0" - taostools_ver=$(git tag |grep -v taos | sort | tail -1) + taostools_ver=$(git for-each-ref --sort=taggerdate --format '%(tag)' refs/tags|grep -v taos | tail -1) taostools_install_dir="${release_dir}/${clientName2}Tools-${taostools_ver}" cd ${curr_dir} diff --git a/packaging/tools/post.sh b/packaging/tools/post.sh index 986180667745f4b325c05d1ce43ce7a01cb86a1d..c9fab51b286e2df19d9a65793df2e4f6972581fe 100755 --- a/packaging/tools/post.sh +++ b/packaging/tools/post.sh @@ -582,6 +582,11 @@ function install_service_on_launchctl() { ${csudo}cp ${install_main_dir}/service/com.taosdata.taosadapter.plist /Library/LaunchDaemons/com.taosdata.taosadapter.plist || : ${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist || : fi + if [ -f ${install_main_dir}/service/com.taosdata.taoskeeper.plist ]; then + ${csudo}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taoskeeper.plist > /dev/null 2>&1 || : + ${csudo}cp ${install_main_dir}/service/com.taosdata.taoskeeper.plist /Library/LaunchDaemons/com.taosdata.taoskeeper.plist || : + ${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taoskeeper.plist || : + fi } function install_taosadapter_service() { diff --git a/source/client/inc/clientSml.h b/source/client/inc/clientSml.h index d8d41d8be6e75c32c7c5da06dbcfca5c508a8b62..92896e6f23ffd0b37e3387ac800417519f19dd6a 100644 --- a/source/client/inc/clientSml.h +++ b/source/client/inc/clientSml.h @@ -70,7 +70,7 @@ extern "C" { #define VALUE_LEN 6 #define OTD_JSON_FIELDS_NUM 4 -#define MAX_RETRY_TIMES 5 +#define MAX_RETRY_TIMES 100 typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; typedef enum { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index e3e20ee85d4de996b664ccbaeed7514a992c9937..de08ba66cc76505b951653735f918faf8b2885de 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -543,7 +543,7 @@ void taos_init_imp(void) { if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) { // ignore create log failed, only print - printf(" WARING: Create taoslog failed. configDir=%s\n", configDir); + printf(" WARING: Create taoslog failed:%s. configDir=%s\n", strerror(errno), configDir); } if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) { diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 7f58b579fa2d53abf55c9a1266911a03872d1d34..1719759822514d77e2cc2b33baa663a4dbeb99b5 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -750,6 +750,7 @@ end: } static int32_t smlModifyDBSchemas(SSmlHandle *info) { + uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas start, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema); if (info->dataFormat && !info->needModifySchema) { return TSDB_CODE_SUCCESS; } @@ -779,6 +780,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) { + uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas create table:%s", info->id, pName.tname); SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField)); SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField)); code = smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true); @@ -818,6 +820,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { goto end; } if (action != SCHEMA_ACTION_NULL) { + uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table tag, table:%s, action:%d", info->id, pName.tname, action); SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); SArray *pTags = @@ -869,6 +872,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { goto end; } if (action != SCHEMA_ACTION_NULL) { + uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table col, table:%s, action:%d", info->id, pName.tname, action); SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField)); SArray *pTags = @@ -935,15 +939,19 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { } sTableData->tableMeta = pTableMeta; - + uDebug("SML:0x%" PRIx64 "modify schema uid:%" PRIu64 ", sversion:%d, tversion:%d", info->id, pTableMeta->uid, pTableMeta->sversion, pTableMeta->tversion) tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp); } + uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas end success, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema); + return 0; end: taosHashCleanup(hashTmp); taosMemoryFreeClear(pTableMeta); - // catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); + catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); + uError("SML:0x%" PRIx64 " smlModifyDBSchemas end failed:%d:%s, format:%d, needModifySchema:%d", info->id, code, tstrerror(code), info->dataFormat, info->needModifySchema); + return code; } @@ -1019,8 +1027,9 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols } else { size_t tmp = taosArrayGetSize(metaArray); if (tmp > INT16_MAX) { + smlBuildInvalidDataMsg(msg, "too many cols or tags", kv->key); uError("too many cols or tags"); - return -1; + return TSDB_CODE_SML_INVALID_DATA; } int16_t size = tmp; int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES); @@ -1170,6 +1179,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) { } static int32_t smlParseLineBottom(SSmlHandle *info) { + uDebug("SML:0x%" PRIx64 " smlParseLineBottom start, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); if (info->dataFormat) return TSDB_CODE_SUCCESS; for (int32_t i = 0; i < info->lineNum; i++) { @@ -1212,6 +1222,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { SSmlSTableMeta **tableMeta = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); if (tableMeta) { // update meta + uDebug("SML:0x%" PRIx64 " smlParseLineBottom update meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf); if (ret == TSDB_CODE_SUCCESS) { ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf); @@ -1226,7 +1237,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { // uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id); // return ret; // } - + uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags); if(terrno == TSDB_CODE_DUP_KEY){return terrno;} @@ -1234,12 +1245,14 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES); } } + uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); return TSDB_CODE_SUCCESS; } static int32_t smlInsertData(SSmlHandle *info) { int32_t code = TSDB_CODE_SUCCESS; + uDebug("SML:0x%" PRIx64 " smlInsertData start, format:%d", info->id, info->dataFormat); if(info->pRequest->dbList == NULL){ info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN); @@ -1284,6 +1297,7 @@ static int32_t smlInsertData(SSmlHandle *info) { // use tablemeta of stable to save vgid and uid of child table (*pMeta)->tableMeta->vgId = vg.vgId; (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid + uDebug("SML:0x%" PRIx64 " smlInsertData table:%s, uid:%" PRIu64 ", format:%d", info->id, pName.tname, tableData->uid, info->dataFormat); code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols, (*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen, @@ -1306,16 +1320,18 @@ static int32_t smlInsertData(SSmlHandle *info) { atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1); launchQueryImpl(info->pRequest, info->pQuery, true, NULL); + uDebug("SML:0x%" PRIx64 " smlInsertData end, format:%d, code:%d,%s", info->id, info->dataFormat, info->pRequest->code, tstrerror(info->pRequest->code)); + return info->pRequest->code; } static void smlPrintStatisticInfo(SSmlHandle *info) { uDebug( "SML:0x%" PRIx64 - " smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \ + " smlInsertLines result, code:%d, msg:%s, lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \ parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64 "", - info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, + info->id, info->cost.code, tstrerror(info->cost.code), info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables, info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables, info->cost.schemaTime - info->cost.parseTime, info->cost.insertBindTime - info->cost.schemaTime, info->cost.insertRpcTime - info->cost.insertBindTime, info->cost.endTime - info->cost.insertRpcTime, @@ -1360,6 +1376,7 @@ int32_t smlClearForRerun(SSmlHandle *info) { } static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) { + uDebug("SML:0x%" PRIx64 " smlParseLine start", info->id); int32_t code = TSDB_CODE_SUCCESS; if (info->protocol == TSDB_SML_JSON_PROTOCOL) { if (lines) { @@ -1395,8 +1412,16 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char } } - uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, - (info->isRawLine ? "rawdata" : tmp)); + char cTmp = 0; // for print tmp if is raw + if(info->isRawLine){ + cTmp = tmp[len - 1]; + tmp[len - 1] = '\0'; + } + + uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, numLines:%d, protocol:%d, len:%d, sql:%s", info->id, info->isRawLine, numLines, info->protocol, len, tmp); + if(info->isRawLine){ + tmp[len - 1] = cTmp; + } if (info->protocol == TSDB_SML_LINE_PROTOCOL) { if (info->dataFormat) { @@ -1421,6 +1446,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char return code; } if (info->reRun) { + uDebug("SML:0x%" PRIx64 " smlParseLine re run", info->id); i = 0; rawLine = oldRaw; code = smlClearForRerun(info); @@ -1431,6 +1457,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char } i++; } + uDebug("SML:0x%" PRIx64 " smlParseLine end", info->id); return code; } @@ -1461,7 +1488,8 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL do { code = smlModifyDBSchemas(info); if (code == 0) break; - taosMsleep(200); + taosMsleep(500); + uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum); } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES); if (code != 0) { @@ -1488,6 +1516,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, } SRequestObj *request = NULL; SSmlHandle *info = NULL; + int cnt = 0; while(1){ request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid); if (request == NULL) { @@ -1542,16 +1571,22 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, request->code = code; info->cost.endTime = taosGetTimestampUs(); info->cost.code = code; - smlPrintStatisticInfo(info); - if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING){ + if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING + || code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT){ + if(cnt++ >= 10){ + uInfo("SML:%"PRIx64" retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code)); + break; + } + taosMsleep(100); refreshMeta(request->pTscObj, request); - uInfo("SML:%"PRIx64" ver is old retry or object is creating code:%d", info->id, code); + uInfo("SML:%"PRIx64" retry:%d/10,ver is old retry or object is creating code:%d, msg:%s", info->id, cnt, code, tstrerror(code)); smlDestroyInfo(info); info = NULL; taos_free_result(request); request = NULL; continue; } + smlPrintStatisticInfo(info); break; } diff --git a/source/client/src/clientSmlLine.c b/source/client/src/clientSmlLine.c index b1aea1bfaa55c0117ee413c2d1b59a35244e57d2..335e3a1dc76a0b1cfd231f43e3e43632cd4e44ef 100644 --- a/source/client/src/clientSmlLine.c +++ b/source/client/src/clientSmlLine.c @@ -583,12 +583,14 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine .i = ts, .length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes}; if (info->dataFormat) { + uDebug("SML:0x%" PRIx64 " smlParseInfluxString format true, ts:%" PRId64, info->id, ts); ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0); if(ret != TSDB_CODE_SUCCESS){return ret;} ret = smlBuildRow(info->currTableDataCtx); if(ret != TSDB_CODE_SUCCESS){return ret;} clearColValArray(info->currTableDataCtx->pValues); } else { + uDebug("SML:0x%" PRIx64 " smlParseInfluxString format false, ts:%" PRId64, info->id, ts); taosArraySet(elements->colArray, 0, &kv); } info->preLine = *elements; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 79139a5fe5eea6a8cab73e5775d3b14a2cef921a..77bbd0be1a77bd0d673a52bb4b1842459fae76eb 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -24,7 +24,7 @@ #include "tref.h" #include "ttimer.h" -#define EMPTY_BLOCK_POLL_IDLE_DURATION 100 +#define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000 struct SMqMgmt { @@ -148,7 +148,8 @@ typedef struct { typedef struct { int8_t tmqRspType; - int32_t epoch; + int32_t epoch; // epoch can be used to guard the vgHandle + int32_t vgId; SMqClientVg* vgHandle; SMqClientTopic* topicHandle; uint64_t reqId; @@ -1329,6 +1330,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { pRspWrapper->topicHandle = pTopic; pRspWrapper->reqId = requestId; pRspWrapper->pEpset = pMsg->pEpSet; + pRspWrapper->vgId = pVg->vgId; pMsg->pEpSet = NULL; if (rspType == TMQ_MSG_TYPE__POLL_RSP) { @@ -1747,7 +1749,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms - tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 100ms before start next poll", tmq->consumerId, tmq->epoch, + tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); continue; } @@ -1864,9 +1866,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return pRsp; } } else { - SMqClientVg* pVg = pollRspWrapper->vgHandle; tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pVg->vgId, pDataRsp->head.epoch, consumerEpoch); + tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1886,7 +1887,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return pRsp; } else { tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); + tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1933,7 +1934,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); + tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1955,7 +1956,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { void* rspObj; int64_t startTime = taosGetTimestampMs(); - tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime); + tscDebug("consumer:0x%" PRIx64 " start to poll at %"PRId64", timeout:%" PRId64, tmq->consumerId, startTime, timeout); #if 0 tmqHandleAllDelayedTask(tmq); @@ -2017,37 +2018,43 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { } } -int32_t tmq_consumer_close(tmq_t* tmq) { - tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status); +static void displayConsumeStatistics(const tmq_t* pTmq) { + int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics); + tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d", + pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch); - if (tmq->status == TMQ_CONSUMER_STATUS__READY) { - int32_t rsp = tmq_commit_sync(tmq, NULL); - if (rsp != 0) { - return rsp; + tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId); + for (int32_t i = 0; i < numOfTopics; ++i) { + SMqClientTopic* pTopics = taosArrayGet(pTmq->clientTopics, i); + + tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i); + int32_t numOfVgs = taosArrayGetSize(pTopics->vgs); + for (int32_t j = 0; j < numOfVgs; ++j) { + SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j); + tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows); } + } - int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d", - tmq->consumerId, tmq->pollCnt, tmq->totalRows, numOfTopics, tmq->epoch); + tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId); +} - tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", tmq->consumerId); - for (int32_t i = 0; i < numOfTopics; ++i) { - SMqClientTopic* pTopics = taosArrayGet(tmq->clientTopics, i); +int32_t tmq_consumer_close(tmq_t* tmq) { + tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status); + displayConsumeStatistics(tmq); - tscDebug("consumer:0x%" PRIx64 " topic:%d", tmq->consumerId, i); - int32_t numOfVgs = taosArrayGetSize(pTopics->vgs); - for (int32_t j = 0; j < numOfVgs; ++j) { - SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j); - tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows); + if (tmq->status == TMQ_CONSUMER_STATUS__READY) { + // if auto commit is set, commit before close consumer. Otherwise, do nothing. + if (tmq->autoCommit) { + int32_t rsp = tmq_commit_sync(tmq, NULL); + if (rsp != 0) { + return rsp; } } - tscDebug("consumer:0x%" PRIx64 " rows dist end", tmq->consumerId); - int32_t retryCnt = 0; tmq_list_t* lst = tmq_list_new(); while (1) { - rsp = tmq_subscribe(tmq, lst); + int32_t rsp = tmq_subscribe(tmq, lst); if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) { break; } else { @@ -2057,6 +2064,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) { } tmq_list_destroy(lst); + } else { + tscWarn("consumer:0x%" PRIx64" not in ready state, close it directly", tmq->consumerId); } taosRemoveRef(tmqMgmt.rsetId, tmq->refId); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 0b4c7b88d764c03f98ff1298a903d8bfbd0af94a..aeeec1d61ce8b9fb307ba870d2a3872e7b5641e2 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1238,13 +1238,13 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi } if (taosLoadCfg(pCfg, envCmd, cfgDir, envFile, apolloUrl) != 0) { - uError("failed to load cfg since %s", terrstr()); + printf("failed to load cfg since %s", terrstr()); cfgCleanup(pCfg); return -1; } if (cfgLoadFromArray(pCfg, pArgs) != 0) { - uError("failed to load cfg from array since %s", terrstr()); + printf("failed to load cfg from array since %s", terrstr()); cfgCleanup(pCfg); return -1; } @@ -1260,13 +1260,13 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi if (taosMulModeMkDir(tsLogDir, 0777) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to create dir:%s since %s", tsLogDir, terrstr()); + printf("failed to create dir:%s since %s", tsLogDir, terrstr()); cfgCleanup(pCfg); return -1; } if (taosInitLog(logname, logFileNum) != 0) { - uError("failed to init log file since %s", terrstr()); + printf("failed to init log file since %s", terrstr()); cfgCleanup(pCfg); return -1; } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index dfc3b3fde85fe81049cc2842d6d7c6f4c5e4380e..e891eef1d8245bce3bda4e67df05286a754c3c42 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -457,6 +457,7 @@ typedef struct { void* pIter; SMnode* pMnode; STableMetaRsp* pMeta; + bool restore; bool sysDbRsp; char db[TSDB_DB_FNAME_LEN]; char filterTb[TSDB_TABLE_NAME_LEN]; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index c32212dfc1a9133771dc6747d4c636b6adb7d00e..a096e0341e6ad65aa1a374a7f4b80072eca5550d 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -864,7 +864,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr } // grant info - pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f; + pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 1000; pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed; if (pMnode->grant.expireTimeMS == 0) { pGrantInfo->expire_time = INT32_MAX; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 7fe08514f62108ae08f75c2d875cb117881b9298..c50b205f37f569fced5a690f329ae3ba26c1ac6a 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -324,7 +324,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { pReq->info.rsp = pRsp; pReq->info.rspLen = size; - if (rowsRead == 0 || rowsRead < rowsToRead) { + if (rowsRead == 0 || ((rowsRead < rowsToRead) && !pShow->restore)) { pRsp->completed = 1; mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id); mndReleaseShowObj(pShow, true); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 2a369a863a8f6a79356d058992c0717d16c2a557..c577097644d8b296db434a1b71e5e9d11a98b774 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -3113,9 +3113,18 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(typeName, "SUPER_TABLE"); + bool fetch = pShow->restore ? false : true; + pShow->restore = false; while (numOfRows < rows) { - pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb); - if (pShow->pIter == NULL) break; + if (fetch) { + pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb); + if (pShow->pIter == NULL) break; + } else { + fetch = true; + void *pKey = taosHashGetKey(pShow->pIter, NULL); + pStb = sdbAcquire(pSdb, SDB_STB, pKey); + if (!pStb) continue; + } if (pDb != NULL && pStb->dbUid != pDb->uid) { sdbRelease(pSdb, pStb); @@ -3129,6 +3138,17 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB sdbRelease(pSdb, pStb); continue; } + + if ((numOfRows + pStb->numOfColumns) > rows) { + pShow->restore = true; + if (numOfRows == 0) { + mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s", + rows, pStb->numOfColumns, pStb->name, pStb->db); + } + sdbRelease(pSdb, pStb); + break; + } + varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE])); mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 29d1d0aafaa53bd81166b72098dd461a886eecd3..2d053d04aebdbfd533e289ff804481a53c8895f0 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -157,7 +157,7 @@ typedef struct SMTbCursor SMTbCursor; SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType); -int32_t metaTbCursorPrev(SMTbCursor *pTbCur); +int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType); #endif diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index ee2bd007ce55539e0a1df703152029e088b72925..c2b38f5cd13101927bcdbc8f5ea1762c0885e1f7 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -141,7 +141,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); // tqRead int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset); int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset); -int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); +int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId); // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 6ab322d26aeb7bf1844db50599e9227dfa5cb4f6..3376150d264675b98e8f7455ea941d956dfc7f58 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -336,7 +336,7 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) { return 0; } -int32_t metaTbCursorPrev(SMTbCursor *pTbCur) { +int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) { int ret; void *pBuf; STbCfg tbCfg; @@ -350,7 +350,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur) { tDecoderClear(&pTbCur->mr.coder); metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey); - if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) { + if (pTbCur->mr.me.type == jumpTableType) { continue; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1ee799a8cfc6bf9138209329e4a84c9b98001e32..c4800e50510b69d4fd669c15465300fc0c9f8af9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -309,70 +309,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con return 0; } -//int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) { -//#if 0 -// A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); -// A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); -// -// if (pRsp->withSchema) { -// A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum); -// } else { -// A(taosArrayGetSize(pRsp->blockSchema) == 0); -// } -// -// if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { -// if (pRsp->blockNum > 0) { -// A(pRsp->rspOffset.version > pRsp->reqOffset.version); -// } else { -// A(pRsp->rspOffset.version >= pRsp->reqOffset.version); -// } -// } -//#endif -// -// int32_t len = 0; -// int32_t code = 0; -// tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code); -// if (code < 0) { -// return -1; -// } -// -// int32_t tlen = sizeof(SMqRspHead) + len; -// void* buf = rpcMallocCont(tlen); -// if (buf == NULL) { -// terrno = TSDB_CODE_OUT_OF_MEMORY; -// return -1; -// } -// -// ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP; -// ((SMqRspHead*)buf)->epoch = pReq->epoch; -// ((SMqRspHead*)buf)->consumerId = pReq->consumerId; -// -// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); -// -// SEncoder encoder = {0}; -// tEncoderInit(&encoder, abuf, len); -// tEncodeSTaosxRsp(&encoder, pRsp); -// tEncoderClear(&encoder); -// -// SRpcMsg rsp = { -// .info = pMsg->info, -// .pCont = buf, -// .contLen = tlen, -// .code = 0, -// }; -// -// tmsgSendRsp(&rsp); -// -// char buf1[80] = {0}; -// char buf2[80] = {0}; -// tFormatOffset(buf1, 80, &pRsp->reqOffset); -// tFormatOffset(buf2, 80, &pRsp->rspOffset); -// -// tqDebug("taosx rsp, vgId:%d, consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s", -// TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); -// return 0; -//} - static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) { return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG && pLeft->val.version <= pRight->val.version; @@ -615,9 +551,9 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* // NOTE: this pHandle->consumerId may have been changed already. tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 - ", ts:%" PRId64, + ", ts:%" PRId64", reqId:0x%"PRIx64, consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, - dataRsp.rspOffset.ts); + dataRsp.rspOffset.ts, pRequest->reqId); tDeleteSMqDataRsp(&dataRsp); return code; @@ -637,9 +573,9 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* if (metaRsp.metaRspLen > 0) { code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 - ",version:%" PRId64, + ",ts:%" PRId64, consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, - metaRsp.rspOffset.version); + metaRsp.rspOffset.ts); taosMemoryFree(metaRsp.metaRsp); tDeleteSTaosxRsp(&taosxRsp); return code; @@ -680,17 +616,12 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* break; } - if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) { + if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); -// if (terrno == 0) { // failed to seek to given ver, but no errors happen. -// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, (SMqDataRsp*) &taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); -// return code; -// } else { // error happens, return to consumers - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return code; -// } + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + taosMemoryFreeClear(pCkHead); + return code; } SWalCont* pHead = &pCkHead->head; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 1c8088be7e48c426a97551284518f493108d8d83..7f9563ae5f9a3b43e5ad1246115447defc783584 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -183,23 +183,24 @@ end: return tbSuid == realTbSuid; } -int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) { +int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) { int32_t code = 0; + int32_t vgId = TD_VID(pTq->pVnode); taosThreadMutexLock(&pHandle->pWalReader->mutex); int64_t offset = *fetchOffset; while (1) { if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) { - tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return", - pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset); + tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return, reqId:0x%"PRIx64, + pHandle->consumerId, pHandle->epoch, vgId, offset, reqId); *fetchOffset = offset - 1; code = -1; goto END; } - tqDebug("vgId:%d, taosx get msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, offset, - TMSG_INFO((*ppCkHead)->head.msgType)); + tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId, + pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId); if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) { code = walFetchBody(pHandle->pWalReader, ppCkHead); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index ad93cc567c0bdce904d09263b4b0e198787a2903..95981c2f0888dd4b9b53b8726a2de3563bca79b8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -134,7 +134,6 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id // all queried tables have been dropped already, return immediately. if (p->pSchema == NULL) { - taosMemoryFree(p); tsdbWarn("all queried tables has been dropped, try next group, %s", idstr); return TSDB_CODE_PAR_TABLE_NOT_EXIST; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e5f3c7406de63d787d2d4418780b491b683b140f..5ad9276c6c06c28b72fcb93bd11a28e77ff329da 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -922,7 +922,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN pBlockNum->numOfBlocks += 1; } - if ((pScanInfo->pBlockList != NULL) && (taosArrayGetSize(pScanInfo->pBlockList) > 0)) { + if (taosArrayGetSize(pScanInfo->pBlockList) > 0) { numOfQTable += 1; } } @@ -4220,12 +4220,8 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) { if (pStatus->loadFromFile) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); if (pBlockInfo != NULL) { - pBlockScanInfo = - *(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr); if (pBlockScanInfo == NULL) { - code = TSDB_CODE_INVALID_PARA; - tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, - taosHashGetSize(pReader->status.pTableMap), pReader->idStr); goto _err; } } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 29a23cd90bbb3f762ad2b12c7410ee8b17be5475..ee92805d2798d91ed67aba13c4b52a1c1d931c3b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1194,6 +1194,8 @@ static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, if (code != TSDB_CODE_SUCCESS) { SET_SESSION_WIN_KEY_INVALID(pKey); } + + taosMemoryFree(pCur); return code; } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 24f42ff178c45660a13f58df2f3c025eb3e78048..f24d3523c880dfa453ce08f2b60a465e30a03abb 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -57,9 +57,11 @@ typedef struct SSysTableScanInfo { const char* pUser; bool sysInfo; bool showRewrite; + bool restore; SNode* pCondition; // db_name filter condition, to discard data that are not in current database SMTbCursor* pCur; // cursor for iterate the local table meta store. SSysTableIndex* pIdx; // idx for local table meta + SHashObj* pSchema; SColMatchInfo matchInfo; SName name; SSDataBlock* pRes; @@ -514,9 +516,23 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta); } - SHashObj* stableSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - taosHashSetFreeFp(stableSchema, tDeleteSSchemaWrapperForHash); - while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0) { + if (pInfo->pSchema == NULL) { + pInfo->pSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + taosHashSetFreeFp(pInfo->pSchema, tDeleteSSchemaWrapperForHash); + } + + if (!pInfo->pCur || !pInfo->pSchema) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + qError("sysTableScanUserCols failed since %s", terrstr(terrno)); + blockDataDestroy(dataBlock); + pInfo->loadInfo.totalRows = 0; + return NULL; + } + + int32_t restore = pInfo->restore; + pInfo->restore = false; + while (restore || ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0)) { + if (restore) restore = false; char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; @@ -524,33 +540,36 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) { qDebug("sysTableScanUserCols cursor get super table"); - void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t)); + void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t)); if (schema == NULL) { SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow); - taosHashPut(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES); + taosHashPut(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES); } continue; } else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) { qDebug("sysTableScanUserCols cursor get child table"); STR_TO_VARSTR(typeName, "CHILD_TABLE"); STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name); - int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; - void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t)); + void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t)); if (schema != NULL) { schemaRow = *(SSchemaWrapper**)schema; } else { - tDecoderClear(&pInfo->pCur->mr.coder); - int code = metaGetTableEntryByUid(&pInfo->pCur->mr, suid); + SMetaReader smrSuperTable = {0}; + metaReaderInit(&smrSuperTable, pInfo->readHandle.meta, 0); + int code = metaGetTableEntryByUid(&smrSuperTable, suid); if (code != TSDB_CODE_SUCCESS) { // terrno has been set by metaGetTableEntryByName, therefore, return directly qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d", suid, code); + metaReaderClear(&smrSuperTable); blockDataDestroy(dataBlock); pInfo->loadInfo.totalRows = 0; - taosHashCleanup(stableSchema); return NULL; } - schemaRow = &pInfo->pCur->mr.me.stbEntry.schemaRow; + SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&smrSuperTable.me.stbEntry.schemaRow); + taosHashPut(pInfo->pSchema, &suid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES); + schemaRow = schemaWrapper; + metaReaderClear(&smrSuperTable); } } else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) { qDebug("sysTableScanUserCols cursor get normal table"); @@ -562,20 +581,19 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { continue; } - sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName); - - if (numOfRows >= pOperator->resultInfo.capacity) { + if ((numOfRows + schemaRow->nCols) > pOperator->resultInfo.capacity) { relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo); numOfRows = 0; + pInfo->restore = true; if (pInfo->pRes->info.rows > 0) { break; } + } else { + sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName); } } - taosHashCleanup(stableSchema); - if (numOfRows > 0) { relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo); numOfRows = 0; @@ -695,7 +713,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { } if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) { - metaTbCursorPrev(pInfo->pCur); + metaTbCursorPrev(pInfo->pCur, TSDB_TABLE_MAX); blockFull = true; } else { sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows, @@ -1789,6 +1807,11 @@ void destroySysScanOperator(void* param) { pInfo->pIdx = NULL; } + if(pInfo->pSchema) { + taosHashCleanup(pInfo->pSchema); + pInfo->pSchema = NULL; + } + taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFreeClear(pInfo->pUser); diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index 3c2b6499a4b3da2075a725e5dee8a96d3382d7e6..106ee641af9b2c9688654cd1100b80a22c40612c 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -70,7 +70,7 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key}; col_id_t t = lastColIdx + 1; col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema)); - uDebug("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols); + uTrace("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols); if (index < 0 && t > 0) { index = insFindCol(&sToken, 0, t, pSchema); } @@ -345,7 +345,7 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc } if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) { if (errno == E2BIG) { - uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d", (int)kv->length, pColSchema->bytes); + uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d, kv->value:%s", (int)kv->length, pColSchema->bytes, kv->value); buildInvalidOperationMsg(&pBuf, "value too long"); ret = TSDB_CODE_PAR_VALUE_TOO_LONG; goto end; diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 89dd51a8924b3a24e70a665a3ae76e14541f6081..bd9ea058b4f911d5060c85b9303afb3d2cc5dbdb 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -21,7 +21,7 @@ #include "tjson.h" #include "tglobal.h" -#define LOG_MAX_LINE_SIZE (1024) +#define LOG_MAX_LINE_SIZE (10024) #define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3) #define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024) #define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3) diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index 5b73989d6f22b25c056cba802328d8e3d583ef87..6d813a416644661bf83881b753afe0d55d97d5f2 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -739,6 +739,11 @@ class TDCom: else: os.system("unset LD_PRELOAD; pkill %s " % processorName) + def gen_tag_col_str(self, gen_type, data_type, count): + """ + gen multi tags or cols by gen_type + """ + return ','.join(map(lambda i: f'{gen_type}{i} {data_type}', range(count))) def is_json(msg): if isinstance(msg, str): @@ -775,4 +780,5 @@ def dict2toml(in_dict: dict, file:str): with open(file, 'w') as f: toml.dump(in_dict, f) + tdCom = TDCom() diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 720eab74c42eba2dad40500c673d6a7427d3324b..3c4a71c3e4e47baf7c6503f77ea9b588eaebec83 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -93,7 +93,6 @@ class TDTestCase: tdSql.checkEqual(i[2],len(self.perf_list)) tdSql.execute('create table db1.ntb (ts timestamp,c0 int)') tdSql.query(f'select db_name, count(*) from information_schema.ins_tables group by db_name') - print(tdSql.queryResult) for i in tdSql.queryResult: if i[0].lower() == 'information_schema': tdSql.checkEqual(i[1],len(self.ins_list)) @@ -101,9 +100,77 @@ class TDTestCase: tdSql.checkEqual(i[1],len(self.perf_list)) elif i[0].lower() == self.dbname: tdSql.checkEqual(i[1],self.tbnum+1) + + + + def ins_col_check_4096(self): + tdSql.execute('create database db3 vgroups 2 replica 1') + col_str = tdCom.gen_tag_col_str("col", "int",4094) + tdSql.execute(f'create stable if not exists db3.stb (col_ts timestamp, {col_str}) tags (t1 int)') + for i in range(100): + tdLog.info(f"create table db3.ctb{i} using db3.stb tags({i})") + tdSql.execute(f"create table db3.ctb{i} using db3.stb tags({i})") + col_value_str = '1, ' * 4093 + '1' + tdSql.execute(f"insert into db3.ctb{i} values(now,{col_value_str})(now+1s,{col_value_str})(now+2s,{col_value_str})(now+3s,{col_value_str})") + tdSql.query("select * from information_schema.ins_columns") + + tdSql.execute('drop database db3') + def ins_stable_check(self): + tdSql.execute('create database db3 vgroups 2 replica 1') + tbnum = 10 + ctbnum = 10 + for i in range(tbnum): + tdSql.execute(f'create stable db3.stb_{i} (ts timestamp,c0 int) tags(t0 int)') + tdSql.execute(f'create table db3.ntb_{i} (ts timestamp,c0 int)') + for j in range(ctbnum): + tdSql.execute(f"create table db3.ctb_{i}_{j} using db3.stb_{i} tags({j})") + tdSql.query("select stable_name,count(table_name) from information_schema.ins_tables where db_name = 'db3' group by stable_name order by stable_name") + result = tdSql.queryResult + for i in range(len(result)): + if result[i][0] == None: + tdSql.checkEqual(result[0][1],tbnum) + else: + tdSql.checkEqual(result[i][0],f'stb_{i-1}') + tdSql.checkEqual(result[i][1],ctbnum) + + + + def ins_columns_check(self): + tdSql.execute('drop database if exists db2') + tdSql.execute('create database if not exists db2 vgroups 1 replica 1') + for i in range (5): + self.stb4096 = 'create table db2.stb%d (ts timestamp' % (i) + for j in range (4094 - i): + self.stb4096 += ', c%d int' % (j) + self.stb4096 += ') tags (t1 int)' + tdSql.execute(self.stb4096) + for k in range(10): + tdSql.execute("create table db2.ctb_%d_%dc using db2.stb%d tags(%d)" %(i,k,i,k)) + for t in range (2): + tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="SUPER_TABLE"') + tdSql.checkEqual(20465,len(tdSql.queryResult)) + for t in range (2): + tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="CHILD_TABLE"') + tdSql.checkEqual(204650,len(tdSql.queryResult)) + + for i in range (5): + self.ntb4096 = 'create table db2.ntb%d (ts timestamp' % (i) + for j in range (4095 - i): + self.ntb4096 += ', c%d binary(10)' % (j) + self.ntb4096 += ')' + tdSql.execute(self.ntb4096) + for t in range (2): + tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="NORMAL_TABLE"') + tdSql.checkEqual(20470,len(tdSql.queryResult)) + + def run(self): self.prepare_data() self.count_check() + self.ins_columns_check() + # self.ins_col_check_4096() + self.ins_stable_check() + def stop(self): tdSql.close() diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index cb7b5012982eea11c61596f3c60861676f368db8..69debe7ab50c0dff57cbc570cbb7643f3249521b 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -688,16 +688,17 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { } static int32_t g_once_commit_flag = 0; -static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { - taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); - if (0 == g_once_commit_flag) { - g_once_commit_flag = 1; - notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); +static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); + + if (0 == g_once_commit_flag) { + g_once_commit_flag = 1; + notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); } - char tmpString[128]; - taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString)); + char tmpString[128]; + taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString)); } void build_consumer(SThreadInfo* pInfo) {