diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 8ca313cca67d120920c666e7ab3d9b9177cf5de8..939bb6d964fd4aceaee62800462e7d8d956754dd 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4341,6 +4341,65 @@ int32_t validateJoinNodes(SQueryInfo* pQueryInfo, SSqlObj* pSql) { return TSDB_CODE_SUCCESS; } + +void mergeJoinNodesImpl(int8_t* r, int8_t* p, int16_t* tidx, SJoinNode** nodes, int32_t type) { + SJoinNode *node = nodes[*tidx]; + SArray* arr = (type == 0) ? node->tsJoin : node->tagJoin; + int32_t size = taosArrayGetSize(arr); + + p[*tidx] = 1; + + for (int32_t j = 0; j < size; j++) { + int16_t* idx = taosArrayGet(arr, j); + r[*idx] = 1; + if (p[*idx] == 0) { + mergeJoinNodesImpl(r, p, idx, nodes, type); + } + } +} + +int32_t mergeJoinNodes(SQueryInfo* pQueryInfo) { + int8_t r[TSDB_MAX_JOIN_TABLE_NUM] = {0}; + int8_t p[TSDB_MAX_JOIN_TABLE_NUM] = {0}; + + for (int16_t i = 0; i < pQueryInfo->numOfTables; ++i) { + mergeJoinNodesImpl(r, p, &i, pQueryInfo->tagCond.joinInfo.joinTables, 0); + + taosArrayClear(pQueryInfo->tagCond.joinInfo.joinTables[i]->tsJoin); + + for (int32_t j = 0; j < TSDB_MAX_JOIN_TABLE_NUM; ++j) { + if (r[j]) { + taosArrayPush(pQueryInfo->tagCond.joinInfo.joinTables[i]->tsJoin, &j); + } + } + + memset(r, 0, sizeof(r)); + memset(p, 0, sizeof(p)); + } + + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { + for (int16_t i = 0; i < pQueryInfo->numOfTables; ++i) { + mergeJoinNodesImpl(r, p, &i, pQueryInfo->tagCond.joinInfo.joinTables, 1); + + taosArrayClear(pQueryInfo->tagCond.joinInfo.joinTables[i]->tagJoin); + + for (int32_t j = 0; j < TSDB_MAX_JOIN_TABLE_NUM; ++j) { + if (r[j]) { + taosArrayPush(pQueryInfo->tagCond.joinInfo.joinTables[i]->tagJoin, &j); + } + } + + memset(r, 0, sizeof(r)); + memset(p, 0, sizeof(p)); + } + + } + + return TSDB_CODE_SUCCESS; +} + + int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql) { if (pExpr == NULL) { return TSDB_CODE_SUCCESS; @@ -4419,6 +4478,11 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SSqlObj* pSql if (ret) { goto PARSE_WHERE_EXIT; } + + ret = mergeJoinNodes(pQueryInfo); + if (ret) { + goto PARSE_WHERE_EXIT; + } } PARSE_WHERE_EXIT: diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 3bfed7abae6ea97b129eced75a90740c517cfebc..478099878566c1b0a4126a1a36c36ac8680f5ab7 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -118,156 +118,6 @@ static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { -static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); - - STSBuf* output1 = tsBufCreate(true, pQueryInfo->order.order); - STSBuf* output2 = tsBufCreate(true, pQueryInfo->order.order); - - win->skey = INT64_MAX; - win->ekey = INT64_MIN; - - SLimitVal* pLimit = &pQueryInfo->limit; - int32_t order = pQueryInfo->order.order; - - SQueryInfo* pSubQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[0]->cmd, 0); - SQueryInfo* pSubQueryInfo2 = tscGetQueryInfoDetail(&pSql->pSubs[1]->cmd, 0); - - pSubQueryInfo1->tsBuf = output1; - pSubQueryInfo2->tsBuf = output2; - - TSKEY st = taosGetTimestampUs(); - - // no result generated, return directly - if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) { - tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql); - return 0; - } - - tsBufResetPos(pSupporter1->pTSBuf); - tsBufResetPos(pSupporter2->pTSBuf); - - if (!tsBufNextPos(pSupporter1->pTSBuf)) { - tsBufFlush(output1); - tsBufFlush(output2); - - tscDebug("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql); - return 0; - } - - if (!tsBufNextPos(pSupporter2->pTSBuf)) { - tsBufFlush(output1); - tsBufFlush(output2); - - tscDebug("%p input2 is empty, 0 for secondary query after ts blocks intersecting", pSql); - return 0; - } - - int64_t numOfInput1 = 1; - int64_t numOfInput2 = 1; - - while(1) { - STSElem elem = tsBufGetElem(pSupporter1->pTSBuf); - - // no data in pSupporter1 anymore, jump out of loop - if (!tsBufIsValidElem(&elem)) { - break; - } - - // find the data in supporter2 with the same tag value - STSElem e2 = tsBufFindElemStartPosByTag(pSupporter2->pTSBuf, elem.tag); - - /** - * there are elements in pSupporter2 with the same tag, continue - */ - tVariant tag1 = {0}; - tVariantAssign(&tag1, elem.tag); - - if (tsBufIsValidElem(&e2)) { - while (1) { - STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf); - STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf); - - // data with current are exhausted - if (!tsBufIsValidElem(&elem1) || tVariantCompare(elem1.tag, &tag1) != 0) { - break; - } - - if (!tsBufIsValidElem(&elem2) || tVariantCompare(elem2.tag, &tag1) != 0) { // ignore all records with the same tag - skipRemainValue(pSupporter1->pTSBuf, &tag1); - break; - } - - /* - * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the - * final results which is acquired after the secondary merge of in the client. - */ - int32_t re = tsCompare(order, elem1.ts, elem2.ts); - if (re < 0) { - tsBufNextPos(pSupporter1->pTSBuf); - numOfInput1++; - } else if (re > 0) { - tsBufNextPos(pSupporter2->pTSBuf); - numOfInput2++; - } else { - if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { - if (win->skey > elem1.ts) { - win->skey = elem1.ts; - } - - if (win->ekey < elem1.ts) { - win->ekey = elem1.ts; - } - - tsBufAppend(output1, elem1.id, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); - tsBufAppend(output2, elem2.id, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); - } else { - pLimit->offset -= 1;//offset apply to projection? - } - - tsBufNextPos(pSupporter1->pTSBuf); - numOfInput1++; - - tsBufNextPos(pSupporter2->pTSBuf); - numOfInput2++; - } - } - } else { // no data in pSupporter2, ignore current data in pSupporter2 - skipRemainValue(pSupporter1->pTSBuf, &tag1); - } - } - - /* - * failed to set the correct ts order yet in two cases: - * 1. only one element - * 2. only one element for each tag. - */ - if (output1->tsOrder == -1) { - output1->tsOrder = TSDB_ORDER_ASC; - output2->tsOrder = TSDB_ORDER_ASC; - } - - tsBufFlush(output1); - tsBufFlush(output2); - - tsBufDestroy(pSupporter1->pTSBuf); - pSupporter1->pTSBuf = NULL; - tsBufDestroy(pSupporter2->pTSBuf); - pSupporter2->pTSBuf = NULL; - - TSKEY et = taosGetTimestampUs(); - tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks " - "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us", - pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfGroups, win->skey, win->ekey, - tsBufGetNumOfGroup(output1), et - st); - - return output1->numOfTotal; -} - - - - - static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); @@ -279,7 +129,19 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { int32_t joinNum = pSql->subState.numOfSub; SMergeTsCtx ctxlist[TSDB_MAX_JOIN_TABLE_NUM] = {0}; SMergeTsCtx* ctxStack[TSDB_MAX_JOIN_TABLE_NUM] = {0}; - + int32_t slot = 0; + int32_t tableNum = 0; + int16_t* tableMIdx = 0; + int32_t equalNum = 0; + int32_t stackidx = 0; + SMergeTsCtx* ctx = NULL; + SMergeTsCtx* pctx = NULL; + SMergeTsCtx* mainCtx = NULL; + STSElem cur; + STSElem prev; + SArray* tsCond = NULL; + int32_t mergeDone = 0; + for (int32_t i = 0; i < joinNum; ++i) { STSBuf* output = tsBufCreate(true, pQueryInfo->order.order); SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); @@ -288,14 +150,14 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { SJoinSupporter* pSupporter = pSql->pSubs[i]->param; - if (pSupporter[i]->pTSBuf == NULL) { + if (pSupporter->pTSBuf == NULL) { tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql); return 0; } - tsBufResetPos(pSupporter[i]->pTSBuf); + tsBufResetPos(pSupporter->pTSBuf); - if (!tsBufNextPos(pSupporter[i]->pTSBuf)) { + if (!tsBufNextPos(pSupporter->pTSBuf)) { tscDebug("%p input1 is empty, 0 for secondary query after ts blocks intersecting", pSql); return 0; } @@ -306,215 +168,181 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { TSKEY st = taosGetTimestampUs(); - - - - - - - - - int32_t slot = 0; - int32_t tableNum = 0; - int16_t* tableMIdx = 0; - int32_t equalNum = 0; - int32_t stackidx = 0; - int32_t mergeDone = 0; - SMergeCtx* ctx = NULL; - SMergeCtx* pctx = NULL; - STidTags* cur = NULL; - STidTags* prev = NULL; - SArray* tagCond = NULL; - for (int16_t tidx = 0; tidx < joinNum; tidx++) { pctx = &ctxlist[tidx]; if (pctx->compared) { continue; } - assert(pctx->idx == 0 && taosArrayGetSize(pctx->res) == 0); - - tagCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tagJoin; - - taosArrayInsert(tagCond, 0, &tidx); + assert(pctx->numOfInput == 0); - tableNum = taosArrayGetSize(tagCond); - assert(tableNum >= 1); - - prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize); - pctx->compared = 1; + tsCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tsJoin; - ctxStack[stackidx++] = pctx; + tableNum = taosArrayGetSize(tsCond); + assert(tableNum >= 2); - tableMIdx = taosArrayGet(tagCond, ++slot); + for (int32_t i = 0; i < tableNum; ++i) { + tableMIdx = taosArrayGet(tsCond, i); + SMergeTsCtx* tctx = &ctxlist[*tableMIdx]; + tctx->compared = 1; + } - equalNum = 1; - - while (1) { - ctx = &ctxlist[*tableMIdx]; + tableMIdx = taosArrayGet(tsCond, 0); + pctx = &ctxlist[*tableMIdx]; - ctx->compared = 1; - - cur = (STidTags*) varDataVal(ctx->p->pIdTagList + ctx->idx * ctx->p->tagSize); + mainCtx = pctx; - assert(cur->tid != 0 && prev->tid != 0); + while (1) { + pctx = mainCtx; - ctxStack[stackidx++] = ctx; + prev = tsBufGetElem(pctx->p->pTSBuf); - int32_t ret = doCompare(prev->tag, cur->tag, pColSchema->type, pColSchema->bytes); - if (ret == 0) { - if (++equalNum < tableNum) { - prev = cur; - pctx = ctx; - - if (++slot >= tableNum) { - slot = 0; - } + ctxStack[stackidx++] = pctx; - tableMIdx = taosArrayGet(tagCond, slot); - continue; - } - - tscDebug("%p tag matched, vgId:%d, val:%d, tid:%d, uid:%"PRIu64", tid:%d, uid:%"PRIu64, pParentSql, prev->vgId, - *(int*) prev->tag, prev->tid, prev->uid, cur->tid, cur->uid); + if (!tsBufIsValidElem(&prev)) { + break; + } - assert(stackidx == tableNum); - - for (int32_t i = 0; i < stackidx; ++i) { - SMergeCtx* tctx = ctxStack[i]; - prev = (STidTags*) varDataVal(tctx->p->pIdTagList + tctx->idx * tctx->p->tagSize); + tVariant tag = {0}; + tVariantAssign(&tag, prev.tag); - taosArrayPush(tctx->res, &prev); - } + int32_t skipped = 0; - for (int32_t i = 0; i < stackidx; ++i) { - SMergeCtx* tctx = ctxStack[i]; + for (int32_t i = 1; i < tableNum; ++i) { + SMergeTsCtx* tctx = &ctxlist[i]; - if (++tctx->idx >= tctx->p->num) { - mergeDone = 1; - break; - } - } + // find the data in supporter2 with the same tag value + STSElem e2 = tsBufFindElemStartPosByTag(tctx->p->pTSBuf, &tag); - if (mergeDone) { + if (!tsBufIsValidElem(&e2)) { + skipRemainValue(pctx->p->pTSBuf, &tag); + skipped = 1; break; } + } + if (skipped) { + slot = 0; stackidx = 0; - equalNum = 1; - - prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize); - - ctxStack[stackidx++] = pctx; - } else if (ret > 0) { - if (++ctx->idx >= ctx->p->num) { - break; - } - } else { - for (int32_t i = 0; i < stackidx; ++i) { - SMergeCtx* tctx = ctxStack[i]; - if (++tctx->idx >= tctx->p->num) { - mergeDone = 1; - break; - } - } - - if (mergeDone) { - break; - } - - stackidx = 0; - equalNum = 1; - - prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize); - ctxStack[stackidx++] = pctx; + continue; } + + tableMIdx = taosArrayGet(tsCond, ++slot); + equalNum = 1; - } - - slot = 0; - mergeDone = 0; - } - - - - - - - - - int64_t numOfInput1 = 1; - int64_t numOfInput2 = 1; - - while(1) { - STSElem elem = tsBufGetElem(pSupporter1->pTSBuf); - - // no data in pSupporter1 anymore, jump out of loop - if (!tsBufIsValidElem(&elem)) { - break; - } - - // find the data in supporter2 with the same tag value - STSElem e2 = tsBufFindElemStartPosByTag(pSupporter2->pTSBuf, elem.tag); - - /** - * there are elements in pSupporter2 with the same tag, continue - */ - tVariant tag1 = {0}; - tVariantAssign(&tag1, elem.tag); - - if (tsBufIsValidElem(&e2)) { while (1) { - STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf); - STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf); + ctx = &ctxlist[*tableMIdx]; + + prev = tsBufGetElem(pctx->p->pTSBuf); + cur = tsBufGetElem(ctx->p->pTSBuf); // data with current are exhausted - if (!tsBufIsValidElem(&elem1) || tVariantCompare(elem1.tag, &tag1) != 0) { + if (!tsBufIsValidElem(&prev) || tVariantCompare(prev.tag, &tag) != 0) { break; } - if (!tsBufIsValidElem(&elem2) || tVariantCompare(elem2.tag, &tag1) != 0) { // ignore all records with the same tag - skipRemainValue(pSupporter1->pTSBuf, &tag1); + if (!tsBufIsValidElem(&cur) || tVariantCompare(cur.tag, &tag) != 0) { // ignore all records with the same tag break; } - /* - * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the - * final results which is acquired after the secondary merge of in the client. - */ - int32_t re = tsCompare(order, elem1.ts, elem2.ts); - if (re < 0) { - tsBufNextPos(pSupporter1->pTSBuf); - numOfInput1++; - } else if (re > 0) { - tsBufNextPos(pSupporter2->pTSBuf); - numOfInput2++; - } else { - if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { - if (win->skey > elem1.ts) { - win->skey = elem1.ts; + ctxStack[stackidx++] = ctx; + + int32_t ret = tsCompare(order, prev.ts, cur.ts); + if (ret == 0) { + if (++equalNum < tableNum) { + pctx = ctx; + + if (++slot >= tableNum) { + slot = 0; } - if (win->ekey < elem1.ts) { - win->ekey = elem1.ts; + tableMIdx = taosArrayGet(tsCond, slot); + continue; + } + + assert(stackidx == tableNum); + + if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { + if (win->skey > prev.ts) { + win->skey = prev.ts; + } + + if (win->ekey < prev.ts) { + win->ekey = prev.ts; } - tsBufAppend(output1, elem1.id, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); - tsBufAppend(output2, elem2.id, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); + for (int32_t i = 0; i < stackidx; ++i) { + SMergeTsCtx* tctx = ctxStack[i]; + prev = tsBufGetElem(tctx->p->pTSBuf); + + tsBufAppend(tctx->res, prev.id, prev.tag, (const char*)&prev.ts, sizeof(prev.ts)); + } } else { pLimit->offset -= 1;//offset apply to projection? } - tsBufNextPos(pSupporter1->pTSBuf); - numOfInput1++; + for (int32_t i = 0; i < stackidx; ++i) { + SMergeTsCtx* tctx = ctxStack[i]; + + if (!tsBufNextPos(tctx->p->pTSBuf)) { + mergeDone = 1; + } + tctx->numOfInput++; + } + + if (mergeDone) { + break; + } + + stackidx = 0; + equalNum = 1; + + ctxStack[stackidx++] = pctx; + } else if (ret > 0) { + if (!tsBufNextPos(ctx->p->pTSBuf)) { + mergeDone = 1; + break; + } + + ctx->numOfInput++; + stackidx--; + } else { + stackidx--; + + for (int32_t i = 0; i < stackidx; ++i) { + SMergeTsCtx* tctx = ctxStack[i]; + + if (!tsBufNextPos(tctx->p->pTSBuf)) { + mergeDone = 1; + } + tctx->numOfInput++; + } + + if (mergeDone) { + break; + } - tsBufNextPos(pSupporter2->pTSBuf); - numOfInput2++; + stackidx = 0; + equalNum = 1; + + ctxStack[stackidx++] = pctx; } + + } + + if (mergeDone) { + break; } - } else { // no data in pSupporter2, ignore current data in pSupporter2 - skipRemainValue(pSupporter1->pTSBuf, &tag1); + + slot = 0; + stackidx = 0; + + skipRemainValue(mainCtx->p->pTSBuf, &tag); } + + stackidx = 0; + slot = 0; + mergeDone = 0; } /* @@ -522,26 +350,31 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, STimeWindow * win) { * 1. only one element * 2. only one element for each tag. */ - if (output1->tsOrder == -1) { - output1->tsOrder = TSDB_ORDER_ASC; - output2->tsOrder = TSDB_ORDER_ASC; + if (ctxlist[0].res->tsOrder == -1) { + for (int32_t i = 0; i < joinNum; ++i) { + ctxlist[i].res->tsOrder = TSDB_ORDER_ASC; + } } - tsBufFlush(output1); - tsBufFlush(output2); - - tsBufDestroy(pSupporter1->pTSBuf); - pSupporter1->pTSBuf = NULL; - tsBufDestroy(pSupporter2->pTSBuf); - pSupporter2->pTSBuf = NULL; + for (int32_t i = 0; i < joinNum; ++i) { + tsBufFlush(ctxlist[i].res); + + tsBufDestroy(ctxlist[i].p->pTSBuf); + ctxlist[i].p->pTSBuf = NULL; + } TSKEY et = taosGetTimestampUs(); - tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks " - "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us", - pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfGroups, win->skey, win->ekey, - tsBufGetNumOfGroup(output1), et - st); - return output1->numOfTotal; + for (int32_t i = 0; i < joinNum; ++i) { + tsBufFlush(ctxlist[i].res); + + tscDebug("%p tblidx:%d, input:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks " + "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us", + pSql, i, ctxlist[i].numOfInput, ctxlist[i].res->numOfTotal, ctxlist[i].res->numOfGroups, win->skey, win->ekey, + tsBufGetNumOfGroup(ctxlist[i].res), et - st); + } + + return ctxlist[0].res->numOfTotal; } @@ -933,7 +766,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr size_t numOfTables = taosArrayGetSize(tables); for (size_t i = 0; i < numOfTables; i++) { - STidTags* tt = *(STidTags **)taosArrayGet(tables, i); + STidTags* tt = taosArrayGet(tables, i); if (prev == NULL || tt->vgId != prev->vgId) { SVgroupsInfo* pvg = pTableMetaInfo->vgroupList; @@ -1069,7 +902,7 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar SJoinSupporter* p = pParentSql->pSubs[i]->param; ctxlist[i].p = p; - ctxlist[i].res = taosArrayInit(p->num, sizeof(STidTags*)); + ctxlist[i].res = taosArrayInit(p->num, size); tscDebug("Join %d - num:%d", i, p->num); @@ -1106,13 +939,20 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar tagCond = pQueryInfo->tagCond.joinInfo.joinTables[tidx]->tagJoin; - taosArrayInsert(tagCond, 0, &tidx); - tableNum = taosArrayGetSize(tagCond); - assert(tableNum >= 1); + assert(tableNum >= 2); + + for (int32_t i = 0; i < tableNum; ++i) { + tableMIdx = taosArrayGet(tagCond, i); + SMergeCtx* tctx = &ctxlist[*tableMIdx]; + tctx->compared = 1; + } + + tableMIdx = taosArrayGet(tagCond, slot); + pctx = &ctxlist[*tableMIdx]; + prev = (STidTags*) varDataVal(pctx->p->pIdTagList + pctx->idx * pctx->p->tagSize); - pctx->compared = 1; ctxStack[stackidx++] = pctx; @@ -1122,8 +962,6 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar while (1) { ctx = &ctxlist[*tableMIdx]; - - ctx->compared = 1; cur = (STidTags*) varDataVal(ctx->p->pIdTagList + ctx->idx * ctx->p->tagSize); @@ -1154,7 +992,7 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar SMergeCtx* tctx = ctxStack[i]; prev = (STidTags*) varDataVal(tctx->p->pIdTagList + tctx->idx * tctx->p->tagSize); - taosArrayPush(tctx->res, &prev); + taosArrayPush(tctx->res, prev); } for (int32_t i = 0; i < stackidx; ++i) { @@ -1177,10 +1015,14 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar ctxStack[stackidx++] = pctx; } else if (ret > 0) { + stackidx--; + if (++ctx->idx >= ctx->p->num) { break; } } else { + stackidx--; + for (int32_t i = 0; i < stackidx; ++i) { SMergeCtx* tctx = ctxStack[i]; if (++tctx->idx >= tctx->p->num) { @@ -1204,6 +1046,7 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar slot = 0; mergeDone = 0; + stackidx = 0; } for (int32_t i = 0; i < joinNum; ++i) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 13c2dc0af5a102bbc95caf681fe2cf44f2f8b315..0ca4653acdaee3b90ffcdfb579ab0ea977eb393e 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1292,6 +1292,7 @@ bool tscColumnExists(SArray* pColumnList, SColumnIndex* pColIndex) { while (i < numOfCols) { SColumn* pCol = taosArrayGetP(pColumnList, i); if ((pCol->colIndex.columnIndex != col) || (pCol->colIndex.tableIndex != pColIndex->tableIndex)) { + ++i; continue; } else { break;