提交 bc8cffd2 编写于 作者: H Haojun Liao

[TD-1373]<fix>: fix bugs in super table join.

上级 f3425fc9
...@@ -23,7 +23,7 @@ extern "C" { ...@@ -23,7 +23,7 @@ extern "C" {
#include "tscUtil.h" #include "tscUtil.h"
#include "tsclient.h" #include "tsclient.h"
void tscFetchDatablockFromSubquery(SSqlObj* pSql); void tscFetchDatablockForSubquery(SSqlObj* pSql);
void tscSetupOutputColumnIndex(SSqlObj* pSql); void tscSetupOutputColumnIndex(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
......
...@@ -228,6 +228,7 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd); ...@@ -228,6 +228,7 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscFreeVgroupTableInfo(SArray* pVgroupTables); void tscFreeVgroupTableInfo(SArray* pVgroupTables);
SArray* tscVgroupTableInfoClone(SArray* pVgroupTables); SArray* tscVgroupTableInfoClone(SArray* pVgroupTables);
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index); void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex); int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo); int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
......
...@@ -339,9 +339,9 @@ typedef struct STscObj { ...@@ -339,9 +339,9 @@ typedef struct STscObj {
} STscObj; } STscObj;
typedef struct SSubqueryState { typedef struct SSubqueryState {
int32_t numOfRemain; // the number of remain unfinished subquery int32_t numOfRemain; // the number of remain unfinished subquery
int32_t numOfSub; // the number of total sub-queries int32_t numOfSub; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query uint64_t numOfRetrievedRows; // total number of points in this query
} SSubqueryState; } SSubqueryState;
typedef struct SSqlObj { typedef struct SSqlObj {
...@@ -515,7 +515,6 @@ extern SRpcCorEpSet tscMgmtEpSet; ...@@ -515,7 +515,6 @@ extern SRpcCorEpSet tscMgmtEpSet;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
int32_t tscCompareTidTags(const void* p1, const void* p2);
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables); void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -176,7 +176,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo ...@@ -176,7 +176,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
} }
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql); tscFetchDatablockForSubquery(pSql);
} else { } else {
tscProcessSql(pSql); tscProcessSql(pSql);
} }
...@@ -226,7 +226,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) { ...@@ -226,7 +226,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
// handle the sub queries of join query // handle the sub queries of join query
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql); tscFetchDatablockForSubquery(pSql);
} else if (pRes->completed) { } else if (pRes->completed) {
if(pCmd->command == TSDB_SQL_FETCH || (pCmd->command >= TSDB_SQL_SERV_STATUS && pCmd->command <= TSDB_SQL_CURRENT_USER)) { if(pCmd->command == TSDB_SQL_FETCH || (pCmd->command >= TSDB_SQL_SERV_STATUS && pCmd->command <= TSDB_SQL_CURRENT_USER)) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes. if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
......
...@@ -1632,7 +1632,7 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t ...@@ -1632,7 +1632,7 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
} }
static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc, static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSchema* pSchema, SConvertFunc cvtFunc,
tSQLExprItem* item, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) { const char* name, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) {
const char* msg1 = "not support column types"; const char* msg1 = "not support column types";
int16_t type = 0; int16_t type = 0;
...@@ -1640,7 +1640,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS ...@@ -1640,7 +1640,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
int32_t functionID = cvtFunc.execFuncId; int32_t functionID = cvtFunc.execFuncId;
if (functionID == TSDB_FUNC_SPREAD) { if (functionID == TSDB_FUNC_SPREAD) {
int32_t t1 = pSchema[pColIndex->columnIndex].type; int32_t t1 = pSchema->type;
if (t1 == TSDB_DATA_TYPE_BINARY || t1 == TSDB_DATA_TYPE_NCHAR || t1 == TSDB_DATA_TYPE_BOOL) { if (t1 == TSDB_DATA_TYPE_BINARY || t1 == TSDB_DATA_TYPE_NCHAR || t1 == TSDB_DATA_TYPE_BOOL) {
invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
return -1; return -1;
...@@ -1649,17 +1649,12 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS ...@@ -1649,17 +1649,12 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
bytes = tDataTypeDesc[type].nSize; bytes = tDataTypeDesc[type].nSize;
} }
} else { } else {
type = pSchema[pColIndex->columnIndex].type; type = pSchema->type;
bytes = pSchema[pColIndex->columnIndex].bytes; bytes = pSchema->bytes;
} }
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, bytes, false); SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, bytes, false);
if (item->aliasName != NULL) { tstrncpy(pExpr->aliasName, name, tListLen(pExpr->aliasName));
tstrncpy(pExpr->aliasName, item->aliasName, tListLen(pExpr->aliasName));
} else {
int32_t len = MIN(tListLen(pExpr->aliasName), item->pNode->token.n + 1);
tstrncpy(pExpr->aliasName, item->pNode->token.z, len);
}
if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != functionID) { if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != functionID) {
pExpr->colInfo.flag |= TSDB_COL_NULL; pExpr->colInfo.flag |= TSDB_COL_NULL;
...@@ -1687,6 +1682,18 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS ...@@ -1687,6 +1682,18 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/*
 * Build the output column name of a function expression into `name`.
 *
 * name       destination buffer; the writes below are bounded by TSDB_COL_NAME_LEN,
 *            so the buffer is expected to hold at least that many bytes
 * pItem      select-list item; when it carries an alias, the alias becomes the name
 * functionId index into aAggs, used to look up the function name
 * pToken     token (z/n pair) holding the text of the column the function is applied to
 *
 * Without an alias the name has the form "<function>(<column>)".
 */
void setResultColName(char* name, tSQLExprItem* pItem, int32_t functionId, SStrToken* pToken) {
  // an explicit alias takes precedence over any generated name
  if (pItem->aliasName != NULL) {
    tstrncpy(name, pItem->aliasName, TSDB_COL_NAME_LEN);
    return;
  }

  // Copy the token text into a zero-filled scratch buffer; the copy size is limited to the
  // token length plus one, capped by the buffer capacity.
  char    colName[TSDB_COL_NAME_LEN] = {0};
  int32_t size = MIN(pToken->n + 1, TSDB_COL_NAME_LEN);
  tstrncpy(colName, pToken->z, size);

  snprintf(name, TSDB_COL_NAME_LEN - 1, "%s(%s)", aAggs[functionId].aName, colName);
}
int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExprItem* pItem, bool finalResult) { int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExprItem* pItem, bool finalResult) {
STableMetaInfo* pTableMetaInfo = NULL; STableMetaInfo* pTableMetaInfo = NULL;
int32_t optr = pItem->pNode->nSQLOptr; int32_t optr = pItem->pNode->nSQLOptr;
...@@ -1954,9 +1961,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1954,9 +1961,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
char name[TSDB_COL_NAME_LEN] = {0};
for (int32_t j = 0; j < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++j) { for (int32_t j = 0; j < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++j) {
index.columnIndex = j; index.columnIndex = j;
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem, colIndex++, &index, finalResult) != 0) { SStrToken t = {.z = pSchema[j].name, .n = strnlen(pSchema[j].name, TSDB_COL_NAME_LEN)};
setResultColName(name, pItem, cvtFunc.originFuncId, &t);
if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[j], cvtFunc, name, colIndex++, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
} }
...@@ -1967,14 +1978,18 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -1967,14 +1978,18 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
} }
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
// functions can not be applied to tags // functions can not be applied to tags
if ((index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) || (index.columnIndex < 0)) { if ((index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) || (index.columnIndex < 0)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
} }
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem, colIndex + i, &index, finalResult) != 0) { char name[TSDB_COL_NAME_LEN] = {0};
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
setResultColName(name, pItem, cvtFunc.originFuncId, &pParamElem->pNode->colInfo);
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, name, colIndex + i, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
...@@ -2011,7 +2026,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2011,7 +2026,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
for (int32_t i = 0; i < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++i) { for (int32_t i = 0; i < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++i) {
SColumnIndex index = {.tableIndex = j, .columnIndex = i}; SColumnIndex index = {.tableIndex = j, .columnIndex = i};
if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, pItem, colIndex, &index, finalResult) != 0) {
char name[TSDB_COL_NAME_LEN] = {0};
SStrToken t = {.z = pSchema->name, .n = strnlen(pSchema->name, TSDB_COL_NAME_LEN)};
setResultColName(name, pItem, cvtFunc.originFuncId, &t);
if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[index.columnIndex], cvtFunc, name, colIndex, &index, finalResult) != 0) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
...@@ -3458,7 +3478,7 @@ static int32_t validateArithmeticSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryI ...@@ -3458,7 +3478,7 @@ static int32_t validateArithmeticSQLExpr(SSqlCmd* pCmd, tSQLExpr* pExpr, SQueryI
} }
// the expression not from the same table, return error // the expression not from the same table, return error
if (uidLeft != uidRight) { if (uidLeft != uidRight && uidLeft != 0 && uidRight != 0) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
} }
......
...@@ -880,8 +880,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -880,8 +880,9 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload)); pQueryMsg->tsOffset = htonl((int32_t)(pMsg - pCmd->payload));
if (pQueryInfo->tsBuf != NULL) { if (pQueryInfo->tsBuf != NULL) {
int32_t vnodeId = htonl(pQueryMsg->head.vgId); // note: here used the index instead of actual vnode id.
int32_t code = dumpFileBlockByVnodeId(pQueryInfo->tsBuf, vnodeId, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks); int32_t vnodeIndex = pTableMetaInfo->vgroupIndex;
int32_t code = dumpFileBlockByVnodeId(pQueryInfo->tsBuf, vnodeIndex, pMsg, &pQueryMsg->tsLen, &pQueryMsg->tsNumOfBlocks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -44,6 +44,17 @@ static int32_t tsCompare(int32_t order, int64_t left, int64_t right) { ...@@ -44,6 +44,17 @@ static int32_t tsCompare(int32_t order, int64_t left, int64_t right) {
} }
} }
/*
 * Advance the cursor of pTSBuf past the records whose tag equals tag1.
 *
 * The cursor is moved forward one position at a time. The walk ends either when
 * tsBufNextPos() reports that the cursor could not be advanced, or as soon as the
 * element under the cursor carries a tag that differs from tag1.
 */
static void skipRemainValue(STSBuf* pTSBuf, tVariant* tag1) {
  for (;;) {
    if (!tsBufNextPos(pTSBuf)) {
      break;  // cursor could not be advanced any further
    }

    STSElem cur = tsBufGetElem(pTSBuf);
    if (tVariantCompare(cur.tag, tag1) != 0) {
      break;  // reached a record that belongs to another tag
    }
  }
}
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) { static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJoinSupporter* pSupporter2, STimeWindow * win) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
...@@ -92,54 +103,48 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ ...@@ -92,54 +103,48 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
int64_t numOfInput1 = 1; int64_t numOfInput1 = 1;
int64_t numOfInput2 = 1; int64_t numOfInput2 = 1;
int32_t numOfVnodes = 0;
int32_t* idList = NULL;
tsBufGetVnodeIdList(pSupporter2->pTSBuf, &numOfVnodes, &idList);
bool completed = false;
while(1) { while(1) {
STSElem elem = tsBufGetElem(pSupporter1->pTSBuf); STSElem elem = tsBufGetElem(pSupporter1->pTSBuf);
// no data in pSupporter1 anymore, jump out of loop // no data in pSupporter1 anymore, jump out of loop
if (elem.vnode < 0 || completed) { if (!tsBufIsValidElem(&elem)) {
break; break;
} }
bool f = false; // find the data in supporter2 with the same tag value
for (int32_t i = 0; i < numOfVnodes; ++i) { STSElem e2 = tsBufFindElemStartPosByTag(pSupporter2->pTSBuf, elem.tag);
STSElem el = tsBufGetElemStartPos(pSupporter2->pTSBuf, idList[i], elem.tag);
if (el.vnode == idList[i]) {
f = true;
break;
}
}
/** /**
* there are elements in pSupporter2 with the same tag, continue * there are elements in pSupporter2 with the same tag, continue
*/ */
if (f) { tVariant tag1 = {0};
tVariantAssign(&tag1, elem.tag);
if (tsBufIsValidElem(&e2)) {
while (1) { while (1) {
STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf); STSElem elem1 = tsBufGetElem(pSupporter1->pTSBuf);
STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf); STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf);
// data with current are exhausted
if (!tsBufIsValidElem(&elem1) || tVariantCompare(elem1.tag, &tag1) != 0) {
break;
}
if (!tsBufIsValidElem(&elem2) || tVariantCompare(elem2.tag, &tag1) != 0) { // ignore all records with the same tag
skipRemainValue(pSupporter1->pTSBuf, &tag1);
break;
}
/* /*
* in case of stable query, limit/offset is not applied here. the limit/offset is applied to the * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
* final results which is acquired after the secondry merge of in the client. * final results which is acquired after the secondary merge of in the client.
*/ */
int32_t re = tsCompare(order, elem1.ts, elem2.ts); int32_t re = tsCompare(order, elem1.ts, elem2.ts);
if (re < 0) { if (re < 0) {
if (!tsBufNextPos(pSupporter1->pTSBuf)) { tsBufNextPos(pSupporter1->pTSBuf);
completed = true;
break;
}
numOfInput1++; numOfInput1++;
} else if (re > 0) { } else if (re > 0) {
if (!tsBufNextPos(pSupporter2->pTSBuf)) { tsBufNextPos(pSupporter2->pTSBuf);
completed = true;
break;
}
numOfInput2++; numOfInput2++;
} else { } else {
if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { if (pLimit->offset == 0 || pQueryInfo->interval.interval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
...@@ -154,43 +159,18 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ ...@@ -154,43 +159,18 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
} else { } else {
pLimit->offset -= 1; pLimit->offset -= 1;//offset apply to projection?
}
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
completed = true;
break;
} }
tsBufNextPos(pSupporter1->pTSBuf);
numOfInput1++; numOfInput1++;
if (!tsBufNextPos(pSupporter2->pTSBuf)) { tsBufNextPos(pSupporter2->pTSBuf);
completed = true;
break;
}
numOfInput2++; numOfInput2++;
} }
} }
} else { // no data in pSupporter2, ignore current data in pSupporter2 } else { // no data in pSupporter2, ignore current data in pSupporter2
tVariant tag = {0}; skipRemainValue(pSupporter1->pTSBuf, &tag1);
tVariantAssign(&tag, elem.tag);
// ignore all records with the same tag
while (tsBufNextPos(pSupporter1->pTSBuf)) {
STSElem el1 = tsBufGetElem(pSupporter1->pTSBuf);
int32_t res = tVariantCompare(el1.tag, &tag);
// it is a record with new tag
if (res != 0) {
break;
}
}
STSElem el1 = tsBufGetElem(pSupporter1->pTSBuf);
if (el1.vnode < 0) { // no data exists, abort
break;
}
} }
} }
...@@ -299,6 +279,68 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) { ...@@ -299,6 +279,68 @@ static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
return false; return false;
} }
/*
 * Drop from pVgroupTables every vgroup whose id is absent from the vnode id list
 * reported by the timestamp buffer of pQueryInfo, so that no next-stage query is
 * issued for it. The array is modified in place.
 *
 * At least one vgroup is expected to remain (asserted). The query type is then
 * flagged as TSDB_QUERY_TYPE_MULTITABLE_QUERY.
 *
 * TODO: also discard individual tables of a surviving vnode that are not qualified
 * for the next stage query.
 */
