提交 ff144921 编写于 作者: H hjxilinx

[TD-32] support multi-table query

上级 6fae35af
......@@ -1238,8 +1238,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto _clean;
}
if (pCmd->pDataBlocks->nSize > 0) {
// merge according to vgId
if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgId
if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) {
goto _error_clean;
}
......@@ -1294,12 +1293,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) {
int32_t ret = TSDB_CODE_SUCCESS;
// if (NULL == pSql->asyncTblPos) {
// tscCleanSqlCmd(&pSql->cmd);
// } else {
tscTrace("continue parse sql: %s", pSql->asyncTblPos);
// }
if (tscIsInsertOrImportData(pSql->sqlstr)) {
/*
......
......@@ -2789,7 +2789,7 @@ static int32_t optrToString(tSQLExpr* pExpr, char** exprString) {
return TSDB_CODE_SUCCESS;
}
static int32_t tablenameListToString(tSQLExpr* pExpr, /*char* str*/ SStringBuilder* sb) {
static int32_t tablenameListToString(tSQLExpr* pExpr, SStringBuilder* sb) {
tSQLExprList* pList = pExpr->pParam;
if (pList->nExpr <= 0) {
return TSDB_CODE_INVALID_SQL;
......@@ -2815,7 +2815,7 @@ static int32_t tablenameListToString(tSQLExpr* pExpr, /*char* str*/ SStringBuild
return TSDB_CODE_SUCCESS;
}
static int32_t tablenameCondToString(tSQLExpr* pExpr, /*char* str*/ SStringBuilder* sb) {
static int32_t tablenameCondToString(tSQLExpr* pExpr, SStringBuilder* sb) {
taosStringBuilderAppendStringLen(sb, QUERY_COND_REL_PREFIX_LIKE, QUERY_COND_REL_PREFIX_LIKE_LEN);
taosStringBuilderAppendString(sb, pExpr->val.pz);
......@@ -3756,8 +3756,8 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql
return TSDB_CODE_SUCCESS;
}
const char* msg = "invalid filter expression";
const char* msg1 = "invalid expression";
const char* msg2 = "invalid filter expression";
int32_t ret = TSDB_CODE_SUCCESS;
......@@ -3819,7 +3819,7 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql
taosStringBuilderDestroy(&sb);
if (!validateFilterExpr(pQueryInfo)) {
return invalidSqlErrMsg(pQueryInfo->msg, msg);
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
}
doAddJoinTagsColumnsIntoTagList(pQueryInfo, &condExpr);
......
......@@ -847,6 +847,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder);
}
// serialize tag column query condition
if (pQueryInfo->tagCond.numOfTagCond > 0) {
STagCond* pTagCond = &pQueryInfo->tagCond;
......@@ -865,6 +866,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
// tbname in/like query expression should be sent to mgmt node
STagCond* pTagCond = &pQueryInfo->tagCond;
if (pTagCond->tbnameCond.cond != NULL) {
size_t s = strlen(pTagCond->tbnameCond.cond);
memcpy(pMsg, pTagCond->tbnameCond.cond, s);
pQueryMsg->nameCondLen = htons(s);
pMsg += s;
}
msgLen = pMsg - pStart;
tscTrace("%p msg built success,len:%d bytes", pSql, msgLen);
......
......@@ -608,7 +608,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
* the payloadLen should be actual message body size
* the old value of payloadLen is the allocated payload size
*/
pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize;
pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize - sizeof(SMsgDesc);
assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + 100 && pCmd->payloadLen > 0);
return TSDB_CODE_SUCCESS;
......
......@@ -243,16 +243,6 @@ static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMs
taos_queue queue = dnodeGetVnodeRworker(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
// SReadMsg readMsg = {
// .rpcMsg = {0},
// .pCont = qhandle,
// .contLen = 0,
// .pRpcContext = pMsg->pRpcContext,
// };
//
// taos_queue queue = dnodeGetVnodeRworker(pVnode);
// taosWriteQitem(queue, TSDB_MSG_TYPE_QUERY, &readMsg);
}
static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
......
......@@ -475,6 +475,7 @@ typedef struct {
int64_t slidingTime; // value for sliding window
char slidingTimeUnit; // time interval type, for revisement of interval(1d)
uint16_t tagCondLen; // tag length in current query
uint16_t nameCondLen; // table name in/like query expression string length
int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx
......
......@@ -172,7 +172,6 @@ typedef struct SQueryRuntimeEnv {
typedef struct SQInfo {
void* signature;
// void* param; // pointer to the RpcReadMsg
TSKEY startTime;
TSKEY elapsedTime;
int32_t pointsInterpo;
......
......@@ -16,6 +16,7 @@
#include "qast.h"
#include <tarray.h>
#include <tskiplist.h>
#include "../../client/inc/tschemautil.h"
#include "os.h"
#include "qsqlparser.h"
#include "qsyntaxtreefunction.h"
......@@ -239,9 +240,7 @@ uint8_t isQueryOnPrimaryKey(const char *primaryColumnName, const tSQLSyntaxNode
}
static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, char *str, int32_t *i) {
SSQLToken t0;
t0 = tStrGetToken(str, i, false, 0, NULL);
SSQLToken t0 = tStrGetToken(str, i, false, 0, NULL);
if (t0.n == 0) {
return NULL;
}
......@@ -344,6 +343,7 @@ void tSQLBinaryExprFromString(tSQLBinaryExpr **pExpr, SSchema *pSchema, int32_t
}
int32_t pos = 0;
tSQLSyntaxNode *pStxNode = createSyntaxTree(pSchema, numOfCols, src, &pos);
if (pStxNode != NULL) {
assert(pStxNode->nodeType == TSQL_NODE_EXPR);
......
......@@ -5265,11 +5265,11 @@ void qTableQuery(SQInfo *pQInfo) {
// vnodeDecRefCount(pQInfo);
}
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryTableMsg, SSqlFuncExprMsg *pExprMsg) {
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncExprMsg *pExprMsg) {
int32_t j = 0;
while (j < pQueryTableMsg->numOfCols) {
if (pExprMsg->colInfo.colId == pQueryTableMsg->colList[j].colId) {
while (j < pQueryMsg->numOfCols) {
if (pExprMsg->colInfo.colId == pQueryMsg->colList[j].colId) {
break;
}
......@@ -5279,44 +5279,44 @@ static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryTableMsg, SSqlFuncEx
return j;
}
bool vnodeValidateExprColumnInfo(SQueryTableMsg *pQueryTableMsg, SSqlFuncExprMsg *pExprMsg) {
int32_t j = getColumnIndexInSource(pQueryTableMsg, pExprMsg);
return j < pQueryTableMsg->numOfCols;
bool vnodeValidateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncExprMsg *pExprMsg) {
int32_t j = getColumnIndexInSource(pQueryMsg, pExprMsg);
return j < pQueryMsg->numOfCols;
}
static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryTableMsg) {
if (pQueryTableMsg->intervalTime < 0) {
dError("qmsg:%p illegal value of aggTimeInterval %" PRId64 "", pQueryTableMsg, pQueryTableMsg->intervalTime);
static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryMsg) {
if (pQueryMsg->intervalTime < 0) {
dError("qmsg:%p illegal value of aggTimeInterval %" PRId64 "", pQueryMsg, pQueryMsg->intervalTime);
return -1;
}
if (pQueryTableMsg->numOfCols <= 0 || pQueryTableMsg->numOfCols > TSDB_MAX_COLUMNS) {
dError("qmsg:%p illegal value of numOfCols %d", pQueryTableMsg, pQueryTableMsg->numOfCols);
if (pQueryMsg->numOfCols <= 0 || pQueryMsg->numOfCols > TSDB_MAX_COLUMNS) {
dError("qmsg:%p illegal value of numOfCols %d", pQueryMsg, pQueryMsg->numOfCols);
return -1;
}
if (pQueryTableMsg->numOfTables <= 0) {
dError("qmsg:%p illegal value of numOfTables %d", pQueryTableMsg, pQueryTableMsg->numOfTables);
if (pQueryMsg->numOfTables <= 0) {
dError("qmsg:%p illegal value of numOfTables %d", pQueryMsg, pQueryMsg->numOfTables);
return -1;
}
if (pQueryTableMsg->numOfGroupCols < 0) {
dError("qmsg:%p illegal value of numOfGroupbyCols %d", pQueryTableMsg, pQueryTableMsg->numOfGroupCols);
if (pQueryMsg->numOfGroupCols < 0) {
dError("qmsg:%p illegal value of numOfGroupbyCols %d", pQueryMsg, pQueryMsg->numOfGroupCols);
return -1;
}
if (pQueryTableMsg->numOfOutputCols > TSDB_MAX_COLUMNS || pQueryTableMsg->numOfOutputCols <= 0) {
dError("qmsg:%p illegal value of output columns %d", pQueryTableMsg, pQueryTableMsg->numOfOutputCols);
if (pQueryMsg->numOfOutputCols > TSDB_MAX_COLUMNS || pQueryMsg->numOfOutputCols <= 0) {
dError("qmsg:%p illegal value of output columns %d", pQueryMsg, pQueryMsg->numOfOutputCols);
return -1;
}
return 0;
}
static char* createTableIdList(SQueryTableMsg* pQueryTableMsg, char* pMsg, SArray** pTableIdList) {
assert(pQueryTableMsg->numOfTables > 0);
static char* createTableIdList(SQueryTableMsg* pQueryMsg, char* pMsg, SArray** pTableIdList) {
assert(pQueryMsg->numOfTables > 0);
*pTableIdList = taosArrayInit(pQueryTableMsg->numOfTables, sizeof(STableIdInfo));
*pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableIdInfo));
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->sid = htonl(pTableIdInfo->sid);
......@@ -5326,7 +5326,7 @@ static char* createTableIdList(SQueryTableMsg* pQueryTableMsg, char* pMsg, SArra
taosArrayPush(*pTableIdList, pTableIdInfo);
pMsg += sizeof(STableIdInfo);
for (int32_t j = 1; j < pQueryTableMsg->numOfTables; ++j) {
for (int32_t j = 1; j < pQueryMsg->numOfTables; ++j) {
pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->sid = htonl(pTableIdInfo->sid);
......@@ -5341,49 +5341,47 @@ static char* createTableIdList(SQueryTableMsg* pQueryTableMsg, char* pMsg, SArra
}
/**
* pQueryTableMsg->head has been converted before this function is called.
* pQueryMsg->head has been converted before this function is called.
*
* @param pQueryTableMsg
* @param pQueryMsg
* @param pTableIdList
* @param pExpr
* @return
*/
static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr,
wchar_t** tagCond) {
pQueryTableMsg->numOfTables = htonl(pQueryTableMsg->numOfTables);
pQueryTableMsg->window.skey = htobe64(pQueryTableMsg->window.skey);
pQueryTableMsg->window.ekey = htobe64(pQueryTableMsg->window.ekey);
pQueryTableMsg->intervalTime = htobe64(pQueryTableMsg->intervalTime);
pQueryTableMsg->slidingTime = htobe64(pQueryTableMsg->slidingTime);
pQueryTableMsg->limit = htobe64(pQueryTableMsg->limit);
pQueryTableMsg->offset = htobe64(pQueryTableMsg->offset);
pQueryTableMsg->order = htons(pQueryTableMsg->order);
pQueryTableMsg->orderColId = htons(pQueryTableMsg->orderColId);
pQueryTableMsg->queryType = htons(pQueryTableMsg->queryType);
pQueryTableMsg->numOfCols = htons(pQueryTableMsg->numOfCols);
pQueryTableMsg->numOfOutputCols = htons(pQueryTableMsg->numOfOutputCols);
pQueryTableMsg->numOfGroupCols = htons(pQueryTableMsg->numOfGroupCols);
pQueryTableMsg->tagCondLen = htons(pQueryTableMsg->tagCondLen);
pQueryTableMsg->tsOffset = htonl(pQueryTableMsg->tsOffset);
pQueryTableMsg->tsLen = htonl(pQueryTableMsg->tsLen);
pQueryTableMsg->tsNumOfBlocks = htonl(pQueryTableMsg->tsNumOfBlocks);
pQueryTableMsg->tsOrder = htonl(pQueryTableMsg->tsOrder);
static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr,
wchar_t** tagCond, char** nameCond) {
pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables);
pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey);
pQueryMsg->window.ekey = htobe64(pQueryMsg->window.ekey);
pQueryMsg->intervalTime = htobe64(pQueryMsg->intervalTime);
pQueryMsg->slidingTime = htobe64(pQueryMsg->slidingTime);
pQueryMsg->limit = htobe64(pQueryMsg->limit);
pQueryMsg->offset = htobe64(pQueryMsg->offset);
pQueryMsg->order = htons(pQueryMsg->order);
pQueryMsg->orderColId = htons(pQueryMsg->orderColId);
pQueryMsg->queryType = htons(pQueryMsg->queryType);
pQueryMsg->numOfCols = htons(pQueryMsg->numOfCols);
pQueryMsg->numOfOutputCols = htons(pQueryMsg->numOfOutputCols);
pQueryMsg->numOfGroupCols = htons(pQueryMsg->numOfGroupCols);
pQueryMsg->tagCondLen = htons(pQueryMsg->tagCondLen);
pQueryMsg->nameCondLen = htons(pQueryMsg->nameCondLen);
pQueryMsg->tsOffset = htonl(pQueryMsg->tsOffset);
pQueryMsg->tsLen = htonl(pQueryMsg->tsLen);
pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks);
pQueryMsg->tsOrder = htonl(pQueryMsg->tsOrder);
// query msg safety check
if (validateQueryMeterMsg(pQueryTableMsg) != 0) {
if (validateQueryMeterMsg(pQueryMsg) != 0) {
return TSDB_CODE_INVALID_QUERY_MSG;
}
char *pMsg = (char *)(pQueryTableMsg->colList) + sizeof(SColumnInfo) * pQueryTableMsg->numOfCols;
char *pMsg = (char *)(pQueryMsg->colList) + sizeof(SColumnInfo) * pQueryMsg->numOfCols;
for (int32_t col = 0; col < pQueryTableMsg->numOfCols; ++col) {
SColumnInfo* pColInfo = &pQueryTableMsg->colList[col];
for (int32_t col = 0; col < pQueryMsg->numOfCols; ++col) {
SColumnInfo* pColInfo = &pQueryMsg->colList[col];
pColInfo->colId = htons(pColInfo->colId);
pColInfo->type = htons(pColInfo->type);
......@@ -5423,10 +5421,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId
bool hasArithmeticFunction = false;
*pExpr = calloc(pQueryTableMsg->numOfOutputCols, POINTER_BYTES);
*pExpr = calloc(pQueryMsg->numOfOutputCols, POINTER_BYTES);
SSqlFuncExprMsg *pExprMsg = (SSqlFuncExprMsg *)pMsg;
for (int32_t i = 0; i < pQueryTableMsg->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) {
(*pExpr)[i] = pExprMsg;
pExprMsg->colInfo.colIdx = htons(pExprMsg->colInfo.colIdx);
......@@ -5457,7 +5455,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId
return TSDB_CODE_INVALID_QUERY_MSG;
}
} else {
if (!vnodeValidateExprColumnInfo(pQueryTableMsg, pExprMsg)) {
if (!vnodeValidateExprColumnInfo(pQueryMsg, pExprMsg)) {
return TSDB_CODE_INVALID_QUERY_MSG;
}
}
......@@ -5465,55 +5463,59 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId
pExprMsg = (SSqlFuncExprMsg *)pMsg;
}
pQueryTableMsg->colNameLen = htonl(pQueryTableMsg->colNameLen);
pQueryMsg->colNameLen = htonl(pQueryMsg->colNameLen);
if (hasArithmeticFunction) { // column name array
assert(pQueryTableMsg->colNameLen > 0);
pQueryTableMsg->colNameList = (int64_t)pMsg;
pMsg += pQueryTableMsg->colNameLen;
assert(pQueryMsg->colNameLen > 0);
pQueryMsg->colNameList = (int64_t)pMsg;
pMsg += pQueryMsg->colNameLen;
}
pMsg = createTableIdList(pQueryTableMsg, pMsg, pTableIdList);
pMsg = createTableIdList(pQueryMsg, pMsg, pTableIdList);
if (pQueryTableMsg->numOfGroupCols > 0) { // group by tag columns
// if (pQueryTableMsg->numOfGroupCols > 0) {
// pQueryTableMsg->groupbyTagIds = (uint64_t) & (pTagSchema[pQueryTableMsg->numOfTagsCols]);
if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns
// if (pQueryMsg->numOfGroupCols > 0) {
// pQueryMsg->groupbyTagIds = (uint64_t) & (pTagSchema[pQueryMsg->numOfTagsCols]);
// } else {
// pQueryTableMsg->groupbyTagIds = 0;
// pQueryMsg->groupbyTagIds = 0;
// }
pQueryTableMsg->orderByIdx = htons(pQueryTableMsg->orderByIdx);
pQueryTableMsg->orderType = htons(pQueryTableMsg->orderType);
pQueryMsg->orderByIdx = htons(pQueryMsg->orderByIdx);
pQueryMsg->orderType = htons(pQueryMsg->orderType);
pMsg += sizeof(SColIndexEx) * pQueryTableMsg->numOfGroupCols;
pMsg += sizeof(SColIndexEx) * pQueryMsg->numOfGroupCols;
} else {
pQueryTableMsg->groupbyTagIds = 0;
pQueryMsg->groupbyTagIds = 0;
}
pQueryTableMsg->interpoType = htons(pQueryTableMsg->interpoType);
if (pQueryTableMsg->interpoType != TSDB_INTERPO_NONE) {
pQueryTableMsg->defaultVal = (uint64_t)(pMsg);
pQueryMsg->interpoType = htons(pQueryMsg->interpoType);
if (pQueryMsg->interpoType != TSDB_INTERPO_NONE) {
pQueryMsg->defaultVal = (uint64_t)(pMsg);
int64_t *v = (int64_t *)pMsg;
for (int32_t i = 0; i < pQueryTableMsg->numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) {
v[i] = htobe64(v[i]);
}
pMsg += sizeof(int64_t) * pQueryTableMsg->numOfOutputCols;
pMsg += sizeof(int64_t) * pQueryMsg->numOfOutputCols;
}
// the tag query condition expression string is located at the end of query msg
if (pQueryTableMsg->tagCondLen > 0) {
*tagCond = calloc(1, pQueryTableMsg->tagCondLen * TSDB_NCHAR_SIZE);
memcpy(*tagCond, pMsg, pQueryTableMsg->tagCondLen * TSDB_NCHAR_SIZE);
if (pQueryMsg->tagCondLen > 0) {
*tagCond = calloc(1, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE);
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE);
}
if (pQueryMsg->nameCondLen > 0) {
*nameCond = strndup(pMsg, pQueryMsg->nameCondLen);
}
dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, numOfTagCols:%d, "
"timestamp order:%d, tags order:%d, tags order col:%d, numOfOutputCols:%d, numOfCols:%d, interval:%" PRId64
", fillType:%d, comptslen:%d, limit:%" PRId64 ", offset:%" PRId64,
pQueryTableMsg, pQueryTableMsg->numOfTables, pQueryTableMsg->window.skey, pQueryTableMsg->window.ekey,
pQueryTableMsg->numOfGroupCols, pQueryTableMsg->order, pQueryTableMsg->orderType,
pQueryTableMsg->orderByIdx, pQueryTableMsg->numOfOutputCols,
pQueryTableMsg->numOfCols, pQueryTableMsg->intervalTime, pQueryTableMsg->interpoType, pQueryTableMsg->tsLen,
pQueryTableMsg->limit, pQueryTableMsg->offset);
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey,
pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->orderType,
pQueryMsg->orderByIdx, pQueryMsg->numOfOutputCols,
pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->interpoType, pQueryMsg->tsLen,
pQueryMsg->limit, pQueryMsg->offset);
return 0;
}
......@@ -6047,54 +6049,59 @@ _error:
return code;
}
int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) {
assert(pQueryTableMsg != NULL);
int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) {
assert(pQueryMsg != NULL);
int32_t code = TSDB_CODE_SUCCESS;
SArray *pTableIdList = NULL;
SSqlFuncExprMsg** pExprMsg = NULL;
wchar_t* tagCond = NULL;
char* nameCond = NULL;
if ((code = convertQueryMsg(pQueryTableMsg, &pTableIdList, &pExprMsg, &tagCond)) != TSDB_CODE_SUCCESS) {
if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &nameCond)) != TSDB_CODE_SUCCESS) {
return code;
}
if (pQueryTableMsg->numOfTables <= 0) {
dError("Invalid number of tables to query, numOfTables:%d", pQueryTableMsg->numOfTables);
if (pQueryMsg->numOfTables <= 0) {
dError("Invalid number of tables to query, numOfTables:%d", pQueryMsg->numOfTables);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
}
// todo check vnode status
if (pTableIdList == NULL || taosArrayGetSize(pTableIdList) == 0) {
dError("qmsg:%p, SQueryTableMsg wrong format", pQueryTableMsg);
dError("qmsg:%p, SQueryTableMsg wrong format", pQueryMsg);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
}
SSqlFunctionExpr *pExprs = NULL;
if ((code = createSqlFunctionExprFromMsg(pQueryTableMsg, &pExprs, pExprMsg)) != TSDB_CODE_SUCCESS) {
if ((code = createSqlFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg)) != TSDB_CODE_SUCCESS) {
goto _query_over;
}
SSqlGroupbyExpr *pGroupbyExpr = createGroupbyExprFromMsg(pQueryTableMsg, &code);
if ((pGroupbyExpr == NULL && pQueryTableMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
SSqlGroupbyExpr *pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, &code);
if ((pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
goto _query_over;
}
// super table query
if ((pQueryTableMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) {
SArray* res = NULL;
if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) {
STableId* id = taosArrayGet(pTableIdList, 0);
id->uid = -1;
SArray* res = tsdbQueryTableList(tsdb, id->uid, tagCond, pQueryTableMsg->tagCondLen);
res = tsdbQueryTableList(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen);
if (taosArrayGetSize(res) == 0) { // no qualified table in stable query in this vnode
code = TSDB_CODE_SUCCESS;
goto _query_over;
}
} else {
res = pTableIdList;
}
code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo);
code = createQInfo(pQueryMsg, pGroupbyExpr, pExprs, res, tsdb, pQInfo);
_query_over:
if (code != TSDB_CODE_SUCCESS) {
......@@ -6103,16 +6110,16 @@ _query_over:
// if failed to add ref for all meters in this query, abort current query
// if (code != TSDB_CODE_SUCCESS) {
// vnodeDecQueryRefCount(pQueryTableMsg, pMeterObjList, incNumber);
// vnodeDecQueryRefCount(pQueryMsg, pMeterObjList, incNumber);
// }
//
// tfree(pQueryTableMsg->pSqlFuncExprs);
// tfree(pQueryMsg->pSqlFuncExprs);
// tfree(pMeterObjList);
// ret = vnodeSendQueryRspMsg(pObj, code, pObj->qhandle);
//
// tfree(pQueryTableMsg->pSidExtInfo);
// for(int32_t i = 0; i < pQueryTableMsg->numOfCols; ++i) {
// vnodeFreeColumnInfo(&pQueryTableMsg->colList[i]);
// tfree(pQueryMsg->pSidExtInfo);
// for(int32_t i = 0; i < pQueryMsg->numOfCols; ++i) {
// vnodeFreeColumnInfo(&pQueryMsg->colList[i]);
// }
//
// atomic_fetch_add_32(&vnodeSelectReqNum, 1);
......
......@@ -73,14 +73,15 @@ typedef struct SQueryFilesInfo {
char dbFilePathPrefix[PATH_MAX];
} SQueryFilesInfo;
typedef struct STableQueryRec {
typedef struct STableQueryInfo {
STableId tableId;
TSKEY lastKey;
STable * pTableObj;
int64_t offsetInHeaderFile;
int32_t numOfBlocks;
int32_t start;
SCompBlock *pBlock;
} STableQueryRec;
} STableQueryInfo;
typedef struct {
SCompBlock *compBlock;
......@@ -89,7 +90,7 @@ typedef struct {
typedef struct STableDataBlockInfoEx {
SCompBlockFields pBlock;
STableQueryRec * pMeterDataInfo;
STableQueryInfo* pMeterDataInfo;
int32_t blockIndex;
int32_t groupIdx; /* number of group is less than the total number of meters */
} STableDataBlockInfoEx;
......@@ -105,7 +106,6 @@ typedef struct STsdbQueryHandle {
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
SQueryFilesInfo vnodeFileInfo;
int16_t numOfRowsPerPage;
......@@ -113,21 +113,22 @@ typedef struct STsdbQueryHandle {
int16_t order;
STimeWindow window; // the primary query time window that applies to all queries
int32_t blockBufferSize;
SCompBlock *pBlock;
SCompBlock* pBlock;
int32_t numOfBlocks;
SField ** pFields;
SArray * pColumns; // column list, SColumnInfoEx array list
SArray * pTableIdList; // table id object list
bool locateStart;
int32_t realNumOfRows;
bool loadDataAfterSeek; // load data after seek.
SArray* pTableQueryInfo;
int32_t activeIndex;
STableDataBlockInfoEx *pDataBlockInfoEx;
STableQueryRec * pTableQueryInfo;
int32_t tableIndex;
bool isFirstSlot;
void * qinfo; // query info handle, for debug purpose
SSkipListIterator* memIter;
STableDataBlockInfoEx *pDataBlockInfoEx;
} STsdbQueryHandle;
int32_t doAllocateBuf(STsdbQueryHandle *pQueryHandle, int32_t rowsPerFileBlock) {
......@@ -263,25 +264,27 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
pQueryHandle->window = pCond->twindow;
pQueryHandle->pTsdb = tsdb;
pQueryHandle->pTableIdList = idList;
pQueryHandle->pColumns = pColumnInfo;
pQueryHandle->loadDataAfterSeek = false;
pQueryHandle->isFirstSlot = true;
// only support table query
assert(taosArrayGetSize(idList) == 1);
size_t size = taosArrayGetSize(idList);
assert(size >= 1);
pQueryHandle->pTableQueryInfo = calloc(1, sizeof(STableQueryRec));
STableQueryRec* pTableQRec = pQueryHandle->pTableQueryInfo;
pQueryHandle->pTableQueryInfo = taosArrayInit(size, sizeof(STableQueryInfo));
for(int32_t i = 0; i < size; ++i) {
STableId id = *(STableId*) taosArrayGet(idList, i);
pTableQRec->lastKey = pQueryHandle->window.skey;
STableIdInfo* idInfo = taosArrayGet(pQueryHandle->pTableIdList, 0);
STableQueryInfo info = {
.lastKey = pQueryHandle->window.skey,
.tableId = id,
.pTableObj = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid), //todo this may be failed
};
STable *pTable = tsdbGetTableByUid(tsdbGetMeta(pQueryHandle->pTsdb), idInfo->uid);
assert(pTable != NULL);
taosArrayPush(pQueryHandle->pTableQueryInfo, &info);
}
pTableQRec->pTableObj = pTable;
pQueryHandle->activeIndex = 0;
// malloc buffer in order to load data from file
int32_t numOfCols = taosArrayGetSize(pColumnInfo);
......@@ -313,7 +316,9 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond
bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
STable *pTable = pHandle->pTableQueryInfo->pTableObj;
STableQueryInfo* pTableQInfo = taosArrayGet(pHandle->pTableQueryInfo, pHandle->activeIndex);
STable *pTable = pTableQInfo->pTableObj;
// no data in cache, abort
if (pTable->mem == NULL && pTable->imem == NULL) {
......@@ -321,7 +326,7 @@ bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) {
}
// all data in mem are checked already.
if (pHandle->pTableQueryInfo->lastKey > pTable->mem->keyLast) {
if (pTableQInfo->lastKey > pTable->mem->keyLast) {
return false;
}
......@@ -364,9 +369,9 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
// copy data from cache into data block
SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
STableIdInfo* idInfo = taosArrayGet(pHandle->pTableIdList, 0);
STable *pTable = pHandle->pTableQueryInfo->pTableObj;
STableQueryInfo* pTableQInfo = taosArrayGet(pHandle->pTableQueryInfo, pHandle->activeIndex);
STable *pTable = pTableQInfo->pTableObj;
TSKEY skey = 0, ekey = 0;
int32_t rows = 0;
......@@ -382,14 +387,14 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) {
}
SDataBlockInfo blockInfo = {
.uid = idInfo->uid,
.sid = idInfo->sid,
.uid = pTable->tableId.uid,
.sid = pTable->tableId.tid,
.size = rows,
.window = {.skey = skey, .ekey = ekey}
};
// update the last key value
pHandle->pTableQueryInfo->lastKey = ekey + 1;
pTableQInfo->lastKey = ekey + 1;
return blockInfo;
}
......@@ -427,7 +432,9 @@ static SArray* createTableIdArrayList(struct STsdbRepo* tsdb, int64_t uid) {
SSkipListIterator* iter = tSkipListCreateIter(pTable->pIndex);
while(tSkipListIterNext(iter)) {
STable* t = *(STable**) tSkipListIterGet(iter);
SSkipListNode* pNode = tSkipListIterGet(iter);
STable* t = *(STable**) SL_GET_NODE_DATA(pNode);
taosArrayPush(pList, &t->tableId);
}
......@@ -696,7 +703,7 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
return true;
}
static int32_t mgmtFilterMeterByIndex(STable* pSTable, SArray* pRes, const char* pCond) {
static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond) {
STColumn* stcol = schemaColAt(pSTable->tagSchema, 0);
tSQLBinaryExpr* pExpr = NULL;
......@@ -736,7 +743,7 @@ SArray *tsdbQueryTableList(struct STsdbRepo* tsdb, int64_t uid, const wchar_t *p
STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
assert(pSTable != NULL);
if (mgmtFilterMeterByIndex(pSTable, result, str) == TSDB_CODE_SUCCESS) {
if (doQueryTableList(pSTable, result, str) == TSDB_CODE_SUCCESS) {
return result;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册