未验证 提交 052ace85 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #7198 from taosdata/feature/query

Feature/query
......@@ -29,15 +29,16 @@ extern "C" {
#include "tsched.h"
#include "tsclient.h"
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
#define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo)))
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
#pragma pack(push,1)
......@@ -221,7 +222,7 @@ void tscExprDestroy(SArray* pExprInfo);
int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SExprInfo*** pExpr, int32_t* num);
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta);
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta, uint64_t id);
SColumn* tscColumnClone(const SColumn* src);
void tscColumnCopy(SColumn* pDest, const SColumn* pSrc);
......@@ -320,7 +321,7 @@ void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql);
bool hasMoreClauseToTry(SSqlObj* pSql);
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta);
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeCachedMeta, uint64_t id);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
......@@ -359,7 +360,7 @@ bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src);
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
STblCond* tsGetTableFilter(SArray* filters, uint64_t uid, int16_t idx);
void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id);
void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id);
#ifdef __cplusplus
}
......
......@@ -447,7 +447,7 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent);
void destroyTableNameList(SInsertStatementParam* pInsertParam);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta, uint64_t id);
/**
* free query result of the sql object
......
......@@ -351,7 +351,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (pSql->pStream == NULL) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
if (pQueryInfo != NULL && TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
tscDebug("0x%" PRIx64 " continue parse sql after get table-meta", pSql->self);
code = tsParseSql(pSql, false);
......@@ -381,7 +381,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} else {
if (pSql->retryReason != TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again", pSql->self);
tscResetSqlCmd(pCmd, false);
pSql->retryReason = TSDB_CODE_SUCCESS;
} else {
tscDebug("0x%" PRIx64 " cached table-meta, continue validate sql statement and send query", pSql->self);
......
......@@ -1595,7 +1595,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
if (pSql->parseRetry < 1 && (ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR || ret == TSDB_CODE_TSC_INVALID_OPERATION)) {
tscDebug("0x%"PRIx64 " parse insert sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret));
tscResetSqlCmd(pCmd, true);
tscResetSqlCmd(pCmd, true, pSql->self);
pSql->parseRetry++;
if ((ret = tsInsertInitialCheck(pSql)) == TSDB_CODE_SUCCESS) {
......@@ -1612,7 +1612,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
if (ret == TSDB_CODE_TSC_INVALID_OPERATION && pSql->parseRetry < 1 && sqlInfo.type == TSDB_SQL_SELECT) {
tscDebug("0x%"PRIx64 " parse query sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret));
tscResetSqlCmd(pCmd, true);
tscResetSqlCmd(pCmd, true, pSql->self);
pSql->parseRetry++;
ret = tscValidateSqlInfo(pSql, &sqlInfo);
......
......@@ -1694,7 +1694,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) > 0) {
SHashObj* hashList = pCmd->insertParam.pTableBlockHashList;
pCmd->insertParam.pTableBlockHashList = NULL;
tscResetSqlCmd(pCmd, false);
tscResetSqlCmd(pCmd, false, pSql->self);
pCmd->insertParam.pTableBlockHashList = hashList;
}
......
......@@ -18,11 +18,11 @@
#include "tsclient.h"
#include "tsocket.h"
#include "ttimer.h"
#include "tutil.h"
#include "taosmsg.h"
#include "tcq.h"
#include "taos.h"
#include "tscUtil.h"
void tscSaveSlowQueryFp(void *handle, void *tmrId);
TAOS *tscSlowQueryConn = NULL;
......@@ -227,16 +227,16 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
SHeartBeatMsg *pHeartbeat = pMsg;
int allocedQueriesNum = pHeartbeat->numOfQueries;
int allocedStreamsNum = pHeartbeat->numOfStreams;
pHeartbeat->numOfQueries = 0;
SQueryDesc *pQdesc = (SQueryDesc *)pHeartbeat->pData;
// We extract the lock to tscBuildHeartBeatMsg function.
int64_t now = taosGetTimestampMs();
SSqlObj *pSql = pObj->sqlList;
while (pSql) {
/*
* avoid sqlobj may not be correctly removed from sql list
......@@ -248,41 +248,55 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
}
tstrncpy(pQdesc->sql, pSql->sqlstr, sizeof(pQdesc->sql));
pQdesc->stime = htobe64(pSql->stime);
pQdesc->queryId = htonl(pSql->queryId);
//pQdesc->useconds = htobe64(pSql->res.useconds);
pQdesc->stime = htobe64(pSql->stime);
pQdesc->queryId = htonl(pSql->queryId);
pQdesc->useconds = htobe64(now - pSql->stime);
pQdesc->qId = htobe64(pSql->res.qId);
pQdesc->qId = htobe64(pSql->res.qId);
pQdesc->sqlObjId = htobe64(pSql->self);
pQdesc->pid = pHeartbeat->pid;
pQdesc->stableQuery = pSql->cmd.pQueryInfo->stableQuery;
pQdesc->pid = pHeartbeat->pid;
pQdesc->numOfSub = pSql->subState.numOfSub;
// todo race condition
pQdesc->stableQuery = 0;
char *p = pQdesc->subSqlInfo;
int32_t remainLen = sizeof(pQdesc->subSqlInfo);
if (pQdesc->numOfSub == 0) {
snprintf(p, remainLen, "N/A");
} else {
int32_t len;
for (int32_t i = 0; i < pQdesc->numOfSub; ++i) {
len = snprintf(p, remainLen, "[%d]0x%" PRIx64 "(%c) ", i,
pSql->pSubs[i]->self,
pSql->subState.states[i] ? 'C' : 'I');
if (len > remainLen) {
break;
// SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
// if (pQueryInfo != NULL) {
// pQdesc->stableQuery = (pQueryInfo->stableQuery)?1:0;
// } else {
// pQdesc->stableQuery = 0;
// }
if (pSql->pSubs != NULL && pSql->subState.states != NULL) {
for (int32_t i = 0; i < pQdesc->numOfSub; ++i) {
SSqlObj *psub = pSql->pSubs[i];
int64_t self = (psub != NULL)? psub->self : 0;
int32_t len = snprintf(p, remainLen, "[%d]0x%" PRIx64 "(%c) ", i, self, pSql->subState.states[i] ? 'C' : 'I');
if (len > remainLen) {
break;
}
remainLen -= len;
p += len;
}
remainLen -= len;
p += len;
}
}
pQdesc->numOfSub = htonl(pQdesc->numOfSub);
pQdesc->numOfSub = htonl(pQdesc->numOfSub);
taosGetFqdn(pQdesc->fqdn);
pHeartbeat->numOfQueries++;
pQdesc++;
pSql = pSql->next;
if (pHeartbeat->numOfQueries >= allocedQueriesNum) break;
if (pHeartbeat->numOfQueries >= allocedQueriesNum) {
break;
}
}
pHeartbeat->numOfStreams = 0;
......
......@@ -3725,7 +3725,8 @@ static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo,
if (pRight->tokenId != TK_SET || !serializeExprListToVariant(pRight->Expr.paramList, &pVal, colType, timePrecision)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg);
}
pColumnFilter->pz = (int64_t)calloc(1, pVal->nLen + 1);
pColumnFilter->pz = (int64_t)calloc(1, pVal->nLen);
pColumnFilter->len = pVal->nLen;
pColumnFilter->filterstr = 1;
memcpy((char *)(pColumnFilter->pz), (char *)(pVal->pz), pVal->nLen);
......@@ -5746,14 +5747,19 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->order.order = TSDB_ORDER_ASC;
if (isTopBottomQuery(pQueryInfo)) {
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
} else { // in case of select tbname from super_table, the defualt order column can not be the primary ts column
pQueryInfo->order.orderColId = INT32_MIN;
} else { // in case of select tbname from super_table, the default order column can not be the primary ts column
pQueryInfo->order.orderColId = INT32_MIN; // todo define a macro
}
/* for super table query, set default ascending order for group output */
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC;
}
if (pQueryInfo->distinct) {
pQueryInfo->order.order = TSDB_ORDER_ASC;
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
}
}
int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema) {
......@@ -5761,26 +5767,21 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
const char* msg1 = "invalid column name in orderby clause";
const char* msg2 = "too many order by columns";
const char* msg3 = "only primary timestamp/tbname/first tag in groupby clause allowed";
const char* msg4 = "only tag in groupby clause allowed in order by";
const char* msg5 = "only primary timestamp/column in top/bottom function allowed as orderby column";
const char* msg6 = "only primary timestamp allowed as the second orderby column";
const char* msg7 = "only primary timestamp/column in groupby clause allowed as orderby column";
const char* msg8 = "only column in groupby clause allowed as orderby column";
const char* msg4 = "only tag in groupby clause allowed in order clause";
const char* msg5 = "only primary timestamp/column in top/bottom function allowed as order column";
const char* msg6 = "only primary timestamp allowed as the second order column";
const char* msg7 = "only primary timestamp/column in groupby clause allowed as order column";
const char* msg8 = "only column in groupby clause allowed as order column";
setDefaultOrderInfo(pQueryInfo);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pQueryInfo->distinct == true) {
pQueryInfo->order.order = TSDB_ORDER_ASC;
pQueryInfo->order.orderColId = 0;
return TSDB_CODE_SUCCESS;
}
if (pSqlNode->pSortOrder == NULL) {
if (pQueryInfo->distinct || pSqlNode->pSortOrder == NULL) {
return TSDB_CODE_SUCCESS;
}
SArray* pSortorder = pSqlNode->pSortOrder;
char* pMsgBuf = tscGetErrorMsgPayload(pCmd);
SArray* pSortOrder = pSqlNode->pSortOrder;
/*
* for table query, there is only one or none order option is allowed, which is the
......@@ -5788,19 +5789,19 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
*
* for super table query, the order option must be less than 3.
*/
size_t size = taosArrayGetSize(pSortorder);
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
size_t size = taosArrayGetSize(pSortOrder);
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_TMP_TABLE(pTableMetaInfo)) {
if (size > 1) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg0);
return invalidOperationMsg(pMsgBuf, msg0);
}
} else {
if (size > 2) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
return invalidOperationMsg(pMsgBuf, msg2);
}
}
// handle the first part of order by
tVariant* pVar = taosArrayGet(pSortorder, 0);
tVariant* pVar = taosArrayGet(pSortOrder, 0);
// e.g., order by 1 asc, return directly with out further check.
if (pVar->nType >= TSDB_DATA_TYPE_TINYINT && pVar->nType <= TSDB_DATA_TYPE_BIGINT) {
......@@ -5812,7 +5813,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // super table query
if (getColumnIndexByName(&columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(pMsgBuf, msg1);
}
bool orderByTags = false;
......@@ -5824,7 +5825,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
// it is a tag column
if (pQueryInfo->groupbyExpr.columnInfo == NULL) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
return invalidOperationMsg(pMsgBuf, msg4);
}
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0);
if (relTagIndex == pColIndex->colIndex) {
......@@ -5845,13 +5846,14 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
orderByGroupbyCol = true;
}
}
if (!(orderByTags || orderByTS || orderByGroupbyCol) && !isTopBottomQuery(pQueryInfo)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
return invalidOperationMsg(pMsgBuf, msg3);
} else { // order by top/bottom result value column is not supported in case of interval query.
assert(!(orderByTags && orderByTS && orderByGroupbyCol));
}
size_t s = taosArrayGetSize(pSortorder);
size_t s = taosArrayGetSize(pSortOrder);
if (s == 1) {
if (orderByTags) {
pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
......@@ -5870,7 +5872,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
pExpr = tscExprGet(pQueryInfo, 1);
if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
return invalidOperationMsg(pMsgBuf, msg5);
}
tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
......@@ -5888,9 +5890,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
addPrimaryTsColIntoResult(pQueryInfo, pCmd);
}
}
}
if (s == 2) {
} else {
tVariantListItem *pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
if (orderByTags) {
pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
......@@ -5907,22 +5907,23 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
tVariant* pVar2 = &pItem->pVar;
SStrToken cname = {pVar2->nLen, pVar2->nType, pVar2->pz};
if (getColumnIndexByName(&cname, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(pMsgBuf, msg1);
}
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
return invalidOperationMsg(pMsgBuf, msg6);
} else {
tVariantListItem* p1 = taosArrayGet(pSortorder, 1);
tVariantListItem* p1 = taosArrayGet(pSortOrder, 1);
pQueryInfo->order.order = p1->sortOrder;
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
}
}
} else { // meter query
if (getColumnIndexByName(&columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) { // check order by clause for normal table & temp table
if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(pMsgBuf, msg1);
}
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX && !isTopBottomQuery(pQueryInfo)) {
bool validOrder = false;
SArray *columnInfo = pQueryInfo->groupbyExpr.columnInfo;
......@@ -5930,23 +5931,23 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
SColIndex* pColIndex = taosArrayGet(columnInfo, 0);
validOrder = (pColIndex->colIndex == index.columnIndex);
}
if (!validOrder) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
return invalidOperationMsg(pMsgBuf, msg7);
}
tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId;
pQueryInfo->groupbyExpr.orderType = p1->sortOrder;
}
if (isTopBottomQuery(pQueryInfo)) {
bool validOrder = false;
SArray *columnInfo = pQueryInfo->groupbyExpr.columnInfo;
if (columnInfo != NULL && taosArrayGetSize(columnInfo) > 0) {
SColIndex* pColIndex = taosArrayGet(columnInfo, 0);
validOrder = (pColIndex->colIndex == index.columnIndex);
if (!validOrder) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8);
if (pColIndex->colIndex == index.columnIndex) {
return invalidOperationMsg(pMsgBuf, msg8);
}
} else {
/* order of top/bottom query in interval is not valid */
......@@ -5955,9 +5956,8 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
pExpr = tscExprGet(pQueryInfo, 1);
if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
return invalidOperationMsg(pMsgBuf, msg5);
}
validOrder = true;
}
tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
......@@ -5967,6 +5967,18 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
return TSDB_CODE_SUCCESS;
}
tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
} else {
// handle the temp table order by clause. You can order by any single column in case of the temp table, created by
// inner subquery.
assert(UTIL_TABLE_IS_TMP_TABLE(pTableMetaInfo) && taosArrayGetSize(pSqlNode->pSortOrder) == 1);
if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(pMsgBuf, msg1);
}
tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
......@@ -8734,8 +8746,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
const char* msg8 = "condition missing for join query";
const char* msg9 = "not support 3 level select";
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -8757,7 +8768,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
}
if (pSqlNode->from->type == SQL_NODE_FROM_SUBQUERY) {
clearAllTableMetaInfo(pQueryInfo, false);
clearAllTableMetaInfo(pQueryInfo, false, pSql->self);
pQueryInfo->numOfTables = 0;
// parse the subquery in the first place
......@@ -9026,8 +9037,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
pQueryInfo->simpleAgg = isSimpleAggregateRv(pQueryInfo);
pQueryInfo->onlyTagQuery = onlyTagPrjFunction(pQueryInfo);
pQueryInfo->groupbyColumn = tscGroupbyColumn(pQueryInfo);
//pQueryInfo->globalMerge = tscIsTwoStageSTableQuery(pQueryInfo, 0);
pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo);
pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
......
......@@ -525,6 +525,7 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
int64_t st = taosGetTimestampUs();
SSchedMsg schedMsg = {0};
schedMsg.fp = doProcessMsgFromServer;
......@@ -543,6 +544,11 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
int64_t et = taosGetTimestampUs();
if (et - st > 100) {
tscDebug("add message to task queue, elapsed time:%"PRId64, et - st);
}
}
int doBuildAndSendMsg(SSqlObj *pSql) {
......@@ -897,16 +903,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SQueryAttr query = {{0}};
tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql);
query.vgId = pTableMeta->vgId;
SArray* tableScanOperator = createTableScanPlan(&query);
SArray* queryOperator = createExecOperatorPlan(&query);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
......@@ -2269,6 +2275,10 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
pMsg = buf;
}
if (pParentCmd->pTableMetaMap == NULL) {
pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
}
for (int32_t i = 0; i < pMultiMeta->numOfTables; i++) {
STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMsg;
int32_t code = tableMetaMsgConvert(pMetaMsg);
......@@ -2605,7 +2615,7 @@ int tscProcessDropDbRsp(SSqlObj *pSql) {
int tscProcessDropTableRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
tscRemoveTableMetaBuf(pTableMetaInfo, pSql->self);
tscRemoveCachedTableMeta(pTableMetaInfo, pSql->self);
tfree(pTableMetaInfo->pTableMeta);
return 0;
}
......@@ -2990,13 +3000,11 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid);
}
// remove stored tableMeta info in hash table
tscRemoveTableMetaBuf(pTableMetaInfo, pSql->self);
pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap);
pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
// remove stored tableMeta info in hash table
tscResetSqlCmd(pCmd, true, pSql->self);
SArray* pNameList = taosArrayInit(1, POINTER_BYTES);
SArray* pNameList = taosArrayInit(1, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(1, POINTER_BYTES);
char* n = strdup(name);
......
......@@ -113,7 +113,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
pQueryInfo->command = TSDB_SQL_SELECT;
pSql->fp = tscProcessStreamQueryCallback;
pSql->fp = tscProcessStreamQueryCallback;
pSql->fetchFp = tscProcessStreamQueryCallback;
executeQuery(pSql, pQueryInfo);
tscIncStreamExecutionCount(pStream);
......@@ -142,6 +142,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
if(pSql == NULL) {
return ;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
tscDebug("0x%"PRIx64" add into timer", pSql->self);
......@@ -186,14 +187,16 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
}
// launch stream computing in a new thread
SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscProcessStreamLaunchQuery;
SSchedMsg schedMsg = {0};
schedMsg.fp = tscProcessStreamLaunchQuery;
schedMsg.ahandle = pStream;
schedMsg.thandle = (void *)1;
schedMsg.msg = NULL;
schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
}
static void cbParseSql(void* param, TAOS_RES* res, int code);
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) {
SSqlStream *pStream = (SSqlStream *)param;
if (tres == NULL || numOfRows < 0) {
......@@ -201,24 +204,26 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
tscError("0x%"PRIx64" stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql->self,
pStream, numOfRows, retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0);
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name);
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
tfree(pTableMetaInfo->pTableMeta);
SSqlObj* pSql = pStream->pSql;
tscFreeSqlResult(pStream->pSql);
tscFreeSubobj(pStream->pSql);
tfree(pStream->pSql->pSubs);
pStream->pSql->subState.numOfSub = 0;
tscFreeSqlResult(pSql);
tscFreeSubobj(pSql);
tfree(pSql->pSubs);
pSql->subState.numOfSub = 0;
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) {
cbParseSql(pStream, pSql, code);
} else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tscDebug("0x%"PRIx64" CQ taso_open_stream IN Process", pSql->self);
} else {
tscError("0x%"PRIx64" open stream failed, code:%s", pSql->self, tstrerror(code));
taosReleaseRef(tscObjRef, pSql->self);
free(pStream);
}
tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
return;
// tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
// return;
}
taos_fetch_rows_a(tres, tscProcessStreamRetrieveResult, param);
......@@ -555,7 +560,6 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscError("0x%"PRIx64" open stream failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code));
pStream->fp(pStream->param, NULL, NULL);
return;
}
......@@ -582,9 +586,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
// set stime with ltime if ltime > stime
const char* dstTable = pStream->dstTable? pStream->dstTable: "";
tscDebug(" CQ table=%s ltime is %"PRId64, dstTable, pStream->ltime);
tscDebug("0x%"PRIx64" CQ table %s ltime is %"PRId64, pSql->self, dstTable, pStream->ltime);
if(pStream->ltime != INT64_MIN && pStream->ltime > pStream->stime) {
tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime>0 ", dstTable, pStream->stime, pStream->ltime);
tscWarn("0x%"PRIx64" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime > 0", pSql->self, dstTable, pStream->stime, pStream->ltime);
pStream->stime = pStream->ltime;
}
......@@ -592,7 +597,6 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
pCmd->command = TSDB_SQL_SELECT;
tscAddIntoStreamList(pStream);
taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer);
tscDebug("0x%"PRIx64" stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql->self,
......@@ -659,10 +663,9 @@ void cbParseSql(void* param, TAOS_RES* res, int code) {
char sql[128] = "";
sprintf(sql, "select last_row(*) from %s;", pStream->dstTable);
taos_query_a(pSql->pTscObj, sql, fpStreamLastRow, param);
return ;
}
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
int64_t stime, void *param, void (*callback)(void *), void* cqhandle) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return NULL;
......@@ -697,14 +700,12 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
pStream->param = param;
pStream->pSql = pSql;
pStream->cqhandle = cqhandle;
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
tscSetStreamDestTable(pStream, dstTable);
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) {
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
......@@ -725,14 +726,13 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
pSql->fp = cbParseSql;
pSql->fetchFp = cbParseSql;
registerSqlObj(pSql);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) {
cbParseSql(pStream, pSql, code);
} else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tscDebug(" CQ taso_open_stream IN Process. sql=%s", sqlstr);
tscDebug("0x%"PRIx64" CQ taso_open_stream IN Process", pSql->self);
} else {
tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code));
taosReleaseRef(tscObjRef, pSql->self);
......@@ -743,7 +743,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
return pStream;
}
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
int64_t stime, void *param, void (*callback)(void *)) {
return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback, NULL);
}
......
......@@ -2727,16 +2727,10 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
int32_t code = pParentSql->res.code;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) {
// remove the cached tableMeta and vgroup id list, and then parse the sql again
SSqlCmd* pParentCmd = &pParentSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0);
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
tscResetSqlCmd( &pParentSql->cmd, true, pParentSql->self);
pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap);
pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++;
pParentSql->res.code = TSDB_CODE_SUCCESS;
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
tstrerror(code), pParentSql->retry);
......@@ -3040,7 +3034,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(code == taos_errno(pSql));
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && (code != TSDB_CODE_TDB_INVALID_TABLE_ID)) {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && (code != TSDB_CODE_TDB_INVALID_TABLE_ID && code != TSDB_CODE_VND_INVALID_VGROUP_ID)) {
tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(code), trsupport->numOfRetry);
int32_t sent = 0;
......@@ -3151,7 +3145,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
numOfFailed += 1;
// clean up tableMeta in cache
tscFreeQueryInfo(&pSql->cmd, false);
tscFreeQueryInfo(&pSql->cmd, false, pSql->self);
SQueryInfo* pQueryInfo = tscGetQueryInfoS(&pSql->cmd);
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, 0);
tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
......@@ -3173,7 +3167,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
}
pParentObj->res.code = TSDB_CODE_SUCCESS;
tscResetSqlCmd(&pParentObj->cmd, false);
tscResetSqlCmd(&pParentObj->cmd, false, pParentObj->self);
// in case of insert, redo parsing the sql string and build new submit data block for two reasons:
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
......
......@@ -1228,11 +1228,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
if (pCond && pCond->cond) {
createQueryFilter(pCond->cond, pCond->len, &pFilters);
}
//createInputDataFlterInfo(px, numOfCol1, &numOfFilterCols, &pFilterInfo);
}
SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1, pFilters);
pOutput->precision = pSqlObjList[0]->res.precision;
SSchema* schema = NULL;
......@@ -1332,12 +1330,13 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) {
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
}
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeCachedMeta, uint64_t id) {
if (pCmd == NULL) {
return;
}
SQueryInfo* pQueryInfo = pCmd->pQueryInfo;
while(pQueryInfo != NULL) {
SQueryInfo* p = pQueryInfo->sibling;
......@@ -1346,7 +1345,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
SQueryInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pUpstream, i);
freeQueryInfoImpl(pUpQueryInfo);
clearAllTableMetaInfo(pUpQueryInfo, removeMeta);
clearAllTableMetaInfo(pUpQueryInfo, removeCachedMeta, id);
if (pUpQueryInfo->pQInfo != NULL) {
qDestroyQueryInfo(pUpQueryInfo->pQInfo);
pUpQueryInfo->pQInfo = NULL;
......@@ -1362,7 +1361,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
}
freeQueryInfoImpl(pQueryInfo);
clearAllTableMetaInfo(pQueryInfo, removeMeta);
clearAllTableMetaInfo(pQueryInfo, removeCachedMeta, id);
if (pQueryInfo->pQInfo != NULL) {
qDestroyQueryInfo(pQueryInfo->pQInfo);
......@@ -1391,7 +1390,7 @@ void destroyTableNameList(SInsertStatementParam* pInsertParam) {
tfree(pInsertParam->pTableNameList);
}
void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) {
void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) {
pCmd->command = 0;
pCmd->numOfCols = 0;
pCmd->count = 0;
......@@ -1405,19 +1404,8 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) {
tfree(pCmd->insertParam.tagData.data);
pCmd->insertParam.tagData.dataLen = 0;
tscFreeQueryInfo(pCmd, clearCachedMeta);
if (pCmd->pTableMetaMap != NULL) {
STableMetaVgroupInfo* p = taosHashIterate(pCmd->pTableMetaMap, NULL);
while (p) {
taosArrayDestroy(p->vgroupIdList);
tfree(p->pTableMeta);
p = taosHashIterate(pCmd->pTableMetaMap, p);
}
taosHashCleanup(pCmd->pTableMetaMap);
pCmd->pTableMetaMap = NULL;
}
tscFreeQueryInfo(pCmd, clearCachedMeta, id);
pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap);
}
void* tscCleanupTableMetaMap(SHashObj* pTableMetaMap) {
......@@ -1513,8 +1501,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscFreeMetaSqlObj(&pSql->metaRid);
tscFreeMetaSqlObj(&pSql->svgroupRid);
tscFreeSubobj(pSql);
SSqlCmd* pCmd = &pSql->cmd;
int32_t cmd = pCmd->command;
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_GLOBALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
......@@ -1522,6 +1508,8 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscRemoveFromSqlList(pSql);
}
tscFreeSubobj(pSql);
pSql->signature = NULL;
pSql->fp = NULL;
tfree(pSql->sqlstr);
......@@ -1532,7 +1520,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
pSql->self = 0;
tscFreeSqlResult(pSql);
tscResetSqlCmd(pCmd, false);
tscResetSqlCmd(pCmd, false, pSql->self);
memset(pCmd->payload, 0, (size_t)pCmd->allocSize);
tfree(pCmd->payload);
......@@ -3379,20 +3367,15 @@ SArray* tscVgroupTableInfoDup(SArray* pVgroupTables) {
return pa;
}
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta) {
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta, uint64_t id) {
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
if (removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name);
taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
tscRemoveCachedTableMeta(pTableMetaInfo, id);
}
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
tscClearTableMetaInfo(pTableMetaInfo);
free(pTableMetaInfo);
}
tfree(pQueryInfo->pTableMetaInfo);
......@@ -3459,10 +3442,12 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo) {
}
tfree(pTableMetaInfo->pTableMeta);
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscColumnListDestroy(pTableMetaInfo->tagColList);
pTableMetaInfo->tagColList = NULL;
free(pTableMetaInfo);
}
void tscResetForNextRetrieve(SSqlRes* pRes) {
......@@ -3860,13 +3845,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
// todo refactor
tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self);
SSqlCmd* pParentCmd = &pParentSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0);
tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap);
pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
tscResetSqlCmd(&pParentSql->cmd, true, pParentSql->self);
pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++;
......@@ -3885,7 +3864,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pParentCmd);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
executeQuery(pParentSql, pQueryInfo);
return;
}
......@@ -5009,7 +4988,7 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
return info;
}
void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id) {
void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id) {
char fname[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, fname);
......
......@@ -70,12 +70,11 @@ SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFil
memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters);
for (int32_t j = 0; j < numOfFilters; ++j) {
if (pFilter[j].filterstr) {
size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE;
pFilter[j].pz = (int64_t) calloc(1, len);
memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t)len);
memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t) pFilter[j].len);
}
}
......
Subproject commit a44ec1ca493ad01b2bf825b6418f69e11f548206
Subproject commit 4a4d79099b076b8ff12d5b4fdbcba54049a6866d
......@@ -879,7 +879,7 @@ typedef struct {
uint64_t sqlObjId;
int32_t pid;
char fqdn[TSDB_FQDN_LEN];
bool stableQuery;
uint8_t stableQuery;
int32_t numOfSub;
char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; //include subqueries' index, Obj IDs and states(C-complete/I-imcomplete)
} SQueryDesc;
......
......@@ -335,6 +335,7 @@ enum OPERATOR_TYPE_E {
OP_StateWindow = 22,
OP_AllTimeWindow = 23,
OP_AllMultiTableTimeInterval = 24,
OP_Order = 25,
};
typedef struct SOperatorInfo {
......@@ -422,7 +423,6 @@ typedef struct STableScanInfo {
int32_t *rowCellInfoOffset;
SExprInfo *pExpr;
SSDataBlock block;
bool loadExternalRows; // load external rows (prev & next rows)
int32_t numOfOutput;
int64_t elapsedTime;
......@@ -513,7 +513,7 @@ typedef struct SStateWindowOperatorInfo {
int32_t start;
char* prevData; // previous data
bool reptScan;
} SStateWindowOperatorInfo ;
} SStateWindowOperatorInfo;
typedef struct SDistinctOperatorInfo {
SHashObj *pSet;
......@@ -546,6 +546,13 @@ typedef struct SMultiwayMergeInfo {
SArray *udfInfo;
} SMultiwayMergeInfo;
// todo support the disk-based sort
typedef struct SOrderOperatorInfo {
int32_t colIndex;
int32_t order;
SSDataBlock *pDataBlock;
} SOrderOperatorInfo;
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
......@@ -575,6 +582,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
......
......@@ -227,6 +227,8 @@ typedef int (*__col_compar_fn_t)(tOrderDescriptor *, int32_t numOfRows, int32_t
void tColDataQSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType);
void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn);
int32_t compare_sa(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data);
int32_t compare_sd(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data);
......
......@@ -16,7 +16,6 @@
#include "qFill.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "talgo.h"
#include "exception.h"
#include "hash.h"
......@@ -224,6 +223,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
......@@ -242,8 +242,7 @@ static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes);
static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput,
int32_t groupIndex);
SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId);
SArray* getOrderCheckColumns(SQueryAttr* pQuery);
......@@ -963,8 +962,6 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t
break;
}
}
return;
}
static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset,
......@@ -1622,12 +1619,12 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
}
startPos = pSDataBlock->info.rows - 1;
// window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
}
break;
}
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
......@@ -2213,6 +2210,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
break;
}
case OP_StateWindow: {
pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
......@@ -2229,24 +2227,20 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Filter: { // todo refactor
int32_t numOfFilterCols = 0;
// if (pQueryAttr->numOfFilterCols > 0) {
// pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
// pQueryAttr->numOfOutput, pQueryAttr->tableCols, pQueryAttr->numOfFilterCols);
// } else {
if (pQueryAttr->stableQuery) {
SColumnInfo* pColInfo =
extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols);
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols);
freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3);
} else {
SColumnInfo* pColInfo =
extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols);
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
pQueryAttr->numOfOutput, pColInfo, numOfFilterCols);
freeColumnInfo(pColInfo, pQueryAttr->numOfOutput);
}
// }
if (pQueryAttr->stableQuery) {
SColumnInfo* pColInfo =
extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols);
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols);
freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3);
} else {
SColumnInfo* pColInfo =
extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols);
pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
pQueryAttr->numOfOutput, pColInfo, numOfFilterCols);
freeColumnInfo(pColInfo, pQueryAttr->numOfOutput);
}
break;
}
......@@ -2258,11 +2252,12 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_MultiwayMergeSort: {
bool groupMix = true;
if(pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
if (pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
groupMix = false;
}
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput,
4096, merger, groupMix); // TODO hack it
4096, merger, groupMix); // TODO hack it
break;
}
......@@ -2283,6 +2278,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
break;
}
case OP_Order: {
pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order);
break;
}
default: {
assert(0);
}
......@@ -3040,7 +3040,7 @@ void doSetFilterColInfo(SFilterInfo * pFilters, SSDataBlock* pBlock) {
int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
*status = BLK_DATA_NO_NEEDED;
pBlock->pDataBlock = NULL;
pBlock->pDataBlock = NULL;
pBlock->pBlockStatis = NULL;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
......@@ -3050,6 +3050,9 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
SQInfo* pQInfo = pRuntimeEnv->qinfo;
SQueryCostInfo* pCost = &pQInfo->summary;
pCost->totalBlocks += 1;
pCost->totalRows += pBlock->info.rows;
if (pRuntimeEnv->pTsBuf != NULL) {
(*status) = BLK_DATA_ALL_NEEDED;
......@@ -3081,7 +3084,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
// check if this data block is required to load
if ((*status) != BLK_DATA_ALL_NEEDED) {
bool needFilter = true;
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
......@@ -3488,12 +3491,11 @@ void copyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBl
}
}
// enough results in data buffer, return
if (pBlock->info.rows >= threshold) {
break;
}
// enough results in data buffer, return
if (pBlock->info.rows >= threshold) {
break;
}
}
}
static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo) {
......@@ -5399,6 +5401,114 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
return pOperator;
}
static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) {
assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols);
int32_t numOfCols = pSrc->info.numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes;
char* tmp = realloc(pCol2->pData, newSize);
if (tmp != NULL) {
pCol2->pData = tmp;
int32_t offset = pCol2->info.bytes * pDest->info.rows;
memcpy(pCol2->pData + offset, pCol1->pData, pSrc->info.rows * pCol2->info.bytes);
} else {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
}
pDest->info.rows += pSrc->info.rows;
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doSort(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SOrderOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pBlock = NULL;
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
// start to flush data into disk and try do multiway merge sort
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
break;
}
int32_t code = doMergeSDatablock(pInfo->pDataBlock, pBlock);
if (code != TSDB_CODE_SUCCESS) {
// todo handle error
}
}
int32_t numOfCols = pInfo->pDataBlock->info.numOfCols;
void** pCols = calloc(numOfCols, POINTER_BYTES);
SSchema* pSchema = calloc(numOfCols, sizeof(SSchema));
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p1 = taosArrayGet(pInfo->pDataBlock->pDataBlock, i);
pCols[i] = p1->pData;
pSchema[i].colId = p1->info.colId;
pSchema[i].bytes = p1->info.bytes;
pSchema[i].type = (uint8_t) p1->info.type;
}
__compar_fn_t comp = getKeyComparFunc(pSchema[pInfo->colIndex].type, pInfo->order);
taoscQSort(pCols, pSchema, numOfCols, pInfo->pDataBlock->info.rows, pInfo->colIndex, comp);
tfree(pCols);
tfree(pSchema);
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
}
SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) {
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
{
SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock));
pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
for(int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData col = {{0}};
col.info.colId = pExpr[i].base.colInfo.colId;
col.info.bytes = pExpr[i].base.colBytes;
col.info.type = pExpr[i].base.colType;
taosArrayPush(pDataBlock->pDataBlock, &col);
if (col.info.colId == pOrderVal->orderColId) {
pInfo->colIndex = i;
}
}
pDataBlock->info.numOfCols = numOfOutput;
pInfo->order = pOrderVal->order;
pInfo->pDataBlock = pDataBlock;
}
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "InMemoryOrder";
pOperator->operatorType = OP_Order;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
pOperator->exec = doSort;
pOperator->cleanup = destroyOrderOperatorInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
appendUpstream(pOperator, upstream);
return pOperator;
}
static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
return pTableScanInfo->order;
}
......@@ -5838,11 +5948,15 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
int64_t st = taosGetTimestampUs();
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
SQInfo* pQInfo = pRuntimeEnv->qinfo;
pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st);
return pIntervalInfo->pRes;
}
......@@ -5930,17 +6044,18 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
doCloseAllTimeWindow(pRuntimeEnv);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
int64_t st = taosGetTimestampUs();
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
SQInfo* pQInfo = pRuntimeEnv->qinfo;
pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st);
return pIntervalInfo->pRes;
}
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
......@@ -6073,6 +6188,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
}
static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
......@@ -6399,6 +6515,11 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
}
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock);
}
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
SFilterOperatorInfo* pInfo = (SFilterOperatorInfo*) param;
doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols);
......@@ -6747,7 +6868,6 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doFill;
pOperator->cleanup = destroySFillOperatorInfo;
......@@ -6956,11 +7076,9 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
return NULL;
}
SDistinctOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->pRes;
pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
while(1) {
......
......@@ -12,7 +12,6 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "qExtbuffer.h"
#include "os.h"
#include "qAggMain.h"
#include "queryLog.h"
......@@ -21,6 +20,8 @@
#include "taosmsg.h"
#include "tulog.h"
#include "qExecutor.h"
#include "qExtbuffer.h"
#include "tcompare.h"
#define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \
(data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes)
......@@ -767,6 +768,60 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
free(buf);
}
void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn) {
assert(numOfRows > 0 && numOfCols > 0 && index >= 0 && index < numOfCols);
int32_t bytes = pSchema[index].bytes;
int32_t size = bytes + sizeof(int32_t);
char* buf = calloc(1, size * numOfRows);
for(int32_t i = 0; i < numOfRows; ++i) {
char* dest = buf + size * i;
memcpy(dest, ((char*)pCols[index]) + bytes * i, bytes);
*(int32_t*)(dest+bytes) = i;
}
qsort(buf, numOfRows, size, compareFn);
int32_t prevLength = 0;
char* p = NULL;
for(int32_t i = 0; i < numOfCols; ++i) {
int32_t bytes1 = pSchema[i].bytes;
if (i == index) {
for(int32_t j = 0; j < numOfRows; ++j){
char* src = buf + (j * size);
char* dest = (char*) pCols[i] + (j * bytes1);
memcpy(dest, src, bytes1);
}
} else {
// make sure memory buffer is enough
if (prevLength < bytes1) {
char *tmp = realloc(p, bytes1 * numOfRows);
assert(tmp);
p = tmp;
prevLength = bytes1;
}
memcpy(p, pCols[i], bytes1 * numOfRows);
for(int32_t j = 0; j < numOfRows; ++j){
char* dest = (char*) pCols[i] + bytes1 * j;
int32_t newPos = *(int32_t*)(buf + (j * size) + bytes);
char* src = p + (newPos * bytes1);
memcpy(dest, src, bytes1);
}
}
}
tfree(buf);
tfree(p);
}
/*
* deep copy of sschema
*/
......
......@@ -237,7 +237,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
}
pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(tFilePage))/pBucket->bytes;
pBucket->comparFn = getKeyComparFunc(pBucket->type);
pBucket->comparFn = getKeyComparFunc(pBucket->type, TSDB_ORDER_ASC);
pBucket->hashFunc = getHashFunc(pBucket->type);
if (pBucket->hashFunc == NULL) {
......
......@@ -557,10 +557,9 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
int32_t op = 0;
if (onlyQueryTags(pQueryAttr)) { // do nothing for tags query
if (onlyQueryTags(pQueryAttr)) {
op = OP_TagScan;
taosArrayPush(plan, &op);
}
op = OP_TagScan;
taosArrayPush(plan, &op);
if (pQueryAttr->distinct) {
op = OP_Distinct;
taosArrayPush(plan, &op);
......@@ -651,8 +650,14 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op);
}
}
// outer query order by support
int32_t orderColId = pQueryAttr->order.orderColId;
if (pQueryAttr->vgId == 0 && orderColId != PRIMARYKEY_TIMESTAMP_COL_INDEX && orderColId != INT32_MIN) {
op = OP_Order;
taosArrayPush(plan, &op);
}
}
if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) {
op = OP_Limit;
......
......@@ -215,7 +215,6 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
return code;
}
bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
SQInfo *pQInfo = (SQInfo *)qinfo;
assert(pQInfo && pQInfo->signature == pQInfo);
......@@ -256,7 +255,11 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
bool newgroup = false;
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = taosGetTimestampUs();
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_AFTER_OPERATOR_EXEC);
pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv);
......
#include <gtest/gtest.h>
#include <iostream>
#include "taos.h"
#include "tsdb.h"
#include "qExtbuffer.h"
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
namespace {
int32_t comp(const void* p1, const void* p2) {
int32_t* x1 = (int32_t*) p1;
int32_t* x2 = (int32_t*) p2;
if (*x1 == *x2) {
return 0;
} else {
return (*x1 > *x2)? 1:-1;
}
}
int32_t comp1(const void* p1, const void* p2) {
int32_t ret = strncmp((char*) p1, (char*) p2, 20);
if (ret == 0) {
return 0;
} else {
return ret > 0 ? 1:-1;
}
}
}
TEST(testCase, colunmnwise_sort_test) {
// void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn)
void* pCols[2] = {0};
SSchema s[2] = {{0}};
s[0].type = TSDB_DATA_TYPE_INT;
s[0].bytes = 4;
s[0].colId = 0;
strcpy(s[0].name, "col1");
s[1].type = TSDB_DATA_TYPE_BINARY;
s[1].bytes = 20;
s[1].colId = 1;
strcpy(s[1].name, "col2");
int32_t* p = (int32_t*) calloc(5, sizeof(int32_t));
p[0] = 12;
p[1] = 8;
p[2] = 99;
p[3] = 7;
p[4] = 1;
char* t1 = (char*) calloc(5, 20);
strcpy(t1, "abc");
strcpy(t1 + 20, "def");
strcpy(t1 + 40, "xyz");
strcpy(t1 + 60, "klm");
strcpy(t1 + 80, "hij");
pCols[0] = (char*) p;
pCols[1] = (char*) t1;
taoscQSort(reinterpret_cast<void**>(pCols), s, 2, 5, 0, comp);
int32_t* px = (int32_t*) pCols[0];
ASSERT_EQ(px[0], 1);
ASSERT_EQ(px[1], 7);
ASSERT_EQ(px[2], 8);
ASSERT_EQ(px[3], 12);
ASSERT_EQ(px[4], 99);
char* px1 = (char*) pCols[1];
ASSERT_STRCASEEQ(px1 + 20 * 0, "hij");
ASSERT_STRCASEEQ(px1 + 20 * 1, "klm");
ASSERT_STRCASEEQ(px1 + 20 * 2, "def");
ASSERT_STRCASEEQ(px1 + 20 * 3, "abc");
ASSERT_STRCASEEQ(px1 + 20 * 4, "xyz");
taoscQSort(pCols, s, 2, 5, 1, comp1);
px = (int32_t*) pCols[0];
ASSERT_EQ(px[0], 12);
ASSERT_EQ(px[1], 8);
ASSERT_EQ(px[2], 1);
ASSERT_EQ(px[3], 7);
ASSERT_EQ(px[4], 99);
px1 = (char*) pCols[1];
ASSERT_STRCASEEQ(px1 + 20 * 0, "abc");
ASSERT_STRCASEEQ(px1 + 20 * 1, "def");
ASSERT_STRCASEEQ(px1 + 20 * 2, "hij");
ASSERT_STRCASEEQ(px1 + 20 * 3, "klm");
ASSERT_STRCASEEQ(px1 + 20 * 4, "xyz");
}
TEST(testCase, columnsort_test) {
SSchema field[1] = {
{TSDB_DATA_TYPE_INT, "k", sizeof(int32_t)},
};
const int32_t num = 2000;
int32_t *d = (int32_t *)malloc(sizeof(int32_t) * num);
for (int32_t i = 0; i < num; ++i) {
d[i] = i % 4;
}
const int32_t numOfOrderCols = 1;
int32_t orderColIdx = 0;
SColumnModel *pModel = createColumnModel(field, 1, 1000);
tOrderDescriptor *pDesc = tOrderDesCreate(&orderColIdx, numOfOrderCols, pModel, 1);
tColDataQSort(pDesc, num, 0, num - 1, (char *)d, 1);
for (int32_t i = 0; i < num; ++i) {
printf("%d\t", d[i]);
}
printf("\n");
destroyColumnModel(pModel);
}
\ No newline at end of file
#include "os.h"
#include <gtest/gtest.h>
#include <cassert>
#include <iostream>
#include "taos.h"
......
......@@ -47,7 +47,7 @@ int WCSPatternMatch(const wchar_t *pattern, const wchar_t *str, size_t size, con
int32_t doCompare(const char* a, const char* b, int32_t type, size_t size);
__compar_fn_t getKeyComparFunc(int32_t keyType);
__compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order);
__compar_fn_t getComparFunc(int32_t type, int32_t optr);
......
......@@ -16,53 +16,68 @@
#include "os.h"
#include "ttype.h"
#include "tcompare.h"
#include "tarray.h"
#include "hash.h"
int32_t setCompareBytes1(const void *pLeft, const void *pRight) {
return NULL != taosHashGet((SHashObj *)pRight, pLeft, 1) ? 1 : 0;
return NULL != taosHashGet((SHashObj *)pRight, pLeft, 1) ? 1 : 0;
}
int32_t setCompareBytes2(const void *pLeft, const void *pRight) {
return NULL != taosHashGet((SHashObj *)pRight, pLeft, 2) ? 1 : 0;
return NULL != taosHashGet((SHashObj *)pRight, pLeft, 2) ? 1 : 0;
}
int32_t setCompareBytes4(const void *pLeft, const void *pRight) {
return NULL != taosHashGet((SHashObj *)pRight, pLeft, 4) ? 1 : 0;
return NULL != taosHashGet((SHashObj *)pRight, pLeft, 4) ? 1 : 0;
}
int32_t setCompareBytes8(const void *pLeft, const void *pRight) {
return NULL != taosHashGet((SHashObj *)pRight, pLeft, 8) ? 1 : 0;
return NULL != taosHashGet((SHashObj *)pRight, pLeft, 8) ? 1 : 0;
}
int32_t compareInt32Val(const void *pLeft, const void *pRight) {
int32_t left = GET_INT32_VAL(pLeft), right = GET_INT32_VAL(pRight);
int32_t compareInt8Val(const void *pLeft, const void *pRight) {
int8_t left = GET_INT8_VAL(pLeft), right = GET_INT8_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareInt64Val(const void *pLeft, const void *pRight) {
int64_t left = GET_INT64_VAL(pLeft), right = GET_INT64_VAL(pRight);
int32_t compareInt8ValDesc(const void *pLeft, const void *pRight) {
return compareInt8Val(pRight, pLeft);
}
int32_t compareInt16Val(const void *pLeft, const void *pRight) {
int16_t left = GET_INT16_VAL(pLeft), right = GET_INT16_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareInt16Val(const void *pLeft, const void *pRight) {
int16_t left = GET_INT16_VAL(pLeft), right = GET_INT16_VAL(pRight);
int32_t compareInt16ValDesc(const void* pLeft, const void* pRight) {
return compareInt16Val(pRight, pLeft);
}
int32_t compareInt32Val(const void *pLeft, const void *pRight) {
int32_t left = GET_INT32_VAL(pLeft), right = GET_INT32_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareInt8Val(const void *pLeft, const void *pRight) {
int8_t left = GET_INT8_VAL(pLeft), right = GET_INT8_VAL(pRight);
int32_t compareInt32ValDesc(const void* pLeft, const void* pRight) {
return compareInt32Val(pRight, pLeft);
}
int32_t compareInt64Val(const void *pLeft, const void *pRight) {
int64_t left = GET_INT64_VAL(pLeft), right = GET_INT64_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareInt64ValDesc(const void* pLeft, const void* pRight) {
return compareInt64Val(pRight, pLeft);
}
int32_t compareUint32Val(const void *pLeft, const void *pRight) {
uint32_t left = GET_UINT32_VAL(pLeft), right = GET_UINT32_VAL(pRight);
if (left > right) return 1;
......@@ -70,6 +85,10 @@ int32_t compareUint32Val(const void *pLeft, const void *pRight) {
return 0;
}
int32_t compareUint32ValDesc(const void* pLeft, const void* pRight) {
return compareUint32Val(pRight, pLeft);
}
int32_t compareUint64Val(const void *pLeft, const void *pRight) {
uint64_t left = GET_UINT64_VAL(pLeft), right = GET_UINT64_VAL(pRight);
if (left > right) return 1;
......@@ -77,6 +96,10 @@ int32_t compareUint64Val(const void *pLeft, const void *pRight) {
return 0;
}
int32_t compareUint64ValDesc(const void* pLeft, const void* pRight) {
return compareUint64Val(pRight, pLeft);
}
int32_t compareUint16Val(const void *pLeft, const void *pRight) {
uint16_t left = GET_UINT16_VAL(pLeft), right = GET_UINT16_VAL(pRight);
if (left > right) return 1;
......@@ -84,6 +107,10 @@ int32_t compareUint16Val(const void *pLeft, const void *pRight) {
return 0;
}
int32_t compareUint16ValDesc(const void* pLeft, const void* pRight) {
return compareUint16Val(pRight, pLeft);
}
int32_t compareUint8Val(const void* pLeft, const void* pRight) {
uint8_t left = GET_UINT8_VAL(pLeft), right = GET_UINT8_VAL(pRight);
if (left > right) return 1;
......@@ -91,6 +118,10 @@ int32_t compareUint8Val(const void* pLeft, const void* pRight) {
return 0;
}
int32_t compareUint8ValDesc(const void* pLeft, const void* pRight) {
return compareUint8Val(pRight, pLeft);
}
int32_t compareFloatVal(const void *pLeft, const void *pRight) {
float p1 = GET_FLOAT_VAL(pLeft);
float p2 = GET_FLOAT_VAL(pRight);
......@@ -112,6 +143,10 @@ int32_t compareFloatVal(const void *pLeft, const void *pRight) {
return FLT_GREATER(p1, p2) ? 1: -1;
}
int32_t compareFloatValDesc(const void* pLeft, const void* pRight) {
return compareFloatVal(pRight, pLeft);
}
int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
double p1 = GET_DOUBLE_VAL(pLeft);
double p2 = GET_DOUBLE_VAL(pRight);
......@@ -133,6 +168,10 @@ int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
return FLT_GREATER(p1, p2) ? 1: -1;
}
int32_t compareDoubleValDesc(const void* pLeft, const void* pRight) {
return compareDoubleVal(pRight, pLeft);
}
int32_t compareLenPrefixedStr(const void *pLeft, const void *pRight) {
int32_t len1 = varDataLen(pLeft);
int32_t len2 = varDataLen(pRight);
......@@ -149,6 +188,10 @@ int32_t compareLenPrefixedStr(const void *pLeft, const void *pRight) {
}
}
int32_t compareLenPrefixedStrDesc(const void* pLeft, const void* pRight) {
return compareLenPrefixedStr(pRight, pLeft);
}
int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) {
int32_t len1 = varDataLen(pLeft);
int32_t len2 = varDataLen(pRight);
......@@ -165,6 +208,10 @@ int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) {
}
}
int32_t compareLenPrefixedWStrDesc(const void* pLeft, const void* pRight) {
return compareLenPrefixedWStr(pRight, pLeft);
}
/*
* Compare two strings
* TSDB_MATCH: Match
......@@ -303,10 +350,6 @@ int32_t taosArrayCompareString(const void* a, const void* b) {
return compareLenPrefixedStr(x, y);
}
//static int32_t compareFindStrInArray(const void* pLeft, const void* pRight) {
// const SArray* arr = (const SArray*) pRight;
// return taosArraySearchString(arr, pLeft, taosArrayCompareString, TD_EQ) == NULL ? 0 : 1;
//}
int32_t compareFindItemInSet(const void *pLeft, const void* pRight) {
return NULL != taosHashGet((SHashObj *)pRight, varDataVal(pLeft), varDataLen(pLeft)) ? 1 : 0;
}
......@@ -330,26 +373,26 @@ __compar_fn_t getComparFunc(int32_t type, int32_t optr) {
if (optr == TSDB_RELATION_IN && (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR)) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
return setCompareBytes1;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
return setCompareBytes2;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_FLOAT:
return setCompareBytes4;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP:
return setCompareBytes8;
default:
assert(0);
}
}
switch (type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: comparFn = compareInt8Val; break;
......@@ -395,50 +438,50 @@ __compar_fn_t getComparFunc(int32_t type, int32_t optr) {
return comparFn;
}
__compar_fn_t getKeyComparFunc(int32_t keyType) {
__compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order) {
__compar_fn_t comparFn = NULL;
switch (keyType) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BOOL:
comparFn = compareInt8Val;
comparFn = (order == TSDB_ORDER_ASC)? compareInt8Val:compareInt8ValDesc;
break;
case TSDB_DATA_TYPE_SMALLINT:
comparFn = compareInt16Val;
comparFn = (order == TSDB_ORDER_ASC)? compareInt16Val:compareInt16ValDesc;
break;
case TSDB_DATA_TYPE_INT:
comparFn = compareInt32Val;
comparFn = (order == TSDB_ORDER_ASC)? compareInt32Val:compareInt32ValDesc;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
comparFn = compareInt64Val;
comparFn = (order == TSDB_ORDER_ASC)? compareInt64Val:compareInt64ValDesc;
break;
case TSDB_DATA_TYPE_FLOAT:
comparFn = compareFloatVal;
comparFn = (order == TSDB_ORDER_ASC)? compareFloatVal:compareFloatValDesc;
break;
case TSDB_DATA_TYPE_DOUBLE:
comparFn = compareDoubleVal;
comparFn = (order == TSDB_ORDER_ASC)? compareDoubleVal:compareDoubleValDesc;
break;
case TSDB_DATA_TYPE_UTINYINT:
comparFn = compareUint8Val;
comparFn = (order == TSDB_ORDER_ASC)? compareUint8Val:compareUint8ValDesc;
break;
case TSDB_DATA_TYPE_USMALLINT:
comparFn = compareUint16Val;
comparFn = (order == TSDB_ORDER_ASC)? compareUint16Val:compareUint16ValDesc;
break;
case TSDB_DATA_TYPE_UINT:
comparFn = compareUint32Val;
comparFn = (order == TSDB_ORDER_ASC)? compareUint32Val:compareUint32ValDesc;
break;
case TSDB_DATA_TYPE_UBIGINT:
comparFn = compareUint64Val;
comparFn = (order == TSDB_ORDER_ASC)? compareUint64Val:compareUint64ValDesc;
break;
case TSDB_DATA_TYPE_BINARY:
comparFn = compareLenPrefixedStr;
comparFn = (order == TSDB_ORDER_ASC)? compareLenPrefixedStr:compareLenPrefixedStrDesc;
break;
case TSDB_DATA_TYPE_NCHAR:
comparFn = compareLenPrefixedWStr;
comparFn = (order == TSDB_ORDER_ASC)? compareLenPrefixedWStr:compareLenPrefixedWStrDesc;
break;
default:
comparFn = compareInt32Val;
comparFn = (order == TSDB_ORDER_ASC)? compareInt32Val:compareInt32ValDesc;
break;
}
......
......@@ -54,7 +54,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _
pSkipList->keyFn = fn;
pSkipList->seed = rand();
if (comparFn == NULL) {
pSkipList->comparFn = getKeyComparFunc(keyType);
pSkipList->comparFn = getKeyComparFunc(keyType, TSDB_ORDER_ASC);
} else {
pSkipList->comparFn = comparFn;
}
......
......@@ -287,13 +287,9 @@ class TDTestCase:
tdLog.info(len(sql))
tdSql.error(sql)
endTime = time.time()
print("total time %ds" % (endTime - startTime))
os.system("rm -rf query/long_where_query.py.sql")
#os.system("rm -rf query/long_where_query.py.sql")
def stop(self):
......
......@@ -30,7 +30,7 @@ sql create dnode $hostname2
system sh/exec.sh -n dnode2 -s start
sql create dnode $hostname3
system sh/exec.sh -n dnode3 -s start
sleep 3000
sleep 5000
sql show dnodes
print dnode1 $data5_1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册