未验证 提交 6b3e339c 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #6238 from taosdata/feature/m2d

Feature/m2d
......@@ -114,6 +114,25 @@ mkdir -p ${install_dir}/examples
examples_dir="${top_dir}/tests/examples"
cp -r ${examples_dir}/c ${install_dir}/examples
if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then
if [ -d ${examples_dir}/JDBC/connectionPools/target ]; then
rm -rf ${examples_dir}/JDBC/connectionPools/target
fi
if [ -d ${examples_dir}/JDBC/JDBCDemo/target ]; then
rm -rf ${examples_dir}/JDBC/JDBCDemo/target
fi
if [ -d ${examples_dir}/JDBC/mybatisplus-demo/target ]; then
rm -rf ${examples_dir}/JDBC/mybatisplus-demo/target
fi
if [ -d ${examples_dir}/JDBC/springbootdemo/target ]; then
rm -rf ${examples_dir}/JDBC/springbootdemo/target
fi
if [ -d ${examples_dir}/JDBC/SpringJdbcTemplate/target ]; then
rm -rf ${examples_dir}/JDBC/SpringJdbcTemplate/target
fi
if [ -d ${examples_dir}/JDBC/taosdemo/target ]; then
rm -rf ${examples_dir}/JDBC/taosdemo/target
fi
cp -r ${examples_dir}/JDBC ${install_dir}/examples
cp -r ${examples_dir}/matlab ${install_dir}/examples
cp -r ${examples_dir}/python ${install_dir}/examples
......
name: tdengine
base: core18
version: '2.1.1.0'
icon: snap/gui/t-dengine.svg
summary: an open-source big data platform designed and optimized for IoT.
......
......@@ -7187,6 +7187,11 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) {
const char* msg1 = "point interpolation query needs timestamp";
const char* msg2 = "too many tables in from clause";
const char* msg3 = "start(end) time of query range required or time range too large";
// const char* msg5 = "too many columns in selection clause";
// const char* msg6 = "too many tables in from clause";
// const char* msg7 = "invalid table alias name";
// const char* msg8 = "alias name too long";
const char* msg9 = "only tag query not compatible with normal column filter";
int32_t code = TSDB_CODE_SUCCESS;
......@@ -7326,6 +7331,20 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) {
}
}
if (tscQueryTags(pQueryInfo)) {
SExprInfo* pExpr1 = tscSqlExprGet(pQueryInfo, 0);
if (pExpr1->base.functionId != TSDB_FUNC_TID_TAG) {
int32_t numOfCols = (int32_t)taosArrayGetSize(pQueryInfo->colList);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCols = taosArrayGetP(pQueryInfo->colList, i);
if (pCols->info.flist.numOfFilters > 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg9);
}
}
}
}
// parse the having clause in the first place
if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) !=
TSDB_CODE_SUCCESS) {
......
......@@ -1928,8 +1928,9 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
}
}
tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s", pSql->self, pTableMeta->id.uid, pTableMeta->id.tid,
tNameGetTableName(&pTableMetaInfo->name));
tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s, numOfCols:%d, numOfTags:%d", pSql->self,
pTableMeta->id.uid, pTableMeta->id.tid, tNameGetTableName(&pTableMetaInfo->name), pTableMeta->tableInfo.numOfColumns,
pTableMeta->tableInfo.numOfTags);
free(pTableMeta);
return TSDB_CODE_SUCCESS;
......@@ -2072,7 +2073,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
pInfo->vgroupList->numOfVgroups = pVgroupMsg->numOfVgroups;
if (pInfo->vgroupList->numOfVgroups <= 0) {
tscDebug("0x%"PRIx64" empty vgroup info, no corresponding tables for stable", pSql->self);
tscDebug("0x%" PRIx64 " empty vgroup info, no corresponding tables for stable", pSql->self);
} else {
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
// just init, no need to lock
......
......@@ -627,6 +627,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
if (pSql->sqlstr == NULL) {
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
tscFreeSqlObj(pSql);
free(pStream);
return NULL;
}
......
......@@ -215,7 +215,7 @@ static void tscProcessSubscriptionTimer(void *handle, void *tmrId) {
taosTmrReset(tscProcessSubscriptionTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer);
}
//TODO refactor: extract table list name not simply from the sql
static SArray* getTableList( SSqlObj* pSql ) {
const char* p = strstr( pSql->sqlstr, " from " );
assert(p != NULL); // we are sure this is a 'select' statement
......@@ -224,11 +224,11 @@ static SArray* getTableList( SSqlObj* pSql ) {
SSqlObj* pNew = taos_query(pSql->pTscObj, sql);
if (pNew == NULL) {
tscError("0x%"PRIx64"failed to retrieve table id: cannot create new sql object.", pSql->self);
tscError("0x%"PRIx64" failed to retrieve table id: cannot create new sql object.", pSql->self);
return NULL;
} else if (taos_errno(pNew) != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64"failed to retrieve table id,error: %s", pSql->self, tstrerror(taos_errno(pNew)));
tscError("0x%"PRIx64" failed to retrieve table id,error: %s", pSql->self, tstrerror(taos_errno(pNew)));
return NULL;
}
......
......@@ -219,6 +219,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied")
#define TSDB_CODE_VND_IS_SYNCING TAOS_DEF_ERROR_CODE(0, 0x0513) //"Database is syncing")
#define TSDB_CODE_VND_INVALID_TSDB_STATE TAOS_DEF_ERROR_CODE(0, 0x0514) //"Invalid tsdb state")
#define TSDB_CODE_VND_IS_CLOSING TAOS_DEF_ERROR_CODE(0, 0x0515) //"Database is closing")
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) //"Invalid table ID")
......
......@@ -470,6 +470,7 @@ typedef struct SThreadInfo_S {
// seq of query or subscribe
uint64_t querySeq; // sequence number of sql command
TAOS_SUB* tsub;
} threadInfo;
......
......@@ -719,13 +719,13 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void *
if (action == SDB_ACTION_INSERT) {
return sdbPerformInsertAction(pHead, pTable);
} else if (action == SDB_ACTION_DELETE) {
if (qtype == TAOS_QTYPE_FWD) {
//if (qtype == TAOS_QTYPE_FWD) {
// Drop database/stable may take a long time and cause a timeout, so we confirm first then reput it into queue
sdbWriteFwdToQueue(1, hparam, TAOS_QTYPE_QUERY, unused);
return TSDB_CODE_SUCCESS;
} else {
// sdbWriteFwdToQueue(1, hparam, TAOS_QTYPE_QUERY, unused);
// return TSDB_CODE_SUCCESS;
//} else {
return sdbPerformDeleteAction(pHead, pTable);
}
//}
} else if (action == SDB_ACTION_UPDATE) {
return sdbPerformUpdateAction(pHead, pTable);
} else {
......
......@@ -1189,8 +1189,8 @@ static int32_t mnodeFindSuperTableTagIndex(SSTableObj *pStable, const char *tagN
static int32_t mnodeAddSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) {
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
mLInfo("msg:%p, app:%p stable %s, add tag result:%s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
tstrerror(code));
mLInfo("msg:%p, app:%p stable %s, add tag result:%s, numOfTags:%d", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
tstrerror(code), pStable->numOfTags);
return code;
}
......
......@@ -121,7 +121,7 @@ static int32_t mnodeVgroupActionDelete(SSdbRow *pRow) {
SVgObj *pVgroup = pRow->pObj;
if (pVgroup->pDb == NULL) {
mError("vgId:%d, db:%s is not exist while insert into hash", pVgroup->vgId, pVgroup->dbName);
mError("vgId:%d, db:%s is not exist while delete from hash", pVgroup->vgId, pVgroup->dbName);
return TSDB_CODE_MND_VGROUP_NOT_EXIST;
}
......
......@@ -709,7 +709,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
}
static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
int32_t onlineNum = 0;
int32_t onlineNum = 0, arbOnlineNum = 0;
int32_t masterIndex = -1;
int32_t replica = pNode->replica;
......@@ -723,13 +723,15 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
SSyncPeer *pArb = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
if (pArb && pArb->role != TAOS_SYNC_ROLE_OFFLINE) {
onlineNum++;
++arbOnlineNum;
replica = pNode->replica + 1;
}
if (onlineNum <= replica * 0.5) {
if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) {
if (nodeRole == TAOS_SYNC_ROLE_MASTER && onlineNum == replica * 0.5 && onlineNum >= 1) {
if (nodeRole == TAOS_SYNC_ROLE_MASTER && onlineNum == replica * 0.5 && ((replica > 2 && onlineNum - arbOnlineNum > 1) || pNode->replica < 3)) {
sInfo("vgId:%d, self keep work as master, online:%d replica:%d", pNode->vgId, onlineNum, replica);
masterIndex = pNode->selfIndex;
} else {
nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
sInfo("vgId:%d, self change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
......@@ -1002,6 +1004,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version;
code = (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
syncConfirmForward(pNode->rid, pHead->version, code, false);
} else {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
code = syncSaveIntoBuffer(pPeer, pHead);
......@@ -1404,7 +1407,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
pthread_mutex_lock(&pNode->mutex);
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS;
if (ABS(time - pFwdInfo->time) < 2000) break;
if (ABS(time - pFwdInfo->time) < 10000) break;
sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId,
pFwdInfo->version, time, pFwdInfo->time);
......
......@@ -613,7 +613,7 @@ void doCleanupDataCache(SCacheObj *pCacheObj) {
// todo memory leak if there are object with refcount greater than 0 in hash table?
taosHashCleanup(pCacheObj->pHashTable);
taosTrashcanEmpty(pCacheObj, true);
taosTrashcanEmpty(pCacheObj, false);
__cache_lock_destroy(pCacheObj);
......
......@@ -454,7 +454,11 @@ void vnodeDestroy(SVnodeObj *pVnode) {
}
if (pVnode->tsdb) {
code = tsdbCloseRepo(pVnode->tsdb, 1);
// the deleted vnode does not need to commit, so as to speed up the deletion
int toCommit = 1;
if (pVnode->dropped) toCommit = 0;
code = tsdbCloseRepo(pVnode->tsdb, toCommit);
pVnode->tsdb = NULL;
}
......
......@@ -126,11 +126,16 @@ void vnodeStopSyncFile(int32_t vgId, uint64_t fversion) {
}
void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) {
void *pVnode = vnodeAcquire(vgId);
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while confirm forward", vgId);
}
if (code == TSDB_CODE_SYN_CONFIRM_EXPIRED && pVnode->status == TAOS_VN_STATUS_CLOSING) {
vDebug("vgId:%d, db:%s, vnode is closing while confirm forward", vgId, pVnode->db);
code = TSDB_CODE_VND_IS_CLOSING;
}
dnodeSendRpcVWriteRsp(pVnode, wparam, code);
vnodeRelease(pVnode);
}
......
def pre_test(){
sh '''
sudo rmtaos||echo 'no taosd installed'
'''
sh '''
cd ${WKC}
git reset --hard
git checkout $BRANCH_NAME
git pull
git submodule update
cd ${WK}
git reset --hard
git checkout $BRANCH_NAME
git pull
export TZ=Asia/Harbin
date
rm -rf ${WK}/debug
mkdir debug
cd debug
cmake .. > /dev/null
make > /dev/null
make install > /dev/null
pip3 install ${WKC}/src/connector/python/linux/python3/
'''
return 1
}
pipeline {
agent none
environment{
WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDinternal/community'
}
stages {
stage('Parallel test stage') {
parallel {
stage('pytest') {
agent{label 'slam1'}
steps {
pre_test()
sh '''
cd ${WKC}/tests
find pytest -name '*'sql|xargs rm -rf
./test-all.sh pytest
date'''
}
}
stage('test_b1') {
agent{label 'slam2'}
steps {
pre_test()
sh '''
cd ${WKC}/tests
./test-all.sh b1
date'''
}
}
stage('test_crash_gen') {
agent{label "slam3"}
steps {
pre_test()
sh '''
cd ${WKC}/tests/pytest
'''
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
./crash_gen.sh -a -p -t 4 -s 2000
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
rm -rf /var/lib/taos/*
rm -rf /var/log/taos/*
./handle_crash_gen_val_log.sh
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
rm -rf /var/lib/taos/*
rm -rf /var/log/taos/*
./handle_taosd_val_log.sh
'''
}
sh'''
systemctl start taosd
sleep 10
'''
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/gotest
bash batchtest.sh
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/examples/python/PYTHONConnectorChecker
python3 PythonChecker.py
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/examples/JDBC/JDBCDemo/
mvn clean package assembly:single -DskipTests >/dev/null
java -jar target/JDBCDemo-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/src/connector/jdbc
mvn clean package -Dmaven.test.skip=true >/dev/null
cd ${WKC}/tests/examples/JDBC/JDBCDemo/
java --class-path=../../../../src/connector/jdbc/target:$JAVA_HOME/jre/lib/ext -jar target/JDBCDemo-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cp -rf ${WKC}/tests/examples/nodejs ${JENKINS_HOME}/workspace/
cd ${JENKINS_HOME}/workspace/nodejs
node nodejsChecker.js host=localhost
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${JENKINS_HOME}/workspace/C#NET/src/CheckC#
dotnet run
'''
}
sh '''
systemctl stop taosd
cd ${WKC}/tests
./test-all.sh b2
date
'''
sh '''
cd ${WKC}/tests
./test-all.sh full unit
date'''
}
}
stage('test_valgrind') {
agent{label "slam4"}
steps {
pre_test()
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/pytest
nohup taosd >/dev/null &
sleep 10
python3 concurrent_inquiry.py -c 1
'''
}
sh '''
cd ${WKC}/tests
./test-all.sh full jdbc
date'''
sh '''
cd ${WKC}/tests/pytest
./valgrind-test.sh 2>&1 > mem-error-out.log
./handle_val_log.sh
date
cd ${WKC}/tests
./test-all.sh b3
date'''
sh '''
date
cd ${WKC}/tests
./test-all.sh full example
date'''
}
}
stage('arm64_build'){
agent{label 'arm64'}
steps{
sh '''
cd ${WK}
git fetch
git checkout develop
git pull
cd ${WKC}
git fetch
git checkout develop
git pull
git submodule update
cd ${WKC}/packaging
./release.sh -v cluster -c aarch64 -n 2.0.0.0 -m 2.0.0.0
'''
}
}
stage('arm32_build'){
agent{label 'arm32'}
steps{
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WK}
git fetch
git checkout develop
git pull
cd ${WKC}
git fetch
git checkout develop
git pull
git submodule update
cd ${WKC}/packaging
./release.sh -v cluster -c aarch32 -n 2.0.0.0 -m 2.0.0.0
'''
}
}
}
}
}
}
post {
success {
emailext (
subject: "SUCCESSFUL: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]'",
body: '''<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body leftmargin="8" marginwidth="0" topmargin="8" marginheight="4" offset="0">
<table width="95%" cellpadding="0" cellspacing="0" style="font-size: 16pt; font-family: Tahoma, Arial, Helvetica, sans-serif">
<tr>
<td><br />
<b><font color="#0B610B"><font size="6">构建信息</font></font></b>
<hr size="2" width="100%" align="center" /></td>
</tr>
<tr>
<td>
<ul>
<div style="font-size:18px">
<li>构建名称>>分支:${PROJECT_NAME}</li>
<li>构建结果:<span style="color:green"> Successful </span></li>
<li>构建编号:${BUILD_NUMBER}</li>
<li>触发用户:${CAUSE}</li>
<li>变更概要:${CHANGES}</li>
<li>构建地址:<a href=${BUILD_URL}>${BUILD_URL}</a></li>
<li>构建日志:<a href=${BUILD_URL}console>${BUILD_URL}console</a></li>
<li>变更集:${JELLY_SCRIPT}</li>
</div>
</ul>
</td>
</tr>
</table></font>
</body>
</html>''',
to: "yqliu@taosdata.com,pxiao@taosdata.com",
from: "support@taosdata.com"
)
}
failure {
emailext (
subject: "FAILED: Job '${env.JOB_NAME} [${env.BUILD_NUMBER}]'",
body: '''<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body leftmargin="8" marginwidth="0" topmargin="8" marginheight="4" offset="0">
<table width="95%" cellpadding="0" cellspacing="0" style="font-size: 16pt; font-family: Tahoma, Arial, Helvetica, sans-serif">
<tr>
<td><br />
<b><font color="#0B610B"><font size="6">构建信息</font></font></b>
<hr size="2" width="100%" align="center" /></td>
</tr>
<tr>
<td>
<ul>
<div style="font-size:18px">
<li>构建名称>>分支:${PROJECT_NAME}</li>
<li>构建结果:<span style="color:green"> Successful </span></li>
<li>构建编号:${BUILD_NUMBER}</li>
<li>触发用户:${CAUSE}</li>
<li>变更概要:${CHANGES}</li>
<li>构建地址:<a href=${BUILD_URL}>${BUILD_URL}</a></li>
<li>构建日志:<a href=${BUILD_URL}console>${BUILD_URL}console</a></li>
<li>变更集:${JELLY_SCRIPT}</li>
</div>
</ul>
</td>
</tr>
</table></font>
</body>
</html>''',
to: "yqliu@taosdata.com,pxiao@taosdata.com",
from: "support@taosdata.com"
)
}
}
}
\ No newline at end of file
......@@ -64,18 +64,25 @@ function runQueryPerfTest {
[ -f $PERFORMANCE_TEST_REPORT ] && rm $PERFORMANCE_TEST_REPORT
nohup $WORK_DIR/TDengine/debug/build/bin/taosd -c /etc/taosperf/ > /dev/null 2>&1 &
echoInfo "Wait TDengine to start"
sleep 300
sleep 60
echoInfo "Run Performance Test"
cd $WORK_DIR/TDengine/tests/pytest
python3 query/queryPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
mkdir -p /var/lib/perf/
mkdir -p /var/log/perf/
rm -rf /var/lib/perf/*
rm -rf /var/log/perf/*
nohup $WORK_DIR/TDengine/debug/build/bin/taosd -c /etc/perf/ > /dev/null 2>&1 &
echoInfo "Wait TDengine to start"
sleep 10
echoInfo "Run Performance Test"
cd $WORK_DIR/TDengine/tests/pytest
python3 insert/insertFromCSVPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
python3 tools/taosdemoPerformance.py -c $LOCAL_COMMIT | tee -a $PERFORMANCE_TEST_REPORT
#python3 perfbenchmark/joinPerformance.py | tee -a $PERFORMANCE_TEST_REPORT
}
......
......@@ -22,7 +22,7 @@ from queue import Queue, Empty
from .shared.config import Config
from .shared.db import DbTarget, DbConn
from .shared.misc import Logging, Helper, CrashGenError, Status, Progress, Dice
from .shared.types import DirPath
from .shared.types import DirPath, IpcStream
# from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status
# from crash_gen.db import DbConn, DbTarget
......@@ -177,13 +177,12 @@ quorum 2
return "127.0.0.1"
def getServiceCmdLine(self): # to start the instance
cmdLine = []
if Config.getConfig().track_memory_leaks:
Logging.info("Invoking VALGRIND on service...")
cmdLine = ['valgrind', '--leak-check=yes']
return ['exec /usr/bin/valgrind', '--leak-check=yes', self.getExecFile(), '-c', self.getCfgDir()]
else:
# TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control
cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
return cmdLine
return ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
def _getDnodes(self, dbc):
dbc.query("show dnodes")
......@@ -281,16 +280,16 @@ class TdeSubProcess:
return '[TdeSubProc: pid = {}, status = {}]'.format(
self.getPid(), self.getStatus() )
def getStdOut(self) -> BinaryIO :
def getIpcStdOut(self) -> IpcStream :
if self._popen.universal_newlines : # alias of text_mode
raise CrashGenError("We need binary mode for STDOUT IPC")
# Logging.info("Type of stdout is: {}".format(type(self._popen.stdout)))
return typing.cast(BinaryIO, self._popen.stdout)
return typing.cast(IpcStream, self._popen.stdout)
def getStdErr(self) -> BinaryIO :
def getIpcStdErr(self) -> IpcStream :
if self._popen.universal_newlines : # alias of text_mode
raise CrashGenError("We need binary mode for STDERR IPC")
return typing.cast(BinaryIO, self._popen.stderr)
return typing.cast(IpcStream, self._popen.stderr)
# Now it's always running, since we matched the life cycle
# def isRunning(self):
......@@ -302,11 +301,6 @@ class TdeSubProcess:
def _start(self, cmdLine) -> Popen :
ON_POSIX = 'posix' in sys.builtin_module_names
# Sanity check
# if self.subProcess: # already there
# raise RuntimeError("Corrupt process state")
# Prepare environment variables for coverage information
# Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment
myEnv = os.environ.copy()
......@@ -314,9 +308,8 @@ class TdeSubProcess:
# print(myEnv)
# print("Starting TDengine with env: ", myEnv.items())
# print("Starting TDengine via Shell: {}".format(cmdLineStr))
print("Starting TDengine: {}".format(cmdLine))
# useShell = True # Needed to pass environments into it
return Popen(
' '.join(cmdLine), # ' '.join(cmdLine) if useShell else cmdLine,
shell=True, # Always use shell, since we need to pass ENV vars
......@@ -732,19 +725,19 @@ class ServiceManagerThread:
self._ipcQueue = Queue() # type: Queue
self._thread = threading.Thread( # First thread captures server OUTPUT
target=self.svcOutputReader,
args=(subProc.getStdOut(), self._ipcQueue, logDir))
args=(subProc.getIpcStdOut(), self._ipcQueue, logDir))
self._thread.daemon = True # thread dies with the program
self._thread.start()
time.sleep(0.01)
if not self._thread.is_alive(): # What happened?
Logging.info("Failed to started process to monitor STDOUT")
Logging.info("Failed to start process to monitor STDOUT")
self.stop()
raise CrashGenError("Failed to start thread to monitor STDOUT")
Logging.info("Successfully started process to monitor STDOUT")
self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
target=self.svcErrorReader,
args=(subProc.getStdErr(), self._ipcQueue, logDir))
args=(subProc.getIpcStdErr(), self._ipcQueue, logDir))
self._thread2.daemon = True # thread dies with the program
self._thread2.start()
time.sleep(0.01)
......@@ -887,14 +880,19 @@ class ServiceManagerThread:
print("\nNon-UTF8 server output: {}\n".format(bChunk.decode('cp437')))
return None
def _textChunkGenerator(self, streamIn: BinaryIO, logDir: str, logFile: str
def _textChunkGenerator(self, streamIn: IpcStream, logDir: str, logFile: str
) -> Generator[TextChunk, None, None]:
'''
Take an input stream with binary data, produced a generator of decoded
"text chunks", and also save the original binary data in a log file.
Take an input stream with binary data (likely from Popen), produced a generator of decoded
"text chunks".
Side effect: it also save the original binary data in a log file.
'''
os.makedirs(logDir, exist_ok=True)
logF = open(os.path.join(logDir, logFile), 'wb')
if logF is None:
Logging.error("Failed to open log file (binary write): {}/{}".format(logDir, logFile))
return
for bChunk in iter(streamIn.readline, b''):
logF.write(bChunk) # Write to log file immediately
tChunk = self._decodeBinaryChunk(bChunk) # decode
......@@ -902,14 +900,14 @@ class ServiceManagerThread:
yield tChunk # TODO: split into actual text lines
# At the end...
streamIn.close() # Close the stream
logF.close() # Close the output file
streamIn.close() # Close the incoming stream
logF.close() # Close the log file
def svcOutputReader(self, stdOut: BinaryIO, queue, logDir: str):
def svcOutputReader(self, ipcStdOut: IpcStream, queue, logDir: str):
'''
The infinite routine that processes the STDOUT stream for the sub process being managed.
:param stdOut: the IO stream object used to fetch the data from
:param ipcStdOut: the IO stream object used to fetch the data from
:param queue: the queue where we dump the roughly parsed chunk-by-chunk text data
:param logDir: where we should dump a verbatim output file
'''
......@@ -917,7 +915,7 @@ class ServiceManagerThread:
# Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
# print("This is the svcOutput Reader...")
# stdOut.readline() # Skip the first output? TODO: remove?
for tChunk in self._textChunkGenerator(stdOut, logDir, 'stdout.log') :
for tChunk in self._textChunkGenerator(ipcStdOut, logDir, 'stdout.log') :
queue.put(tChunk) # tChunk garanteed not to be None
self._printProgress("_i")
......@@ -940,12 +938,12 @@ class ServiceManagerThread:
Logging.info("EOF found TDengine STDOUT, marking the process as terminated")
self.setStatus(Status.STATUS_STOPPED)
def svcErrorReader(self, stdErr: BinaryIO, queue, logDir: str):
def svcErrorReader(self, ipcStdErr: IpcStream, queue, logDir: str):
# os.makedirs(logDir, exist_ok=True)
# logFile = os.path.join(logDir,'stderr.log')
# fErr = open(logFile, 'wb')
# for line in iter(err.readline, b''):
for tChunk in self._textChunkGenerator(stdErr, logDir, 'stderr.log') :
for tChunk in self._textChunkGenerator(ipcStdErr, logDir, 'stderr.log') :
queue.put(tChunk) # tChunk garanteed not to be None
# fErr.write(line)
Logging.info("TDengine STDERR: {}".format(tChunk))
......
from typing import Any, List, Dict, NewType
from typing import Any, BinaryIO, List, Dict, NewType
from enum import Enum
DirPath = NewType('DirPath', str)
......@@ -26,3 +26,5 @@ class TdDataType(Enum):
TdColumns = Dict[str, TdDataType]
TdTags = Dict[str, TdDataType]
IpcStream = NewType('IpcStream', BinaryIO)
\ No newline at end of file
......@@ -183,7 +183,7 @@ python3 ./test.py -f stable/query_after_reset.py
# perfbenchmark
python3 ./test.py -f perfbenchmark/bug3433.py
#python3 ./test.py -f perfbenchmark/bug3589.py
python3 ./test.py -f perfbenchmark/taosdemoInsert.py
#query
python3 ./test.py -f query/filter.py
......
......@@ -31,7 +31,7 @@ class insertFromCSVPerformace:
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
self.config = "/etc/taosperf"
self.config = "/etc/perf"
self.conn = taos.connect(
self.host,
self.user,
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import taos
import sys
import os
import json
import argparse
import subprocess
import datetime
import re
from multiprocessing import cpu_count
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.dnodes import TDDnode
class Taosdemo:
def __init__(self, clearCache, dbName, keep):
self.clearCache = clearCache
self.dbname = dbName
self.drop = "yes"
self.keep = keep
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
# self.config = "/etc/taosperf"
# self.conn = taos.connect(
# self.host,
# self.user,
# self.password,
# self.config)
# env config
def getBuildPath(self) -> str:
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root) - len("/debug/build/bin")]
break
return buildPath
def getExeToolsDir(self) -> str:
self.debugdir = self.getBuildPath() + "/debug/build/bin"
return self.debugdir
def getCfgDir(self) -> str:
self.config = self.getBuildPath() + "/sim/dnode1/cfg"
return self.config
# taodemo insert file config
def dbinfocfg(self) -> dict:
return {
"name": self.dbname,
"drop": self.drop,
"replica": 1,
"days": 10,
"cache": 16,
"blocks": 8,
"precision": "ms",
"keep": self.keep,
"minRows": 100,
"maxRows": 4096,
"comp": 2,
"walLevel": 1,
"cachelast": 0,
"quorum": 1,
"fsync": 3000,
"update": 0
}
def type_check(func):
def wrapper(self, **kwargs):
num_types = ["int", "float", "bigint", "tinyint", "smallint", "double"]
str_types = ["binary", "nchar"]
for k ,v in kwargs.items():
if k.lower() not in num_types and k.lower() not in str_types:
return f"args {k} type error, not allowed"
elif not isinstance(v, (int, list, tuple)):
return f"value {v} type error, not allowed"
elif k.lower() in num_types and not isinstance(v, int):
return f"arg {v} takes 1 positional argument must be type int "
elif isinstance(v, (list,tuple)) and len(v) > 2:
return f"arg {v} takes from 1 to 2 positional arguments but more than 2 were given "
elif isinstance(v,(list,tuple)) and [ False for _ in v if not isinstance(_, int) ]:
return f"arg {v} takes from 1 to 2 positional arguments must be type int "
else:
pass
return func(self, **kwargs)
return wrapper
@type_check
def column_tag_count(self, **column_tag) -> list :
init_column_tag = []
for k, v in column_tag.items():
if re.search(k, "int, float, bigint, tinyint, smallint, double", re.IGNORECASE):
init_column_tag.append({"type": k, "count": v})
elif re.search(k, "binary, nchar", re.IGNORECASE):
if isinstance(v, int):
init_column_tag.append({"type": k, "count": v, "len":8})
elif len(v) == 1:
init_column_tag.append({"type": k, "count": v[0], "len": 8})
else:
init_column_tag.append({"type": k, "count": v[0], "len": v[1]})
return init_column_tag
def stbcfg(self, stb: str, child_tab_count: int, rows: int, prechildtab: str, columns: dict, tags: dict) -> dict:
return {
"name": stb,
"child_table_exists": "no",
"childtable_count": child_tab_count,
"childtable_prefix": prechildtab,
"auto_create_table": "no",
"batch_create_tbl_num": 10,
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": rows,
"childtable_limit": 0,
"childtable_offset": 0,
"rows_per_tbl": 1,
"max_sql_len": 65480,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 10,
"start_timestamp": f"{datetime.datetime.now():%F %X}",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": self.column_tag_count(**columns),
"tags": self.column_tag_count(**tags)
}
def schemecfg(self,intcount=1,floatcount=0,bcount=0,tcount=0,scount=0,doublecount=0,binarycount=0,ncharcount=0):
return {
"INT": intcount,
"FLOAT": floatcount,
"BIGINT": bcount,
"TINYINT": tcount,
"SMALLINT": scount,
"DOUBLE": doublecount,
"BINARY": binarycount,
"NCHAR": ncharcount
}
def insertcfg(self,db: dict, stbs: list) -> dict:
return {
"filetype": "insert",
"cfgdir": self.config,
"host": self.host,
"port": 6030,
"user": self.user,
"password": self.password,
"thread_count": cpu_count(),
"thread_count_create_tbl": cpu_count(),
"result_file": "/tmp/insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 100,
"max_sql_len": 1024000,
"databases": [{
"dbinfo": db,
"super_tables": stbs
}]
}
def createinsertfile(self,db: dict, stbs: list) -> str:
date = datetime.datetime.now()
file_create_table = f"/tmp/insert_{date:%F-%H%M}.json"
with open(file_create_table, 'w') as f:
json.dump(self.insertcfg(db, stbs), f)
return file_create_table
# taosdemo query file config
def querysqls(self, sql: str) -> list:
return [{"sql":sql,"result":""}]
def querycfg(self, sql: str) -> dict:
return {
"filetype": "query",
"cfgdir": self.config,
"host": self.host,
"port": 6030,
"user": self.user,
"password": self.password,
"confirm_parameter_prompt": "yes",
"query_times": 10,
"query_mode": "taosc",
"databases": self.dbname,
"specified_table_query": {
"query_interval": 0,
"concurrent": cpu_count(),
"sqls": self.querysqls(sql)
}
}
def createqueryfile(self, sql: str):
date = datetime.datetime.now()
file_query_table = f"/tmp/query_{date:%F-%H%M}.json"
with open(file_query_table,"w") as f:
json.dump(self.querycfg(sql), f)
return file_query_table
# Execute taosdemo, and delete temporary files when finished
def taosdemotable(self, filepath: str, resultfile="/dev/null"):
taosdemopath = self.getBuildPath() + "/debug/build/bin"
with open(filepath,"r") as f:
filetype = json.load(f)["filetype"]
if filetype == "insert":
taosdemo_table_cmd = f"{taosdemopath}/taosdemo -f {filepath} > {resultfile} 2>&1"
else:
taosdemo_table_cmd = f"yes | {taosdemopath}/taosdemo -f {filepath} > {resultfile} 2>&1"
try:
_ = subprocess.check_output(taosdemo_table_cmd, shell=True).decode("utf-8")
except subprocess.CalledProcessError as e:
_ = e.output
def droptmpfile(self, filepath: str):
drop_file_cmd = f"[ -f {filepath} ] && rm -f {filepath}"
try:
_ = subprocess.check_output(drop_file_cmd, shell=True).decode("utf-8")
except subprocess.CalledProcessError as e:
_ = e.output
# TODO:需要完成TD-4153的数据插入和客户端请求的性能查询。
def td4153insert(self):
tdLog.printNoPrefix("========== start to create table and insert data ==========")
self.dbname = "td4153"
db = self.dbinfocfg()
stblist = []
columntype = self.schemecfg(intcount=1, ncharcount=100)
tagtype = self.schemecfg(intcount=1)
stbname = "stb1"
prechild = "t1"
stable = self.stbcfg(
stb=stbname,
prechildtab=prechild,
child_tab_count=2,
rows=10000,
columns=columntype,
tags=tagtype
)
stblist.append(stable)
insertfile = self.createinsertfile(db=db, stbs=stblist)
nmon_file = f"/tmp/insert_{datetime.datetime.now():%F-%H%M}.nmon"
cmd = f"nmon -s5 -F {nmon_file} -m /tmp/"
try:
_ = subprocess.check_output(cmd, shell=True).decode("utf-8")
except subprocess.CalledProcessError as e:
_ = e.output
self.taosdemotable(insertfile)
self.droptmpfile(insertfile)
self.droptmpfile("/tmp/insert_res.txt")
# In order to prevent too many performance files from being generated, the nmon file is deleted.
# and the delete statement can be cancelled during the actual test.
self.droptmpfile(nmon_file)
cmd = f"ps -ef|grep -w nmon| grep -v grep | awk '{{print $2}}'"
try:
time.sleep(10)
_ = subprocess.check_output(cmd,shell=True).decode("utf-8")
except BaseException as e:
raise e
def td4153query(self):
tdLog.printNoPrefix("========== start to query operation ==========")
sqls = {
"select_all": "select * from stb1",
"select_join": "select * from t10, t11 where t10.ts=t11.ts"
}
for type, sql in sqls.items():
result_file = f"/tmp/queryResult_{type}.log"
query_file = self.createqueryfile(sql)
try:
self.taosdemotable(query_file, resultfile=result_file)
except subprocess.CalledProcessError as e:
out_put = e.output
if result_file:
print(f"execute rows {type.split('_')[1]} sql, the sql is: {sql}")
max_sql_time_cmd = f'''
grep -o Spent.*s {result_file} |awk 'NR==1{{max=$2;next}}{{max=max>$2?max:$2}}END{{print "Max=",max,"s"}}'
'''
max_sql_time = subprocess.check_output(max_sql_time_cmd, shell=True).decode("UTF-8")
print(f"{type.split('_')[1]} rows sql time : {max_sql_time}")
min_sql_time_cmd = f'''
grep -o Spent.*s {result_file} |awk 'NR==1{{min=$2;next}}{{min=min<$2?min:$2}}END{{print "Min=",min,"s"}}'
'''
min_sql_time = subprocess.check_output(min_sql_time_cmd, shell=True).decode("UTF-8")
print(f"{type.split('_')[1]} rows sql time : {min_sql_time}")
avg_sql_time_cmd = f'''
grep -o Spent.*s {result_file} |awk '{{sum+=$2}}END{{print "Average=",sum/NR,"s"}}'
'''
avg_sql_time = subprocess.check_output(avg_sql_time_cmd, shell=True).decode("UTF-8")
print(f"{type.split('_')[1]} rows sql time : {avg_sql_time}")
self.droptmpfile(query_file)
self.droptmpfile(result_file)
drop_query_tmt_file_cmd = " find ./ -name 'querySystemInfo-*' -type f -exec rm {} \; "
try:
_ = subprocess.check_output(drop_query_tmt_file_cmd, shell=True).decode("utf-8")
except subprocess.CalledProcessError as e:
_ = e.output
pass
def td4153(self):
self.td4153insert()
self.td4153query()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'-r',
'--remove-cache',
action='store_true',
default=False,
help='clear cache before query (default: False)')
parser.add_argument(
'-d',
'--database-name',
action='store',
default='db',
type=str,
help='Database name to be created (default: db)')
parser.add_argument(
'-k',
'--keep-time',
action='store',
default=3650,
type=int,
help='Database keep parameters (default: 3650)')
args = parser.parse_args()
taosdemo = Taosdemo(args.remove_cache, args.database_name, args.keep_time)
# taosdemo.conn = taos.connect(
# taosdemo.host,
# taosdemo.user,
# taosdemo.password,
# taosdemo.config
# )
debugdir = taosdemo.getExeToolsDir()
cfgdir = taosdemo.getCfgDir()
cmd = f"{debugdir}/taosd -c {cfgdir} >/dev/null 2>&1 &"
try:
_ = subprocess.check_output(cmd, shell=True).decode("utf-8")
except subprocess.CalledProcessError as e:
_ = e.output
if taosdemo.clearCache:
# must be root permission
subprocess.check_output("echo 3 > /proc/sys/vm/drop_caches", shell=True).decode("utf-8")
taosdemo.td4153()
......@@ -24,7 +24,7 @@ class taosdemoPerformace:
self.host = "127.0.0.1"
self.user = "root"
self.password = "taosdata"
self.config = "/etc/taosperf"
self.config = "/etc/perf"
self.conn = taos.connect(
self.host,
self.user,
......@@ -77,7 +77,7 @@ class taosdemoPerformace:
insert_data = {
"filetype": "insert",
"cfgdir": "/etc/taosperf",
"cfgdir": "/etc/perf",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
......
......@@ -887,10 +887,16 @@ sql_error select tbname, t1 from select_tags_mt0 interval(1y);
#valid sql: select first(c1), last(c2), count(*) from select_tags_mt0 group by tbname, t1;
#valid sql: select first(c1), tbname, t1 from select_tags_mt0 group by t2;
print ==================================>TD-4231
sql_error select t1,tbname from select_tags_mt0 where c1<0
sql_error select t1,tbname from select_tags_mt0 where c1<0 and tbname in ('select_tags_tb12')
sql select tbname from select_tags_mt0 where tbname in ('select_tags_tb12');
sql_error select first(c1), last(c2), t1 from select_tags_mt0 group by tbname;
sql_error select first(c1), last(c2), tbname, t2 from select_tags_mt0 group by tbname;
sql_error select first(c1), count(*), t2, t1, tbname from select_tags_mt0 group by tbname;
# this sql is valid: select first(c1), t2 from select_tags_mt0 group by tbname;
#valid sql: select first(c1), t2 from select_tags_mt0 group by tbname;
#sql select first(ts), tbname from select_tags_mt0 group by tbname;
#sql select count(c1) from select_tags_mt0 where c1=99 group by tbname;
......
......@@ -158,7 +158,7 @@ if $dnode4Vtatus != offline then
sleep 2000
goto wait_dnode4_vgroup_offline
endi
if $dnode3Vtatus != master then
if $dnode3Vtatus != unsynced then
sleep 2000
goto wait_dnode4_vgroup_offline
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册