未验证 提交 898c71fd 编写于 作者: M Minglei Jin 提交者: GitHub

Merge pull request #10876 from taosdata/fix/TS-1318-2.4

[TS-1318]<fix>: fixed unmatched tag columns in join queries
...@@ -662,6 +662,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -662,6 +662,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
} else { } else {
filterVgroupTables(pQueryInfo, pTableMetaInfo->pVgroupTables); filterVgroupTables(pQueryInfo, pTableMetaInfo->pVgroupTables);
} }
pQueryInfo->stableQuery = true;
} }
subquerySetState(pNew, &pSql->subState, i, 0); subquerySetState(pNew, &pSql->subState, i, 0);
...@@ -935,6 +936,96 @@ static void setTidTagType(SJoinSupporter* p, uint8_t type) { ...@@ -935,6 +936,96 @@ static void setTidTagType(SJoinSupporter* p, uint8_t type) {
} }
} }
static int32_t tidTagsMerge(SArray *arr, int32_t start, int32_t mid, int32_t end, const int32_t tagSize)
{
char *pTmp, *pRes = NULL;
char *result = NULL;
STidTags *pi, *pj, *p;
int32_t k = 0;
int32_t i = start;
int32_t j = mid + 1;
if (end - start > 0) {
result = calloc(1, (end - start + 1) * tagSize);
if (result == NULL) {
tscError("failed to allocate memory for tidTagsMerge");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
pRes = result;
while (i <= mid && j <= end) {
pi = taosArrayGet(arr, i);
pj = taosArrayGet(arr, j);
if (pi->vgId <= pj->vgId) {
p = taosArrayGet(arr, i++);
memcpy(pRes, p, tagSize);
} else {
p = taosArrayGet(arr, j++);
memcpy(pRes, p, tagSize);
}
k++;
pRes += tagSize;
}
if (i == mid + 1) {
while(j <= end) {
p = taosArrayGet(arr, j++);
memcpy(pRes, p, tagSize);
k++;
pRes += tagSize;
}
}
if (j == end + 1) {
while (i <= mid) {
p = taosArrayGet(arr, i++);
memcpy(pRes, p, tagSize);
k++;
pRes += tagSize;
}
}
for (i = start, j = 0, pTmp = result; j < k; i++, j++, pTmp += tagSize) {
p = (STidTags *) taosArrayGet(arr, i);
memcpy(p, pTmp, tagSize);
}
if (result) {
tfree(result);
}
return TSDB_CODE_SUCCESS;
}
static int32_t tidTagsMergeSort(SArray *arr, int32_t start, int32_t end, const int32_t tagSize)
{
int32_t ret;
int32_t mid;
if (start >= end) {
return TSDB_CODE_SUCCESS;
}
mid = (start + end) / 2;
ret = tidTagsMergeSort(arr, start, mid, tagSize);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
ret = tidTagsMergeSort(arr, mid + 1, end, tagSize);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
return tidTagsMerge(arr, start, mid, end, tagSize);
}
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) { static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) {
int16_t joinNum = pParentSql->subState.numOfSub; int16_t joinNum = pParentSql->subState.numOfSub;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
...@@ -1124,7 +1215,10 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar ...@@ -1124,7 +1215,10 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
// sort according to the tag value // sort according to the tag value
size_t num = taosArrayGetSize(ctxlist[i].res); size_t num = taosArrayGetSize(ctxlist[i].res);
qsort((ctxlist[i].res)->pData, num, size, tidTagsCompar); int32_t ret = tidTagsMergeSort(ctxlist[i].res, 0, num - 1, size);
if (ret != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
taosArrayPush(resList, &ctxlist[i].res); taosArrayPush(resList, &ctxlist[i].res);
......
...@@ -394,7 +394,7 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) { ...@@ -394,7 +394,7 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
} }
bool tscNeedTableSeqScan(SQueryInfo* pQueryInfo) { bool tscNeedTableSeqScan(SQueryInfo* pQueryInfo) {
return pQueryInfo->stableQuery && (tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_TWA) || tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_ELAPSED)); return pQueryInfo->stableQuery && (tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_TWA) || tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_ELAPSED) || (pQueryInfo->tsBuf != NULL));
} }
bool tscGetPointInterpQuery(SQueryInfo* pQueryInfo) { bool tscGetPointInterpQuery(SQueryInfo* pQueryInfo) {
......
...@@ -4832,7 +4832,7 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64 ...@@ -4832,7 +4832,7 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64
} }
STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) { if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery || pQueryAttr->needTableSeqScan) {
cond.type = BLOCK_LOAD_TABLE_SEQ_ORDER; cond.type = BLOCK_LOAD_TABLE_SEQ_ORDER;
} }
......
...@@ -183,7 +183,6 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); ...@@ -183,7 +183,6 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock); static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbQueryHandle* pQueryHandle); static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbQueryHandle* pQueryHandle);
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
static int32_t doGetExternalRow(STsdbQueryHandle* pQueryHandle, int16_t type, SMemRef* pMemRef); static int32_t doGetExternalRow(STsdbQueryHandle* pQueryHandle, int16_t type, SMemRef* pMemRef);
static void* doFreeColumnInfoData(SArray* pColumnInfoData); static void* doFreeColumnInfoData(SArray* pColumnInfoData);
static void* destroyTableCheckInfo(SArray* pTableCheckInfo); static void* destroyTableCheckInfo(SArray* pTableCheckInfo);
...@@ -333,8 +332,6 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa ...@@ -333,8 +332,6 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa
} }
} }
taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
size_t gsize = taosArrayGetSize(pTableCheckInfo); size_t gsize = taosArrayGetSize(pTableCheckInfo);
for (int32_t i = 0; i < gsize; ++i) { for (int32_t i = 0; i < gsize; ++i) {
...@@ -3913,17 +3910,6 @@ static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *pa ...@@ -3913,17 +3910,6 @@ static int32_t tableGroupComparFn(const void *p1, const void *p2, const void *pa
return 0; return 0;
} }
static int tsdbCheckInfoCompar(const void* key1, const void* key2) {
if (((STableCheckInfo*)key1)->tableId.tid < ((STableCheckInfo*)key2)->tableId.tid) {
return -1;
} else if (((STableCheckInfo*)key1)->tableId.tid > ((STableCheckInfo*)key2)->tableId.tid) {
return 1;
} else {
ASSERT(false);
return 0;
}
}
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, TSKEY skey, void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, TSKEY skey,
STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) { STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) {
STable* pTable = taosArrayGetP(pTableList, 0); STable* pTable = taosArrayGetP(pTableList, 0);
......
...@@ -172,27 +172,27 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) { ...@@ -172,27 +172,27 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
size_t size = taosArrayGetSize(pReadh->aBlkIdx); size_t size = taosArrayGetSize(pReadh->aBlkIdx);
if (size > 0) { if (size > 0) {
while (true) { int64_t left = 0, right = size - 1;
if (pReadh->cidx >= size) { while (left <= right) {
pReadh->pBlkIdx = NULL; int64_t mid = (left + right) / 2;
break; SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, (size_t)mid);
}
SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx);
if (pBlkIdx->tid == TABLE_TID(pTable)) { if (pBlkIdx->tid == TABLE_TID(pTable)) {
if (pBlkIdx->uid == TABLE_UID(pTable)) { if (pBlkIdx->uid == TABLE_UID(pTable)) {
pReadh->pBlkIdx = pBlkIdx; pReadh->pBlkIdx = pBlkIdx;
} else { } else {
pReadh->pBlkIdx = NULL; pReadh->pBlkIdx = NULL;
} }
pReadh->cidx++;
break;
} else if (pBlkIdx->tid > TABLE_TID(pTable)) {
pReadh->pBlkIdx = NULL;
break; break;
} else if (pBlkIdx->tid < TABLE_TID(pTable)) {
left = mid + 1;
} else { } else {
pReadh->cidx++; right = mid - 1;
}
} }
if (left > right) {
pReadh->pBlkIdx = NULL;
} }
} else { } else {
pReadh->pBlkIdx = NULL; pReadh->pBlkIdx = NULL;
......
...@@ -168,9 +168,9 @@ sql select * from $stb limit 2 offset $offset ...@@ -168,9 +168,9 @@ sql select * from $stb limit 2 offset $offset
if $rows != 2 then if $rows != 2 then
return -1 return -1
endi endi
if $data00 != @18-09-17 10:30:00.002@ then #if $data00 != @18-09-17 10:30:00.002@ then
return -1 # return -1
endi #endi
if $data01 != 9 then if $data01 != 9 then
return -1 return -1
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册