提交 b93fbf7c 编写于 作者: H hjxilinx

[td-186]

上级 bb8b91f2
......@@ -38,6 +38,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql);
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
void tscBuildResFromSubqueries(SSqlObj *pSql);
void **doSetResultRowData(SSqlObj *pSql, bool finalResult);
#ifdef __cplusplus
}
#endif
......
......@@ -57,8 +57,8 @@ typedef struct SJoinSubquerySupporter {
int64_t interval; // interval time
SLimitVal limit; // limit info
uint64_t uid; // query meter uid
SArray* colList; // previous query information
SArray* exprsInfo;
SArray* colList; // previous query information, no need to use this attribute, and the corresponding attribution
SArray* exprList;
SFieldInfo fieldsInfo;
STagCond tagCond;
SSqlGroupbyExpr groupbyExpr;
......@@ -159,7 +159,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
SArray* tscSqlExprCopy(const SArray* src, uint64_t uid, bool deepcopy);
void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
void tscSqlExprInfoDestroy(SArray* pExprInfo);
SColumn* tscColumnClone(const SColumn* src);
......@@ -203,7 +203,10 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo);
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
void tscFreeQueryInfo(SSqlCmd* pCmd);
void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
......
......@@ -206,7 +206,7 @@ typedef struct SQueryInfo {
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray * exprsInfo; // SArray<SSqlExpr*>
SArray * exprList; // SArray<SSqlExpr*>
SLimitVal limit;
SLimitVal slimit;
STagCond tagCond;
......
......@@ -14,17 +14,18 @@
*/
#include "os.h"
#include "tutil.h"
#include "tnote.h"
#include "trpc.h"
#include "tscLog.h"
#include "tscProfile.h"
#include "tscSubquery.h"
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tsocket.h"
#include "tutil.h"
#include "tnote.h"
#include "tsched.h"
#include "tschemautil.h"
#include "tsclient.h"
static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
......@@ -219,12 +220,17 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
pSql->param = param;
tscResetForNextRetrieve(pRes);
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
// handle the sub queries of join query
if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql);
} else {
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscProcessSql(pSql);
}
tscProcessSql(pSql);
}
void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), void *param) {
......
......@@ -330,7 +330,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
}
int32_t totalNumOfResults = pMetricMeta->numOfTables;
int32_t rowLen = tscGetResRowLength(pQueryInfo->exprsInfo);
int32_t rowLen = tscGetResRowLength(pQueryInfo->exprList);
tscInitResObjForLocalQuery(pSql, totalNumOfResults, rowLen);
......@@ -370,7 +370,7 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) {
#if 0
SSuperTableMeta *pMetricMeta = tscGetMetaInfo(pQueryInfo, 0)->pMetricMeta;
int32_t totalNumOfResults = 1; // count function only produce one result
int32_t rowLen = tscGetResRowLength(pQueryInfo->exprsInfo);
int32_t rowLen = tscGetResRowLength(pQueryInfo->exprList);
tscInitResObjForLocalQuery(pSql, totalNumOfResults, rowLen);
......@@ -408,7 +408,7 @@ static int tscProcessQueryTags(SSqlObj *pSql) {
return pSql->res.code;
}
SSqlExpr *pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
SSqlExpr *pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
if (pExpr->functionId == TSDB_FUNC_COUNT) {
return tscBuildMetricTagSqlFunctionResult(pSql);
} else {
......@@ -419,7 +419,7 @@ static int tscProcessQueryTags(SSqlObj *pSql) {
static void tscProcessCurrentUser(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
tscSetLocalQueryResult(pSql, pSql->pTscObj->user, pExpr->aliasName, TSDB_USER_LEN);
}
......@@ -434,7 +434,7 @@ static void tscProcessCurrentDB(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
tscSetLocalQueryResult(pSql, db, pExpr->aliasName, TSDB_DB_NAME_LEN);
}
......@@ -442,14 +442,14 @@ static void tscProcessServerVer(SSqlObj *pSql) {
const char* v = pSql->pTscObj->sversion;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
tscSetLocalQueryResult(pSql, v, pExpr->aliasName, tListLen(pSql->pTscObj->sversion));
}
static void tscProcessClientVer(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
tscSetLocalQueryResult(pSql, version, pExpr->aliasName, strlen(version));
}
......@@ -469,7 +469,7 @@ static void tscProcessServStatus(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
tscSetLocalQueryResult(pSql, "1", pExpr->aliasName, 2);
}
......@@ -491,7 +491,7 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, 0);
pInfo->pSqlExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
pInfo->pSqlExpr = taosArrayGetP(pQueryInfo->exprList, 0);
strncpy(pRes->data, val, pField->bytes);
}
......
......@@ -105,7 +105,7 @@ static int32_t getColumnIndexByName(const SSQLToken* pToken, SQueryInfo* pQueryI
static int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t optrToString(tSQLExpr* pExpr, char** exprString);
static int32_t getMeterIndex(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t getTableIndexImpl(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
static int32_t doLocalQueryProcess(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql);
static int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCMCreateDbMsg* pCreate);
......@@ -1179,7 +1179,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
tExprNode* pNode = NULL;
SArray* colList = taosArrayInit(10, sizeof(SColIndex));
int32_t ret = exprTreeFromSqlExpr(&pNode, pItem->pNode, pQueryInfo->exprsInfo, pQueryInfo, colList);
int32_t ret = exprTreeFromSqlExpr(&pNode, pItem->pNode, pQueryInfo->exprList, pQueryInfo, colList);
if (ret != TSDB_CODE_SUCCESS) {
tExprTreeDestroy(&pNode, NULL);
return invalidSqlErrMsg(pQueryInfo->msg, "invalid arithmetic expression in select clause");
......@@ -1218,7 +1218,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
tExprNode* pNode = NULL;
// SArray* colList = taosArrayInit(10, sizeof(SColIndex));
int32_t ret = exprTreeFromSqlExpr(&pNode, pItem->pNode, pQueryInfo->exprsInfo, pQueryInfo, NULL);
int32_t ret = exprTreeFromSqlExpr(&pNode, pItem->pNode, pQueryInfo->exprList, pQueryInfo, NULL);
if (ret != TSDB_CODE_SUCCESS) {
tExprTreeDestroy(&pNode, NULL);
return invalidSqlErrMsg(pQueryInfo->msg, "invalid expression in select clause");
......@@ -1548,7 +1548,7 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize;
pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false);
} else {
// count the number of meters created according to the metric
// count the number of meters created according to the super table
if (getColumnIndexByName(pToken, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg3);
}
......@@ -2002,7 +2002,7 @@ int32_t doGetColumnIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColum
}
}
int32_t getMeterIndex(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) {
int32_t getTableIndexImpl(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex) {
if (pTableToken->n == 0) { // only one table and no table name prefix in column name
if (pQueryInfo->numOfTables == 1) {
pIndex->tableIndex = 0;
......@@ -2035,7 +2035,7 @@ int32_t getTableIndexByName(SSQLToken* pToken, SQueryInfo* pQueryInfo, SColumnIn
SSQLToken tableToken = {0};
extractTableNameFromToken(pToken, &tableToken);
if (getMeterIndex(&tableToken, pQueryInfo, pIndex) != TSDB_CODE_SUCCESS) {
if (getTableIndexImpl(&tableToken, pQueryInfo, pIndex) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL;
}
......@@ -3199,7 +3199,6 @@ static bool validateJoinExprNode(SQueryInfo* pQueryInfo, tSQLExpr* pExpr, SColum
const char* msg3 = "join column must have same type";
const char* msg4 = "self join is not allowed";
const char* msg5 = "join table must be the same type(table to table, super table to super table)";
const char* msg6 = "tags in join condition not support binary/nchar types";
tSQLExpr* pRight = pExpr->pRight;
......@@ -3234,9 +3233,6 @@ static bool validateJoinExprNode(SQueryInfo* pQueryInfo, tSQLExpr* pExpr, SColum
} else if (pLeftIndex->tableIndex == rightIndex.tableIndex) {
invalidSqlErrMsg(pQueryInfo->msg, msg4);
return false;
} else if (leftType == TSDB_DATA_TYPE_BINARY || leftType == TSDB_DATA_TYPE_NCHAR) {
invalidSqlErrMsg(pQueryInfo->msg, msg6);
return false;
}
// table to table/ super table to super table are allowed
......@@ -4044,7 +4040,7 @@ int32_t parseFillClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySQL) {
}
}
size_t size = taosArrayGetSize(pQueryInfo->exprsInfo);
size_t size = taosArrayGetSize(pQueryInfo->exprList);
if ((pFillToken->nExpr < size) ||
((pFillToken->nExpr - 1 < size) && (tscIsPointInterpQuery(pQueryInfo)))) {
......@@ -4079,7 +4075,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->order.orderColId = -1;
}
/* for metric query, set default ascending order for group output */
/* for super table query, set default ascending order for group output */
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC;
}
......@@ -4127,7 +4123,7 @@ int32_t parseOrderbyClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql, SSchema
SSQLToken columnName = {pVar->nLen, pVar->nType, pVar->pz};
SColumnIndex index = {0};
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // metric query
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { // super table query
if (getColumnIndexByName(&columnName, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
......@@ -4460,7 +4456,7 @@ int32_t validateSqlFunctionInStreamSql(SQueryInfo* pQueryInfo) {
return invalidSqlErrMsg(pQueryInfo->msg, msg0);
}
size_t size = taosArrayGetSize(pQueryInfo->exprsInfo);
size_t size = taosArrayGetSize(pQueryInfo->exprList);
for (int32_t i = 0; i < size; ++i) {
int32_t functId = tscSqlExprGet(pQueryInfo, i)->functionId;
if (!IS_STREAM_QUERY_VALID(aAggs[functId].nStatus)) {
......@@ -4476,7 +4472,7 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SQueryInfo* pQueryInfo) {
const char* msg1 = "column projection is not compatible with interval";
// multi-output set/ todo refactor
size_t size = taosArrayGetSize(pQueryInfo->exprsInfo);
size_t size = taosArrayGetSize(pQueryInfo->exprList);
for (int32_t k = 0; k < size; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
......@@ -4736,7 +4732,7 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL*
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
size_t size = taosArrayGetSize(pQueryInfo->exprsInfo);
size_t size = taosArrayGetSize(pQueryInfo->exprList);
// filter the query functions operating on "tbname" column that are not supported by normal columns.
for (int32_t i = 0; i < size; ++i) {
......@@ -4849,7 +4845,7 @@ void tscAddTimestampColumn(SQueryInfo* pQueryInfo, int16_t functionId, int16_t t
if (pExpr == NULL || pExpr->colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX || pExpr->functionId != functionId) {
SColumnIndex index = {tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE, false);
pExpr = tscSqlExprInsert(pQueryInfo, 0, functionId, &index, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, TSDB_KEYSIZE, false);
pExpr->colInfo.flag = TSDB_COL_NORMAL;
// NOTE: tag column does not add to source column list
......@@ -4864,7 +4860,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau
if (pParentQueryInfo->groupbyExpr.numOfGroupCols > 0) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, subClauseIndex);
size_t size = taosArrayGetSize(pQueryInfo->exprsInfo);
size_t size = taosArrayGetSize(pQueryInfo->exprList);
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, size - 1);
......@@ -4933,7 +4929,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) {
static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
int32_t tagLength = 0;
size_t size = taosArrayGetSize(pQueryInfo->exprsInfo);
size_t size = taosArrayGetSize(pQueryInfo->exprList);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
......@@ -4960,7 +4956,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) {
}
static void doUpdateSqlFunctionForColPrj(SQueryInfo* pQueryInfo) {
size_t size = taosArrayGetSize(pQueryInfo->exprsInfo);
size_t size = taosArrayGetSize(pQueryInfo->exprList);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
......@@ -4998,7 +4994,7 @@ static bool onlyTagPrjFunction(SQueryInfo* pQueryInfo) {
bool hasTagPrj = false;
bool hasColumnPrj = false;
size_t size = taosArrayGetSize(pQueryInfo->exprsInfo);
size_t size = taosArrayGetSize(pQueryInfo->exprList);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_PRJ) {
......@@ -5033,7 +5029,7 @@ static bool allTagPrjInGroupby(SQueryInfo* pQueryInfo) {
}
static void updateTagPrjFunction(SQueryInfo* pQueryInfo) {
size_t size = taosArrayGetSize(pQueryInfo->exprsInfo);
size_t size = taosArrayGetSize(pQueryInfo->exprList);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
......@@ -5057,9 +5053,9 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo) {
int16_t numOfSelectivity = 0;
int16_t numOfAggregation = 0;
size_t numOfExprs = taosArrayGetSize(pQueryInfo->exprsInfo);
size_t numOfExprs = taosArrayGetSize(pQueryInfo->exprList);
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, i);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, i);
if (pExpr->functionId == TSDB_FUNC_TAGPRJ ||
(pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)) {
tagColExists = true;
......@@ -5068,7 +5064,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo) {
}
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, i);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, i);
int16_t functionId = pExpr->functionId;
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TS ||
......@@ -5275,7 +5271,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
return TSDB_CODE_INVALID_SQL;
}
// projection query on metric does not compatible with "group by" syntax
// projection query on super table does not compatible with "group by" syntax
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
......@@ -5491,7 +5487,7 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p
pCmd->numOfCols = (int16_t)pFieldList->nField;
if (pTagList != NULL) { // create metric[optional]
if (pTagList != NULL) { // create super table[optional]
for (int32_t i = 0; i < pTagList->nField; ++i) {
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &pTagList->p[i]);
}
......
......@@ -282,7 +282,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16;
pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage));
int32_t finalRowLength = tscGetResRowLength(pQueryInfo->exprsInfo);
int32_t finalRowLength = tscGetResRowLength(pQueryInfo->exprList);
pReducer->resColModel = finalmodel;
pReducer->resColModel->capacity = pReducer->nResultBufSize / finalRowLength;
assert(finalRowLength <= pReducer->rowSize);
......@@ -804,7 +804,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
// todo merge with following function
// static void reversedCopyResultToDstBuf(SQueryInfo* pQueryInfo, SSqlRes *pRes, tFilePage *pFinalDataPage) {
//
// for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
// for (int32_t i = 0; i < pQueryInfo->exprList.numOfExprs; ++i) {
// TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
//
// int32_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
......@@ -901,7 +901,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, &pLocalReducer->interpolationInfo);
}
int32_t rowSize = tscGetResRowLength(pQueryInfo->exprsInfo);
int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList);
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize);
pFinalDataPage->numOfElems = 0;
......@@ -1423,8 +1423,7 @@ int32_t tscDoLocalreduce(SSqlObj *pSql) {
tscResetForNextRetrieve(pRes);
if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) { // all data has been processed
tscTrace("%s call the drop local reducer", __FUNCTION__);
tscTrace("%p %s call the drop local reducer", pSql, __FUNCTION__);
tscDestroyLocalReducer(pSql);
return 0;
}
......@@ -1435,7 +1434,7 @@ int32_t tscDoLocalreduce(SSqlObj *pSql) {
// set the data merge in progress
int32_t prevStatus =
atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, TSC_LOCALREDUCE_IN_PROGRESS);
if (prevStatus != TSC_LOCALREDUCE_READY || pLocalReducer == NULL) {
if (prevStatus != TSC_LOCALREDUCE_READY) {
assert(prevStatus == TSC_LOCALREDUCE_TOBE_FREED); // it is in tscDestroyLocalReducer function already
return TSDB_CODE_SUCCESS;
}
......
......@@ -140,7 +140,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
}
} else {
tscTrace("heart beat failed, code:%d", code);
tscTrace("heart beat failed, code:%s", tstrerror(code));
}
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
......@@ -326,11 +326,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
pRes->pRsp = NULL;
}
// ignore the error information returned from mnode when set ignore flag in sql
if (pRes->code == TSDB_CODE_DB_ALREADY_EXIST && pCmd->existsCheck && pRes->rspType == TSDB_MSG_TYPE_CM_CREATE_DB_RSP) {
pRes->code = TSDB_CODE_SUCCESS;
}
/*
* There is not response callback function for submit response.
* The actual inserted number of points is the first number.
......@@ -427,8 +422,9 @@ int tscProcessSql(SSqlObj *pSql) {
type = pQueryInfo->type;
// for heartbeat, numOfTables == 0;
assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
// while numOfTables equals to 0, it must be Heartbeat
assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) ||
pQueryInfo->numOfTables > 0);
}
tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type);
......@@ -1474,12 +1470,10 @@ int tscProcessRetrieveMetricRsp(SSqlObj *pSql) {
pRes->row = 0;
uint8_t code = pRes->code;
if (pSql->fp) { // async retrieve metric data
if (pRes->code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
} else {
tscQueueAsyncRes(pSql);
}
if (pRes->code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
} else {
tscQueueAsyncRes(pSql);
}
return code;
......
......@@ -435,30 +435,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
return (pQueryInfo->order.order == TSDB_ORDER_DESC) ? pRes->numOfRows : -pRes->numOfRows;
}
static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
SSqlRes *pRes = &pSql->res;
if (isNull(pRes->tsrow[columnIndex], pField->type)) {
pRes->tsrow[columnIndex] = NULL;
} else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
if (pRes->buffer[columnIndex] == NULL) {
pRes->buffer[columnIndex] = malloc(pField->bytes + TSDB_NCHAR_SIZE);
}
/* string terminated char for binary data*/
memset(pRes->buffer[columnIndex], 0, pField->bytes + TSDB_NCHAR_SIZE);
if (taosUcs4ToMbs(pRes->tsrow[columnIndex], pField->bytes, pRes->buffer[columnIndex])) {
pRes->tsrow[columnIndex] = pRes->buffer[columnIndex];
} else {
tscError("%p charset:%s to %s. val:%ls convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, pRes->tsrow);
pRes->tsrow[columnIndex] = NULL;
}
}
}
static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
static UNUSED_FUNC char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
// SArithmeticSupport *pSupport = (SArithmeticSupport *)param;
// SArithExprInfo * pExpr = pSupport->pArithExpr;
......@@ -475,210 +452,6 @@ static char *getArithemicInputSrc(void *param, const char *name, int32_t colId)
return 0;
}
static void **doSetResultRowData(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker
tfree(pRes->tsrow);
return pRes->tsrow;
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
//todo refactor move away
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for(int32_t k = 0; k < numOfExprs; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
if (k > 0) {
SSqlExpr* pPrev = tscSqlExprGet(pQueryInfo, k - 1);
pExpr->offset = pPrev->offset + pPrev->resBytes;
}
}
int32_t num = 0;
for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, i);
if (pInfo->pSqlExpr != NULL) {
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i) + pInfo->pSqlExpr->resBytes * pRes->row;
} else {
assert(0);
}
// primary key column cannot be null in interval query, no need to check
if (i == 0 && pQueryInfo->intervalTime > 0) {
continue;
}
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
transferNcharData(pSql, i, pField);
// calculate the result from several other columns
if (pInfo->pArithExprInfo != NULL) {
SArithmeticSupport *sas = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport));
sas->offset = 0;
sas->pArithExpr = pInfo->pArithExprInfo;
// sas->numOfCols = sas->pArithExpr->numOfCols;
if (pRes->buffer[i] == NULL) {
pRes->buffer[i] = malloc(tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i)->bytes);
}
for(int32_t k = 0; k < sas->numOfCols; ++k) {
// int32_t columnIndex = sas->pArithExpr->colList[k].colIndex;
// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex);
//
// sas->elemSize[k] = pExpr->resBytes;
// sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
}
tExprTreeCalcTraverse(sas->pArithExpr->pExpr, 1, pRes->buffer[i], sas, TSDB_ORDER_ASC, getArithemicInputSrc);
pRes->tsrow[i] = pRes->buffer[i];
free(sas); //todo optimization
}
}
assert(num <= pQueryInfo->fieldsInfo.numOfOutput);
pRes->row++; // index increase one-step
return pRes->tsrow;
}
static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
bool hasData = true;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
bool allSubqueryExhausted = true;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] == NULL) {
continue;
}
// SSqlRes *pRes1 = &pSql->pSubs[i]->res;
SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd;
SQueryInfo * pQueryInfo1 = tscGetQueryInfoDetail(pCmd1, pCmd1->clauseIndex);
// STableMetaInfo *pMetaInfo = tscGetMetaInfo(pQueryInfo1, 0);
assert(pQueryInfo1->numOfTables == 1);
/*
* if the global limitation is not reached, and current result has not exhausted, or next more vnodes are
* available, goes on
*/
// if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows &&
// (!tscHasReachLimitation(pQueryInfo1, pRes1))) {
// allSubqueryExhausted = false;
// break;
// }
}
hasData = !allSubqueryExhausted;
} else { // otherwise, in case inner join, if any subquery exhausted, query completed.
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] == 0) {
continue;
}
SSqlRes * pRes1 = &pSql->pSubs[i]->res;
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pQueryInfo1, pRes1) &&
tscProjectionQueryOnTable(pQueryInfo1)) ||
(pRes1->numOfRows == 0)) {
hasData = false;
break;
}
}
}
return hasData;
}
static UNUSED_FUNC void **tscBuildResFromSubqueries(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
while (1) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
if (pRes->tsrow == NULL) {
pRes->tsrow = calloc(numOfExprs, POINTER_BYTES);
}
bool success = false;
int32_t numOfTableHasRes = 0;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] != 0) {
numOfTableHasRes++;
}
}
if (numOfTableHasRes >= 2) { // do merge result
success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL);
} else { // only one subquery
SSqlObj *pSub = pSql->pSubs[0];
if (pSub == NULL) {
pSub = pSql->pSubs[1];
}
success = (doSetResultRowData(pSub) != NULL);
}
if (success) { // current row of final output has been built, return to app
for (int32_t i = 0; i < numOfExprs; ++i) {
int32_t tableIndex = pRes->pColumnIndex[i].tableIndex;
int32_t columnIndex = pRes->pColumnIndex[i].columnIndex;
SSqlRes *pRes1 = &pSql->pSubs[tableIndex]->res;
pRes->tsrow[i] = pRes1->tsrow[columnIndex];
}
pRes->numOfTotalInCurrentClause++;
break;
} else { // continue retrieve data from vnode
if (!tscHashRemainDataInSubqueryResultSet(pSql)) {
tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1);
SSubqueryState *pState = NULL;
// free all sub sqlobj
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj *pChildObj = pSql->pSubs[i];
if (pChildObj == NULL) {
continue;
}
SJoinSubquerySupporter *pSupporter = (SJoinSubquerySupporter *)pChildObj->param;
pState = pSupporter->pState;
tscDestroyJoinSupporter(pChildObj->param);
taos_free_result(pChildObj);
}
free(pState);
return NULL;
}
tscFetchDatablockFromSubquery(pSql);
if (pRes->code != TSDB_CODE_SUCCESS) {
return NULL;
}
}
}
return pRes->tsrow;
}
static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) {
SSqlObj* pSql = (SSqlObj*) tres;
......@@ -705,15 +478,19 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
}
// current data are exhausted, fetch more data
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true &&
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC ||
pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_DESCRIBE_TABLE))) {
if (pRes->row >= pRes->numOfRows && pRes->completed != true &&
(pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_RETRIEVE_METRIC ||
pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_SHOW ||
pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_DESCRIBE_TABLE)) {
taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
sem_wait(&pSql->rspSem);
}
return doSetResultRowData(pSql);
return doSetResultRowData(pSql, true);
}
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
......
......@@ -24,7 +24,8 @@ typedef struct SInsertSupporter {
SSqlObj* pSql;
} SInsertSupporter;
static void freeSubqueryObj(SSqlObj* pSql);
static void freeJoinSubqueryObj(SSqlObj* pSql);
static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql);
static bool doCompare(int32_t order, int64_t left, int64_t right) {
if (order == TSDB_ORDER_ASC) {
......@@ -172,7 +173,6 @@ SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pS
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index);
pSupporter->uid = pTableMetaInfo->pTableMeta->uid;
assert (pSupporter->uid != 0);
getTmpfilePath("join-", pSupporter->path);
......@@ -190,14 +190,20 @@ void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter) {
return;
}
tscSqlExprInfoDestroy(pSupporter->exprsInfo);
tscColumnListDestroy(pSupporter->colList);
if (pSupporter->exprList != NULL) {
tscSqlExprInfoDestroy(pSupporter->exprList);
}
if (pSupporter->colList != NULL) {
tscColumnListDestroy(pSupporter->colList);
}
tscFieldInfoClear(&pSupporter->fieldsInfo);
if (pSupporter->f != NULL) {
fclose(pSupporter->f);
unlink(pSupporter->path);
pSupporter->f = NULL;
}
tscTagCondRelease(&pSupporter->tagCond);
......@@ -238,7 +244,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
pSupporter = pSql->pSubs[i]->param;
if (taosArrayGetSize(pSupporter->exprsInfo) > 0) {
if (taosArrayGetSize(pSupporter->exprList) > 0) {
++numOfSub;
}
}
......@@ -249,9 +255,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
tscTrace("%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in "
"select clause", pSql, pSql->numOfSubs, numOfSub);
/*
* the subqueries that do not actually launch the secondary query to virtual node is set as completed.
*/
//the subqueries that do not actually launch the secondary query to virtual node is set as completed.
pState = pSupporter->pState;
pState->numOfTotal = pSql->numOfSubs;
pState->numOfCompleted = (pSql->numOfSubs - numOfSub);
......@@ -264,7 +268,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pSupporter = pPrevSub->param;
if (taosArrayGetSize(pSupporter->exprsInfo) == 0) {
if (taosArrayGetSize(pSupporter->exprList) == 0) {
tscTrace("%p subIndex: %d, not need to launch query, ignore it", pSql, i);
tscDestroyJoinSupporter(pSupporter);
......@@ -300,23 +304,25 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pQueryInfo->intervalTime = pSupporter->interval;
pQueryInfo->groupbyExpr = pSupporter->groupbyExpr;
tscColumnListCopy(pQueryInfo->colList, pSupporter->colList, 0);
tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
pQueryInfo->exprsInfo = tscSqlExprCopy(pSupporter->exprsInfo, pSupporter->uid, false);
tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo);
pQueryInfo->colList = pSupporter->colList;
pQueryInfo->exprList = pSupporter->exprList;
pQueryInfo->fieldsInfo = pSupporter->fieldsInfo;
pSupporter->fieldsInfo.numOfOutput = 0;
pSupporter->exprList = NULL;
pSupporter->colList = NULL;
memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
/*
* if the first column of the secondary query is not ts function, add this function.
* Because this column is required to filter with timestamp after intersecting.
*/
SSqlExpr* pExpr = taosArrayGet(pSupporter->exprsInfo, 0);
if (pExpr->functionId != TSDB_FUNC_TS) {
tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0);
}
// SSqlExpr* pExpr = taosArrayGet(pQueryInfo->exprList, 0);
// if (pExpr->functionId != TSDB_FUNC_TS) {
// tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0);
// }
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
......@@ -334,7 +340,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
// fetch the join tag column
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
SSqlExpr *pExpr = tscSqlExprGet(pNewQueryInfo, 0);
SSqlExpr* pExpr = tscSqlExprGet(pNewQueryInfo, 0);
assert(pQueryInfo->tagCond.joinInfo.hasJoin);
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->uid);
......@@ -347,7 +353,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscTrace("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type,
taosArrayGetSize(pNewQueryInfo->exprsInfo), numOfCols,
taosArrayGetSize(pNewQueryInfo->exprList), numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name);
}
......@@ -356,7 +362,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscError("%p failed to prepare subqueries objs for secondary phase query, numOfSub:%d, code:%d", pSql,
pSql->numOfSubs, pSql->res.code);
freeSubqueryObj(pSql);
freeJoinSubqueryObj(pSql);
return pSql->res.code;
}
......@@ -373,19 +379,22 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
return TSDB_CODE_SUCCESS;
}
void freeSubqueryObj(SSqlObj* pSql) {
void freeJoinSubqueryObj(SSqlObj* pSql) {
SSubqueryState* pState = NULL;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] != NULL) {
SJoinSubquerySupporter* p = pSql->pSubs[i]->param;
pState = p->pState;
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
SJoinSubquerySupporter* p = pSub->param;
pState = p->pState;
tscDestroyJoinSupporter(p);
tscDestroyJoinSupporter(p);
if (pSql->pSubs[i]->res.code == TSDB_CODE_SUCCESS) {
taos_free_result(pSql->pSubs[i]);
}
if (pSub->res.code == TSDB_CODE_SUCCESS) {
taos_free_result(pSub);
}
}
......@@ -393,15 +402,6 @@ void freeSubqueryObj(SSqlObj* pSql) {
pSql->numOfSubs = 0;
}
static void doQuitSubquery(SSqlObj* pParentSql) {
freeSubqueryObj(pParentSql);
// tsem_wait(&pParentSql->emptyRspSem);
// tsem_wait(&pParentSql->emptyRspSem);
// tsem_post(&pParentSql->rspSem);
}
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) {
int32_t numOfTotal = pSupporter->pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
......@@ -410,7 +410,7 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter
pSqlObj->res.code = abs(pSupporter->pState->code);
tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code);
doQuitSubquery(pSqlObj);
freeJoinSubqueryObj(pSqlObj);
}
}
......@@ -422,17 +422,74 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, int64_t st, int64_t et)
pQueryInfo->window.ekey = et;
}
static void tSIntersectionAndLaunchSecQuery(SJoinSubquerySupporter* pSupporter, SSqlObj* pSql) {
SSqlObj* pParentSql = pSupporter->pObj;
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
// for projection query, need to try next vnode
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
int32_t totalVnode = 0;
if ((++pTableMetaInfo->vgroupIndex) < totalVnode) {
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotal);
pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
return;
}
}
int32_t numOfTotal = pSupporter->pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
if (finished >= numOfTotal) {
assert(finished == numOfTotal);
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, pSql,
pSupporter->subqueryIndex);
freeJoinSubqueryObj(pParentSql);
return;
}
tscTrace("%p all subqueries retrieve ts complete, do ts block intersect", pParentSql);
SJoinSubquerySupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSubquerySupporter* p2 = pParentSql->pSubs[1]->param;
TSKEY st, et;
int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &st, &et);
if (num <= 0) { // no result during ts intersect
tscTrace("%p free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql);
} else {
updateQueryTimeRange(pParentQueryInfo, st, et);
tscLaunchSecondPhaseSubqueries(pParentSql);
}
}
}
static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
SSqlObj* pParentSql = pSupporter->pObj;
SSqlObj* pSql = (SSqlObj*)tres;
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) == 0) {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows,
pSupporter->pState->code);
......@@ -440,101 +497,55 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
quitAllSubquery(pParentSql, pSupporter);
return;
}
if (numOfRows < 0) {
tscError("%p sub query failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex);
pSupporter->pState->code = numOfRows;
quitAllSubquery(pParentSql, pSupporter);
return;
} else if (numOfRows == 0) {
tSIntersectionAndLaunchSecQuery(pSupporter, pSql);
return;
}
if (numOfRows > 0) { // write the data into disk
fwrite(pSql->res.data, pSql->res.numOfRows, 1, pSupporter->f);
fclose(pSupporter->f);
STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
if (pBuf == NULL) {
tscError("%p invalid ts comp file from vnode, abort sub query, file size:%d", pSql, numOfRows);
pSupporter->pState->code = TSDB_CODE_APP_ERROR; // todo set the informative code
quitAllSubquery(pParentSql, pSupporter);
return;
}
// write the compressed timestamp to disk file
fwrite(pSql->res.data, pSql->res.numOfRows, 1, pSupporter->f);
fclose(pSupporter->f);
pSupporter->f = NULL;
STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
if (pBuf == NULL) {
tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);
if (pSupporter->pTSBuf == NULL) {
tscTrace("%p create tmp file for ts block:%s", pSql, pBuf->path);
pSupporter->pTSBuf = pBuf;
} else {
assert(pQueryInfo->numOfTables == 1); // for subquery, only one metermetaInfo
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
pSupporter->pState->code = TSDB_CODE_APP_ERROR; // todo set the informative code
quitAllSubquery(pParentSql, pSupporter);
return;
}
tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex);
tsBufDestory(pBuf);
}
if (pSupporter->pTSBuf == NULL) {
tscTrace("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows);
pSupporter->pTSBuf = pBuf;
} else {
assert(pQueryInfo->numOfTables == 1); // for subquery, only one
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// open new file to save the result
tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex);
tsBufDestory(pBuf);
}
if (pSql->res.completed) {
tSIntersectionAndLaunchSecQuery(pSupporter, pSql);
} else { // open a new file to save the incoming result
getTmpfilePath("ts-join", pSupporter->path);
pSupporter->f = fopen(pSupporter->path, "w");
pSql->res.row = pSql->res.numOfRows;
taos_fetch_rows_a(tres, joinRetrieveCallback, param);
} else if (numOfRows == 0) { // no data from this vnode anymore
SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
//todo refactor
if (tscNonOrderedProjectionQueryOnSTable(pParentQueryInfo, 0)) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
// for projection query, need to try next vnode
// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes;
int32_t totalVnode = 0;
if ((++pTableMetaInfo->vgroupIndex) < totalVnode) {
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotal);
pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
return;
}
}
int32_t numOfTotal = pSupporter->pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
if (finished >= numOfTotal) {
assert(finished == numOfTotal);
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, tres,
pSupporter->subqueryIndex);
doQuitSubquery(pParentSql);
return;
}
tscTrace("%p all subqueries retrieve ts complete, do ts block intersect", pParentSql);
SJoinSubquerySupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSubquerySupporter* p2 = pParentSql->pSubs[1]->param;
TSKEY st, et;
int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &st, &et);
if (num <= 0) { // no result during ts intersect
tscTrace("%p free all sub SqlObj and quit", pParentSql);
doQuitSubquery(pParentSql);
} else {
updateQueryTimeRange(pParentQueryInfo, st, et);
tscLaunchSecondPhaseSubqueries(pParentSql);
}
}
} else { // failure of sub query
tscError("%p sub query failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex);
pSupporter->pState->code = numOfRows;
quitAllSubquery(pParentSql, pSupporter);
return;
}
} else { // secondary stage retrieve, driven by taos_fetch_row or other functions
if (numOfRows < 0) {
pSupporter->pState->code = numOfRows;
tscError("%p retrieve failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex);
tscError("%p retrieve failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
}
if (numOfRows >= 0) {
......@@ -542,20 +553,20 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
}
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
// for projection query, need to try next vnode if current vnode is exhausted
// if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) {
// pSupporter->pState->numOfCompleted = 0;
// pSupporter->pState->numOfTotal = 1;
//
// pSql->cmd.command = TSDB_SQL_SELECT;
// pSql->fp = tscJoinQueryCallback;
// tscProcessSql(pSql);
//
// return;
// }
// for projection query, need to try next vnode if current vnode is exhausted
if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) {
pSupporter->pState->numOfCompleted = 0;
pSupporter->pState->numOfTotal = 1;
pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
return;
}
}
int32_t numOfTotal = pSupporter->pState->numOfTotal;
......@@ -567,11 +578,23 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
pParentSql->res.code);
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = abs(pSupporter->pState->code);
freeSubqueryObj(pParentSql);
pParentSql->res.code = pSupporter->pState->code;
freeJoinSubqueryObj(pParentSql);
pParentSql->res.completed = true;
}
// tsem_post(&pParentSql->rspSem);
// update the records for each subquery in parent sql object.
for(int32_t i = 0; i < pParentSql->numOfSubs; ++i) {
if (pParentSql->pSubs[i] == NULL) {
continue;
}
SSqlRes* pRes1 = &pParentSql->pSubs[i]->res;
pRes1->numOfTotalInCurrentClause += pRes1->numOfRows;
}
// data has retrieved to client, build the join results
tscBuildResFromSubqueries(pParentSql);
} else {
tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal);
}
......@@ -599,38 +622,61 @@ static SJoinSubquerySupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t nu
}
void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
int32_t numOfFetch = 0;
assert(pSql->numOfSubs >= 1);
int32_t numOfFetch = 0;
bool hasData = true;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] == NULL) { // this subquery does not need to involve in secondary query
// if the subquery is NULL, it does not involved in the final result generation
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
SSqlRes *pRes = &pSql->pSubs[i]->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSqlRes *pRes = &pSub->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vgroupIndex < pTableMetaInfo->pMetricMeta->numOfVnodes &&
// (!tscHasReachLimitation(pQueryInfo, pRes))) {
// numOfFetch++;
// }
} else {
if (pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pQueryInfo, pRes))) {
if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vgroupIndex < pTableMetaInfo->vgroupList->numOfVgroups &&
(!tscHasReachLimitation(pQueryInfo, pRes)) && !pRes->completed) {
numOfFetch++;
}
} else {
if (!tscHasReachLimitation(pQueryInfo, pRes)) {
if (pRes->row >= pRes->numOfRows) {
hasData = false;
if (!pRes->completed) {
numOfFetch++;
}
}
} else { // has reach the limitation, no data anymore
hasData = false;
}
}
}
if (numOfFetch <= 0) {
// has data remains in client side, and continue to return data to app
if (hasData) {
tscBuildResFromSubqueries(pSql);
return;
} else if (numOfFetch <= 0) {
pSql->res.completed = true;
freeJoinSubqueryObj(pSql);
if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0);
} else {
tscQueueAsyncRes(pSql);
}
return;
}
// TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled
tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch);
SJoinSubquerySupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch);
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
......@@ -664,19 +710,6 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
tscProcessSql(pSql1);
}
}
// wait for all subquery completed
// tsem_wait(&pSql->rspSem);
// update the records for each subquery
for(int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] == NULL) {
continue;
}
SSqlRes* pRes1 = &pSql->pSubs[i]->res;
pRes1->numOfTotalInCurrentClause += pRes1->numOfRows;
}
}
// all subqueries return, set the result output index
......@@ -710,7 +743,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
SSqlCmd* pSubCmd = &pSql->pSubs[tableIndexOfSub]->cmd;
SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0);
size_t numOfExprs = taosArrayGetSize(pSubQueryInfo->exprsInfo);
size_t numOfExprs = taosArrayGetSize(pSubQueryInfo->exprList);
for (int32_t k = 0; k < numOfExprs; ++k) {
SSqlExpr* pSubExpr = tscSqlExprGet(pSubQueryInfo, k);
if (pExpr->functionId == pSubExpr->functionId && pExpr->colInfo.colId == pSubExpr->colInfo.colId) {
......@@ -723,35 +756,17 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = (SSqlObj*)tres;
// STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
// int32_t idx = pSql->cmd.vnodeIdx;
SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param;
// if (atomic_add_fetch_32(pSupporter->numOfComplete, 1) >=
// pSupporter->numOfTotal) {
// SSqlObj *pParentObj = pSupporter->pObj;
//
// if ((pSql->cmd.type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != 1) {
// int32_t num = 0;
// tscFetchDatablockFromSubquery(pParentObj);
// TSKEY* ts = tscGetQualifiedTSList(pParentObj, &num);
//
// if (num <= 0) {
// // no qualified result
// }
//
// tscLaunchSecondPhaseSubqueries(pSql, ts, num);
// } else {
// }
// } else {
// There is only one subquery and table for each subquery.
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) {
if (code != TSDB_CODE_SUCCESS) { // direct call joinRetrieveCallback and set the error code
joinRetrieveCallback(param, pSql, code);
} else { // first stage query, continue to retrieve data
} else { // first stage query, continue to retrieve compressed time stamp data
pSql->fp = joinRetrieveCallback;
pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql);
......@@ -789,24 +804,17 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
* data instead of returning to its invoker
*/
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
// assert(pTableMetaInfo->vgroupIndex < pTableMetaInfo->pMetricMeta->numOfVnodes);
pSupporter->pState->numOfCompleted = 0; // reset the record value
pSql->fp = joinRetrieveCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql);
} else { // first retrieve from vnode during the secondary stage sub-query
if (pParentSql->fp == NULL) {
// tsem_post(&pParentSql->rspSem);
// set the command flag must be after the semaphore been correctly set.
if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else {
// set the command flag must be after the semaphore been correctly set.
// pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
// if (pPObj->res.code == TSDB_CODE_SUCCESS) {
// (*pPObj->fp)(pPObj->param, pPObj, 0);
// } else {
// tscQueueAsyncRes(pPObj);
// }
assert(0);
tscQueueAsyncRes(pParentSql);
}
}
}
......@@ -856,12 +864,19 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
pCol->colIndex.tableIndex = 0;
}
tscColumnListCopy(pSupporter->colList, pNewQueryInfo->colList, 0);
pSupporter->colList = pNewQueryInfo->colList;
pNewQueryInfo->colList = NULL;
pSupporter->exprList = pNewQueryInfo->exprList;
pNewQueryInfo->exprList = NULL;
pSupporter->fieldsInfo = pNewQueryInfo->fieldsInfo;
pSupporter->exprsInfo = tscSqlExprCopy(pNewQueryInfo->exprsInfo, pSupporter->uid, false);
tscFieldInfoCopy(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo);
// this data needs to be transfer to support struct
memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo));
tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);
pSupporter->tagCond = pNewQueryInfo->tagCond;
memset(&pNewQueryInfo->tagCond, 0, sizeof(STagCond));
pNew->cmd.numOfCols = 0;
pNewQueryInfo->intervalTime = 0;
......@@ -871,15 +886,10 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
pSupporter->groupbyExpr = pNewQueryInfo->groupbyExpr;
memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
// this data needs to be transfer to support struct
pNewQueryInfo->fieldsInfo.numOfOutput = 0;
// set the ts,tags that involved in join, as the output column of intermediate result
tscClearSubqueryInfo(&pNew->cmd);
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
SColumnIndex index = {0, PRIMARYKEY_TIMESTAMP_COL_INDEX};
tscInitQueryInfo(pNewQueryInfo);
tscAddSpecialColumnForSelect(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);
// set the tags value for ts_comp function
......@@ -892,14 +902,16 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
pExpr->numOfParams = 1;
// add the filter tag column
size_t s = taosArrayGetSize(pSupporter->colList);
if (pSupporter->colList != NULL) {
size_t s = taosArrayGetSize(pSupporter->colList);
for (int32_t i = 0; i < s; ++i) {
SColumn *pCol = taosArrayGetP(pSupporter->colList, i);
for (int32_t i = 0; i < s; ++i) {
SColumn *pCol = taosArrayGetP(pSupporter->colList, i);
if (pCol->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
SColumn* p = tscColumnClone(pCol);
taosArrayPush(pNewQueryInfo->colList, &p);
if (pCol->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
SColumn* p = tscColumnClone(pCol);
taosArrayPush(pNewQueryInfo->colList, &p);
}
}
}
......@@ -910,25 +922,19 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type,
tscSqlExprNumOfExprs(pNewQueryInfo), numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name);
tscPrintSelectClause(pNew, 0);
} else {
assert(0);
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
}
#ifdef _DEBUG_VIEW
tscPrintSelectClause(pNew, 0);
#endif
return tscProcessSql(pNew);
}
// todo support async join query
int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
assert((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
......@@ -955,13 +961,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
}
}
// tsem_wait(&pSql->rspSem);
if (pSql->numOfSubs <= 0) {
pSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
} else {
pSql->cmd.command = TSDB_SQL_METRIC_JOIN_RETRIEVE;
}
pSql->cmd.command = (pSql->numOfSubs <= 0)? TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_METRIC_JOIN_RETRIEVE;
return TSDB_CODE_SUCCESS;
}
......@@ -1051,7 +1051,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
trs->pParentSqlObj = pSql;
trs->pFinalColModel = pModel;
pthread_mutexattr_t mutexattr = {0};
pthread_mutexattr_t mutexattr;
memset(&mutexattr, 0, sizeof(pthread_mutexattr_t));
pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&trs->queryMutex, &mutexattr);
pthread_mutexattr_destroy(&mutexattr);
......@@ -1179,8 +1181,8 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
trsupport->localBuffer->numOfElems = 0;
pthread_mutex_unlock(&trsupport->queryMutex);
tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, numOfRows,
subqueryIndex, trsupport->numOfRetry);
tscTrace("%p sub:%p retrieve failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql,
tstrerror(numOfRows), subqueryIndex, trsupport->numOfRetry);
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
if (pNew == NULL) {
......@@ -1576,3 +1578,230 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS;
}
void tscBuildResFromSubqueries(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
if (pRes->code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pSql);
return;
}
while (1) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
if (pRes->tsrow == NULL) {
pRes->tsrow = calloc(numOfExprs, POINTER_BYTES);
}
bool success = false;
int32_t numOfTableHasRes = 0;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] != 0) {
numOfTableHasRes++;
}
}
if (numOfTableHasRes >= 2) { // do merge result
success = (doSetResultRowData(pSql->pSubs[0], false) != NULL) && (doSetResultRowData(pSql->pSubs[1], false) != NULL);
} else { // only one subquery
SSqlObj *pSub = pSql->pSubs[0];
if (pSub == NULL) {
pSub = pSql->pSubs[1];
}
success = (doSetResultRowData(pSub, false) != NULL);
}
if (success) { // current row of final output has been built, return to app
for (int32_t i = 0; i < numOfExprs; ++i) {
SColumnIndex* pIndex = &pRes->pColumnIndex[i];
SSqlRes *pRes1 = &pSql->pSubs[pIndex->tableIndex]->res;
pRes->tsrow[i] = pRes1->tsrow[pIndex->columnIndex];
}
pRes->numOfTotalInCurrentClause++;
break;
} else { // continue retrieve data from vnode
if (!tscHashRemainDataInSubqueryResultSet(pSql)) {
tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1);
SSubqueryState *pState = NULL;
// free all sub sqlobj
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj *pChildObj = pSql->pSubs[i];
if (pChildObj == NULL) {
continue;
}
SJoinSubquerySupporter *pSupporter = (SJoinSubquerySupporter *)pChildObj->param;
pState = pSupporter->pState;
tscDestroyJoinSupporter(pChildObj->param);
taos_free_result(pChildObj);
}
free(pState);
return;
}
tscFetchDatablockFromSubquery(pSql);
if (pRes->code != TSDB_CODE_SUCCESS) {
return;
}
}
}
if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0);
} else {
tscQueueAsyncRes(pSql);
}
}
static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
SSqlRes *pRes = &pSql->res;
if (isNull(pRes->tsrow[columnIndex], pField->type)) {
pRes->tsrow[columnIndex] = NULL;
} else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
if (pRes->buffer[columnIndex] == NULL) {
pRes->buffer[columnIndex] = malloc(pField->bytes + TSDB_NCHAR_SIZE);
}
/* string terminated char for binary data*/
memset(pRes->buffer[columnIndex], 0, pField->bytes + TSDB_NCHAR_SIZE);
if (taosUcs4ToMbs(pRes->tsrow[columnIndex], pField->bytes, pRes->buffer[columnIndex])) {
pRes->tsrow[columnIndex] = pRes->buffer[columnIndex];
} else {
tscError("%p charset:%s to %s. val:%ls convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, pRes->tsrow);
pRes->tsrow[columnIndex] = NULL;
}
}
}
void **doSetResultRowData(SSqlObj *pSql, bool finalResult) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
if(pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
if (pRes->completed) {
tfree(pRes->tsrow);
}
return pRes->tsrow;
}
if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker
tfree(pRes->tsrow);
return pRes->tsrow;
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
SFieldSupInfo* pSup = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, i);
if (pSup->pSqlExpr != NULL) {
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i) + pSup->pSqlExpr->resBytes * pRes->row;
}
// primary key column cannot be null in interval query, no need to check
if (i == 0 && pQueryInfo->intervalTime > 0) {
continue;
}
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
transferNcharData(pSql, i, pField);
// calculate the result from several other columns
if (pSup->pArithExprInfo != NULL) {
// SArithmeticSupport *sas = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport));
// sas->offset = 0;
// sas-> = pQueryInfo->fieldsInfo.pExpr[i];
//
// sas->numOfCols = sas->pExpr->binExprInfo.numOfCols;
//
// if (pRes->buffer[i] == NULL) {
// pRes->buffer[i] = malloc(tscFieldInfoGetField(pQueryInfo, i)->bytes);
// }
//
// for(int32_t k = 0; k < sas->numOfCols; ++k) {
// int32_t columnIndex = sas->pExpr->binExprInfo.pReqColumns[k].colIdxInBuf;
// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex);
//
// sas->elemSize[k] = pExpr->resBytes;
// sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
// }
//
// tSQLBinaryExprCalcTraverse(sas->pExpr->binExprInfo.pBinExpr, 1, pRes->buffer[i], sas, TSQL_SO_ASC, getArithemicInputSrc);
// pRes->tsrow[i] = pRes->buffer[i];
//
// free(sas); //todo optimization
}
}
pRes->row++; // index increase one-step
return pRes->tsrow;
}
static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
bool hasData = true;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
bool allSubqueryExhausted = true;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] == NULL) {
continue;
}
SSqlRes *pRes1 = &pSql->pSubs[i]->res;
SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd;
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(pCmd1, pCmd1->clauseIndex);
assert(pQueryInfo1->numOfTables == 1);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo1, 0);
/*
* if the global limitation is not reached, and current result has not exhausted, or next more vnodes are
* available, goes on
*/
if (pTableMetaInfo->vgroupIndex < pTableMetaInfo->vgroupList->numOfVgroups && pRes1->row < pRes1->numOfRows &&
(!tscHasReachLimitation(pQueryInfo1, pRes1))) {
allSubqueryExhausted = false;
break;
}
}
hasData = !allSubqueryExhausted;
} else { // otherwise, in case inner join, if any subquery exhausted, query completed.
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] == 0) {
continue;
}
SSqlRes * pRes1 = &pSql->pSubs[i]->res;
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pQueryInfo1, pRes1) &&
tscProjectionQueryOnTable(pQueryInfo1)) ||
(pRes1->numOfRows == 0)) {
hasData = false;
break;
}
}
}
return hasData;
}
......@@ -844,12 +844,12 @@ SFieldSupInfo* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FI
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->offset = 0;
for (int32_t i = 1; i < numOfExprs; ++i) {
SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprsInfo, i - 1);
SSqlExpr* p = taosArrayGetP(pQueryInfo->exprsInfo, i);
SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprList, i - 1);
SSqlExpr* p = taosArrayGetP(pQueryInfo->exprList, i);
p->offset = prev->offset + prev->resBytes;
}
......@@ -860,13 +860,13 @@ void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) {
return;
}
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->offset = 0;
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 1; i < numOfExprs; ++i) {
SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprsInfo, i - 1);
SSqlExpr* p = taosArrayGetP(pQueryInfo->exprsInfo, i);
SSqlExpr* prev = taosArrayGetP(pQueryInfo->exprList, i - 1);
SSqlExpr* p = taosArrayGetP(pQueryInfo->exprList, i);
p->offset = prev->offset + prev->resBytes;
}
......@@ -875,8 +875,17 @@ void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo) {
void tscFieldInfoCopy(SFieldInfo* dst, const SFieldInfo* src) {
dst->numOfOutput = src->numOfOutput;
taosArrayCopy(dst->pFields, src->pFields);
taosArrayCopy(dst->pSupportInfo, src->pSupportInfo);
if (dst->pFields == NULL) {
dst->pFields = taosArrayClone(src->pFields);
} else {
taosArrayCopy(dst->pFields, src->pFields);
}
if (dst->pSupportInfo == NULL) {
dst->pSupportInfo = taosArrayClone(src->pSupportInfo);
} else {
taosArrayCopy(dst->pSupportInfo, src->pSupportInfo);
}
}
TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index) {
......@@ -983,20 +992,20 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize, bool isTagCol) {
int32_t num = taosArrayGetSize(pQueryInfo->exprsInfo);
int32_t num = taosArrayGetSize(pQueryInfo->exprList);
if (index == num) {
return tscSqlExprAppend(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol);
}
SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol);
taosArrayInsert(pQueryInfo->exprsInfo, index, &pExpr);
taosArrayInsert(pQueryInfo->exprList, index, &pExpr);
return pExpr;
}
SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize, bool isTagCol) {
SSqlExpr* pExpr = doBuildSqlExpr(pQueryInfo, functionId, pColIndex, type, size, interSize, isTagCol);
taosArrayPush(pQueryInfo->exprsInfo, &pExpr);
taosArrayPush(pQueryInfo->exprList, &pExpr);
return pExpr;
}
......@@ -1020,7 +1029,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
}
int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo) {
return taosArrayGetSize(pQueryInfo->exprsInfo);
return taosArrayGetSize(pQueryInfo->exprList);
}
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex) {
......@@ -1037,7 +1046,7 @@ void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes,
}
SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index) {
return taosArrayGetP(pQueryInfo->exprsInfo, index);
return taosArrayGetP(pQueryInfo->exprList, index);
}
void* sqlExprDestroy(SSqlExpr* pExpr) {
......@@ -1068,14 +1077,10 @@ void tscSqlExprInfoDestroy(SArray* pExprInfo) {
taosArrayDestroy(pExprInfo);
}
SArray* tscSqlExprCopy(const SArray* src, uint64_t uid, bool deepcopy) {
if (src == NULL || taosArrayGetSize(src) == 0) {
return taosArrayInit(1, POINTER_BYTES);
}
void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy) {
assert(src != NULL && dst != NULL);
size_t size = taosArrayGetSize(src);
SArray* dst = taosArrayInit(size, POINTER_BYTES);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr* pExpr = taosArrayGetP(src, i);
......@@ -1095,8 +1100,6 @@ SArray* tscSqlExprCopy(const SArray* src, uint64_t uid, bool deepcopy) {
}
}
}
return dst;
}
SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
......@@ -1194,9 +1197,7 @@ static void tscColumnDestroy(SColumn* pCol) {
}
void tscColumnListCopy(SArray* dst, const SArray* src, int16_t tableIndex) {
if (src == NULL) {
return;
}
assert(src != NULL && dst != NULL);
size_t num = taosArrayGetSize(src);
for (int32_t i = 0; i < num; ++i) {
......@@ -1577,9 +1578,22 @@ STableMetaInfo* tscGetTableMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, i
return tscGetMetaInfo(pQueryInfo, k);
}
void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
assert(pQueryInfo->fieldsInfo.pFields == NULL);
pQueryInfo->fieldsInfo.pFields = taosArrayInit(4, sizeof(TAOS_FIELD));
assert(pQueryInfo->fieldsInfo.pSupportInfo == NULL);
pQueryInfo->fieldsInfo.pSupportInfo = taosArrayInit(4, sizeof(SFieldSupInfo));
assert(pQueryInfo->exprList == NULL);
pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
}
int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) {
assert(pCmd != NULL);
// todo refactor: remove this structure
size_t s = pCmd->numOfClause + 1;
char* tmp = realloc(pCmd->pQueryInfo, s * POINTER_BYTES);
if (tmp == NULL) {
......@@ -1589,12 +1603,7 @@ int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) {
pCmd->pQueryInfo = (SQueryInfo**)tmp;
SQueryInfo* pQueryInfo = calloc(1, sizeof(SQueryInfo));
// todo refactor to extract functions.
pQueryInfo->fieldsInfo.pFields = taosArrayInit(4, sizeof(TAOS_FIELD));
pQueryInfo->fieldsInfo.pSupportInfo = taosArrayInit(4, sizeof(SFieldSupInfo));
pQueryInfo->exprsInfo = taosArrayInit(4, POINTER_BYTES);
tscInitQueryInfo(pQueryInfo);
pQueryInfo->msg = pCmd->payload; // pointer to the parent error message buffer
......@@ -1606,11 +1615,11 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
tscTagCondRelease(&pQueryInfo->tagCond);
tscFieldInfoClear(&pQueryInfo->fieldsInfo);
tscSqlExprInfoDestroy(pQueryInfo->exprsInfo);
memset(&pQueryInfo->exprsInfo, 0, sizeof(pQueryInfo->exprsInfo));
tscSqlExprInfoDestroy(pQueryInfo->exprList);
pQueryInfo->exprList = NULL;
tscColumnListDestroy(pQueryInfo->colList);
memset(&pQueryInfo->colList, 0, sizeof(pQueryInfo->colList));
pQueryInfo->colList = NULL;
if (pQueryInfo->groupbyExpr.columnInfo != NULL) {
taosArrayDestroy(pQueryInfo->groupbyExpr.columnInfo);
......@@ -1783,20 +1792,21 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(pnCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
memcpy(pNewQueryInfo, pQueryInfo, sizeof(SQueryInfo));
memset(&pNewQueryInfo->fieldsInfo, 0, sizeof(SFieldInfo));
pNewQueryInfo->command = pQueryInfo->command;
pNewQueryInfo->type = pQueryInfo->type;
pNewQueryInfo->slidingTimeUnit = pQueryInfo->slidingTimeUnit;
pNewQueryInfo->window = pQueryInfo->window;
pNewQueryInfo->intervalTime = pQueryInfo->intervalTime;
pNewQueryInfo->slidingTime = pQueryInfo->slidingTime;
pNewQueryInfo->limit = pQueryInfo->limit;
pNewQueryInfo->slimit = pQueryInfo->slimit;
pNewQueryInfo->order = pQueryInfo->order;
pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit;
pNewQueryInfo->pTableMetaInfo = NULL;
pNewQueryInfo->defaultVal = NULL;
pNewQueryInfo->numOfTables = 0;
pNewQueryInfo->tsBuf = NULL;
pNewQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
pNewQueryInfo->fieldsInfo.pFields = taosArrayInit(4, sizeof(TAOS_FIELD));
pNewQueryInfo->fieldsInfo.pSupportInfo = taosArrayInit(4, sizeof(SFieldSupInfo));
pNewQueryInfo->exprsInfo = taosArrayInit(4, POINTER_BYTES);
tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond);
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
......@@ -1821,7 +1831,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
}
uint64_t uid = pTableMetaInfo->pTableMeta->uid;
pNewQueryInfo->exprsInfo = tscSqlExprCopy(pQueryInfo->exprsInfo, uid, true);
tscSqlExprCopy(pNewQueryInfo->exprList, pQueryInfo->exprList, uid, true);
int32_t numOfOutput = tscSqlExprNumOfExprs(pNewQueryInfo);
......@@ -1841,20 +1851,21 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
}
}
// make sure the the sqlExpr for each fields is correct
// make sure the the sqlExpr for each fields is correct
// todo handle the agg arithmetic expression
for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) {
TAOS_FIELD* field = tscFieldInfoGetField(&pNewQueryInfo->fieldsInfo, f);
for(int32_t k1 = 0; k1 < numOfExprs; ++k1) {
SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1);
if (strcmp(field->name, pExpr1->aliasName) == 0) {
if (strcmp(field->name, pExpr1->aliasName) == 0) { // eatablish link according to the result field name
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pNewQueryInfo->fieldsInfo, f);
pInfo->pSqlExpr = pExpr1;
}
}
}
tscFieldInfoUpdateOffsetForInterResult(pNewQueryInfo);
tscFieldInfoUpdateOffset(pNewQueryInfo);
}
pNew->fp = fp;
......
......@@ -25,13 +25,10 @@ __attribute__((unused)) static FORCE_INLINE size_t copy(char* dst, const char* s
}
void extractTableName(const char* tableId, char* name) {
size_t offset = strcspn(tableId, &TS_PATH_DELIMITER[0]);
offset = strcspn(&tableId[offset], &TS_PATH_DELIMITER[0]);
size_t s1 = strcspn(tableId, &TS_PATH_DELIMITER[0]);
size_t s2 = strcspn(&tableId[s1 + 1], &TS_PATH_DELIMITER[0]);
strncpy(name, &tableId[offset], TSDB_TABLE_NAME_LEN);
// char* r = skipSegments(tableId, TS_PATH_DELIMITER[0], 2);
// return copy(name, r, TS_PATH_DELIMITER[0]);
strncpy(name, &tableId[s1 + s2 + 2], TSDB_TABLE_NAME_LEN);
}
char* extractDBName(const char* tableId, char* name) {
......
......@@ -199,7 +199,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_SHELL_VNODE_BITS 24
#define TSDB_SHELL_SID_MASK 0xFF
#define TSDB_HTTP_TOKEN_LEN 20
#define TSDB_SHOW_SQL_LEN 32
#define TSDB_SHOW_SQL_LEN 512
#define TSDB_METER_STATE_OFFLINE 0
#define TSDB_METER_STATE_ONLLINE 1
......
......@@ -31,11 +31,11 @@ extern int32_t qdebugFlag;
#define qError(...) \
if (qdebugFlag & DEBUG_ERROR) { \
taosPrintLog("ERROR RPC ", qdebugFlag, __VA_ARGS__); \
taosPrintLog("ERROR QRY ", qdebugFlag, __VA_ARGS__); \
}
#define qWarn(...) \
if (qdebugFlag & DEBUG_WARN) { \
taosPrintLog("WARN RPC ", qdebugFlag, __VA_ARGS__); \
taosPrintLog("WARN QRY ", qdebugFlag, __VA_ARGS__); \
}
#ifdef __cplusplus
......
......@@ -5112,7 +5112,6 @@ static void tableQueryImpl(SQInfo *pQInfo) {
pQInfo->pointsInterpo += numOfInterpo;
qTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total);
sem_post(&pQInfo->dataReady);
return;
}
......@@ -5133,7 +5132,6 @@ static void tableQueryImpl(SQInfo *pQInfo) {
if (pQuery->rec.rows > 0) {
qTrace("QInfo:%p %d rows returned from group results, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total);
sem_post(&pQInfo->dataReady);
return;
}
}
......@@ -5141,7 +5139,6 @@ static void tableQueryImpl(SQInfo *pQInfo) {
qTrace("QInfo:%p query over, %d rows are returned", pQInfo, pQuery->rec.total);
// vnodePrintQueryStatistics(pSupporter);
sem_post(&pQInfo->dataReady);
return;
}
......@@ -5166,14 +5163,12 @@ static void tableQueryImpl(SQInfo *pQInfo) {
/* check if query is killed or not */
if (isQueryKilled(pQInfo)) {
qTrace("QInfo:%p query is killed", pQInfo);
} else {
// STableId* pTableId = taosArrayGet(pQInfo->groupInfo, 0);
// qTrace("QInfo:%p uid:%" PRIu64 " tid:%d, query completed, %" PRId64 " rows returned, numOfTotal:%" PRId64 "
// rows",
// pQInfo, pTableId->uid, pTableId->tid, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
} else {// todo set the table uid and tid in log
// SArray* p = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
// SPair* pair = taosArrayGet(p, 0);
qTrace("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
}
sem_post(&pQInfo->dataReady);
}
static void stableQueryImpl(SQInfo *pQInfo) {
......@@ -5201,8 +5196,6 @@ static void stableQueryImpl(SQInfo *pQInfo) {
pQuery->rec.total);
// vnodePrintQueryStatistics(pSupporter);
}
sem_post(&pQInfo->dataReady);
}
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) {
......@@ -6017,9 +6010,15 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
unlink(pQuery->sdata[0]->data);
} else {
// todo return the error code to client
qError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo,
pQuery->sdata[0]->data, strerror(errno));
}
// all data returned, set query over
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
setQueryStatus(pQuery, QUERY_OVER);
}
} else {
doCopyQueryResultToMsg(pQInfo, pQuery->rec.rows, data);
}
......@@ -6143,7 +6142,8 @@ void qTableQuery(qinfo_t qinfo) {
} else {
tableQueryImpl(pQInfo);
}
sem_post(&pQInfo->dataReady);
// vnodeDecRefCount(pQInfo);
}
......
......@@ -98,13 +98,13 @@ void taosArrayRemove(SArray* pArray, size_t index);
* @param pDst
* @param pSrc
*/
void taosArrayCopy(SArray* pDst, SArray* pSrc);
void taosArrayCopy(SArray* pDst, const SArray* pSrc);
/**
* clone a new array
* @param pSrc
*/
SArray* taosArrayClone(SArray* pSrc);
SArray* taosArrayClone(const SArray* pSrc);
/**
* destroy array list
......
......@@ -143,7 +143,7 @@ void taosArrayRemove(SArray* pArray, size_t index) {
pArray->size -= 1;
}
void taosArrayCopy(SArray* pDst, SArray* pSrc) {
void taosArrayCopy(SArray* pDst, const SArray* pSrc) {
assert(pSrc != NULL && pDst != NULL);
if (pDst->capacity < pSrc->size) {
......@@ -162,7 +162,7 @@ void taosArrayCopy(SArray* pDst, SArray* pSrc) {
pDst->size = pSrc->size;
}
SArray* taosArrayClone(SArray* pSrc) {
SArray* taosArrayClone(const SArray* pSrc) {
assert(pSrc != NULL);
if (pSrc->size == 0) { // empty array list
......
......@@ -97,7 +97,7 @@ static int32_t vnodeProcessRetrieveMsg(SVnodeObj *pVnode, void *pCont, int32_t c
if (qHasMoreResultsToRetrieve(pQInfo)) {
pRet->qhandle = pQInfo;
code = TSDB_CODE_ACTION_NEED_REPROCESSED;
} else {
} else {
// no further execution invoked, release the ref to vnode
qDestroyQueryInfo(pQInfo);
vnodeRelease(pVnode);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册