提交 ba6226c0 编写于 作者: wmmhello's avatar wmmhello

Merge branch 'main' of https://github.com/taosdata/TDengine into fix/TD023101

......@@ -147,9 +147,9 @@ typedef struct {
} SMonStbInfo;
typedef struct {
int32_t expire_time;
int64_t timeseries_used;
int64_t timeseries_total;
uint32_t expire_time;
int64_t timeseries_used;
int64_t timeseries_total;
} SMonGrantInfo;
typedef struct {
......
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>com.tdengine.taoskeeper</string>
<key>ProgramArguments</key>
<array>
<string>/usr/local/bin/taoskeeper</string>
</array>
<key>ProcessType</key>
<string>Interactive</string>
<key>Disabled</key>
<false/>
<key>RunAtLoad</key>
<false/>
<key>LaunchOnlyOnce</key>
<false/>
<key>SessionCreate</key>
<true/>
<key>ExitTimeOut</key>
<integer>600</integer>
<key>KeepAlive</key>
<dict>
<key>SuccessfulExit</key>
<false/>
<key>AfterInitialDemand</key>
<true/>
</dict>
<key>Program</key>
<string>/usr/local/bin/taoskeeper</string>
</dict>
</plist>
\ No newline at end of file
......@@ -53,7 +53,7 @@ if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then
cd ${top_dir}/tools/taos-tools/packaging/deb
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
taostools_ver=$(git tag |grep -v taos | sort | tail -1)
taostools_ver=$(git for-each-ref --sort=taggerdate --format '%(tag)' refs/tags|grep -v taos | tail -1)
taostools_install_dir="${release_dir}/${clientName2}Tools-${taostools_ver}"
cd ${curr_dir}
......
......@@ -582,6 +582,11 @@ function install_service_on_launchctl() {
${csudo}cp ${install_main_dir}/service/com.taosdata.taosadapter.plist /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosadapter.plist || :
fi
if [ -f ${install_main_dir}/service/com.taosdata.taoskeeper.plist ]; then
${csudo}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taoskeeper.plist > /dev/null 2>&1 || :
${csudo}cp ${install_main_dir}/service/com.taosdata.taoskeeper.plist /Library/LaunchDaemons/com.taosdata.taoskeeper.plist || :
${csudo}launchctl load -w /Library/LaunchDaemons/com.taosdata.taoskeeper.plist || :
fi
}
function install_taosadapter_service() {
......
......@@ -70,7 +70,7 @@ extern "C" {
#define VALUE_LEN 6
#define OTD_JSON_FIELDS_NUM 4
#define MAX_RETRY_TIMES 5
#define MAX_RETRY_TIMES 100
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
typedef enum {
......
......@@ -543,7 +543,7 @@ void taos_init_imp(void) {
if (taosCreateLog("taoslog", 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
// ignore create log failed, only print
printf(" WARING: Create taoslog failed. configDir=%s\n", configDir);
printf(" WARING: Create taoslog failed:%s. configDir=%s\n", strerror(errno), configDir);
}
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 1) != 0) {
......
......@@ -750,6 +750,7 @@ end:
}
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas start, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema);
if (info->dataFormat && !info->needModifySchema) {
return TSDB_CODE_SUCCESS;
}
......@@ -779,6 +780,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas create table:%s", info->id, pName.tname);
SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
code = smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
......@@ -818,6 +820,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
goto end;
}
if (action != SCHEMA_ACTION_NULL) {
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table tag, table:%s, action:%d", info->id, pName.tname, action);
SArray *pColumns =
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
SArray *pTags =
......@@ -869,6 +872,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
goto end;
}
if (action != SCHEMA_ACTION_NULL) {
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table col, table:%s, action:%d", info->id, pName.tname, action);
SArray *pColumns =
taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
SArray *pTags =
......@@ -935,15 +939,19 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
}
sTableData->tableMeta = pTableMeta;
uDebug("SML:0x%" PRIx64 "modify schema uid:%" PRIu64 ", sversion:%d, tversion:%d", info->id, pTableMeta->uid, pTableMeta->sversion, pTableMeta->tversion)
tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp);
}
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas end success, format:%d, needModifySchema:%d", info->id, info->dataFormat, info->needModifySchema);
return 0;
end:
taosHashCleanup(hashTmp);
taosMemoryFreeClear(pTableMeta);
// catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
uError("SML:0x%" PRIx64 " smlModifyDBSchemas end failed:%d:%s, format:%d, needModifySchema:%d", info->id, code, tstrerror(code), info->dataFormat, info->needModifySchema);
return code;
}
......@@ -1019,8 +1027,9 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
} else {
size_t tmp = taosArrayGetSize(metaArray);
if (tmp > INT16_MAX) {
smlBuildInvalidDataMsg(msg, "too many cols or tags", kv->key);
uError("too many cols or tags");
return -1;
return TSDB_CODE_SML_INVALID_DATA;
}
int16_t size = tmp;
int ret = taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES);
......@@ -1170,6 +1179,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
}
static int32_t smlParseLineBottom(SSmlHandle *info) {
uDebug("SML:0x%" PRIx64 " smlParseLineBottom start, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
if (info->dataFormat) return TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < info->lineNum; i++) {
......@@ -1212,6 +1222,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
SSmlSTableMeta **tableMeta =
(SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
if (tableMeta) { // update meta
uDebug("SML:0x%" PRIx64 " smlParseLineBottom update meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf);
if (ret == TSDB_CODE_SUCCESS) {
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf);
......@@ -1226,7 +1237,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
// uError("SML:0x%" PRIx64 " smlUpdateMeta failed", info->id);
// return ret;
// }
uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat);
smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags);
if(terrno == TSDB_CODE_DUP_KEY){return terrno;}
......@@ -1234,12 +1245,14 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
}
}
uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
return TSDB_CODE_SUCCESS;
}
static int32_t smlInsertData(SSmlHandle *info) {
int32_t code = TSDB_CODE_SUCCESS;
uDebug("SML:0x%" PRIx64 " smlInsertData start, format:%d", info->id, info->dataFormat);
if(info->pRequest->dbList == NULL){
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
......@@ -1284,6 +1297,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
// use tablemeta of stable to save vgid and uid of child table
(*pMeta)->tableMeta->vgId = vg.vgId;
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
uDebug("SML:0x%" PRIx64 " smlInsertData table:%s, uid:%" PRIu64 ", format:%d", info->id, pName.tname, tableData->uid, info->dataFormat);
code = smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols,
(*pMeta)->tableMeta, tableData->childTableName, tableData->sTableName, tableData->sTableNameLen,
......@@ -1306,16 +1320,18 @@ static int32_t smlInsertData(SSmlHandle *info) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
launchQueryImpl(info->pRequest, info->pQuery, true, NULL);
uDebug("SML:0x%" PRIx64 " smlInsertData end, format:%d, code:%d,%s", info->id, info->dataFormat, info->pRequest->code, tstrerror(info->pRequest->code));
return info->pRequest->code;
}
static void smlPrintStatisticInfo(SSmlHandle *info) {
uDebug(
"SML:0x%" PRIx64
" smlInsertLines result, code:%d,lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
" smlInsertLines result, code:%d, msg:%s, lineNum:%d,stable num:%d,ctable num:%d,create stable num:%d,alter stable tag num:%d,alter stable col num:%d \
parse cost:%" PRId64 ",schema cost:%" PRId64 ",bind cost:%" PRId64 ",rpc cost:%" PRId64 ",total cost:%" PRId64
"",
info->id, info->cost.code, info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
info->id, info->cost.code, tstrerror(info->cost.code), info->cost.lineNum, info->cost.numOfSTables, info->cost.numOfCTables,
info->cost.numOfCreateSTables, info->cost.numOfAlterTagSTables, info->cost.numOfAlterColSTables,
info->cost.schemaTime - info->cost.parseTime, info->cost.insertBindTime - info->cost.schemaTime,
info->cost.insertRpcTime - info->cost.insertBindTime, info->cost.endTime - info->cost.insertRpcTime,
......@@ -1360,6 +1376,7 @@ int32_t smlClearForRerun(SSmlHandle *info) {
}
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
uDebug("SML:0x%" PRIx64 " smlParseLine start", info->id);
int32_t code = TSDB_CODE_SUCCESS;
if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
if (lines) {
......@@ -1395,8 +1412,16 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
}
}
uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, len:%d, sql:%s", info->id, info->isRawLine, len,
(info->isRawLine ? "rawdata" : tmp));
char cTmp = 0; // for print tmp if is raw
if(info->isRawLine){
cTmp = tmp[len - 1];
tmp[len - 1] = '\0';
}
uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, numLines:%d, protocol:%d, len:%d, sql:%s", info->id, info->isRawLine, numLines, info->protocol, len, tmp);
if(info->isRawLine){
tmp[len - 1] = cTmp;
}
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
if (info->dataFormat) {
......@@ -1421,6 +1446,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
return code;
}
if (info->reRun) {
uDebug("SML:0x%" PRIx64 " smlParseLine re run", info->id);
i = 0;
rawLine = oldRaw;
code = smlClearForRerun(info);
......@@ -1431,6 +1457,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
}
i++;
}
uDebug("SML:0x%" PRIx64 " smlParseLine end", info->id);
return code;
}
......@@ -1461,7 +1488,8 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
do {
code = smlModifyDBSchemas(info);
if (code == 0) break;
taosMsleep(200);
taosMsleep(500);
uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
if (code != 0) {
......@@ -1488,6 +1516,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
}
SRequestObj *request = NULL;
SSmlHandle *info = NULL;
int cnt = 0;
while(1){
request = (SRequestObj *)createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid);
if (request == NULL) {
......@@ -1542,16 +1571,22 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
request->code = code;
info->cost.endTime = taosGetTimestampUs();
info->cost.code = code;
smlPrintStatisticInfo(info);
if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING){
if(code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING
|| code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT){
if(cnt++ >= 10){
uInfo("SML:%"PRIx64" retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code));
break;
}
taosMsleep(100);
refreshMeta(request->pTscObj, request);
uInfo("SML:%"PRIx64" ver is old retry or object is creating code:%d", info->id, code);
uInfo("SML:%"PRIx64" retry:%d/10,ver is old retry or object is creating code:%d, msg:%s", info->id, cnt, code, tstrerror(code));
smlDestroyInfo(info);
info = NULL;
taos_free_result(request);
request = NULL;
continue;
}
smlPrintStatisticInfo(info);
break;
}
......
......@@ -583,12 +583,14 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
.i = ts,
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
if (info->dataFormat) {
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format true, ts:%" PRId64, info->id, ts);
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
if(ret != TSDB_CODE_SUCCESS){return ret;}
ret = smlBuildRow(info->currTableDataCtx);
if(ret != TSDB_CODE_SUCCESS){return ret;}
clearColValArray(info->currTableDataCtx->pValues);
} else {
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format false, ts:%" PRId64, info->id, ts);
taosArraySet(elements->colArray, 0, &kv);
}
info->preLine = *elements;
......
......@@ -24,7 +24,7 @@
#include "tref.h"
#include "ttimer.h"
#define EMPTY_BLOCK_POLL_IDLE_DURATION 100
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
struct SMqMgmt {
......@@ -148,7 +148,8 @@ typedef struct {
typedef struct {
int8_t tmqRspType;
int32_t epoch;
int32_t epoch; // epoch can be used to guard the vgHandle
int32_t vgId;
SMqClientVg* vgHandle;
SMqClientTopic* topicHandle;
uint64_t reqId;
......@@ -1329,6 +1330,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
pRspWrapper->topicHandle = pTopic;
pRspWrapper->reqId = requestId;
pRspWrapper->pEpset = pMsg->pEpSet;
pRspWrapper->vgId = pVg->vgId;
pMsg->pEpSet = NULL;
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
......@@ -1747,7 +1749,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for (int j = 0; j < numOfVg; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 100ms before start next poll", tmq->consumerId, tmq->epoch,
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch,
pVg->vgId);
continue;
}
......@@ -1864,9 +1866,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return pRsp;
}
} else {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pVg->vgId, pDataRsp->head.epoch, consumerEpoch);
tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
}
......@@ -1886,7 +1887,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return pRsp;
} else {
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
}
......@@ -1933,7 +1934,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else {
tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
}
......@@ -1955,7 +1956,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
void* rspObj;
int64_t startTime = taosGetTimestampMs();
tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime);
tscDebug("consumer:0x%" PRIx64 " start to poll at %"PRId64", timeout:%" PRId64, tmq->consumerId, startTime, timeout);
#if 0
tmqHandleAllDelayedTask(tmq);
......@@ -2017,37 +2018,43 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
}
}
int32_t tmq_consumer_close(tmq_t* tmq) {
tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status);
static void displayConsumeStatistics(const tmq_t* pTmq) {
int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch);
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
int32_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp != 0) {
return rsp;
tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
for (int32_t i = 0; i < numOfTopics; ++i) {
SMqClientTopic* pTopics = taosArrayGet(pTmq->clientTopics, i);
tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i);
int32_t numOfVgs = taosArrayGetSize(pTopics->vgs);
for (int32_t j = 0; j < numOfVgs; ++j) {
SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j);
tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows);
}
}
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
tmq->consumerId, tmq->pollCnt, tmq->totalRows, numOfTopics, tmq->epoch);
tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", tmq->consumerId);
for (int32_t i = 0; i < numOfTopics; ++i) {
SMqClientTopic* pTopics = taosArrayGet(tmq->clientTopics, i);
int32_t tmq_consumer_close(tmq_t* tmq) {
tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status);
displayConsumeStatistics(tmq);
tscDebug("consumer:0x%" PRIx64 " topic:%d", tmq->consumerId, i);
int32_t numOfVgs = taosArrayGetSize(pTopics->vgs);
for (int32_t j = 0; j < numOfVgs; ++j) {
SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j);
tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows);
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
// if auto commit is set, commit before close consumer. Otherwise, do nothing.
if (tmq->autoCommit) {
int32_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp != 0) {
return rsp;
}
}
tscDebug("consumer:0x%" PRIx64 " rows dist end", tmq->consumerId);
int32_t retryCnt = 0;
tmq_list_t* lst = tmq_list_new();
while (1) {
rsp = tmq_subscribe(tmq, lst);
int32_t rsp = tmq_subscribe(tmq, lst);
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
break;
} else {
......@@ -2057,6 +2064,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
}
tmq_list_destroy(lst);
} else {
tscWarn("consumer:0x%" PRIx64" not in ready state, close it directly", tmq->consumerId);
}
taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
......
......@@ -1238,13 +1238,13 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
}
if (taosLoadCfg(pCfg, envCmd, cfgDir, envFile, apolloUrl) != 0) {
uError("failed to load cfg since %s", terrstr());
printf("failed to load cfg since %s", terrstr());
cfgCleanup(pCfg);
return -1;
}
if (cfgLoadFromArray(pCfg, pArgs) != 0) {
uError("failed to load cfg from array since %s", terrstr());
printf("failed to load cfg from array since %s", terrstr());
cfgCleanup(pCfg);
return -1;
}
......@@ -1260,13 +1260,13 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi
if (taosMulModeMkDir(tsLogDir, 0777) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create dir:%s since %s", tsLogDir, terrstr());
printf("failed to create dir:%s since %s", tsLogDir, terrstr());
cfgCleanup(pCfg);
return -1;
}
if (taosInitLog(logname, logFileNum) != 0) {
uError("failed to init log file since %s", terrstr());
printf("failed to init log file since %s", terrstr());
cfgCleanup(pCfg);
return -1;
}
......
......@@ -457,6 +457,7 @@ typedef struct {
void* pIter;
SMnode* pMnode;
STableMetaRsp* pMeta;
bool restore;
bool sysDbRsp;
char db[TSDB_DB_FNAME_LEN];
char filterTb[TSDB_TABLE_NAME_LEN];
......
......@@ -864,7 +864,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
}
// grant info
pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 1000;
pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
if (pMnode->grant.expireTimeMS == 0) {
pGrantInfo->expire_time = INT32_MAX;
......
......@@ -324,7 +324,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
pReq->info.rsp = pRsp;
pReq->info.rspLen = size;
if (rowsRead == 0 || rowsRead < rowsToRead) {
if (rowsRead == 0 || ((rowsRead < rowsToRead) && !pShow->restore)) {
pRsp->completed = 1;
mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
mndReleaseShowObj(pShow, true);
......
......@@ -3113,9 +3113,18 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(typeName, "SUPER_TABLE");
bool fetch = pShow->restore ? false : true;
pShow->restore = false;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break;
if (fetch) {
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break;
} else {
fetch = true;
void *pKey = taosHashGetKey(pShow->pIter, NULL);
pStb = sdbAcquire(pSdb, SDB_STB, pKey);
if (!pStb) continue;
}
if (pDb != NULL && pStb->dbUid != pDb->uid) {
sdbRelease(pSdb, pStb);
......@@ -3129,6 +3138,17 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
sdbRelease(pSdb, pStb);
continue;
}
if ((numOfRows + pStb->numOfColumns) > rows) {
pShow->restore = true;
if (numOfRows == 0) {
mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
rows, pStb->numOfColumns, pStb->name, pStb->db);
}
sdbRelease(pSdb, pStb);
break;
}
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
......
......@@ -157,7 +157,7 @@ typedef struct SMTbCursor SMTbCursor;
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType);
int32_t metaTbCursorPrev(SMTbCursor *pTbCur);
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
#endif
......
......@@ -141,7 +141,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
// tqRead
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId);
// tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp);
......
......@@ -336,7 +336,7 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) {
return 0;
}
int32_t metaTbCursorPrev(SMTbCursor *pTbCur) {
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) {
int ret;
void *pBuf;
STbCfg tbCfg;
......@@ -350,7 +350,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur) {
tDecoderClear(&pTbCur->mr.coder);
metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
if (pTbCur->mr.me.type == jumpTableType) {
continue;
}
......
......@@ -309,70 +309,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
return 0;
}
//int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
//#if 0
// A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
// A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
//
// if (pRsp->withSchema) {
// A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
// } else {
// A(taosArrayGetSize(pRsp->blockSchema) == 0);
// }
//
// if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
// if (pRsp->blockNum > 0) {
// A(pRsp->rspOffset.version > pRsp->reqOffset.version);
// } else {
// A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
// }
// }
//#endif
//
// int32_t len = 0;
// int32_t code = 0;
// tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
// if (code < 0) {
// return -1;
// }
//
// int32_t tlen = sizeof(SMqRspHead) + len;
// void* buf = rpcMallocCont(tlen);
// if (buf == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// return -1;
// }
//
// ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP;
// ((SMqRspHead*)buf)->epoch = pReq->epoch;
// ((SMqRspHead*)buf)->consumerId = pReq->consumerId;
//
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
//
// SEncoder encoder = {0};
// tEncoderInit(&encoder, abuf, len);
// tEncodeSTaosxRsp(&encoder, pRsp);
// tEncoderClear(&encoder);
//
// SRpcMsg rsp = {
// .info = pMsg->info,
// .pCont = buf,
// .contLen = tlen,
// .code = 0,
// };
//
// tmsgSendRsp(&rsp);
//
// char buf1[80] = {0};
// char buf2[80] = {0};
// tFormatOffset(buf1, 80, &pRsp->reqOffset);
// tFormatOffset(buf2, 80, &pRsp->rspOffset);
//
// tqDebug("taosx rsp, vgId:%d, consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
// TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
// return 0;
//}
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
pLeft->val.version <= pRight->val.version;
......@@ -615,9 +551,9 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
// NOTE: this pHandle->consumerId may have been changed already.
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
", ts:%" PRId64,
", ts:%" PRId64", reqId:0x%"PRIx64,
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
dataRsp.rspOffset.ts);
dataRsp.rspOffset.ts, pRequest->reqId);
tDeleteSMqDataRsp(&dataRsp);
return code;
......@@ -637,9 +573,9 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
if (metaRsp.metaRspLen > 0) {
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
",version:%" PRId64,
",ts:%" PRId64,
consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
metaRsp.rspOffset.version);
metaRsp.rspOffset.ts);
taosMemoryFree(metaRsp.metaRsp);
tDeleteSTaosxRsp(&taosxRsp);
return code;
......@@ -680,17 +616,12 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
break;
}
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
// if (terrno == 0) { // failed to seek to given ver, but no errors happen.
// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, (SMqDataRsp*) &taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
// return code;
// } else { // error happens, return to consumers
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
// }
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
}
SWalCont* pHead = &pCkHead->head;
......
......@@ -183,23 +183,24 @@ end:
return tbSuid == realTbSuid;
}
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead, uint64_t reqId) {
int32_t code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
taosThreadMutexLock(&pHandle->pWalReader->mutex);
int64_t offset = *fetchOffset;
while (1) {
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return, reqId:0x%"PRIx64,
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId);
*fetchOffset = offset - 1;
code = -1;
goto END;
}
tqDebug("vgId:%d, taosx get msg ver %" PRId64 ", type: %s", pTq->pVnode->config.vgId, offset,
TMSG_INFO((*ppCkHead)->head.msgType));
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, reqId:0x%" PRIx64, vgId,
pHandle->consumerId, offset, TMSG_INFO((*ppCkHead)->head.msgType), reqId);
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader, ppCkHead);
......
......@@ -134,7 +134,6 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id
// all queried tables have been dropped already, return immediately.
if (p->pSchema == NULL) {
taosMemoryFree(p);
tsdbWarn("all queried tables has been dropped, try next group, %s", idstr);
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
}
......
......@@ -922,7 +922,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
pBlockNum->numOfBlocks += 1;
}
if ((pScanInfo->pBlockList != NULL) && (taosArrayGetSize(pScanInfo->pBlockList) > 0)) {
if (taosArrayGetSize(pScanInfo->pBlockList) > 0) {
numOfQTable += 1;
}
}
......@@ -4220,12 +4220,8 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
if (pStatus->loadFromFile) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
if (pBlockInfo != NULL) {
pBlockScanInfo =
*(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
if (pBlockScanInfo == NULL) {
code = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
goto _err;
}
} else {
......
......@@ -1194,6 +1194,8 @@ static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
if (code != TSDB_CODE_SUCCESS) {
SET_SESSION_WIN_KEY_INVALID(pKey);
}
taosMemoryFree(pCur);
return code;
}
......
......@@ -57,9 +57,11 @@ typedef struct SSysTableScanInfo {
const char* pUser;
bool sysInfo;
bool showRewrite;
bool restore;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
SMTbCursor* pCur; // cursor for iterate the local table meta store.
SSysTableIndex* pIdx; // idx for local table meta
SHashObj* pSchema;
SColMatchInfo matchInfo;
SName name;
SSDataBlock* pRes;
......@@ -514,9 +516,23 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle.meta);
}
SHashObj* stableSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
taosHashSetFreeFp(stableSchema, tDeleteSSchemaWrapperForHash);
while ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0) {
if (pInfo->pSchema == NULL) {
pInfo->pSchema = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
taosHashSetFreeFp(pInfo->pSchema, tDeleteSSchemaWrapperForHash);
}
if (!pInfo->pCur || !pInfo->pSchema) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qError("sysTableScanUserCols failed since %s", terrstr(terrno));
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0;
return NULL;
}
int32_t restore = pInfo->restore;
pInfo->restore = false;
while (restore || ((ret = metaTbCursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0)) {
if (restore) restore = false;
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
......@@ -524,33 +540,36 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
if (pInfo->pCur->mr.me.type == TSDB_SUPER_TABLE) {
qDebug("sysTableScanUserCols cursor get super table");
void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t));
if (schema == NULL) {
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&pInfo->pCur->mr.me.stbEntry.schemaRow);
taosHashPut(stableSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
taosHashPut(pInfo->pSchema, &pInfo->pCur->mr.me.uid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
}
continue;
} else if (pInfo->pCur->mr.me.type == TSDB_CHILD_TABLE) {
qDebug("sysTableScanUserCols cursor get child table");
STR_TO_VARSTR(typeName, "CHILD_TABLE");
STR_TO_VARSTR(tableName, pInfo->pCur->mr.me.name);
int64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
void* schema = taosHashGet(stableSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t));
void* schema = taosHashGet(pInfo->pSchema, &pInfo->pCur->mr.me.ctbEntry.suid, sizeof(int64_t));
if (schema != NULL) {
schemaRow = *(SSchemaWrapper**)schema;
} else {
tDecoderClear(&pInfo->pCur->mr.coder);
int code = metaGetTableEntryByUid(&pInfo->pCur->mr, suid);
SMetaReader smrSuperTable = {0};
metaReaderInit(&smrSuperTable, pInfo->readHandle.meta, 0);
int code = metaGetTableEntryByUid(&smrSuperTable, suid);
if (code != TSDB_CODE_SUCCESS) {
// terrno has been set by metaGetTableEntryByName, therefore, return directly
qError("sysTableScanUserCols get meta by suid:%" PRId64 " error, code:%d", suid, code);
metaReaderClear(&smrSuperTable);
blockDataDestroy(dataBlock);
pInfo->loadInfo.totalRows = 0;
taosHashCleanup(stableSchema);
return NULL;
}
schemaRow = &pInfo->pCur->mr.me.stbEntry.schemaRow;
SSchemaWrapper* schemaWrapper = tCloneSSchemaWrapper(&smrSuperTable.me.stbEntry.schemaRow);
taosHashPut(pInfo->pSchema, &suid, sizeof(int64_t), &schemaWrapper, POINTER_BYTES);
schemaRow = schemaWrapper;
metaReaderClear(&smrSuperTable);
}
} else if (pInfo->pCur->mr.me.type == TSDB_NORMAL_TABLE) {
qDebug("sysTableScanUserCols cursor get normal table");
......@@ -562,20 +581,19 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
continue;
}
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
if (numOfRows >= pOperator->resultInfo.capacity) {
if ((numOfRows + schemaRow->nCols) > pOperator->resultInfo.capacity) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
pInfo->restore = true;
if (pInfo->pRes->info.rows > 0) {
break;
}
} else {
sysTableUserColsFillOneTableCols(pInfo, dbname, &numOfRows, dataBlock, tableName, schemaRow, typeName);
}
}
taosHashCleanup(stableSchema);
if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
......@@ -695,7 +713,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
}
if ((smrSuperTable.me.stbEntry.schemaTag.nCols + numOfRows) > pOperator->resultInfo.capacity) {
metaTbCursorPrev(pInfo->pCur);
metaTbCursorPrev(pInfo->pCur, TSDB_TABLE_MAX);
blockFull = true;
} else {
sysTableUserTagsFillOneTableTags(pInfo, &smrSuperTable, &pInfo->pCur->mr, dbname, tableName, &numOfRows,
......@@ -1789,6 +1807,11 @@ void destroySysScanOperator(void* param) {
pInfo->pIdx = NULL;
}
if(pInfo->pSchema) {
taosHashCleanup(pInfo->pSchema);
pInfo->pSchema = NULL;
}
taosArrayDestroy(pInfo->matchInfo.pList);
taosMemoryFreeClear(pInfo->pUser);
......
......@@ -70,7 +70,7 @@ static int32_t smlBoundColumnData(SArray* cols, SBoundColInfo* pBoundInfo, SSche
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
col_id_t t = lastColIdx + 1;
col_id_t index = ((t == 0 && !isTag) ? 0 : insFindCol(&sToken, t, pBoundInfo->numOfCols, pSchema));
uDebug("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols);
uTrace("SML, index:%d, t:%d, ncols:%d", index, t, pBoundInfo->numOfCols);
if (index < 0 && t > 0) {
index = insFindCol(&sToken, 0, t, pSchema);
}
......@@ -345,7 +345,7 @@ int32_t smlBindData(SQuery* query, bool dataFormat, SArray* tags, SArray* colsSc
}
if (!taosMbsToUcs4(kv->value, kv->length, (TdUcs4*)pUcs4, pColSchema->bytes - VARSTR_HEADER_SIZE, &len)) {
if (errno == E2BIG) {
uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d", (int)kv->length, pColSchema->bytes);
uError("sml bind taosMbsToUcs4 error, kv length:%d, bytes:%d, kv->value:%s", (int)kv->length, pColSchema->bytes, kv->value);
buildInvalidOperationMsg(&pBuf, "value too long");
ret = TSDB_CODE_PAR_VALUE_TOO_LONG;
goto end;
......
......@@ -21,7 +21,7 @@
#include "tjson.h"
#include "tglobal.h"
#define LOG_MAX_LINE_SIZE (1024)
#define LOG_MAX_LINE_SIZE (10024)
#define LOG_MAX_LINE_BUFFER_SIZE (LOG_MAX_LINE_SIZE + 3)
#define LOG_MAX_LINE_DUMP_SIZE (1024 * 1024)
#define LOG_MAX_LINE_DUMP_BUFFER_SIZE (LOG_MAX_LINE_DUMP_SIZE + 3)
......
......@@ -739,6 +739,11 @@ class TDCom:
else:
os.system("unset LD_PRELOAD; pkill %s " % processorName)
def gen_tag_col_str(self, gen_type, data_type, count):
"""
gen multi tags or cols by gen_type
"""
return ','.join(map(lambda i: f'{gen_type}{i} {data_type}', range(count)))
def is_json(msg):
if isinstance(msg, str):
......@@ -775,4 +780,5 @@ def dict2toml(in_dict: dict, file:str):
with open(file, 'w') as f:
toml.dump(in_dict, f)
tdCom = TDCom()
......@@ -93,7 +93,6 @@ class TDTestCase:
tdSql.checkEqual(i[2],len(self.perf_list))
tdSql.execute('create table db1.ntb (ts timestamp,c0 int)')
tdSql.query(f'select db_name, count(*) from information_schema.ins_tables group by db_name')
print(tdSql.queryResult)
for i in tdSql.queryResult:
if i[0].lower() == 'information_schema':
tdSql.checkEqual(i[1],len(self.ins_list))
......@@ -101,9 +100,77 @@ class TDTestCase:
tdSql.checkEqual(i[1],len(self.perf_list))
elif i[0].lower() == self.dbname:
tdSql.checkEqual(i[1],self.tbnum+1)
def ins_col_check_4096(self):
tdSql.execute('create database db3 vgroups 2 replica 1')
col_str = tdCom.gen_tag_col_str("col", "int",4094)
tdSql.execute(f'create stable if not exists db3.stb (col_ts timestamp, {col_str}) tags (t1 int)')
for i in range(100):
tdLog.info(f"create table db3.ctb{i} using db3.stb tags({i})")
tdSql.execute(f"create table db3.ctb{i} using db3.stb tags({i})")
col_value_str = '1, ' * 4093 + '1'
tdSql.execute(f"insert into db3.ctb{i} values(now,{col_value_str})(now+1s,{col_value_str})(now+2s,{col_value_str})(now+3s,{col_value_str})")
tdSql.query("select * from information_schema.ins_columns")
tdSql.execute('drop database db3')
def ins_stable_check(self):
tdSql.execute('create database db3 vgroups 2 replica 1')
tbnum = 10
ctbnum = 10
for i in range(tbnum):
tdSql.execute(f'create stable db3.stb_{i} (ts timestamp,c0 int) tags(t0 int)')
tdSql.execute(f'create table db3.ntb_{i} (ts timestamp,c0 int)')
for j in range(ctbnum):
tdSql.execute(f"create table db3.ctb_{i}_{j} using db3.stb_{i} tags({j})")
tdSql.query("select stable_name,count(table_name) from information_schema.ins_tables where db_name = 'db3' group by stable_name order by stable_name")
result = tdSql.queryResult
for i in range(len(result)):
if result[i][0] == None:
tdSql.checkEqual(result[0][1],tbnum)
else:
tdSql.checkEqual(result[i][0],f'stb_{i-1}')
tdSql.checkEqual(result[i][1],ctbnum)
def ins_columns_check(self):
tdSql.execute('drop database if exists db2')
tdSql.execute('create database if not exists db2 vgroups 1 replica 1')
for i in range (5):
self.stb4096 = 'create table db2.stb%d (ts timestamp' % (i)
for j in range (4094 - i):
self.stb4096 += ', c%d int' % (j)
self.stb4096 += ') tags (t1 int)'
tdSql.execute(self.stb4096)
for k in range(10):
tdSql.execute("create table db2.ctb_%d_%dc using db2.stb%d tags(%d)" %(i,k,i,k))
for t in range (2):
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="SUPER_TABLE"')
tdSql.checkEqual(20465,len(tdSql.queryResult))
for t in range (2):
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="CHILD_TABLE"')
tdSql.checkEqual(204650,len(tdSql.queryResult))
for i in range (5):
self.ntb4096 = 'create table db2.ntb%d (ts timestamp' % (i)
for j in range (4095 - i):
self.ntb4096 += ', c%d binary(10)' % (j)
self.ntb4096 += ')'
tdSql.execute(self.ntb4096)
for t in range (2):
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="NORMAL_TABLE"')
tdSql.checkEqual(20470,len(tdSql.queryResult))
def run(self):
self.prepare_data()
self.count_check()
self.ins_columns_check()
# self.ins_col_check_4096()
self.ins_stable_check()
def stop(self):
tdSql.close()
......
......@@ -688,16 +688,17 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
}
static int32_t g_once_commit_flag = 0;
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
if (0 == g_once_commit_flag) {
g_once_commit_flag = 1;
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
if (0 == g_once_commit_flag) {
g_once_commit_flag = 1;
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
}
char tmpString[128];
taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
char tmpString[128];
taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
}
void build_consumer(SThreadInfo* pInfo) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册