From c59d762a35f09a06534379763764b8e2b148f526 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Thu, 30 Apr 2020 18:25:33 +0800 Subject: [PATCH] [td-186] fix bugs for stable join query. --- src/client/inc/tscSubquery.h | 4 +- src/client/inc/tscUtil.h | 25 +- src/client/inc/tsclient.h | 6 +- src/client/src/tscFunctionImpl.c | 90 +++--- src/client/src/tscSQLParser.c | 20 +- src/client/src/tscSchemaUtil.c | 2 - src/client/src/tscServer.c | 119 ++++---- src/client/src/tscSubquery.c | 465 ++++++++++++++++++++++++------- src/client/src/tscUtil.c | 99 +++---- src/inc/query.h | 4 +- src/inc/taosdef.h | 2 + src/inc/taosmsg.h | 47 +--- src/mnode/src/mgmtTable.c | 4 +- src/query/inc/qresultBuf.h | 2 +- src/query/inc/queryExecutor.h | 2 +- src/query/inc/sql.y | 2 +- src/query/inc/tsqlfunction.h | 22 +- src/query/src/qresultBuf.c | 8 +- src/query/src/queryExecutor.c | 114 +++++--- src/query/src/sql.c | 213 +++++++------- src/tsdb/src/tsdbRead.c | 38 +-- src/util/inc/tcompare.h | 2 + src/util/src/tcompare.c | 27 ++ src/vnode/src/vnodeRead.c | 2 +- 24 files changed, 824 insertions(+), 495 deletions(-) diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index 1d4194d707..368fe2250a 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -29,8 +29,8 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql); int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql); void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); -SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index); -void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter); +SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index); +void tscDestroyJoinSupporter(SJoinSupporter* pSupporter); int32_t tscHandleMasterJoinQuery(SSqlObj* pSql); diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index e6d15c8736..88988796e9 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -51,7 +51,14 @@ typedef struct SParsedDataColInfo { bool hasVal[TSDB_MAX_COLUMNS]; } SParsedDataColInfo; -typedef struct SJoinSubquerySupporter { +typedef struct STidTags { + int64_t uid; + int32_t tid; + int32_t vgId; + char tag[]; +} STidTags; + +typedef struct SJoinSupporter { SSubqueryState* pState; SSqlObj* pObj; // parent SqlObj int32_t subqueryIndex; // index of sub query @@ -65,8 +72,17 @@ typedef struct SJoinSubquerySupporter { SSqlGroupbyExpr groupbyExpr; struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array FILE* f; // temporary file in order to create TSBuf - char path[PATH_MAX]; // temporary file path -} SJoinSubquerySupporter; + char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory + int32_t tagSize; // the length of each in the first filter stage + char* pIdTagList; // result of first stage tags + int32_t totalLen; + int32_t num; +} SJoinSupporter; + +typedef struct SVgroupTableInfo { + SCMVgroupInfo vgInfo; + SArray* itemList; //SArray +} SVgroupTableInfo; int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); @@ -87,7 +103,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); -UNUSED_FUNC STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); +//UNUSED_FUNC STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); /** * @@ -190,7 +206,6 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb); void tscCleanSqlCmd(SSqlCmd* pCmd); bool tscShouldBeFreed(SSqlObj* pSql); -void tscClearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache); STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex); STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index b349a4e2dd..418ca7ad36 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -67,9 +67,10 @@ typedef struct STableMeta { } STableMeta; typedef struct STableMetaInfo { - STableMeta * pTableMeta; // table meta, cached in client side and acquried by name + STableMeta * pTableMeta; // table meta, cached in client side and acquired by name SVgroupsInfo *vgroupList; - + SArray *pVgroupTables; // SArray + /* * 1. keep the vgroup index during the multi-vnode super table projection query * 2. keep the vgroup index for multi-vnode insertion @@ -382,7 +383,6 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); void tscDestroyResPointerInfo(SSqlRes *pRes); void tscResetSqlCmdObj(SSqlCmd *pCmd); -void tscFreeResData(SSqlObj *pSql); /** * free query result of the sql object diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index bc6231ba09..22322ec219 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -153,7 +153,7 @@ typedef struct SRateInfo { int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, - int16_t *bytes, int16_t *intermediateResBytes, int16_t extLength, bool isSuperTable) { + int16_t *bytes, int16_t *interResBytes, int16_t extLength, bool isSuperTable) { if (!isValidDataType(dataType, dataBytes)) { tscError("Illegal data type %d or data type length %d", dataType, dataBytes); return TSDB_CODE_INVALID_SQL; @@ -164,28 +164,35 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP) { *type = (int16_t)dataType; *bytes = (int16_t)dataBytes; - *intermediateResBytes = *bytes + sizeof(SResultInfo); + *interResBytes = *bytes + sizeof(SResultInfo); + return TSDB_CODE_SUCCESS; + } + + if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct + *type = TSDB_DATA_TYPE_BINARY; + *bytes = dataBytes + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t); // (uid, tid) + VGID + TAGSIZE + *interResBytes = *bytes; return TSDB_CODE_SUCCESS; } if (functionId == TSDB_FUNC_COUNT) { *type = TSDB_DATA_TYPE_BIGINT; *bytes = sizeof(int64_t); - *intermediateResBytes = *bytes; + *interResBytes = *bytes; return TSDB_CODE_SUCCESS; } if (functionId == TSDB_FUNC_ARITHM) { *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); - *intermediateResBytes = *bytes; + *interResBytes = *bytes; return TSDB_CODE_SUCCESS; } if (functionId == TSDB_FUNC_TS_COMP) { *type = TSDB_DATA_TYPE_BINARY; *bytes = sizeof(int32_t); // this results is compressed ts data - *intermediateResBytes = POINTER_BYTES; + *interResBytes = POINTER_BYTES; return TSDB_CODE_SUCCESS; } @@ -193,54 +200,54 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) { *type = TSDB_DATA_TYPE_BINARY; *bytes = dataBytes + DATA_SET_FLAG_SIZE; - *intermediateResBytes = *bytes; + *interResBytes = *bytes; return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_SUM) { *type = TSDB_DATA_TYPE_BINARY; *bytes = sizeof(SSumInfo); - *intermediateResBytes = *bytes; + *interResBytes = *bytes; return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_AVG) { *type = TSDB_DATA_TYPE_BINARY; *bytes = sizeof(SAvgInfo); - *intermediateResBytes = *bytes; + *interResBytes = *bytes; return TSDB_CODE_SUCCESS; } else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) { *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(SRateInfo); - *intermediateResBytes = sizeof(SRateInfo); + *interResBytes = sizeof(SRateInfo); return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { *type = TSDB_DATA_TYPE_BINARY; *bytes = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param; - *intermediateResBytes = *bytes; + *interResBytes = *bytes; return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_SPREAD) { *type = TSDB_DATA_TYPE_BINARY; *bytes = sizeof(SSpreadInfo); - *intermediateResBytes = *bytes; + *interResBytes = *bytes; return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_APERCT) { *type = TSDB_DATA_TYPE_BINARY; *bytes = sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo) + sizeof(SAPercentileInfo); - *intermediateResBytes = *bytes; + *interResBytes = *bytes; return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_LAST_ROW) { *type = TSDB_DATA_TYPE_BINARY; *bytes = sizeof(SLastrowInfo) + dataBytes; - *intermediateResBytes = *bytes; + *interResBytes = *bytes; return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_TWA) { *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(STwaInfo); - *intermediateResBytes = *bytes; + *interResBytes = *bytes; return TSDB_CODE_SUCCESS; } } @@ -253,57 +260,57 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI } *bytes = sizeof(int64_t); - *intermediateResBytes = sizeof(SSumInfo); + *interResBytes = sizeof(SSumInfo); return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_APERCT) { *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); - *intermediateResBytes = + *interResBytes = sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1); return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_TWA) { *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); - *intermediateResBytes = sizeof(STwaInfo); + *interResBytes = sizeof(STwaInfo); return TSDB_CODE_SUCCESS; } if (functionId == TSDB_FUNC_AVG) { *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); - *intermediateResBytes = sizeof(SAvgInfo); + *interResBytes = sizeof(SAvgInfo); } else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) { *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); - *intermediateResBytes = sizeof(SRateInfo); + *interResBytes = sizeof(SRateInfo); } else if (functionId == TSDB_FUNC_STDDEV) { *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); - *intermediateResBytes = sizeof(SStddevInfo); + *interResBytes = sizeof(SStddevInfo); } else if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) { *type = (int16_t)dataType; *bytes = (int16_t)dataBytes; - *intermediateResBytes = dataBytes + DATA_SET_FLAG_SIZE; + *interResBytes = dataBytes + DATA_SET_FLAG_SIZE; } else if (functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_LAST) { *type = (int16_t)dataType; *bytes = (int16_t)dataBytes; - *intermediateResBytes = dataBytes + sizeof(SResultInfo); + *interResBytes = dataBytes + sizeof(SResultInfo); } else if (functionId == TSDB_FUNC_SPREAD) { *type = (int16_t)TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); - *intermediateResBytes = sizeof(SSpreadInfo); + *interResBytes = sizeof(SSpreadInfo); } else if (functionId == TSDB_FUNC_PERCT) { *type = (int16_t)TSDB_DATA_TYPE_DOUBLE; *bytes = (int16_t)sizeof(double); - *intermediateResBytes = (int16_t)sizeof(double); + *interResBytes = (int16_t)sizeof(double); } else if (functionId == TSDB_FUNC_LEASTSQR) { *type = TSDB_DATA_TYPE_BINARY; *bytes = TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE; // string - *intermediateResBytes = *bytes + sizeof(SResultInfo); + *interResBytes = *bytes + sizeof(SResultInfo); } else if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST) { *type = TSDB_DATA_TYPE_BINARY; *bytes = dataBytes + sizeof(SFirstLastInfo); - *intermediateResBytes = *bytes; + *interResBytes = *bytes; } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { *type = (int16_t)dataType; *bytes = (int16_t)dataBytes; @@ -311,11 +318,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param; // the output column may be larger than sizeof(STopBotInfo) - *intermediateResBytes = size; + *interResBytes = size; } else if (functionId == TSDB_FUNC_LAST_ROW) { *type = (int16_t)dataType; *bytes = (int16_t)dataBytes; - *intermediateResBytes = dataBytes + sizeof(SLastrowInfo); + *interResBytes = dataBytes + sizeof(SLastrowInfo); } else { return TSDB_CODE_INVALID_SQL; } @@ -4836,7 +4843,7 @@ SQLAggFuncElem aAggs[] = {{ "apercentile", TSDB_FUNC_APERCT, TSDB_FUNC_APERCT, - TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC, + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE, apercentile_function_setup, apercentile_function, apercentile_function_f, @@ -4881,7 +4888,7 @@ SQLAggFuncElem aAggs[] = {{ "last_row", TSDB_FUNC_LAST_ROW, TSDB_FUNC_LAST_ROW, - TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS | + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, first_last_function_setup, last_row_function, @@ -4897,7 +4904,7 @@ SQLAggFuncElem aAggs[] = {{ "top", TSDB_FUNC_TOP, TSDB_FUNC_TOP, - TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS | + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, top_bottom_function_setup, top_function, @@ -4913,7 +4920,7 @@ SQLAggFuncElem aAggs[] = {{ "bottom", TSDB_FUNC_BOTTOM, TSDB_FUNC_BOTTOM, - TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS | + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, top_bottom_function_setup, bottom_function, @@ -5079,7 +5086,7 @@ SQLAggFuncElem aAggs[] = {{ "arithmetic", TSDB_FUNC_ARITHM, TSDB_FUNC_ARITHM, - TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS, function_setup, arithmetic_function, arithmetic_function_f, @@ -5140,7 +5147,7 @@ SQLAggFuncElem aAggs[] = {{ "interp", TSDB_FUNC_INTERP, TSDB_FUNC_INTERP, - TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS, + TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS, function_setup, interp_function, do_sum_f, // todo filter handle @@ -5239,4 +5246,19 @@ SQLAggFuncElem aAggs[] = {{ sumrate_func_merge, sumrate_func_second_merge, data_req_load_info, + }, + { + // 34 + "tid_tag", // return table id and the corresponding tags for join match + TSDB_FUNC_TID_TAG, + TSDB_FUNC_TID_TAG, + TSDB_FUNCSTATE_MO, + function_setup, + noop1, + noop2, + no_next_step, + noop1, + noop1, + noop1, + data_req_load_info, }}; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 41a5f5177d..27cd613ac2 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -296,17 +296,17 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { break; } - case TSDB_SQL_CREATE_DNODE: { // todo parse hostname - const char* msg = "invalid ip address"; + case TSDB_SQL_CREATE_DNODE: { // todo hostname + const char* msg = "invalid host name (ip address)"; if (pInfo->pDCLInfo->nTokens > 1) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); } - SSQLToken* pIpAddr = &pInfo->pDCLInfo->a[0]; - if (!validateIpAddress(pIpAddr->z, pIpAddr->n)) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); - } +// SSQLToken* pIpAddr = &pInfo->pDCLInfo->a[0]; +// if (!validateIpAddress(pIpAddr->z, pIpAddr->n)) { +// return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg); +// } break; } @@ -1325,7 +1325,7 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c pQueryInfo->type = TSDB_QUERY_TYPE_STABLE_QUERY; } else { index.columnIndex = colIndex; - pQueryInfo->type = TSDB_QUERY_TYPE_PROJECTION_QUERY; + pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY; } return tscSqlExprAppend(pQueryInfo, functionId, &index, pSchema->type, pSchema->bytes, @@ -2342,7 +2342,7 @@ bool hasUnsupportFunctionsForSTableQuery(SQueryInfo* pQueryInfo) { size_t size = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t i = 0; i < size; ++i) { int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId; - if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_METRIC) == 0) { + if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_STABLE) == 0) { invalidSqlErrMsg(pQueryInfo->msg, msg3); return true; } @@ -5111,7 +5111,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo) { doUpdateSqlFunctionForColPrj(pQueryInfo); } } else { - if ((pQueryInfo->type & TSDB_QUERY_TYPE_PROJECTION_QUERY) == TSDB_QUERY_TYPE_PROJECTION_QUERY) { + if ((pQueryInfo->type & TSDB_QUERY_TYPE_PROJECTION_QUERY) != 0) { if (numOfAggregation > 0 && pQueryInfo->groupbyExpr.numOfGroupCols == 0) { return invalidSqlErrMsg(pQueryInfo->msg, msg3); } @@ -5747,6 +5747,8 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { if (code != TSDB_CODE_SUCCESS) { return code; } + } else { + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TABLE_QUERY); } // parse the group by clause in the first place diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index c898f05a77..35bc3c4a80 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -83,7 +83,6 @@ STableComInfo tscGetTableInfo(const STableMeta* pTableMeta) { return pTableMeta->tableInfo; } - bool isValidSchema(struct SSchema* pSchema, int32_t numOfCols) { if (!VALIDNUMOFCOLS(numOfCols)) { return false; @@ -184,7 +183,6 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size return pTableMeta; } - /** * the TableMeta data format in memory is as follows: * diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 4aee787271..0ab830bf70 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -45,15 +45,15 @@ void tscSaveSubscriptionProgress(void* sub); static int32_t minMsgSize() { return tsRpcHeadSize + 100; } -static void tscSetDnodeIpList(SSqlObj* pSql, STableMeta* pTableMeta) { +static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) { SRpcIpSet* pIpList = &pSql->ipList; - pIpList->numOfIps = pTableMeta->vgroupInfo.numOfIps; + pIpList->numOfIps = pVgroupInfo->numOfIps; pIpList->inUse = 0; - for(int32_t i = 0; i < pTableMeta->vgroupInfo.numOfIps; ++i) { - strcpy(pIpList->fqdn[i], pTableMeta->vgroupInfo.ipAddr[i].fqdn); - pIpList->port[i] = pTableMeta->vgroupInfo.ipAddr[i].port; + for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { + strcpy(pIpList->fqdn[i], pVgroupInfo->ipAddr[i].fqdn); + pIpList->port[i] = pVgroupInfo->ipAddr[i].port; } } @@ -198,7 +198,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { } if (pSql->cmd.command < TSDB_SQL_MGMT) { - tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port); + tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port[0]); memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); SRpcMsg rpcMsg = { @@ -341,7 +341,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen); } } - + if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); @@ -559,7 +559,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // pSql->cmd.payloadLen is set during copying data into payload pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; - tscSetDnodeIpList(pSql, pTableMeta); + tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo); tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes), pSql->ipList.numOfIps); return TSDB_CODE_SUCCESS; @@ -589,18 +589,65 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { return size; } -static char *doSerializeTableInfo(SSqlObj *pSql, int32_t vgId, char *pMsg) { +static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; - tscTrace("%p vgId:%d, query on table:%s, uid:%" PRIu64, pSql, vgId, pTableMetaInfo->name, pTableMeta->uid); + if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo) || pTableMetaInfo->pVgroupTables == NULL) { + + SCMVgroupInfo* pVgroupInfo = NULL; + if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { + pVgroupInfo = &pTableMeta->vgroupInfo; + } else { + int32_t index = pTableMetaInfo->vgroupIndex; + assert(index >= 0); + + pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index]; + tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, pTableMetaInfo->vgroupList->numOfVgroups); + } + + tscSetDnodeIpList(pSql, pVgroupInfo); + pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); + + STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; + pTableIdInfo->tid = htonl(pTableMeta->sid); + pTableIdInfo->uid = htobe64(pTableMeta->uid); + pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); + + pQueryMsg->numOfTables = htonl(1); // set the number of tables + + pMsg += sizeof(STableIdInfo); + } else { + int32_t index = pTableMetaInfo->vgroupIndex; + int32_t numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables); + assert(index >= 0 && index < numOfVgroups); + + tscTrace("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups); + + SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); + + // set the vgroup info + tscSetDnodeIpList(pSql, &pTableIdList->vgInfo); + pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId); + + int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList); + pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables + + // serialize each table id info + for(int32_t i = 0; i < numOfTables; ++i) { + STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i); + + STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; + pTableIdInfo->tid = htonl(pItem->tid); + pTableIdInfo->uid = htobe64(pItem->uid); +// pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); + pMsg += sizeof(STableIdInfo); + } + } - STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; - pTableIdInfo->sid = htonl(pTableMeta->sid); - pTableIdInfo->uid = htobe64(pTableMeta->uid); - pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); + tscTrace("%p vgId:%d, query on table:%s, uid:%" PRIu64, pSql, htonl(pQueryMsg->head.vgId), pTableMetaInfo->name, + pTableMeta->uid); - pMsg += sizeof(STableIdInfo); return pMsg; } @@ -637,38 +684,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart; - int32_t msgLen = 0; - int32_t numOfTables = 0; - int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList); - - if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { - numOfTables = 1; - tscSetDnodeIpList(pSql, pTableMeta); - pQueryMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId); - tscTrace("%p queried tables:%d, table name: %s", pSql, 1, pTableMetaInfo->name); - } else { // query super table - int32_t index = pTableMetaInfo->vgroupIndex; - if (index < 0) { - tscError("%p error vgroupIndex:%d", pSql, index); - return -1; - } - - SCMVgroupInfo* pVgroupInfo = &pTableMetaInfo->vgroupList->vgroups[index]; - - pSql->ipList.numOfIps = pVgroupInfo->numOfIps; // todo fix me - pSql->ipList.inUse = 0; + int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList); - for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) { - strcpy(pSql->ipList.fqdn[i], pVgroupInfo->ipAddr[i].fqdn); - pSql->ipList.port[i] = pVgroupInfo->ipAddr[i].port; - } - - tscTrace("%p query on super table, numOfVgroup:%d, vgroupIndex:%d", pSql, pTableMetaInfo->vgroupList->numOfVgroups, index); - - pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId); - numOfTables = 1; - } - if (pQueryInfo->order.order == TSDB_ORDER_ASC) { pQueryMsg->window.skey = htobe64(pQueryInfo->window.skey); pQueryMsg->window.ekey = htobe64(pQueryInfo->window.ekey); @@ -677,7 +694,6 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->window.ekey = htobe64(pQueryInfo->window.skey); } - pQueryMsg->numOfTables = htonl(numOfTables); pQueryMsg->order = htons(pQueryInfo->order.order); pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId); pQueryMsg->interpoType = htons(pQueryInfo->interpoType); @@ -782,10 +798,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSqlFuncExpr = (SSqlFuncMsg *)pMsg; } - + // serialize the table info (sid, uid, tags) - pMsg = doSerializeTableInfo(pSql, htons(pQueryMsg->head.vgId), pMsg); - + pMsg = doSerializeTableInfo(pQueryMsg, pSql, pMsg); + SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr; if (pGroupbyExpr->numOfGroupCols > 0) { pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex); @@ -891,8 +907,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += strlen(pQueryInfo->tagCond.tbnameCond.cond) + 1; } - // tbname in/like query expression should be sent to mgmt node - msgLen = pMsg - pStart; + int32_t msgLen = pMsg - pStart; tscTrace("%p msg built success,len:%d bytes", pSql, msgLen); pCmd->payloadLen = msgLen; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 8475b02aba..e4712f0e49 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -14,10 +14,12 @@ */ #include "tscSubquery.h" +#include +#include #include "os.h" #include "qtsbuf.h" -#include "tsclient.h" #include "tscLog.h" +#include "tsclient.h" typedef struct SInsertSupporter { SSubqueryState* pState; @@ -27,7 +29,7 @@ typedef struct SInsertSupporter { static void freeJoinSubqueryObj(SSqlObj* pSql); static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql); -static bool doCompare(int32_t order, int64_t left, int64_t right) { +static bool tsCompare(int32_t order, int64_t left, int64_t right) { if (order == TSDB_ORDER_ASC) { return left < right; } else { @@ -35,8 +37,8 @@ static bool doCompare(int32_t order, int64_t left, int64_t right) { } } -static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSupporter1, - SJoinSubquerySupporter* pSupporter2, TSKEY* st, TSKEY* et) { +static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, + SJoinSupporter* pSupporter2, TSKEY* st, TSKEY* et) { STSBuf* output1 = tsBufCreate(true); STSBuf* output2 = tsBufCreate(true); @@ -82,17 +84,16 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor STSElem elem2 = tsBufGetElem(pSupporter2->pTSBuf); #ifdef _DEBUG_VIEW - // for debug purpose tscPrint("%" PRId64 ", tags:%d \t %" PRId64 ", tags:%d", elem1.ts, elem1.tag, elem2.ts, elem2.tag); #endif - if (elem1.tag < elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem1.ts, elem2.ts))) { + if (elem1.tag < elem2.tag || (elem1.tag == elem2.tag && tsCompare(order, elem1.ts, elem2.ts))) { if (!tsBufNextPos(pSupporter1->pTSBuf)) { break; } numOfInput1++; - } else if (elem1.tag > elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem2.ts, elem1.ts))) { + } else if (elem1.tag > elem2.tag || (elem1.tag == elem2.tag && tsCompare(order, elem2.ts, elem1.ts))) { if (!tsBufNextPos(pSupporter2->pTSBuf)) { break; } @@ -156,8 +157,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor } // todo handle failed to create sub query -SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index) { - SJoinSubquerySupporter* pSupporter = calloc(1, sizeof(SJoinSubquerySupporter)); +SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index) { + SJoinSupporter* pSupporter = calloc(1, sizeof(SJoinSupporter)); if (pSupporter == NULL) { return NULL; } @@ -185,7 +186,7 @@ SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pS return pSupporter; } -void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter) { +void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) { if (pSupporter == NULL) { return; } @@ -234,14 +235,12 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) { */ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { int32_t numOfSub = 0; - SJoinSubquerySupporter* pSupporter = NULL; + SJoinSupporter* pSupporter = NULL; /* * If the columns are not involved in the final select clause, * the corresponding query will not be issued. */ - SSubqueryState* pState = NULL; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { pSupporter = pSql->pSubs[i]->param; if (taosArrayGetSize(pSupporter->exprList) > 0) { @@ -256,7 +255,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { "select clause", pSql, pSql->numOfSubs, numOfSub); //the subqueries that do not actually launch the secondary query to virtual node is set as completed. - pState = pSupporter->pState; + SSubqueryState* pState = pSupporter->pState; pState->numOfTotal = pSql->numOfSubs; pState->numOfCompleted = (pSql->numOfSubs - numOfSub); @@ -377,7 +376,7 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { continue; } - SJoinSubquerySupporter* p = pSub->param; + SJoinSupporter* p = pSub->param; pState = p->pState; tscDestroyJoinSupporter(p); @@ -391,14 +390,13 @@ void freeJoinSubqueryObj(SSqlObj* pSql) { pSql->numOfSubs = 0; } -static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) { +static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { int32_t numOfTotal = pSupporter->pState->numOfTotal; int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); + pSqlObj->res.code = pSupporter->pState->code; if (finished >= numOfTotal) { - pSqlObj->res.code = abs(pSupporter->pState->code); tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code); - freeJoinSubqueryObj(pSqlObj); } } @@ -411,35 +409,36 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, int64_t st, int64_t et) pQueryInfo->window.ekey = et; } -static void tSIntersectionAndLaunchSecQuery(SJoinSubquerySupporter* pSupporter, SSqlObj* pSql) { +static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) { SSqlObj* pParentSql = pSupporter->pObj; - SSqlCmd* pCmd = &pSql->cmd; - SSqlRes* pRes = &pSql->res; +// SSqlCmd* pCmd = &pSql->cmd; +// SSqlRes* pRes = &pSql->res; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); +// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex); - if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) { - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - assert(pQueryInfo->numOfTables == 1); - - // for projection query, need to try next vnode -// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; - int32_t totalVnode = 0; - if ((++pTableMetaInfo->vgroupIndex) < totalVnode) { - tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, - pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotal); - - pSql->cmd.command = TSDB_SQL_SELECT; - pSql->fp = tscJoinQueryCallback; - tscProcessSql(pSql); - - return; - } - } +// if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) { +// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); +// assert(pQueryInfo->numOfTables == 1); +// +// // for projection query, need to try next vnode +//// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; +// int32_t totalVnode = 0; +// if ((++pTableMetaInfo->vgroupIndex) < totalVnode) { +// tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, +// pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotal); +// +// pSql->cmd.command = TSDB_SQL_SELECT; +// pSql->fp = tscJoinQueryCallback; +// tscProcessSql(pSql); +// +// return; +// } +// } int32_t numOfTotal = pSupporter->pState->numOfTotal; int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); + printf("---------------------------------%d, total:%d\n", finished, numOfTotal); if (finished >= numOfTotal) { assert(finished == numOfTotal); @@ -453,11 +452,10 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSubquerySupporter* pSupporter, tscTrace("%p all subqueries retrieve ts complete, do ts block intersect", pParentSql); - SJoinSubquerySupporter* p1 = pParentSql->pSubs[0]->param; - SJoinSubquerySupporter* p2 = pParentSql->pSubs[1]->param; + SJoinSupporter* p1 = pParentSql->pSubs[0]->param; + SJoinSupporter* p2 = pParentSql->pSubs[1]->param; TSKEY st, et; - int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &st, &et); if (num <= 0) { // no result during ts intersect tscTrace("%p free all sub SqlObj and quit", pParentSql); @@ -469,8 +467,134 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSubquerySupporter* pSupporter, } } +int32_t tagsOrderCompar(const void* p1, const void* p2) { + STidTags* t1 = (STidTags*) p1; + STidTags* t2 = (STidTags*) p2; + + if (t1->vgId != t2->vgId) { + return (t1->vgId > t2->vgId)? 1:-1; + } else { + if (t1->tid != t2->tid) { + return (t1->tid > t2->tid)? 1:-1; + } else { + return 0; + } + } +} + +static void doBuildVgroupTableInfo(SArray* res, STableMetaInfo* pTableMetaInfo) { + SArray* pGroup = taosArrayInit(4, sizeof(SVgroupTableInfo)); + + SArray* vgTableIdItem = taosArrayInit(4, sizeof(STableIdInfo)); + int32_t size = taosArrayGetSize(res); + + STidTags* prev = taosArrayGet(res, 0); + int32_t prevVgId = prev->vgId; + + STableIdInfo item = {.uid = prev->uid, .tid = prev->tid, .key = INT64_MIN}; + taosArrayPush(vgTableIdItem, &item); + + for(int32_t k = 1; k < size; ++k) { + STidTags* t1 = taosArrayGet(res, k); + if (prevVgId != t1->vgId) { + + SVgroupTableInfo info = {0}; + + SVgroupsInfo* pvg = pTableMetaInfo->vgroupList; + for(int32_t m = 0; m < pvg->numOfVgroups; ++m) { + if (prevVgId == pvg->vgroups[m].vgId) { + info.vgInfo = pvg->vgroups[m]; + break; + } + } + + assert(info.vgInfo.numOfIps != 0); + info.itemList = vgTableIdItem; + taosArrayPush(pGroup, &info); + + vgTableIdItem = taosArrayInit(4, sizeof(STableIdInfo)); + STableIdInfo item1 = {.uid = t1->uid, .tid = t1->tid, .key = INT64_MIN}; + taosArrayPush(vgTableIdItem, &item1); + prevVgId = t1->vgId; + } else { + taosArrayPush(vgTableIdItem, &item); + } + } + + if (taosArrayGetSize(vgTableIdItem) > 0) { + SVgroupTableInfo info = {0}; + SVgroupsInfo* pvg = pTableMetaInfo->vgroupList; + + for(int32_t m = 0; m < pvg->numOfVgroups; ++m) { + if (prevVgId == pvg->vgroups[m].vgId) { + info.vgInfo = pvg->vgroups[m]; + break; + } + } + + assert(info.vgInfo.numOfIps != 0); + info.itemList = vgTableIdItem; + taosArrayPush(pGroup, &info); + } + + pTableMetaInfo->pVgroupTables = pGroup; +} + +static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* pParent) { + SSqlCmd* pCmd = &pSql->cmd; + tscClearSubqueryInfo(pCmd); + tscFreeSqlResult(pSql); + + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + + tscInitQueryInfo(pQueryInfo); + + TSDB_QUERY_CLEAR_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY); + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY); + + pCmd->command = TSDB_SQL_SELECT; + pSql->fp = tscJoinQueryCallback; + + SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; + + SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; + tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL); + + // set the tags value for ts_comp function + SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0); + int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid); + + pExpr->param->i64Key = tagColIndex; + pExpr->numOfParams = 1; + + // add the filter tag column + if (pSupporter->colList != NULL) { + size_t s = taosArrayGetSize(pSupporter->colList); + + for (int32_t i = 0; i < s; ++i) { + SColumn *pCol = taosArrayGetP(pSupporter->colList, i); + + if (pCol->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered. + SColumn *p = tscColumnClone(pCol); + taosArrayPush(pQueryInfo->colList, &p); + } + } + } + + size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); + + tscTrace( + "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " + "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", + pParent, pSql, 0, pTableMetaInfo->vgroupIndex, pQueryInfo->type, tscSqlExprNumOfExprs(pQueryInfo), + numOfCols, pQueryInfo->fieldsInfo.numOfOutput, pTableMetaInfo->name); + + tscProcessSql(pSql); +} + static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { - SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param; + SJoinSupporter* pSupporter = (SJoinSupporter*)param; SSqlObj* pParentSql = pSupporter->pObj; SSqlObj* pSql = (SSqlObj*)tres; @@ -478,21 +602,134 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { - if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { - tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, - pSupporter->pState->code); - - quitAllSubquery(pParentSql, pSupporter); - return; +// if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { +// tscError("%p abort query due to other subquery failure. code:%d, global code:%s", pSql, numOfRows, +// tstrerror(pSupporter->pState->code)); +// +// quitAllSubquery(pParentSql, pSupporter); +// return; +// } +// +// if (numOfRows < 0) { +// tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); +// pSupporter->pState->code = numOfRows; +// quitAllSubquery(pParentSql, pSupporter); +// return; +// } + + // response of tag retrieve + if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) { + if (numOfRows == 0 || pSql->res.completed) { + + if (numOfRows > 0) { + size_t length = pSupporter->totalLen + pSql->res.rspLen; + char* tmp = realloc(pSupporter->pIdTagList, length); + assert(tmp != NULL); + pSupporter->pIdTagList = tmp; + + memcpy(pSupporter->pIdTagList, pSql->res.data, pSql->res.rspLen); + pSupporter->totalLen += pSql->res.rspLen; + pSupporter->num += pSql->res.numOfRows; + } + + int32_t numOfTotal = pSupporter->pState->numOfTotal; + int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); + + if (finished < numOfTotal) { + return; + } + + // all subqueries are returned, start to compare the tags + assert(finished == numOfTotal); + tscTrace("%p all subqueries retrieve tags complete, do tags match", pParentSql); + + SJoinSupporter* p1 = pParentSql->pSubs[0]->param; + SJoinSupporter* p2 = pParentSql->pSubs[1]->param; + + qsort(p1->pIdTagList, p1->num, p1->tagSize, tagsOrderCompar); + qsort(p2->pIdTagList, p2->num, p2->tagSize, tagsOrderCompar); + + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + + SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);// todo: tags mismatch, tags not completed + + SColumn *pCol = taosArrayGetP(pTableMetaInfo->tagColList, 0); + SSchema *pColSchema = &pSchema[pCol->colIndex.columnIndex]; + + SArray* s1 = taosArrayInit(p1->num, p1->tagSize); + SArray* s2 = taosArrayInit(p2->num, p2->tagSize); + + int32_t i = 0, j = 0; + while(i < p1->num && j < p2->num) { + STidTags* pp1 = (STidTags*) p1->pIdTagList + i * p1->tagSize; + STidTags* pp2 = (STidTags*) p2->pIdTagList + j * p2->tagSize; + + int32_t ret = doCompare(pp1->tag, pp2->tag, pColSchema->type, pColSchema->bytes); + if (ret == 0) { + taosArrayPush(s1, pp1); + taosArrayPush(s2, pp2); + j++; + i++; + } else if (ret > 0) { + j++; + } else { + i++; + } + } + + if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) {// no results,return. + tscTrace("%p free all sub SqlObj and quit", pParentSql); + freeJoinSubqueryObj(pParentSql); + return; + } else { + SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd; + SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd; + + SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0); + STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0); + doBuildVgroupTableInfo(s1, pTableMetaInfo1); + + SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0); + STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0); + doBuildVgroupTableInfo(s2, pTableMetaInfo2); + + pSupporter->pState->numOfCompleted = 0; + pSupporter->pState->code = 0; + pSupporter->pState->numOfTotal = 2; + + for(int32_t m = 0; m < pParentSql->numOfSubs; ++m) { + SSqlObj* psub = pParentSql->pSubs[m]; + issueTSCompQuery(psub, psub->param, pParentSql); + } + } + + } else { + size_t length = pSupporter->totalLen + pSql->res.rspLen; + char* tmp = realloc(pSupporter->pIdTagList, length); + assert(tmp != NULL); + + pSupporter->pIdTagList = tmp; + + memcpy(pSupporter->pIdTagList, pSql->res.data, pSql->res.rspLen); + pSupporter->totalLen += pSql->res.rspLen; + pSupporter->num += pSql->res.numOfRows; + + // continue retrieve data from vnode + taos_fetch_rows_a(tres, joinRetrieveCallback, param); } - if (numOfRows < 0) { - tscError("%p sub query failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex); - pSupporter->pState->code = numOfRows; - quitAllSubquery(pParentSql, pSupporter); - return; - } else if (numOfRows == 0) { + return; + } + + if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { + if (numOfRows < 0) { + tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex); + pSupporter->pState->code = numOfRows; + quitAllSubquery(pParentSql, pSupporter); + return; + } + + if (numOfRows == 0) { tSIntersectionAndLaunchSecQuery(pSupporter, pSql); return; } @@ -590,16 +827,16 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { } } -static SJoinSubquerySupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) { +static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) { int32_t notInvolved = 0; - SJoinSubquerySupporter* pSupporter = NULL; + SJoinSupporter* pSupporter = NULL; SSubqueryState* pState = NULL; for(int32_t i = 0; i < pSql->numOfSubs; ++i) { if (pSql->pSubs[i] == NULL) { notInvolved++; } else { - pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[i]->param; + pSupporter = (SJoinSupporter*)pSql->pSubs[i]->param; pState = pSupporter->pState; } } @@ -666,7 +903,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch); - SJoinSubquerySupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch); + SJoinSupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch); for (int32_t i = 0; i < pSql->numOfSubs; ++i) { SSqlObj* pSql1 = pSql->pSubs[i]; @@ -677,7 +914,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { SSqlRes* pRes1 = &pSql1->res; SSqlCmd* pCmd1 = &pSql1->cmd; - pSupporter = (SJoinSubquerySupporter*)pSql1->param; + pSupporter = (SJoinSupporter*)pSql1->param; // wait for all subqueries completed SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd1, 0); @@ -746,13 +983,13 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { SSqlObj* pSql = (SSqlObj*)tres; - SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param; + SJoinSupporter* pSupporter = (SJoinSupporter*)param; // There is only one subquery and table for each subquery. SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1); - if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { + if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { if (code != TSDB_CODE_SUCCESS) { // direct call joinRetrieveCallback and set the error code joinRetrieveCallback(param, pSql, code); } else { // first stage query, continue to retrieve compressed time stamp data @@ -817,7 +1054,7 @@ static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code); static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj); // todo merge with callback -int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) { +int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter *pSupporter) { SSqlCmd * pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); @@ -875,42 +1112,78 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr; memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr)); - SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; - SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscInitQueryInfo(pNewQueryInfo); - tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL); - - // set the tags value for ts_comp function - SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); - int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid); - pExpr->param->i64Key = tagColIndex; - pExpr->numOfParams = 1; - - // add the filter tag column - if (pSupporter->colList != NULL) { - size_t s = taosArrayGetSize(pSupporter->colList); + if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // return the tableId & tag + SSchema s = {0}; + SColumnIndex index = {0}; - for (int32_t i = 0; i < s; ++i) { - SColumn *pCol = taosArrayGetP(pSupporter->colList, i); - - if (pCol->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered. - SColumn* p = tscColumnClone(pCol); - taosArrayPush(pNewQueryInfo->colList, &p); - } + size_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList); + for(int32_t i = 0; i < numOfTags; ++i) { + SColumn* c = taosArrayGetP(pTableMetaInfo->tagColList, i); + index = (SColumnIndex) {.tableIndex = 0, .columnIndex = c->colIndex.columnIndex}; + + SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); + s = pTagSchema[c->colIndex.columnIndex]; + + int16_t bytes = 0; + int16_t type = 0; + int16_t inter = 0; + + getResultDataInfo(s.type, s.bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0); + + s.type = type; + s.bytes = bytes; + pSupporter->tagSize = s.bytes; } - } - size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); + // set get tags query type + TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY); + tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s, TSDB_COL_TAG); + size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); - tscTrace("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " - "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", - pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, - tscSqlExprNumOfExprs(pNewQueryInfo), numOfCols, - pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name); + tscTrace( + "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), " + "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", + pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo), + numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name); + + } else { + SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; + SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; + tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL); + + // set the tags value for ts_comp function + SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0); + + int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid); + + pExpr->param->i64Key = tagColIndex; + pExpr->numOfParams = 1; + + // add the filter tag column + if (pSupporter->colList != NULL) { + size_t s = taosArrayGetSize(pSupporter->colList); + + for (int32_t i = 0; i < s; ++i) { + SColumn *pCol = taosArrayGetP(pSupporter->colList, i); + + if (pCol->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered. + SColumn *p = tscColumnClone(pCol); + taosArrayPush(pNewQueryInfo->colList, &p); + } + } + } + + size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList); + + tscTrace( + "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " + "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", + pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo), + numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name); + } } else { assert(0); SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); @@ -930,7 +1203,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { tscTrace("%p start launch subquery, total:%d", pSql, pQueryInfo->numOfTables); for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { - SJoinSubquerySupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i); + SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i); if (pSupporter == NULL) { // failed to create support struct, abort current query tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i); @@ -1620,7 +1893,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { continue; } - SJoinSubquerySupporter *pSupporter = (SJoinSubquerySupporter *)pChildObj->param; + SJoinSupporter *pSupporter = (SJoinSupporter *)pChildObj->param; pState = pSupporter->pState; tscDestroyJoinSupporter(pChildObj->param); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 4656ee71b4..d1cf21e863 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -76,7 +76,9 @@ bool tscQueryOnSTable(SSqlCmd* pCmd) { bool tscQueryTags(SQueryInfo* pQueryInfo) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { - if (tscSqlExprGet(pQueryInfo, i)->functionId != TSDB_FUNC_TAGPRJ) { + int32_t functId = tscSqlExprGet(pQueryInfo, i)->functionId; + + if (functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_TID_TAG) { return false; } } @@ -123,23 +125,23 @@ void tscGetDBInfoFromMeterId(char* tableId, char* db) { db[0] = 0; } -STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) { - if (pSidList == NULL) { - tscError("illegal sidlist"); - return 0; - } - - if (idx < 0 || idx >= pSidList->numOfSids) { - int32_t sidRange = (pSidList->numOfSids > 0) ? (pSidList->numOfSids - 1) : 0; - - tscError("illegal sidIdx:%d, reset to 0, sidIdx range:%d-%d", idx, 0, sidRange); - idx = 0; - } - - assert(pSidList->pSidExtInfoList[idx] >= 0); - - return (STableIdInfo*)(pSidList->pSidExtInfoList[idx] + (char*)pSidList); -} +//STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) { +// if (pSidList == NULL) { +// tscError("illegal sidlist"); +// return 0; +// } +// +// if (idx < 0 || idx >= pSidList->numOfSids) { +// int32_t sidRange = (pSidList->numOfSids > 0) ? (pSidList->numOfSids - 1) : 0; +// +// tscError("illegal sidIdx:%d, reset to 0, sidIdx range:%d-%d", idx, 0, sidRange); +// idx = 0; +// } +// +// assert(pSidList->pSidExtInfoList[idx] >= 0); +// +// return (STableIdInfo*)(pSidList->pSidExtInfoList[idx] + (char*)pSidList); +//} bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { if (pQueryInfo == NULL) { @@ -919,7 +921,6 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pColIndex->tableIndex); SSqlExpr* pExpr = calloc(1, sizeof(SSqlExpr)); - pExpr->functionId = functionId; // set the correct column index @@ -1596,6 +1597,35 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) { } } +void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFromCache) { + if (index < 0 || index >= pQueryInfo->numOfTables) { + return; + } + + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index); + + tscClearTableMetaInfo(pTableMetaInfo, removeFromCache); + free(pTableMetaInfo); + + int32_t after = pQueryInfo->numOfTables - index - 1; + if (after > 0) { + memmove(&pQueryInfo->pTableMetaInfo[index], &pQueryInfo->pTableMetaInfo[index + 1], after * POINTER_BYTES); + } + + pQueryInfo->numOfTables -= 1; +} + +void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) { + tscTrace("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables); + + int32_t index = pQueryInfo->numOfTables; + while (index >= 0) { + doRemoveTableMetaInfo(pQueryInfo, --index, removeFromCache); + } + + tfree(pQueryInfo->pTableMetaInfo); +} + void tscFreeQueryInfo(SSqlCmd* pCmd) { if (pCmd == NULL || pCmd->numOfClause == 0) { return; @@ -1606,7 +1636,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i); freeQueryInfoImpl(pQueryInfo); - tscClearAllTableMetaInfo(pQueryInfo, (const char*)addr, false); + clearAllTableMetaInfo(pQueryInfo, (const char*)addr, false); tfree(pQueryInfo); } @@ -1657,35 +1687,6 @@ STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo* pQueryInfo) { return tscAddTableMetaInfo(pQueryInfo, NULL, NULL, NULL, NULL); } -void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFromCache) { - if (index < 0 || index >= pQueryInfo->numOfTables) { - return; - } - - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index); - - tscClearTableMetaInfo(pTableMetaInfo, removeFromCache); - free(pTableMetaInfo); - - int32_t after = pQueryInfo->numOfTables - index - 1; - if (after > 0) { - memmove(&pQueryInfo->pTableMetaInfo[index], &pQueryInfo->pTableMetaInfo[index + 1], after * POINTER_BYTES); - } - - pQueryInfo->numOfTables -= 1; -} - -void tscClearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) { - tscTrace("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables); - - int32_t index = pQueryInfo->numOfTables; - while (index >= 0) { - doRemoveTableMetaInfo(pQueryInfo, --index, removeFromCache); - } - - tfree(pQueryInfo->pTableMetaInfo); -} - void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) { if (pTableMetaInfo == NULL) { return; diff --git a/src/inc/query.h b/src/inc/query.h index ffeb225223..cdadd4759f 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -23,12 +23,12 @@ typedef void* qinfo_t; /** * create the qinfo object according to QueryTableMsg - * @param pVnode + * @param tsdb * @param pQueryTableMsg * @param qinfo * @return */ -int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo); +int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo); /** * Destroy QInfo object diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index a712b14c8f..68cb2fbfd2 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -298,9 +298,11 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u #define TSDB_QUERY_TYPE_INSERT 0x100u // insert type #define TSDB_QUERY_TYPE_IMPORT 0x200u // import data +#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x800u #define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0) #define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type)) +#define TSDB_QUERY_CLEAR_TYPE(x, _type) ((x) &= (~_type)) #define TSDB_QUERY_RESET_TYPE(x) ((x) = TSDB_QUERY_TYPE_NON_TYPE) #define TSDB_ORDER_ASC 1 diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index a7e509f75c..68e72b0964 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -422,8 +422,8 @@ typedef struct SColumnInfo { } SColumnInfo; typedef struct STableIdInfo { - int32_t sid; int64_t uid; + int32_t tid; TSKEY key; // last accessed ts, for subscription } STableIdInfo; @@ -459,9 +459,6 @@ typedef struct { int16_t tagNameRelType; // relation of tag criteria and tbname criteria int16_t interpoType; // interpolate type uint64_t defaultVal; // default value array list - -// int32_t colNameLen; -// int64_t colNameList; int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed int32_t tsLen; // total length of ts comp block int32_t tsNumOfBlocks; // ts comp block numbers @@ -638,34 +635,6 @@ typedef struct { SCMVgroupInfo vgroups[]; } SVgroupsInfo; -//typedef struct { -// int32_t numOfTables; -//// int32_t numOfVgroups; -//// SCMVgroupInfo vgroups[]; -//} SCMSTableVgroupRspMsg; - -//typedef struct { -// int16_t elemLen; -// -// char tableId[TSDB_TABLE_ID_LEN + 1]; -// int16_t orderIndex; -// int16_t orderType; // used in group by xx order by xxx -// -// int16_t rel; // denotes the relation between condition and table list -// -// int32_t tableCond; // offset value of table name condition -// int32_t tableCondLen; -// -// int32_t cond; // offset of column query condition -// int32_t condLen; -// -// int16_t tagCols[TSDB_MAX_TAGS + 1]; // required tag columns, plus one is for table name -// int16_t numOfTags; // required number of tags -// -// int16_t numOfGroupCols; // num of group by columns -// int32_t groupbyTagColumnList; -//} SSuperTableMetaElemMsg; -// //typedef struct { // int32_t numOfTables; // int32_t join; @@ -673,20 +642,6 @@ typedef struct { // int32_t metaElem[TSDB_MAX_JOIN_TABLE_NUM]; //} SSuperTableMetaMsg; -typedef struct { - int32_t nodeId; - uint32_t nodeIp; - uint16_t nodePort; -} SVnodeDesc; - -typedef struct { - SVnodeDesc vpeerDesc[TSDB_MAX_REPLICA_NUM]; - int16_t index; // used locally - int32_t vgId; - int32_t numOfSids; - int32_t pSidExtInfoList[]; // offset value of STableIdInfo -} SVnodeSidList; - typedef struct STableMetaMsg { int32_t contLen; char tableId[TSDB_TABLE_ID_LEN + 1]; // table id diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 7c45460cfd..99a6860d99 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -1252,8 +1252,10 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { for (int32_t vn = 0; vn < vgItem->numOfVnodes; ++vn) { SDnodeObj *pDnode = vgItem->vnodeGid[vn].pDnode; if (pDnode == NULL) break; - + + strncpy(pVgroup->vgroups[vg].ipAddr[vn].fqdn, pDnode->dnodeFqdn, tListLen(pDnode->dnodeFqdn)); pVgroup->vgroups[vg].ipAddr[vn].port = htons(tsDnodeShellPort); + pVgroup->vgroups[vg].numOfIps++; } diff --git a/src/query/inc/qresultBuf.h b/src/query/inc/qresultBuf.h index 39600f5129..ebea29feda 100644 --- a/src/query/inc/qresultBuf.h +++ b/src/query/inc/qresultBuf.h @@ -51,7 +51,7 @@ typedef struct SDiskbasedResultBuf { * @param rowSize * @return */ -int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize); +int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle); /** * diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index cfce3109b4..906dadb317 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -169,6 +169,7 @@ typedef struct SQInfo { int32_t code; // error code to returned to client sem_t dataReady; void* tsdb; + int32_t vgId; STableGroupInfo tableIdGroupInfo; // table id list < only includes the STableId list> STableGroupInfo groupInfo; // @@ -185,7 +186,6 @@ typedef struct SQInfo { */ int32_t tableIndex; int32_t numOfGroupResultPages; - TSKEY* tsList; } SQInfo; #endif // TDENGINE_QUERYEXECUTOR_H diff --git a/src/query/inc/sql.y b/src/query/inc/sql.y index 87b974b9ba..c68c6b507a 100644 --- a/src/query/inc/sql.y +++ b/src/query/inc/sql.y @@ -162,7 +162,7 @@ ifnotexists(X) ::= . {X.n = 0;} /////////////////////////////////THE CREATE STATEMENT/////////////////////////////////////// //create option for dnode/db/user/account -cmd ::= CREATE DNODE IPTOKEN(X). { setDCLSQLElems(pInfo, TSDB_SQL_CREATE_DNODE, 1, &X);} +cmd ::= CREATE DNODE ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_CREATE_DNODE, 1, &X);} cmd ::= CREATE ACCOUNT ids(X) PASS ids(Y) acct_optr(Z). { setCreateAcctSQL(pInfo, TSDB_SQL_CREATE_ACCT, &X, &Y, &Z);} cmd ::= CREATE DATABASE ifnotexists(Z) ids(X) db_optr(Y). { setCreateDBSQL(pInfo, TSDB_SQL_CREATE_DB, &X, &Y, &Z);} diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index 676be4f7be..b148410bfb 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -68,16 +68,18 @@ extern "C" { #define TSDB_FUNC_AVG_RATE 32 #define TSDB_FUNC_AVG_IRATE 33 -#define TSDB_FUNCSTATE_SO 0x1U // single output -#define TSDB_FUNCSTATE_MO 0x2U // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM -#define TSDB_FUNCSTATE_STREAM 0x4U // function avail for stream -#define TSDB_FUNCSTATE_METRIC 0x8U // function avail for metric -#define TSDB_FUNCSTATE_OF 0x10U // outer forward -#define TSDB_FUNCSTATE_NEED_TS 0x20U // timestamp is required during query processing -#define TSDB_FUNCSTATE_SELECTIVITY 0x40U // selectivity functions, can exists along with tag columns - -#define TSDB_BASE_FUNC_SO TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF -#define TSDB_BASE_FUNC_MO TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF +#define TSDB_FUNC_TID_TAG 34 + +#define TSDB_FUNCSTATE_SO 0x1u // single output +#define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM +#define TSDB_FUNCSTATE_STREAM 0x4u // function avail for stream +#define TSDB_FUNCSTATE_STABLE 0x8u // function avail for metric +#define TSDB_FUNCSTATE_OF 0x10u // outer forward +#define TSDB_FUNCSTATE_NEED_TS 0x20u // timestamp is required during query processing +#define TSDB_FUNCSTATE_SELECTIVITY 0x40u // selectivity functions, can exists along with tag columns + +#define TSDB_BASE_FUNC_SO TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_OF +#define TSDB_BASE_FUNC_MO TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_OF #define TSDB_FUNCTIONS_NAME_MAX_LENGTH 16 diff --git a/src/query/src/qresultBuf.c b/src/query/src/qresultBuf.c index 7cdbfbd0ef..d9a3c6a0bf 100644 --- a/src/query/src/qresultBuf.c +++ b/src/query/src/qresultBuf.c @@ -7,7 +7,7 @@ #define DEFAULT_INTERN_BUF_SIZE 16384L -int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize) { +int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle) { SDiskbasedResultBuf* pResBuf = calloc(1, sizeof(SDiskbasedResultBuf)); pResBuf->numOfRowsPerPage = (DEFAULT_INTERN_BUF_SIZE - sizeof(tFilePage)) / rowSize; pResBuf->numOfPages = size; @@ -41,11 +41,13 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si pResBuf->pBuf = mmap(NULL, pResBuf->totalBufSize, PROT_READ | PROT_WRITE, MAP_SHARED, pResBuf->fd, 0); if (pResBuf->pBuf == MAP_FAILED) { - qError("QInfo:%p failed to map temp file: %s. %s", pResBuf->path, strerror(errno)); + qError("QInfo:%p failed to map temp file: %s. %s", handle, pResBuf->path, strerror(errno)); return TSDB_CODE_CLI_OUT_OF_MEMORY; // todo change error code } - qTrace("create tmp file for output result, %s, " PRId64 "bytes", pResBuf->path, pResBuf->totalBufSize); + qTrace("QInfo:%p create tmp file for output result, %s, %" PRId64 "bytes", handle, pResBuf->path, + pResBuf->totalBufSize); + *pResultBuf = pResBuf; return TSDB_CODE_SUCCESS; } diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 9f21d0b370..879d8bba72 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -1651,7 +1651,7 @@ static bool needReverseScan(SQuery *pQuery) { static bool onlyQueryTags(SQuery* pQuery) { for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { int32_t functionId = pQuery->pSelectExpr[i].base.functionId; - if (functionId != TSDB_FUNC_TAGPRJ) { + if (functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TID_TAG) { return false; } } @@ -4210,7 +4210,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) { return true; } -int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, bool isSTableQuery) { +int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool isSTableQuery) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; @@ -4238,6 +4238,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, bool isSTableQuery) } pQInfo->tsdb = tsdb; + pQInfo->vgId = vgId; pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pTSBuf = param; @@ -4259,7 +4260,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, bool isSTableQuery) if (isSTableQuery) { int32_t rows = getInitialPageNum(pQInfo); - code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize); + code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4278,7 +4279,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, bool isSTableQuery) } else if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { int32_t rows = getInitialPageNum(pQInfo); - code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize); + code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5267,11 +5268,11 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p *pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableId)); STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; - pTableIdInfo->sid = htonl(pTableIdInfo->sid); + pTableIdInfo->tid = htonl(pTableIdInfo->tid); pTableIdInfo->uid = htobe64(pTableIdInfo->uid); pTableIdInfo->key = htobe64(pTableIdInfo->key); - STableId id = {.uid = pTableIdInfo->uid, .tid = pTableIdInfo->sid}; + STableId id = {.uid = pTableIdInfo->uid, .tid = pTableIdInfo->tid}; taosArrayPush(*pTableIdList, &id); pMsg += sizeof(STableIdInfo); @@ -5279,7 +5280,7 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p for (int32_t j = 1; j < pQueryMsg->numOfTables; ++j) { pTableIdInfo = (STableIdInfo *)pMsg; - pTableIdInfo->sid = htonl(pTableIdInfo->sid); + pTableIdInfo->tid = htonl(pTableIdInfo->tid); pTableIdInfo->uid = htobe64(pTableIdInfo->uid); pTableIdInfo->key = htobe64(pTableIdInfo->key); @@ -5479,9 +5480,9 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, strcpy(*tbnameCond, pMsg); pMsg += len; } - - qTrace("qmsg:%p query on %d table(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, ts order:%d, " - "outputCols:%d, numOfCols:%d, interval:%d" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64, + + qTrace("qmsg:%p query %d tables, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, " + "outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, limit:%" PRId64 ", offset:%" PRId64, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->interpoType, pQueryMsg->tsLen, pQueryMsg->limit, pQueryMsg->offset); @@ -5882,7 +5883,7 @@ static bool isValidQInfo(void *param) { static void freeQInfo(SQInfo *pQInfo); -static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, SQInfo *pQInfo, bool isSTable) { +static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable) { int32_t code = TSDB_CODE_SUCCESS; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; @@ -5900,14 +5901,14 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, SQInfo *pQInfo, (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { qTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, pQuery->window.ekey, pQuery->order.order); + setQueryStatus(pQuery, QUERY_COMPLETED); sem_post(&pQInfo->dataReady); - setQueryStatus(pQuery, QUERY_COMPLETED); return TSDB_CODE_SUCCESS; } // filter the qualified - if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, isSTable)) != TSDB_CODE_SUCCESS) { + if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, vgId, isSTable)) != TSDB_CODE_SUCCESS) { goto _error; } @@ -6066,7 +6067,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { // todo if interpolation exists, the result may be dump to client by several rounds } -int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) { +int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) { assert(pQueryMsg != NULL); int32_t code = TSDB_CODE_SUCCESS; @@ -6106,8 +6107,15 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) bool isSTableQuery = false; STableGroupInfo groupInfo = {0}; - - if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) { + + if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_TABLE_QUERY)) { + isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY); + + STableId *id = taosArrayGet(pTableIdList, 0); + if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) { + goto _query_over; + } + } else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) { isSTableQuery = true; STableId *id = taosArrayGet(pTableIdList, 0); @@ -6125,12 +6133,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) goto _query_over; } } else { - assert(taosArrayGetSize(pTableIdList) == 1); - - STableId *id = taosArrayGet(pTableIdList, 0); - if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) { - goto _query_over; - } + assert(0); } (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo, pTagColumnInfo); @@ -6138,7 +6141,7 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) code = TSDB_CODE_SERV_OUT_OF_MEMORY; } - code = initQInfo(pQueryMsg, tsdb, *pQInfo, isSTableQuery); + code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery); _query_over: tfree(tagCond); @@ -6147,7 +6150,7 @@ _query_over: // if failed to add ref for all meters in this query, abort current query // atomic_fetch_add_32(&vnodeSelectReqNum, 1); - return TSDB_CODE_SUCCESS; + return code; } void qDestroyQueryInfo(qinfo_t pQInfo) { @@ -6273,31 +6276,60 @@ static void buildTagQueryResult(SQInfo* pQInfo) { assert(num == 1); // only one group SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); - num = taosArrayGetSize(pa); - assert(num == pQInfo->groupInfo.numOfTables); + assert(num == pQInfo->groupInfo.numOfTables); int16_t type, bytes; - for(int32_t i = 0; i < num; ++i) { - SExprInfo* pExprInfo = pQuery->pSelectExpr; - SGroupItem* item = taosArrayGet(pa, i); - + int32_t functionId = pQuery->pSelectExpr[0].base.functionId; + if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id + assert(pQuery->numOfOutput == 1); + SExprInfo* pExprInfo = &pQuery->pSelectExpr[0]; + + int32_t rsize = pExprInfo->bytes; char* data = NULL; - for(int32_t j = 0; j < pQuery->numOfOutput; ++j) { - // todo check the return value - if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { - tsdbGetTableName(pQInfo->tsdb, &item->id, &data); - strncpy(pQuery->sdata[j]->data + i * TSDB_TABLE_NAME_LEN, data, TSDB_TABLE_NAME_LEN); - tfree(data); + + for(int32_t i = 0; i < num; ++i) { + SGroupItem* item = taosArrayGet(pa, i); + + char* output = pQuery->sdata[0]->data + i * rsize; + *(int64_t*) output = item->id.uid; // memory align problem + output += sizeof(item->id.uid); + + *(int32_t*) output = item->id.tid; + output += sizeof(item->id.tid); + + *(int32_t*) output = pQInfo->vgId; + output += sizeof(pQInfo->vgId); + + tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo->base.colInfo.colId, &type, &bytes, &data); + memcpy(output, data, bytes); + } - } else { - tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, &type, &bytes, &data); - assert(bytes == pExprInfo[j].bytes && type == pExprInfo[j].type); - memcpy(pQuery->sdata[j]->data + i * bytes, data, bytes); + qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", num); + } else { // return only the tags|table name etc. + for(int32_t i = 0; i < num; ++i) { + SExprInfo* pExprInfo = pQuery->pSelectExpr; + SGroupItem* item = taosArrayGet(pa, i); + + char* data = NULL; + for(int32_t j = 0; j < pQuery->numOfOutput; ++j) { + // todo check the return value + if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { + tsdbGetTableName(pQInfo->tsdb, &item->id, &data); + strncpy(pQuery->sdata[j]->data + i * TSDB_TABLE_NAME_LEN, data, TSDB_TABLE_NAME_LEN); + tfree(data); + + } else { + tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, &type, &bytes, &data); + assert(bytes == pExprInfo[j].bytes && type == pExprInfo[j].type); + memcpy(pQuery->sdata[j]->data + i * bytes, data, bytes); + } + } - } + + qTrace("QInfo:%p create tag values results completed, rows:%d", num); } pQuery->rec.rows = num; diff --git a/src/query/src/sql.c b/src/query/src/sql.c index d52e58395a..08a8d41c69 100644 --- a/src/query/src/sql.c +++ b/src/query/src/sql.c @@ -27,7 +27,6 @@ #include #include -#include #include #include #include "qsqlparser.h" @@ -203,61 +202,62 @@ typedef union { ** yy_default[] Default action for each state. ** *********** Begin parsing tables **********************************************/ -#define YY_ACTTAB_COUNT (529) +#define YY_ACTTAB_COUNT (531) static const YYACTIONTYPE yy_action[] = { - /* 0 */ 752, 440, 132, 150, 244, 10, 616, 246, 132, 441, - /* 10 */ 132, 155, 821, 41, 43, 20, 35, 36, 820, 154, - /* 20 */ 821, 29, 741, 440, 200, 39, 37, 40, 38, 131, - /* 30 */ 499, 441, 96, 34, 33, 100, 151, 32, 31, 30, - /* 40 */ 41, 43, 741, 35, 36, 152, 136, 163, 29, 727, - /* 50 */ 749, 200, 39, 37, 40, 38, 185, 100, 225, 224, - /* 60 */ 34, 33, 162, 730, 32, 31, 30, 400, 401, 402, + /* 0 */ 752, 440, 133, 151, 244, 10, 616, 246, 133, 441, + /* 10 */ 133, 156, 821, 41, 43, 20, 35, 36, 820, 155, + /* 20 */ 821, 29, 741, 440, 201, 39, 37, 40, 38, 132, + /* 30 */ 499, 441, 97, 34, 33, 101, 152, 32, 31, 30, + /* 40 */ 41, 43, 741, 35, 36, 153, 137, 164, 29, 727, + /* 50 */ 749, 201, 39, 37, 40, 38, 186, 101, 225, 224, + /* 60 */ 34, 33, 163, 730, 32, 31, 30, 400, 401, 402, /* 70 */ 403, 404, 405, 406, 407, 408, 409, 410, 411, 245, - /* 80 */ 730, 41, 43, 188, 35, 36, 215, 236, 197, 29, - /* 90 */ 58, 20, 200, 39, 37, 40, 38, 32, 31, 30, - /* 100 */ 56, 34, 33, 75, 730, 32, 31, 30, 43, 236, - /* 110 */ 35, 36, 776, 817, 195, 29, 20, 20, 200, 39, - /* 120 */ 37, 40, 38, 164, 570, 727, 227, 34, 33, 440, - /* 130 */ 167, 32, 31, 30, 238, 35, 36, 441, 7, 816, - /* 140 */ 29, 61, 110, 200, 39, 37, 40, 38, 223, 228, + /* 80 */ 730, 41, 43, 189, 35, 36, 216, 236, 198, 29, + /* 90 */ 58, 20, 201, 39, 37, 40, 38, 32, 31, 30, + /* 100 */ 56, 34, 33, 76, 730, 32, 31, 30, 43, 236, + /* 110 */ 35, 36, 776, 817, 196, 29, 20, 20, 201, 39, + /* 120 */ 37, 40, 38, 165, 570, 727, 227, 34, 33, 440, + /* 130 */ 168, 32, 31, 30, 238, 35, 36, 441, 7, 816, + /* 140 */ 29, 61, 111, 201, 39, 37, 40, 38, 223, 228, /* 150 */ 727, 727, 34, 33, 50, 728, 32, 31, 30, 15, - /* 160 */ 214, 237, 213, 212, 211, 210, 209, 208, 207, 206, + /* 160 */ 215, 237, 214, 213, 212, 211, 210, 209, 208, 207, /* 170 */ 712, 51, 701, 702, 703, 704, 705, 706, 707, 708, - /* 180 */ 709, 710, 711, 159, 583, 11, 815, 574, 100, 577, - /* 190 */ 100, 580, 168, 159, 583, 222, 221, 574, 16, 577, - /* 200 */ 20, 580, 34, 33, 145, 26, 32, 31, 30, 238, - /* 210 */ 86, 85, 139, 174, 657, 156, 157, 123, 144, 199, - /* 220 */ 182, 715, 179, 714, 148, 156, 157, 159, 583, 531, - /* 230 */ 60, 574, 149, 577, 726, 580, 237, 16, 39, 37, + /* 180 */ 709, 710, 711, 160, 583, 11, 815, 574, 101, 577, + /* 190 */ 101, 580, 169, 160, 583, 222, 221, 574, 16, 577, + /* 200 */ 20, 580, 34, 33, 146, 26, 32, 31, 30, 238, + /* 210 */ 87, 86, 140, 175, 657, 157, 158, 124, 145, 200, + /* 220 */ 183, 715, 180, 714, 149, 157, 158, 160, 583, 531, + /* 230 */ 60, 574, 150, 577, 726, 580, 237, 16, 39, 37, /* 240 */ 40, 38, 27, 775, 26, 59, 34, 33, 551, 552, - /* 250 */ 32, 31, 30, 137, 113, 114, 219, 64, 67, 156, - /* 260 */ 157, 95, 515, 666, 184, 512, 123, 513, 26, 514, - /* 270 */ 523, 147, 127, 125, 240, 88, 87, 187, 42, 158, - /* 280 */ 73, 77, 239, 84, 76, 572, 528, 729, 42, 582, - /* 290 */ 79, 17, 658, 165, 166, 123, 243, 242, 92, 582, + /* 250 */ 32, 31, 30, 138, 114, 115, 68, 64, 67, 157, + /* 260 */ 158, 96, 515, 666, 185, 512, 124, 513, 26, 514, + /* 270 */ 523, 148, 128, 126, 240, 89, 88, 188, 42, 159, + /* 280 */ 74, 78, 239, 85, 77, 572, 528, 729, 42, 582, + /* 290 */ 80, 17, 658, 166, 167, 124, 243, 242, 93, 582, /* 300 */ 47, 542, 543, 600, 581, 45, 13, 12, 584, 576, - /* 310 */ 138, 579, 12, 575, 581, 578, 2, 72, 71, 48, - /* 320 */ 505, 573, 42, 743, 45, 504, 204, 9, 8, 21, - /* 330 */ 21, 140, 519, 582, 520, 517, 141, 518, 83, 82, - /* 340 */ 142, 143, 134, 130, 135, 830, 133, 786, 581, 785, - /* 350 */ 160, 782, 781, 161, 751, 721, 768, 226, 97, 767, - /* 360 */ 111, 112, 516, 668, 205, 109, 128, 24, 218, 220, - /* 370 */ 829, 69, 26, 828, 826, 115, 186, 686, 25, 22, - /* 380 */ 90, 129, 655, 78, 653, 80, 651, 650, 169, 538, - /* 390 */ 124, 648, 189, 647, 646, 644, 636, 193, 52, 740, - /* 400 */ 126, 642, 640, 638, 49, 755, 756, 101, 769, 44, - /* 410 */ 198, 196, 194, 28, 192, 190, 217, 74, 229, 230, - /* 420 */ 202, 232, 231, 614, 233, 234, 53, 235, 241, 170, - /* 430 */ 146, 62, 171, 65, 173, 172, 613, 176, 175, 178, - /* 440 */ 649, 177, 612, 89, 91, 117, 687, 118, 116, 119, - /* 450 */ 120, 643, 104, 102, 122, 725, 106, 103, 105, 121, - /* 460 */ 107, 1, 108, 23, 180, 181, 605, 183, 187, 525, - /* 470 */ 55, 539, 153, 98, 57, 191, 18, 63, 4, 544, - /* 480 */ 99, 5, 585, 3, 19, 14, 201, 6, 203, 480, - /* 490 */ 479, 478, 477, 476, 475, 474, 473, 471, 45, 444, - /* 500 */ 66, 446, 21, 501, 216, 68, 500, 498, 54, 465, - /* 510 */ 46, 463, 455, 70, 461, 457, 459, 453, 451, 472, - /* 520 */ 470, 81, 426, 442, 93, 415, 94, 413, 618, + /* 310 */ 139, 579, 12, 575, 581, 578, 2, 73, 72, 48, + /* 320 */ 505, 573, 42, 743, 45, 504, 205, 9, 8, 21, + /* 330 */ 21, 141, 519, 582, 520, 517, 142, 518, 84, 83, + /* 340 */ 143, 144, 135, 131, 136, 830, 134, 786, 581, 785, + /* 350 */ 161, 782, 781, 162, 751, 721, 768, 226, 98, 767, + /* 360 */ 112, 113, 516, 668, 206, 110, 129, 24, 219, 665, + /* 370 */ 220, 829, 26, 70, 828, 826, 187, 116, 686, 25, + /* 380 */ 91, 22, 130, 655, 79, 653, 81, 651, 650, 538, + /* 390 */ 170, 125, 190, 648, 647, 646, 644, 194, 52, 740, + /* 400 */ 636, 127, 642, 640, 638, 49, 755, 102, 756, 44, + /* 410 */ 769, 199, 197, 195, 193, 191, 28, 218, 75, 229, + /* 420 */ 230, 231, 232, 233, 234, 235, 203, 53, 241, 614, + /* 430 */ 171, 172, 147, 62, 65, 174, 613, 177, 173, 179, + /* 440 */ 612, 176, 649, 178, 181, 643, 123, 687, 117, 119, + /* 450 */ 118, 120, 121, 90, 103, 725, 108, 104, 105, 122, + /* 460 */ 106, 107, 109, 92, 1, 23, 182, 188, 605, 184, + /* 470 */ 525, 55, 539, 57, 99, 154, 192, 18, 63, 4, + /* 480 */ 544, 100, 480, 585, 3, 19, 5, 14, 202, 6, + /* 490 */ 204, 479, 478, 477, 476, 475, 474, 473, 471, 45, + /* 500 */ 217, 444, 66, 21, 501, 500, 46, 498, 54, 465, + /* 510 */ 463, 455, 461, 457, 69, 459, 71, 453, 451, 472, + /* 520 */ 470, 82, 426, 442, 94, 415, 413, 618, 617, 617, + /* 530 */ 95, }; static const YYCODETYPE yy_lookahead[] = { /* 0 */ 207, 1, 256, 206, 207, 256, 204, 205, 256, 9, @@ -300,20 +300,20 @@ static const YYCODETYPE yy_lookahead[] = { /* 370 */ 207, 207, 103, 207, 207, 207, 240, 207, 207, 207, /* 380 */ 59, 207, 207, 207, 207, 207, 207, 207, 207, 107, /* 390 */ 207, 207, 259, 207, 207, 207, 207, 259, 117, 253, - /* 400 */ 207, 207, 207, 207, 119, 208, 208, 252, 208, 116, - /* 410 */ 111, 115, 110, 121, 109, 108, 75, 84, 83, 49, - /* 420 */ 208, 82, 80, 5, 53, 81, 208, 79, 75, 132, - /* 430 */ 208, 212, 5, 212, 58, 132, 5, 5, 132, 58, - /* 440 */ 208, 132, 5, 209, 209, 220, 222, 216, 221, 219, - /* 450 */ 217, 208, 249, 251, 215, 240, 247, 250, 248, 218, - /* 460 */ 246, 213, 245, 210, 132, 58, 86, 124, 104, 97, - /* 470 */ 105, 97, 1, 96, 101, 96, 101, 72, 112, 97, - /* 480 */ 96, 112, 97, 96, 101, 96, 98, 96, 98, 9, - /* 490 */ 5, 5, 5, 5, 1, 5, 5, 5, 101, 76, - /* 500 */ 72, 58, 101, 5, 15, 127, 5, 97, 96, 5, - /* 510 */ 16, 5, 5, 127, 5, 5, 5, 5, 5, 5, - /* 520 */ 5, 58, 58, 76, 21, 59, 21, 58, 0, 267, - /* 530 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267, + /* 400 */ 207, 207, 207, 207, 207, 119, 208, 252, 208, 116, + /* 410 */ 208, 111, 115, 110, 109, 108, 121, 75, 84, 83, + /* 420 */ 49, 80, 82, 53, 81, 79, 208, 208, 75, 5, + /* 430 */ 132, 5, 208, 212, 212, 58, 5, 5, 132, 58, + /* 440 */ 5, 132, 208, 132, 132, 208, 215, 222, 221, 216, + /* 450 */ 220, 219, 217, 209, 251, 240, 246, 250, 249, 218, + /* 460 */ 248, 247, 245, 209, 213, 210, 58, 104, 86, 124, + /* 470 */ 97, 105, 97, 101, 96, 1, 96, 101, 72, 112, + /* 480 */ 97, 96, 9, 97, 96, 101, 112, 96, 98, 96, + /* 490 */ 98, 5, 5, 5, 5, 1, 5, 5, 5, 101, + /* 500 */ 15, 76, 72, 101, 5, 5, 16, 97, 96, 5, + /* 510 */ 5, 5, 5, 5, 127, 5, 127, 5, 5, 5, + /* 520 */ 5, 58, 58, 76, 21, 59, 58, 0, 267, 267, + /* 530 */ 21, 267, 267, 267, 267, 267, 267, 267, 267, 267, /* 540 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267, /* 550 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267, /* 560 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267, @@ -333,41 +333,41 @@ static const YYCODETYPE yy_lookahead[] = { /* 700 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267, /* 710 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267, /* 720 */ 267, 267, 267, 267, 267, 267, 267, 267, 267, 267, - /* 730 */ 267, 267, + /* 730 */ 267, 267, 267, 267, }; #define YY_SHIFT_COUNT (246) #define YY_SHIFT_MIN (0) -#define YY_SHIFT_MAX (528) +#define YY_SHIFT_MAX (527) static const unsigned short int yy_shift_ofst[] = { /* 0 */ 141, 74, 182, 226, 128, 128, 128, 128, 128, 128, /* 10 */ 0, 22, 226, 260, 260, 260, 102, 128, 128, 128, - /* 20 */ 128, 128, 31, 149, 9, 9, 529, 192, 226, 226, + /* 20 */ 128, 128, 31, 149, 9, 9, 531, 192, 226, 226, /* 30 */ 226, 226, 226, 226, 226, 226, 226, 226, 226, 226, /* 40 */ 226, 226, 226, 226, 226, 260, 260, 25, 25, 25, /* 50 */ 25, 25, 25, 42, 25, 165, 128, 128, 135, 135, /* 60 */ 185, 128, 128, 128, 128, 128, 128, 128, 128, 128, /* 70 */ 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, /* 80 */ 128, 128, 128, 128, 128, 128, 128, 128, 128, 128, - /* 90 */ 128, 128, 128, 128, 128, 269, 321, 321, 282, 282, - /* 100 */ 321, 281, 285, 293, 299, 296, 302, 305, 307, 292, - /* 110 */ 269, 321, 321, 341, 341, 321, 333, 335, 370, 342, - /* 120 */ 339, 371, 344, 348, 321, 353, 321, 353, 529, 529, - /* 130 */ 27, 68, 68, 68, 94, 119, 213, 213, 213, 216, - /* 140 */ 169, 169, 169, 169, 190, 208, 67, 89, 60, 60, - /* 150 */ 236, 173, 204, 205, 206, 211, 304, 308, 284, 220, - /* 160 */ 199, 53, 223, 228, 229, 327, 330, 191, 201, 266, - /* 170 */ 418, 297, 427, 303, 376, 431, 306, 432, 309, 381, - /* 180 */ 437, 332, 407, 380, 343, 364, 372, 365, 373, 374, - /* 190 */ 377, 471, 379, 382, 384, 375, 366, 383, 369, 385, - /* 200 */ 387, 389, 388, 391, 390, 405, 480, 485, 486, 487, - /* 210 */ 488, 493, 490, 491, 492, 397, 423, 489, 428, 443, - /* 220 */ 494, 378, 386, 401, 498, 501, 410, 412, 401, 504, - /* 230 */ 506, 507, 509, 510, 511, 512, 513, 514, 515, 463, - /* 240 */ 464, 447, 503, 505, 466, 469, 528, + /* 90 */ 128, 128, 128, 128, 128, 128, 269, 321, 321, 282, + /* 100 */ 282, 321, 281, 286, 293, 300, 297, 303, 305, 307, + /* 110 */ 295, 269, 321, 321, 342, 342, 321, 334, 336, 371, + /* 120 */ 341, 340, 370, 343, 346, 321, 353, 321, 353, 531, + /* 130 */ 531, 27, 68, 68, 68, 94, 119, 213, 213, 213, + /* 140 */ 216, 169, 169, 169, 169, 190, 208, 67, 89, 60, + /* 150 */ 60, 236, 173, 204, 205, 206, 211, 304, 308, 284, + /* 160 */ 220, 199, 53, 223, 228, 229, 327, 330, 191, 201, + /* 170 */ 266, 424, 298, 426, 306, 377, 431, 309, 432, 311, + /* 180 */ 381, 435, 312, 408, 382, 345, 363, 373, 366, 372, + /* 190 */ 375, 378, 474, 380, 383, 385, 376, 367, 384, 374, + /* 200 */ 386, 388, 391, 390, 393, 392, 406, 473, 486, 487, + /* 210 */ 488, 489, 494, 491, 492, 493, 398, 425, 485, 430, + /* 220 */ 490, 387, 389, 402, 499, 500, 410, 412, 402, 504, + /* 230 */ 505, 506, 507, 508, 510, 512, 513, 514, 515, 463, + /* 240 */ 464, 447, 503, 509, 466, 468, 527, }; -#define YY_REDUCE_COUNT (129) +#define YY_REDUCE_COUNT (130) #define YY_REDUCE_MIN (-254) -#define YY_REDUCE_MAX (253) +#define YY_REDUCE_MAX (255) static const short yy_reduce_ofst[] = { /* 0 */ -198, -53, -254, -246, -150, -172, -192, -116, -91, -90, /* 10 */ -207, -203, -248, -179, -162, -138, -218, -175, -19, -17, @@ -378,10 +378,11 @@ static const short yy_reduce_ofst[] = { /* 60 */ 121, 153, 154, 156, 157, 159, 160, 161, 162, 163, /* 70 */ 164, 166, 167, 168, 170, 171, 172, 174, 175, 176, /* 80 */ 177, 178, 179, 180, 181, 183, 184, 186, 187, 188, - /* 90 */ 189, 193, 194, 195, 196, 136, 197, 198, 133, 138, - /* 100 */ 200, 146, 155, 202, 207, 203, 210, 209, 214, 217, - /* 110 */ 215, 212, 218, 219, 221, 222, 224, 227, 225, 231, - /* 120 */ 230, 233, 241, 239, 232, 234, 243, 235, 248, 253, + /* 90 */ 189, 193, 194, 195, 196, 197, 136, 198, 200, 133, + /* 100 */ 138, 202, 146, 155, 203, 207, 209, 212, 214, 210, + /* 110 */ 217, 215, 218, 219, 221, 222, 224, 225, 227, 230, + /* 120 */ 233, 232, 235, 241, 231, 234, 244, 237, 254, 251, + /* 130 */ 255, }; static const YYACTIONTYPE yy_default[] = { /* 0 */ 615, 667, 823, 823, 615, 615, 615, 615, 615, 615, @@ -391,21 +392,21 @@ static const YYACTIONTYPE yy_default[] = { /* 40 */ 615, 615, 615, 615, 615, 615, 615, 615, 615, 615, /* 50 */ 615, 615, 615, 615, 615, 615, 615, 615, 772, 772, /* 60 */ 746, 615, 615, 615, 615, 615, 615, 615, 615, 615, - /* 70 */ 615, 615, 615, 615, 615, 615, 615, 615, 654, 615, - /* 80 */ 652, 615, 615, 615, 615, 615, 615, 615, 615, 615, - /* 90 */ 615, 615, 641, 615, 615, 615, 635, 635, 615, 615, - /* 100 */ 635, 779, 783, 777, 765, 773, 764, 760, 759, 787, - /* 110 */ 615, 635, 635, 664, 664, 635, 685, 683, 681, 673, - /* 120 */ 679, 675, 677, 671, 635, 662, 635, 662, 700, 713, - /* 130 */ 615, 788, 822, 778, 806, 805, 818, 812, 811, 615, - /* 140 */ 810, 809, 808, 807, 615, 615, 615, 615, 814, 813, - /* 150 */ 615, 615, 615, 615, 615, 615, 615, 615, 615, 790, - /* 160 */ 784, 780, 615, 615, 615, 615, 615, 615, 615, 615, + /* 70 */ 615, 615, 615, 615, 615, 615, 615, 615, 615, 654, + /* 80 */ 615, 652, 615, 615, 615, 615, 615, 615, 615, 615, + /* 90 */ 615, 615, 615, 641, 615, 615, 615, 635, 635, 615, + /* 100 */ 615, 635, 779, 783, 777, 765, 773, 764, 760, 759, + /* 110 */ 787, 615, 635, 635, 664, 664, 635, 685, 683, 681, + /* 120 */ 673, 679, 675, 677, 671, 635, 662, 635, 662, 700, + /* 130 */ 713, 615, 788, 822, 778, 806, 805, 818, 812, 811, + /* 140 */ 615, 810, 809, 808, 807, 615, 615, 615, 615, 814, + /* 150 */ 813, 615, 615, 615, 615, 615, 615, 615, 615, 615, + /* 160 */ 790, 784, 780, 615, 615, 615, 615, 615, 615, 615, /* 170 */ 615, 615, 615, 615, 615, 615, 615, 615, 615, 615, - /* 180 */ 615, 615, 615, 615, 615, 745, 615, 615, 754, 615, - /* 190 */ 615, 615, 615, 615, 615, 774, 615, 766, 615, 615, - /* 200 */ 615, 615, 615, 615, 722, 615, 615, 615, 615, 615, - /* 210 */ 615, 615, 615, 615, 615, 688, 615, 615, 615, 615, + /* 180 */ 615, 615, 615, 615, 615, 615, 745, 615, 615, 754, + /* 190 */ 615, 615, 615, 615, 615, 615, 774, 615, 766, 615, + /* 200 */ 615, 615, 615, 615, 615, 722, 615, 615, 615, 615, + /* 210 */ 615, 615, 615, 615, 615, 615, 688, 615, 615, 615, /* 220 */ 615, 615, 615, 827, 615, 615, 615, 716, 825, 615, /* 230 */ 615, 615, 615, 615, 615, 615, 615, 615, 615, 615, /* 240 */ 615, 615, 639, 637, 615, 631, 615, @@ -1038,7 +1039,7 @@ static const char *const yyRuleName[] = { /* 44 */ "ifexists ::=", /* 45 */ "ifnotexists ::= IF NOT EXISTS", /* 46 */ "ifnotexists ::=", - /* 47 */ "cmd ::= CREATE DNODE IPTOKEN", + /* 47 */ "cmd ::= CREATE DNODE ids", /* 48 */ "cmd ::= CREATE ACCOUNT ids PASS ids acct_optr", /* 49 */ "cmd ::= CREATE DATABASE ifnotexists ids db_optr", /* 50 */ "cmd ::= CREATE USER ids PASS ids", @@ -1711,7 +1712,7 @@ static const struct { { 209, 0 }, /* (44) ifexists ::= */ { 212, -3 }, /* (45) ifnotexists ::= IF NOT EXISTS */ { 212, 0 }, /* (46) ifnotexists ::= */ - { 205, -3 }, /* (47) cmd ::= CREATE DNODE IPTOKEN */ + { 205, -3 }, /* (47) cmd ::= CREATE DNODE ids */ { 205, -6 }, /* (48) cmd ::= CREATE ACCOUNT ids PASS ids acct_optr */ { 205, -5 }, /* (49) cmd ::= CREATE DATABASE ifnotexists ids db_optr */ { 205, -5 }, /* (50) cmd ::= CREATE USER ids PASS ids */ @@ -2122,7 +2123,7 @@ static void yy_reduce( case 45: /* ifnotexists ::= IF NOT EXISTS */ {yymsp[-2].minor.yy0.n = 1;} break; - case 47: /* cmd ::= CREATE DNODE IPTOKEN */ + case 47: /* cmd ::= CREATE DNODE ids */ { setDCLSQLElems(pInfo, TSDB_SQL_CREATE_DNODE, 1, &yymsp[0].minor.yy0);} break; case 48: /* cmd ::= CREATE ACCOUNT ids PASS ids acct_optr */ diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 110c114540..cf86ddbcd2 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -246,8 +246,8 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { SDataRow row = SL_GET_NODE_DATA(node); pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer - uTrace("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d", pHandle, - pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order); + uTrace("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle, + pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo); // all data in mem are checked already. if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_ORDER_TRAVERSE(pHandle->order)) || @@ -1273,33 +1273,6 @@ void filterPrepare(void* expr, void* param) { } } -int32_t doCompare(const char* f1, const char* f2, int32_t type, size_t size) { - switch (type) { - case TSDB_DATA_TYPE_INT: DEFAULT_COMP(GET_INT32_VAL(f1), GET_INT32_VAL(f2)); - case TSDB_DATA_TYPE_DOUBLE: DEFAULT_COMP(GET_DOUBLE_VAL(f1), GET_DOUBLE_VAL(f2)); - case TSDB_DATA_TYPE_FLOAT: DEFAULT_COMP(GET_FLOAT_VAL(f1), GET_FLOAT_VAL(f2)); - case TSDB_DATA_TYPE_BIGINT: DEFAULT_COMP(GET_INT64_VAL(f1), GET_INT64_VAL(f2)); - case TSDB_DATA_TYPE_SMALLINT: DEFAULT_COMP(GET_INT16_VAL(f1), GET_INT16_VAL(f2)); - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_BOOL: DEFAULT_COMP(GET_INT8_VAL(f1), GET_INT8_VAL(f2)); - case TSDB_DATA_TYPE_NCHAR: { - int32_t ret = wcsncmp((wchar_t*) f1, (wchar_t*) f2, size/TSDB_NCHAR_SIZE); - if (ret == 0) { - return ret; - } - return (ret < 0) ? -1 : 1; - } - default: { - int32_t ret = strncmp(f1, f2, (size_t)size); - if (ret == 0) { - return ret; - } - - return (ret < 0) ? -1 : 1; - } - } -} - typedef struct STableGroupSupporter { int32_t numOfCols; SColIndex* pCols; @@ -1504,7 +1477,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC const char* tbnameCond, STableGroupInfo *pGroupInfo, SColIndex *pColIndex, int32_t numOfCols) { STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); if (pTable == NULL) { - uError("failed to get stable, uid:%" PRIu64, uid); + uError("failed to get stable, uid:%, %p" PRIu64, uid); return TSDB_CODE_INVALID_TABLE_ID; } @@ -1517,7 +1490,12 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT *tsdb, int64_t uid, const char *pTagC if (ret == TSDB_CODE_SUCCESS) { pGroupInfo->numOfTables = taosArrayGetSize(res); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb); + + uTrace("no tbname condition or tagcond, all tables belongs to one group, numOfTables:%d", pGroupInfo->numOfTables); + } else { + // todo add error } + taosArrayDestroy(res); return ret; } diff --git a/src/util/inc/tcompare.h b/src/util/inc/tcompare.h index 36010971b5..8aaa39e483 100644 --- a/src/util/inc/tcompare.h +++ b/src/util/inc/tcompare.h @@ -38,6 +38,8 @@ int patternMatch(const char *zPattern, const char *zString, size_t size, const S int WCSPatternMatch(const wchar_t *zPattern, const wchar_t *zString, size_t size, const SPatternCompareInfo *pInfo); +int32_t doCompare(const char* f1, const char* f2, int32_t type, size_t size); + __compar_fn_t getKeyComparFunc(int32_t keyType); __compar_fn_t getComparFunc(int32_t type, int32_t optr); diff --git a/src/util/src/tcompare.c b/src/util/src/tcompare.c index 6bc2c4e305..13a5a8580e 100644 --- a/src/util/src/tcompare.c +++ b/src/util/src/tcompare.c @@ -333,3 +333,30 @@ __compar_fn_t getKeyComparFunc(int32_t keyType) { return comparFn; } + +int32_t doCompare(const char* f1, const char* f2, int32_t type, size_t size) { + switch (type) { + case TSDB_DATA_TYPE_INT: DEFAULT_COMP(GET_INT32_VAL(f1), GET_INT32_VAL(f2)); + case TSDB_DATA_TYPE_DOUBLE: DEFAULT_COMP(GET_DOUBLE_VAL(f1), GET_DOUBLE_VAL(f2)); + case TSDB_DATA_TYPE_FLOAT: DEFAULT_COMP(GET_FLOAT_VAL(f1), GET_FLOAT_VAL(f2)); + case TSDB_DATA_TYPE_BIGINT: DEFAULT_COMP(GET_INT64_VAL(f1), GET_INT64_VAL(f2)); + case TSDB_DATA_TYPE_SMALLINT: DEFAULT_COMP(GET_INT16_VAL(f1), GET_INT16_VAL(f2)); + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_BOOL: DEFAULT_COMP(GET_INT8_VAL(f1), GET_INT8_VAL(f2)); + case TSDB_DATA_TYPE_NCHAR: { + int32_t ret = wcsncmp((wchar_t*) f1, (wchar_t*) f2, size/TSDB_NCHAR_SIZE); + if (ret == 0) { + return ret; + } + return (ret < 0) ? -1 : 1; + } + default: { + int32_t ret = strncmp(f1, f2, (size_t)size); + if (ret == 0) { + return ret; + } + + return (ret < 0) ? -1 : 1; + } + } +} diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index d8f534e3b7..b111f56e39 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -56,7 +56,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont qinfo_t pQInfo = NULL; if (contLen != 0) { - pRet->code = qCreateQueryInfo(pVnode->tsdb, pQueryTableMsg, &pQInfo); + pRet->code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->qhandle = htobe64((uint64_t) (pQInfo)); -- GitLab