diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index b99a8a46d0874ee57d338389bde864fc1c9ae514..f687d7f244a42e255817a278e6169ce294788a38 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -176,8 +176,6 @@ SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex); SArray* tscColumnListClone(const SArray* src, int16_t tableIndex); void tscColumnListDestroy(SArray* pColList); -SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters); - int32_t tscValidateName(SSQLToken* pToken); void tscIncStreamExecutionCount(void* pStream); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 6ce94d5aa4d10688d7f92dfe2c9ccbfc8852bc9d..c8754e5bebd7a460ed3e33a5e56a1e535dbf7035 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -32,8 +32,8 @@ extern "C" { #include "qExecutor.h" #include "qsqlparser.h" -#include "qsqltype.h" #include "qtsbuf.h" +#include "tcmdtype.h" // forward declaration struct SSqlInfo; @@ -395,7 +395,6 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void *param, void **taos); void waitForQueryRsp(void *param, TAOS_RES *tres, int code) ; -int doAsyncParseSql(SSqlObj* pSql); void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen); void tscProcessMultiVnodesImportFromFile(SSqlObj *pSql); @@ -403,13 +402,14 @@ void tscKillSTableQuery(SSqlObj *pSql); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); bool tscIsUpdateQuery(SSqlObj* pSql); bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes); + +// todo remove this function. bool tscResultsetFetchCompleted(TAOS_RES *result); char *tscGetErrorMsgPayload(SSqlCmd *pCmd); int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); -void tscQueueAsyncFreeResult(SSqlObj *pSql); int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 2de45bcc6e0a0be3f41f492d0a0b760c7585d3a4..100c9029a78cb2d74a13c6d84c7c85f34197b0e0 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -213,27 +213,34 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi // handle the sub queries of join query if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { tscFetchDatablockFromSubquery(pSql); - } else if (pRes->completed && pCmd->command == TSDB_SQL_FETCH) { - if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes. - tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode); - return; - } else { - /* + } else if (pRes->completed) { + if(pCmd->command == TSDB_SQL_FETCH) { + if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes. + tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode); + return; + } else { + /* * all available virtual node has been checked already, now we need to check * for the next subclause queries - */ - if (pCmd->clauseIndex < pCmd->numOfClause - 1) { - tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode); - return; - } - - /* + */ + if (pCmd->clauseIndex < pCmd->numOfClause - 1) { + tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode); + return; + } + + /* * 1. has reach the limitation * 2. no remain virtual nodes to be retrieved anymore - */ + */ + (*pSql->fetchFp)(param, pSql, 0); + } + return; + } else if (pCmd->command == TSDB_SQL_RETRIEVE) { + // in case of show command, return no data (*pSql->fetchFp)(param, pSql, 0); + } else { + assert(0); } - return; } else { // current query is not completed, continue retrieve from node if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; @@ -405,17 +412,6 @@ void tscProcessAsyncFree(SSchedMsg *pMsg) { taos_free_result(pSql); } -void tscQueueAsyncFreeResult(SSqlObj *pSql) { - tscDebug("%p sqlObj put in queue to async free", pSql); - - SSchedMsg schedMsg = { 0 }; - schedMsg.fp = tscProcessAsyncFree; - schedMsg.ahandle = pSql; - schedMsg.thandle = (void *)1; - schedMsg.msg = NULL; - taosScheduleTask(tscQhandle, &schedMsg); -} - int tscSendMsgToServer(SSqlObj *pSql); void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 9f557f5529e03ee504aea2c30cc6bfac113a06aa..d90a19da279170c89c180d46f2ed69828d788678 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4452,6 +4452,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { tVariantList* pVarList = pAlterSQL->varList; tVariant* pTagName = &pVarList->a[0].pVar; + int16_t numOfTags = tscGetNumOfTags(pTableMeta); SColumnIndex columnIndex = COLUMN_INDEX_INITIALIZER; SSQLToken name = {.type = TK_STRING, .z = pTagName->pz, .n = pTagName->nLen}; @@ -4475,8 +4476,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { (pVarList->a[1].pVar.nLen + VARSTR_HEADER_SIZE) > pTagsSchema->bytes) { return invalidSqlErrMsg(pQueryInfo->msg, msg14); } - - int32_t size = sizeof(SUpdateTableTagValMsg) + pTagsSchema->bytes + TSDB_EXTRA_PAYLOAD_SIZE; + + int32_t schemaLen = sizeof(STColumn) * numOfTags; + int32_t size = sizeof(SUpdateTableTagValMsg) + pTagsSchema->bytes + schemaLen + TSDB_EXTRA_PAYLOAD_SIZE; + if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { tscError("%p failed to malloc for alter table msg", pSql); return TSDB_CODE_TSC_OUT_OF_MEMORY; @@ -4487,11 +4490,25 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { pUpdateMsg->tid = htonl(pTableMeta->sid); pUpdateMsg->uid = htobe64(pTableMeta->uid); pUpdateMsg->colId = htons(pTagsSchema->colId); - pUpdateMsg->type = htons(pTagsSchema->type); - pUpdateMsg->bytes = htons(pTagsSchema->bytes); pUpdateMsg->tversion = htons(pTableMeta->tversion); - - tVariantDump(&pVarList->a[1].pVar, pUpdateMsg->data, pTagsSchema->type, true); + pUpdateMsg->numOfTags = htons(numOfTags); + pUpdateMsg->schemaLen = htonl(schemaLen); + + // the schema is located after the msg body, then followed by true tag value + char* d = pUpdateMsg->data; + SSchema* pTagCols = tscGetTableTagSchema(pTableMeta); + for (int i = 0; i < numOfTags; ++i) { + STColumn* pCol = (STColumn*) d; + pCol->colId = htons(pTagCols[i].colId); + pCol->bytes = htons(pTagCols[i].bytes); + pCol->type = pTagCols[i].type; + pCol->offset = 0; + + d += sizeof(STColumn); + } + + // copy the tag value to msg body + tVariantDump(&pVarList->a[1].pVar, pUpdateMsg->data + schemaLen, pTagsSchema->type, true); int32_t len = 0; if (pTagsSchema->type != TSDB_DATA_TYPE_BINARY && pTagsSchema->type != TSDB_DATA_TYPE_NCHAR) { @@ -4502,7 +4519,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { pUpdateMsg->tagValLen = htonl(len); // length may be changed after dump data - int32_t total = sizeof(SUpdateTableTagValMsg) + len; + int32_t total = sizeof(SUpdateTableTagValMsg) + len + schemaLen; pUpdateMsg->head.contLen = htonl(total); } else if (pAlterSQL->type == TSDB_ALTER_TABLE_ADD_COLUMN) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e85ade60e509373dd52a37e8774a6bca1c7b9d56..1b2334b99815ab44a382df90fb7d298b545f504d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -14,8 +14,8 @@ */ #include "os.h" -#include "qsqltype.h" #include "tcache.h" +#include "tcmdtype.h" #include "trpc.h" #include "tscLocalMerge.h" #include "tscLog.h" diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 26a81c597f077e5101b3920e792c650d00ca50f3..9b6eff71232f609c18d02ab34c0c3d2ccf76312d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1115,31 +1115,6 @@ SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) { return taosArrayGetP(pColumnList, i); } -SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters) { - if (numOfFilters == 0) { - assert(src == NULL); - return NULL; - } - - SColumnFilterInfo* pFilter = calloc(1, numOfFilters * sizeof(SColumnFilterInfo)); - - memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters); - for (int32_t j = 0; j < numOfFilters; ++j) { - - if (pFilter[j].filterstr) { - size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE; - pFilter[j].pz = (int64_t) calloc(1, len); - - memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t)len); - } - } - - assert(src->filterstr == 0 || src->filterstr == 1); - assert(!(src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID)); - - return pFilter; -} - static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) { for(int32_t i = 0; i < numOfFilters; ++i) { if (pFilterInfo[i].filterstr) { diff --git a/src/common/inc/qsqltype.h b/src/common/inc/tcmdtype.h similarity index 97% rename from src/common/inc/qsqltype.h rename to src/common/inc/tcmdtype.h index 6f6493d17ca8b3a3c180332a728d0529dc6d474a..90fb5bf47854313a67e395eea7b99a992a579889 100644 --- a/src/common/inc/qsqltype.h +++ b/src/common/inc/tcmdtype.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_QSQLCMD_H -#define TDENGINE_QSQLCMD_H +#ifndef TDENGINE_TSQLMSGTYPE_H +#define TDENGINE_TSQLMSGTYPE_H #ifdef __cplusplus extern "C" { @@ -109,4 +109,4 @@ extern char *sqlCmd[]; } #endif -#endif // TDENGINE_QSQLCMD_H +#endif // TDENGINE_TSQLMSGTYPE_H diff --git a/src/common/inc/tname.h b/src/common/inc/tname.h index d2008c9ff8181b54db83fdd92f777ba74489ce73..10d725db32102576837621c0dc50a822ba9a0104 100644 --- a/src/common/inc/tname.h +++ b/src/common/inc/tname.h @@ -27,4 +27,6 @@ SSchema tGetTableNameColumnSchema(); bool tscValidateTableNameLength(size_t len); +SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters); + #endif // TDENGINE_NAME_H diff --git a/src/common/src/sqlcmdstr.c b/src/common/src/sqlcmdstr.c index 8584ba79761835989ab7a3e24d88824c14d107c5..672106523e7c7eab8c606db1940dd9485e7f4c8f 100644 --- a/src/common/src/sqlcmdstr.c +++ b/src/common/src/sqlcmdstr.c @@ -15,4 +15,4 @@ #define TSDB_SQL_C -#include "qsqltype.h" +#include "tcmdtype.h" diff --git a/src/common/src/tname.c b/src/common/src/tname.c index 2514ed26e55e54eddf54d83933beecdfbf4e06fa..295015d466456843c5ec149763d96edc2029f864 100644 --- a/src/common/src/tname.c +++ b/src/common/src/tname.c @@ -49,4 +49,29 @@ SSchema tGetTableNameColumnSchema() { bool tscValidateTableNameLength(size_t len) { return len < TSDB_TABLE_NAME_LEN; -} \ No newline at end of file +} + +SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters) { + if (numOfFilters == 0) { + assert(src == NULL); + return NULL; + } + + SColumnFilterInfo* pFilter = calloc(1, numOfFilters * sizeof(SColumnFilterInfo)); + + memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters); + for (int32_t j = 0; j < numOfFilters; ++j) { + + if (pFilter[j].filterstr) { + size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE; + pFilter[j].pz = (int64_t) calloc(1, len); + + memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t)len); + } + } + + assert(src->filterstr == 0 || src->filterstr == 1); + assert(!(src->lowerRelOptr == TSDB_RELATION_INVALID && src->upperRelOptr == TSDB_RELATION_INVALID)); + + return pFilter; +} diff --git a/src/inc/query.h b/src/inc/query.h index af3a89682c8c931b4e3aa5db250bb5e082759bdd..eb8abace6278bb96c6ab5b4984735b09f347923e 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -44,7 +44,7 @@ void qDestroyQueryInfo(qinfo_t qinfo); * @param qinfo * @return */ -void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param); +void qTableQuery(qinfo_t qinfo); /** * Retrieve the produced results information, if current query is not paused or completed, diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index cb25242d27d3f8e50af643df3c447c7aaae76484..6155f08e76646ff887a51be60a0bfa500a6f873c 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -285,9 +285,9 @@ typedef struct { int32_t tid; int16_t tversion; int16_t colId; - int16_t type; - int16_t bytes; int32_t tagValLen; + int16_t numOfTags; + int32_t schemaLen; char data[]; } SUpdateTableTagValMsg; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index d7d59230b65bff91d28f42e732a54acb176019f7..3aa1b60be576e9b0d3559d0f011c171a9af33a16 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -95,16 +95,13 @@ typedef struct SSingleColumnFilterInfo { } SSingleColumnFilterInfo; typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct - int32_t tableIndex; - int32_t groupIndex; // group id in table list TSKEY lastKey; - int32_t numOfRes; + int32_t groupIndex; // group id in table list int16_t queryRangeSet; // denote if the query range is set, only available for interval query int64_t tag; STimeWindow win; STSCursor cur; - void* pTable; // for retrieve the page id list - + void* pTable; // for retrieve the page id list SWindowResInfo windowResInfo; } STableQueryInfo; @@ -127,11 +124,6 @@ typedef struct SQueryCostInfo { uint64_t computTime; } SQueryCostInfo; -//typedef struct SGroupItem { -// void *pTable; -// STableQueryInfo *info; -//} SGroupItem; - typedef struct SQuery { int16_t numOfCols; int16_t numOfTags; @@ -173,12 +165,12 @@ typedef struct SQueryRuntimeEnv { STSBuf* pTSBuf; STSCursor cur; SQueryCostInfo summary; - bool stableQuery; // super table query or not void* pQueryHandle; void* pSecQueryHandle; // another thread for - SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file + bool stableQuery; // super table query or not bool topBotQuery; // false int32_t prevGroupId; // previous executed group id + SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file } SQueryRuntimeEnv; typedef struct SQInfo { @@ -205,7 +197,8 @@ typedef struct SQInfo { */ int32_t tableIndex; int32_t numOfGroupResultPages; - _qinfo_free_fn_t fn; + _qinfo_free_fn_t freeFn; + jmp_buf env; } SQInfo; #endif // TDENGINE_QUERYEXECUTOR_H diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index f33d739ba1f41e22c90ee33f6cc6133febbe0bee..8e0537016a900d32c6771e39f52ce8e1af73bf85 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ #include "os.h" +#include "taosmsg.h" #include "qfill.h" #include "hash.h" @@ -22,9 +23,8 @@ #include "qresultBuf.h" #include "query.h" #include "queryLog.h" -#include "taosmsg.h" #include "tlosertree.h" -#include "tscUtil.h" // todo move the function to common module +#include "exception.h" #include "tscompression.h" #include "ttime.h" @@ -87,6 +87,17 @@ typedef struct { STSCursor cur; } SQueryStatusInfo; +static UNUSED_FUNC void *u_malloc (size_t __size) { +// uint32_t v = rand(); +// if (v % 5 <= 1) { +// return NULL; +// } else { + return malloc(__size); +// } +} + +#define malloc u_malloc + #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList) #define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) @@ -2586,7 +2597,6 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { } int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) { -// int64_t maxOutput = 0; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { int32_t functionId = pQuery->pSelectExpr[j].base.functionId; @@ -2604,15 +2614,6 @@ int64_t getNumOfResultWindowRes(SQuery *pQuery, SWindowResult *pWindowRes) { if (pResultInfo->numOfRes > 0) { return pResultInfo->numOfRes; } -// if (pResultInfo != NULL && maxOutput < pResultInfo->numOfRes) { -// maxOutput = pResultInfo->numOfRes; -// -// if (maxOutput > 0) { -// break; -// } -// } -// -// assert(pResultInfo != NULL); } return 0; @@ -2623,12 +2624,19 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { SQuery * pQuery = pRuntimeEnv->pQuery; size_t size = taosArrayGetSize(pGroup); - tFilePage **buffer = pQuery->sdata; - int32_t * posList = calloc(size, sizeof(int32_t)); + int32_t* posList = calloc(size, sizeof(int32_t)); STableQueryInfo **pTableList = malloc(POINTER_BYTES * size); + if (pTableList == NULL || posList == NULL) { + tfree(posList); + tfree(pTableList); + + qError("QInfo:%p failed alloc memory", pQInfo); + longjmp(pQInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + // todo opt for the case of one table per group int32_t numOfTables = 0; for (int32_t i = 0; i < size; ++i) { @@ -4069,7 +4077,7 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) { return pFillCol; } -int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery, void* freeParam, _qinfo_free_fn_t fn) { +int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery) { int32_t code = TSDB_CODE_SUCCESS; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; @@ -4083,8 +4091,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo pQInfo->tsdb = tsdb; pQInfo->vgId = vgId; - pQInfo->param = freeParam; - pQInfo->fn = fn; pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pTSBuf = pTsBuf; @@ -4932,14 +4938,6 @@ static void tableQueryImpl(SQInfo *pQInfo) { // record the total elapsed time pRuntimeEnv->summary.elapsedTime += (taosGetTimestampUs() - st); assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1); - - /* check if query is killed or not */ - if (isQueryKilled(pQInfo)) { - qDebug("QInfo:%p query is killed", pQInfo); - } else { - qDebug("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows", - pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); - } } static void stableQueryImpl(SQInfo *pQInfo) { @@ -4961,10 +4959,6 @@ static void stableQueryImpl(SQInfo *pQInfo) { // record the total elapsed time pQInfo->runtimeEnv.summary.elapsedTime += (taosGetTimestampUs() - st); - - if (pQuery->rec.rows == 0) { - qDebug("QInfo:%p over, %zu tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total); - } } static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) { @@ -5076,6 +5070,8 @@ static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **p */ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncMsg ***pExpr, char **tagCond, char** tbnameCond, SColIndex **groupbyCols, SColumnInfo** tagCols) { + int32_t code = TSDB_CODE_SUCCESS; + pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables); pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey); @@ -5102,7 +5098,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, // query msg safety check if (!validateQueryMsg(pQueryMsg)) { - return TSDB_CODE_QRY_INVALID_MSG; + code = TSDB_CODE_QRY_INVALID_MSG; + goto _cleanup; } char *pMsg = (char *)(pQueryMsg->colList) + sizeof(SColumnInfo) * pQueryMsg->numOfCols; @@ -5174,7 +5171,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, int16_t functionId = pExprMsg->functionId; if (functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG_DUMMY) { if (pExprMsg->colInfo.flag != TSDB_COL_TAG) { // ignore the column index check for arithmetic expression. - return TSDB_CODE_QRY_INVALID_MSG; + code = TSDB_CODE_QRY_INVALID_MSG; + goto _cleanup; } } else { // if (!validateExprColumnInfo(pQueryMsg, pExprMsg)) { @@ -5186,6 +5184,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, } if (!validateQuerySourceCols(pQueryMsg, *pExpr)) { + code = TSDB_CODE_QRY_INVALID_MSG; goto _cleanup; } @@ -5193,6 +5192,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns *groupbyCols = malloc(pQueryMsg->numOfGroupCols * sizeof(SColIndex)); + if (*groupbyCols == NULL) { + code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _cleanup; + } for (int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) { (*groupbyCols)[i].colId = *(int16_t *)pMsg; @@ -5248,7 +5251,13 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, if (*pMsg != 0) { size_t len = strlen(pMsg) + 1; + *tbnameCond = malloc(len); + if (*tbnameCond == NULL) { + code = TSDB_CODE_QRY_OUT_OF_MEMORY; + goto _cleanup; + } + strcpy(*tbnameCond, pMsg); pMsg += len; } @@ -5258,7 +5267,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->tsNumOfBlocks, pQueryMsg->limit, pQueryMsg->offset); - return 0; + + return TSDB_CODE_SUCCESS; _cleanup: tfree(*pExpr); @@ -5268,7 +5278,8 @@ _cleanup: tfree(*groupbyCols); tfree(*tagCols); tfree(*tagCond); - return TSDB_CODE_QRY_INVALID_MSG; + + return code; } static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) { @@ -5656,7 +5667,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window); item->groupIndex = i; - item->tableIndex = tableIndex++; taosArrayPush(p1, &item); taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id.tid, sizeof(id.tid), &item, POINTER_BYTES); } @@ -5670,7 +5680,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, pQuery->window = pQueryMsg->window; if (sem_init(&pQInfo->dataReady, 0, 0) != 0) { - qError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno)); + int32_t code = TAOS_SYSTEM_ERROR(errno); + qError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, tstrerror(code)); goto _cleanup; } @@ -5681,7 +5692,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, _cleanup: freeQInfo(pQInfo); - return NULL; } @@ -5723,6 +5733,9 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ return TSDB_CODE_SUCCESS; } + pQInfo->param = param; + pQInfo->freeFn = fn; + if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo); setQueryStatus(pQuery, QUERY_COMPLETED); @@ -5732,7 +5745,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ } // filter the qualified - if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, vgId, isSTable, param, fn)) != TSDB_CODE_SUCCESS) { + if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, vgId, isSTable)) != TSDB_CODE_SUCCESS) { goto _error; } @@ -6032,19 +6045,19 @@ void qDestroyQueryInfo(qinfo_t qHandle) { qDebug("QInfo:%p dec refCount, value:%d", pQInfo, ref); if (ref == 0) { - _qinfo_free_fn_t fn = pQInfo->fn; + _qinfo_free_fn_t freeFp = pQInfo->freeFn; void* param = pQInfo->param; doDestoryQueryInfo(pQInfo); - if (fn != NULL) { + if (freeFp != NULL) { assert(param != NULL); - fn(param); + freeFp(param); } } } -void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { +void qTableQuery(qinfo_t qinfo) { SQInfo *pQInfo = (SQInfo *)qinfo; if (pQInfo == NULL || pQInfo->signature != pQInfo) { @@ -6054,17 +6067,34 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { if (isQueryKilled(pQInfo)) { qDebug("QInfo:%p it is already killed, abort", pQInfo); + + sem_post(&pQInfo->dataReady); qDestroyQueryInfo(pQInfo); return; } if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { qDebug("QInfo:%p no table exists for query, abort", pQInfo); + + sem_post(&pQInfo->dataReady); + qDestroyQueryInfo(pQInfo); + return; + } + + int32_t ret = setjmp(pQInfo->env); + // error occurs, record the error code and return to client + if (ret != TSDB_CODE_SUCCESS) { + pQInfo->code = ret; + qDebug("QInfo:%p query abort due to error occurs, code:%s", pQInfo, tstrerror(pQInfo->code)); + sem_post(&pQInfo->dataReady); + qDestroyQueryInfo(pQInfo); + return; } qDebug("QInfo:%p query task is launched", pQInfo); + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) { assert(pQInfo->runtimeEnv.pQueryHandle == NULL); buildTagQueryResult(pQInfo); // todo support the limit/offset @@ -6074,6 +6104,16 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) { tableQueryImpl(pQInfo); } + SQuery* pQuery = pRuntimeEnv->pQuery; + if (isQueryKilled(pQInfo)) { + qDebug("QInfo:%p query is killed", pQInfo); + } else if (pQuery->rec.rows == 0) { + qDebug("QInfo:%p over, %zu tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total); + } else { + qDebug("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows", + pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows); + } + sem_post(&pQInfo->dataReady); qDestroyQueryInfo(pQInfo); } diff --git a/src/query/src/qast.c b/src/query/src/qast.c index dc3b1499bbe02fd389cb1502361fd5cdc46bc54e..721cd8ae5a2fec6233faf84f71ecf08378ab19d9 100644 --- a/src/query/src/qast.c +++ b/src/query/src/qast.c @@ -1173,9 +1173,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) { size_t len = strlen(cond) + VARSTR_HEADER_SIZE; char* p = exception_malloc(len); - varDataSetLen(p, len - VARSTR_HEADER_SIZE); - memcpy(varDataVal(p), cond, len); - + STR_WITH_SIZE_TO_VARSTR(p, cond, len - VARSTR_HEADER_SIZE); taosArrayPush(pVal->arr, &p); } diff --git a/src/query/src/qparserImpl.c b/src/query/src/qparserImpl.c index 928b9eb8738e5c3ee299ed595fb163182c76e5a8..d4ac540d2fc725488bae83fb8890840f6ddb59a9 100644 --- a/src/query/src/qparserImpl.c +++ b/src/query/src/qparserImpl.c @@ -15,16 +15,16 @@ #include "os.h" #include "qsqlparser.h" +#include "queryLog.h" #include "taosdef.h" #include "taosmsg.h" +#include "tcmdtype.h" #include "tglobal.h" #include "tstoken.h" +#include "tstrbuild.h" #include "ttime.h" #include "ttokendef.h" #include "tutil.h" -#include "qsqltype.h" -#include "tstrbuild.h" -#include "queryLog.h" SSqlInfo qSQLParse(const char *pStr) { void *pParser = ParseAlloc(malloc); diff --git a/src/query/src/sql.c b/src/query/src/sql.c index e75802a98f4e94c23034072d908b8b5ece3acd7a..eafb052593b121047e3388b7ead3e754ffd3e55d 100644 --- a/src/query/src/sql.c +++ b/src/query/src/sql.c @@ -25,17 +25,17 @@ #include /************ Begin %include sections from the grammar ************************/ +#include +#include #include #include #include -#include -#include -#include "tutil.h" #include "qsqlparser.h" +#include "tcmdtype.h" #include "tstoken.h" -#include "tvariant.h" #include "ttokendef.h" -#include "qsqltype.h" +#include "tutil.h" +#include "tvariant.h" /**************** End of %include directives **********************************/ /* These constants specify the various numeric values for terminal symbols ** in a format understandable to "makeheaders". This section is blank unless diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 9c9ac1699a38f1545de273b41792ecb32677b812..25a1cf88ab75fe5772f0099ed5dc528a3b967fcd 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -255,17 +255,46 @@ _err: return NULL; } +static int32_t colIdCompar(const void* left, const void* right) { + int16_t colId = *(int16_t*) left; + STColumn* p2 = (STColumn*) right; + + if (colId == p2->colId) { + return 0; + } + + return (colId < p2->colId)? -1:1; +} + int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbMeta *pMeta = pRepo->tsdbMeta; - int16_t tversion = htons(pMsg->tversion); - STable *pTable = tsdbGetTableByUid(pMeta, htobe64(pMsg->uid)); + pMsg->uid = htobe64(pMsg->uid); + pMsg->tid = htonl(pMsg->tid); + pMsg->tversion = htons(pMsg->tversion); + pMsg->colId = htons(pMsg->colId); + pMsg->tagValLen = htonl(pMsg->tagValLen); + pMsg->numOfTags = htons(pMsg->numOfTags); + pMsg->schemaLen = htonl(pMsg->schemaLen); + assert(pMsg->schemaLen == sizeof(STColumn) * pMsg->numOfTags); + + char* d = pMsg->data; + for(int32_t i = 0; i < pMsg->numOfTags; ++i) { + STColumn* pCol = (STColumn*) d; + pCol->colId = htons(pCol->colId); + pCol->bytes = htonl(pCol->bytes); + assert(pCol->offset == 0); + + d += sizeof(STColumn); + } + + STable *pTable = tsdbGetTableByUid(pMeta, pMsg->uid); if (pTable == NULL) { terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; return -1; } - if (TABLE_TID(pTable) != htonl(pMsg->tid)) { + if (TABLE_TID(pTable) != pMsg->tid) { terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; return -1; } @@ -277,10 +306,10 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { return -1; } - if (schemaVersion(tsdbGetTableTagSchema(pTable)) < tversion) { + if (schemaVersion(tsdbGetTableTagSchema(pTable)) < pMsg->tversion) { tsdbDebug("vgId:%d server tag version %d is older than client tag version %d, try to config", REPO_ID(pRepo), - schemaVersion(tsdbGetTableTagSchema(pTable)), tversion); - void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, htonl(pMsg->tid)); + schemaVersion(tsdbGetTableTagSchema(pTable)), pMsg->tversion); + void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, pMsg->tid); if (msg == NULL) return -1; // Deal with error her @@ -299,19 +328,24 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { STSchema *pTagSchema = tsdbGetTableTagSchema(pTable); - if (schemaVersion(pTagSchema) > tversion) { + if (schemaVersion(pTagSchema) > pMsg->tversion) { tsdbError( "vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag " "version %d", - REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tversion, schemaVersion(pTable->tagSchema)); + REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), pMsg->tversion, schemaVersion(pTable->tagSchema)); return TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE; } - if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) { + if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) { tsdbRemoveTableFromIndex(pMeta, pTable); } // TODO: remove table from index if it is the first column of tag - tdSetKVRowDataOfCol(&pTable->tagVal, htons(pMsg->colId), htons(pMsg->type), pMsg->data); - if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == htons(pMsg->colId)) { + + // TODO: convert the tag schema from client, and then extract the type and bytes from schema according to colId + STColumn* res = bsearch(&pMsg->colId, pMsg->data, pMsg->numOfTags, sizeof(STColumn), colIdCompar); + assert(res != NULL); + + tdSetKVRowDataOfCol(&pTable->tagVal, pMsg->colId, res->type, pMsg->data + pMsg->schemaLen); + if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) { tsdbAddTableIntoIndex(pMeta, pTable); } return TSDB_CODE_SUCCESS; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 19a022e0a7acb92643e1282970d4d625da33a75f..cc2b6757fc5b148f3634b338bb15132dd0b1a3a8 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -74,9 +74,6 @@ typedef struct STableCheckInfo { SDataCols* pDataCols; int32_t chosen; // indicate which iterator should move forward bool initBuf; // whether to initialize the in-memory skip list iterator or not - SMemTable* mem; // in-mem buffer, hold the ref count - SMemTable* imem; // imem buffer, hold the ref count to avoid release - SSkipListIterator* iter; // mem buffer skip list iterator SSkipListIterator* iiter; // imem buffer skip list iterator } STableCheckInfo; @@ -113,6 +110,8 @@ typedef struct STsdbQueryHandle { SFileGroupIter fileIter; SRWHelper rhelper; STableBlockInfo* pDataBlockInfo; + SMemTable* mem; // mem-table + SMemTable* imem; // imem-table, acquired from snapshot SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */ @@ -138,9 +137,6 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) { } TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) { - // todo 1. filter not exist table - // todo 2. add the reference count for each table that is involved in query - STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); pQueryHandle->order = pCond->order; pQueryHandle->window = pCond->twindow; @@ -154,6 +150,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock; tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb); + tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem); size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList); assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); @@ -252,22 +249,22 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh pCheckInfo->initBuf = true; int32_t order = pHandle->order; - tsdbTakeMemSnapshot(pHandle->pTsdb, &pCheckInfo->mem, &pCheckInfo->imem); +// tsdbTakeMemSnapshot(pHandle->pTsdb, &pCheckInfo->mem, &pCheckInfo->imem); // no data in buffer, abort - if (pCheckInfo->mem == NULL && pCheckInfo->imem == NULL) { + if (pHandle->mem == NULL && pHandle->imem == NULL) { return false; } assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL); - if (pCheckInfo->mem && pCheckInfo->mem->tData[pCheckInfo->tableId.tid] != NULL) { - pCheckInfo->iter = tSkipListCreateIterFromVal(pCheckInfo->mem->tData[pCheckInfo->tableId.tid]->pData, + if (pHandle->mem && pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) { + pCheckInfo->iter = tSkipListCreateIterFromVal(pHandle->mem->tData[pCheckInfo->tableId.tid]->pData, (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order); } - if (pCheckInfo->imem && pCheckInfo->imem->tData[pCheckInfo->tableId.tid] != NULL) { - pCheckInfo->iiter = tSkipListCreateIterFromVal(pCheckInfo->imem->tData[pCheckInfo->tableId.tid]->pData, + if (pHandle->imem && pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) { + pCheckInfo->iiter = tSkipListCreateIterFromVal(pHandle->imem->tData[pCheckInfo->tableId.tid]->pData, (const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order); } @@ -2319,9 +2316,6 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); tSkipListDestroyIter(pTableCheckInfo->iter); - tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->mem); - tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->imem); - if (pTableCheckInfo->pDataCols != NULL) { tfree(pTableCheckInfo->pDataCols->buf); } @@ -2341,9 +2335,12 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { taosArrayDestroy(pQueryHandle->pColumns); tfree(pQueryHandle->pDataBlockInfo); tfree(pQueryHandle->statis); - + + // todo check error + tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->mem); + tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->imem); + tsdbDestroyHelper(&pQueryHandle->rhelper); - tfree(pQueryHandle); } diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index ab760b34504b8e31bfe2942bfc1e05a5f99715b0..745bb2cb8f6a4f172f2caf8ea148ae93b7656ea9 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -555,7 +555,6 @@ void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) { pCacheObj->freeFp(pElem->pData->data); } - uError("free obj:%p", pElem->pData); free(pElem->pData); free(pElem); } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 95e794f22d0ed3bfcc8a0eccb8f2727e5207a77b..5eb78fda526607ab6518c850b25c1d533b8767a2 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -324,7 +324,7 @@ void vnodeRelease(void *pVnodeRaw) { assert(refCount >= 0); if (refCount > 0) { - vTrace("vgId:%d, release vnode, refCount:%d", vgId, refCount); + vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount); return; } @@ -388,7 +388,7 @@ void *vnodeAccquireVnode(int32_t vgId) { if (pVnode == NULL) return pVnode; atomic_add_fetch_32(&pVnode->refCount, 1); - vTrace("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount); + vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount); return pVnode; } @@ -466,6 +466,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); // release local resources only after cutting off outside connections + taosCacheCleanup(pVnode->qHandlePool); vnodeRelease(pVnode); } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 6b157a036752c4e260707a902a778b90281c1b44..48d2fa6878945f799f240918037c167466ce191d 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -78,15 +78,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { // this message arrived here by means of the *query* message, so release the vnode is necessary void** qhandle = taosCacheAcquireByKey(pVnode->qHandlePool, (void*) &killQueryMsg->qhandle, sizeof(killQueryMsg->qhandle)); - if (qhandle == NULL || *qhandle == NULL) { // todo handle invalid qhandle error - + if (qhandle == NULL || *qhandle == NULL) { + vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); } else { -// qKillQuery((qinfo_t) killQueryMsg->qhandle); taosCacheRelease(pVnode->qHandlePool, (void**) &qhandle, true); } vnodeRelease(pVnode); - return TSDB_CODE_TSC_QUERY_CANCELLED; // todo change the error code + return TSDB_CODE_TSC_QUERY_CANCELLED; } int32_t code = TSDB_CODE_SUCCESS; @@ -97,8 +96,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, vnodeRelease, &pQInfo); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); - pRsp->qhandle = htobe64((uint64_t) (pQInfo)); - pRsp->code = code; + pRsp->code = code; + pRsp->qhandle = 0; pRet->len = sizeof(SQueryTableRsp); pRet->rsp = pRsp; @@ -120,6 +119,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { handle = taosCachePut(pVnode->qHandlePool, pQInfo, sizeof(pQInfo), &pQInfo, sizeof(pQInfo), tsShellActivityTimer * 2); assert(*handle == pQInfo); + pRsp->qhandle = htobe64((uint64_t) (handle)); } else { assert(pQInfo == NULL); vnodeRelease(pVnode); @@ -128,13 +128,17 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo); } else { assert(pCont != NULL); - pQInfo = pCont; + pQInfo = *(void**)(pCont); + handle = pCont; code = TSDB_CODE_VND_ACTION_IN_PROGRESS; + vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, pQInfo); } if (pQInfo != NULL) { - qTableQuery(pQInfo, vnodeRelease, pVnode); // do execute query + qTableQuery(pQInfo); // do execute query + + assert(handle != NULL); taosCacheRelease(pVnode->qHandlePool, (void**) &handle, false); } @@ -146,52 +150,54 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { SRspRet *pRet = &pReadMsg->rspRet; SRetrieveTableMsg *pRetrieve = pCont; - void *pQInfo = (void*) htobe64(pRetrieve->qhandle); + void **pQInfo = (void*) htobe64(pRetrieve->qhandle); pRetrieve->free = htons(pRetrieve->free); + vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, *pQInfo); + memset(pRet, 0, sizeof(SRspRet)); int32_t ret = 0; - void** handle = taosCacheAcquireByKey(pVnode->qHandlePool, &pQInfo, sizeof(pQInfo)); - if (handle == NULL || *handle != pQInfo) { + void** handle = taosCacheAcquireByKey(pVnode->qHandlePool, pQInfo, sizeof(pQInfo)); + if (handle == NULL || handle != pQInfo) { ret = TSDB_CODE_QRY_INVALID_QHANDLE; } if (pRetrieve->free == 1) { - vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo); - - taosCacheRelease(pVnode->qHandlePool, handle, true); -// int32_t ret = qKillQuery(pQInfo); + if (ret == TSDB_CODE_SUCCESS) { + vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo); - pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); - pRet->len = sizeof(SRetrieveTableRsp); + taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true); + pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + pRet->len = sizeof(SRetrieveTableRsp); - memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); - SRetrieveTableRsp* pRsp = pRet->rsp; - pRsp->numOfRows = 0; - pRsp->completed = true; - pRsp->useconds = 0; + memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); + SRetrieveTableRsp* pRsp = pRet->rsp; + pRsp->numOfRows = 0; + pRsp->completed = true; + pRsp->useconds = 0; + } else { // todo handle error + } return ret; } - vDebug("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo); + vDebug("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, *pQInfo); - int32_t code = qRetrieveQueryResultInfo(pQInfo); + int32_t code = qRetrieveQueryResultInfo(*pQInfo); if (code != TSDB_CODE_SUCCESS) { //TODO pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); } else { // todo check code and handle error in build result set - code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); + code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len); - if (qHasMoreResultsToRetrieve(pQInfo)) { - pRet->qhandle = pQInfo; + if (qHasMoreResultsToRetrieve(*pQInfo)) { + pRet->qhandle = handle; code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED; } else { // no further execution invoked, release the ref to vnode taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true); -// qDestroyQueryInfo(pQInfo); } }