提交 d58310d9 编写于 作者: H Haojun Liao

[td-225] fix bugs in join/union query.

上级 4e2ed952
...@@ -220,14 +220,13 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi ...@@ -220,14 +220,13 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql); tscFetchDatablockFromSubquery(pSql);
} else if (pRes->completed) { } else if (pRes->completed) {
if(pCmd->command == TSDB_SQL_FETCH) { if(pCmd->command == TSDB_SQL_FETCH || (pCmd->command >= TSDB_SQL_SERV_STATUS && pCmd->command <= TSDB_SQL_CURRENT_USER)) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes. if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode); tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
return;
} else { } else {
/* /*
* all available virtual node has been checked already, now we need to check * all available virtual nodes in current clause has been checked already, now try the
* for the next subclause queries * next one in the following union subclause
*/ */
if (pCmd->clauseIndex < pCmd->numOfClause - 1) { if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode); tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
...@@ -235,11 +234,12 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi ...@@ -235,11 +234,12 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
} }
/* /*
* 1. has reach the limitation * 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore * 2. no remain virtual nodes to be retrieved anymore
*/ */
(*pSql->fetchFp)(param, pSql, 0); (*pSql->fetchFp)(param, pSql, 0);
} }
return; return;
} else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE) { } else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE) {
// in case of show command, return no data // in case of show command, return no data
......
...@@ -293,7 +293,7 @@ static void tscProcessCurrentDB(SSqlObj *pSql) { ...@@ -293,7 +293,7 @@ static void tscProcessCurrentDB(SSqlObj *pSql) {
char db[TSDB_DB_NAME_LEN] = {0}; char db[TSDB_DB_NAME_LEN] = {0};
extractDBName(pSql->pTscObj->db, db); extractDBName(pSql->pTscObj->db, db);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY; pExpr->resType = TSDB_DATA_TYPE_BINARY;
...@@ -314,7 +314,7 @@ static void tscProcessCurrentDB(SSqlObj *pSql) { ...@@ -314,7 +314,7 @@ static void tscProcessCurrentDB(SSqlObj *pSql) {
static void tscProcessServerVer(SSqlObj *pSql) { static void tscProcessServerVer(SSqlObj *pSql) {
const char* v = pSql->pTscObj->sversion; const char* v = pSql->pTscObj->sversion;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY; pExpr->resType = TSDB_DATA_TYPE_BINARY;
......
...@@ -46,19 +46,27 @@ void tscSaveSubscriptionProgress(void* sub); ...@@ -46,19 +46,27 @@ void tscSaveSubscriptionProgress(void* sub);
static int32_t minMsgSize() { return tsRpcHeadSize + 100; } static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);
SRpcEpSet* pEpSet = &pSql->epSet; SRpcEpSet* pEpSet = &pSql->epSet;
pEpSet->inUse = 0; pEpSet->inUse = 0;
if (pVgroupInfo == NULL) {
pEpSet->numOfEps = 0; // apply the FQDN string length check here
return; bool hasFqdn = false;
}
pEpSet->numOfEps = pVgroupInfo->numOfEps; pEpSet->numOfEps = pVgroupInfo->numOfEps;
for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) { for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn); strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn);
pEpSet->port[i] = pVgroupInfo->epAddr[i].port; pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
if (!hasFqdn) {
hasFqdn = (strlen(pEpSet->fqdn[i]) > 0);
}
} }
assert(hasFqdn);
} }
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) { static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
taosCorBeginRead(&tscMgmtEpSet.version); taosCorBeginRead(&tscMgmtEpSet.version);
*epSet = tscMgmtEpSet.epSet; *epSet = tscMgmtEpSet.epSet;
...@@ -128,21 +136,6 @@ void tscPrintMgmtEp() { ...@@ -128,21 +136,6 @@ void tscPrintMgmtEp() {
} }
} }
/*
* For each management node, try twice at least in case of poor network situation.
* If the client start to connect to a non-management node from the client, and the first retry may fail due to
* the poor network quality. And then, the second retry get the response with redirection command.
* The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
* Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
*/
UNUSED_FUNC
static int32_t tscGetMgmtConnMaxRetryTimes() {
int32_t factor = 2;
SRpcEpSet dump;
tscDumpMgmtEpSet(&dump);
return dump.numOfEps * factor;
}
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param; STscObj *pObj = (STscObj *)param;
if (pObj == NULL) return; if (pObj == NULL) return;
...@@ -425,21 +418,18 @@ int doProcessSql(SSqlObj *pSql) { ...@@ -425,21 +418,18 @@ int doProcessSql(SSqlObj *pSql) {
} }
int tscProcessSql(SSqlObj *pSql) { int tscProcessSql(SSqlObj *pSql) {
char * name = NULL; char *name = NULL;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = NULL; STableMetaInfo *pTableMetaInfo = NULL;
uint32_t type = 0; uint32_t type = 0;
if (pQueryInfo != NULL) { if (pQueryInfo != NULL) {
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pTableMetaInfo != NULL) { name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
name = pTableMetaInfo->name;
}
type = pQueryInfo->type; type = pQueryInfo->type;
// while numOfTables equals to 0, it must be Heartbeat // while numOfTables equals to 0, it must be Heartbeat
assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0); assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
} }
...@@ -451,7 +441,6 @@ int tscProcessSql(SSqlObj *pSql) { ...@@ -451,7 +441,6 @@ int tscProcessSql(SSqlObj *pSql) {
return pSql->res.code; return pSql->res.code;
} }
} else if (pCmd->command < TSDB_SQL_LOCAL) { } else if (pCmd->command < TSDB_SQL_LOCAL) {
//pSql->epSet = tscMgmtEpSet; //pSql->epSet = tscMgmtEpSet;
} else { // local handler } else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql); return (*tscProcessMsgRsp[pCmd->command])(pSql);
...@@ -598,11 +587,11 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -598,11 +587,11 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
} else { } else {
pVgroupInfo = &pTableMeta->vgroupInfo; pVgroupInfo = &pTableMeta->vgroupInfo;
} }
tscSetDnodeEpSet(pSql, pVgroupInfo);
if (pVgroupInfo != NULL) { assert(pVgroupInfo != NULL);
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
} tscSetDnodeEpSet(pSql, pVgroupInfo);
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pTableMeta->id.tid); pTableIdInfo->tid = htonl(pTableMeta->id.tid);
...@@ -1885,11 +1874,10 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { ...@@ -1885,11 +1874,10 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
for (int32_t k = 0; k < pVgroups->numOfEps; ++k) { for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port); pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port);
} }
pMsg += size;
} }
pMsg += size;
} }
return pSql->res.code; return pSql->res.code;
......
...@@ -1085,7 +1085,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1085,7 +1085,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(taos_errno(pSql) == code); assert(taos_errno(pSql) == code);
tscError("%p abort query, code:%d, global code:%d", pSql, code, pParentSql->res.code); tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code));
pParentSql->res.code = code; pParentSql->res.code = code;
quitAllSubquery(pParentSql, pSupporter); quitAllSubquery(pParentSql, pSupporter);
......
...@@ -2034,6 +2034,10 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) { ...@@ -2034,6 +2034,10 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
} }
int32_t numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups; int32_t numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
if (pTableMetaInfo->pVgroupTables != NULL) {
numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
}
return tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && return tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
(!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1); (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1);
} }
......
...@@ -2216,7 +2216,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa ...@@ -2216,7 +2216,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa
static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) { static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) {
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pRuntimeEnv->groupbyNormalCol && !isFixedOutputQuery(pRuntimeEnv)) { if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pRuntimeEnv->groupbyNormalCol && !isFixedOutputQuery(pRuntimeEnv) && !isTSCompQuery(pQuery)) {
SResultRec *pRec = &pQuery->rec; SResultRec *pRec = &pQuery->rec;
if (pQuery->rec.capacity - pQuery->rec.rows < pBlockInfo->rows) { if (pQuery->rec.capacity - pQuery->rec.rows < pBlockInfo->rows) {
......
...@@ -257,6 +257,21 @@ if $data01 != $val then ...@@ -257,6 +257,21 @@ if $data01 != $val then
return -1 return -1
endi endi
sql select count(join_tb1.*) + count(join_tb0.*) from join_tb1 , join_tb0 where join_tb1.ts = join_tb0.ts and join_tb1.ts >= 100000 and join_tb0.c7 = false;;
if $rows != 1 then
return -1
endi
if $data00 != 20.000000000 then
print expect 20.000000000 actual $data00
return -1
endi
sql select count(join_tb1.*)/10 from join_tb1 , join_tb0 where join_tb1.ts = join_tb0.ts and join_tb1.ts >= 100000 and join_tb0.c7 = false;;
if $data00 != 1.000000000 then
return -1
endi
print 3 print 3
#agg + where condition #agg + where condition
sql select count(join_tb1.c3), count(join_tb0.ts) from $tb1 , $tb2 where $ts1 = $ts2 and join_tb1.ts <= 100002 and join_tb0.c7 = true; sql select count(join_tb1.c3), count(join_tb0.ts) from $tb1 , $tb2 where $ts1 = $ts2 and join_tb1.ts <= 100002 and join_tb0.c7 = true;
...@@ -381,6 +396,15 @@ if $data00 != $val then ...@@ -381,6 +396,15 @@ if $data00 != $val then
return -1 return -1
endi endi
sql select sum(join_mt0.c1)+sum(join_mt0.c1) from join_mt0, join_mt1 where join_mt0.ts = join_mt1.ts and join_mt0.t1=join_mt1.t1 and join_mt0.c2=99 and join_mt1.ts=100999;
if $rows != 1 then
return -1
endi
if $data00 != 396.000000000 then
return -1
endi
# first/last # first/last
sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts and join_mt0.t1=1 interval(10a) order by join_mt0.ts asc; sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts and join_mt0.t1=1 interval(10a) order by join_mt0.ts asc;
......
#system sh/stop_dnodes.sh system sh/stop_dnodes.sh
#
#system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode1 -i 1
#system sh/cfg.sh -n dnode1 -c walLevel -v 0 system sh/cfg.sh -n dnode1 -c walLevel -v 0
#system sh/cfg.sh -n dnode1 -c debugFlag -v 135 system sh/cfg.sh -n dnode1 -c debugFlag -v 135
#system sh/cfg.sh -n dnode1 -c rpcDebugFlag -v 135 system sh/cfg.sh -n dnode1 -c rpcDebugFlag -v 135
#system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
sleep 1000 sleep 1000
sql connect sql connect
...@@ -24,77 +24,77 @@ $mt = $mtPrefix . $i ...@@ -24,77 +24,77 @@ $mt = $mtPrefix . $i
$j = 1 $j = 1
$mt1 = $mtPrefix . $j $mt1 = $mtPrefix . $j
#
#sql drop database if exits $db -x step1 sql drop database if exits $db -x step1
#step1: step1:
#sql create database if not exists $db maxtables 4 sql create database if not exists $db maxtables 4
sql use $db sql use $db
#sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int) sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int)
#
#$i = 0 $i = 0
#$t = 1578203484000 $t = 1578203484000
#
#while $i < $tbNum while $i < $tbNum
# $tb = $tbPrefix . $i $tb = $tbPrefix . $i
# sql create table $tb using $mt tags( $i ) sql create table $tb using $mt tags( $i )
#
# $x = 0 $x = 0
# while $x < $rowNum while $x < $rowNum
# $ms = $x * 1000 $ms = $x * 1000
# $ms = $ms * 60 $ms = $ms * 60
#
# $c = $x / 100 $c = $x / 100
# $c = $c * 100 $c = $c * 100
# $c = $x - $c $c = $x - $c
# $binary = 'binary . $c $binary = 'binary . $c
# $binary = $binary . ' $binary = $binary . '
# $nchar = 'nchar . $c $nchar = 'nchar . $c
# $nchar = $nchar . ' $nchar = $nchar . '
#
# $t1 = $t + $ms $t1 = $t + $ms
# sql insert into $tb values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) sql insert into $tb values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar )
# $x = $x + 1 $x = $x + 1
# endw endw
#
# $i = $i + 1 $i = $i + 1
#endw endw
#
#sql create table $mt1 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int) sql create table $mt1 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int)
#
#$j = 0 $j = 0
#$t = 1578203484000 $t = 1578203484000
#$rowNum = 1000 $rowNum = 1000
#$tbNum = 5 $tbNum = 5
#$i = 0 $i = 0
#
#while $i < $tbNum while $i < $tbNum
# $tb1 = $tbPrefix1 . $j $tb1 = $tbPrefix1 . $j
# sql create table $tb1 using $mt1 tags( $i ) sql create table $tb1 using $mt1 tags( $i )
#
# $x = 0 $x = 0
# while $x < $rowNum while $x < $rowNum
# $ms = $x * 1000 $ms = $x * 1000
# $ms = $ms * 60 $ms = $ms * 60
#
# $c = $x / 100 $c = $x / 100
# $c = $c * 100 $c = $c * 100
# $c = $x - $c $c = $x - $c
# $binary = 'binary . $c $binary = 'binary . $c
# $binary = $binary . ' $binary = $binary . '
# $nchar = 'nchar . $c $nchar = 'nchar . $c
# $nchar = $nchar . ' $nchar = $nchar . '
#
# $t1 = $t + $ms $t1 = $t + $ms
# sql insert into $tb1 values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar ) sql insert into $tb1 values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar )
# $x = $x + 1 $x = $x + 1
# endw endw
#
# $i = $i + 1 $i = $i + 1
# $j = $j + 1 $j = $j + 1
#endw endw
#
#print sleep 1sec. print sleep 1sec.
#sleep 1000 sleep 1000
$i = 1 $i = 1
$tb = $tbPrefix . $i $tb = $tbPrefix . $i
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册