提交 9d60e73f 编写于 作者: K kailixu

Merge branch '3.0' into enh/TD-23421-M

......@@ -573,16 +573,18 @@ function install_config() {
}
function install_share_etc() {
[ ! -d ${script_dir}/share/etc ] && return
for c in `ls ${script_dir}/share/etc/`; do
if [ -e /etc/$c ]; then
out=/etc/$c.new.`date +%F`
${csudo}cp -f ${script_dir}/share/etc/$c $out
${csudo}cp -f ${script_dir}/share/etc/$c $out ||:
else
${csudo}cp -f ${script_dir}/share/etc/$c /etc/$c
${csudo}cp -f ${script_dir}/share/etc/$c /etc/$c ||:
fi
done
${csudo} cp ${script_dir}/share/srv/* ${service_config_dir}
[ ! -d ${script_dir}/share/srv ] && return
${csudo} cp ${script_dir}/share/srv/* ${service_config_dir} ||:
}
function install_log() {
......@@ -612,7 +614,7 @@ function install_examples() {
function install_web() {
if [ -d "${script_dir}/share" ]; then
${csudo}cp -rf ${script_dir}/share/* ${install_main_dir}/share
${csudo}cp -rf ${script_dir}/share/* ${install_main_dir}/share > /dev/null 2>&1 ||:
fi
}
......
......@@ -150,7 +150,7 @@ fi
mkdir -p ${install_dir}/bin && cp ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || :
mkdir -p ${install_dir}/init.d && cp ${init_file_deb} ${install_dir}/init.d/${serverName}.deb
mkdir -p ${install_dir}/init.d && cp ${init_file_rpm} ${install_dir}/init.d/${serverName}.rpm
mkdir -p ${install_dir}/share && cp -rf ${build_dir}/share/{etc,srv} ${install_dir}/share
mkdir -p ${install_dir}/share && cp -rf ${build_dir}/share/{etc,srv} ${install_dir}/share ||:
if [ $adapterName != "taosadapter" ]; then
mv ${install_dir}/cfg/${clientName2}adapter.toml ${install_dir}/cfg/$adapterName.toml
......
......@@ -144,6 +144,15 @@ extern char *gStmtStatusStr[];
goto _return; \
} \
} while (0)
#define STMT_ERRI_JRET(c) \
do { \
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \
goto _return; \
} \
} while (0)
#define STMT_ELOG(param, ...) qError("stmt:%p " param, pStmt, __VA_ARGS__)
#define STMT_DLOG(param, ...) qDebug("stmt:%p " param, pStmt, __VA_ARGS__)
......
......@@ -699,7 +699,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
pReq.numOfTags = 1;
SField field = {0};
field.type = TSDB_DATA_TYPE_NCHAR;
field.bytes = 1;
field.bytes = TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
strcpy(field.name, tsSmlTagName);
taosArrayPush(pReq.pTags, &field);
}
......
......@@ -975,15 +975,17 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
}
int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
int32_t code = 0;
STscStmt* pStmt = (STscStmt*)stmt;
int32_t preCode = pStmt->errCode;
STMT_DLOG_E("start to get tag fields");
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
......@@ -995,27 +997,33 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
pStmt->exec.pRequest = NULL;
}
STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERRI_JRET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
STMT_ERRI_JRET(stmtParseSql(pStmt));
}
STMT_ERR_RET(stmtFetchTagFields(stmt, nums, fields));
STMT_ERRI_JRET(stmtFetchTagFields(stmt, nums, fields));
return TSDB_CODE_SUCCESS;
_return:
pStmt->errCode = preCode;
return code;
}
int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
int32_t code = 0;
STscStmt* pStmt = (STscStmt*)stmt;
int32_t preCode = pStmt->errCode;
STMT_DLOG_E("start to get col fields");
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
......@@ -1027,15 +1035,19 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
pStmt->exec.pRequest = NULL;
}
STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERRI_JRET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
STMT_ERRI_JRET(stmtParseSql(pStmt));
}
STMT_ERR_RET(stmtFetchColFields(stmt, nums, fields));
STMT_ERRI_JRET(stmtFetchColFields(stmt, nums, fields));
return TSDB_CODE_SUCCESS;
_return:
pStmt->errCode = preCode;
return code;
}
int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
......
......@@ -114,11 +114,11 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr(code));
dGError("vnodeProcessFetchMsg vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr());
vmSendRsp(pMsg, code);
}
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
dGTrace("vnodeProcessFetchMsg vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......
......@@ -156,7 +156,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
size_t valueLen = 0;
valueLen = strlen(stb);
size += sizeof(int32_t);
size += keyLen;
size += valueLen;
stb = taosHashIterate(pUser->writeTbs, stb);
}
......@@ -373,7 +373,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
int32_t valuelen = 0;
SDB_GET_INT32(pRaw, dataPos, &valuelen, _OVER);
char *value = taosMemoryCalloc(valuelen, sizeof(char));
memset(value, 0, keyLen);
memset(value, 0, valuelen);
SDB_GET_BINARY(pRaw, dataPos, value, valuelen, _OVER)
taosHashPut(pUser->writeTbs, key, keyLen, value, valuelen);
......@@ -462,6 +462,31 @@ SHashObj *mndDupTableHash(SHashObj *pOld) {
return pNew;
}
SHashObj *mndDupUseDbHash(SHashObj *pOld) {
SHashObj *pNew =
taosHashInit(taosHashGetSize(pOld), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pNew == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
int32_t *db = taosHashIterate(pOld, NULL);
while (db != NULL) {
size_t keyLen = 0;
char *key = taosHashGetKey(db, &keyLen);
if (taosHashPut(pNew, key, keyLen, db, sizeof(*db)) != 0) {
taosHashCancelIterate(pOld, db);
taosHashCleanup(pNew);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
db = taosHashIterate(pOld, db);
}
return pNew;
}
static int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew) {
memcpy(pNew, pUser, sizeof(SUserObj));
pNew->authVersion++;
......@@ -473,7 +498,7 @@ static int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew) {
pNew->readTbs = mndDupTableHash(pUser->readTbs);
pNew->writeTbs = mndDupTableHash(pUser->writeTbs);
pNew->topics = mndDupTopicHash(pUser->topics);
pNew->useDbs = mndDupDbHash(pUser->useDbs);
pNew->useDbs = mndDupUseDbHash(pUser->useDbs);
taosRUnLockLatch(&pUser->lock);
if (pNew->readDbs == NULL || pNew->writeDbs == NULL || pNew->topics == NULL) {
......
......@@ -296,10 +296,9 @@ void tqCloseReader(STqReader* pReader) {
int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) {
if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
tqDebug("tmq poll: wal reader failed to seek to ver:%"PRId64" code:%s, %s", ver, tstrerror(terrno), id);
return -1;
}
tqDebug("tmq poll: wal reader seek to ver:%"PRId64" %s", ver, id);
tqDebug("tmq poll: wal reader seek to ver success ver:%"PRId64" %s", ver, id);
return 0;
}
......
......@@ -1400,10 +1400,10 @@ int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) {
pRes->pass = false;
pRes->pCond = NULL;
// if (!pInfo->enable) {
// pRes->pass = false;
// return TSDB_CODE_SUCCESS;
// }
if (!pInfo->enable) {
pRes->pass = false;
return TSDB_CODE_SUCCESS;
}
if (pInfo->superAuth) {
pRes->pass = true;
......@@ -1453,7 +1453,8 @@ int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) {
}
case AUTH_TYPE_READ_OR_WRITE: {
if ((pInfo->readDbs && taosHashGet(pInfo->readDbs, dbFName, strlen(dbFName))) ||
(pInfo->writeDbs && taosHashGet(pInfo->writeDbs, dbFName, strlen(dbFName)))) {
(pInfo->writeDbs && taosHashGet(pInfo->writeDbs, dbFName, strlen(dbFName))) ||
(pInfo->useDbs && taosHashGet(pInfo->useDbs, dbFName, strlen(dbFName)))) {
pRes->pass = true;
return TSDB_CODE_SUCCESS;
}
......
......@@ -1062,6 +1062,7 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if ((pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) || (pTaskInfo->streamInfo.submit.msgStr != NULL)) {
qError("qStreamSetScanMemData err:%d,%p", pTaskInfo->execModel, pTaskInfo->streamInfo.submit.msgStr);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1;
}
qDebug("set the submit block for future scan");
......@@ -1125,6 +1126,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} else {
taosRUnLockLatch(&pTaskInfo->lock);
qError("no table in table list, %s", id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1;
}
}
......@@ -1144,6 +1146,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} else {
qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
numOfTables, pScanInfo->currentTable, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1;
}
......@@ -1176,6 +1179,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pScanBaseInfo->cond.twindows.skey = oldSkey;
} else {
qError("invalid pOffset->type:%d, %s", pOffset->type, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1;
}
......@@ -1190,6 +1194,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if (setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1;
}
......@@ -1226,6 +1231,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SSnapContext* sContext = pInfo->sContext;
if (setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1;
}
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts,
......
......@@ -998,6 +998,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id) {
if (pOperator == NULL) {
qError("invalid operator, failed to find tableScanOperator %s", id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return NULL;
}
......@@ -1006,6 +1007,7 @@ SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, con
} else {
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
qError("invalid operator, failed to find tableScanOperator %s", id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return NULL;
}
......
......@@ -2027,7 +2027,7 @@ static int32_t buildInsertUserAuthReq(const char* pUser, SName* pName, SArray**
SUserAuthInfo userAuth = {.type = AUTH_TYPE_WRITE};
snprintf(userAuth.user, sizeof(userAuth.user), "%s", pUser);
// tNameGetFullDbName(pName, userAuth.dbFName);
memcpy(&userAuth.tbName, pName, sizeof(SName));
taosArrayPush(*pUserAuth, &userAuth);
return TSDB_CODE_SUCCESS;
......
......@@ -207,17 +207,12 @@ int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
return 0;
}
// pReader->curInvalid = 1;
// pReader->curVersion = ver;
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
wInfo("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
ver, pWal->vers.firstVer, pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
// if (ver < pWal->vers.snapshotVer) {
// }
if (walReadSeekVerImpl(pReader, ver) < 0) {
return -1;
......@@ -236,8 +231,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
if (pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) {
// pRead->curVersion = fetchVer;
// pRead->curInvalid = 1;
return -1;
}
seeked = true;
......@@ -256,7 +249,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
} else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
}
// pRead->curInvalid = 1;
return -1;
}
}
......
......@@ -24,7 +24,7 @@ class TDTestCase:
tdSql.init(conn.cursor(), True)
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self, dbname="sml_db"):
def checkContent(self, dbname="sml_db"):
simClientCfg="%s/taos.cfg"%tdDnodes.getSimCfgPath()
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/sml_test %s'%(buildPath, simClientCfg)
......@@ -102,7 +102,7 @@ class TDTestCase:
def run(self):
tdSql.prepare()
self.checkFileContent()
self.checkContent()
def stop(self):
tdSql.close()
......
......@@ -32,34 +32,6 @@ class TDTestCase:
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
......
......@@ -138,34 +138,6 @@ class TDTestCase:
else:
tdLog.exit("three mnodes is not ready in 10s ")
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
......@@ -257,7 +229,7 @@ class TDTestCase:
tdLog.exit("0 tmq consume rows error!")
if expectRowsList[0] == resultList[0]:
self.checkFileContent(consumerId, queryString)
tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
......
......@@ -5,6 +5,7 @@ import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
......@@ -21,34 +22,6 @@ class TDTestCase:
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
......@@ -110,7 +83,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
tmqCom.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
......@@ -136,7 +109,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
tmqCom.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
......@@ -162,7 +135,7 @@ class TDTestCase:
# tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0]))
# tdLog.exit("2 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
......
......@@ -21,34 +21,6 @@ class TDTestCase:
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
......@@ -110,7 +82,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
tmqCom.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
......@@ -135,7 +107,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
tmqCom.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
......@@ -160,7 +132,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0]))
tdLog.exit("2 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
......
......@@ -10,7 +10,7 @@
###################################################################
# -*- coding: utf-8 -*-
import math
from asyncore import loop
from collections import defaultdict
import subprocess
......@@ -467,18 +467,24 @@ class TMQCom:
for i in range(0,skipRowsOfCons):
consumeFile.readline()
lines = 0
while True:
dst = queryFile.readline()
src = consumeFile.readline()
lines += 1
if dst:
if dst != src:
tdLog.info("src row: %s"%src)
tdLog.info("dst row: %s"%dst)
tdLog.exit("consumerId %d consume rows[%d] is not match the rows by direct query"%(consumerId, lines))
else:
dstSplit = dst.split(',')
srcSplit = src.split(',')
if not dst or not src:
break
if len(dstSplit) != len(srcSplit):
tdLog.exit("consumerId %d consume rows len is not match the rows by direct query,len(dstSplit):%d != len(srcSplit):%d, dst:%s, src:%s"
%(consumerId, len(dstSplit), len(srcSplit), dst, src))
for i in range(len(dstSplit)):
if srcSplit[i] != dstSplit[i]:
srcFloat = float(srcSplit[i])
dstFloat = float(dstSplit[i])
if not math.isclose(srcFloat, dstFloat, abs_tol=1e-9):
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
return
def getResultFileByTaosShell(self, consumerId, queryString):
......
......@@ -21,34 +21,6 @@ class TDTestCase:
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
......
......@@ -60,34 +60,6 @@ class TDTestCase:
tdLog.exit("create udf functions fail")
return
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
......@@ -201,7 +173,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
# reinit consume info, and start tmq_sim, then check consume result
......@@ -228,7 +200,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10)
......@@ -312,7 +284,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("2 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
# reinit consume info, and start tmq_sim, then check consume result
......@@ -339,7 +311,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("3 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10)
......
......@@ -60,34 +60,6 @@ class TDTestCase:
tdLog.exit("create udf functions fail")
return
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
......@@ -201,7 +173,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
# reinit consume info, and start tmq_sim, then check consume result
......@@ -228,7 +200,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10)
......@@ -312,7 +284,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("2 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
# reinit consume info, and start tmq_sim, then check consume result
......@@ -339,7 +311,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("3 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10)
......
......@@ -60,34 +60,6 @@ class TDTestCase:
tdLog.exit("create udf functions fail")
return
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
......@@ -201,7 +173,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
tmqCom.checkFileContent(consumerId, queryString)
tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
......@@ -229,7 +201,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
tmqCom.checkFileContent(consumerId, queryString)
tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10)
......@@ -313,7 +285,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("2 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
tmqCom.checkFileContent(consumerId, queryString)
tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
# reinit consume info, and start tmq_sim, then check consume result
......@@ -340,7 +312,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("3 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
tmqCom.checkFileContent(consumerId, queryString)
tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10)
......
......@@ -21,34 +21,6 @@ class TDTestCase:
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
......@@ -114,7 +86,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
tmqCom.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
......@@ -140,7 +112,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
tmqCom.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
......@@ -166,7 +138,7 @@ class TDTestCase:
# tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0]))
# tdLog.exit("2 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString)
# tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
......
......@@ -317,7 +317,6 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
quotationStr[0] = '\"';
quotationStr[1] = 0;
int n;
char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) {
case TSDB_DATA_TYPE_BOOL:
......@@ -348,15 +347,11 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
taosFprintfFile(pFile, "%" PRIu64, *((uint64_t *)val));
break;
case TSDB_DATA_TYPE_FLOAT:
taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val));
taosFprintfFile(pFile, "%e", GET_FLOAT_VAL(val));
break;
case TSDB_DATA_TYPE_DOUBLE:
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", length, GET_DOUBLE_VAL(val));
if (n > TMAX(25, length)) {
taosFprintfFile(pFile, "%*.15e", length, GET_DOUBLE_VAL(val));
} else {
taosFprintfFile(pFile, "%s", buf);
}
snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.15e", 23, GET_DOUBLE_VAL(val));
taosFprintfFile(pFile, "%s", buf);
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
......@@ -512,7 +507,6 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
return;
}
int n;
char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) {
case TSDB_DATA_TYPE_BOOL:
......@@ -543,15 +537,11 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
printf("%*" PRIu64, width, *((uint64_t *)val));
break;
case TSDB_DATA_TYPE_FLOAT:
printf("%*ef", width, GET_FLOAT_VAL(val));
printf("%*e", width, GET_FLOAT_VAL(val));
break;
case TSDB_DATA_TYPE_DOUBLE:
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val));
if (n > TMAX(25, width)) {
printf("%*.15e", width, GET_DOUBLE_VAL(val));
} else {
printf("%s", buf);
}
snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%.9e", GET_DOUBLE_VAL(val));
printf("%*s", width, buf);
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册