From d58310d9d73b404a9138b934cb9e2f9b9e8507f5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 30 Jul 2020 15:46:50 +0800 Subject: [PATCH] [td-225] fix bugs in join/union query. --- src/client/src/tscAsync.c | 12 +- src/client/src/tscLocal.c | 4 +- src/client/src/tscServer.c | 58 ++++------ src/client/src/tscSubquery.c | 2 +- src/client/src/tscUtil.c | 4 + src/query/src/qExecutor.c | 2 +- tests/script/general/parser/join.sim | 24 ++++ tests/script/general/parser/union.sim | 154 +++++++++++++------------- 8 files changed, 138 insertions(+), 122 deletions(-) diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 85cff4ba17..59d906e8fd 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -220,14 +220,13 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { tscFetchDatablockFromSubquery(pSql); } else if (pRes->completed) { - if(pCmd->command == TSDB_SQL_FETCH) { + if(pCmd->command == TSDB_SQL_FETCH || (pCmd->command >= TSDB_SQL_SERV_STATUS && pCmd->command <= TSDB_SQL_CURRENT_USER)) { if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes. tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode); - return; } else { /* - * all available virtual node has been checked already, now we need to check - * for the next subclause queries + * all available virtual nodes in current clause has been checked already, now try the + * next one in the following union subclause */ if (pCmd->clauseIndex < pCmd->numOfClause - 1) { tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode); @@ -235,11 +234,12 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi } /* - * 1. has reach the limitation - * 2. no remain virtual nodes to be retrieved anymore + * 1. has reach the limitation + * 2. no remain virtual nodes to be retrieved anymore */ (*pSql->fetchFp)(param, pSql, 0); } + return; } else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE) { // in case of show command, return no data diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 7f336daa91..6822851d84 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -293,7 +293,7 @@ static void tscProcessCurrentDB(SSqlObj *pSql) { char db[TSDB_DB_NAME_LEN] = {0}; extractDBName(pSql->pTscObj->db, db); - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); pExpr->resType = TSDB_DATA_TYPE_BINARY; @@ -314,7 +314,7 @@ static void tscProcessCurrentDB(SSqlObj *pSql) { static void tscProcessServerVer(SSqlObj *pSql) { const char* v = pSql->pTscObj->sversion; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); pExpr->resType = TSDB_DATA_TYPE_BINARY; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 690a5f790a..6b9fc0551e 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -46,19 +46,27 @@ void tscSaveSubscriptionProgress(void* sub); static int32_t minMsgSize() { return tsRpcHeadSize + 100; } static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { + assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0); + SRpcEpSet* pEpSet = &pSql->epSet; - pEpSet->inUse = 0; - if (pVgroupInfo == NULL) { - pEpSet->numOfEps = 0; - return; - } + pEpSet->inUse = 0; + + // apply the FQDN string length check here + bool hasFqdn = false; pEpSet->numOfEps = pVgroupInfo->numOfEps; for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn); pEpSet->port[i] = pVgroupInfo->epAddr[i].port; + + if (!hasFqdn) { + hasFqdn = (strlen(pEpSet->fqdn[i]) > 0); + } } + + assert(hasFqdn); } + static void tscDumpMgmtEpSet(SRpcEpSet *epSet) { taosCorBeginRead(&tscMgmtEpSet.version); *epSet = tscMgmtEpSet.epSet; @@ -128,21 +136,6 @@ void tscPrintMgmtEp() { } } -/* - * For each management node, try twice at least in case of poor network situation. - * If the client start to connect to a non-management node from the client, and the first retry may fail due to - * the poor network quality. And then, the second retry get the response with redirection command. - * The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster. - * Therefore, we need to multiply the retry times by factor of 2 to fix this problem. - */ -UNUSED_FUNC -static int32_t tscGetMgmtConnMaxRetryTimes() { - int32_t factor = 2; - SRpcEpSet dump; - tscDumpMgmtEpSet(&dump); - return dump.numOfEps * factor; -} - void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { STscObj *pObj = (STscObj *)param; if (pObj == NULL) return; @@ -425,21 +418,18 @@ int doProcessSql(SSqlObj *pSql) { } int tscProcessSql(SSqlObj *pSql) { - char * name = NULL; + char *name = NULL; SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = NULL; uint32_t type = 0; if (pQueryInfo != NULL) { pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if (pTableMetaInfo != NULL) { - name = pTableMetaInfo->name; - } - + name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL; type = pQueryInfo->type; - + // while numOfTables equals to 0, it must be Heartbeat assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0); } @@ -451,7 +441,6 @@ int tscProcessSql(SSqlObj *pSql) { return pSql->res.code; } } else if (pCmd->command < TSDB_SQL_LOCAL) { - //pSql->epSet = tscMgmtEpSet; } else { // local handler return (*tscProcessMsgRsp[pCmd->command])(pSql); @@ -598,11 +587,11 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char } else { pVgroupInfo = &pTableMeta->vgroupInfo; } - tscSetDnodeEpSet(pSql, pVgroupInfo); - if (pVgroupInfo != NULL) { - pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); - } + assert(pVgroupInfo != NULL); + + tscSetDnodeEpSet(pSql, pVgroupInfo); + pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->tid = htonl(pTableMeta->id.tid); @@ -1885,11 +1874,10 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { for (int32_t k = 0; k < pVgroups->numOfEps; ++k) { pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port); - } - - pMsg += size; } + + pMsg += size; } return pSql->res.code; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index c7e7d1323b..7258ac528e 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1085,7 +1085,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { assert(taos_errno(pSql) == code); - tscError("%p abort query, code:%d, global code:%d", pSql, code, pParentSql->res.code); + tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code)); pParentSql->res.code = code; quitAllSubquery(pParentSql, pSupporter); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 32a82a080f..5ee3db36d1 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2034,6 +2034,10 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) { } int32_t numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups; + if (pTableMetaInfo->pVgroupTables != NULL) { + numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables); + } + return tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1); } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 6efc8a827e..fb13972689 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2216,7 +2216,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) { // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block SQuery* pQuery = pRuntimeEnv->pQuery; - if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pRuntimeEnv->groupbyNormalCol && !isFixedOutputQuery(pRuntimeEnv)) { + if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pRuntimeEnv->groupbyNormalCol && !isFixedOutputQuery(pRuntimeEnv) && !isTSCompQuery(pQuery)) { SResultRec *pRec = &pQuery->rec; if (pQuery->rec.capacity - pQuery->rec.rows < pBlockInfo->rows) { diff --git a/tests/script/general/parser/join.sim b/tests/script/general/parser/join.sim index 07f2cd3f77..882f561ae1 100644 --- a/tests/script/general/parser/join.sim +++ b/tests/script/general/parser/join.sim @@ -257,6 +257,21 @@ if $data01 != $val then return -1 endi +sql select count(join_tb1.*) + count(join_tb0.*) from join_tb1 , join_tb0 where join_tb1.ts = join_tb0.ts and join_tb1.ts >= 100000 and join_tb0.c7 = false;; +if $rows != 1 then + return -1 +endi + +if $data00 != 20.000000000 then + print expect 20.000000000 actual $data00 + return -1 +endi + +sql select count(join_tb1.*)/10 from join_tb1 , join_tb0 where join_tb1.ts = join_tb0.ts and join_tb1.ts >= 100000 and join_tb0.c7 = false;; +if $data00 != 1.000000000 then + return -1 +endi + print 3 #agg + where condition sql select count(join_tb1.c3), count(join_tb0.ts) from $tb1 , $tb2 where $ts1 = $ts2 and join_tb1.ts <= 100002 and join_tb0.c7 = true; @@ -381,6 +396,15 @@ if $data00 != $val then return -1 endi +sql select sum(join_mt0.c1)+sum(join_mt0.c1) from join_mt0, join_mt1 where join_mt0.ts = join_mt1.ts and join_mt0.t1=join_mt1.t1 and join_mt0.c2=99 and join_mt1.ts=100999; +if $rows != 1 then + return -1 +endi + +if $data00 != 396.000000000 then + return -1 +endi + # first/last sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts and join_mt0.t1=1 interval(10a) order by join_mt0.ts asc; diff --git a/tests/script/general/parser/union.sim b/tests/script/general/parser/union.sim index 358bcb8a40..b9dc8e8e1f 100644 --- a/tests/script/general/parser/union.sim +++ b/tests/script/general/parser/union.sim @@ -1,10 +1,10 @@ -#system sh/stop_dnodes.sh -# -#system sh/deploy.sh -n dnode1 -i 1 -#system sh/cfg.sh -n dnode1 -c walLevel -v 0 -#system sh/cfg.sh -n dnode1 -c debugFlag -v 135 -#system sh/cfg.sh -n dnode1 -c rpcDebugFlag -v 135 -#system sh/exec.sh -n dnode1 -s start +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c walLevel -v 0 +system sh/cfg.sh -n dnode1 -c debugFlag -v 135 +system sh/cfg.sh -n dnode1 -c rpcDebugFlag -v 135 +system sh/exec.sh -n dnode1 -s start sleep 1000 sql connect @@ -24,77 +24,77 @@ $mt = $mtPrefix . $i $j = 1 $mt1 = $mtPrefix . $j -# -#sql drop database if exits $db -x step1 -#step1: -#sql create database if not exists $db maxtables 4 + +sql drop database if exits $db -x step1 +step1: +sql create database if not exists $db maxtables 4 sql use $db -#sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int) -# -#$i = 0 -#$t = 1578203484000 -# -#while $i < $tbNum -# $tb = $tbPrefix . $i -# sql create table $tb using $mt tags( $i ) -# -# $x = 0 -# while $x < $rowNum -# $ms = $x * 1000 -# $ms = $ms * 60 -# -# $c = $x / 100 -# $c = $c * 100 -# $c = $x - $c -# $binary = 'binary . $c -# $binary = $binary . ' -# $nchar = 'nchar . $c -# $nchar = $nchar . ' -# -# $t1 = $t + $ms -# sql insert into $tb values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) -# $x = $x + 1 -# endw -# -# $i = $i + 1 -#endw -# -#sql create table $mt1 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int) -# -#$j = 0 -#$t = 1578203484000 -#$rowNum = 1000 -#$tbNum = 5 -#$i = 0 -# -#while $i < $tbNum -# $tb1 = $tbPrefix1 . $j -# sql create table $tb1 using $mt1 tags( $i ) -# -# $x = 0 -# while $x < $rowNum -# $ms = $x * 1000 -# $ms = $ms * 60 -# -# $c = $x / 100 -# $c = $c * 100 -# $c = $x - $c -# $binary = 'binary . $c -# $binary = $binary . ' -# $nchar = 'nchar . $c -# $nchar = $nchar . ' -# -# $t1 = $t + $ms -# sql insert into $tb1 values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) -# $x = $x + 1 -# endw -# -# $i = $i + 1 -# $j = $j + 1 -#endw -# -#print sleep 1sec. -#sleep 1000 +sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int) + +$i = 0 +$t = 1578203484000 + +while $i < $tbNum + $tb = $tbPrefix . $i + sql create table $tb using $mt tags( $i ) + + $x = 0 + while $x < $rowNum + $ms = $x * 1000 + $ms = $ms * 60 + + $c = $x / 100 + $c = $c * 100 + $c = $x - $c + $binary = 'binary . $c + $binary = $binary . ' + $nchar = 'nchar . $c + $nchar = $nchar . ' + + $t1 = $t + $ms + sql insert into $tb values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) + $x = $x + 1 + endw + + $i = $i + 1 +endw + +sql create table $mt1 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int) + +$j = 0 +$t = 1578203484000 +$rowNum = 1000 +$tbNum = 5 +$i = 0 + +while $i < $tbNum + $tb1 = $tbPrefix1 . $j + sql create table $tb1 using $mt1 tags( $i ) + + $x = 0 + while $x < $rowNum + $ms = $x * 1000 + $ms = $ms * 60 + + $c = $x / 100 + $c = $c * 100 + $c = $x - $c + $binary = 'binary . $c + $binary = $binary . ' + $nchar = 'nchar . $c + $nchar = $nchar . ' + + $t1 = $t + $ms + sql insert into $tb1 values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) + $x = $x + 1 + endw + + $i = $i + 1 + $j = $j + 1 +endw + +print sleep 1sec. +sleep 1000 $i = 1 $tb = $tbPrefix . $i -- GitLab