提交 706cba79 编写于 作者: X Xiaoyu Wang

Merge remote-tracking branch 'origin/main' into fix/main_bugfix_wxy

......@@ -15,6 +15,7 @@ target_sources(
"src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c"
"src/vnd/vnodeCompact.c"
"src/vnd/vnodeRetention.c"
# meta
"src/meta/metaOpen.c"
......
......@@ -180,7 +180,6 @@ int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
int32_t tsdbFinishCommit(STsdb* pTsdb);
int32_t tsdbRollbackCommit(STsdb* pTsdb);
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);
......
......@@ -595,6 +595,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) {
return 0;
}
#if 0
/**
* @brief retention of rsma1/rsma2
*
......@@ -618,6 +619,7 @@ int32_t smaDoRetention(SSma *pSma, int64_t now) {
_end:
return code;
}
#endif
static void tdBlockDataDestroy(SArray *pBlockArr) {
for (int32_t i = 0; i < taosArrayGetSize(pBlockArr); ++i) {
......
......@@ -57,32 +57,6 @@ typedef struct {
SBlockData sData;
} STsdbCompactor;
static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = pCompactor->pTsdb;
code = tsdbFSPrepareCommit(pTsdb, &pCompactor->fs);
TSDB_CHECK_CODE(code, lino, _exit);
taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSCommit(pTsdb);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosThreadRwlockUnlock(&pTsdb->rwLock);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
......@@ -660,8 +634,31 @@ _exit:
if (code) {
tsdbAbortCompact(pCompactor);
} else {
tsdbCommitCompact(pCompactor);
tsdbFSPrepareCommit(pTsdb, &pCompactor->fs);
}
tsdbEndCompact(pCompactor);
return code;
}
int32_t tsdbCommitCompact(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSCommit(pTsdb);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosThreadRwlockUnlock(&pTsdb->rwLock);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
return code;
}
......@@ -15,7 +15,7 @@
#include "tsdb.h"
static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
static bool tsdbShouldDoRetentionImpl(STsdb *pTsdb, int64_t now) {
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
int32_t expLevel = tsdbFidLevel(pSet->fid, &pTsdb->keepCfg, now);
......@@ -38,19 +38,21 @@ static bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
return false;
}
bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now) {
bool should;
taosThreadRwlockRdlock(&pTsdb->rwLock);
should = tsdbShouldDoRetentionImpl(pTsdb, now);
taosThreadRwlockUnlock(&pTsdb->rwLock);
return should;
}
int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
int32_t code = 0;
if (!tsdbShouldDoRetention(pTsdb, now)) {
return code;
}
// do retention
int32_t lino = 0;
STsdbFS fs = {0};
code = tsdbFSCopy(pTsdb, &fs);
if (code) goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t iSet = 0; iSet < taosArrayGetSize(fs.aDFileSet); iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(fs.aDFileSet, iSet);
......@@ -60,8 +62,10 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
if (expLevel < 0) {
taosMemoryFree(pSet->pHeadF);
taosMemoryFree(pSet->pDataF);
taosMemoryFree(pSet->aSttF[0]);
taosMemoryFree(pSet->pSmaF);
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
taosMemoryFree(pSet->aSttF[iStt]);
}
taosArrayRemove(fs.aDFileSet, iSet);
iSet--;
} else {
......@@ -78,35 +82,33 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
fSet.diskId = did;
code = tsdbDFileSetCopy(pTsdb, pSet, &fSet);
if (code) goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbFSUpsertFSet(&fs, &fSet);
if (code) goto _err;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// do change fs
code = tsdbFSPrepareCommit(pTsdb, &fs);
if (code) goto _err;
taosThreadRwlockWrlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbFSCommit(pTsdb);
_exit:
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _err;
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbFSDestroy(&fs);
_exit:
return code;
}
_err:
tsdbError("vgId:%d, tsdb do retention failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
ASSERT(0);
// tsdbFSRollback(pTsdb->pFS);
return code;
static int32_t tsdbCommitRetentionImpl(STsdb *pTsdb) { return tsdbFSCommit(pTsdb); }
int32_t tsdbCommitRetention(STsdb *pTsdb) {
taosThreadRwlockWrlock(&pTsdb->rwLock);
tsdbCommitRetentionImpl(pTsdb);
taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
return 0;
}
\ No newline at end of file
......@@ -35,9 +35,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq2 *pMsg, SSubmitRsp2
return -1;
}
if (pMsg) {
arrSize = taosArrayGetSize(pMsg->aSubmitTbData);
}
arrSize = taosArrayGetSize(pMsg->aSubmitTbData);
// scan and convert
if (tsdbScanAndConvertSubmitMsg(pTsdb, pMsg) < 0) {
......
......@@ -15,6 +15,8 @@
#include "vnd.h"
extern int32_t tsdbCommitCompact(STsdb *pTsdb);
static int32_t vnodeCompactTask(void *param) {
int32_t code = 0;
int32_t lino = 0;
......@@ -33,8 +35,11 @@ static int32_t vnodeCompactTask(void *param) {
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
vnodeCommitInfo(dir);
tsdbCommitCompact(pVnode->pTsdb);
_exit:
tsem_post(&pInfo->pVnode->canCommit);
taosMemoryFree(pInfo);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnd.h"
typedef struct {
SVnode *pVnode;
int64_t now;
int64_t commitID;
SVnodeInfo info;
} SRetentionInfo;
extern bool tsdbShouldDoRetention(STsdb *pTsdb, int64_t now);
extern int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now);
extern int32_t tsdbCommitRetention(STsdb *pTsdb);
static int32_t vnodePrepareRentention(SVnode *pVnode, SRetentionInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
tsem_wait(&pVnode->canCommit);
pInfo->commitID = ++pVnode->state.commitID;
char dir[TSDB_FILENAME_LEN] = {0};
if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
if (vnodeLoadInfo(dir, &pInfo->info) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
tsem_post(&pVnode->canCommit);
} else {
vInfo("vgId:%d %s done", TD_VID(pVnode), __func__);
}
return code;
}
static int32_t vnodeRetentionTask(void *param) {
int32_t code = 0;
int32_t lino = 0;
SRetentionInfo *pInfo = (SRetentionInfo *)param;
SVnode *pVnode = pInfo->pVnode;
char dir[TSDB_FILENAME_LEN] = {0};
if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
// save info
pInfo->info.state.commitID = pInfo->commitID;
if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
// do job
code = tsdbDoRetention(pInfo->pVnode->pTsdb, pInfo->now);
TSDB_CHECK_CODE(code, lino, _exit);
// commit info
vnodeCommitInfo(dir);
// commit sub-job
tsdbCommitRetention(pVnode->pTsdb);
_exit:
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pInfo->pVnode), __func__, lino, tstrerror(code));
} else {
vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__);
}
tsem_post(&pInfo->pVnode->canCommit);
taosMemoryFree(pInfo);
return code;
}
int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now) {
int32_t code = 0;
int32_t lino = 0;
if (!tsdbShouldDoRetention(pVnode->pTsdb, now)) return code;
SRetentionInfo *pInfo = (SRetentionInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
if (pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pInfo->pVnode = pVnode;
pInfo->now = now;
code = vnodePrepareRentention(pVnode, pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
vnodeScheduleTask(vnodeRetentionTask, pInfo);
_exit:
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pInfo->pVnode), __func__, lino, tstrerror(code));
if (pInfo) taosMemoryFree(pInfo);
} else {
vInfo("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__);
}
return 0;
}
\ No newline at end of file
......@@ -586,6 +586,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
}
extern int32_t vnodeAsyncRentention(SVnode *pVnode, int64_t now);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
int32_t code = 0;
SVTrimDbReq trimReq = {0};
......@@ -598,12 +599,16 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq,
vInfo("vgId:%d, trim vnode request will be processed, time:%d", pVnode->config.vgId, trimReq.timestamp);
// process
// process
#if 0
code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp);
if (code) goto _exit;
code = smaDoRetention(pVnode->pSma, trimReq.timestamp);
if (code) goto _exit;
#else
vnodeAsyncRentention(pVnode, trimReq.timestamp);
#endif
_exit:
return code;
......@@ -635,6 +640,10 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
ret = smaDoRetention(pVnode->pSma, ttlReq.timestamp);
if (ret) goto end;
#else
vnodeAsyncRentention(pVnode, ttlReq.timestamp);
tsem_wait(&pVnode->canCommit);
tsem_post(&pVnode->canCommit);
#endif
end:
......
......@@ -5438,8 +5438,6 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
if (pDistInfo->maxRows < p1.maxRows) {
pDistInfo->maxRows = p1.maxRows;
}
pDistInfo->numOfVgroups += (p1.numOfTables != 0 ? 1 : 0);
pDistInfo->numOfVgroups += (p1.numOfTables != 0 ? 1 : 0);
for (int32_t i = 0; i < tListLen(pDistInfo->blockRowsHisto); ++i) {
pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i];
......
......@@ -343,7 +343,7 @@ if __name__ == "__main__":
for i in range(cursor.rowcount):
if res[i][0] == "queryPolicy":
if int(res[i][1]) == int(queryPolicy):
tdLog.success(
tdLog.info(
f"alter queryPolicy to {queryPolicy} successfully"
)
else:
......@@ -402,7 +402,7 @@ if __name__ == "__main__":
for i in range(cursor.rowcount):
if res[i][0] == "queryPolicy":
if int(res[i][1]) == int(queryPolicy):
tdLog.success(
tdLog.info(
f"alter queryPolicy to {queryPolicy} successfully"
)
else:
......@@ -471,7 +471,7 @@ if __name__ == "__main__":
# for i in range(tdSql.queryRows):
# if tdSql.queryResult[i][0] == "queryPolicy" :
# if int(tdSql.queryResult[i][1]) == int(queryPolicy):
# tdLog.success('alter queryPolicy to %d successfully'%queryPolicy)
# tdLog.info('alter queryPolicy to %d successfully'%queryPolicy)
# else :
# tdLog.debug(tdSql.queryResult)
# tdLog.exit("alter queryPolicy to %d failed"%queryPolicy)
......@@ -484,7 +484,7 @@ if __name__ == "__main__":
for i in range(cursor.rowcount):
if res[i][0] == "queryPolicy":
if int(res[i][1]) == int(queryPolicy):
tdLog.success(
tdLog.info(
f"alter queryPolicy to {queryPolicy} successfully"
)
else:
......@@ -545,7 +545,7 @@ if __name__ == "__main__":
for i in range(cursor.rowcount):
if res[i][0] == "queryPolicy":
if int(res[i][1]) == int(queryPolicy):
tdLog.success(
tdLog.info(
f"alter queryPolicy to {queryPolicy} successfully"
)
else:
......
......@@ -680,8 +680,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 6 -M 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
......@@ -913,13 +913,13 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/arccos.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/arctan.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQueryInterval.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py -Q 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py -Q 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/avg.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/elapsed.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/csum.py -Q 3
......@@ -927,9 +927,9 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sample.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_diff.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/unique.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stateduration.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_stateduration.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/statecount.py -Q 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stateduration.py -Q 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_stateduration.py -Q 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/statecount.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tail.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ttl_comment.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distribute_agg_count.py -Q 3
......@@ -945,7 +945,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 3
......@@ -991,7 +991,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/timetruncate.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/diff.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/Timediff.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py -Q 4
#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/top.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/bottom.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/percentile.py -Q 4
......@@ -1086,7 +1086,7 @@
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/json_tag.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/query_json.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/sample_csv_json.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/sml_json_alltypes.py
#,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/sml_json_alltypes.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/taosdemoTestQueryWithJson.py -R
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/telnet_tcp.py -R
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from ssl import ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
import taos
import sys
import time
import os
import socket
import subprocess
import random
import string
import random
from util.log import *
from util.sql import *
from util.cases import *
from util.common import *
from util.sqlset import *
from util.dnodes import *
from util.dnodes import TDDnodes
from util.dnodes import TDDnode
#
# -------------- util --------------------------
#
def pathSize(path):
total_size = 0
for dirpath, dirnames, filenames in os.walk(path):
for i in filenames:
#use join to concatenate all the components of path
f = os.path.join(dirpath, i)
#use getsize to generate size in bytes and add it to the total size
total_size += os.path.getsize(f)
#print(dirpath)
print(" %s %.02f MB"%(path, total_size/1024/1024))
return total_size
'''
total = 0
with os.scandir(path) as it:
for entry in it:
if entry.is_file():
total += entry.stat().st_size
elif entry.is_dir():
total += pathSize(entry.path)
print(" %s %.02f MB"%(path, total/1024/1024))
return total
'''
#
# --------------- cluster ------------------------
#
class MyDnodes(TDDnodes):
def __init__(self ,dnodes_lists):
super(MyDnodes,self).__init__()
self.dnodes = dnodes_lists # dnode must be TDDnode instance
self.simDeployed = False
class TagCluster:
noConn = True
def init(self, conn, logSql, replicaVar=1):
tdLog.debug(f"start to excute {__file__}")
self.TDDnodes = None
self.depoly_cluster(5)
self.master_dnode = self.TDDnodes.dnodes[0]
self.host=self.master_dnode.cfgDict["fqdn"]
conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir)
tdSql.init(conn1.cursor())
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/build/bin")]
break
return buildPath
def depoly_cluster(self ,dnodes_nums):
testCluster = False
valgrind = 0
hostname = socket.gethostname()
dnodes = []
start_port = 6030
for num in range(1, dnodes_nums+1):
dnode = TDDnode(num)
dnode.addExtraCfg("firstEp", f"{hostname}:{start_port}")
dnode.addExtraCfg("fqdn", f"{hostname}")
dnode.addExtraCfg("serverPort", f"{start_port + (num-1)*100}")
dnode.addExtraCfg("monitorFqdn", hostname)
dnode.addExtraCfg("monitorPort", 7043)
dnodes.append(dnode)
self.TDDnodes = MyDnodes(dnodes)
self.TDDnodes.init("")
self.TDDnodes.setTestCluster(testCluster)
self.TDDnodes.setValgrind(valgrind)
self.TDDnodes.setAsan(tdDnodes.getAsan())
self.TDDnodes.stopAll()
for dnode in self.TDDnodes.dnodes:
self.TDDnodes.deploy(dnode.index,{})
for dnode in self.TDDnodes.dnodes:
self.TDDnodes.starttaosd(dnode.index)
# create cluster
dnode_first_host = ""
sql = ""
for dnode in self.TDDnodes.dnodes[1:]:
# print(dnode.cfgDict)
dnode_id = dnode.cfgDict["fqdn"] + ":" +dnode.cfgDict["serverPort"]
if dnode_first_host == "":
dnode_first_host = dnode.cfgDict["firstEp"].split(":")[0]
dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1]
sql += f"create dnode '{dnode_id}'; "
cmd = f"{self.getBuildPath()}/build/bin/taos -h {dnode_first_host} -P {dnode_first_port} -s "
cmd += f'"{sql}"'
print(cmd)
os.system(cmd)
time.sleep(2)
tdLog.info(" create cluster done! ")
def getConnection(self, dnode):
host = dnode.cfgDict["fqdn"]
port = dnode.cfgDict["serverPort"]
config_dir = dnode.cfgDir
return taos.connect(host=host, port=int(port), config=config_dir)
def run(self):
tdLog.info(" create cluster ok.")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
class PerfDB:
def __init__(self):
self.sqls = []
self.spends= []
# execute
def execute(self, sql):
print(f" perfdb execute {sql}")
stime = time.time()
ret = tdSql.execute(sql, 1)
spend = time.time() - stime
self.sqls.append(sql)
self.spends.append(spend)
return ret
# query
def query(self, sql):
print(f" perfdb query {sql}")
start = time.time()
ret = tdSql.query(sql, None, 1)
spend = time.time() - start
self.sqls.append(sql)
self.spends.append(spend)
return ret
#
# ----------------- TDTestCase ------------------
#
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
tdLog.debug("start to execute %s" % __file__)
self.dbs = [PerfDB(), PerfDB()]
self.cur = 0
self.tagCluster = TagCluster()
self.tagCluster.init(conn, logSql, replicaVar)
self.lenBinary = 64
self.lenNchar = 32
# column
self.column_dict = {
'ts': 'timestamp',
'col1': 'tinyint',
'col2': 'smallint',
'col3': 'int',
'col4': 'bigint',
'col5': 'tinyint unsigned',
'col6': 'smallint unsigned',
'col7': 'int unsigned',
'col8': 'bigint unsigned',
'col9': 'float',
'col10': 'double',
'col11': 'bool',
'col12': f'varchar({self.lenBinary})',
'col13': f'nchar({self.lenNchar})'
}
# tag
self.tag_dict = {
't1': 'tinyint',
't2': 'smallint',
't3': 'int',
't4': 'bigint',
't5': 'tinyint unsigned',
't6': 'smallint unsigned',
't7': 'int unsigned',
't8': 'bigint unsigned',
't9': 'float',
't10': 'double',
't11': 'bool',
't12': f'varchar({self.lenBinary})',
't13': f'nchar({self.lenNchar})',
't14': 'timestamp'
}
# generate specail wide random string
def random_string(self, count):
letters = string.ascii_letters
return ''.join(random.choice(letters) for i in range(count))
# execute
def execute(self, sql):
obj = self.dbs[self.cur]
return obj.execute(sql)
# query
def query(self, sql):
return self.dbs[self.cur].query(sql)
def set_stb_sql(self,stbname,column_dict,tag_dict):
column_sql = ''
tag_sql = ''
for k,v in column_dict.items():
column_sql += f"{k} {v}, "
for k,v in tag_dict.items():
tag_sql += f"{k} {v}, "
create_stb_sql = f'create stable {stbname} ({column_sql[:-2]}) tags ({tag_sql[:-2]})'
return create_stb_sql
# create datbase
def create_database(self, dbname, vgroups, replica):
sql = f'create database {dbname} vgroups {vgroups} replica {replica}'
tdSql.execute(sql)
#tdSql.execute(sql)
tdSql.execute(f'use {dbname}')
# create stable and child tables
def create_table(self, stbname, tbname, count):
# create stable
create_table_sql = self.set_stb_sql(stbname, self.column_dict, self.tag_dict)
tdSql.execute(create_table_sql)
# create child table
tdLog.info(f" start create {count} child tables.")
for i in range(count):
ti = i % 128
binTxt = self.random_string(self.lenBinary)
tags = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"{binTxt}","nch{i}",now'
sql = f'create table {tbname}{i} using {stbname} tags({tags})'
tdSql.execute(sql)
if i > 0 and i % 1000 == 0:
tdLog.info(f" child table count = {i}")
tdLog.info(f" end create {count} child tables.")
# create stable and child tables
def create_tagidx(self, stbname):
cnt = -1
for key in self.tag_dict.keys():
# first tag have default index, so skip
if cnt == -1:
cnt = 0
continue;
sql = f'create index idx_{key} on {stbname} ({key})'
tdLog.info(f" sql={sql}")
tdSql.execute(sql)
cnt += 1
tdLog.info(f' create {cnt} tag indexs ok.')
# insert to child table d1 data
def insert_data(self, tbname):
# d1 insert 3 rows
for i in range(3):
sql = f'insert into {tbname}1(ts,col1) values(now+{i}s,{i});'
tdSql.execute(sql)
# d20 insert 4
for i in range(4):
sql = f'insert into {tbname}20(ts,col1) values(now+{i}s,{i});'
tdSql.execute(sql)
# check show indexs
def show_tagidx(self, dbname, stbname):
sql = f'select index_name,column_name from information_schema.ins_indexes where db_name="{dbname}"'
tdSql.query(sql)
rows = len(self.tag_dict.keys())-1
tdSql.checkRows(rows)
for i in range(rows):
col_name = tdSql.getData(i, 1)
idx_name = f'idx_{col_name}'
tdSql.checkData(i, 0, idx_name)
tdLog.info(f' show {rows} tag indexs ok.')
# query with tag idx
def query_tagidx(self, stbname):
sql = f'select * from meters where t2=1'
self.query(sql)
tdSql.checkRows(3)
sql = f'select * from meters where t2<10'
self.query(sql)
tdSql.checkRows(3)
sql = f'select * from meters where t2>10'
self.query(sql)
tdSql.checkRows(4)
sql = f'select * from meters where t3<30'
self.query(sql)
tdSql.checkRows(7)
sql = f'select * from meters where t12="11"'
tdSql.query(sql)
tdSql.checkRows(0)
sql = f'select * from meters where (t4 < 10 or t5 = 20) and t6= 30'
self.query(sql)
tdSql.checkRows(0)
sql = f'select * from meters where (t7 < 20 and t8 = 20) or t4 = 20'
self.query(sql)
tdSql.checkRows(4)
sql = f'select * from meters where t12 like "%ab%" '
self.query(sql)
tdSql.checkRows(0)
sql = f'select * from meters where t13 = "d20" '
self.query(sql)
tdSql.checkRows(0)
sql = f'select * from meters where t13 = "nch20" '
self.query(sql)
tdSql.checkRows(4)
sql = f'select * from meters where tbname = "d20" '
self.query(sql)
tdSql.checkRows(4)
# drop child table
def drop_tables(self, tbname, count):
# table d1 and d20 have verify data , so can not drop
start = random.randint(21, count/2)
end = random.randint(count/2 + 1, count - 1)
for i in range(start, end):
sql = f'drop table {tbname}{i}'
tdSql.execute(sql)
cnt = end - start + 1
tdLog.info(f' drop table from {start} to {end} count={cnt}')
# drop tag index
def drop_tagidx(self, dbname, stbname):
# drop index
cnt = -1
for key in self.tag_dict.keys():
# first tag have default index, so skip
if cnt == -1:
cnt = 0
continue;
sql = f'drop index idx_{key}'
tdSql.execute(sql)
cnt += 1
# check idx result is 0
sql = f'select index_name,column_name from information_schema.ins_indexes where db_name="{dbname}"'
tdSql.query(sql)
tdSql.checkRows(0)
tdLog.info(f' drop {cnt} tag indexs ok.')
# show performance
def show_performance(self, count) :
db = self.dbs[0]
db1 = self.dbs[1]
cnt = len(db.sqls)
cnt1 = len(db1.sqls)
if cnt != len(db1.sqls):
tdLog.info(f" datebase sql count not equal. cnt={cnt} cnt1={cnt1}\n")
return False
tdLog.info(f" database sql cnt ={cnt}")
print(f" ----------------- performance (child tables = {count})--------------------")
print(" No time(index) time(no-index) diff(col3-col2) rate(col2/col3) sql")
for i in range(cnt):
key = db.sqls[i]
value = db.spends[i]
key1 = db1.sqls[i]
value1 = db1.spends[i]
diff = value1 - value
rate = value/value1*100
print(" %d %.3fs %.3fs %.3fs %d%% %s"%(i+1, value, value1, diff, rate, key))
print(" --------------------- end ------------------------")
return True
def show_diskspace(self):
#calc
selfPath = os.path.dirname(os.path.realpath(__file__))
projPath = ""
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
# total
vnode2_size = pathSize(projPath + "sim/dnode2/data/vnode/vnode2/")
vnode3_size = pathSize(projPath + "sim/dnode3/data/vnode/vnode3/")
vnode4_size = pathSize(projPath + "sim/dnode4/data/vnode/vnode4/")
vnode5_size = pathSize(projPath + "sim/dnode5/data/vnode/vnode5/")
# show
print(" ----------------- disk space --------------------")
idx_size = vnode2_size + vnode3_size
noidx_size = vnode4_size + vnode5_size
print(" index = %.02f M"%(idx_size/1024/1024))
print(" no-index = %.02f M"%(noidx_size/1024/1024))
print(" index/no-index = %.2f multiple"%(idx_size/noidx_size))
print(" -------------------- end ------------------------")
# main
def testdb(self, dbname, stable, tbname, count, createidx):
# cur
if createidx:
self.cur = 0
else :
self.cur = 1
# do
self.create_database(dbname, 2, 1)
self.create_table(stable, tbname, count)
if(createidx):
self.create_tagidx(stable)
self.insert_data(tbname)
if(createidx):
self.show_tagidx(dbname,stable)
self.query_tagidx(stable)
#self.drop_tables(tbname, count)
#if(createidx):
# self.drop_tagidx(dbname, stable)
# query after delete , expect no crash
#self.query_tagidx(stable)
tdSql.execute(f'flush database {dbname}')
# run
def run(self):
self.tagCluster.run()
# var
dbname = "tagindex"
dbname1 = dbname + "1"
stable = "meters"
tbname = "d"
count = 10000
# test db
tdLog.info(f" ------------- {dbname} ----------")
self.testdb(dbname, stable, tbname, count, True)
tdLog.info(f" ------------- {dbname1} ----------")
self.testdb(dbname1, stable, tbname, count, False)
# show test result
self.show_performance(count)
self.tagCluster.TDDnodes.stopAll()
sec = 10
print(f" sleep {sec}s wait taosd stopping ...")
time.sleep(sec)
self.show_diskspace()
def stop(self):
self.tagCluster.stop()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
......@@ -107,11 +107,11 @@ class TDTestCase:
def insert_data(self, tbname):
# d1 insert 3 rows
for i in range(3):
sql = f'insert into {tbname}1(ts,col1) values(now,{i});'
sql = f'insert into {tbname}1(ts,col1) values(now+{i}s,{i});'
tdSql.execute(sql)
# d20 insert 4
for i in range(4):
sql = f'insert into {tbname}20(ts,col1) values(now,{i});'
sql = f'insert into {tbname}20(ts,col1) values(now+{i}s,{i});'
tdSql.execute(sql)
# check show indexs
......@@ -150,6 +150,22 @@ class TDTestCase:
tdSql.query(sql)
tdSql.checkRows(0)
sql = f'select t12 ,t13,tbname from meters where t13="nch20"'
tdSql.query(sql)
tdSql.checkRows(4)
sql = f'select * from meters where t12 like "%ab%" '
tdSql.query(sql)
tdSql.checkRows(0)
sql = f'select * from meters where t13 = "d20" '
tdSql.query(sql)
tdSql.checkRows(0)
sql = f'select * from meters where tbname = "d20" '
tdSql.query(sql)
tdSql.checkRows(4)
sql = f'select * from meters where (t4 < 10 or t5 = 20) and t6= 30'
tdSql.query(sql)
tdSql.checkRows(0)
......@@ -188,6 +204,22 @@ class TDTestCase:
tdSql.checkRows(0)
tdLog.info(f' drop {cnt} tag indexs ok.')
# create long name idx
def longname_idx(self, stbname):
long_name = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeeeffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjjkkkkkkkkkkllllllllllmmmmmmmmmm"
key = "t3"
sql = f'create index {long_name} on {stbname} ({key})'
tdSql.error(sql)
long_name = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeeeffffffffff"
key = "t3"
sql = f'create index {long_name} on {stbname} ({key})'
tdLog.info(f"{sql}")
tdSql.execute(sql)
sql = f'drop index {long_name}'
tdLog.info(f"{sql}")
tdSql.execute(sql)
# run
def run(self):
# var
......@@ -204,6 +236,7 @@ class TDTestCase:
self.drop_tagidx(stable)
# query after delete , expect no crash
self.query_tagidx(stable)
self.longname_idx(stable)
def stop(self):
......
......@@ -94,7 +94,7 @@ else
sleep 1
done
AsanFileSuccessLen=$(grep -w successfully $AsanFile | wc -l)
AsanFileSuccessLen=$(grep -w "successfully executed" $AsanFile | wc -l)
echo "AsanFileSuccessLen:" $AsanFileSuccessLen
if [ $AsanFileSuccessLen -gt 0 ]; then
......@@ -104,4 +104,4 @@ else
echo "Execute script failure"
exit 1
fi
fi
fi
\ No newline at end of file
......@@ -318,7 +318,7 @@ if __name__ == "__main__":
for i in range(cursor.rowcount):
if res[i][0] == "queryPolicy" :
if int(res[i][1]) == int(queryPolicy):
tdLog.success(f'alter queryPolicy to {queryPolicy} successfully')
tdLog.info(f'alter queryPolicy to {queryPolicy} successfully')
else:
tdLog.debug(res)
tdLog.exit(f"alter queryPolicy to {queryPolicy} failed")
......@@ -371,7 +371,7 @@ if __name__ == "__main__":
for i in range(cursor.rowcount):
if res[i][0] == "queryPolicy" :
if int(res[i][1]) == int(queryPolicy):
tdLog.success(f'alter queryPolicy to {queryPolicy} successfully')
tdLog.info(f'alter queryPolicy to {queryPolicy} successfully')
else:
tdLog.debug(res)
tdLog.exit(f"alter queryPolicy to {queryPolicy} failed")
......@@ -439,7 +439,7 @@ if __name__ == "__main__":
# for i in range(tdSql.queryRows):
# if tdSql.queryResult[i][0] == "queryPolicy" :
# if int(tdSql.queryResult[i][1]) == int(queryPolicy):
# tdLog.success('alter queryPolicy to %d successfully'%queryPolicy)
# tdLog.info('alter queryPolicy to %d successfully'%queryPolicy)
# else :
# tdLog.debug(tdSql.queryResult)
# tdLog.exit("alter queryPolicy to %d failed"%queryPolicy)
......@@ -452,7 +452,7 @@ if __name__ == "__main__":
for i in range(cursor.rowcount):
if res[i][0] == "queryPolicy" :
if int(res[i][1]) == int(queryPolicy):
tdLog.success(f'alter queryPolicy to {queryPolicy} successfully')
tdLog.info(f'alter queryPolicy to {queryPolicy} successfully')
else:
tdLog.debug(res)
tdLog.exit(f"alter queryPolicy to {queryPolicy} failed")
......@@ -509,7 +509,7 @@ if __name__ == "__main__":
for i in range(cursor.rowcount):
if res[i][0] == "queryPolicy" :
if int(res[i][1]) == int(queryPolicy):
tdLog.success(f'alter queryPolicy to {queryPolicy} successfully')
tdLog.info(f'alter queryPolicy to {queryPolicy} successfully')
else:
tdLog.debug(res)
tdLog.exit(f"alter queryPolicy to {queryPolicy} failed")
......
......@@ -86,8 +86,8 @@ SWords shellCommands[] = {
"<anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> <db_options> <anyword> "
"<db_options> <anyword> <db_options> <anyword> ;",
0, 0, NULL},
{"create dnode ", 0, 0, NULL},
{"create index ", 0, 0, NULL},
{"create dnode <anyword>", 0, 0, NULL},
{"create index <anyword> on <stb_name> ()", 0, 0, NULL},
{"create mnode on dnode <dnode_id> ;", 0, 0, NULL},
{"create qnode on dnode <dnode_id> ;", 0, 0, NULL},
{"create stream <anyword> into <anyword> as select", 0, 0, NULL}, // 26 append sub sql
......@@ -98,6 +98,7 @@ SWords shellCommands[] = {
{"describe <all_table>", 0, 0, NULL},
{"delete from <all_table> where ", 0, 0, NULL},
{"drop database <db_name>", 0, 0, NULL},
{"drop index <anyword>", 0, 0, NULL},
{"drop table <all_table>", 0, 0, NULL},
{"drop dnode <dnode_id>", 0, 0, NULL},
{"drop mnode on dnode <dnode_id> ;", 0, 0, NULL},
......@@ -384,7 +385,7 @@ void showHelp() {
create table <tb_name> using <stb_name> tags ...\n\
create database <db_name> <db_options> ...\n\
create dnode \"fqdn:port\" ...\n\
create index ...\n\
create index <index_name> on <stb_name> (tag_column_name);\n\
create mnode on dnode <dnode_id> ;\n\
create qnode on dnode <dnode_id> ;\n\
create stream <stream_name> into <stb_name> as select ...\n\
......@@ -404,6 +405,7 @@ void showHelp() {
drop consumer group ... \n\
drop topic <topic_name> ;\n\
drop stream <stream_name> ;\n\
drop index <index_name>;\n\
----- E ----- \n\
explain select clause ...\n\
----- F ----- \n\
......@@ -534,7 +536,7 @@ SWord* addWord(const char* p, int32_t len, bool pattern) {
word->len = len;
// check format
if (pattern) {
if (pattern && len > 0) {
word->type = wordType(p, len);
} else {
word->type = WT_TEXT;
......@@ -1724,6 +1726,9 @@ bool matchEnd(TAOS* con, SShellCmd* cmd) {
if (strlen(last) == 0) {
goto _return;
}
if (strcmp(last, " ") == 0) {
goto _return;
}
// match database
if (elast == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册