static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
  int32_t  numOfVnodes = 0;
  int32_t* vnodeIds = NULL;
  tsBufGetVnodeIdList(pQueryInfo->tsBuf, &numOfVnodes, &vnodeIds);

  int32_t pos = 0;
  while (pos < taosArrayGetSize(pVgroupTables)) {
    SVgroupTableInfo* pGroup = taosArrayGet(pVgroupTables, pos);

    // linear scan of the id list; hit == numOfVnodes means the id is not listed
    int32_t hit = 0;
    while (hit < numOfVnodes && pGroup->vgInfo.vgId != vnodeIds[hit]) {
      hit++;
    }

    if (hit < numOfVnodes) {
      pos++;  // keep this vgroup and move on
    } else {
      // after the removal the same position is examined again, so pos is not advanced
      tscRemoveVgroupTableGroup(pVgroupTables, pos);
    }
  }

  assert(taosArrayGetSize(pVgroupTables) > 0);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  taosTFree(vnodeIds);
}
/*
 * Build a new vgroup-table array that follows the vnode id list reported by the
 * timestamp buffer of pQueryInfo.
 *
 * For every vnode id in that list, in list order, the first entry of pVgroupTables
 * with the same vgId is deep-copied (tscVgroupTableCopy) and appended to the result.
 * The input array is left untouched; the caller owns the returned array.
 *
 * The result always has exactly one element per listed vnode id, so element i
 * corresponds to the i-th id of the list. If an id has no matching vgroup, a
 * zero-filled placeholder is appended at that position. Previously the entry pushed
 * in that case was either uninitialized or a shallow duplicate of the preceding
 * element, i.e. two elements sharing the same heap pointers.
 * NOTE(review): a missing match is not expected; confirm how later stages should
 * treat a zero-filled entry.
 *
 * The query type is flagged as TSDB_QUERY_TYPE_MULTITABLE_QUERY before returning.
 */
