提交 f5710472 编写于 作者: H Haojun Liao

[TD-1844]

上级 d371c840
......@@ -32,11 +32,15 @@ typedef struct SInsertSupporter {
static void freeJoinSubqueryObj(SSqlObj* pSql);
static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql);
static bool tsCompare(int32_t order, int64_t left, int64_t right) {
static int32_t tsCompare(int32_t order, int64_t left, int64_t right) {
if (left == right) {
return 0;
}
if (order == TSDB_ORDER_ASC) {
return left < right;
return left < right? -1:1;
} else {
return left > right;
return left > right? -1:1;
}
}
......@@ -51,10 +55,10 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
SLimitVal* pLimit = &pQueryInfo->limit;
int32_t order = pQueryInfo->order.order;
SQueryInfo* pSubQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[0]->cmd, 0);
SQueryInfo* pSubQueryInfo2 = tscGetQueryInfoDetail(&pSql->pSubs[1]->cmd, 0);
pSubQueryInfo1->tsBuf = output1;
pSubQueryInfo2->tsBuf = output2;
......@@ -88,59 +92,106 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
int64_t numOfInput1 = 1;
int64_t numOfInput2 = 1;
while (1) {
STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);
int32_t numOfVnodes = 0;
int32_t* idList = NULL;
tsBufGetVnodeIdList(pSupporter2->pTSBuf, &numOfVnodes, &idList);
#ifdef _DEBUG_VIEW
tscInfo("%" PRId64 ", tags:%"PRId64" \t %" PRId64 ", tags:%"PRId64, elem1.ts, elem1.tag.i64Key, elem2.ts, elem2.tag.i64Key);
#endif
bool completed = false;
while(1) {
STSElem elem = tsBufGetElem(pSupporter1->pTSBuf);
int32_t res = tVariantCompare(elem1.tag, elem2.tag);
if (res == -1 || (res == 0 && tsCompare(order, elem1.ts, elem2.ts))) {
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
break;
}
// no data in pSupporter1 anymore, jump out of loop
if (elem.vnode < 0 || completed) {
break;
}
numOfInput1++;
} else if ((res > 0) || (res == 0 && tsCompare(order, elem2.ts, elem1.ts))) {
if (!tsBufNextPos(pSupporter2->pTSBuf)) {
bool f = false;
for (int32_t i = 0; i < numOfVnodes; ++i) {
STSElem el = tsBufGetElemStartPos(pSupporter2->pTSBuf, idList[i], elem.tag);
if (el.vnode == idList[i]) {
f = true;
break;
}
}
numOfInput2++;
} else {
/*
* in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
* final results which is acquired after the secondry merge of in the client.
*/
if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
if (win->skey > elem1.ts) {
win->skey = elem1.ts;
}
if (win->ekey < elem1.ts) {
win->ekey = elem1.ts;
}
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
/**
* there are elements in pSupporter2 with the same tag, continue
*/
if (f) {
while (1) {
STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);
/*
* in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
* final results which is acquired after the secondry merge of in the client.
*/
int32_t re = tsCompare(order, elem1.ts, elem2.ts);
if (re < 0) {
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
completed = true;
break;
}
} else {
pLimit->offset -= 1;
}
numOfInput1++;
} else if (re > 0) {
if (!tsBufNextPos(pSupporter2->pTSBuf)) {
completed = true;
break;
}
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
break;
numOfInput2++;
} else {
if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
if (win->skey > elem1.ts) {
win->skey = elem1.ts;
}
if (win->ekey < elem1.ts) {
win->ekey = elem1.ts;
}
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
} else {
pLimit->offset -= 1;
}
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
completed = true;
break;
}
numOfInput1++;
if (!tsBufNextPos(pSupporter2->pTSBuf)) {
completed = true;
break;
}
numOfInput2++;
}
}
} else { // no data in pSupporter2, ignore current data in pSupporter2
tVariant tag = {0};
tVariantAssign(&tag, elem.tag);
numOfInput1++;
// ignore all records with the same tag
while (tsBufNextPos(pSupporter1->pTSBuf)) {
STSElem el1 = tsBufGetElem(pSupporter1->pTSBuf);
int32_t res = tVariantCompare(el1.tag, &tag);
if (!tsBufNextPos(pSupporter2->pTSBuf)) {
break;
// it is a record with new tag
if (res != 0) {
break;
}
}
numOfInput2++;
STSElem el1 = tsBufGetElem(pSupporter1->pTSBuf);
if (el1.vnode < 0) { // no data exists, abort
completed = true;
break;
}
}
}
......@@ -162,8 +213,9 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
TSKEY et = taosGetTimestampUs();
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elasped time:%"PRId64" us", pSql, numOfInput1, numOfInput2, output1->numOfTotal,
output1->numOfVnodes, win->skey, win->ekey, tsBufGetNumOfVnodes(output1), et - st);
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elapsed time:%" PRId64 " us",
pSql, numOfInput1, numOfInput2, output1->numOfTotal, output1->numOfVnodes, win->skey, win->ekey,
tsBufGetNumOfVnodes(output1), et - st);
return output1->numOfTotal;
}
......@@ -517,18 +569,29 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
vgTables = taosArrayInit(4, sizeof(STableIdInfo));
info.itemList = vgTables;
if (taosArrayGetSize(result) > 0) {
SVgroupTableInfo* prevGroup = taosArrayGet(result, taosArrayGetSize(result) - 1);
tscDebug("%p vgId:%d, tables:%"PRId64, pSql, prevGroup->vgInfo.vgId, taosArrayGetSize(prevGroup->itemList));
}
taosArrayPush(result, &info);
}
STableIdInfo item = {.uid = tt->uid, .tid = tt->tid, .key = INT64_MIN};
taosArrayPush(vgTables, &item);
tscDebug("%p tid:%d, uid:%"PRIu64",vgId:%d added, total:%d", pSql, tt->tid, tt->uid, tt->vgId, (int32_t) taosArrayGetSize(vgTables));
tscTrace("%p tid:%d, uid:%"PRIu64",vgId:%d added", pSql, tt->tid, tt->uid, tt->vgId);
prev = tt;
}
pTableMetaInfo->pVgroupTables = result;
pTableMetaInfo->vgroupIndex = 0;
if (taosArrayGetSize(result) > 0) {
SVgroupTableInfo* g = taosArrayGet(result, taosArrayGetSize(result) - 1);
tscDebug("%p vgId:%d, tables:%"PRId64, pSql, g->vgInfo.vgId, taosArrayGetSize(g->itemList));
}
}
static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) {
......@@ -656,6 +719,18 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
qsort((*s1)->pData, t1, size, tidTagsCompar);
qsort((*s2)->pData, t2, size, tidTagsCompar);
#if 0
for(int32_t k = 0; k < t1; ++k) {
STidTags* p = (*s1)->pData + size * k;
printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data);
}
for(int32_t k = 0; k < t1; ++k) {
STidTags* p = (*s2)->pData + size * k;
printf("%d, tag:%s\n", p->vgId, ((tstr*)(p->tag))->data);
}
#endif
tscDebug("%p tags match complete, result: %"PRId64", %"PRId64, pParentSql, t1, t2);
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册