未验证 提交 e67b7842 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #20881 from taosdata/fix/3.0_merge_main

merge main
...@@ -573,16 +573,18 @@ function install_config() { ...@@ -573,16 +573,18 @@ function install_config() {
} }
function install_share_etc() { function install_share_etc() {
[ ! -d ${script_dir}/share/etc ] && return
for c in `ls ${script_dir}/share/etc/`; do for c in `ls ${script_dir}/share/etc/`; do
if [ -e /etc/$c ]; then if [ -e /etc/$c ]; then
out=/etc/$c.new.`date +%F` out=/etc/$c.new.`date +%F`
${csudo}cp -f ${script_dir}/share/etc/$c $out ${csudo}cp -f ${script_dir}/share/etc/$c $out ||:
else else
${csudo}cp -f ${script_dir}/share/etc/$c /etc/$c ${csudo}cp -f ${script_dir}/share/etc/$c /etc/$c ||:
fi fi
done done
${csudo} cp ${script_dir}/share/srv/* ${service_config_dir} [ ! -d ${script_dir}/share/srv ] && return
${csudo} cp ${script_dir}/share/srv/* ${service_config_dir} ||:
} }
function install_log() { function install_log() {
...@@ -612,7 +614,7 @@ function install_examples() { ...@@ -612,7 +614,7 @@ function install_examples() {
function install_web() { function install_web() {
if [ -d "${script_dir}/share" ]; then if [ -d "${script_dir}/share" ]; then
${csudo}cp -rf ${script_dir}/share/* ${install_main_dir}/share ${csudo}cp -rf ${script_dir}/share/* ${install_main_dir}/share > /dev/null 2>&1 ||:
fi fi
} }
......
...@@ -150,7 +150,7 @@ fi ...@@ -150,7 +150,7 @@ fi
mkdir -p ${install_dir}/bin && cp ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || : mkdir -p ${install_dir}/bin && cp ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || :
mkdir -p ${install_dir}/init.d && cp ${init_file_deb} ${install_dir}/init.d/${serverName}.deb mkdir -p ${install_dir}/init.d && cp ${init_file_deb} ${install_dir}/init.d/${serverName}.deb
mkdir -p ${install_dir}/init.d && cp ${init_file_rpm} ${install_dir}/init.d/${serverName}.rpm mkdir -p ${install_dir}/init.d && cp ${init_file_rpm} ${install_dir}/init.d/${serverName}.rpm
mkdir -p ${install_dir}/share && cp -rf ${build_dir}/share/{etc,srv} ${install_dir}/share mkdir -p ${install_dir}/share && cp -rf ${build_dir}/share/{etc,srv} ${install_dir}/share ||:
if [ $adapterName != "taosadapter" ]; then if [ $adapterName != "taosadapter" ]; then
mv ${install_dir}/cfg/${clientName2}adapter.toml ${install_dir}/cfg/$adapterName.toml mv ${install_dir}/cfg/${clientName2}adapter.toml ${install_dir}/cfg/$adapterName.toml
......
...@@ -144,6 +144,15 @@ extern char *gStmtStatusStr[]; ...@@ -144,6 +144,15 @@ extern char *gStmtStatusStr[];
goto _return; \ goto _return; \
} \ } \
} while (0) } while (0)
#define STMT_ERRI_JRET(c) \
do { \
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \
goto _return; \
} \
} while (0)
#define STMT_ELOG(param, ...) qError("stmt:%p " param, pStmt, __VA_ARGS__) #define STMT_ELOG(param, ...) qError("stmt:%p " param, pStmt, __VA_ARGS__)
#define STMT_DLOG(param, ...) qDebug("stmt:%p " param, pStmt, __VA_ARGS__) #define STMT_DLOG(param, ...) qDebug("stmt:%p " param, pStmt, __VA_ARGS__)
......
...@@ -699,7 +699,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, ...@@ -699,7 +699,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
pReq.numOfTags = 1; pReq.numOfTags = 1;
SField field = {0}; SField field = {0};
field.type = TSDB_DATA_TYPE_NCHAR; field.type = TSDB_DATA_TYPE_NCHAR;
field.bytes = 1; field.bytes = TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
strcpy(field.name, tsSmlTagName); strcpy(field.name, tsSmlTagName);
taosArrayPush(pReq.pTags, &field); taosArrayPush(pReq.pTags, &field);
} }
......
...@@ -975,15 +975,17 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) { ...@@ -975,15 +975,17 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
} }
int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) { int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
int32_t code = 0;
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
int32_t preCode = pStmt->errCode;
STMT_DLOG_E("start to get tag fields"); STMT_DLOG_E("start to get tag fields");
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
} }
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS)); STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) { STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
...@@ -995,27 +997,33 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) { ...@@ -995,27 +997,33 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
pStmt->exec.pRequest = NULL; pStmt->exec.pRequest = NULL;
} }
STMT_ERR_RET(stmtCreateRequest(pStmt)); STMT_ERRI_JRET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) { if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt)); STMT_ERRI_JRET(stmtParseSql(pStmt));
} }
STMT_ERR_RET(stmtFetchTagFields(stmt, nums, fields)); STMT_ERRI_JRET(stmtFetchTagFields(stmt, nums, fields));
return TSDB_CODE_SUCCESS; _return:
pStmt->errCode = preCode;
return code;
} }
int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) { int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
int32_t code = 0;
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
int32_t preCode = pStmt->errCode;
STMT_DLOG_E("start to get col fields"); STMT_DLOG_E("start to get col fields");
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
} }
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS)); STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) { STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
...@@ -1027,15 +1035,19 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) { ...@@ -1027,15 +1035,19 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
pStmt->exec.pRequest = NULL; pStmt->exec.pRequest = NULL;
} }
STMT_ERR_RET(stmtCreateRequest(pStmt)); STMT_ERRI_JRET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) { if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt)); STMT_ERRI_JRET(stmtParseSql(pStmt));
} }
STMT_ERR_RET(stmtFetchColFields(stmt, nums, fields)); STMT_ERRI_JRET(stmtFetchColFields(stmt, nums, fields));
return TSDB_CODE_SUCCESS; _return:
pStmt->errCode = preCode;
return code;
} }
int stmtGetParamNum(TAOS_STMT* stmt, int* nums) { int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
......
...@@ -114,11 +114,11 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -114,11 +114,11 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) { if (code != 0) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr(code)); dGError("vnodeProcessFetchMsg vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr());
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);
} }
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); dGTrace("vnodeProcessFetchMsg vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
......
...@@ -156,7 +156,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) { ...@@ -156,7 +156,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
size_t valueLen = 0; size_t valueLen = 0;
valueLen = strlen(stb); valueLen = strlen(stb);
size += sizeof(int32_t); size += sizeof(int32_t);
size += keyLen; size += valueLen;
stb = taosHashIterate(pUser->writeTbs, stb); stb = taosHashIterate(pUser->writeTbs, stb);
} }
...@@ -369,7 +369,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { ...@@ -369,7 +369,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
int32_t valuelen = 0; int32_t valuelen = 0;
SDB_GET_INT32(pRaw, dataPos, &valuelen, _OVER); SDB_GET_INT32(pRaw, dataPos, &valuelen, _OVER);
char *value = taosMemoryCalloc(valuelen, sizeof(char)); char *value = taosMemoryCalloc(valuelen, sizeof(char));
memset(value, 0, keyLen); memset(value, 0, valuelen);
SDB_GET_BINARY(pRaw, dataPos, value, valuelen, _OVER) SDB_GET_BINARY(pRaw, dataPos, value, valuelen, _OVER)
taosHashPut(pUser->writeTbs, key, keyLen, value, valuelen); taosHashPut(pUser->writeTbs, key, keyLen, value, valuelen);
...@@ -458,6 +458,31 @@ SHashObj *mndDupTableHash(SHashObj *pOld) { ...@@ -458,6 +458,31 @@ SHashObj *mndDupTableHash(SHashObj *pOld) {
return pNew; return pNew;
} }
SHashObj *mndDupUseDbHash(SHashObj *pOld) {
SHashObj *pNew =
taosHashInit(taosHashGetSize(pOld), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pNew == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
int32_t *db = taosHashIterate(pOld, NULL);
while (db != NULL) {
size_t keyLen = 0;
char *key = taosHashGetKey(db, &keyLen);
if (taosHashPut(pNew, key, keyLen, db, sizeof(*db)) != 0) {
taosHashCancelIterate(pOld, db);
taosHashCleanup(pNew);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
db = taosHashIterate(pOld, db);
}
return pNew;
}
static int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew) { static int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew) {
memcpy(pNew, pUser, sizeof(SUserObj)); memcpy(pNew, pUser, sizeof(SUserObj));
pNew->authVersion++; pNew->authVersion++;
...@@ -469,7 +494,7 @@ static int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew) { ...@@ -469,7 +494,7 @@ static int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew) {
pNew->readTbs = mndDupTableHash(pUser->readTbs); pNew->readTbs = mndDupTableHash(pUser->readTbs);
pNew->writeTbs = mndDupTableHash(pUser->writeTbs); pNew->writeTbs = mndDupTableHash(pUser->writeTbs);
pNew->topics = mndDupTopicHash(pUser->topics); pNew->topics = mndDupTopicHash(pUser->topics);
pNew->useDbs = mndDupDbHash(pUser->useDbs); pNew->useDbs = mndDupUseDbHash(pUser->useDbs);
taosRUnLockLatch(&pUser->lock); taosRUnLockLatch(&pUser->lock);
if (pNew->readDbs == NULL || pNew->writeDbs == NULL || pNew->topics == NULL) { if (pNew->readDbs == NULL || pNew->writeDbs == NULL || pNew->topics == NULL) {
......
...@@ -296,10 +296,9 @@ void tqCloseReader(STqReader* pReader) { ...@@ -296,10 +296,9 @@ void tqCloseReader(STqReader* pReader) {
int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) {
if (walReadSeekVer(pReader->pWalReader, ver) < 0) { if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
tqDebug("tmq poll: wal reader failed to seek to ver:%"PRId64" code:%s, %s", ver, tstrerror(terrno), id);
return -1; return -1;
} }
tqDebug("tmq poll: wal reader seek to ver:%"PRId64" %s", ver, id); tqDebug("tmq poll: wal reader seek to ver success ver:%"PRId64" %s", ver, id);
return 0; return 0;
} }
......
...@@ -1400,10 +1400,10 @@ int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) { ...@@ -1400,10 +1400,10 @@ int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) {
pRes->pass = false; pRes->pass = false;
pRes->pCond = NULL; pRes->pCond = NULL;
// if (!pInfo->enable) { if (!pInfo->enable) {
// pRes->pass = false; pRes->pass = false;
// return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
// } }
if (pInfo->superAuth) { if (pInfo->superAuth) {
pRes->pass = true; pRes->pass = true;
...@@ -1453,7 +1453,8 @@ int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) { ...@@ -1453,7 +1453,8 @@ int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) {
} }
case AUTH_TYPE_READ_OR_WRITE: { case AUTH_TYPE_READ_OR_WRITE: {
if ((pInfo->readDbs && taosHashGet(pInfo->readDbs, dbFName, strlen(dbFName))) || if ((pInfo->readDbs && taosHashGet(pInfo->readDbs, dbFName, strlen(dbFName))) ||
(pInfo->writeDbs && taosHashGet(pInfo->writeDbs, dbFName, strlen(dbFName)))) { (pInfo->writeDbs && taosHashGet(pInfo->writeDbs, dbFName, strlen(dbFName))) ||
(pInfo->useDbs && taosHashGet(pInfo->useDbs, dbFName, strlen(dbFName)))) {
pRes->pass = true; pRes->pass = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -1062,6 +1062,7 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { ...@@ -1062,6 +1062,7 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if ((pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) || (pTaskInfo->streamInfo.submit.msgStr != NULL)) { if ((pTaskInfo->execModel != OPTR_EXEC_MODEL_QUEUE) || (pTaskInfo->streamInfo.submit.msgStr != NULL)) {
qError("qStreamSetScanMemData err:%d,%p", pTaskInfo->execModel, pTaskInfo->streamInfo.submit.msgStr); qError("qStreamSetScanMemData err:%d,%p", pTaskInfo->execModel, pTaskInfo->streamInfo.submit.msgStr);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1; return -1;
} }
qDebug("set the submit block for future scan"); qDebug("set the submit block for future scan");
...@@ -1125,6 +1126,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1125,6 +1126,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} else { } else {
taosRUnLockLatch(&pTaskInfo->lock); taosRUnLockLatch(&pTaskInfo->lock);
qError("no table in table list, %s", id); qError("no table in table list, %s", id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1; return -1;
} }
} }
...@@ -1144,6 +1146,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1144,6 +1146,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} else { } else {
qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid, qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
numOfTables, pScanInfo->currentTable, id); numOfTables, pScanInfo->currentTable, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1; return -1;
} }
...@@ -1176,6 +1179,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1176,6 +1179,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pScanBaseInfo->cond.twindows.skey = oldSkey; pScanBaseInfo->cond.twindows.skey = oldSkey;
} else { } else {
qError("invalid pOffset->type:%d, %s", pOffset->type, id); qError("invalid pOffset->type:%d, %s", pOffset->type, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1; return -1;
} }
...@@ -1190,6 +1194,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1190,6 +1194,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if (setForSnapShot(sContext, pOffset->uid) != 0) { if (setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id); qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1; return -1;
} }
...@@ -1226,6 +1231,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1226,6 +1231,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SSnapContext* sContext = pInfo->sContext; SSnapContext* sContext = pInfo->sContext;
if (setForSnapShot(sContext, pOffset->uid) != 0) { if (setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version); qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1; return -1;
} }
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts, qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts,
......
...@@ -998,6 +998,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan ...@@ -998,6 +998,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id) { SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id) {
if (pOperator == NULL) { if (pOperator == NULL) {
qError("invalid operator, failed to find tableScanOperator %s", id); qError("invalid operator, failed to find tableScanOperator %s", id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return NULL; return NULL;
} }
...@@ -1006,6 +1007,7 @@ SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, con ...@@ -1006,6 +1007,7 @@ SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, con
} else { } else {
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
qError("invalid operator, failed to find tableScanOperator %s", id); qError("invalid operator, failed to find tableScanOperator %s", id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return NULL; return NULL;
} }
......
...@@ -2027,7 +2027,7 @@ static int32_t buildInsertUserAuthReq(const char* pUser, SName* pName, SArray** ...@@ -2027,7 +2027,7 @@ static int32_t buildInsertUserAuthReq(const char* pUser, SName* pName, SArray**
SUserAuthInfo userAuth = {.type = AUTH_TYPE_WRITE}; SUserAuthInfo userAuth = {.type = AUTH_TYPE_WRITE};
snprintf(userAuth.user, sizeof(userAuth.user), "%s", pUser); snprintf(userAuth.user, sizeof(userAuth.user), "%s", pUser);
// tNameGetFullDbName(pName, userAuth.dbFName); memcpy(&userAuth.tbName, pName, sizeof(SName));
taosArrayPush(*pUserAuth, &userAuth); taosArrayPush(*pUserAuth, &userAuth);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -207,17 +207,12 @@ int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { ...@@ -207,17 +207,12 @@ int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) {
return 0; return 0;
} }
// pReader->curInvalid = 1;
// pReader->curVersion = ver;
if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId, wInfo("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
ver, pWal->vers.firstVer, pWal->vers.lastVer); ver, pWal->vers.firstVer, pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1; return -1;
} }
// if (ver < pWal->vers.snapshotVer) {
// }
if (walReadSeekVerImpl(pReader, ver) < 0) { if (walReadSeekVerImpl(pReader, ver) < 0) {
return -1; return -1;
...@@ -236,8 +231,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { ...@@ -236,8 +231,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
if (pRead->curVersion != fetchVer) { if (pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) { if (walReadSeekVer(pRead, fetchVer) < 0) {
// pRead->curVersion = fetchVer;
// pRead->curInvalid = 1;
return -1; return -1;
} }
seeked = true; seeked = true;
...@@ -256,7 +249,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { ...@@ -256,7 +249,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
} else { } else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
// pRead->curInvalid = 1;
return -1; return -1;
} }
} }
......
...@@ -24,7 +24,7 @@ class TDTestCase: ...@@ -24,7 +24,7 @@ class TDTestCase:
tdSql.init(conn.cursor(), True) tdSql.init(conn.cursor(), True)
#tdSql.init(conn.cursor(), logSql) # output sql.txt file #tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self, dbname="sml_db"): def checkContent(self, dbname="sml_db"):
simClientCfg="%s/taos.cfg"%tdDnodes.getSimCfgPath() simClientCfg="%s/taos.cfg"%tdDnodes.getSimCfgPath()
buildPath = tdCom.getBuildPath() buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/sml_test %s'%(buildPath, simClientCfg) cmdStr = '%s/build/bin/sml_test %s'%(buildPath, simClientCfg)
...@@ -102,7 +102,7 @@ class TDTestCase: ...@@ -102,7 +102,7 @@ class TDTestCase:
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
self.checkFileContent() self.checkContent()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -32,34 +32,6 @@ class TDTestCase: ...@@ -32,34 +32,6 @@ class TDTestCase:
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file #tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def prepareTestEnv(self): def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt', paraDict = {'dbName': 'dbt',
......
...@@ -138,34 +138,6 @@ class TDTestCase: ...@@ -138,34 +138,6 @@ class TDTestCase:
else: else:
tdLog.exit("three mnodes is not ready in 10s ") tdLog.exit("three mnodes is not ready in 10s ")
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self): def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ") tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1', paraDict = {'dbName': 'db1',
...@@ -257,7 +229,7 @@ class TDTestCase: ...@@ -257,7 +229,7 @@ class TDTestCase:
tdLog.exit("0 tmq consume rows error!") tdLog.exit("0 tmq consume rows error!")
if expectRowsList[0] == resultList[0]: if expectRowsList[0] == resultList[0]:
self.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10) time.sleep(10)
for i in range(len(topicNameList)): for i in range(len(topicNameList)):
......
...@@ -5,6 +5,7 @@ import time ...@@ -5,6 +5,7 @@ import time
import socket import socket
import os import os
import threading import threading
import math
from util.log import * from util.log import *
from util.sql import * from util.sql import *
...@@ -21,34 +22,6 @@ class TDTestCase: ...@@ -21,34 +22,6 @@ class TDTestCase:
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file #tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self): def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ") tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1', paraDict = {'dbName': 'db1',
...@@ -110,7 +83,7 @@ class TDTestCase: ...@@ -110,7 +83,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!") tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result # reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
...@@ -136,7 +109,7 @@ class TDTestCase: ...@@ -136,7 +109,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!") tdLog.exit("1 tmq consume rows error!")
self.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result # reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
...@@ -162,7 +135,7 @@ class TDTestCase: ...@@ -162,7 +135,7 @@ class TDTestCase:
# tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0])) # tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0]))
# tdLog.exit("2 tmq consume rows error!") # tdLog.exit("2 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10) time.sleep(10)
for i in range(len(topicNameList)): for i in range(len(topicNameList)):
......
...@@ -21,34 +21,6 @@ class TDTestCase: ...@@ -21,34 +21,6 @@ class TDTestCase:
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file #tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self): def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ") tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1', paraDict = {'dbName': 'db1',
...@@ -110,7 +82,7 @@ class TDTestCase: ...@@ -110,7 +82,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!") tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result # reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
...@@ -135,7 +107,7 @@ class TDTestCase: ...@@ -135,7 +107,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!") tdLog.exit("1 tmq consume rows error!")
self.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result # reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
...@@ -160,7 +132,7 @@ class TDTestCase: ...@@ -160,7 +132,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0]))
tdLog.exit("2 tmq consume rows error!") tdLog.exit("2 tmq consume rows error!")
self.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10) time.sleep(10)
for i in range(len(topicNameList)): for i in range(len(topicNameList)):
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
################################################################### ###################################################################
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import math
from asyncore import loop from asyncore import loop
from collections import defaultdict from collections import defaultdict
import subprocess import subprocess
...@@ -467,18 +467,24 @@ class TMQCom: ...@@ -467,18 +467,24 @@ class TMQCom:
for i in range(0,skipRowsOfCons): for i in range(0,skipRowsOfCons):
consumeFile.readline() consumeFile.readline()
lines = 0
while True: while True:
dst = queryFile.readline() dst = queryFile.readline()
src = consumeFile.readline() src = consumeFile.readline()
lines += 1 dstSplit = dst.split(',')
if dst: srcSplit = src.split(',')
if dst != src:
tdLog.info("src row: %s"%src) if not dst or not src:
tdLog.info("dst row: %s"%dst)
tdLog.exit("consumerId %d consume rows[%d] is not match the rows by direct query"%(consumerId, lines))
else:
break break
if len(dstSplit) != len(srcSplit):
tdLog.exit("consumerId %d consume rows len is not match the rows by direct query,len(dstSplit):%d != len(srcSplit):%d, dst:%s, src:%s"
%(consumerId, len(dstSplit), len(srcSplit), dst, src))
for i in range(len(dstSplit)):
if srcSplit[i] != dstSplit[i]:
srcFloat = float(srcSplit[i])
dstFloat = float(dstSplit[i])
if not math.isclose(srcFloat, dstFloat, abs_tol=1e-9):
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
return return
def getResultFileByTaosShell(self, consumerId, queryString): def getResultFileByTaosShell(self, consumerId, queryString):
......
...@@ -21,34 +21,6 @@ class TDTestCase: ...@@ -21,34 +21,6 @@ class TDTestCase:
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file #tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self): def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ") tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1', paraDict = {'dbName': 'db1',
......
...@@ -60,34 +60,6 @@ class TDTestCase: ...@@ -60,34 +60,6 @@ class TDTestCase:
tdLog.exit("create udf functions fail") tdLog.exit("create udf functions fail")
return return
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def prepareTestEnv(self): def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt', paraDict = {'dbName': 'dbt',
...@@ -201,7 +173,7 @@ class TDTestCase: ...@@ -201,7 +173,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!") tdLog.exit("0 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) # tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
# reinit consume info, and start tmq_sim, then check consume result # reinit consume info, and start tmq_sim, then check consume result
...@@ -228,7 +200,7 @@ class TDTestCase: ...@@ -228,7 +200,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!") tdLog.exit("1 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) # tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10) time.sleep(10)
...@@ -312,7 +284,7 @@ class TDTestCase: ...@@ -312,7 +284,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("2 tmq consume rows error!") tdLog.exit("2 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) # tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
# reinit consume info, and start tmq_sim, then check consume result # reinit consume info, and start tmq_sim, then check consume result
...@@ -339,7 +311,7 @@ class TDTestCase: ...@@ -339,7 +311,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("3 tmq consume rows error!") tdLog.exit("3 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) # tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10) time.sleep(10)
......
...@@ -60,34 +60,6 @@ class TDTestCase: ...@@ -60,34 +60,6 @@ class TDTestCase:
tdLog.exit("create udf functions fail") tdLog.exit("create udf functions fail")
return return
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def prepareTestEnv(self): def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt', paraDict = {'dbName': 'dbt',
...@@ -201,7 +173,7 @@ class TDTestCase: ...@@ -201,7 +173,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!") tdLog.exit("0 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) # tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
# reinit consume info, and start tmq_sim, then check consume result # reinit consume info, and start tmq_sim, then check consume result
...@@ -228,7 +200,7 @@ class TDTestCase: ...@@ -228,7 +200,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!") tdLog.exit("1 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) # tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10) time.sleep(10)
...@@ -312,7 +284,7 @@ class TDTestCase: ...@@ -312,7 +284,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("2 tmq consume rows error!") tdLog.exit("2 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) # tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
# reinit consume info, and start tmq_sim, then check consume result # reinit consume info, and start tmq_sim, then check consume result
...@@ -339,7 +311,7 @@ class TDTestCase: ...@@ -339,7 +311,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("3 tmq consume rows error!") tdLog.exit("3 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) # tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10) time.sleep(10)
......
...@@ -60,34 +60,6 @@ class TDTestCase: ...@@ -60,34 +60,6 @@ class TDTestCase:
tdLog.exit("create udf functions fail") tdLog.exit("create udf functions fail")
return return
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def prepareTestEnv(self): def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt', paraDict = {'dbName': 'dbt',
...@@ -201,7 +173,7 @@ class TDTestCase: ...@@ -201,7 +173,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!") tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
...@@ -229,7 +201,7 @@ class TDTestCase: ...@@ -229,7 +201,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!") tdLog.exit("1 tmq consume rows error!")
self.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10) time.sleep(10)
...@@ -313,7 +285,7 @@ class TDTestCase: ...@@ -313,7 +285,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("2 tmq consume rows error!") tdLog.exit("2 tmq consume rows error!")
self.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
# reinit consume info, and start tmq_sim, then check consume result # reinit consume info, and start tmq_sim, then check consume result
...@@ -340,7 +312,7 @@ class TDTestCase: ...@@ -340,7 +312,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("3 tmq consume rows error!") tdLog.exit("3 tmq consume rows error!")
self.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId)) tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10) time.sleep(10)
......
...@@ -21,34 +21,6 @@ class TDTestCase: ...@@ -21,34 +21,6 @@ class TDTestCase:
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file #tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self): def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ") tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1', paraDict = {'dbName': 'db1',
...@@ -114,7 +86,7 @@ class TDTestCase: ...@@ -114,7 +86,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!") tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result # reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
...@@ -140,7 +112,7 @@ class TDTestCase: ...@@ -140,7 +112,7 @@ class TDTestCase:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0])) tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!") tdLog.exit("1 tmq consume rows error!")
self.checkFileContent(consumerId, queryString) tmqCom.checkFileContent(consumerId, queryString)
# reinit consume info, and start tmq_sim, then check consume result # reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
...@@ -166,7 +138,7 @@ class TDTestCase: ...@@ -166,7 +138,7 @@ class TDTestCase:
# tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0])) # tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[2], resultList[0]))
# tdLog.exit("2 tmq consume rows error!") # tdLog.exit("2 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString) # tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10) time.sleep(10)
for i in range(len(topicNameList)): for i in range(len(topicNameList)):
......
...@@ -317,7 +317,6 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i ...@@ -317,7 +317,6 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
quotationStr[0] = '\"'; quotationStr[0] = '\"';
quotationStr[1] = 0; quotationStr[1] = 0;
int n;
char buf[TSDB_MAX_BYTES_PER_ROW]; char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) { switch (field->type) {
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
...@@ -348,15 +347,11 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i ...@@ -348,15 +347,11 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i
taosFprintfFile(pFile, "%" PRIu64, *((uint64_t *)val)); taosFprintfFile(pFile, "%" PRIu64, *((uint64_t *)val));
break; break;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val)); taosFprintfFile(pFile, "%e", GET_FLOAT_VAL(val));
break; break;
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", length, GET_DOUBLE_VAL(val)); snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.15e", 23, GET_DOUBLE_VAL(val));
if (n > TMAX(25, length)) { taosFprintfFile(pFile, "%s", buf);
taosFprintfFile(pFile, "%*.15e", length, GET_DOUBLE_VAL(val));
} else {
taosFprintfFile(pFile, "%s", buf);
}
break; break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
...@@ -512,7 +507,6 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t ...@@ -512,7 +507,6 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
return; return;
} }
int n;
char buf[TSDB_MAX_BYTES_PER_ROW]; char buf[TSDB_MAX_BYTES_PER_ROW];
switch (field->type) { switch (field->type) {
case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_BOOL:
...@@ -543,15 +537,11 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t ...@@ -543,15 +537,11 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
printf("%*" PRIu64, width, *((uint64_t *)val)); printf("%*" PRIu64, width, *((uint64_t *)val));
break; break;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
printf("%*ef", width, GET_FLOAT_VAL(val)); printf("%*e", width, GET_FLOAT_VAL(val));
break; break;
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val)); snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%.9e", GET_DOUBLE_VAL(val));
if (n > TMAX(25, width)) { printf("%*s", width, buf);
printf("%*.15e", width, GET_DOUBLE_VAL(val));
} else {
printf("%s", buf);
}
break; break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册