diff --git a/Jenkinsfile2 b/Jenkinsfile2 index d90bc3061b2af6eabe3394622cbef2119db892e3..a3006c4f7d0dbbb1282ab6eaba4c66862f7723b0 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -34,16 +34,6 @@ def abort_previous(){ } def pre_test(){ sh 'hostname' - sh ''' - date - sudo rmtaos || echo "taosd has not installed" - ''' - sh ''' - killall -9 taosd ||echo "no taosd running" - killall -9 gdb || echo "no gdb running" - killall -9 python3.8 || echo "no python program running" - cd ${WKC} - ''' script { if (env.CHANGE_TARGET == 'master') { sh ''' @@ -81,10 +71,10 @@ def pre_test(){ git pull >/dev/null git fetch origin +refs/pull/${CHANGE_ID}/merge git checkout -qf FETCH_HEAD - git log|head -n20 + git log -5 cd ${WK} git pull >/dev/null - git log|head -n20 + git log -5 ''' } else if (env.CHANGE_URL =~ /\/TDinternal\//) { sh ''' @@ -92,10 +82,10 @@ def pre_test(){ git pull >/dev/null git fetch origin +refs/pull/${CHANGE_ID}/merge git checkout -qf FETCH_HEAD - git log|head -n20 + git log -5 cd ${WKC} git pull >/dev/null - git log|head -n20 + git log -5 ''' } else { sh ''' @@ -106,21 +96,10 @@ def pre_test(){ cd ${WKC} git submodule update --init --recursive ''' - sh ''' - cd ${WK} - export TZ=Asia/Harbin - date - rm -rf debug - mkdir debug - cd debug - cmake .. > /dev/null - make -j4> /dev/null - ''' sh ''' cd ${WKPY} git reset --hard git pull - pip3 install . ''' return 1 } @@ -131,12 +110,14 @@ def pre_test_win(){ time /t taskkill /f /t /im python.exe taskkill /f /t /im bash.exe - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine - rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine\\debug + rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\debug exit 0 ''' bat ''' - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git reset --hard + git fetch || git fetch + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git reset --hard git fetch || git fetch git checkout -f @@ -144,39 +125,73 @@ def pre_test_win(){ script { if (env.CHANGE_TARGET == 'master') { bat ''' - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git checkout master + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git checkout master ''' } else if(env.CHANGE_TARGET == '2.0') { bat ''' - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git checkout 2.0 + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git checkout 2.0 ''' } else if(env.CHANGE_TARGET == '3.0') { bat ''' - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git checkout 3.0 + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git checkout 3.0 ''' } else { bat ''' - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git checkout develop + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community git checkout develop ''' } } + script { + if (env.CHANGE_URL =~ /\/TDengine\//) { + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git pull + git log -5 + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community + git pull + git fetch origin +refs/pull/${CHANGE_ID}/merge + git checkout -qf FETCH_HEAD + git log -5 + ''' + } else if (env.CHANGE_URL =~ /\/TDinternal\//) { + bat ''' + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal + git pull + git fetch origin +refs/pull/${CHANGE_ID}/merge + git checkout -qf FETCH_HEAD + git log -5 + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community + git 
pull + git log -5 + ''' + } else { + sh ''' + echo "unmatched repository ${CHANGE_URL}" + ''' + } + } bat ''' - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine - git branch - git pull || git pull - git fetch origin +refs/pull/%CHANGE_ID%/merge - git checkout -qf FETCH_HEAD + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community + git submodule update --init --recursive ''' } def pre_test_build_win() { bat ''' echo "building ..." time /t - cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDengine + cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal mkdir debug cd debug call "C:\\Program Files (x86)\\Microsoft Visual Studio\\2017\\Community\\VC\\Auxiliary\\Build\\vcvarsall.bat" x64 @@ -192,6 +207,7 @@ pipeline { agent none options { skipDefaultCheckout() } environment{ + WKDIR = '/var/lib/jenkins/workspace' WK = '/var/lib/jenkins/workspace/TDinternal' WKC = '/var/lib/jenkins/workspace/TDinternal/community' WKPY = '/var/lib/jenkins/workspace/taos-connector-python' @@ -206,39 +222,22 @@ pipeline { changeRequest() } steps { - timeout(time: 45, unit: 'MINUTES'){ + timeout(time: 20, unit: 'MINUTES'){ pre_test() script { - if (env.CHANGE_URL =~ /\/TDengine\//) { - sh ''' - cd ${WK}/debug - ctest -VV - ''' - sh ''' - export LD_LIBRARY_PATH=${WK}/debug/build/lib - cd ${WKC}/tests/system-test - ./fulltest.sh - ''' - } else if (env.CHANGE_URL =~ /\/TDinternal\//) { - sh ''' - cd ${WKC}/debug - ctest -VV - ''' - sh ''' - export LD_LIBRARY_PATH=${WKC}/debug/build/lib - cd ${WKC}/tests/system-test - ./fulltest.sh - ''' - } else { - sh ''' - echo "unmatched reposiotry ${CHANGE_URL}" - ''' - } + sh ''' + cd ${WKC}/tests/parallel_test + date + time ./container_build.sh -w ${WKDIR} -t 8 -e + rm -f /tmp/cases.task + ./collect_cases.sh -e + ''' + sh ''' + cd ${WKC}/tests/parallel_test + date + time ./run.sh -e -m /home/m.json -t /tmp/cases.task -b ${CHANGE_TARGET} -l ${WKDIR}/log + ''' } - sh ''' - cd ${WKC}/tests - ./test-all.sh b1fq - ''' } } } diff --git a/example/src/tmq.c b/example/src/tmq.c index e867f17e7833f01feb2cde43d82a78bbba3162f0..b4013f26eeb83621a7f815e0dd1e71e2144b5fbe 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -61,7 +61,7 @@ int32_t init_env() { taos_free_result(pRes); pRes = - taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"); + taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int)"); if (taos_errno(pRes) != 0) { printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); return -1; @@ -106,8 +106,8 @@ int32_t create_topic() { } taos_free_result(pRes); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ - pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); + pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); + /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/ if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 6141829a3f909d798d98cabf84b7e8b759326aba..8d0b93dde2f79398f8526747131377a68fd71fab 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -173,6 +173,7 @@ typedef struct SqlFunctionCtx { SInputColumnInfoData input; SResultDataInfo resDataInfo; uint32_t order; // data block scanner order: asc|desc + uint8_t scanFlag; // record current
running step, default: 0 //////////////////////////////////////////////////////////////// int32_t startRow; // start row index int32_t size; // handled processed row number @@ -183,7 +184,6 @@ typedef struct SqlFunctionCtx { bool hasNull; // null value exist in current block, TODO remove it bool requireNull; // require null in some function, TODO remove it int32_t columnIndex; // TODO remove it - uint8_t currentStage; // record current running step, default: 0 bool isAggSet; int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it bool stableQuery; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index a6b8b842f94ee51825fe323778ebd6e6ae149c64..639f00ab86e7f87fd77e3755fb875f039501242e 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -185,6 +185,7 @@ typedef struct { int32_t async; tsem_t rspSem; tmq_resp_err_t rspErr; + SArray* offsets; } SMqCommitCbParam; tmq_conf_t* tmq_conf_new() { @@ -246,10 +247,13 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value if (strcmp(key, "msg.with.table.name") == 0) { if (strcmp(value, "true") == 0) { conf->withTbName = 1; + return TMQ_CONF_OK; } else if (strcmp(value, "false") == 0) { conf->withTbName = 0; + return TMQ_CONF_OK; } else if (strcmp(value, "none") == 0) { conf->withTbName = -1; + return TMQ_CONF_OK; } else { return TMQ_CONF_INVALID; } @@ -395,6 +399,9 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { if (!pParam->async) tsem_post(&pParam->rspSem); else { + if (pParam->offsets) { + taosArrayDestroy(pParam->offsets); + } tsem_destroy(&pParam->rspSem); /*if (pParam->pArray) {*/ /*taosArrayDestroy(pParam->pArray);*/ @@ -540,10 +547,10 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in // build msg // send to mnode SMqCMCommitOffsetReq req; - SArray* pArray = NULL; + SArray* pOffsets = NULL; if (offsets == NULL) { - pArray = taosArrayInit(0, sizeof(SMqOffset)); + pOffsets = taosArrayInit(0, sizeof(SMqOffset)); for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { @@ -553,11 +560,11 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in strcpy(offset.cgroup, tmq->groupId); offset.vgId = pVg->vgId; offset.offset = pVg->currentOffset; - taosArrayPush(pArray, &offset); + taosArrayPush(pOffsets, &offset); } } - req.num = pArray->size; - req.offsets = pArray->pData; + req.num = pOffsets->size; + req.offsets = pOffsets->pData; } else { req.num = taosArrayGetSize(&offsets->container); req.offsets = (SMqOffset*)offsets->container.pData; @@ -591,6 +598,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in pParam->tmq = tmq; tsem_init(&pParam->rspSem, 0, 0); pParam->async = async; + pParam->offsets = pOffsets; pRequest->body.requestMsg = (SDataBuf){ .pData = buf, @@ -613,8 +621,8 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in tsem_destroy(&pParam->rspSem); taosMemoryFree(pParam); - if (pArray) { - taosArrayDestroy(pArray); + if (pOffsets) { + taosArrayDestroy(pOffsets); } } @@ -1015,7 +1023,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { atomic_store_32(&tmq->epSkipCnt, 0); #endif int32_t tlen = sizeof(SMqAskEpReq); - SMqAskEpReq* req = taosMemoryMalloc(tlen); + SMqAskEpReq* req = taosMemoryCalloc(1, tlen); if (req == NULL) { tscError("failed to malloc 
get subscribe ep buf"); /*atomic_store_8(&tmq->epStatus, 0);*/ @@ -1025,7 +1033,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { req->epoch = htonl(tmq->epoch); strcpy(req->cgroup, tmq->groupId); - SMqAskEpCbParam* pParam = taosMemoryMalloc(sizeof(SMqAskEpCbParam)); + SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); if (pParam == NULL) { tscError("failed to malloc subscribe param"); taosMemoryFree(req); @@ -1107,7 +1115,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* reqOffset = tmq->resetOffsetCfg; } - SMqPollReq* pReq = taosMemoryMalloc(sizeof(SMqPollReq)); + SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq)); if (pReq == NULL) { return NULL; } diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 51b38604613d60fc817bbb1a91926a5eb9aca2cc..5d8ec50e81b8df13285733abdbbde1ad49f9f6e5 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -46,8 +46,6 @@ typedef struct { int32_t vgId; int32_t vgVersion; int8_t dropped; - uint64_t dbUid; - char db[TSDB_DB_FNAME_LEN]; char path[PATH_MAX + 20]; } SWrapperCfg; @@ -57,8 +55,6 @@ typedef struct { int32_t vgVersion; int8_t dropped; int8_t accessState; - uint64_t dbUid; - char *db; char *path; SVnode *pImpl; STaosQueue *pWriteQ; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index f251dd120e8e690c64045cbdd1b88ebb32f927e0..ba4293eeb2ceb6a775281d624956d107643d7445 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -47,7 +47,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) { int32_t vmGetVnodeListFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) { int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int32_t len = 0; - int32_t maxLen = 30000; + int32_t maxLen = 1024 * 1024; char *content = taosMemoryCalloc(1, maxLen + 1); cJSON *root = NULL; FILE *fp = NULL; @@ -64,6 +64,11 @@ int32_t vmGetVnodeListFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t goto _OVER; } + if (content == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + len = (int32_t)taosReadFile(pFile, content, maxLen); if (len <= 0) { dError("failed to read %s since content is null", file); @@ -116,20 +121,6 @@ int32_t vmGetVnodeListFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t goto _OVER; } pCfg->vgVersion = vgVersion->valueint; - - cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid"); - if (!dbUid || dbUid->type != cJSON_String) { - dError("failed to read %s since dbUid not found", file); - goto _OVER; - } - pCfg->dbUid = atoll(dbUid->valuestring); - - cJSON *db = cJSON_GetObjectItem(vnode, "db"); - if (!db || db->type != cJSON_String) { - dError("failed to read %s since db not found", file); - goto _OVER; - } - tstrncpy(pCfg->db, db->valuestring, TSDB_DB_FNAME_LEN); } *ppCfgs = pCfgs; @@ -165,8 +156,12 @@ int32_t vmWriteVnodeListToFile(SVnodesMgmt *pMgmt) { SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); int32_t len = 0; - int32_t maxLen = 65536; + int32_t maxLen = 1024 * 1024; char *content = taosMemoryCalloc(1, maxLen + 1); + if (content == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"vnodes\": [\n"); @@ -175,9 +170,7 @@ int32_t vmWriteVnodeListToFile(SVnodesMgmt *pMgmt) { len += snprintf(content + len, 
maxLen - len, " {\n"); len += snprintf(content + len, maxLen - len, " \"vgId\": %d,\n", pVnode->vgId); len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pVnode->dropped); - len += snprintf(content + len, maxLen - len, " \"vgVersion\": %d,\n", pVnode->vgVersion); - len += snprintf(content + len, maxLen - len, " \"dbUid\": \"%" PRIu64 "\",\n", pVnode->dbUid); - len += snprintf(content + len, maxLen - len, " \"db\": \"%s\"\n", pVnode->db); + len += snprintf(content + len, maxLen - len, " \"vgVersion\": %d\n", pVnode->vgVersion); if (i < numOfVnodes - 1) { len += snprintf(content + len, maxLen - len, " },\n"); } else { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 914acce2ea9aa5dbdbb8337179c43e7922cf5e72..528f6a0ffe22d381b537c4c04eea985dc4aeae78 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -174,8 +174,6 @@ static void vmGenerateWrapperCfg(SVnodesMgmt *pMgmt, SCreateVnodeReq *pCreate, S pCfg->vgId = pCreate->vgId; pCfg->vgVersion = pCreate->vgVersion; pCfg->dropped = 0; - pCfg->dbUid = pCreate->dbUid; - tstrncpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN); snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId); } @@ -227,6 +225,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); + code = terrno; goto _OVER; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index af439fcc03f1525b62d3f0fcc21109f5b2f959f0..9a0a524267a85c90872579ca07d93f35d0e31668 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -57,13 +57,11 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { pVnode->vgVersion = pCfg->vgVersion; pVnode->dropped = 0; pVnode->accessState = TSDB_VN_ALL_ACCCESS; - pVnode->dbUid = pCfg->dbUid; - pVnode->db = tstrdup(pCfg->db); pVnode->path = tstrdup(pCfg->path); pVnode->pImpl = pImpl; pVnode->pWrapper = pMgmt->pWrapper; - if (pVnode->path == NULL || pVnode->db == NULL) { + if (pVnode->path == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -109,7 +107,6 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { } taosMemoryFree(pVnode->path); - taosMemoryFree(pVnode->db); taosMemoryFree(pVnode); } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 5085de86107ea16b8f4578e86730594e0046d23d..4828b9f523c273d5e2125b12819fc1fc1856659c 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1051,7 +1051,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions); if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { - mError("failed to execute redoActions since %s", terrstr()); + mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno); } return code; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 5eb89e8bb7c81f107d1c8db1eb3e31410fd5eb8e..38dedee5a29b044c89c11b66b00bbbf2160e0903 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -93,6 +93,7 @@ 
struct STqReadHandle { SMeta* pVnodeMeta; SArray* pColIdList; // SArray int32_t sver; + int64_t cachedSchemaUid; SSchemaWrapper* pSchemaWrapper; STSchema* pSchema; }; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 28cdb39bd5257dd08e1167463bfd7683d00df361..952645190760826528e4e31d96c88cd8638eb7cf 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -559,6 +559,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } // db subscribe } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { + rsp.withSchema = 1; STqReadHandle* pReader = pExec->pExecReader[workerId]; tqReadHandleSetMsg(pReader, pCont, 0); while (tqNextDataBlock(pReader)) { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index f531d3f5fb52dcb29ad1703f68134371386c8aeb..996d789e248778906c1b1bc046c7d1c6a58cabfe 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -25,6 +25,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { pReadHandle->ver = -1; pReadHandle->pColIdList = NULL; pReadHandle->sver = -1; + pReadHandle->cachedSchemaUid = -1; pReadHandle->pSchema = NULL; pReadHandle->pSchemaWrapper = NULL; pReadHandle->tbIdHash = NULL; @@ -84,19 +85,20 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { return false; } -int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, uint64_t* pUid, int32_t* pNumOfRows, - int16_t* pNumOfCols) { +int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, uint64_t* pUid, + int32_t* pNumOfRows, int16_t* pNumOfCols) { /*int32_t sversion = pHandle->pBlock->sversion;*/ // TODO set to real sversion *pUid = 0; int32_t sversion = 0; - if (pHandle->sver != sversion) { + if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) { pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); // this interface use suid instead of uid pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true); pHandle->sver = sversion; + pHandle->cachedSchemaUid = pHandle->msgIter.suid; } STSchema* pTschema = pHandle->pSchema; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index ae134e64964ab60b0e1c462013f53b4eb822bddc..7476da2a0f75a2c5aa38cb9660ea9943bc63e586 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -137,18 +137,21 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { // open query if (vnodeQueryOpen(pVnode)) { vError("vgId:%d failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno)); + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } // vnode begin if (vnodeBegin(pVnode) < 0) { vError("vgId:%d failed to begin since %s", TD_VID(pVnode), tstrerror(terrno)); + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } // open sync if (vnodeSyncOpen(pVnode, dir)) { vError("vgId:%d failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 34b7fce33cbccc11a6699636f160d44bd0ee4665..f79d3c8450cac0c17c09f44007cfa8cc44fbd638 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -706,7 +706,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx SOperatorInfo* 
createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo, - SNode* pConditions, SOperatorInfo* pOperatorDumy, SInterval* pInterval); + SNode* pConditions, SOperatorInfo* pOperatorDumy); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index fe5066a06585ec76fcb27cd221436224c23a12ce..df446def5ea3d665986a7575f7b723f3f96ea09c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -746,7 +746,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt pCtx[i].order = order; pCtx[i].size = pBlock->info.rows; pCtx[i].pSrcBlock = pBlock; - pCtx[i].currentStage = scanFlag; + pCtx[i].scanFlag = scanFlag; SInputColumnInfoData* pInput = &pCtx[i].input; pInput->uid = pBlock->info.uid; @@ -826,23 +826,22 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt return code; } -static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { +static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { pCtx[k].startTs = startTs; - // this can be set during create the struct // todo add a dummy funtion to avoid process check if (pCtx[k].fpSet.process != NULL) { int32_t code = pCtx[k].fpSet.process(&pCtx[k]); if (code != TSDB_CODE_SUCCESS) { - qError("%s call aggregate function error happens, code : %s", - GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); - pOperator->pTaskInfo->code = code; - longjmp(pOperator->pTaskInfo->env, code); + qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); + return code; } } } } + + return TSDB_CODE_SUCCESS; } static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) { @@ -998,18 +997,22 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) { return false; } - if (isRowEntryCompleted(pResInfo)) { - return false; + if (pCtx->scanFlag == REPEAT_SCAN) { + return fmIsRepeatScanFunc(pCtx->functionId); } - if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) { - // return QUERY_IS_ASC_QUERY(pQueryAttr); + if (isRowEntryCompleted(pResInfo)) { + return false; } - // denote the order type - if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) { - // return pCtx->param[0].i == pQueryAttr->order.order; - } +// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) { +// // return QUERY_IS_ASC_QUERY(pQueryAttr); +// } +// +// // denote the order type +// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) { +// // return pCtx->param[0].i == pQueryAttr->order.order; +// } // in the reverse table scan, only the following functions need to be executed // if (IS_REVERSE_SCAN(pRuntimeEnv) || @@ -1944,7 +1947,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t cleanupResultRowEntry(pEntry); pCtx[i].resultInfo = pEntry; - pCtx[i].currentStage = stage; + pCtx[i].scanFlag = stage; // set the 
timestamp output buffer for top/bottom/diff query // int32_t fid = pCtx[i].functionId; @@ -3724,7 +3727,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SAggOperatorInfo* pAggInfo = pOperator->info; SOptrBasicInfo* pInfo = &pAggInfo->binfo; - SOperatorInfo* downstream = pOperator->pDownstream[0]; int32_t order = TSDB_ORDER_ASC; @@ -3738,9 +3740,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { if (pBlock == NULL) { break; } - // if (pAggInfo->current != NULL) { - // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs); - // } int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); if (code != TSDB_CODE_SUCCESS) { @@ -3750,17 +3749,19 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { // there is an scalar expression that needs to be calculated before apply the group aggregation. if (pAggInfo->pScalarExprInfo != NULL) { code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx, - pAggInfo->numOfScalarExpr, NULL); + pAggInfo->numOfScalarExpr, NULL); if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; - longjmp(pTaskInfo->env, pTaskInfo->code); + longjmp(pTaskInfo->env, code); } } // the pDataBlock are always the same one, no need to call this again setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, true); - doAggregateImpl(pOperator, 0, pInfo->pCtx); + code = doAggregateImpl(pOperator, 0, pInfo->pCtx); + if (code != 0) { + longjmp(pTaskInfo->env, code); + } #if 0 // test for encode/decode result info if(pOperator->encodeResultRow){ @@ -4807,17 +4808,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - - SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); - SSDataBlock* pResBlockDumy = createResDataBlock(pDescNode); - - SQueryTableDataCond cond = {0}; - int32_t code = initQueryTableDataCond(&cond, pTableScanNode); - if (code != TSDB_CODE_SUCCESS) { - return NULL; - } - - SInterval interval = extractIntervalInfo(pTableScanNode); SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); SArray* tableIdList = extractTableIdList(pTableGroupInfo); @@ -4825,7 +4815,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pResBlock, pCols, tableIdList, pTaskInfo, - pScanPhyNode->node.pConditions, pOperatorDumy, &interval); + pScanPhyNode->node.pConditions, pOperatorDumy); taosArrayDestroy(tableIdList); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1483b6b042e44c858bb01a4d3f8d636f776cd474..c6cb01e8fbb7f7c1c9dc2b6aa4f99b3da8131f79 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -260,6 +260,53 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction pTableScanInfo->cond.order = TSDB_ORDER_DESC; } +static void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) { + // currently only the tbname pseudo 
column + if (pTableScanInfo->numOfPseudoExpr == 0) { + return; + } + + SMetaReader mr = {0}; + metaReaderInit(&mr, pTableScanInfo->readHandle.meta, 0); + metaGetTableEntryByUid(&mr, pBlock->info.uid); + + for (int32_t j = 0; j < pTableScanInfo->numOfPseudoExpr; ++j) { + SExprInfo* pExpr = &pTableScanInfo->pPseudoExpr[j]; + + int32_t dstSlotId = pExpr->base.resSchema.slotId; + + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId); + colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows); + + int32_t functionId = pExpr->pExpr->_function.functionId; + + // this is to handle the tbname + if (fmIsScanPseudoColumnFunc(functionId)) { + struct SScalarFuncExecFuncs fpSet = {0}; + fmGetScalarFuncExecFuncs(functionId, &fpSet); + + SColumnInfoData infoData = {0}; + infoData.info.type = TSDB_DATA_TYPE_BIGINT; + infoData.info.bytes = sizeof(uint64_t); + colInfoDataEnsureCapacity(&infoData, 0, 1); + + colDataAppendInt64(&infoData, 0, &pBlock->info.uid); + SScalarParam srcParam = { + .numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData}; + + SScalarParam param = {.columnData = pColInfoData}; + fpSet.process(&srcParam, 1, ¶m); + } else { // these are tags + const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + colDataAppend(pColInfoData, i, p, (p == NULL)); + } + } + } + + metaReaderClear(&mr); +} + static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { STableScanInfo* pTableScanInfo = pOperator->info; SSDataBlock* pBlock = pTableScanInfo->pResBlock; @@ -285,23 +332,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { // currently only the tbname pseudo column if (pTableScanInfo->numOfPseudoExpr > 0) { - int32_t dstSlotId = pTableScanInfo->pPseudoExpr->base.resSchema.slotId; - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId); - colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows); - - struct SScalarFuncExecFuncs fpSet; - fmGetScalarFuncExecFuncs(pTableScanInfo->pPseudoExpr->pExpr->_function.functionId, &fpSet); - - SColumnInfoData infoData = {0}; - infoData.info.type = TSDB_DATA_TYPE_BIGINT; - infoData.info.bytes = sizeof(uint64_t); - colInfoDataEnsureCapacity(&infoData, 0, 1); - - colDataAppendInt64(&infoData, 0, &pBlock->info.uid); - SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData}; - - SScalarParam param = {.columnData = pColInfoData}; - fpSet.process(&srcParam, 1, ¶m); + addTagPseudoColumnData(pTableScanInfo, pBlock); } return pBlock; @@ -751,8 +782,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataReader, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, - SExecTaskInfo* pTaskInfo, SNode* pCondition, SOperatorInfo* pOperatorDumy, - SInterval* pInterval) { + SExecTaskInfo* pTaskInfo, SNode* pCondition, SOperatorInfo* pOperatorDumy ) { SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -760,6 +790,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR goto _error; } + STableScanInfo* pSTInfo = (STableScanInfo*)pOperatorDumy->info; + int32_t numOfOutput = taosArrayGetSize(pColList); SArray* pColIds = taosArrayInit(4, 
sizeof(int16_t)); @@ -792,7 +824,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR } pInfo->primaryTsIndex = 0; // TODO(liuyao) get it from physical plan - pInfo->pUpdateInfo = updateInfoInitP(pInterval, 10000); // TODO(liuyao) get watermark from physical plan + pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, 10000); // TODO(liuyao) get watermark from physical plan if (pInfo->pUpdateInfo == NULL) { taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); @@ -805,7 +837,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR pInfo->pDataReader = pDataReader; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->pOperatorDumy = pOperatorDumy; - pInfo->interval = *pInterval; + pInfo->interval = pSTInfo->interval; pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; diff --git a/source/libs/executor/test/indexexcutorTests.cpp b/source/libs/executor/test/index_executor_tests.cpp similarity index 100% rename from source/libs/executor/test/indexexcutorTests.cpp rename to source/libs/executor/test/index_executor_tests.cpp diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 64bee0c09659db7289495a1dc52177650fb03258..2ec050d82d70a26e7a02b5a6571d496e6e43dc3c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1645,7 +1645,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { int32_t type = pCol->info.type; SPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); - if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) { + if (pCtx->scanFlag == REPEAT_SCAN && pInfo->stage == 0) { pInfo->stage += 1; // all data are null, set it completed diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index 4d80b88a3a3ee72f09efdead1d9ee3a0fa6cb68b..b6d5d38c9e164007860621126322fe90191632b6 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -37,7 +37,7 @@ #define GET_TRUE_DATA_TYPE() \ int32_t type = 0; \ - if (pCtx->currentStage == MERGE_STAGE) { \ + if (pCtx->scanFlag == MERGE_STAGE) { \ type = pCtx->resDataInfo.type; \ assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); \ } else { \ @@ -908,7 +908,7 @@ static void avg_func_merge(SqlFunctionCtx *pCtx) { static void avg_finalizer(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - if (pCtx->currentStage == MERGE_STAGE) { + if (pCtx->scanFlag == MERGE_STAGE) { assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); if (GET_INT64_VAL(GET_ROWCELL_INTERBUF(pResInfo)) <= 0) { @@ -1152,7 +1152,7 @@ static void stddev_function(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SStddevInfo *pStd = GET_ROWCELL_INTERBUF(pResInfo); - if (pCtx->currentStage == REPEAT_SCAN && pStd->stage == 0) { + if (pCtx->scanFlag == REPEAT_SCAN && pStd->stage == 0) { pStd->stage++; avg_finalizer(pCtx); @@ -1814,7 +1814,7 @@ static STopBotInfo *getTopBotOutputInfo(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); // only the first_stage_merge is directly written data into final output buffer - if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { + if (pCtx->stableQuery && pCtx->scanFlag != MERGE_STAGE) { return (STopBotInfo*) pCtx->pOutput; } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer return 
GET_ROWCELL_INTERBUF(pResInfo); @@ -1956,7 +1956,7 @@ static void top_func_merge(SqlFunctionCtx *pCtx) { for (int32_t i = 0; i < pInput->num; ++i) { int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->resDataInfo.type; // do_top_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, -// type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); +// type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->scanFlag); } SET_VAL(pCtx, pInput->num, pOutput->num); @@ -2013,7 +2013,7 @@ static void bottom_func_merge(SqlFunctionCtx *pCtx) { for (int32_t i = 0; i < pInput->num; ++i) { int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->resDataInfo.type; // do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type, -// &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); +// &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->scanFlag); } SET_VAL(pCtx, pInput->num, pOutput->num); @@ -2073,7 +2073,7 @@ static void percentile_function(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); - if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) { + if (pCtx->scanFlag == REPEAT_SCAN && pInfo->stage == 0) { pInfo->stage += 1; // all data are null, set it completed @@ -2180,7 +2180,7 @@ static SAPercentileInfo *getAPerctInfo(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SAPercentileInfo* pInfo = NULL; - if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { + if (pCtx->stableQuery && pCtx->scanFlag != MERGE_STAGE) { pInfo = (SAPercentileInfo*) pCtx->pOutput; } else { pInfo = GET_ROWCELL_INTERBUF(pResInfo); @@ -2270,7 +2270,7 @@ static void apercentile_finalizer(SqlFunctionCtx *pCtx) { SResultRowEntryInfo * pResInfo = GET_RES_INFO(pCtx); SAPercentileInfo *pOutput = GET_ROWCELL_INTERBUF(pResInfo); - if (pCtx->currentStage == MERGE_STAGE) { + if (pCtx->scanFlag == MERGE_STAGE) { // if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null // assert(pOutput->pHisto->numOfElems > 0); // @@ -2510,7 +2510,7 @@ static void copy_function(SqlFunctionCtx *pCtx); static void tag_function(SqlFunctionCtx *pCtx) { SET_VAL(pCtx, 1, 1); - if (pCtx->currentStage == MERGE_STAGE) { + if (pCtx->scanFlag == MERGE_STAGE) { copy_function(pCtx); } else { taosVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->resDataInfo.type, true); @@ -2966,7 +2966,7 @@ static bool spread_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pRe SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); // this is the server-side setup function in client-side, the secondary merge do not need this procedure - if (pCtx->currentStage == MERGE_STAGE) { + if (pCtx->scanFlag == MERGE_STAGE) { // pCtx->param[0].param.d = DBL_MAX; // pCtx->param[3].param.d = -DBL_MAX; } else { @@ -3086,7 +3086,7 @@ void spread_function_finalizer(SqlFunctionCtx *pCtx) { */ SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - if (pCtx->currentStage == MERGE_STAGE) { + if (pCtx->scanFlag == MERGE_STAGE) { assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); // if (pResInfo->hasResult != DATA_SET_FLAG) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 5ed7d9c1b54418231db1d727687942f9c117b307..6ae1b23b9738b0755075d4f46216117c7387485c 100644 --- a/source/libs/planner/src/planOptimizer.c +++ 
b/source/libs/planner/src/planOptimizer.c @@ -228,10 +228,12 @@ static void setScanWindowInfo(SScanLogicNode* pScan) { static int32_t osdOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { SOsdInfo info = {0}; int32_t code = osdMatch(pCxt, pLogicNode, &info); + if (TSDB_CODE_SUCCESS == code && info.pScan) { + setScanWindowInfo((SScanLogicNode*)info.pScan); + } if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) { info.pScan->dataRequired = osdGetDataRequired(info.pSdrFuncs); info.pScan->pDynamicScanFuncs = info.pDsoFuncs; - setScanWindowInfo((SScanLogicNode*)info.pScan); OPTIMIZE_FLAG_SET_MASK(info.pScan->node.optimizedFlag, OPTIMIZE_FLAG_OSD); pCxt->optimized = true; } diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index a5811a5ace83725d2733e79e3e31bf5d3bca2a30..08093c8b184a47fa5e65d77ce4f5a56fe5d86ff6 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -154,7 +154,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in // sink if (pTask->sinkType == TASK_SINK__TABLE) { - blockDebugShowData(pRes); + // blockDebugShowData(pRes); pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pRes); } else if (pTask->sinkType == TASK_SINK__SMA) { pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pRes); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 718be6aa64755afea556961e2c765a7558c60ddf..cd7e3beb1308d656f328a62e4d70d680f47c7c52 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -13,7 +13,6 @@ */ #ifdef USE_UV - #include "transComm.h" typedef struct SCliConn { @@ -21,15 +20,16 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; uv_write_t writeReq; - void* hostThrd; - SConnBuffer readBuf; - void* data; - STransQueue cliMsgs; - queue conn; - uint64_t expireTime; - int hThrdIdx; - STransCtx ctx; + void* hostThrd; + int hThrdIdx; + + SConnBuffer readBuf; + STransQueue cliMsgs; + queue conn; + uint64_t expireTime; + + STransCtx ctx; bool broken; // link broken or not ConnStatus status; // @@ -157,13 +157,11 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); transClearBuffer(&conn->readBuf); \ transFreeMsg(transContFromHead((char*)head)); \ tDebug("cli conn %p receive release request, ref: %d", conn, T_REF_VAL_GET(conn)); \ - while (T_REF_VAL_GET(conn) > 1) { \ - transUnrefCliHandle(conn); \ - } \ - if (T_REF_VAL_GET(conn) == 1) { \ + if (T_REF_VAL_GET(conn) > 1) { \ transUnrefCliHandle(conn); \ } \ destroyCmsg(pMsg); \ + addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \ return; \ } \ } while (0) @@ -707,7 +705,8 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; - tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((STrans*)pThrd->pTransInst)->label, el); + tTrace("%s cli msg tran time cost: %" PRIu64 "us, threadID: %" PRId64 "", ((STrans*)pThrd->pTransInst)->label, el, + pThrd->thread); STransConnCtx* pCtx = pMsg->ctx; STrans* pTransInst = pThrd->pTransInst; @@ -1030,8 +1029,8 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; - tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet), - EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); + 
tDebug("send request at thread:%d, threadID: %" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq, + EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0); } @@ -1058,8 +1057,8 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM cliMsg->type = Normal; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; - tDebug("send request at thread:%d %p, dst: %s:%d, app:%p", index, pReq, EPSET_GET_INUSE_IP(&pCtx->epSet), - EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); + tDebug("send request at thread:%d, threadID:%" PRId64 ", msg: %p, dst: %s:%d, app:%p", index, thrd->thread, pReq, + EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->ahandle); transSendAsync(thrd->asyncPool, &(cliMsg->q)); tsem_t* pSem = pCtx->pSem; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index af66d39904fbfce2e365f810d40c3ba3f4c3e26e..fc840691b6a62753b4c3950d9c5df81c472c1b86 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -35,7 +35,6 @@ typedef struct SSrvConn { uv_timer_t pTimer; queue queue; - int persist; // persist connection or not SConnBuffer readBuf; // read buf, int inType; void* pTransInst; // rpc init @@ -138,6 +137,7 @@ static void destroySmsg(SSrvMsg* smsg); // check whether already read complete packet static SSrvConn* createConn(void* hThrd); static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); +static int reallocConnRefHandle(SSrvConn* conn); static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); @@ -164,7 +164,7 @@ static void* transWorkerThread(void* arg); static void* transAcceptThread(void* arg); // add handle loop -static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName); +static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName); static bool addHandleToAcceptloop(void* arg); #define CONN_SHOULD_RELEASE(conn, head) \ @@ -180,6 +180,7 @@ static bool addHandleToAcceptloop(void* arg); srvMsg->msg = tmsg; \ srvMsg->type = Release; \ srvMsg->pConn = conn; \ + reallocConnRefHandle(conn); \ if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ return; \ } \ @@ -360,10 +361,14 @@ void uvOnSendCb(uv_write_t* req, int status) { tTrace("server conn %p data already was written on stream", conn); if (!transQueueEmpty(&conn->srvMsgs)) { SSrvMsg* msg = transQueuePop(&conn->srvMsgs); - if (msg->type == Release && conn->status != ConnNormal) { - conn->status = ConnNormal; - transUnrefSrvHandle(conn); - } + // if (msg->type == Release && conn->status != ConnNormal) { + // conn->status = ConnNormal; + // transUnrefSrvHandle(conn); + // reallocConnRefHandle(conn); + // destroySmsg(msg); + // transQueueClear(&conn->srvMsgs); + // return; + //} destroySmsg(msg); // send second data, just use for push if (!transQueueEmpty(&conn->srvMsgs)) { @@ -421,8 +426,15 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { if (pConn->status == ConnNormal) { pHead->msgType = pConn->inType + 1; } else { - pHead->msgType = smsg->type == Release ? 0 : pMsg->msgType; + if (smsg->type == Release) { + pHead->msgType = 0; + pConn->status = ConnNormal; + transUnrefSrvHandle(pConn); + } else { + pHead->msgType = pMsg->msgType; + } } + pHead->release = smsg->type == Release ? 
1 : 0; pHead->code = htonl(pMsg->code); @@ -517,7 +529,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { int64_t refId = transMsg.refId; SExHandle* exh2 = uvAcquireExHandle(refId); if (exh2 == NULL || exh1 != exh2) { - tTrace("server handle %p except msg, ignore it", exh1); + tTrace("server handle except msg %p, ignore it", exh1); uvReleaseExHandle(refId); destroySmsg(msg); continue; @@ -581,11 +593,12 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { if (uv_accept(stream, (uv_stream_t*)cli) == 0) { if (pObj->numOfWorkerReady < pObj->numOfThreads) { - tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady); + tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, + pObj->numOfWorkerReady); uv_close((uv_handle_t*)cli, NULL); return; } - + uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t)); wr->data = cli; uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); @@ -681,14 +694,14 @@ void* transAcceptThread(void* arg) { return NULL; } -void uvOnPipeConnectionCb(uv_connect_t *connect, int status) { +void uvOnPipeConnectionCb(uv_connect_t* connect, int status) { if (status != 0) { return; } SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req); uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); } -static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName) { +static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) { pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); if (0 != uv_loop_init(pThrd->loop)) { return false; @@ -787,6 +800,19 @@ static void destroyConn(SSrvConn* conn, bool clear) { // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); } } +static int reallocConnRefHandle(SSrvConn* conn) { + uvReleaseExHandle(conn->refId); + uvRemoveExHandle(conn->refId); + // avoid app continue to send msg on invalid handle + SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); + exh->handle = conn; + exh->pThrd = conn->hostThrd; + exh->refId = uvAddExHandle(exh); + uvAcquireExHandle(exh->refId); + conn->refId = exh->refId; + + return 0; +} static void uvDestroyConn(uv_handle_t* handle) { SSrvConn* conn = handle->data; if (conn == NULL) { @@ -822,7 +848,7 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) { ASSERT(status == 0); SServerObj* srv = container_of(handle, SServerObj, pipeListen); - uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]); + uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]); ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1)); ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe)); @@ -859,7 +885,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId()); #else char pipeName[PATH_MAX] = {0}; - snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId()); + snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), + taosGetSelfPthreadId()); #endif assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName)); assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb)); @@ -874,7 +901,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); thrd->pipe = &(srv->pipe[i][1]); // init read - 
if (false == addHandleToWorkloop(thrd,pipeName)) { + if (false == addHandleToWorkloop(thrd, pipeName)) { goto End; } int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); @@ -959,6 +986,7 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { SSrvConn* conn = msg->pConn; if (conn->status == ConnAcquire) { + reallocConnRefHandle(conn); if (!transQueuePush(&conn->srvMsgs, msg)) { return; } diff --git a/tests/parallel_test/collect_cases.sh b/tests/parallel_test/collect_cases.sh new file mode 100755 index 0000000000000000000000000000000000000000..c560598c81c54957b076e028a1accc9a7fd38462 --- /dev/null +++ b/tests/parallel_test/collect_cases.sh @@ -0,0 +1,45 @@ +#!/bin/bash + +case_file=/tmp/cases.task + +function usage() { + echo "$0" + echo -e "\t -o output case file" + echo -e "\t -e enterprise edition" + echo -e "\t -h help" +} + +ent=0 +while getopts "o:eh" opt; do + case $opt in + o) + case_file=$OPTARG + ;; + e) + ent=1 + ;; + h) + usage + exit 0 + ;; + \?) + echo "Invalid option: -$OPTARG" + usage + exit 0 + ;; + esac +done + +script_dir=`dirname $0` +cd $script_dir + +if [ $ent -eq 0 ]; then + echo ",,unit-test,bash test.sh" >$case_file +else + echo ",,unit-test,bash test.sh -e" >$case_file +fi +cat ../script/jenkins/basic.txt |grep -v "^#"|grep -v "^$"|sed "s/^/,,script,/" >>$case_file +grep "^python" ../system-test/fulltest.sh |sed "s/^/,,system-test,/" >>$case_file + +exit 0 + diff --git a/tests/parallel_test/container_build.sh b/tests/parallel_test/container_build.sh new file mode 100755 index 0000000000000000000000000000000000000000..3f23cd8b5f9239ef2b703940564bc5c89c537988 --- /dev/null +++ b/tests/parallel_test/container_build.sh @@ -0,0 +1,59 @@ +#!/bin/bash + +function usage() { + echo "$0" + echo -e "\t -w work dir" + echo -e "\t -e enterprise edition" + echo -e "\t -t make thread count" + echo -e "\t -h help" +} + +ent=0 +while getopts "w:t:eh" opt; do + case $opt in + w) + WORKDIR=$OPTARG + ;; + e) + ent=1 + ;; + t) + THREAD_COUNT=$OPTARG + ;; + h) + usage + exit 0 + ;; + \?) + echo "Invalid option: -$OPTARG" + usage + exit 0 + ;; + esac +done + +if [ -z "$WORKDIR" ]; then + usage + exit 1 +fi +if [ -z "$THREAD_COUNT" ]; then + THREAD_COUNT=1 +fi + +ulimit -c unlimited + +if [ $ent -eq 0 ]; then + REP_DIR=/home/TDengine + REP_MOUNT_PARAM=$WORKDIR/TDengine:/home/TDengine +else + REP_DIR=/home/TDinternal + REP_MOUNT_PARAM=$WORKDIR/TDinternal:/home/TDinternal +fi + +docker run \ + -v $REP_MOUNT_PARAM \ + --rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_TOOLS=true;make -j $THREAD_COUNT" + +ret=$? +exit $ret + diff --git a/tests/parallel_test/run.sh b/tests/parallel_test/run.sh new file mode 100755 index 0000000000000000000000000000000000000000..6417f41fd46f11a749ec684be076561696657d60 --- /dev/null +++ b/tests/parallel_test/run.sh @@ -0,0 +1,357 @@ +#!/bin/bash + +function usage() { + echo "$0" + echo -e "\t -m vm config file" + echo -e "\t -t task file" + echo -e "\t -b branch" + echo -e "\t -l log dir" + echo -e "\t -e enterprise edition" + echo -e "\t -o default timeout value" + echo -e "\t -h help" +} + +ent=0 +while getopts "m:t:b:l:o:eh" opt; do + case $opt in + m) + config_file=$OPTARG + ;; + t) + t_file=$OPTARG + ;; + b) + branch=$OPTARG + ;; + l) + log_dir=$OPTARG + ;; + e) + ent=1 + ;; + o) + timeout_param="-o $OPTARG" + ;; + h) + usage + exit 0 + ;; + \?) 
+ echo "Invalid option: -$OPTARG" + usage + exit 0 + ;; + esac +done +#config_file=$1 +if [ -z $config_file ]; then + usage + exit 1 +fi +if [ ! -f $config_file ]; then + echo "$config_file not found" + usage + exit 1 +fi +#t_file=$2 +if [ -z $t_file ]; then + usage + exit 1 +fi +if [ ! -f $t_file ]; then + echo "$t_file not found" + usage + exit 1 +fi +date_tag=`date +%Y%m%d-%H%M%S` +if [ -z $log_dir ]; then + log_dir="log/${branch}_${date_tag}" +else + log_dir="$log_dir/${branch}_${date_tag}" +fi + +hosts=() +usernames=() +passwords=() +workdirs=() +threads=() + +i=0 +while [ 1 ]; do + host=`jq .[$i].host $config_file` + if [ "$host" = "null" ]; then + break + fi + username=`jq .[$i].username $config_file` + if [ "$username" = "null" ]; then + break + fi + password=`jq .[$i].password $config_file` + if [ "$password" = "null" ]; then + password="" + fi + workdir=`jq .[$i].workdir $config_file` + if [ "$workdir" = "null" ]; then + break + fi + thread=`jq .[$i].thread $config_file` + if [ "$thread" = "null" ]; then + break + fi + hosts[i]=`echo $host|sed 's/\"$//'|sed 's/^\"//'` + usernames[i]=`echo $username|sed 's/\"$//'|sed 's/^\"//'` + passwords[i]=`echo $password|sed 's/\"$//'|sed 's/^\"//'` + workdirs[i]=`echo $workdir|sed 's/\"$//'|sed 's/^\"//'` + threads[i]=$thread + i=$(( i + 1 )) +done + + +function prepare_cases() { + cat $t_file >>$task_file + local i=0 + while [ $i -lt $1 ]; do + echo "%%FINISHED%%" >>$task_file + i=$(( i + 1 )) + done +} + +function clean_tmp() { + # clean tmp dir + local index=$1 + local ssh_script="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}" + if [ -z ${passwords[index]} ]; then + ssh_script="ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}" + fi + local cmd="${ssh_script} rm -rf ${workdirs[index]}/tmp" + ${cmd} +} + +function run_thread() { + local index=$1 + local thread_no=$2 + local runcase_script="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}" + if [ -z ${passwords[index]} ]; then + runcase_script="ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}" + fi + local count=0 + local script="${workdirs[index]}/TDengine/tests/parallel_test/run_container.sh" + if [ $ent -ne 0 ]; then + local script="${workdirs[index]}/TDinternal/community/tests/parallel_test/run_container.sh -e" + fi + local cmd="${runcase_script} ${script}" + + # script="echo" + while [ 1 ]; do + local line=`flock -x $lock_file -c "head -n1 $task_file;sed -i \"1d\" $task_file"` + if [ "x$line" = "x%%FINISHED%%" ]; then + # echo "$index . $thread_no EXIT" + break + fi + if [ -z "$line" ]; then + continue + fi + echo "$line"|grep -q "^#" + if [ $? -eq 0 ]; then + continue + fi + local case_redo_time=`echo "$line"|cut -d, -f2` + if [ -z "$case_redo_time" ]; then + case_redo_time=${DEFAULT_RETRY_TIME:-2} + fi + local exec_dir=`echo "$line"|cut -d, -f3` + local case_cmd=`echo "$line"|cut -d, -f4` + local case_file="" + echo "$case_cmd"|grep -q "\.sh" + if [ $? -eq 0 ]; then + case_file=`echo "$case_cmd"|grep -o ".*\.sh"|awk '{print $NF}'` + fi + echo "$case_cmd"|grep -q "^python3" + if [ $? -eq 0 ]; then + case_file=`echo "$case_cmd"|grep -o ".*\.py"|awk '{print $NF}'` + fi + echo "$case_cmd"|grep -q "\.sim" + if [ $? 
-eq 0 ]; then + case_file=`echo "$case_cmd"|grep -o ".*\.sim"|awk '{print $NF}'` + fi + if [ -z "$case_file" ]; then + case_file=`echo "$case_cmd"|awk '{print $NF}'` + fi + if [ -z "$case_file" ]; then + continue + fi + case_file="$exec_dir/${case_file}.${index}.${thread_no}.${count}" + count=$(( count + 1 )) + local case_path=`dirname "$case_file"` + if [ ! -z "$case_path" ]; then + mkdir -p $log_dir/$case_path + fi + cmd="${runcase_script} ${script} -w ${workdirs[index]} -c \"${case_cmd}\" -t ${thread_no} -d ${exec_dir} ${timeout_param}" + # echo "$thread_no $count $cmd" + local ret=0 + local redo_count=1 + start_time=`date +%s` + while [ ${redo_count} -lt 6 ]; do + if [ -f $log_dir/$case_file.log ]; then + cp $log_dir/$case_file.log $log_dir/$case_file.${redo_count}.redolog + fi + echo "${hosts[index]}-${thread_no} order:${count}, redo:${redo_count} task:${line}" >$log_dir/$case_file.log + echo -e "\e[33m >>>>> \e[0m ${case_cmd}" + date >>$log_dir/$case_file.log + # $cmd 2>&1 | tee -a $log_dir/$case_file.log + # ret=${PIPESTATUS[0]} + $cmd >>$log_dir/$case_file.log 2>&1 + ret=$? + echo "${hosts[index]} `date` ret:${ret}" >>$log_dir/$case_file.log + if [ $ret -eq 0 ]; then + break + fi + redo=0 + grep -q "wait too long for taosd start" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + grep -q "kex_exchange_identification: Connection closed by remote host" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + grep -q "ssh_exchange_identification: Connection closed by remote host" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + grep -q "kex_exchange_identification: read: Connection reset by peer" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + grep -q "Database not ready" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + grep -q "Unable to establish connection" $log_dir/$case_file.log + if [ $? -eq 0 ]; then + redo=1 + fi + if [ $redo_count -lt $case_redo_time ]; then + redo=1 + fi + if [ $redo -eq 0 ]; then + break + fi + redo_count=$(( redo_count + 1 )) + done + end_time=`date +%s` + echo >>$log_dir/$case_file.log + echo "${hosts[index]} execute time: $(( end_time - start_time ))s" >>$log_dir/$case_file.log + # echo "$thread_no ${line} DONE" + if [ $ret -ne 0 ]; then + flock -x $lock_file -c "echo \"${hosts[index]} ret:${ret} ${line}\" >>$log_dir/failed.log" + mkdir -p $log_dir/${case_file}.coredump + local remote_coredump_dir="${workdirs[index]}/tmp/thread_volume/$thread_no/coredump" + local scpcmd="sshpass -p ${passwords[index]} scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}" + if [ -z ${passwords[index]} ]; then + scpcmd="scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}" + fi + cmd="$scpcmd:${remote_coredump_dir}/* $log_dir/${case_file}.coredump/" + $cmd # 2>/dev/null + local case_info=`echo "$line"|cut -d, -f 3,4` + local corefile=`ls $log_dir/${case_file}.coredump/` + corefile=`find $log_dir/${case_file}.coredump/ -name "core.*"` + echo -e "$case_info \e[31m failed\e[0m" + echo "=========================log============================" + cat $log_dir/$case_file.log + echo "=====================================================" + echo -e "\e[34m log file: $log_dir/$case_file.log \e[0m" + if [ ! 
-z "$corefile" ]; then + echo -e "\e[34m corefiles: $corefile \e[0m" + local build_dir=$log_dir/build_${hosts[index]} + local remote_build_dir="${workdirs[index]}/TDengine/debug/build" + if [ $ent -ne 0 ]; then + remote_build_dir="${workdirs[index]}/TDinternal/debug/build" + fi + mkdir $build_dir 2>/dev/null + if [ $? -eq 0 ]; then + # scp build binary + cmd="$scpcmd:${remote_build_dir}/* ${build_dir}/" + echo "$cmd" + $cmd >/dev/null + fi + fi + # get remote sim dir + local remote_sim_dir="${workdirs[index]}/tmp/thread_volume/$thread_no" + local tarcmd="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}" + if [ -z ${passwords[index]} ]; then + tarcmd="ssh -o StrictHostKeyChecking=no ${usernames[index]}@${hosts[index]}" + fi + cmd="$tarcmd sh -c \"cd $remote_sim_dir; tar -czf sim.tar.gz sim\"" + $cmd + local remote_sim_tar="${workdirs[index]}/tmp/thread_volume/$thread_no/sim.tar.gz" + scpcmd="sshpass -p ${passwords[index]} scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}" + if [ -z ${passwords[index]} ]; then + scpcmd="scp -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}" + fi + cmd="$scpcmd:${remote_sim_tar} $log_dir/${case_file}.sim.tar.gz" + $cmd + fi + done +} + +# echo "hosts: ${hosts[@]}" +# echo "usernames: ${usernames[@]}" +# echo "passwords: ${passwords[@]}" +# echo "workdirs: ${workdirs[@]}" +# echo "threads: ${threads[@]}" +# TODO: check host accessibility + +i=0 +while [ $i -lt ${#hosts[*]} ]; do + clean_tmp $i & + i=$(( i + 1 )) +done +wait + +mkdir -p $log_dir +rm -rf $log_dir/* +task_file=$log_dir/$$.task +lock_file=$log_dir/$$.lock + +i=0 +j=0 +while [ $i -lt ${#hosts[*]} ]; do + j=$(( j + threads[i] )) + i=$(( i + 1 )) +done +prepare_cases $j + +i=0 +while [ $i -lt ${#hosts[*]} ]; do + j=0 + while [ $j -lt ${threads[i]} ]; do + run_thread $i $j & + j=$(( j + 1 )) + done + i=$(( i + 1 )) +done + +wait + +rm -f $lock_file +rm -f $task_file + +# docker ps -a|grep -v CONTAINER|awk '{print $1}'|xargs docker rm -f +RET=0 +i=1 +if [ -f "$log_dir/failed.log" ]; then + echo "=====================================================" + while read line; do + line=`echo "$line"|cut -d, -f 3,4` + echo -e "$i. $line \e[31m failed\e[0m" >&2 + i=$(( i + 1 )) + done <$log_dir/failed.log + RET=1 +fi + +echo "${log_dir}" >&2 + +date + +exit $RET diff --git a/tests/parallel_test/run_case.sh b/tests/parallel_test/run_case.sh new file mode 100755 index 0000000000000000000000000000000000000000..9705c024b85c8add289e3a7cca1331e7e26efdcf --- /dev/null +++ b/tests/parallel_test/run_case.sh @@ -0,0 +1,74 @@ +#!/bin/bash + +function usage() { + echo "$0" + echo -e "\t -d execution dir" + echo -e "\t -c command" + echo -e "\t -e enterprise edition" + echo -e "\t -o default timeout value" + echo -e "\t -h help" +} + +ent=0 +while getopts "d:c:o:eh" opt; do + case $opt in + d) + exec_dir=$OPTARG + ;; + c) + cmd=$OPTARG + ;; + o) + TIMEOUT_CMD="timeout $OPTARG" + ;; + e) + ent=1 + ;; + h) + usage + exit 0 + ;; + \?) 
+ echo "Invalid option: -$OPTARG" + usage + exit 0 + ;; + esac +done + +if [ -z "$exec_dir" ]; then + usage + exit 0 +fi +if [ -z "$cmd" ]; then + usage + exit 0 +fi + +if [ $ent -eq 0 ]; then + export PATH=$PATH:/home/TDengine/debug/build/bin + export LD_LIBRARY_PATH=/home/TDengine/debug/build/lib + ln -s /home/TDengine/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null + CONTAINER_TESTDIR=/home/TDengine +else + export PATH=$PATH:/home/TDinternal/debug/build/bin + export LD_LIBRARY_PATH=/home/TDinternal/debug/build/lib + ln -s /home/TDinternal/debug/build/lib/libtaos.so /usr/lib/libtaos.so 2>/dev/null + CONTAINER_TESTDIR=/home/TDinternal/community +fi +mkdir -p /var/lib/taos/subscribe +mkdir -p /var/log/taos +mkdir -p /var/lib/taos + +cd $CONTAINER_TESTDIR/tests/$exec_dir +ulimit -c unlimited + +$TIMEOUT_CMD $cmd +RET=$? + +if [ $RET -ne 0 ]; then + pwd +fi + +exit $RET + diff --git a/tests/parallel_test/run_container.sh b/tests/parallel_test/run_container.sh new file mode 100755 index 0000000000000000000000000000000000000000..affd9128a46bed4cad123d72fcd09e95d5d2bf0f --- /dev/null +++ b/tests/parallel_test/run_container.sh @@ -0,0 +1,106 @@ +#!/bin/bash + +function usage() { + echo "$0" + echo -e "\t -w work dir" + echo -e "\t -d execution dir" + echo -e "\t -c command" + echo -e "\t -t thread number" + echo -e "\t -e enterprise edition" + echo -e "\t -o default timeout value" + echo -e "\t -h help" +} + +ent=0 +while getopts "w:d:c:t:o:eh" opt; do + case $opt in + w) + WORKDIR=$OPTARG + ;; + d) + exec_dir=$OPTARG + ;; + c) + cmd=$OPTARG + ;; + t) + thread_no=$OPTARG + ;; + e) + ent=1 + ;; + o) + extra_param="-o $OPTARG" + ;; + h) + usage + exit 0 + ;; + \?) + echo "Invalid option: -$OPTARG" + usage + exit 0 + ;; + esac +done + +if [ -z "$WORKDIR" ]; then + usage + exit 1 +fi +if [ -z "$exec_dir" ]; then + usage + exit 1 +fi +if [ -z "$cmd" ]; then + usage + exit 1 +fi +if [ -z "$thread_no" ]; then + usage + exit 1 +fi +if [ $ent -ne 0 ]; then + # enterprise edition + extra_param="$extra_param -e" + INTERNAL_REPDIR=$WORKDIR/TDinternal + REPDIR=$INTERNAL_REPDIR/community + CONTAINER_TESTDIR=/home/TDinternal/community + SIM_DIR=/home/TDinternal/sim + REP_MOUNT_PARAM="$INTERNAL_REPDIR:/home/TDinternal" +else + # community edition + REPDIR=$WORKDIR/TDengine + CONTAINER_TESTDIR=/home/TDengine + SIM_DIR=/home/TDengine/sim + REP_MOUNT_PARAM="$REPDIR:/home/TDengine" +fi + +ulimit -c unlimited + +TMP_DIR=$WORKDIR/tmp + +MOUNT_DIR="" +rm -rf ${TMP_DIR}/thread_volume/$thread_no/sim +mkdir -p ${TMP_DIR}/thread_volume/$thread_no/sim/tsim +mkdir -p ${TMP_DIR}/thread_volume/$thread_no/coredump +rm -rf ${TMP_DIR}/thread_volume/$thread_no/coredump/* +if [ ! -d "${TMP_DIR}/thread_volume/$thread_no/$exec_dir" ]; then + subdir=`echo "$exec_dir"|cut -d/ -f1` + echo "cp -rf ${REPDIR}/tests/$subdir ${TMP_DIR}/thread_volume/$thread_no/" + cp -rf ${REPDIR}/tests/$subdir ${TMP_DIR}/thread_volume/$thread_no/ +fi +MOUNT_DIR="$TMP_DIR/thread_volume/$thread_no/$exec_dir:$CONTAINER_TESTDIR/tests/$exec_dir" +echo "$thread_no -> ${exec_dir}:$cmd" +coredump_dir=`cat /proc/sys/kernel/core_pattern | xargs dirname` + +docker run \ + -v $REP_MOUNT_PARAM \ + -v $MOUNT_DIR \ + -v "$TMP_DIR/thread_volume/$thread_no/sim:${SIM_DIR}" \ + -v ${TMP_DIR}/thread_volume/$thread_no/coredump:$coredump_dir \ + -v $WORKDIR/taos-connector-python/taos:/usr/local/lib/python3.8/site-packages/taos:ro \ + --rm --ulimit core=-1 taos_test:v1.0 $CONTAINER_TESTDIR/tests/parallel_test/run_case.sh -d "$exec_dir" -c "$cmd" $extra_param +ret=$? 
+exit $ret + diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 9dcd485194c77951cab8b88c6abfe73e67ec0506..3c23e784c5e7eca90ea60b01f35a0a396c8dcccc 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -35,7 +35,7 @@ class TDSimClient: "tableIncStepPerVnode": "10000", "maxVgroupsPerDb": "1000", "sdbDebugFlag": "143", - "rpcDebugFlag": "135", + "rpcDebugFlag": "143", "tmrDebugFlag": "131", "cDebugFlag": "135", "udebugFlag": "135", @@ -136,7 +136,7 @@ class TDDnode: "tsdbDebugFlag": "135", "mDebugFlag": "135", "sdbDebugFlag": "135", - "rpcDebugFlag": "135", + "rpcDebugFlag": "143", "tmrDebugFlag": "131", "cDebugFlag": "135", "httpDebugFlag": "135", diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 8420b9106563f31b5a6f3290a50e53c664c64492..f6bc9f83064bca6da216fae4e23260e8bff83645 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -63,6 +63,7 @@ # ---- tstream ./test.sh -f tsim/tstream/basic0.sim +./test.sh -f tsim/tstream/basic1.sim # ---- transaction ./test.sh -f tsim/trans/create_db.sim diff --git a/tests/script/tsim/tstream/basic1.sim b/tests/script/tsim/tstream/basic1.sim new file mode 100644 index 0000000000000000000000000000000000000000..3bb5943b3b48b23115b48bb96e1bf41c186836f3 --- /dev/null +++ b/tests/script/tsim/tstream/basic1.sim @@ -0,0 +1,375 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print =============== create database +sql create database test vgroups 1 +sql show databases +if $rows != 3 then + return -1 +endi + +print $data00 $data01 $data02 + +sql use test + + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once into streamt as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s); +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791223001,2,2,3,1.1); +sql insert into t1 values(1648791233002,3,2,3,2.1); +sql insert into t1 values(1648791243003,4,2,3,3.1); +sql insert into t1 values(1648791213004,4,2,3,4.1); +sleep 1000 +sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; + +if $rows != 4 then + print ======$rows + return -1 +endi + +# row 0 +if $data01 != 2 then + print ======$data01 + return -1 +endi + +if $data02 != 2 then + print ======$data02 + return -1 +endi + +if $data03 != 5 then + print ======$data03 + return -1 +endi + +if $data04 != 2 then + print ======$data04 + return -1 +endi + +if $data05 != 3 then + print ======$data05 + return -1 +endi + +# row 1 +if $data11 != 1 then + print ======$data11 + return -1 +endi + +if $data12 != 1 then + print ======$data12 + return -1 +endi + +if $data13 != 2 then + print ======$data13 + return -1 +endi + +if $data14 != 2 then + print ======$data14 + return -1 +endi + +if $data15 != 3 then + print ======$data15 + return -1 +endi + +# row 2 +if $data21 != 1 then + print ======$data21 + return -1 +endi + +if $data22 != 1 then + print ======$data22 + return -1 +endi + +if $data23 != 3 then + print ======$data23 + return -1 +endi + +if $data24 != 2 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +# row 3 +if $data31 != 1 then + print ======$data31 + return -1 +endi + +if $data32 != 1 then + print ======$data32 + return -1 +endi + +if $data33 != 4 then + print ======$data33 + return -1 +endi + +if $data34 != 2 then + print 
======$data34 + return -1 +endi + +if $data35 != 3 then + print ======$data35 + return -1 +endi + +sql insert into t1 values(1648791223001,12,14,13,11.1); +sleep 100 +sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; + +if $rows != 4 then + print ======$rows + return -1 +endi + +# row 0 +if $data01 != 2 then + print ======$data01 + return -1 +endi + +if $data02 != 2 then + print ======$data02 + return -1 +endi + +if $data03 != 5 then + print ======$data03 + return -1 +endi + +if $data04 != 2 then + print ======$data04 + return -1 +endi + +if $data05 != 3 then + print ======$data05 + return -1 +endi + +# row 1 +if $data11 != 1 then + print ======$data11 + return -1 +endi + +if $data12 != 1 then + print ======$data12 + return -1 +endi + +if $data13 != 12 then + print ======$data13 + return -1 +endi + +if $data14 != 14 then + print ======$data14 + return -1 +endi + +if $data15 != 13 then + print ======$data15 + return -1 +endi + +# row 2 +if $data21 != 1 then + print ======$data21 + return -1 +endi + +if $data22 != 1 then + print ======$data22 + return -1 +endi + +if $data23 != 3 then + print ======$data23 + return -1 +endi + +if $data24 != 2 then + print ======$data24 + return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +# row 3 +if $data31 != 1 then + print ======$data31 + return -1 +endi + +if $data32 != 1 then + print ======$data32 + return -1 +endi + +if $data33 != 4 then + print ======$data33 + return -1 +endi + +if $data34 != 2 then + print ======$data34 + return -1 +endi + +if $data35 != 3 then + print ======$data35 + return -1 +endi + +sql insert into t1 values(1648791223002,12,14,13,11.1); +sleep 100 +sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 2 then + print ======$data11 + return -1 +endi + +if $data12 != 2 then + print ======$data12 + return -1 +endi + +if $data13 != 24 then + print ======$data13 + return -1 +endi + +if $data14 != 14 then + print ======$data14 + return -1 +endi + +if $data15 != 13 then + print ======$data15 + return -1 +endi + +sql insert into t1 values(1648791223003,12,14,13,11.1); +sleep 100 +sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 3 then + print ======$data11 + return -1 +endi + +if $data12 != 3 then + print ======$data12 + return -1 +endi + +if $data13 != 36 then + print ======$data13 + return -1 +endi + +if $data14 != 14 then + print ======$data14 + return -1 +endi + +if $data15 != 13 then + print ======$data15 + return -1 +endi + +sql insert into t1 values(1648791223001,1,1,1,1.1); +sql insert into t1 values(1648791223002,2,2,2,2.1); +sql insert into t1 values(1648791223003,3,3,3,3.1); +sleep 100 +sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 3 then + print ======$data11 + return -1 +endi + +if $data12 != 3 then + print ======$data12 + return -1 +endi + +if $data13 != 6 then + print ======$data13 + return -1 +endi + +if $data14 != 3 then + print ======$data14 + return -1 +endi + +if $data15 != 1 then + print ======$data15 + return -1 +endi + +sql insert into t1 values(1648791233003,3,2,3,2.1); +sql insert into t1 values(1648791233002,5,6,7,8.1); +sql insert into t1 values(1648791233002,3,2,3,2.1); +sleep 100 +sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 2 +if $data21 != 2 then + print ======$data21 + return -1 +endi + +if $data22 != 2 then + print ======$data22 + return -1 +endi + +if $data23 != 6 then + print ======$data23 + return -1 +endi + +if $data24 != 2 then + print ======$data24 + 
return -1 +endi + +if $data25 != 3 then + print ======$data25 + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/0-others/taosShellNetChk.py b/tests/system-test/0-others/taosShellNetChk.py index 524268101afcca88e06bdbed4da0bdd55e6633b5..bbaeacf328fd5422ccd018a79ce6d9c632a370a9 100644 --- a/tests/system-test/0-others/taosShellNetChk.py +++ b/tests/system-test/0-others/taosShellNetChk.py @@ -138,7 +138,8 @@ class TDTestCase: if "2: service ok" in retVal: tdLog.info("taos -k success") else: - tdLog.exit("taos -k fail") + tdLog.info(retVal) + tdLog.exit("taos -k fail 1") # stop taosd tdDnodes.stop(1) @@ -149,7 +150,8 @@ class TDTestCase: if "0: unavailable" in retVal: tdLog.info("taos -k success") else: - tdLog.exit("taos -k fail") + tdLog.info(retVal) + tdLog.exit("taos -k fail 2") # restart taosd tdDnodes.start(1) @@ -158,7 +160,8 @@ class TDTestCase: if "2: service ok" in retVal: tdLog.info("taos -k success") else: - tdLog.exit("taos -k fail") + tdLog.info(retVal) + tdLog.exit("taos -k fail 3") tdLog.printNoPrefix("================================ parameter: -n") # stop taosd diff --git a/tests/system-test/0-others/user_control.py b/tests/system-test/0-others/user_control.py index ec3f8ce11a7661261545bf1160809f746493ceb1..78aefd5e9ef2366af1c7b4113918ffa40f844bb8 100644 --- a/tests/system-test/0-others/user_control.py +++ b/tests/system-test/0-others/user_control.py @@ -6,6 +6,7 @@ import traceback from util.log import * from util.sql import * from util.cases import * +from util.dnodes import * PRIVILEGES_ALL = "ALL" @@ -161,6 +162,8 @@ class TDTestCase: for sql in sqls: tdSql.error(sql) + tdSql.execute("DROP USER u1") + def __alter_pass_sql(self, user, passwd): return f'''ALTER USER {user} PASS '{passwd}' ''' @@ -237,7 +240,7 @@ class TDTestCase: f"DROP user {self.__user_list[0]} , {self.__user_list[1]}", f"DROP users {self.__user_list[0]} {self.__user_list[1]}", f"DROP users {self.__user_list[0]} , {self.__user_list[1]}", - "DROP user root", + # "DROP user root", "DROP user abcde", "DROP user ALL", ] @@ -267,7 +270,7 @@ class TDTestCase: # 查看用户 tdLog.printNoPrefix("==========step2: show user test") tdSql.query("show users") - tdSql.checkRows(self.users_count + 2) + tdSql.checkRows(self.users_count + 1) # 密码登录认证 self.login_currrent(self.__user_list[0], self.__passwd_list[0]) @@ -282,30 +285,36 @@ class TDTestCase: self.login_err(self.__user_list[0], self.__passwd_list[0]) self.login_currrent(self.__user_list[0], f"new{self.__passwd_list[0]}") + tdDnodes.stop(1) + tdDnodes.start(1) + + tdSql.query("show users") + tdSql.checkRows(self.users_count + 1) + # 普通用户权限 # 密码登录 - _, user = self.user_login(self.__user_list[0], f"new{self.__passwd_list[0]}") - with taos_connect(user=self.__user_list[0], passwd=f"new{self.__passwd_list[0]}") as conn: - user = conn - # 不能创建用户 - tdLog.printNoPrefix("==========step5: normal user can not create user") - user.error("create use utest1 pass 'utest1pass'") - # 可以查看用户 - tdLog.printNoPrefix("==========step6: normal user can show user") - user.query("show users") - assert user.queryRows == self.users_count + 2 - # 不可以修改其他用户的密码 - tdLog.printNoPrefix("==========step7: normal user can not alter other user pass") - user.error(self.__alter_pass_sql(self.__user_list[1], self.__passwd_list[1] )) - user.error("root", "taosdata_root") - # 可以修改自己的密码 - tdLog.printNoPrefix("==========step8: normal user can alter owner pass") - user.query(self.__alter_pass_sql(self.__user_list[0], self.__passwd_list[0])) - # 不可以删除用户,包括自己 - 
tdLog.printNoPrefix("==========step9: normal user can not drop any user ") - user.error(f"drop user {self.__user_list[0]}") - user.error(f"drop user {self.__user_list[1]}") - user.error("drop user root") + # _, user = self.user_login(self.__user_list[0], f"new{self.__passwd_list[0]}") + with taos_connect(user=self.__user_list[0], passwd=f"new{self.__passwd_list[0]}") as user: + # user = conn + # 不能创建用户 + tdLog.printNoPrefix("==========step5: normal user can not create user") + user.error("create use utest1 pass 'utest1pass'") + # 可以查看用户 + tdLog.printNoPrefix("==========step6: normal user can show user") + user.query("show users") + assert user.queryRows == self.users_count + 1 + # 不可以修改其他用户的密码 + tdLog.printNoPrefix("==========step7: normal user can not alter other user pass") + user.error(self.__alter_pass_sql(self.__user_list[1], self.__passwd_list[1] )) + user.error(self.__alter_pass_sql("root", "taosdata_root" )) + # 可以修改自己的密码 + tdLog.printNoPrefix("==========step8: normal user can alter owner pass") + user.query(self.__alter_pass_sql(self.__user_list[0], self.__passwd_list[0])) + # 不可以删除用户,包括自己 + tdLog.printNoPrefix("==========step9: normal user can not drop any user ") + user.error(f"drop user {self.__user_list[0]}") + user.error(f"drop user {self.__user_list[1]}") + user.error("drop user root") # root删除用户测试 tdLog.printNoPrefix("==========step10: super user drop normal user") @@ -316,6 +325,20 @@ class TDTestCase: tdSql.checkData(0, 0, "root") tdSql.checkData(0, 1, "super") + tdDnodes.stop(1) + tdDnodes.start(1) + + # 删除后无法登录 + self.login_err(self.__user_list[0], self.__passwd_list[0]) + self.login_err(self.__user_list[0], f"new{self.__passwd_list[0]}") + self.login_err(self.__user_list[1], self.__passwd_list[1]) + self.login_err(self.__user_list[1], f"new{self.__passwd_list[1]}") + + tdSql.query("show users") + tdSql.checkRows(1) + tdSql.checkData(0, 0, "root") + tdSql.checkData(0, 1, "super") + def stop(self): tdSql.close() diff --git a/tests/system-test/2-query/timezone.py b/tests/system-test/2-query/timezone.py index 1f3dac90c6a3e45669510b3a5f208df318e39be7..ff55ab31bff3ef0c3bd315e9cf190fc517bd7c78 100644 --- a/tests/system-test/2-query/timezone.py +++ b/tests/system-test/2-query/timezone.py @@ -15,8 +15,16 @@ class TDTestCase: def run(self): # sourcery skip: extract-duplicate-method tdSql.prepare() # get system timezone - time_zone = os.popen('timedatectl | grep zone').read( - ).strip().split(':')[1].lstrip() + time_zone_arr = os.popen('timedatectl | grep zone').read( + ).strip().split(':') + if len(time_zone_arr) > 1: + time_zone = time_zone_arr[1].lstrip() + else: + # possibly in a docker container + time_zone_1 = os.popen('ls -l /etc/localtime|awk -F/ \'{print $(NF-1) "/" $NF}\'').read().strip() + time_zone_2 = os.popen('date "+(%Z, %z)"').read().strip() + time_zone = time_zone_1 + " " + time_zone_2 + print("expected time zone: " + time_zone) tdLog.printNoPrefix("==========step1:create tables==========") tdSql.execute( diff --git a/tests/system-test/99-TDcase/TD-15554.py b/tests/system-test/99-TDcase/TD-15554.py new file mode 100644 index 0000000000000000000000000000000000000000..890580ca2ce6a7e6a8e5e0fa0f2950f0002bf340 --- /dev/null +++ b/tests/system-test/99-TDcase/TD-15554.py @@ -0,0 +1,489 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * + +class TDTestCase: + hostname = socket.gethostname() + + clientCfgDict = {'qdebugflag':'143'} + 
updatecfgDict = {'clientCfg': {}, 'qdebugflag':'143'} + updatecfgDict["clientCfg"] = clientCfgDict + print ("===================: ", updatecfgDict) + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + #tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def newcur(self,cfg,host,port): + user = "root" + password = "taosdata" + con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) + cur=con.cursor() + print(cur) + return cur + + def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("use %s" %dbName) + tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) + pre_create = "create table" + sql = pre_create + #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + for i in range(ctbNum): + sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1) + if (i > 0) and (i%100 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) + return + + def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + for i in range(ctbNum): + sql += " %s_%d values "%(stbName,i) + for j in range(rowsPerTbl): + sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) + if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + if j < rowsPerTbl - 1: + sql = "insert into %s_%d values " %(stbName,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ 
[OK]") + return + + def prepareEnv(self, **parameterDict): + print ("input parameters:") + print (parameterDict) + # create new connector for my thread + tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) + self.create_tables(tsql,\ + parameterDict["dbName"],\ + parameterDict["vgroups"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"]) + + self.insert_data(tsql,\ + parameterDict["dbName"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"],\ + parameterDict["batchNum"],\ + parameterDict["startTs"]) + return + + + def tmqCase1(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 1: Produce while consume") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db', \ + 'vgroups': 1, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 1000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + time.sleep(2) + + # wait stb ready + while 1: + tdSql.query("show %s.stables"%parameterDict['dbName']) + if tdSql.getRows() == 1: + break + else: + time.sleep(1) + + tdLog.info("create topics from super table") + topicFromStb = 'topic_stb_column' + topicFromCtb = 'topic_ctb_column' + + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) + + time.sleep(1) + tdSql.query("show topics") + #tdSql.checkRows(2) + topic1 = tdSql.getData(0 , 0) + topic2 = tdSql.getData(1 , 0) + + tdLog.info("show topics: %s, %s"%(topic1, topic2)) + if topic1 != topicFromStb and topic1 != topicFromCtb: + tdLog.exit("topic error1") + if topic2 != topicFromStb and topic2 != topicFromCtb: + tdLog.exit("topic error2") + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)") + tdSql.query("create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)") + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + topicList = topicFromStb + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into consumeinfo values " + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + tdLog.info("check stb if there are data") + while 1: + tdSql.query("select count(*) from %s"%parameterDict["stbName"]) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + countOfStb = tdSql.getData(0, 0) + if countOfStb != 0: + tdLog.info("count from stb: %d"%countOfStb) + break + else: + time.sleep(1) + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + 
tdLog.info(shellCmd) + os.system(shellCmd) + + # wait for data ready + prepareEnvThread.join() + + tdLog.info("insert process end, and start to check consume result") + while 1: + tdSql.query("select * from consumeresult") + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 1: + break + else: + time.sleep(5) + + tdLog.info("consumer result: %d, %d"%(tdSql.getData(0 , 2), tdSql.getData(0 , 3))) + tdSql.checkData(0 , 1, consumerId) + # mulit rows and mulit tables in one sql, this num of msg is not sure + #tdSql.checkData(0 , 2, expectmsgcnt) + tdSql.checkData(0 , 3, expectrowcnt) + + tdSql.query("drop topic %s"%topicFromStb) + tdSql.query("drop topic %s"%topicFromCtb) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def tmqCase2(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 2: add child table with consuming ") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db2', \ + 'vgroups': 1, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + # wait db ready + while 1: + tdSql.query("show databases") + if tdSql.getRows() == 4: + print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) + break + else: + time.sleep(1) + + tdSql.query("use %s"%parameterDict['dbName']) + # wait stb ready + while 1: + tdSql.query("show %s.stables"%parameterDict['dbName']) + if tdSql.getRows() == 1: + break + else: + time.sleep(1) + + tdLog.info("create topics from super table") + topicFromStb = 'topic_stb_column2' + topicFromCtb = 'topic_ctb_column2' + + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) + + time.sleep(1) + tdSql.query("show topics") + topic1 = tdSql.getData(0 , 0) + topic2 = tdSql.getData(1 , 0) + tdLog.info("show topics: %s, %s"%(topic1, topic2)) + if topic1 != topicFromStb and topic1 != topicFromCtb: + tdLog.exit("topic error1") + if topic2 != topicFromStb and topic2 != topicFromCtb: + tdLog.exit("topic error2") + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + rowsOfNewCtb = 1000 + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb + topicList = topicFromStb + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into consumeinfo values " + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + tdLog.info("check stb if there are data") + while 1: + tdSql.query("select count(*) from %s"%parameterDict["stbName"]) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + countOfStb = 
tdSql.getData(0, 0) + if countOfStb != 0: + tdLog.info("count from stb: %d"%countOfStb) + break + else: + time.sleep(1) + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + # create new child table and insert data + newCtbName = 'newctb' + tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"])) + startTs = parameterDict["startTs"] + for j in range(rowsOfNewCtb): + sql = "insert into %s.%s values (%d, %d, 'tmqrow_%d') "%(parameterDict["dbName"], newCtbName, startTs + j, j, j) + tdSql.execute(sql) + tdLog.debug("insert data into new child table ............ [OK]") + + # wait for data ready + prepareEnvThread.join() + + tdLog.info("insert process end, and start to check consume result") + while 1: + tdSql.query("select * from consumeresult") + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 1: + break + else: + time.sleep(5) + + tdSql.checkData(0 , 1, consumerId) + tdSql.checkData(0 , 3, expectrowcnt) + + tdSql.query("drop topic %s"%topicFromStb) + tdSql.query("drop topic %s"%topicFromCtb) + + tdLog.printNoPrefix("======== test case 2 end ...... ") + + def tmqCase3(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 3: tow topics, each contains a stable, \ + but at the beginning, no ctables in the stable of one topic,\ + after starting consumer, create ctables ") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db3', \ + 'vgroups': 1, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 30000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + # wait db ready + while 1: + tdSql.query("show databases") + if tdSql.getRows() == 4: + print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) + break + else: + time.sleep(1) + + tdSql.query("use %s"%parameterDict['dbName']) + # wait stb ready + while 1: + tdSql.query("show %s.stables"%parameterDict['dbName']) + if tdSql.getRows() == 1: + break + else: + time.sleep(1) + + tdLog.info("create stable2 for the seconde topic") + parameterDict2 = {'cfg': '', \ + 'dbName': 'db3', \ + 'vgroups': 1, \ + 'stbName': 'stb2', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 30000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict2['cfg'] = cfgPath + tdSql.execute("create stable if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict2['dbName'], parameterDict2['stbName'])) + + tdLog.info("create topics from super table") + topicFromStb = 'topic_stb_column3' + topicFromStb2 = 'topic_stb_column32' + + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb2, parameterDict2['dbName'], parameterDict2['stbName'])) + + tdSql.query("show topics") + topic1 = tdSql.getData(0 , 0) + topic2 = tdSql.getData(1 , 0) + tdLog.info("show topics: %s, %s"%(topic1, topic2)) + if 
topic1 != topicFromStb and topic1 != topicFromStb2: + tdLog.exit("topic error1") + if topic2 != topicFromStb and topic2 != topicFromStb2: + tdLog.exit("topic error2") + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] + topicList = topicFromStb + ',' + topicFromStb2 + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into consumeinfo values " + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + tdLog.info("check stb if there are data") + while 1: + tdSql.query("select count(*) from %s"%parameterDict["stbName"]) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + countOfStb = tdSql.getData(0, 0) + if countOfStb != 0: + tdLog.info("count from stb: %d"%countOfStb) + break + else: + time.sleep(1) + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + # start the second thread to create new child table and insert data + prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) + prepareEnvThread2.start() + + # wait for data ready + prepareEnvThread.join() + prepareEnvThread2.join() + + tdLog.info("insert process end, and start to check consume result") + while 1: + tdSql.query("select * from consumeresult") + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 1: + break + else: + time.sleep(5) + + tdSql.checkData(0 , 1, consumerId) + tdSql.checkData(0 , 3, expectrowcnt) + + tdSql.query("drop topic %s"%topicFromStb) + tdSql.query("drop topic %s"%topicFromStb2) + + tdLog.printNoPrefix("======== test case 3 end ...... 
") + + def run(self): + tdSql.prepare() + + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + cfgPath = buildPath + "/../sim/psim/cfg" + tdLog.info("cfgPath: %s" % cfgPath) + + #self.tmqCase1(cfgPath, buildPath) + #self.tmqCase2(cfgPath, buildPath) + self.tmqCase3(cfgPath, buildPath) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/99-TDcase/TD-15557.py b/tests/system-test/99-TDcase/TD-15557.py new file mode 100644 index 0000000000000000000000000000000000000000..e005985fe02246ce502f9414321c1448869afab3 --- /dev/null +++ b/tests/system-test/99-TDcase/TD-15557.py @@ -0,0 +1,405 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * + +class TDTestCase: + hostname = socket.gethostname() + #rpcDebugFlagVal = '143' + #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #print ("===================: ", updatecfgDict) + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + #tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def newcur(self,cfg,host,port): + user = "root" + password = "taosdata" + con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) + cur=con.cursor() + print(cur) + return cur + + def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg,showRow,cdbName,valgrind): + shellCmd = 'nohup ' + if valgrind == 1: + logFile = cfgPath + '/../log/valgrind-tmq.log' + shellCmd = 'nohup valgrind --log-file=' + logFile + shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' + + shellCmd += buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): + tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) + tsql.execute("use %s" %dbName) + tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) + pre_create = "create table" + sql = pre_create + #tdLog.debug("doing create one stable %s and %d child table in %s ..." 
%(stbname, count ,dbname)) + for i in range(ctbNum): + sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1) + if (i > 0) and (i%100 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + event.set() + tdLog.debug("complete to create database[%s], stable[%s] and %d child tables" %(dbName, stbName, ctbNum)) + return + + def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + for i in range(ctbNum): + sql += " %s_%d values "%(stbName,i) + for j in range(rowsPerTbl): + sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) + if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + if j < rowsPerTbl - 1: + sql = "insert into %s_%d values " %(stbName,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def prepareEnv(self, **parameterDict): + print ("input parameters:") + print (parameterDict) + # create new connector for my thread + tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) + self.create_tables(tsql,\ + parameterDict["dbName"],\ + parameterDict["vgroups"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"]) + + self.insert_data(tsql,\ + parameterDict["dbName"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"],\ + parameterDict["batchNum"],\ + parameterDict["startTs"]) + return + + def tmqCase1(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 1: Produce while one consume to subscribe one db") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db1', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + topicList = topicName1 + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + event.wait() + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + + valgrind = 1 + 
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind) + + # wait for data ready + prepareEnvThread.join() + + tdLog.info("insert process end, and start to check consume result") + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 1: + break + else: + time.sleep(5) + + tdLog.info("consumer result: %d, %d"%(tdSql.getData(0 , 2), tdSql.getData(0 , 3))) + tdSql.checkData(0 , 1, consumerId) + # mulit rows and mulit tables in one sql, this num of msg is not sure + #tdSql.checkData(0 , 2, expectmsgcnt) + tdSql.checkData(0 , 3, expectrowcnt) + + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def tmqCase2(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 2: Produce while two consumers to subscribe one db") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db2', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 100000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + topicList = topicName1 + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + consumerId = 1 + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + event.wait() + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + + valgrind = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind) + + # wait for data ready + prepareEnvThread.join() + + tdLog.info("insert process end, and start to check consume result") + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 2: + break + else: + time.sleep(5) + + consumerId0 = tdSql.getData(0 , 1) + consumerId1 = tdSql.getData(1 , 1) + actConsumeRows0 = tdSql.getData(0 , 3) + actConsumeRows1 = tdSql.getData(1 , 3) + + tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0)) + 
tdLog.info("consumer %d rows: %d"%(consumerId1, actConsumeRows1)) + + totalConsumeRows = actConsumeRows0 + actConsumeRows1 + if totalConsumeRows != expectrowcnt: + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 2 end ...... ") + + def tmqCase3(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 3: Produce while one consumers to subscribe one db, include 2 stb") + tdLog.info("step 1: create database, stb, ctb and insert data") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db3', \ + 'vgroups': 4, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 100000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups'])) + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + parameterDict2 = {'cfg': '', \ + 'dbName': 'db3', \ + 'vgroups': 4, \ + 'stbName': 'stb2', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 100000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) + prepareEnvThread2.start() + + tdLog.info("create topics from db") + topicName1 = 'topic_db1' + + tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName'])) + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] + topicList = topicName1 + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + # consumerId = 1 + # sql = "insert into %s.consumeinfo values "%cdbName + # sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + # tdSql.query(sql) + + event.wait() + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + valgrind = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow, cdbName,valgrind) + + # wait for data ready + prepareEnvThread.join() + prepareEnvThread2.join() + + tdLog.info("insert process end, and start to check consume result") + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 1: + break + else: + time.sleep(5) + + consumerId0 = tdSql.getData(0 , 1) + #consumerId1 = tdSql.getData(1 , 1) + actConsumeRows0 = tdSql.getData(0 , 3) + #actConsumeRows1 = tdSql.getData(1 , 3) + + tdLog.info("consumer %d rows: %d"%(consumerId0, actConsumeRows0)) + #tdLog.info("consumer %d rows: %d"%(consumerId1, 
actConsumeRows1)) + + #totalConsumeRows = actConsumeRows0 + actConsumeRows1 + if actConsumeRows0 != expectrowcnt: + tdLog.exit("tmq consume rows error!") + + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 3 end ...... ") + + def run(self): + tdSql.prepare() + + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + cfgPath = buildPath + "/../sim/psim/cfg" + tdLog.info("cfgPath: %s" % cfgPath) + + self.tmqCase1(cfgPath, buildPath) + #self.tmqCase2(cfgPath, buildPath) + #self.tmqCase3(cfgPath, buildPath) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 817f81487317f02f1df7baabeed80cc24094bb33..c80206abbcb13b0420da9e7d3dcb4f65471b9ab4 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -8,6 +8,8 @@ python3 ./test.py -f 0-others/taosShellNetChk.py python3 ./test.py -f 0-others/telemetry.py python3 ./test.py -f 0-others/taosdMonitor.py +python3 ./test.py -f 0-others/user_control.py + #python3 ./test.py -f 2-query/between.py python3 ./test.py -f 2-query/distinct.py python3 ./test.py -f 2-query/varchar.py @@ -53,5 +55,3 @@ python3 ./test.py -f 2-query/arctan.py # python3 ./test.py -f 2-query/query_cols_tags_and_or.py python3 ./test.py -f 7-tmq/basic5.py - - diff --git a/tests/unit-test/test.sh b/tests/unit-test/test.sh new file mode 100755 index 0000000000000000000000000000000000000000..41225977175b7467b9c0ce49cbd22f0139929aba --- /dev/null +++ b/tests/unit-test/test.sh @@ -0,0 +1,40 @@ +#!/bin/bash + +function usage() { + echo "$0" + echo -e "\t -e enterprise edition" + echo -e "\t -h help" +} + +ent=0 +while getopts "eh" opt; do + case $opt in + e) + ent=1 + ;; + h) + usage + exit 0 + ;; + \?) + echo "Invalid option: -$OPTARG" + usage + exit 0 + ;; + esac +done + +script_dir=`dirname $0` +cd ${script_dir} +PWD=`pwd` + +if [ $ent -eq 0 ]; then + cd ../../debug +else + cd ../../../debug +fi + +ctest -j8 +ret=$? +exit $ret + diff --git a/tools/shell/src/shellArguments.c b/tools/shell/src/shellArguments.c index 13f8cde3e3bc43635f92d2c52978a35fef519bbb..1639fd1ca681ab0bb980b2fd1fca8b34d58e15f3 100644 --- a/tools/shell/src/shellArguments.c +++ b/tools/shell/src/shellArguments.c @@ -332,7 +332,7 @@ int32_t shellParseArgs(int32_t argc, char *argv[]) { shellInitArgs(argc, argv); shell.info.clientVersion = "Welcome to the TDengine shell from %s, Client Version:%s\n" - "Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n"; + "Copyright (c) 2022 by TAOS Data, Inc. 
All rights reserved.\n\n"; shell.info.promptHeader = "taos> "; shell.info.promptContinue = " -> "; shell.info.promptSize = 6; diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 21fd3d03597e66ee2f8a2d1c13dee74d698bd605..8b126cf90ee32f68299dc95ad2444803995f83c1 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -29,11 +29,11 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD static int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres); static void shellPrintNChar(const char *str, int32_t length, int32_t width); static void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t length, int32_t precision); -static int32_t shellVerticalPrintResult(TAOS_RES *tres); +static int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql); static int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision); static void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields); -static int32_t shellHorizontalPrintResult(TAOS_RES *tres); -static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical); +static int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql); +static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql); static void shellReadHistory(); static void shellWriteHistory(); static void shellPrintError(TAOS_RES *tres, int64_t st); @@ -121,7 +121,7 @@ int32_t shellRunCommand(char *command) { char quote = 0, *cmd = command; for (char c = *command++; c != 0; c = *command++) { if (c == '\\' && (*command == '\'' || *command == '"' || *command == '`')) { - command ++; + command++; continue; } @@ -190,7 +190,7 @@ void shellRunSingleCommandImp(char *command) { if (pFields != NULL) { // select and show kinds of commands int32_t error_no = 0; - int32_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode); + int32_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode, command); if (numOfRows < 0) return; et = taosGetTimestampUs(); @@ -272,6 +272,7 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i return; } + int n; char buf[TSDB_MAX_BYTES_PER_ROW]; switch (field->type) { case TSDB_DATA_TYPE_BOOL: @@ -280,20 +281,37 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i case TSDB_DATA_TYPE_TINYINT: taosFprintfFile(pFile, "%d", *((int8_t *)val)); break; + case TSDB_DATA_TYPE_UTINYINT: + taosFprintfFile(pFile, "%u", *((uint8_t *)val)); + break; case TSDB_DATA_TYPE_SMALLINT: taosFprintfFile(pFile, "%d", *((int16_t *)val)); break; + case TSDB_DATA_TYPE_USMALLINT: + taosFprintfFile(pFile, "%u", *((uint16_t *)val)); + break; case TSDB_DATA_TYPE_INT: taosFprintfFile(pFile, "%d", *((int32_t *)val)); break; + case TSDB_DATA_TYPE_UINT: + taosFprintfFile(pFile, "%u", *((uint32_t *)val)); + break; case TSDB_DATA_TYPE_BIGINT: taosFprintfFile(pFile, "%" PRId64, *((int64_t *)val)); break; + case TSDB_DATA_TYPE_UBIGINT: + taosFprintfFile(pFile, "%" PRIu64, *((uint64_t *)val)); + break; case TSDB_DATA_TYPE_FLOAT: taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val)); break; case TSDB_DATA_TYPE_DOUBLE: - taosFprintfFile(pFile, "%.9f", GET_DOUBLE_VAL(val)); + n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", length, GET_DOUBLE_VAL(val)); + if (n > MAX(25, length)) { + taosFprintfFile(pFile, "%*.15e", length, GET_DOUBLE_VAL(val)); + } else { + taosFprintfFile(pFile, "%s", buf); + } break; case 
@@ -435,6 +453,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
     return;
   }
 
+  int n;
   char buf[TSDB_MAX_BYTES_PER_ROW];
   switch (field->type) {
     case TSDB_DATA_TYPE_BOOL:
@@ -468,7 +487,12 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
       printf("%*.5f", width, GET_FLOAT_VAL(val));
       break;
     case TSDB_DATA_TYPE_DOUBLE:
-      printf("%*.9f", width, GET_DOUBLE_VAL(val));
+      n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val));
+      if (n > MAX(25, width)) {
+        printf("%*.15e", width, GET_DOUBLE_VAL(val));
+      } else {
+        printf("%s", buf);
+      }
       break;
     case TSDB_DATA_TYPE_BINARY:
     case TSDB_DATA_TYPE_NCHAR:
@@ -483,7 +507,16 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
   }
 }
 
-int32_t shellVerticalPrintResult(TAOS_RES *tres) {
+bool shellIsLimitQuery(const char *sql) {
+  // todo: refactor; a bare substring match can misfire on quoted literals
+  if (strcasestr(sql, " limit ") != NULL) {
+    return true;
+  }
+
+  return false;
+}
+
+int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) {
   TAOS_ROW row = taos_fetch_row(tres);
   if (row == NULL) {
     return 0;
@@ -503,7 +536,7 @@ int32_t shellVerticalPrintResult(TAOS_RES *tres) {
 
   uint64_t resShowMaxNum = UINT64_MAX;
 
-  if (shell.args.commands == NULL && shell.args.file[0] == 0) {
+  if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql)) {
     resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM;
   }
 
@@ -525,8 +558,13 @@ int32_t shellVerticalPrintResult(TAOS_RES *tres) {
         putchar('\n');
       }
     } else if (showMore) {
-      printf("[100 Rows showed, and more rows are fetching but will not be showed. You can ctrl+c to stop or wait.]\n");
-      printf("[You can add limit statement to get more or redirect results to specific file to get all.]\n");
+      printf("\n");
+      printf(" Notice: The result shows only the first %d rows.\n", SHELL_DEFAULT_RES_SHOW_NUM);
+      printf(" You can use the `LIMIT` clause to limit the number of rows shown.\n");
+      printf(" Or use '>>' to redirect the full result set to a specified file.\n");
+      printf("\n");
+      printf(" You can use Ctrl+C to stop the ongoing fetch.\n");
+      printf("\n");
       showMore = 0;
     }
 
@@ -618,7 +656,7 @@ void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields) {
   putchar('\n');
 }
 
-int32_t shellHorizontalPrintResult(TAOS_RES *tres) {
+int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) {
   TAOS_ROW row = taos_fetch_row(tres);
   if (row == NULL) {
     return 0;
@@ -637,7 +675,7 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres) {
 
   uint64_t resShowMaxNum = UINT64_MAX;
 
-  if (shell.args.commands == NULL && shell.args.file[0] == 0) {
+  if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql)) {
     resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM;
   }
 
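shellIsLimitQuery, added above, is a plain case-insensitive substring scan, which is cheap but approximate; the todo in the patch hints at that. A small sketch showing its edge cases (is_limit_query is a stand-in name; strcasestr is a GNU/BSD extension, hence the _GNU_SOURCE define):

#define _GNU_SOURCE /* strcasestr is a GNU extension on glibc */
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Treat any query containing a case-insensitive " limit " as explicitly
 * bounded, so the shell skips its default 100-row display cap. */
static bool is_limit_query(const char *sql) {
  return strcasestr(sql, " limit ") != NULL;
}

int main(void) {
  printf("%d\n", is_limit_query("select * from t LIMIT 5"));   /* 1 */
  printf("%d\n", is_limit_query("select * from t"));           /* 0 */
  printf("%d\n", is_limit_query("select ' limit ' from t"));   /* 1: false positive in a literal */
  printf("%d\n", is_limit_query("select * from t\nlimit 5"));  /* 0: missed across a newline */
  return 0;
}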
@@ -655,8 +693,13 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres) {
       }
       putchar('\n');
     } else if (showMore) {
-      printf("[100 Rows showed, and more rows are fetching but will not be showed. You can ctrl+c to stop or wait.]\n");
-      printf("[You can add limit statement to show more or redirect results to specific file to get all.]\n");
+      printf("\n");
+      printf(" Notice: The result shows only the first %d rows.\n", SHELL_DEFAULT_RES_SHOW_NUM);
+      printf(" You can use the `LIMIT` clause to limit the number of rows shown.\n");
+      printf(" Or use '>>' to redirect the full result set to a specified file.\n");
+      printf("\n");
+      printf(" You can use Ctrl+C to stop the ongoing fetch.\n");
+      printf("\n");
       showMore = 0;
     }
 
@@ -667,14 +710,14 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres) {
   return numOfRows;
 }
 
-int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical) {
+int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql) {
   int32_t numOfRows = 0;
   if (fname != NULL) {
     numOfRows = shellDumpResultToFile(fname, tres);
   } else if (vertical) {
-    numOfRows = shellVerticalPrintResult(tres);
+    numOfRows = shellVerticalPrintResult(tres, sql);
   } else {
-    numOfRows = shellHorizontalPrintResult(tres);
+    numOfRows = shellHorizontalPrintResult(tres, sql);
   }
 
   *error_no = taos_errno(tres);
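Both printers share the same paging pattern, which the hunks above show only in fragments. Condensed into one runnable sketch under assumed names (SHOW_NUM and print_rows stand in for SHELL_DEFAULT_RES_SHOW_NUM and the printers; the real loop draws rows from taos_fetch_row): the cap is lifted for batch mode, file redirection, and LIMIT queries; the notice prints exactly once; and iteration continues so the returned row count covers the whole result set.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define SHOW_NUM 100 /* stands in for SHELL_DEFAULT_RES_SHOW_NUM */

/* Skeleton of the shared paging flow: rows under the cap are printed,
 * the first row past it prints the notice exactly once, and fetching
 * continues so numOfRows still reflects the full result set. */
static int print_rows(int totalRows, bool interactive, bool hasLimit) {
  uint64_t resShowMaxNum = UINT64_MAX;
  if (interactive && !hasLimit) resShowMaxNum = SHOW_NUM;

  int  numOfRows = 0;
  bool showMore  = true;
  for (int i = 0; i < totalRows; i++) { /* stands in for the fetch loop */
    if ((uint64_t)numOfRows < resShowMaxNum) {
      printf("row %d\n", i);
    } else if (showMore) {
      printf("\n Notice: The result shows only the first %d rows.\n", SHOW_NUM);
      showMore = false;
    }
    numOfRows++;
  }
  return numOfRows;
}

int main(void) {
  printf("total: %d\n", print_rows(102, true, false));
  return 0;
}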