提交 624fb8b3 编写于 作者: D dapan1121

support multiple tables join

上级 bbfde59f
......@@ -83,6 +83,22 @@ typedef struct SJoinSupporter {
SArray* pVgroupTables;
} SJoinSupporter;
typedef struct SMergeCtx {
SJoinSupporter* p;
int32_t idx;
SArray* res;
int8_t compared;
}SMergeCtx;
typedef struct SMergeTsCtx {
SJoinSupporter* p;
STSBuf* res;
int64_t numOfInput;
int8_t compared;
}SMergeTsCtx;
typedef struct SVgroupTableInfo {
SVgroupInfo vgInfo;
SArray* itemList; //SArray<STableIdInfo>
......@@ -183,6 +199,7 @@ int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deep
void tscSqlExprInfoDestroy(SArray* pExprInfo);
SColumn* tscColumnClone(const SColumn* src);
bool tscColumnExists(SArray* pColumnList, SColumnIndex* pColIndex);
SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex);
SArray* tscColumnListClone(const SArray* src, int16_t tableIndex);
void tscColumnListDestroy(SArray* pColList);
......
......@@ -142,15 +142,15 @@ typedef struct SCond {
} SCond;
typedef struct SJoinNode {
char tableName[TSDB_TABLE_FNAME_LEN];
uint64_t uid;
int16_t tagColId;
SArray* tsJoin;
SArray* tagJoin;
} SJoinNode;
typedef struct SJoinInfo {
bool hasJoin;
SJoinNode left;
SJoinNode right;
SJoinNode* joinTables[TSDB_MAX_JOIN_TABLE_NUM];
} SJoinInfo;
typedef struct STagCond {
......
......@@ -3334,23 +3334,25 @@ static int32_t getColumnQueryCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSq
}
}
static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr) {
const char* msg1 = "invalid join query condition";
const char* msg2 = "invalid table name in join query";
static int32_t checkAndSetJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr) {
int32_t code = 0;
const char* msg1 = "timestamp required for join tables";
const char* msg3 = "type of join columns must be identical";
const char* msg4 = "invalid column name in join condition";
const char* msg5 = "only support one join tag for each table";
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
}
if (!tSqlExprIsParentOfLeaf(pExpr)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
code = checkAndSetJoinCondInfo(pCmd, pQueryInfo, pExpr->pLeft);
if (code) {
return code;
}
STagCond* pTagCond = &pQueryInfo->tagCond;
SJoinNode* pLeft = &pTagCond->joinInfo.left;
SJoinNode* pRight = &pTagCond->joinInfo.right;
return checkAndSetJoinCondInfo(pCmd, pQueryInfo, pExpr->pRight);
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, &pExpr->pLeft->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
......@@ -3360,13 +3362,28 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr*
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema* pTagSchema1 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
pLeft->uid = pTableMetaInfo->pTableMeta->id.uid;
pLeft->tagColId = pTagSchema1->colId;
assert(index.tableIndex >= 0 && index.tableIndex < TSDB_MAX_JOIN_TABLE_NUM);
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pLeft->tableName);
if (code != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
SJoinNode **leftNode = &pQueryInfo->tagCond.joinInfo.joinTables[index.tableIndex];
if (*leftNode == NULL) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
(*leftNode)->uid = pTableMetaInfo->pTableMeta->id.uid;
(*leftNode)->tagColId = pTagSchema1->colId;
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
if (!tscColumnExists(pTableMetaInfo->tagColList, &index)) {
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
if (taosArrayGetSize(pTableMetaInfo->tagColList) > 1) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
}
}
int16_t leftIdx = index.tableIndex;
index = (SColumnIndex)COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, &pExpr->pRight->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
......@@ -3376,20 +3393,55 @@ static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr*
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema* pTagSchema2 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
pRight->uid = pTableMetaInfo->pTableMeta->id.uid;
pRight->tagColId = pTagSchema2->colId;
assert(index.tableIndex >= 0 && index.tableIndex < TSDB_MAX_JOIN_TABLE_NUM);
code = tNameExtractFullName(&pTableMetaInfo->name, pRight->tableName);
if (code != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
SJoinNode **rightNode = &pQueryInfo->tagCond.joinInfo.joinTables[index.tableIndex];
if (*rightNode == NULL) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
(*rightNode)->uid = pTableMetaInfo->pTableMeta->id.uid;
(*rightNode)->tagColId = pTagSchema2->colId;
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
if (!tscColumnExists(pTableMetaInfo->tagColList, &index)) {
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
if (taosArrayGetSize(pTableMetaInfo->tagColList) > 1) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
}
}
int16_t rightIdx = index.tableIndex;
if (pTagSchema1->type != pTagSchema2->type) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
pTagCond->joinInfo.hasJoin = true;
if ((*leftNode)->tagJoin == NULL) {
(*leftNode)->tagJoin = taosArrayInit(2, sizeof(int16_t));
}
if ((*rightNode)->tagJoin == NULL) {
(*rightNode)->tagJoin = taosArrayInit(2, sizeof(int16_t));
}
taosArrayPush((*leftNode)->tagJoin, &rightIdx);
taosArrayPush((*rightNode)->tagJoin, &leftIdx);
pQueryInfo->tagCond.joinInfo.hasJoin = true;
return TSDB_CODE_SUCCESS;
}
static int32_t getJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr) {
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
}
return checkAndSetJoinCondInfo(pCmd, pQueryInfo, pExpr);
}
static int32_t validateSQLExpr(SSqlCmd* pCmd, tSqlExpr* pExpr, SQueryInfo* pQueryInfo, SColumnList* pList,
......@@ -3655,7 +3707,6 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
const char* msg1 = "table query cannot use tags filter";
const char* msg2 = "illegal column name";
const char* msg3 = "only one query time range allowed";
const char* msg4 = "only one join condition allowed";
const char* msg5 = "not support ordinary column join";
const char* msg6 = "only one query condition on tbname allowed";
const char* msg7 = "only in/like allowed in filter table name";
......@@ -3686,6 +3737,45 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY);
pCondExpr->tsJoin = true;
assert(index.tableIndex >= 0 && index.tableIndex < TSDB_MAX_JOIN_TABLE_NUM);
SJoinNode **leftNode = &pQueryInfo->tagCond.joinInfo.joinTables[index.tableIndex];
if (*leftNode == NULL) {
*leftNode = calloc(1, sizeof(SJoinNode));
if (*leftNode == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
int16_t leftIdx = index.tableIndex;
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, &pRight->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
assert(index.tableIndex >= 0 && index.tableIndex < TSDB_MAX_JOIN_TABLE_NUM);
SJoinNode **rightNode = &pQueryInfo->tagCond.joinInfo.joinTables[index.tableIndex];
if (*rightNode == NULL) {
*rightNode = calloc(1, sizeof(SJoinNode));
if (*rightNode == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
int16_t rightIdx = index.tableIndex;
if ((*leftNode)->tsJoin == NULL) {
(*leftNode)->tsJoin = taosArrayInit(2, sizeof(int16_t));
}
if ((*rightNode)->tsJoin == NULL) {
(*rightNode)->tsJoin = taosArrayInit(2, sizeof(int16_t));
}
taosArrayPush((*leftNode)->tsJoin, &rightIdx);
taosArrayPush((*rightNode)->tsJoin, &leftIdx);
/*
* to release expression, e.g., m1.ts = m2.ts,
* since this expression is used to set the join query type
......@@ -3743,10 +3833,6 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
return TSDB_CODE_TSC_INVALID_SQL;
}
if (pCondExpr->pJoinExpr != NULL) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
pQueryInfo->type |= TSDB_QUERY_TYPE_JOIN_QUERY;
ret = setExprToCond(&pCondExpr->pJoinExpr, *pExpr, NULL, parentOptr, pQueryInfo->msg);
*pExpr = NULL;
......@@ -3974,6 +4060,7 @@ static bool validateFilterExpr(SQueryInfo* pQueryInfo) {
static int32_t getTimeRangeFromExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr) {
const char* msg0 = "invalid timestamp";
const char* msg1 = "only one time stamp window allowed";
int32_t code = 0;
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
......@@ -3984,7 +4071,10 @@ static int32_t getTimeRangeFromExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlE
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
getTimeRangeFromExpr(pCmd, pQueryInfo, pExpr->pLeft);
code = getTimeRangeFromExpr(pCmd, pQueryInfo, pExpr->pLeft);
if (code) {
return code;
}
return getTimeRangeFromExpr(pCmd, pQueryInfo, pExpr->pRight);
} else {
......@@ -4066,6 +4156,7 @@ static void cleanQueryExpr(SCondExpr* pCondExpr) {
}
}
/*
static void doAddJoinTagsColumnsIntoTagList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondExpr* pCondExpr) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (QUERY_IS_JOIN_QUERY(pQueryInfo->type) && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
......@@ -4088,6 +4179,7 @@ static void doAddJoinTagsColumnsIntoTagList(SSqlCmd* pCmd, SQueryInfo* pQueryInf
tscColumnListInsert(pTableMetaInfo->tagColList, &index);
}
}
*/
static int32_t validateTagCondExpr(SSqlCmd* pCmd, tExprNode *p) {
const char *msg1 = "invalid tag operator";
......@@ -4223,6 +4315,32 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE
return ret;
}
int32_t validateJoinNodes(SQueryInfo* pQueryInfo, SSqlObj* pSql) {
const char* msg1 = "timestamp required for join tables";
const char* msg2 = "tag required for join stables";
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinNode *node = pQueryInfo->tagCond.joinInfo.joinTables[i];
if (node == NULL || node->tsJoin == NULL || taosArrayGetSize(node->tsJoin) <= 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg1);
}
}
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinNode *node = pQueryInfo->tagCond.joinInfo.joinTables[i];
if (node == NULL || node->tagJoin == NULL || taosArrayGetSize(node->tagJoin) <= 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg2);
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql) {
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
......@@ -4243,7 +4361,7 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql
int32_t type = 0;
if ((ret = getQueryCondExpr(&pSql->cmd, pQueryInfo, pExpr, &condExpr, &type, (*pExpr)->tokenId)) != TSDB_CODE_SUCCESS) {
return ret;
goto PARSE_WHERE_EXIT;
}
tSqlExprCompact(pExpr);
......@@ -4253,32 +4371,32 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql
// 1. check if it is a join query
if ((ret = validateJoinExpr(&pSql->cmd, pQueryInfo, &condExpr)) != TSDB_CODE_SUCCESS) {
return ret;
goto PARSE_WHERE_EXIT;
}
// 2. get the query time range
if ((ret = getTimeRangeFromExpr(&pSql->cmd, pQueryInfo, condExpr.pTimewindow)) != TSDB_CODE_SUCCESS) {
return ret;
goto PARSE_WHERE_EXIT;
}
// 3. get the tag query condition
if ((ret = getTagQueryCondExpr(&pSql->cmd, pQueryInfo, &condExpr, pExpr)) != TSDB_CODE_SUCCESS) {
return ret;
goto PARSE_WHERE_EXIT;
}
// 4. get the table name query condition
if ((ret = getTablenameCond(&pSql->cmd, pQueryInfo, condExpr.pTableCond, &sb)) != TSDB_CODE_SUCCESS) {
return ret;
goto PARSE_WHERE_EXIT;
}
// 5. other column query condition
if ((ret = getColumnQueryCondInfo(&pSql->cmd, pQueryInfo, condExpr.pColumnCond, TK_AND)) != TSDB_CODE_SUCCESS) {
return ret;
goto PARSE_WHERE_EXIT;
}
// 6. join condition
if ((ret = getJoinCondInfo(&pSql->cmd, pQueryInfo, condExpr.pJoinExpr)) != TSDB_CODE_SUCCESS) {
return ret;
goto PARSE_WHERE_EXIT;
}
// 7. query condition for table name
......@@ -4286,12 +4404,24 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql
ret = setTableCondForSTableQuery(&pSql->cmd, pQueryInfo, getAccountId(pSql), condExpr.pTableCond, condExpr.tableCondIndex, &sb);
taosStringBuilderDestroy(&sb);
if (ret) {
goto PARSE_WHERE_EXIT;
}
if (!validateFilterExpr(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg2);
ret = invalidSqlErrMsg(tscGetErrorMsgPayload(&pSql->cmd), msg2);
goto PARSE_WHERE_EXIT;
}
doAddJoinTagsColumnsIntoTagList(&pSql->cmd, pQueryInfo, &condExpr);
//doAddJoinTagsColumnsIntoTagList(&pSql->cmd, pQueryInfo, &condExpr);
if (pQueryInfo->tagCond.joinInfo.hasJoin) {
ret = validateJoinNodes(pQueryInfo, pSql);
if (ret) {
goto PARSE_WHERE_EXIT;
}
}
PARSE_WHERE_EXIT:
cleanQueryExpr(&condExpr);
return ret;
......@@ -6504,7 +6634,6 @@ int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t i
const char* msg1 = "point interpolation query needs timestamp";
const char* msg2 = "fill only available for interval query";
const char* msg3 = "start(end) time of query range required or time range too large";
const char* msg4 = "illegal number of tables in from clause";
const char* msg5 = "too many columns in selection clause";
const char* msg6 = "too many tables in from clause";
const char* msg7 = "invalid table alias name";
......@@ -6541,15 +6670,11 @@ int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t i
size_t fromSize = taosArrayGetSize(pQuerySqlNode->from);
if (fromSize > TSDB_MAX_JOIN_TABLE_NUM * 2) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
pQueryInfo->command = TSDB_SQL_SELECT;
if (fromSize > 4) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
// set all query tables, which are maybe more than one.
for (int32_t i = 0; i < fromSize; ) {
tVariantListItem* item = taosArrayGet(pQuerySqlNode->from, i);
......
......@@ -264,6 +264,287 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
return output1->numOfTotal;
}
static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
win->skey = INT64_MAX;
win->ekey = INT64_MIN;
SLimitVal* pLimit = &pQueryInfo->limit;
int32_t order = pQueryInfo->order.order;
int32_t joinNum = pSql->subState.numOfSub;
SMergeTsCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {0};
SMergeTsCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0};
for (int32_t i = 0; i < joinNum; ++i) {
STSBuf* output = tsBufCreate(true, pQueryInfo->order.order);
SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
pSubQueryInfo->tsBuf = output;
SJoinSupporter* pSupporter = pSql->pSubs[i]->param;
if (pSupporter[i]->pTSBuf == NULL) {
tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
return 0;
}
tsBufResetPos(pSupporter[i]->pTSBuf);
if (!tsBufNextPos(pSupporter[i]->pTSBuf)) {
tscDebug("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql);
return 0;
}
ctxlist[i].p = pSupporter;
ctxlist[i].res = output;
}
TSKEY st = taosGetTimestampUs();
int32_t slot = 0;
int32_t tableNum = 0;
int16_t* tableMIdx = 0;
int32_t equalNum = 0;
int32_t stackidx = 0;
int32_t mergeDone = 0;
SMergeCtx* ctx = NULL;
SMergeCtx* pctx = NULL;
STidTags* cur = NULL;
STidTags* prev = NULL;
SArray* tagCond = NULL;
for (int16_t tidx = 0; tidx < joinNum; tidx++) {
pctx = &ctxlist[tidx];
if (pctx->compared) {
continue;
}
assert(pctx->idx == 0 && taosArrayGetSize(pctx->res) == 0);
tagCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tagJoin;
taosArrayInsert(tagCond, 0, &tidx);
tableNum = taosArrayGetSize(tagCond);
assert(tableNum >= 1);
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
pctx->compared = 1;
ctxStack[stackidx++] = pctx;
tableMIdx = taosArrayGet(tagCond, ++slot);
equalNum = 1;
while (1) {
ctx = &ctxlist[*tableMIdx];
ctx->compared = 1;
cur = (STidTags*) varDataVal(ctx->p->pIdTagList + ctx->idx * ctx->p->tagSize);
assert(cur->tid != 0 && prev->tid != 0);
ctxStack[stackidx++] = ctx;
int32_t ret = doCompare(prev->tag, cur->tag, pColSchema->type, pColSchema->bytes);
if (ret == 0) {
if (++equalNum < tableNum) {
prev = cur;
pctx = ctx;
if (++slot >= tableNum) {
slot = 0;
}
tableMIdx = taosArrayGet(tagCond, slot);
continue;
}
tscDebug("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, prev->vgId,
*(int*) prev->tag, prev->tid, prev->uid, cur->tid, cur->uid);
assert(stackidx == tableNum);
for (int32_t i = 0; i < stackidx; ++i) {
SMergeCtx* tctx = ctxStack[i];
prev = (STidTags*) varDataVal(tctx->p->pIdTagList + tctx->idx * tctx->p->tagSize);
taosArrayPush(tctx->res, &prev);
}
for (int32_t i = 0; i < stackidx; ++i) {
SMergeCtx* tctx = ctxStack[i];
if (++tctx->idx >= tctx->p->num) {
mergeDone = 1;
break;
}
}
if (mergeDone) {
break;
}
stackidx = 0;
equalNum = 1;
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
ctxStack[stackidx++] = pctx;
} else if (ret > 0) {
if (++ctx->idx >= ctx->p->num) {
break;
}
} else {
for (int32_t i = 0; i < stackidx; ++i) {
SMergeCtx* tctx = ctxStack[i];
if (++tctx->idx >= tctx->p->num) {
mergeDone = 1;
break;
}
}
if (mergeDone) {
break;
}
stackidx = 0;
equalNum = 1;
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
ctxStack[stackidx++] = pctx;
}
}
slot = 0;
mergeDone = 0;
}
int64_t numOfInput1 = 1;
int64_t numOfInput2 = 1;
while(1) {
STSElem elem = tsBufGetElem(pSupporter1->pTSBuf);
// no data in pSupporter1 anymore, jump out of loop
if (!tsBufIsValidElem(&elem)) {
break;
}
// find the data in supporter2 with the same tag value
STSElem e2 = tsBufFindElemStartPosByTag(pSupporter2->pTSBuf, elem.tag);
/**
* there are elements in pSupporter2 with the same tag, continue
*/
tVariant tag1 = {0};
tVariantAssign(&tag1, elem.tag);
if (tsBufIsValidElem(&e2)) {
while (1) {
STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);
// data with current are exhausted
if (!tsBufIsValidElem(&elem1) || tVariantCompare(elem1.tag, &tag1) != 0) {
break;
}
if (!tsBufIsValidElem(&elem2) || tVariantCompare(elem2.tag, &tag1) != 0) { // ignore all records with the same tag
skipRemainValue(pSupporter1->pTSBuf, &tag1);
break;
}
/*
* in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
* final results which is acquired after the secondary merge of in the client.
*/
int32_t re = tsCompare(order, elem1.ts, elem2.ts);
if (re < 0) {
tsBufNextPos(pSupporter1->pTSBuf);
numOfInput1++;
} else if (re > 0) {
tsBufNextPos(pSupporter2->pTSBuf);
numOfInput2++;
} else {
if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
if (win->skey > elem1.ts) {
win->skey = elem1.ts;
}
if (win->ekey < elem1.ts) {
win->ekey = elem1.ts;
}
tsBufAppend(output1, elem1.id, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.id, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
} else {
pLimit->offset -= 1;//offset apply to projection?
}
tsBufNextPos(pSupporter1->pTSBuf);
numOfInput1++;
tsBufNextPos(pSupporter2->pTSBuf);
numOfInput2++;
}
}
} else { // no data in pSupporter2, ignore current data in pSupporter2
skipRemainValue(pSupporter1->pTSBuf, &tag1);
}
}
/*
* failed to set the correct ts order yet in two cases:
* 1. only one element
* 2. only one element for each tag.
*/
if (output1->tsOrder == -1) {
output1->tsOrder = TSDB_ORDER_ASC;
output2->tsOrder = TSDB_ORDER_ASC;
}
tsBufFlush(output1);
tsBufFlush(output2);
tsBufDestroy(pSupporter1->pTSBuf);
pSupporter1->pTSBuf = NULL;
tsBufDestroy(pSupporter2->pTSBuf);
pSupporter2->pTSBuf = NULL;
TSKEY et = taosGetTimestampUs();
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfGroups, win->skey, win->ekey,
tsBufGetNumOfGroup(output1), et - st);
return output1->numOfTotal;
}
// todo handle failed to create sub query
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index) {
SJoinSupporter* pSupporter = calloc(1, sizeof(SJoinSupporter));
......@@ -652,7 +933,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
size_t numOfTables = taosArrayGetSize(tables);
for (size_t i = 0; i < numOfTables; i++) {
STidTags* tt = taosArrayGet(tables, i);
STidTags* tt = *(STidTags **)taosArrayGet(tables, i);
if (prev == NULL || tt->vgId != prev->vgId) {
SVgroupsInfo* pvg = pTableMetaInfo->vgroupList;
......@@ -768,76 +1049,194 @@ static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSq
return true;
}
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
tscDebug("%p all subquery retrieve <tid, tags> complete, do tags match, %d, %d", pParentSql, p1->num, p2->num);
// sort according to the tag value
qsort(p1->pIdTagList, p1->num, p1->tagSize, tagValCompar);
qsort(p2->pIdTagList, p2->num, p2->tagSize, tagValCompar);
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) {
int16_t joinNum = pParentSql->subState.numOfSub;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid);
SJoinSupporter* p0 = pParentSql->pSubs[0]->param;
SMergeCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {0};
SMergeCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0};
// int16_t for padding
int32_t size = p0->tagSize - sizeof(int16_t);
SSchema* pColSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, tagColId);
// int16_t for padding
int32_t size = p1->tagSize - sizeof(int16_t);
*s1 = taosArrayInit(p1->num, size);
*s2 = taosArrayInit(p2->num, size);
tscDebug("%p all subquery retrieve <tid, tags> complete, do tags match", pParentSql);
for (int32_t i = 0; i < joinNum; i++) {
SJoinSupporter* p = pParentSql->pSubs[i]->param;
ctxlist[i].p = p;
ctxlist[i].res = taosArrayInit(p->num, sizeof(STidTags*));
tscDebug("Join %d - num:%d", i, p->num);
// sort according to the tag valu
qsort(p->pIdTagList, p->num, p->tagSize, tagValCompar);
if (!(checkForDuplicateTagVal(pColSchema, p1, pParentSql) && checkForDuplicateTagVal(pColSchema, p2, pParentSql))) {
if (!checkForDuplicateTagVal(pColSchema, p, pParentSql)) {
for (int32_t j = 0; j <= i; j++) {
taosArrayDestroy(ctxlist[j].res);
}
return TSDB_CODE_QRY_DUP_JOIN_KEY;
}
}
int32_t i = 0, j = 0;
while(i < p1->num && j < p2->num) {
STidTags* pp1 = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
STidTags* pp2 = (STidTags*) varDataVal(p2->pIdTagList + j * p2->tagSize);
assert(pp1->tid != 0 && pp2->tid != 0);
int32_t slot = 0;
int32_t tableNum = 0;
int16_t* tableMIdx = 0;
int32_t equalNum = 0;
int32_t stackidx = 0;
int32_t mergeDone = 0;
SMergeCtx* ctx = NULL;
SMergeCtx* pctx = NULL;
STidTags* cur = NULL;
STidTags* prev = NULL;
SArray* tagCond = NULL;
int32_t ret = doCompare(pp1->tag, pp2->tag, pColSchema->type, pColSchema->bytes);
for (int16_t tidx = 0; tidx < joinNum; tidx++) {
pctx = &ctxlist[tidx];
if (pctx->compared) {
continue;
}
assert(pctx->idx == 0 && taosArrayGetSize(pctx->res) == 0);
tagCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tagJoin;
taosArrayInsert(tagCond, 0, &tidx);
tableNum = taosArrayGetSize(tagCond);
assert(tableNum >= 1);
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
pctx->compared = 1;
ctxStack[stackidx++] = pctx;
tableMIdx = taosArrayGet(tagCond, ++slot);
equalNum = 1;
while (1) {
ctx = &ctxlist[*tableMIdx];
ctx->compared = 1;
cur = (STidTags*) varDataVal(ctx->p->pIdTagList + ctx->idx * ctx->p->tagSize);
assert(cur->tid != 0 && prev->tid != 0);
ctxStack[stackidx++] = ctx;
int32_t ret = doCompare(prev->tag, cur->tag, pColSchema->type, pColSchema->bytes);
if (ret == 0) {
tscDebug("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, pp1->vgId,
*(int*) pp1->tag, pp1->tid, pp1->uid, pp2->tid, pp2->uid);
if (++equalNum < tableNum) {
prev = cur;
pctx = ctx;
if (++slot >= tableNum) {
slot = 0;
}
tableMIdx = taosArrayGet(tagCond, slot);
continue;
}
tscDebug("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, prev->vgId,
*(int*) prev->tag, prev->tid, prev->uid, cur->tid, cur->uid);
assert(stackidx == tableNum);
for (int32_t i = 0; i < stackidx; ++i) {
SMergeCtx* tctx = ctxStack[i];
prev = (STidTags*) varDataVal(tctx->p->pIdTagList + tctx->idx * tctx->p->tagSize);
taosArrayPush(tctx->res, &prev);
}
for (int32_t i = 0; i < stackidx; ++i) {
SMergeCtx* tctx = ctxStack[i];
if (++tctx->idx >= tctx->p->num) {
mergeDone = 1;
break;
}
}
if (mergeDone) {
break;
}
stackidx = 0;
equalNum = 1;
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
taosArrayPush(*s1, pp1);
taosArrayPush(*s2, pp2);
j++;
i++;
ctxStack[stackidx++] = pctx;
} else if (ret > 0) {
j++;
if (++ctx->idx >= ctx->p->num) {
break;
}
} else {
i++;
for (int32_t i = 0; i < stackidx; ++i) {
SMergeCtx* tctx = ctxStack[i];
if (++tctx->idx >= tctx->p->num) {
mergeDone = 1;
break;
}
}
if (mergeDone) {
break;
}
stackidx = 0;
equalNum = 1;
prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize);
ctxStack[stackidx++] = pctx;
}
}
slot = 0;
mergeDone = 0;
}
for (int32_t i = 0; i < joinNum; ++i) {
// reorganize the tid-tag value according to both the vgroup id and tag values
// sort according to the tag value
size_t t1 = taosArrayGetSize(*s1);
size_t t2 = taosArrayGetSize(*s2);
size_t num = taosArrayGetSize(ctxlist[i].res);
qsort((*s1)->pData, t1, size, tidTagsCompar);
qsort((*s2)->pData, t2, size, tidTagsCompar);
qsort((ctxlist[i].res)->pData, num, size, tidTagsCompar);
#if 0
for(int32_t k = 0; k < t1; ++k) {
STidTags* p = (*s1)->pData + size * k;
printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data);
}
taosArrayPush(resList, &ctxlist[i].res);
for(int32_t k = 0; k < t1; ++k) {
STidTags* p = (*s2)->pData + size * k;
printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data);
tscDebug("%p tags match complete, result num: %"PRIzu, pParentSql, num);
}
#endif
tscDebug("%p tags match complete, result: %"PRIzu", %"PRIzu, pParentSql, t1, t2);
return TSDB_CODE_SUCCESS;
}
bool emptyTagList(SArray* resList, int32_t size) {
int32_t rsize = taosArrayGetSize(resList);
if (rsize != size) {
return true;
}
for (int32_t i = 0; i < size; ++i) {
SArray** s = taosArrayGet(resList, i);
if (taosArrayGetSize(*s) <= 0) {
return true;
}
}
return false;
}
static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
......@@ -939,19 +1338,19 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
return;
}
SArray *s1 = NULL, *s2 = NULL;
int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
SArray* resList = taosArrayInit(pParentSql->subState.numOfSub, sizeof(SArray *));
int32_t code = getIntersectionOfTableTuple(pQueryInfo, pParentSql, resList);
if (code != TSDB_CODE_SUCCESS) {
freeJoinSubqueryObj(pParentSql);
pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql);
taosArrayDestroy(s1);
taosArrayDestroy(s2);
taosArrayDestroy(resList);
return;
}
if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return.
if (emptyTagList(resList, pParentSql->subState.numOfSub)) { // no results,return.
assert(pParentSql->fp != tscJoinQueryCallback);
tscDebug("%p tag intersect does not generated qualified tables for join, free all sub SqlObj and quit", pParentSql);
......@@ -963,37 +1362,34 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else {
for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
// proceed to for ts_comp query
SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd;
SSqlCmd* pSubCmd = &pParentSql->pSubs[m]->cmd;
SArray** s = taosArrayGet(resList, m);
SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0);
STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0);
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo1, s1);
SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0);
STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);
SSqlObj* psub1 = pParentSql->pSubs[0];
((SJoinSupporter*)psub1->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo1->pVgroupTables);
SSqlObj* psub2 = pParentSql->pSubs[1];
((SJoinSupporter*)psub2->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo2->pVgroupTables);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo, *s);
pParentSql->subState.numOfSub = 2;
SSqlObj* psub = pParentSql->pSubs[m];
((SJoinSupporter*)psub->param)->pVgroupTables = tscVgroupTableInfoDup(pTableMetaInfo->pVgroupTables);
memset(pParentSql->subState.states, 0, sizeof(pParentSql->subState.states[0]) * pParentSql->subState.numOfSub);
tscDebug("%p reset all sub states to 0", pParentSql);
for (int32_t m = 0; m < pParentSql->subState.numOfSub; ++m) {
SSqlObj* sub = pParentSql->pSubs[m];
issueTsCompQuery(sub, sub->param, pParentSql);
issueTsCompQuery(psub, psub->param, pParentSql);
}
}
taosArrayDestroy(s1);
taosArrayDestroy(s2);
int32_t rsize = taosArrayGetSize(resList);
for (int32_t i = 0; i < rsize; ++i) {
SArray** s = taosArrayGet(resList, i);
if (*s) {
taosArrayDestroy(*s);
}
}
taosArrayDestroy(resList);
}
static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
......@@ -1124,12 +1520,8 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscDebug("%p all subquery retrieve ts complete, do ts block intersect", pParentSql);
// proceeds to launched secondary query to retrieve final data
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
STimeWindow win = TSWINDOW_INITIALIZER;
int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win);
int64_t num = doTSBlockIntersect(pParentSql, &win);
if (num <= 0) { // no result during ts intersect
tscDebug("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql);
......
......@@ -1279,6 +1279,33 @@ int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepco
return 0;
}
bool tscColumnExists(SArray* pColumnList, SColumnIndex* pColIndex) {
// ignore the tbname columnIndex to be inserted into source list
if (pColIndex->columnIndex < 0) {
return false;
}
size_t numOfCols = taosArrayGetSize(pColumnList);
int16_t col = pColIndex->columnIndex;
int32_t i = 0;
while (i < numOfCols) {
SColumn* pCol = taosArrayGetP(pColumnList, i);
if ((pCol->colIndex.columnIndex != col) || (pCol->colIndex.tableIndex != pColIndex->tableIndex)) {
continue;
} else {
break;
}
}
if (i >= numOfCols || numOfCols == 0) {
return false;
}
return true;
}
SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
// ignore the tbname columnIndex to be inserted into source list
if (pColIndex->columnIndex < 0) {
......@@ -1583,7 +1610,20 @@ int32_t tscTagCondCopy(STagCond* dest, const STagCond* src) {
dest->tbnameCond.uid = src->tbnameCond.uid;
dest->tbnameCond.len = src->tbnameCond.len;
memcpy(&dest->joinInfo, &src->joinInfo, sizeof(SJoinInfo));
dest->joinInfo.hasJoin = src->joinInfo.hasJoin;
for (int32_t i = 0; i < TSDB_MAX_JOIN_TABLE_NUM; ++i) {
if (src->joinInfo.joinTables[i]) {
dest->joinInfo.joinTables[i] = calloc(1, sizeof(SJoinNode));
memcpy(dest->joinInfo.joinTables[i], src->joinInfo.joinTables[i], sizeof(SJoinNode));
dest->joinInfo.joinTables[i]->tsJoin = taosArrayDup(src->joinInfo.joinTables[i]->tsJoin);
dest->joinInfo.joinTables[i]->tagJoin = taosArrayDup(src->joinInfo.joinTables[i]->tagJoin);
}
}
dest->relType = src->relType;
if (src->pCond == NULL) {
......@@ -1629,6 +1669,23 @@ void tscTagCondRelease(STagCond* pTagCond) {
taosArrayDestroy(pTagCond->pCond);
}
for (int32_t i = 0; i < TSDB_MAX_JOIN_TABLE_NUM; ++i) {
SJoinNode *node = pTagCond->joinInfo.joinTables[i];
if (node == NULL) {
continue;
}
if (node->tsJoin != NULL) {
taosArrayDestroy(node->tsJoin);
}
if (node->tagJoin != NULL) {
taosArrayDestroy(node->tagJoin);
}
tfree(node);
}
memset(pTagCond, 0, sizeof(STagCond));
}
......@@ -2318,16 +2375,21 @@ void tscDoQuery(SSqlObj* pSql) {
}
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid) {
if (pTagCond->joinInfo.left.uid == uid) {
return pTagCond->joinInfo.left.tagColId;
} else if (pTagCond->joinInfo.right.uid == uid) {
return pTagCond->joinInfo.right.tagColId;
} else {
int32_t i = 0;
while (i < TSDB_MAX_JOIN_TABLE_NUM) {
SJoinNode* node = pTagCond->joinInfo.joinTables[i];
if (node && node->uid == uid) {
return node->tagColId;
}
i++;
}
assert(0);
return -1;
}
}
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId) {
int32_t numOfTags = tscGetNumOfTags(pTableMeta);
......
......@@ -317,7 +317,7 @@ do { \
#define TSDB_MAX_DB_QUORUM_OPTION 2
#define TSDB_DEFAULT_DB_QUORUM_OPTION 1
#define TSDB_MAX_JOIN_TABLE_NUM 5
#define TSDB_MAX_JOIN_TABLE_NUM 10
#define TSDB_MAX_UNION_CLAUSE 5
#define TSDB_MAX_BINARY_LEN (TSDB_MAX_BYTES_PER_ROW-TSDB_KEYSIZE)
......
......@@ -2393,6 +2393,8 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf
break;
}
}
pRuntimeEnv->pQuery->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf);
} else {
for (int32_t i = 0; i < numOfRows; ++i) {
bool qualified = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册