static SArray* buildVgroupTableByResult(SQueryInfo* pQueryInfo, SArray* pVgroupTables) {
  int32_t  num = 0;
  int32_t* list = NULL;
  tsBufGetVnodeIdList(pQueryInfo->tsBuf, &num, &list);

  int32_t numOfGroups = (int32_t)taosArrayGetSize(pVgroupTables);
  SArray* pNew = taosArrayInit(num, sizeof(SVgroupTableInfo));

  for (int32_t i = 0; i < num; ++i) {
    int32_t vnodeId = list[i];

    // start from a clean value on every round so that nothing from a previous
    // iteration (or from the stack) can leak into the pushed element
    SVgroupTableInfo info = {0};

    for (int32_t j = 0; j < numOfGroups; ++j) {
      SVgroupTableInfo* p1 = taosArrayGet(pVgroupTables, j);
      if (p1->vgInfo.vgId == vnodeId) {
        tscVgroupTableCopy(&info, p1);
        break;
      }
    }

    // always push, to keep the result positionally aligned with the vnode id list
    taosArrayPush(pNew, &info);
  }

  taosTFree(list);
  TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);

  return pNew;
}
/* /*
* launch secondary stage query to fetch the result that contains timestamp in set * launch secondary stage query to fetch the result that contains timestamp in set
*/ */
...@@ -373,12 +415,11 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -373,12 +415,11 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pQueryInfo->fieldsInfo = pSupporter->fieldsInfo; pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
pQueryInfo->groupbyExpr = pSupporter->groupInfo; pQueryInfo->groupbyExpr = pSupporter->groupInfo;
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
tscFieldInfoUpdateOffset(pNewQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables; pTableMetaInfo->pVgroupTables = pSupporter->pVgroupTables;
pSupporter->exprList = NULL; pSupporter->exprList = NULL;
...@@ -392,7 +433,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -392,7 +433,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
* during the timestamp intersection. * during the timestamp intersection.
*/ */
pSupporter->limit = pQueryInfo->limit; pSupporter->limit = pQueryInfo->limit;
pNewQueryInfo->limit = pSupporter->limit; pQueryInfo->limit = pSupporter->limit;
SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0); SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0);
...@@ -407,7 +448,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -407,7 +448,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
tscAddSpecialColumnForSelect(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL); tscAddSpecialColumnForSelect(pQueryInfo, 0, functionId, &index, s, TSDB_COL_NORMAL);
tscPrintSelectClause(pNew, 0); tscPrintSelectClause(pNew, 0);
tscFieldInfoUpdateOffset(pNewQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
pExpr = tscSqlExprGet(pQueryInfo, 0); pExpr = tscSqlExprGet(pQueryInfo, 0);
} }
...@@ -422,39 +463,21 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -422,39 +463,21 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pExpr->numOfParams = 1; pExpr->numOfParams = 1;
} }
int32_t num = 0; if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int32_t *list = NULL; assert(pTableMetaInfo->pVgroupTables != NULL);
tsBufGetVnodeIdList(pNewQueryInfo->tsBuf, &num, &list); if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
SArray* p = buildVgroupTableByResult(pQueryInfo, pTableMetaInfo->pVgroupTables);
if (pTableMetaInfo->pVgroupTables != NULL) { tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
for(int32_t k = 0; k < taosArrayGetSize(pTableMetaInfo->pVgroupTables);) { pTableMetaInfo->pVgroupTables = p;
SVgroupTableInfo* p = taosArrayGet(pTableMetaInfo->pVgroupTables, k); } else {
filterVgroupTables(pQueryInfo, pTableMetaInfo->pVgroupTables);
bool found = false;
for(int32_t f = 0; f < num; ++f) {
if (p->vgInfo.vgId == list[f]) {
found = true;
break;
}
}
if (!found) {
tscRemoveVgroupTableGroup(pTableMetaInfo->pVgroupTables, k);
} else {
k++;
}
} }
assert(taosArrayGetSize(pTableMetaInfo->pVgroupTables) > 0);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
} }
taosTFree(list); size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s", tscDebug("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, taosArrayGetSize(pNewQueryInfo->exprList), pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, taosArrayGetSize(pQueryInfo->exprList),
numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name); numOfCols, pQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name);
} }
//prepare the subqueries object failed, abort //prepare the subqueries object failed, abort
...@@ -1034,6 +1057,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1034,6 +1057,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
} }
} }
assert(pState->numOfRemain > 0);
if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) { if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfSub); tscDebug("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfSub);
return; return;
...@@ -1047,7 +1071,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1047,7 +1071,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
} }
// update the records for each subquery in parent sql object. // update the records for each subquery in parent sql object.
bool isSTableSub = tscIsTwoStageSTableQuery(pQueryInfo, 0); bool stableQuery = tscIsTwoStageSTableQuery(pQueryInfo, 0);
for (int32_t i = 0; i < pState->numOfSub; ++i) { for (int32_t i = 0; i < pState->numOfSub; ++i) {
if (pParentSql->pSubs[i] == NULL) { if (pParentSql->pSubs[i] == NULL) {
tscDebug("%p %p sub:%d not retrieve data", pParentSql, NULL, i); tscDebug("%p %p sub:%d not retrieve data", pParentSql, NULL, i);
...@@ -1061,7 +1085,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1061,7 +1085,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
pRes1->numOfRows, pRes1->numOfTotal); pRes1->numOfRows, pRes1->numOfTotal);
assert(pRes1->row < pRes1->numOfRows); assert(pRes1->row < pRes1->numOfRows);
} else { } else {
if (!isSTableSub) { if (!stableQuery) {
pRes1->numOfClauseTotal += pRes1->numOfRows; pRes1->numOfClauseTotal += pRes1->numOfRows;
} }
...@@ -1074,7 +1098,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1074,7 +1098,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
tscBuildResFromSubqueries(pParentSql); tscBuildResFromSubqueries(pParentSql);
} }
void tscFetchDatablockFromSubquery(SSqlObj* pSql) { void tscFetchDatablockForSubquery(SSqlObj* pSql) {
assert(pSql->subState.numOfSub >= 1); assert(pSql->subState.numOfSub >= 1);
int32_t numOfFetch = 0; int32_t numOfFetch = 0;
...@@ -1136,11 +1160,22 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { ...@@ -1136,11 +1160,22 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
if (numOfFetch <= 0) { if (numOfFetch <= 0) {
bool tryNextVnode = false; bool tryNextVnode = false;
SSqlObj* pp = pSql->pSubs[0]; bool orderedPrjQuery = false;
SQueryInfo* pi = tscGetQueryInfoDetail(&pp->cmd, 0); for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
SQueryInfo* p = tscGetQueryInfoDetail(&pSub->cmd, 0);
orderedPrjQuery = tscNonOrderedProjectionQueryOnSTable(p, 0);
if (orderedPrjQuery) {
break;
}
}
// get the number of subquery that need to retrieve the next vnode. // get the number of subquery that need to retrieve the next vnode.
if (tscNonOrderedProjectionQueryOnSTable(pi, 0)) { if (orderedPrjQuery) {
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
...@@ -1244,7 +1279,6 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { ...@@ -1244,7 +1279,6 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
tscDebug("%p all subquery response, retrieve data for subclause:%d", pSql, pCmd->clauseIndex);
// the column transfer support struct has been built // the column transfer support struct has been built
if (pRes->pColumnIndex != NULL) { if (pRes->pColumnIndex != NULL) {
...@@ -1340,21 +1374,23 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1340,21 +1374,23 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
return; return;
} }
// wait for the other subqueries response from vnode
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
return;
// In case of consequence query from other vnode, do not wait for other query response here.
if (!(pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0))) {
if (atomic_sub_fetch_32(&pParentSql->subState.numOfRemain, 1) > 0) {
return;
}
} }
tscSetupOutputColumnIndex(pParentSql); tscSetupOutputColumnIndex(pParentSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
/** /**
* if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of * if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of
* data instead of returning to its invoker * data instead of returning to its invoker
*/ */
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
// pParentSql->subState.numOfRemain = pParentSql->subState.numOfSub; // reset the record value
pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH; pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql); tscProcessSql(pSql);
......
...@@ -1696,6 +1696,17 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) { ...@@ -1696,6 +1696,17 @@ void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index) {
taosArrayRemove(pVgroupTable, index); taosArrayRemove(pVgroupTable, index);
} }
/*
 * Deep-copy one vgroup table descriptor.
 *
 * info   destination; it is cleared first, so any previous content is discarded
 *        without being released
 * pInfo  source descriptor, not modified
 *
 * The vgroup information is copied by value, then every endpoint fqdn string is
 * duplicated with strdup() and the item list is cloned with taosArrayClone(), so the
 * destination does not share those pointers with the source. Allocation results are
 * not checked.
 */
void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) {
  memset(info, 0, sizeof(SVgroupTableInfo));
  info->vgInfo = pInfo->vgInfo;

  // replace the fqdn pointers taken over by the struct copy with private duplicates
  int32_t numOfEps = pInfo->vgInfo.numOfEps;
  for (int32_t ep = 0; ep < numOfEps; ++ep) {
    info->vgInfo.epAddr[ep].fqdn = strdup(pInfo->vgInfo.epAddr[ep].fqdn);
  }

  info->itemList = taosArrayClone(pInfo->itemList);
}
SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) { SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) {
if (pVgroupTables == NULL) { if (pVgroupTables == NULL) {
return NULL; return NULL;
...@@ -1707,14 +1718,8 @@ SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) { ...@@ -1707,14 +1718,8 @@ SArray* tscVgroupTableInfoClone(SArray* pVgroupTables) {
SVgroupTableInfo info; SVgroupTableInfo info;
for (size_t i = 0; i < num; i++) { for (size_t i = 0; i < num; i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i); SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
memset(&info, 0, sizeof(SVgroupTableInfo)); tscVgroupTableCopy(&info, pInfo);
info.vgInfo = pInfo->vgInfo;
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
info.vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn);
}
info.itemList = taosArrayClone(pInfo->itemList);
taosArrayPush(pa, &info); taosArrayPush(pa, &info);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册