From f5710472d5a806a6ad8dca6e81b2ed414a193867 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 31 Oct 2020 16:18:21 +0800 Subject: [PATCH] [TD-1844] --- src/client/src/tscSubquery.c | 171 +++++++++++++++++++++++++---------- 1 file changed, 123 insertions(+), 48 deletions(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 902ff989c7..d58b44b92f 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -32,11 +32,15 @@ typedef struct SInsertSupporter { static void freeJoinSubqueryObj(SSqlObj* pSql); static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql); -static bool tsCompare(int32_t order, int64_t left, int64_t right) { +static int32_t tsCompare(int32_t order, int64_t left, int64_t right) { + if (left == right) { + return 0; + } + if (order == TSDB_ORDER_ASC) { - return left < right; + return left < right? -1:1; } else { - return left > right; + return left > right? -1:1; } } @@ -51,10 +55,10 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ SLimitVal* pLimit = &pQueryInfo->limit; int32_t order = pQueryInfo->order.order; - + SQueryInfo* pSubQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[0]->cmd, 0); SQueryInfo* pSubQueryInfo2 = tscGetQueryInfoDetail(&pSql->pSubs[1]->cmd, 0); - + pSubQueryInfo1->tsBuf = output1; pSubQueryInfo2->tsBuf = output2; @@ -88,59 +92,106 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ int64_t numOfInput1 = 1; int64_t numOfInput2 = 1; - while (1) { - STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf); - STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf); + int32_t numOfVnodes = 0; + int32_t* idList = NULL; + tsBufGetVnodeIdList(pSupporter2->pTSBuf, &numOfVnodes, &idList); -#ifdef _DEBUG_VIEW - tscInfo("%" PRId64 ", tags:%"PRId64" \t %" PRId64 ", tags:%"PRId64, elem1.ts, elem1.tag.i64Key, elem2.ts, elem2.tag.i64Key); -#endif + bool completed = false; + while(1) { + STSElem elem = tsBufGetElem(pSupporter1->pTSBuf); - int32_t res = tVariantCompare(elem1.tag, elem2.tag); - if (res == -1 || (res == 0 && tsCompare(order, elem1.ts, elem2.ts))) { - if (!tsBufNextPos(pSupporter1->pTSBuf)) { - break; - } + // no data in pSupporter1 anymore, jump out of loop + if (elem.vnode < 0 || completed) { + break; + } - numOfInput1++; - } else if ((res > 0) || (res == 0 && tsCompare(order, elem2.ts, elem1.ts))) { - if (!tsBufNextPos(pSupporter2->pTSBuf)) { + bool f = false; + for (int32_t i = 0; i < numOfVnodes; ++i) { + STSElem el = tsBufGetElemStartPos(pSupporter2->pTSBuf, idList[i], elem.tag); + if (el.vnode == idList[i]) { + f = true; break; } + } - numOfInput2++; - } else { - /* - * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the - * final results which is acquired after the secondry merge of in the client. - */ - if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { - if (win->skey > elem1.ts) { - win->skey = elem1.ts; - } - - if (win->ekey < elem1.ts) { - win->ekey = elem1.ts; - } - - tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); - tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); + /** + * there are elements in pSupporter2 with the same tag, continue + */ + if (f) { + while (1) { + STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf); + STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf); + + /* + * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the + * final results which is acquired after the secondry merge of in the client. + */ + int32_t re = tsCompare(order, elem1.ts, elem2.ts); + if (re < 0) { + if (!tsBufNextPos(pSupporter1->pTSBuf)) { + completed = true; + break; + } - } else { - pLimit->offset -= 1; - } + numOfInput1++; + } else if (re > 0) { + if (!tsBufNextPos(pSupporter2->pTSBuf)) { + completed = true; + break; + } - if (!tsBufNextPos(pSupporter1->pTSBuf)) { - break; + numOfInput2++; + } else { + if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { + if (win->skey > elem1.ts) { + win->skey = elem1.ts; + } + + if (win->ekey < elem1.ts) { + win->ekey = elem1.ts; + } + + tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); + tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); + } else { + pLimit->offset -= 1; + } + + if (!tsBufNextPos(pSupporter1->pTSBuf)) { + completed = true; + break; + } + + numOfInput1++; + + if (!tsBufNextPos(pSupporter2->pTSBuf)) { + completed = true; + break; + } + + numOfInput2++; + } } + } else { // no data in pSupporter2, ignore current data in pSupporter2 + tVariant tag = {0}; + tVariantAssign(&tag, elem.tag); - numOfInput1++; + // ignore all records with the same tag + while (tsBufNextPos(pSupporter1->pTSBuf)) { + STSElem el1 = tsBufGetElem(pSupporter1->pTSBuf); + int32_t res = tVariantCompare(el1.tag, &tag); - if (!tsBufNextPos(pSupporter2->pTSBuf)) { - break; + // it is a record with new tag + if (res != 0) { + break; + } } - numOfInput2++; + STSElem el1 = tsBufGetElem(pSupporter1->pTSBuf); + if (el1.vnode < 0) { // no data exists, abort + completed = true; + break; + } } } @@ -162,8 +213,9 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ TSKEY et = taosGetTimestampUs(); tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks " - "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elasped time:%"PRId64" us", pSql, numOfInput1, numOfInput2, output1->numOfTotal, - output1->numOfVnodes, win->skey, win->ekey, tsBufGetNumOfVnodes(output1), et - st); + "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us", + pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfVnodes, win->skey, win->ekey, + tsBufGetNumOfVnodes(output1), et - st); return output1->numOfTotal; } @@ -517,18 +569,29 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr vgTables = taosArrayInit(4, sizeof(STableIdInfo)); info.itemList = vgTables; + + if (taosArrayGetSize(result) > 0) { + SVgroupTableInfo* prevGroup = taosArrayGet(result, taosArrayGetSize(result) - 1); + tscDebug("%p vgId:%d, tables:%"PRId64, pSql, prevGroup->vgInfo.vgId, taosArrayGetSize(prevGroup->itemList)); + } + taosArrayPush(result, &info); } STableIdInfo item = {.uid = tt->uid, .tid = tt->tid, .key = INT64_MIN}; taosArrayPush(vgTables, &item); - tscDebug("%p tid:%d, uid:%"PRIu64",vgId:%d added, total:%d", pSql, tt->tid, tt->uid, tt->vgId, (int32_t) taosArrayGetSize(vgTables)); + tscTrace("%p tid:%d, uid:%"PRIu64",vgId:%d added", pSql, tt->tid, tt->uid, tt->vgId); prev = tt; } pTableMetaInfo->pVgroupTables = result; pTableMetaInfo->vgroupIndex = 0; + + if (taosArrayGetSize(result) > 0) { + SVgroupTableInfo* g = taosArrayGet(result, taosArrayGetSize(result) - 1); + tscDebug("%p vgId:%d, tables:%"PRId64, pSql, g->vgInfo.vgId, taosArrayGetSize(g->itemList)); + } } static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) { @@ -656,6 +719,18 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar qsort((*s1)->pData, t1, size, tidTagsCompar); qsort((*s2)->pData, t2, size, tidTagsCompar); +#if 0 + for(int32_t k = 0; k < t1; ++k) { + STidTags* p = (*s1)->pData + size * k; + printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data); + } + + for(int32_t k = 0; k < t1; ++k) { + STidTags* p = (*s2)->pData + size * k; + printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data); + } +#endif + tscDebug("%p tags match complete, result: %"PRId64", %"PRId64, pParentSql, t1, t2); return TSDB_CODE_SUCCESS; } -- GitLab