diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 2dd580dda00651d8d7d4d84505147a8f464d954c..fdc0ae909519eba49b48257d65515537016c3f0a 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -294,20 +294,21 @@ typedef struct SResRec { struct STSBuf; typedef struct { - int32_t code; int64_t numOfRows; // num of results in current retrieved int64_t numOfTotal; // num of total results int64_t numOfTotalInCurrentClause; // num of total result in current subclause char * pRsp; - int rspType; - int rspLen; + int32_t rspType; + int32_t rspLen; uint64_t qhandle; int64_t uid; int64_t useconds; int64_t offset; // offset value from vnode during projection query of stable - int row; + int32_t row; int16_t numOfCols; int16_t precision; + bool completed; + int32_t code; int32_t numOfGroups; SResRec * pGroupRec; char * data; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 605c7a22cca5f00c9b33556fcb283e6db419f407..348c0709f17bfcf2bb9c54ddfdd310021f08bd42 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -209,7 +209,6 @@ int tscSendMsgToServer(SSqlObj *pSql) { } void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { - tscTrace("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code)); SSqlObj *pSql = (SSqlObj *)rpcMsg->handle; if (pSql == NULL || pSql->signature != pSql) { tscError("%p sql is already released, signature:%p", pSql, pSql->signature); @@ -256,7 +255,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { rpcFreeCont(rpcMsg->pCont); return; } else { - tscTrace("%p it shall renew meter meta, code:%d", pSql, rpcMsg->code); + tscTrace("%p it shall renew meter meta, code:%d", pSql, tstrerror(rpcMsg->code)); pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; pSql->res.code = rpcMsg->code; // keep the previous error code @@ -278,7 +277,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { if (pRes->code != TSDB_CODE_QUERY_CANCELLED) { pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL; } else { - tscTrace("%p query is cancelled, code:%d", pSql, pRes->code); + tscTrace("%p query is cancelled, code:%d", pSql, tstrerror(pRes->code)); } if (pRes->code != TSDB_CODE_QUERY_CANCELLED) { @@ -318,19 +317,17 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code, *(int32_t *)pRes->pRsp, pRes->rspLen); } else { - tscTrace("%p cmd:%d code:%d rsp len:%d", pSql, pCmd->command, pRes->code, pRes->rspLen); + tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen); } } if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); - + if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) { int command = pCmd->command; void *taosres = tscKeepConn[command] ? pSql : NULL; - rpcMsg->code = pRes->code ? -pRes->code : pRes->numOfRows; - - tscTrace("%p Async SQL result:%d res:%p", pSql, rpcMsg->code, taosres); + tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), taosres); /* * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj @@ -2304,6 +2301,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { pRes->precision = htons(pRetrieve->precision); pRes->offset = htobe64(pRetrieve->offset); pRes->useconds = htobe64(pRetrieve->useconds); + pRes->completed = (pRetrieve->completed == 1); pRes->data = pRetrieve->data; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 63612d0f5f21d6cbacc3ef7118a2f2825624db08..4885cf7cc3cc064f05672be40d3c8303b85de67d 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -653,62 +653,6 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { return pRes->tsrow; } -TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { - SSqlObj *pSql = (SSqlObj *)res; - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - if (pRes->qhandle == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { - return NULL; - } - - if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) { - tscFetchDatablockFromSubquery(pSql); - - if (pRes->code == TSDB_CODE_SUCCESS) { - tscTrace("%p data from all subqueries have been retrieved to client", pSql); - return tscBuildResFromSubqueries(pSql); - } else { - tscTrace("%p retrieve data from subquery failed, code:%d", pSql, pRes->code); - return NULL; - } - - } else if (pRes->row >= pRes->numOfRows) { - /** - * NOT a join query - * - * If the data block of current result set have been consumed already, try fetch next result - * data block from virtual node. - */ - tscResetForNextRetrieve(pRes); - - if (pCmd->command < TSDB_SQL_LOCAL) { - pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; - } - - tscProcessSql(pSql); // retrieve data from virtual node - - // if failed to retrieve data from current virtual node, try next one if exists - if (hasMoreVnodesToTry(pSql)) { - tscTryQueryNextVnode(pSql, NULL); - } - - /* - * local reducer has handle this case, - * so no need to add the pRes->numOfRows for super table query - */ - if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) { - pRes->numOfTotalInCurrentClause += pRes->numOfRows; - } - - if (pRes->numOfRows == 0) { - return NULL; - } - } - - return doSetResultRowData(pSql); -} - static void asyncFetchCallback(void *param, TAOS_RES *tres, int numOfRows) { SSqlObj* pSql = (SSqlObj*) tres; if (numOfRows < 0) { @@ -729,7 +673,10 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - if (pRes->qhandle == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) { + if (pRes->qhandle == 0 || + pRes->completed || + pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || + pCmd->command == TSDB_SQL_INSERT) { return NULL; } diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 9e508045838c814d2159bb5f62779d5453085009..3419128c72647c5a88dd62ab1ff51f80f407e26c 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -93,8 +93,8 @@ void dnodeRead(SRpcMsg *pMsg) { while (leftLen > 0) { SMsgHead *pHead = (SMsgHead *) pCont; - pHead->vgId = 1;//htonl(pHead->vgId); - pHead->contLen = pMsg->contLen; //htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + pHead->contLen = htonl(pHead->contLen); void *pVnode = dnodeGetVnode(pHead->vgId); if (pVnode == NULL) { @@ -104,12 +104,13 @@ void dnodeRead(SRpcMsg *pMsg) { } // put message into queue - SReadMsg readMsg; - readMsg.rpcMsg = *pMsg; - readMsg.pCont = pCont; - readMsg.contLen = pHead->contLen; - readMsg.pRpcContext = pRpcContext; - readMsg.pVnode = pVnode; + SReadMsg readMsg = { + .rpcMsg = *pMsg, + .pCont = pCont, + .contLen = pHead->contLen, + .pRpcContext = pRpcContext, + .pVnode = pVnode, + }; taos_queue queue = dnodeGetVnodeRworker(pVnode); taosWriteQitem(queue, &readMsg); @@ -177,8 +178,6 @@ static void *dnodeProcessReadQueue(void *param) { } else { terrno = TSDB_CODE_MSG_NOT_PROCESSED; } - - dnodeProcessReadResult(&readMsg); } return NULL; @@ -252,17 +251,19 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) { qTableQuery(pQInfo); } +static int32_t c = 0; static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { SRetrieveTableMsg *pRetrieve = pMsg->pCont; void *pQInfo = (void*) htobe64(pRetrieve->qhandle); dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId); - + if ((++c)%2 == 0) { + int32_t k = 1; + } int32_t rowSize = 0; int32_t numOfRows = 0; int32_t contLen = 0; - SRpcMsg rpcRsp = {0}; SRetrieveTableRsp *pRsp = NULL; int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize); @@ -276,7 +277,7 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen); } - rpcRsp = (SRpcMsg) { + SRpcMsg rpcRsp = (SRpcMsg) { .handle = pMsg->rpcMsg.handle, .pCont = pRsp, .contLen = contLen, @@ -285,4 +286,7 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { }; rpcSendResponse(&rpcRsp); + + //todo merge result should be done here + //dnodeProcessReadResult(&readMsg); } diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index b10d869780e9d26b7bf5af6699b961279562ccd9..4ce606f599b8e04f8db50ab30999ebc1d837c274 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -33,7 +33,7 @@ typedef struct SData { } SData; enum { - ST_QUERY_KILLED = 0, // query killed +// ST_QUERY_KILLED = 0, // query killed ST_QUERY_PAUSED = 1, // query paused, due to full of the response buffer ST_QUERY_COMPLETED = 2, // query completed }; @@ -142,8 +142,8 @@ typedef struct SQuery { SResultRec rec; int32_t pos; int64_t pointsOffset; // the number of points offset to save read data - SData** sdata; - int32_t capacity; + SData** sdata; + int32_t capacity; SSingleColumnFilterInfo* pFilterInfo; } SQuery; @@ -170,14 +170,14 @@ typedef struct SQueryRuntimeEnv { } SQueryRuntimeEnv; typedef struct SQInfo { - uint64_t signature; + void* signature; void* pVnode; TSKEY startTime; - int64_t elapsedTime; + TSKEY elapsedTime; SResultRec rec; int32_t pointsInterpo; int32_t code; // error code to returned to client - int32_t killed; // denotes if current query is killed +// int32_t killed; // denotes if current query is killed sem_t dataReady; SArray* pTableIdList; // table list SQueryRuntimeEnv runtimeEnv; diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 696d8c37c9a9ff638f587651dcffd8bcf132fb4d..cbce5097ec77f538f16d2d24e4f6817a4dd06746 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -64,38 +64,24 @@ typedef struct SPointInterpoSupporter { } SPointInterpoSupporter; typedef enum { - - /* - * the program will call this function again, if this status is set. - * used to transfer from QUERY_RESBUF_FULL - */ + // when query starts to execute, this status will set QUERY_NOT_COMPLETED = 0x1u, - /* - * output buffer is full, so, the next query will be employed, - * in this case, we need to set the appropriated start scan point for - * the next query. - * - * this status is only exist in group-by clause and - * diff/add/division/multiply/ query. + /* result output buffer is full, current query is paused. + * this status is only exist in group-by clause and diff/add/division/multiply/ query. */ QUERY_RESBUF_FULL = 0x2u, - /* - * query is over - * 1. this status is used in one row result query process, e.g., - * count/sum/first/last/ - * avg...etc. - * 2. when the query range on timestamp is satisfied, it is also denoted as - * query_compeleted + /* query is over + * 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc. + * 2. when all data within queried time window, it is also denoted as query_completed */ QUERY_COMPLETED = 0x4u, - - /* - * all data has been scanned, so current search is stopped, - * At last, the function will transfer this status to QUERY_COMPLETED + + /* when the result is not completed return to client, this status will be + * usually used in case of interval query with interpolation option */ - QUERY_NO_DATA_TO_CHECK = 0x8u, + QUERY_OVER = 0x8u, } vnodeQueryStatus; static void setQueryStatus(SQuery *pQuery, int8_t status); @@ -1301,7 +1287,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat if (pRuntimeEnv->pTSBuf != NULL) { // if timestamp filter list is empty, quit current query if (!tsBufNextPos(pRuntimeEnv->pTSBuf)) { - setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); + setQueryStatus(pQuery, QUERY_COMPLETED); break; } } @@ -1621,10 +1607,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); } -bool isQueryKilled(SQuery *pQuery) { - return false; - - SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery); +static bool isQueryKilled(SQuery *pQuery) { #if 0 /* * check if the queried meter is going to be deleted. @@ -1638,9 +1621,14 @@ bool isQueryKilled(SQuery *pQuery) { return (pQInfo->killed == 1); #endif + return 0; } +static bool setQueryKilled(SQInfo* pQInfo) { + pQInfo->code = TSDB_CODE_QUERY_CANCELLED; +} + bool isFixedOutputQuery(SQuery *pQuery) { if (pQuery->intervalTime != 0) { return false; @@ -2664,7 +2652,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { while (tsdbNextDataBlock(pQueryHandle)) { // check if query is killed or not set the status of query to pass the status check if (isQueryKilled(pQuery)) { - setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return cnt; } @@ -2714,7 +2701,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) { - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP; closeAllTimeWindow(&pRuntimeEnv->windowResInfo); @@ -3631,7 +3618,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { /* check if query is killed or not */ if (isQueryKilled(pQuery)) { - setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); +// setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } } @@ -4111,7 +4098,7 @@ bool vnodeHasRemainResults(void *handle) { } // query has completed - if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->intervalTime, pQuery->slidingTimeUnit, pQuery->precision); // int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY @@ -4272,7 +4259,7 @@ int32_t initQInfo(SQInfo *pQInfo, void *param, void* tsdb) { pQuery->window.ekey, pQuery->order.order); sem_post(&pQInfo->dataReady); - pQInfo->killed = 1; + setQueryStatus(pQuery, QUERY_COMPLETED); return TSDB_CODE_SUCCESS; } @@ -5024,7 +5011,7 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) { // since the numOfOutputElems must be identical for all sql functions that are allowed to be executed simutanelously. pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv); // assert(pQuery->pointsRead <= pQuery->pointsToRead && - // Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)); + // Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED)); // must be top/bottom query if offset > 0 if (pQuery->limit.offset > 0) { @@ -5128,7 +5115,7 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->limit.offset -= c; } - if (Q_STATUS_EQUAL(pQuery->status, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { break; } @@ -5178,7 +5165,7 @@ static void tableIntervalProcessor(SQInfo *pQInfo) { pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.pointsRead, &numOfInterpo); dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.pointsRead); - if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { + if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { doRevisedResultsByLimit(pQInfo); break; } @@ -5206,17 +5193,20 @@ static void tableIntervalProcessor(SQInfo *pQInfo) { } void qTableQuery(SQInfo *pQInfo) { - assert(pQInfo != NULL); - - if (pQInfo->killed) { - dTrace("QInfo:%p it is already killed, abort", pQInfo); + if (pQInfo == NULL || pQInfo->signature != pQInfo) { + dTrace("%p freed abort query", pQInfo); return; } - + SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQuery * pQuery = pRuntimeEnv->pQuery; - - // dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pQInfo); + + SQuery *pQuery = pRuntimeEnv->pQuery; + if (isQueryKilled(pQuery)) { + dTrace("QInfo:%p it is already killed, abort", pQInfo); + return; + } + + dTrace("QInfo:%p query task is launched", pQInfo); if (vnodeHasRemainResults(pQInfo)) { /* @@ -5242,7 +5232,7 @@ void qTableQuery(SQInfo *pQInfo) { } // here we have scan all qualified data in both data file and cache - if (Q_STATUS_EQUAL(pQuery->status, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { + if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { // continue to get push data from the group result if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQInfo->rec.pointsTotal < pQuery->limit.limit)) { @@ -5303,10 +5293,8 @@ void qTableQuery(SQInfo *pQInfo) { /* check if query is killed or not */ if (isQueryKilled(pQuery)) { dTrace("QInfo:%p query is killed", pQInfo); - // pQInfo->over = 1; } else { - // dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned", pQInfo, - // pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead); + dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.pointsRead); } sem_post(&pQInfo->dataReady); @@ -5989,21 +5977,16 @@ bool isQInfoValid(void *param) { return (sig == (uint64_t)pQInfo); } -void vnodeFreeQInfo(SQInfo *pQInfo, bool decQueryRef) { +void vnodeFreeQInfo(SQInfo *pQInfo) { if (!isQInfoValid(pQInfo)) { return; } - pQInfo->killed = 1; + SQuery* pQuery = pQInfo->runtimeEnv.pQuery; + setQueryKilled(pQInfo); + dTrace("QInfo:%p start to free SQInfo", pQInfo); - - if (decQueryRef) { - vnodeDecMeterRefcnt(pQInfo); - } - - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - - for (int col = 0; col < pQuery->numOfOutputCols; ++col) { + for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) { tfree(pQuery->sdata[col]); } @@ -6049,7 +6032,7 @@ void vnodeFreeQInfo(SQInfo *pQInfo, bool decQueryRef) { tfree(pQuery->pGroupbyExpr); tfree(pQuery); - // dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId); + dTrace("QInfo:%p QInfo is freed", pQInfo); // destroy signature, in order to avoid the query process pass the object safety check memset(pQInfo, 0, sizeof(SQInfo)); @@ -6105,7 +6088,7 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE _error: // table query ref will be decrease during error handling - vnodeFreeQInfo(*pQInfo, false); + vnodeFreeQInfo(*pQInfo); return code; } @@ -6176,28 +6159,25 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *ro if (pQInfo == NULL || !isQInfoValid(pQInfo)) { return TSDB_CODE_INVALID_QHANDLE; } - - if (pQInfo->killed) { + + SQuery *pQuery = pQInfo->runtimeEnv.pQuery; + if (isQueryKilled(pQuery)) { dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); if (pQInfo->code == TSDB_CODE_SUCCESS) { return TSDB_CODE_QUERY_CANCELLED; } else { // in case of not TSDB_CODE_SUCCESS, return the code to client - return abs(pQInfo->code); + return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code); } } sem_wait(&pQInfo->dataReady); - SQuery *pQuery = pQInfo->runtimeEnv.pQuery; - *numOfRows = pQInfo->rec.pointsRead; *rowsize = pQuery->rowSize; dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, *rowsize, *numOfRows, pQInfo->code); - - if (pQInfo->code < 0) { // less than 0 means there are error existed. - return -pQInfo->code; - } + + return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code); } static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { @@ -6250,6 +6230,11 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) { pQInfo->rec.pointsTotal += pQInfo->rec.pointsRead; dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQInfo->rec.pointsRead, pQInfo->rec.pointsTotal); + + setQueryStatus(pQuery, QUERY_COMPLETED); + return TSDB_CODE_SUCCESS; + + // todo if interpolation exists, the result may be dump to client by several rounds } static void addToTaskQueue(SQInfo* pQInfo) { @@ -6261,11 +6246,7 @@ static void addToTaskQueue(SQInfo* pQInfo) { dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:%s", pQInfo, pQInfo->signature, __FUNCTION__); #endif - if (pQInfo->killed == 1) { - dTrace("%p freed or killed, abort query", pQInfo); - } else { - // todo add to task queue - } + // todo add to task queue } } @@ -6293,12 +6274,20 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c } if (pQInfo->rec.pointsRead > 0 && code == TSDB_CODE_SUCCESS) { - doDumpQueryResult(pQInfo, (*pRsp)->data, NULL); + code = doDumpQueryResult(pQInfo, (*pRsp)->data, NULL); + + // has more data to return or need next round to execute addToTaskQueue(pQInfo); - return TSDB_CODE_SUCCESS; + } else if (isQueryKilled(pQuery)) { + code = TSDB_CODE_QUERY_CANCELLED; } - assert(code != TSDB_CODE_ACTION_IN_PROGRESS); + if (isQueryKilled(pQuery) || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { + (*pRsp)->completed = 1; // notify no more result to client + vnodeFreeQInfo(pQInfo); + } + + return code; // if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { // dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); diff --git a/src/util/src/shash.c b/src/util/src/shash.c index 525d00e81e047867701f4dff377013a93ab05f40..da97af84bbc957ba102add1b34bff23d71c91d0e 100644 --- a/src/util/src/shash.c +++ b/src/util/src/shash.c @@ -162,8 +162,6 @@ void taosDeleteStrHash(void *handle, char *string) { if (pObj == NULL || pObj->maxSessions == 0) return; if (string == NULL || string[0] == 0) return; - return; - hash = (*(pObj->hashFp))(pObj, string); pthread_mutex_lock(&pObj->mutex);