提交 c2ee6972 编写于 作者: H hjxilinx

[td-198] super table interval query join without ts matched.

上级 cf212a1e
......@@ -27,6 +27,8 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void tscGetQualifiedTSList(SSqlObj* pSql, SJoinSubquerySupporter* p1, SJoinSubquerySupporter* p2, int32_t* num);
void tscSetupOutputColumnIndex(SSqlObj* pSql);
int32_t tscLaunchSecondPhaseDirectly(SSqlObj* pSql, SSubqueryState* pState);
int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
......
......@@ -196,7 +196,7 @@ typedef struct SDataBlockList {
typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint16_t type; // query/insert/import type
uint32_t type; // query/insert/import type
char slidingTimeUnit;
int64_t etime, stime;
......
......@@ -219,6 +219,97 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
return false;
}
int32_t tscLaunchSecondPhaseDirectly(SSqlObj* pSql, SSubqueryState* pState) {
/*
* If the columns are not involved in the final select clause, the secondary query will not be launched
* for the subquery.
*/
pSql->res.qhandle = 0x1;
pSql->res.numOfRows = 0;
tscTrace("%p start to launch secondary subqueries", pSql);
bool success = true;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
assert(pSupporter != NULL);
SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, NULL);
if (pNew == NULL) {
tscDestroyJoinSupporter(pSupporter);
success = false;
break;
}
pSql->pSubs[i] = pNew;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pQueryInfo->tsBuf = NULL; // transfer the ownership of timestamp comp-z data to the new created object
// set the second stage sub query for join process
pQueryInfo->type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE;
/*
* if the first column of the secondary query is not ts function, add this function.
* Because this column is required to filter with timestamp after intersecting.
*/
assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
/*
* if the first column of the secondary query is not ts function, add this function.
* Because this column is required to filter with timestamp after intersecting.
*/
if (tscSqlExprGet(pQueryInfo, 0)->functionId != TSDB_FUNC_TS) {
tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0);
}
tscFieldInfoCalOffset(pQueryInfo);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
/*
* When handling the projection query, the offset value will be modified for table-table join, which is changed
* during the timestamp intersection.
*/
// fetch the join tag column
if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0);
assert(pQueryInfo->tagCond.joinInfo.hasJoin);
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pMeterMetaInfo->pMeterMeta->uid);
pExpr->param[0].i64Key = tagColIndex;
pExpr->numOfParams = 1;
}
tscPrintSelectClause(pNew, 0);
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, 0, pMeterMetaInfo->vnodeIndex, pQueryInfo->type,
pQueryInfo->exprsInfo.numOfExprs, pQueryInfo->colList.numOfCols,
pQueryInfo->fieldsInfo.numOfOutputCols, pQueryInfo->pMeterInfo[0]->name);
}
//prepare the subqueries object failed, abort
if (!success) {
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql,
pSql->numOfSubs, pSql->res.code);
freeSubqueryObj(pSql);
return pSql->res.code;
}
for(int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
tscProcessSql(pSub);
}
return TSDB_CODE_SUCCESS;
}
/*
* launch secondary stage query to fetch the result that contains timestamp in set
*/
......
......@@ -949,7 +949,11 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
} else {
*sqlstr = sql;
}
if (*sqlstr == NULL) {
code = TSDB_CODE_INVALID_SQL;
}
return code;
}
......@@ -1290,7 +1294,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
uint16_t type = (sToken.type == TK_INSERT)? TSDB_QUERY_TYPE_INSERT:TSDB_QUERY_TYPE_IMPORT;
uint32_t type = (sToken.type == TK_INSERT)? TSDB_QUERY_TYPE_INSERT:TSDB_QUERY_TYPE_IMPORT;
TSDB_QUERY_SET_TYPE(pQueryInfo->type, type);
sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
......
......@@ -1135,6 +1135,10 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
const char* msg5 = "invalid function name";
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
if (isSTable) {
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_QUERY);
}
for (int32_t i = 0; i < pSelection->nExpr; ++i) {
int32_t outputIndex = pQueryInfo->exprsInfo.numOfExprs;
......@@ -3653,7 +3657,9 @@ static int32_t validateJoinExpr(SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) {
}
if (!pCondExpr->tsJoin) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY);
} else {
TSDB_QUERY_UNSET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY);
}
return TSDB_CODE_SUCCESS;
......@@ -5625,9 +5631,14 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
pQuerySql->pSortOrder == NULL);
return doLocalQueryProcess(pQueryInfo, pQuerySql);
}
if (pQuerySql->from->nExpr > TSDB_MAX_JOIN_TABLE_NUM) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
if (pQuerySql->from->nExpr > 1) {
if (pQuerySql->from->nExpr > 2) { // not support more than 2 tables join query
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
// set the timestamp not matched join query
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY);
}
pQueryInfo->command = TSDB_SQL_SELECT;
......
......@@ -635,7 +635,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
pSql->pSubs[pSql->numOfSubs++] = pNew;
assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal);
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type)) {
addGroupInfoForSubquery(pSql, pNew, 0, tableIndex);
......@@ -749,7 +749,7 @@ int tscProcessSql(SSqlObj *pSql) {
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SMeterMetaInfo *pMeterMetaInfo = NULL;
int16_t type = 0;
uint32_t type = 0;
if (pQueryInfo != NULL) {
pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
......@@ -794,26 +794,37 @@ int tscProcessSql(SSqlObj *pSql) {
if (QUERY_IS_JOIN_QUERY(type)) {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) {
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pQueryInfo->numOfTables;
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
pState->numOfCompleted = pQueryInfo->numOfTables - i - 1;
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return pSql->res.code;
if ((pQueryInfo->type & TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY) != 0) {
pSql->numOfSubs = pQueryInfo->numOfTables;
if (pSql->pSubs == NULL) {
pSql->pSubs = calloc(pSql->numOfSubs, POINTER_BYTES);
if (pSql->pSubs == NULL) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
}
int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter);
if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query
tscDestroyJoinSupporter(pSupporter);
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
break;
tscLaunchSecondPhaseDirectly(pSql, pState);
} else {
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
pState->numOfCompleted = pQueryInfo->numOfTables - i - 1;
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return pSql->res.code;
}
int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter);
if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query
tscDestroyJoinSupporter(pSupporter);
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
break;
}
}
}
......
......@@ -551,9 +551,36 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
}
if (numOfTableHasRes >= 2) { // do merge result
success = (doSetResultRowData(pSql->pSubs[0], false) != NULL) &&
(doSetResultRowData(pSql->pSubs[1], false) != NULL);
// printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2);
bool s1 = doSetResultRowData(pSql->pSubs[0], false);
bool s2 = doSetResultRowData(pSql->pSubs[1], false);
success = s1 && s2;
if (success) {
TSKEY key1 = *(TSKEY*) pSql->pSubs[0]->res.tsrow[0];
TSKEY key2 = *(TSKEY*) pSql->pSubs[1]->res.tsrow[0];
while(1) {
if (key1 < key2) {
s1 = doSetResultRowData(pSql->pSubs[0], false);
if (!s1) { // retrieve next block
break;
}
} else if (key1 > key2) {
s2 = doSetResultRowData(pSql->pSubs[1], false);
if (!s2) {
break;
}
} else {
break;
}
key1 = *(TSKEY *)pSql->pSubs[0]->res.tsrow[0];
key2 = *(TSKEY *)pSql->pSubs[1]->res.tsrow[0];
}
success = s1 && s2;
// printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2);
}
} else { // only one subquery
SSqlObj *pSub = pSql->pSubs[0];
if (pSub == NULL) {
......
......@@ -230,10 +230,12 @@ extern "C" {
#define TSDB_QUERY_TYPE_INSERT 0x100U // insert type
#define TSDB_QUERY_TYPE_IMPORT 0x200U // import data
#define TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY 0x400u // join query without ts match
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
#define TSDB_QUERY_RESET_TYPE(x) ((x) = TSDB_QUERY_TYPE_NON_TYPE)
#define TSDB_QUERY_UNSET_TYPE(x, _type) ((x) &= ~(_type))
#define TSQL_SO_ASC 1
#define TSQL_SO_DESC 0
......
......@@ -115,7 +115,7 @@ enum {
};
#define QUERY_IS_STABLE_QUERY(type) (((type)&TSDB_QUERY_TYPE_STABLE_QUERY) != 0)
#define QUERY_IS_JOIN_QUERY(type) (((type)&TSDB_QUERY_TYPE_JOIN_QUERY) != 0)
#define QUERY_IS_JOIN_QUERY(type) (((type)&(TSDB_QUERY_TYPE_JOIN_QUERY|TSDB_QUERY_TYPE_TS_NO_MATCH_JOIN_QUERY)) != 0)
#define QUERY_IS_PROJECTION_QUERY(type) (((type)&TSDB_QUERY_TYPE_PROJECTION_QUERY) != 0)
#define QUERY_IS_FREE_RESOURCE(type) (((type)&TSDB_QUERY_TYPE_FREE_RESOURCE) != 0)
......
......@@ -2665,7 +2665,6 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
int32_t j = 0;
TSKEY lastKey = -1;
int32_t lastIndex = -1;
//bool firstAccessedPoint = true;
for (j = 0; j < (*forwardStep); ++j) {
int32_t offset = GET_COL_DATA_POS(pQuery, j, step);
......
......@@ -217,7 +217,7 @@ int32_t tVariantCompare(const tVariant *pDst, const tVariant *pSrc) {
case TSDB_DATA_TYPE_NCHAR:
return strncasecmp(pSrc->pz,pDst->pz,pSrc->nLen);
default:
return 1;
return 0;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册