提交 98e99fd4 编写于 作者: H Haojun Liao

[td-225] fix bugs in union query.

上级 66dbdcdd
......@@ -263,9 +263,11 @@ int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql);
bool hasMoreClauseToTry(SSqlObj* pSql);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)());
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtIpListFromCfg(const char *first, const char *second);
void* malloc_throw(size_t size);
......
......@@ -169,7 +169,11 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql);
} else {
tscProcessSql(pSql);
}
}
/*
......
......@@ -485,7 +485,6 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
case TSDB_SQL_SELECT: {
assert(pCmd->numOfClause == 1);
const char* msg1 = "columns in select clause not identical";
for (int32_t i = pCmd->numOfClause; i < pInfo->subclauseInfo.numOfClause; ++i) {
......@@ -496,16 +495,19 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
assert(pCmd->numOfClause == pInfo->subclauseInfo.numOfClause);
for (int32_t i = 0; i < pInfo->subclauseInfo.numOfClause; ++i) {
for (int32_t i = pCmd->clauseIndex; i < pInfo->subclauseInfo.numOfClause; ++i) {
SQuerySQL* pQuerySql = pInfo->subclauseInfo.pClause[i];
tscTrace("%p start to parse %dth subclause, total:%d", pSql, i, pInfo->subclauseInfo.numOfClause);
if ((code = doCheckForQuery(pSql, pQuerySql, i)) != TSDB_CODE_SUCCESS) {
return code;
}
tscPrintSelectClause(pSql, i);
pCmd->clauseIndex += 1;
}
// restore the clause index
pCmd->clauseIndex = 0;
// set the command/global limit parameters from the first subclause to the sqlcmd object
SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pCmd, 0);
pCmd->command = pQueryInfo1->command;
......@@ -5867,6 +5869,8 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
}
assert(pCmd->clauseIndex == index);
// too many result columns not support order by in query
if (pQuerySql->pSelection->nExpr > TSDB_MAX_COLUMNS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg8);
......@@ -5980,12 +5984,11 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000;
}
} else { // set the time rang
pQueryInfo->window.skey = TSKEY_INITIAL_VAL;
pQueryInfo->window.ekey = INT64_MAX;
pQueryInfo->window = TSWINDOW_INITIALIZER;
}
// user does not specified the query time window, twa is not allowed in such case.
if ((pQueryInfo->window.skey == 0 || pQueryInfo->window.ekey == INT64_MAX ||
if ((pQueryInfo->window.skey == INT64_MIN || pQueryInfo->window.ekey == INT64_MAX ||
(pQueryInfo->window.ekey == INT64_MAX / 1000 && tinfo.precision == TSDB_TIME_PRECISION_MILLI)) && tscIsTWAQuery(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg9);
}
......
......@@ -475,7 +475,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
pRetrieveMsg->free = htons(pQueryInfo->type);
// todo valid the vgroupId at the client side
......
......@@ -424,7 +424,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
}
// current data set are exhausted, fetch more data from node
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql)) &&
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
(pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
......
......@@ -848,13 +848,14 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// TODO put to async res?
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(numOfRows == taos_errno(pSql));
pParentSql->res.code = numOfRows;
tscError("%p retrieve failed, index:%d, code:%s", pSql, pSupporter->subqueryIndex, tstrerror(numOfRows));
tscQueueAsyncRes(pParentSql);
return;
}
if (numOfRows >= 0) {
......@@ -941,14 +942,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
SSqlRes *pRes = &pSub->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vgroupIndex < pTableMetaInfo->vgroupList->numOfVgroups &&
// (!tscHasReachLimitation(pQueryInfo, pRes)) && !pRes->completed) {
// numOfFetch++;
// }
// } else {
if (!tscHasReachLimitation(pQueryInfo, pRes)) {
if (pRes->row >= pRes->numOfRows) {
hasData = false;
......@@ -963,9 +957,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
break;
}
}
}
// }
// has data remains in client side, and continue to return data to app
if (hasData) {
......@@ -1026,7 +1018,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
tscDebug("%p all subquery response, retrieve data", pSql);
tscDebug("%p all subquery response, retrieve data for subclause:%d", pSql, pCmd->clauseIndex);
// the column transfer support struct has been built
if (pRes->pColumnIndex != NULL) {
......@@ -1195,7 +1187,10 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
pNew->cmd.numOfCols = 0;
pNewQueryInfo->intervalTime = 0;
memset(&pNewQueryInfo->limit, 0, sizeof(SLimitVal));
pSupporter->limit = pNewQueryInfo->limit;
pNewQueryInfo->limit.limit = -1;
pNewQueryInfo->limit.offset = 0;
// backup the data and clear it in the sqlcmd object
pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr;
......@@ -1982,88 +1977,119 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
void tscBuildResFromSubqueries(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t columnIndex, int16_t* bytes) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pRes->code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql);
return;
}
SFieldSupInfo* pInfo = (SFieldSupInfo*) TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.pSupportInfo, columnIndex);
assert(pInfo->pSqlExpr != NULL);
while (1) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
*bytes = pInfo->pSqlExpr->resBytes;
char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows;
if (pRes->tsrow == NULL) {
pRes->tsrow = calloc(numOfExprs, POINTER_BYTES);
pRes->length = calloc(numOfExprs, sizeof(int32_t));
}
return pData;
}
bool success = false;
static void doBuildResFromSubqueries(SSqlObj* pSql) {
SSqlRes* pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
int32_t numOfTableHasRes = 0;
int32_t numOfRes = INT32_MAX;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] != NULL) {
numOfTableHasRes++;
}
if (pSql->pSubs[i] == NULL) {
continue;
}
if (numOfTableHasRes >= 2) { // do merge result
success = (doSetResultRowData(pSql->pSubs[0], false) != NULL) && (doSetResultRowData(pSql->pSubs[1], false) != NULL);
} else { // only one subquery
SSqlObj *pSub = pSql->pSubs[0];
if (pSub == NULL) {
pSub = pSql->pSubs[1];
numOfRes = MIN(numOfRes, pSql->pSubs[i]->res.numOfRows);
}
success = (doSetResultRowData(pSub, false) != NULL);
}
int32_t totalSize = tscGetResRowLength(pQueryInfo->exprList);
pRes->pRsp = realloc(pRes->pRsp, numOfRes * totalSize);
pRes->data = pRes->pRsp;
if (success) { // current row of final output has been built, return to app
for (int32_t i = 0; i < numOfExprs; ++i) {
char* data = pRes->data;
int16_t bytes = 0;
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for(int32_t i = 0; i < numOfExprs; ++i) {
SColumnIndex* pIndex = &pRes->pColumnIndex[i];
SSqlRes *pRes1 = &pSql->pSubs[pIndex->tableIndex]->res;
pRes->tsrow[i] = pRes1->tsrow[pIndex->columnIndex];
pRes->length[i] = pRes1->length[pIndex->columnIndex];
SSqlCmd *pCmd1 = &pSql->pSubs[pIndex->tableIndex]->cmd;
char* pData = getResultBlockPosition(pCmd1, pRes1, pIndex->columnIndex, &bytes);
memcpy(data, pData, bytes * numOfRes);
data += bytes * numOfRes;
pRes1->row = numOfRes;
}
pRes->numOfClauseTotal++;
break;
} else { // continue retrieve data from vnode
if (!tscHasRemainDataInSubqueryResultSet(pSql)) {
tscDebug("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1);
SSubqueryState *pState = NULL;
pRes->numOfRows = numOfRes;
pRes->numOfClauseTotal += numOfRes;
}
// free all sub sqlobj
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj *pChildObj = pSql->pSubs[i];
if (pChildObj == NULL) {
continue;
void tscBuildResFromSubqueries(SSqlObj *pSql) {
SSqlRes* pRes = &pSql->res;
if (pRes->code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql);
return;
}
SJoinSupporter *pSupporter = (SJoinSupporter *)pChildObj->param;
pState = pSupporter->pState;
if (pRes->tsrow == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
tscDestroyJoinSupporter(pChildObj->param);
taos_free_result(pChildObj);
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
pRes->tsrow = calloc(numOfExprs, POINTER_BYTES);
pRes->buffer = calloc(numOfExprs, POINTER_BYTES);
pRes->length = calloc(numOfExprs, sizeof(int32_t));
tscRestoreSQLFuncForSTableQuery(pQueryInfo);
}
free(pState);
while (1) {
if (pRes->row < pRes->numOfRows) {
assert(0);
}
pRes->completed = true; // set query completed
doBuildResFromSubqueries(pSql);
sem_post(&pSql->rspSem);
return;
}
// continue retrieve data from vnode
// if (!tscHasRemainDataInSubqueryResultSet(pSql)) {
// tscDebug("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1);
// SSubqueryState* pState = NULL;
//
// // free all sub sqlobj
// for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
// SSqlObj* pChildObj = pSql->pSubs[i];
// if (pChildObj == NULL) {
// continue;
// }
//
// SJoinSupporter* pSupporter = (SJoinSupporter*)pChildObj->param;
// pState = pSupporter->pState;
//
// tscDestroyJoinSupporter(pChildObj->param);
// taos_free_result(pChildObj);
// }
//
// free(pState);
//
// pRes->completed = true; // set query completed
// sem_post(&pSql->rspSem);
// return;
// }
tscFetchDatablockFromSubquery(pSql);
if (pRes->code != TSDB_CODE_SUCCESS) {
return;
}
}
}
if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0);
(*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
} else {
tscQueueAsyncRes(pSql);
}
......@@ -2117,14 +2143,6 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
if(pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
if (pRes->completed) {
tfree(pRes->tsrow);
}
return pRes->tsrow;
}
if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker
tfree(pRes->tsrow);
return pRes->tsrow;
......@@ -2182,7 +2200,7 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
return pRes->tsrow;
}
static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
bool hasData = true;
SSqlCmd *pCmd = &pSql->cmd;
......
......@@ -1994,6 +1994,10 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
(!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1);
}
bool hasMoreClauseToTry(SSqlObj* pSql) {
return pSql->cmd.clauseIndex < pSql->cmd.numOfClause - 1;
}
void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
......@@ -2050,7 +2054,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
}
}
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
......@@ -2070,17 +2074,13 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
tfree(pSql->pSubs);
pSql->numOfSubs = 0;
if (pSql->fp != NULL) {
pSql->fp = queryFp;
assert(queryFp != NULL);
}
pSql->fp = fp;
tscDebug("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause);
if (pCmd->command > TSDB_SQL_LOCAL) {
tscProcessLocalCmd(pSql);
} else {
tscProcessSql(pSql);
tscDoQuery(pSql);
}
}
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c debugFlag -v 135
system sh/cfg.sh -n dnode1 -c rpcDebugFlag -v 135
system sh/exec.sh -n dnode1 -s start
#system sh/stop_dnodes.sh
#
#system sh/deploy.sh -n dnode1 -i 1
#system sh/cfg.sh -n dnode1 -c walLevel -v 0
#system sh/cfg.sh -n dnode1 -c debugFlag -v 135
#system sh/cfg.sh -n dnode1 -c rpcDebugFlag -v 135
#system sh/exec.sh -n dnode1 -s start
sleep 1000
sql connect
......@@ -24,77 +24,77 @@ $mt = $mtPrefix . $i
$j = 1
$mt1 = $mtPrefix . $j
sql drop database if exits $db -x step1
step1:
sql create database if not exists $db maxtables 4
#
#sql drop database if exits $db -x step1
#step1:
#sql create database if not exists $db maxtables 4
sql use $db
sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int)
$i = 0
$t = 1578203484000
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$ms = $x * 1000
$ms = $ms * 60
$c = $x / 100
$c = $c * 100
$c = $x - $c
$binary = 'binary . $c
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = $nchar . '
$t1 = $t + $ms
sql insert into $tb values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar )
$x = $x + 1
endw
$i = $i + 1
endw
sql create table $mt1 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int)
$j = 0
$t = 1578203484000
$rowNum = 1000
$tbNum = 5
$i = 0
while $i < $tbNum
$tb1 = $tbPrefix1 . $j
sql create table $tb1 using $mt1 tags( $i )
$x = 0
while $x < $rowNum
$ms = $x * 1000
$ms = $ms * 60
$c = $x / 100
$c = $c * 100
$c = $x - $c
$binary = 'binary . $c
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = $nchar . '
$t1 = $t + $ms
sql insert into $tb1 values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar )
$x = $x + 1
endw
$i = $i + 1
$j = $j + 1
endw
print sleep 1sec.
sleep 1000
#sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int)
#
#$i = 0
#$t = 1578203484000
#
#while $i < $tbNum
# $tb = $tbPrefix . $i
# sql create table $tb using $mt tags( $i )
#
# $x = 0
# while $x < $rowNum
# $ms = $x * 1000
# $ms = $ms * 60
#
# $c = $x / 100
# $c = $c * 100
# $c = $x - $c
# $binary = 'binary . $c
# $binary = $binary . '
# $nchar = 'nchar . $c
# $nchar = $nchar . '
#
# $t1 = $t + $ms
# sql insert into $tb values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar )
# $x = $x + 1
# endw
#
# $i = $i + 1
#endw
#
#sql create table $mt1 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int)
#
#$j = 0
#$t = 1578203484000
#$rowNum = 1000
#$tbNum = 5
#$i = 0
#
#while $i < $tbNum
# $tb1 = $tbPrefix1 . $j
# sql create table $tb1 using $mt1 tags( $i )
#
# $x = 0
# while $x < $rowNum
# $ms = $x * 1000
# $ms = $ms * 60
#
# $c = $x / 100
# $c = $c * 100
# $c = $x - $c
# $binary = 'binary . $c
# $binary = $binary . '
# $nchar = 'nchar . $c
# $nchar = $nchar . '
#
# $t1 = $t + $ms
# sql insert into $tb1 values ($t1 , $c , $c , $c , $c , $c , $c , $c , $binary , $nchar )
# $x = $x + 1
# endw
#
# $i = $i + 1
# $j = $j + 1
#endw
#
#print sleep 1sec.
#sleep 1000
$i = 1
$tb = $tbPrefix . $i
......@@ -222,7 +222,7 @@ endi
print ===========================================tags union
# two super table tag union, limit is not active during retrieve tags query
sql select t1 from union_mt0 union all select t1 from union_mt0 limit 1
sql select t1 from union_mt0 union all select t1 from union_mt0
if $rows != 20 then
return -1
endi
......@@ -235,6 +235,10 @@ if $data90 != 9 then
return -1
endi
#sql select t1 from union_mt0 union all select t1 from union_mt0 limit 1
#if $row != 11 then
# return -1
#endi
#========================================== two super table join subclause
print ================two super table join subclause
sql select avg(union_mt0.c1) as c from union_mt0 interval(1h) limit 10 union all select union_mt1.ts, union_mt1.c1/1.0 as c from union_mt0, union_mt1 where union_mt1.ts=union_mt0.ts and union_mt1.t1=union_mt0.t1 limit 5;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册