提交 de5fb152 编写于 作者: H hjxilinx

[td-32] fix bugs and refactor codes

上级 a6b0e4a9
...@@ -294,20 +294,21 @@ typedef struct SResRec { ...@@ -294,20 +294,21 @@ typedef struct SResRec {
struct STSBuf; struct STSBuf;
typedef struct { typedef struct {
int32_t code;
int64_t numOfRows; // num of results in current retrieved int64_t numOfRows; // num of results in current retrieved
int64_t numOfTotal; // num of total results int64_t numOfTotal; // num of total results
int64_t numOfTotalInCurrentClause; // num of total result in current subclause int64_t numOfTotalInCurrentClause; // num of total result in current subclause
char * pRsp; char * pRsp;
int rspType; int32_t rspType;
int rspLen; int32_t rspLen;
uint64_t qhandle; uint64_t qhandle;
int64_t uid; int64_t uid;
int64_t useconds; int64_t useconds;
int64_t offset; // offset value from vnode during projection query of stable int64_t offset; // offset value from vnode during projection query of stable
int row; int32_t row;
int16_t numOfCols; int16_t numOfCols;
int16_t precision; int16_t precision;
bool completed;
int32_t code;
int32_t numOfGroups; int32_t numOfGroups;
SResRec * pGroupRec; SResRec * pGroupRec;
char * data; char * data;
......
...@@ -209,7 +209,6 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -209,7 +209,6 @@ int tscSendMsgToServer(SSqlObj *pSql) {
} }
void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
tscTrace("response:%s is received, len:%d error:%s", taosMsg[rpcMsg->msgType], rpcMsg->contLen, tstrerror(rpcMsg->code));
SSqlObj *pSql = (SSqlObj *)rpcMsg->handle; SSqlObj *pSql = (SSqlObj *)rpcMsg->handle;
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
tscError("%p sql is already released, signature:%p", pSql, pSql->signature); tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
...@@ -256,7 +255,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -256,7 +255,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
} else { } else {
tscTrace("%p it shall renew meter meta, code:%d", pSql, rpcMsg->code); tscTrace("%p it shall renew meter meta, code:%d", pSql, tstrerror(rpcMsg->code));
pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; pSql->maxRetry = TSDB_VNODES_SUPPORT * 2;
pSql->res.code = rpcMsg->code; // keep the previous error code pSql->res.code = rpcMsg->code; // keep the previous error code
...@@ -278,7 +277,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -278,7 +277,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) { if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL; pRes->code = (rpcMsg->code != TSDB_CODE_SUCCESS) ? rpcMsg->code : TSDB_CODE_NETWORK_UNAVAIL;
} else { } else {
tscTrace("%p query is cancelled, code:%d", pSql, pRes->code); tscTrace("%p query is cancelled, code:%d", pSql, tstrerror(pRes->code));
} }
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) { if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
...@@ -318,19 +317,17 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { ...@@ -318,19 +317,17 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code, tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code,
*(int32_t *)pRes->pRsp, pRes->rspLen); *(int32_t *)pRes->pRsp, pRes->rspLen);
} else { } else {
tscTrace("%p cmd:%d code:%d rsp len:%d", pSql, pCmd->command, pRes->code, pRes->rspLen); tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen);
} }
} }
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command])
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) { if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
int command = pCmd->command; int command = pCmd->command;
void *taosres = tscKeepConn[command] ? pSql : NULL; void *taosres = tscKeepConn[command] ? pSql : NULL;
rpcMsg->code = pRes->code ? -pRes->code : pRes->numOfRows; tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), taosres);
tscTrace("%p Async SQL result:%d res:%p", pSql, rpcMsg->code, taosres);
/* /*
* Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj * Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
...@@ -2304,6 +2301,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { ...@@ -2304,6 +2301,7 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) {
pRes->precision = htons(pRetrieve->precision); pRes->precision = htons(pRetrieve->precision);
pRes->offset = htobe64(pRetrieve->offset); pRes->offset = htobe64(pRetrieve->offset);
pRes->useconds = htobe64(pRetrieve->useconds); pRes->useconds = htobe64(pRetrieve->useconds);
pRes->completed = (pRetrieve->completed == 1);
pRes->data = pRetrieve->data; pRes->data = pRetrieve->data;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
......
...@@ -729,7 +729,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -729,7 +729,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
if (pRes->qhandle == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) { if (pRes->qhandle == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT ||
pRes->completed) {
return NULL; return NULL;
} }
......
...@@ -93,8 +93,8 @@ void dnodeRead(SRpcMsg *pMsg) { ...@@ -93,8 +93,8 @@ void dnodeRead(SRpcMsg *pMsg) {
while (leftLen > 0) { while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont; SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = 1;//htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
pHead->contLen = pMsg->contLen; //htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
void *pVnode = dnodeGetVnode(pHead->vgId); void *pVnode = dnodeGetVnode(pHead->vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
...@@ -104,12 +104,13 @@ void dnodeRead(SRpcMsg *pMsg) { ...@@ -104,12 +104,13 @@ void dnodeRead(SRpcMsg *pMsg) {
} }
// put message into queue // put message into queue
SReadMsg readMsg; SReadMsg readMsg = {
readMsg.rpcMsg = *pMsg; .rpcMsg = *pMsg,
readMsg.pCont = pCont; .pCont = pCont,
readMsg.contLen = pHead->contLen; .contLen = pHead->contLen,
readMsg.pRpcContext = pRpcContext; .pRpcContext = pRpcContext,
readMsg.pVnode = pVnode; .pVnode = pVnode,
};
taos_queue queue = dnodeGetVnodeRworker(pVnode); taos_queue queue = dnodeGetVnodeRworker(pVnode);
taosWriteQitem(queue, &readMsg); taosWriteQitem(queue, &readMsg);
...@@ -177,8 +178,6 @@ static void *dnodeProcessReadQueue(void *param) { ...@@ -177,8 +178,6 @@ static void *dnodeProcessReadQueue(void *param) {
} else { } else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
} }
dnodeProcessReadResult(&readMsg);
} }
return NULL; return NULL;
...@@ -252,17 +251,19 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) { ...@@ -252,17 +251,19 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
qTableQuery(pQInfo); qTableQuery(pQInfo);
} }
static int32_t c = 0;
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
SRetrieveTableMsg *pRetrieve = pMsg->pCont; SRetrieveTableMsg *pRetrieve = pMsg->pCont;
void *pQInfo = (void*) htobe64(pRetrieve->qhandle); void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId); dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId);
if ((++c)%2 == 0) {
int32_t k = 1;
}
int32_t rowSize = 0; int32_t rowSize = 0;
int32_t numOfRows = 0; int32_t numOfRows = 0;
int32_t contLen = 0; int32_t contLen = 0;
SRpcMsg rpcRsp = {0};
SRetrieveTableRsp *pRsp = NULL; SRetrieveTableRsp *pRsp = NULL;
int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize); int32_t code = qRetrieveQueryResultInfo(pQInfo, &numOfRows, &rowSize);
...@@ -276,7 +277,7 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { ...@@ -276,7 +277,7 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen); code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen);
} }
rpcRsp = (SRpcMsg) { SRpcMsg rpcRsp = (SRpcMsg) {
.handle = pMsg->rpcMsg.handle, .handle = pMsg->rpcMsg.handle,
.pCont = pRsp, .pCont = pRsp,
.contLen = contLen, .contLen = contLen,
...@@ -285,4 +286,7 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { ...@@ -285,4 +286,7 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
//todo merge result should be done here
//dnodeProcessReadResult(&readMsg);
} }
...@@ -33,7 +33,7 @@ typedef struct SData { ...@@ -33,7 +33,7 @@ typedef struct SData {
} SData; } SData;
enum { enum {
ST_QUERY_KILLED = 0, // query killed // ST_QUERY_KILLED = 0, // query killed
ST_QUERY_PAUSED = 1, // query paused, due to full of the response buffer ST_QUERY_PAUSED = 1, // query paused, due to full of the response buffer
ST_QUERY_COMPLETED = 2, // query completed ST_QUERY_COMPLETED = 2, // query completed
}; };
...@@ -142,8 +142,8 @@ typedef struct SQuery { ...@@ -142,8 +142,8 @@ typedef struct SQuery {
SResultRec rec; SResultRec rec;
int32_t pos; int32_t pos;
int64_t pointsOffset; // the number of points offset to save read data int64_t pointsOffset; // the number of points offset to save read data
SData** sdata; SData** sdata;
int32_t capacity; int32_t capacity;
SSingleColumnFilterInfo* pFilterInfo; SSingleColumnFilterInfo* pFilterInfo;
} SQuery; } SQuery;
...@@ -170,14 +170,14 @@ typedef struct SQueryRuntimeEnv { ...@@ -170,14 +170,14 @@ typedef struct SQueryRuntimeEnv {
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
typedef struct SQInfo { typedef struct SQInfo {
uint64_t signature; void* signature;
void* pVnode; void* pVnode;
TSKEY startTime; TSKEY startTime;
int64_t elapsedTime; TSKEY elapsedTime;
SResultRec rec; SResultRec rec;
int32_t pointsInterpo; int32_t pointsInterpo;
int32_t code; // error code to returned to client int32_t code; // error code to returned to client
int32_t killed; // denotes if current query is killed // int32_t killed; // denotes if current query is killed
sem_t dataReady; sem_t dataReady;
SArray* pTableIdList; // table list SArray* pTableIdList; // table list
SQueryRuntimeEnv runtimeEnv; SQueryRuntimeEnv runtimeEnv;
......
...@@ -64,38 +64,24 @@ typedef struct SPointInterpoSupporter { ...@@ -64,38 +64,24 @@ typedef struct SPointInterpoSupporter {
} SPointInterpoSupporter; } SPointInterpoSupporter;
typedef enum { typedef enum {
// when query starts to execute, this status will set
/*
* the program will call this function again, if this status is set.
* used to transfer from QUERY_RESBUF_FULL
*/
QUERY_NOT_COMPLETED = 0x1u, QUERY_NOT_COMPLETED = 0x1u,
/* /* result output buffer is full, current query is paused.
* output buffer is full, so, the next query will be employed, * this status is only exist in group-by clause and diff/add/division/multiply/ query.
* in this case, we need to set the appropriated start scan point for
* the next query.
*
* this status is only exist in group-by clause and
* diff/add/division/multiply/ query.
*/ */
QUERY_RESBUF_FULL = 0x2u, QUERY_RESBUF_FULL = 0x2u,
/* /* query is over
* query is over * 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
* 1. this status is used in one row result query process, e.g., * 2. when all data within queried time window, it is also denoted as query_completed
* count/sum/first/last/
* avg...etc.
* 2. when the query range on timestamp is satisfied, it is also denoted as
* query_compeleted
*/ */
QUERY_COMPLETED = 0x4u, QUERY_COMPLETED = 0x4u,
/* /* when the result is not completed return to client, this status will be
* all data has been scanned, so current search is stopped, * usually used in case of interval query with interpolation option
* At last, the function will transfer this status to QUERY_COMPLETED
*/ */
QUERY_NO_DATA_TO_CHECK = 0x8u, QUERY_OVER = 0x8u,
} vnodeQueryStatus; } vnodeQueryStatus;
static void setQueryStatus(SQuery *pQuery, int8_t status); static void setQueryStatus(SQuery *pQuery, int8_t status);
...@@ -1301,7 +1287,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat ...@@ -1301,7 +1287,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStat
if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->pTSBuf != NULL) {
// if timestamp filter list is empty, quit current query // if timestamp filter list is empty, quit current query
if (!tsBufNextPos(pRuntimeEnv->pTSBuf)) { if (!tsBufNextPos(pRuntimeEnv->pTSBuf)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); setQueryStatus(pQuery, QUERY_COMPLETED);
break; break;
} }
} }
...@@ -1621,10 +1607,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -1621,10 +1607,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf); pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
} }
bool isQueryKilled(SQuery *pQuery) { static bool isQueryKilled(SQuery *pQuery) {
return false;
SQInfo *pQInfo = (SQInfo *)GET_QINFO_ADDR(pQuery);
#if 0 #if 0
/* /*
* check if the queried meter is going to be deleted. * check if the queried meter is going to be deleted.
...@@ -1638,9 +1621,14 @@ bool isQueryKilled(SQuery *pQuery) { ...@@ -1638,9 +1621,14 @@ bool isQueryKilled(SQuery *pQuery) {
return (pQInfo->killed == 1); return (pQInfo->killed == 1);
#endif #endif
return 0; return 0;
} }
static bool setQueryKilled(SQInfo* pQInfo) {
pQInfo->code = TSDB_CODE_QUERY_CANCELLED;
}
bool isFixedOutputQuery(SQuery *pQuery) { bool isFixedOutputQuery(SQuery *pQuery) {
if (pQuery->intervalTime != 0) { if (pQuery->intervalTime != 0) {
return false; return false;
...@@ -2664,7 +2652,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2664,7 +2652,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
while (tsdbNextDataBlock(pQueryHandle)) { while (tsdbNextDataBlock(pQueryHandle)) {
// check if query is killed or not set the status of query to pass the status check // check if query is killed or not set the status of query to pass the status check
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQuery)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return cnt; return cnt;
} }
...@@ -2714,7 +2701,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -2714,7 +2701,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
} }
if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) { if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) {
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP; int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP;
closeAllTimeWindow(&pRuntimeEnv->windowResInfo); closeAllTimeWindow(&pRuntimeEnv->windowResInfo);
...@@ -3631,7 +3618,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -3631,7 +3618,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
/* check if query is killed or not */ /* check if query is killed or not */
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQuery)) {
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); // setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
return; return;
} }
} }
...@@ -4111,7 +4098,7 @@ bool vnodeHasRemainResults(void *handle) { ...@@ -4111,7 +4098,7 @@ bool vnodeHasRemainResults(void *handle) {
} }
// query has completed // query has completed
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->intervalTime, TSKEY ekey = taosGetRevisedEndKey(pQuery->window.ekey, pQuery->order.order, pQuery->intervalTime,
pQuery->slidingTimeUnit, pQuery->precision); pQuery->slidingTimeUnit, pQuery->precision);
// int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY // int32_t numOfTotal = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY
...@@ -4272,7 +4259,7 @@ int32_t initQInfo(SQInfo *pQInfo, void *param, void* tsdb) { ...@@ -4272,7 +4259,7 @@ int32_t initQInfo(SQInfo *pQInfo, void *param, void* tsdb) {
pQuery->window.ekey, pQuery->order.order); pQuery->window.ekey, pQuery->order.order);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
pQInfo->killed = 1; setQueryStatus(pQuery, QUERY_COMPLETED);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -5024,7 +5011,7 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) { ...@@ -5024,7 +5011,7 @@ static void tableFixedOutputProcessor(SQInfo *pQInfo) {
// since the numOfOutputElems must be identical for all sql functions that are allowed to be executed simutanelously. // since the numOfOutputElems must be identical for all sql functions that are allowed to be executed simutanelously.
pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv); pQuery->rec.pointsRead = getNumOfResult(pRuntimeEnv);
// assert(pQuery->pointsRead <= pQuery->pointsToRead && // assert(pQuery->pointsRead <= pQuery->pointsToRead &&
// Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)); // Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED));
// must be top/bottom query if offset > 0 // must be top/bottom query if offset > 0
if (pQuery->limit.offset > 0) { if (pQuery->limit.offset > 0) {
...@@ -5128,7 +5115,7 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) { ...@@ -5128,7 +5115,7 @@ static void vnodeSingleMeterIntervalMainLooper(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery->limit.offset -= c; pQuery->limit.offset -= c;
} }
if (Q_STATUS_EQUAL(pQuery->status, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
break; break;
} }
...@@ -5178,7 +5165,7 @@ static void tableIntervalProcessor(SQInfo *pQInfo) { ...@@ -5178,7 +5165,7 @@ static void tableIntervalProcessor(SQInfo *pQInfo) {
pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.pointsRead, &numOfInterpo); pQInfo, (tFilePage **)pQuery->sdata, (tFilePage **)pInterpoBuf, pQuery->rec.pointsRead, &numOfInterpo);
dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.pointsRead); dTrace("QInfo: %p interpo completed, final:%d", pQInfo, pQuery->rec.pointsRead);
if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { if (pQuery->rec.pointsRead > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
doRevisedResultsByLimit(pQInfo); doRevisedResultsByLimit(pQInfo);
break; break;
} }
...@@ -5206,17 +5193,20 @@ static void tableIntervalProcessor(SQInfo *pQInfo) { ...@@ -5206,17 +5193,20 @@ static void tableIntervalProcessor(SQInfo *pQInfo) {
} }
void qTableQuery(SQInfo *pQInfo) { void qTableQuery(SQInfo *pQInfo) {
assert(pQInfo != NULL); if (pQInfo == NULL || pQInfo->signature != pQInfo) {
dTrace("%p freed abort query", pQInfo);
if (pQInfo->killed) {
dTrace("QInfo:%p it is already killed, abort", pQInfo);
return; return;
} }
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SQuery *pQuery = pRuntimeEnv->pQuery;
// dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pQInfo); if (isQueryKilled(pQuery)) {
dTrace("QInfo:%p it is already killed, abort", pQInfo);
return;
}
dTrace("QInfo:%p query task is launched", pQInfo);
if (vnodeHasRemainResults(pQInfo)) { if (vnodeHasRemainResults(pQInfo)) {
/* /*
...@@ -5242,7 +5232,7 @@ void qTableQuery(SQInfo *pQInfo) { ...@@ -5242,7 +5232,7 @@ void qTableQuery(SQInfo *pQInfo) {
} }
// here we have scan all qualified data in both data file and cache // here we have scan all qualified data in both data file and cache
if (Q_STATUS_EQUAL(pQuery->status, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
// continue to get push data from the group result // continue to get push data from the group result
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || if (isGroupbyNormalCol(pQuery->pGroupbyExpr) ||
(pQuery->intervalTime > 0 && pQInfo->rec.pointsTotal < pQuery->limit.limit)) { (pQuery->intervalTime > 0 && pQInfo->rec.pointsTotal < pQuery->limit.limit)) {
...@@ -5303,10 +5293,8 @@ void qTableQuery(SQInfo *pQInfo) { ...@@ -5303,10 +5293,8 @@ void qTableQuery(SQInfo *pQInfo) {
/* check if query is killed or not */ /* check if query is killed or not */
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQuery)) {
dTrace("QInfo:%p query is killed", pQInfo); dTrace("QInfo:%p query is killed", pQInfo);
// pQInfo->over = 1;
} else { } else {
// dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned", pQInfo, dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.pointsRead);
// pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead);
} }
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
...@@ -5989,21 +5977,16 @@ bool isQInfoValid(void *param) { ...@@ -5989,21 +5977,16 @@ bool isQInfoValid(void *param) {
return (sig == (uint64_t)pQInfo); return (sig == (uint64_t)pQInfo);
} }
void vnodeFreeQInfo(SQInfo *pQInfo, bool decQueryRef) { void vnodeFreeQInfo(SQInfo *pQInfo) {
if (!isQInfoValid(pQInfo)) { if (!isQInfoValid(pQInfo)) {
return; return;
} }
pQInfo->killed = 1; SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
setQueryKilled(pQInfo);
dTrace("QInfo:%p start to free SQInfo", pQInfo); dTrace("QInfo:%p start to free SQInfo", pQInfo);
for (int32_t col = 0; col < pQuery->numOfOutputCols; ++col) {
if (decQueryRef) {
vnodeDecMeterRefcnt(pQInfo);
}
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
for (int col = 0; col < pQuery->numOfOutputCols; ++col) {
tfree(pQuery->sdata[col]); tfree(pQuery->sdata[col]);
} }
...@@ -6049,7 +6032,7 @@ void vnodeFreeQInfo(SQInfo *pQInfo, bool decQueryRef) { ...@@ -6049,7 +6032,7 @@ void vnodeFreeQInfo(SQInfo *pQInfo, bool decQueryRef) {
tfree(pQuery->pGroupbyExpr); tfree(pQuery->pGroupbyExpr);
tfree(pQuery); tfree(pQuery);
// dTrace("QInfo:%p vid:%d sid:%d meterId:%s, QInfo is freed", pQInfo, pObj->vnode, pObj->sid, pObj->meterId); dTrace("QInfo:%p QInfo is freed", pQInfo);
// destroy signature, in order to avoid the query process pass the object safety check // destroy signature, in order to avoid the query process pass the object safety check
memset(pQInfo, 0, sizeof(SQInfo)); memset(pQInfo, 0, sizeof(SQInfo));
...@@ -6105,7 +6088,7 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE ...@@ -6105,7 +6088,7 @@ static int32_t createQInfo(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyE
_error: _error:
// table query ref will be decrease during error handling // table query ref will be decrease during error handling
vnodeFreeQInfo(*pQInfo, false); vnodeFreeQInfo(*pQInfo);
return code; return code;
} }
...@@ -6176,28 +6159,25 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *ro ...@@ -6176,28 +6159,25 @@ int32_t qRetrieveQueryResultInfo(SQInfo *pQInfo, int32_t *numOfRows, int32_t *ro
if (pQInfo == NULL || !isQInfoValid(pQInfo)) { if (pQInfo == NULL || !isQInfoValid(pQInfo)) {
return TSDB_CODE_INVALID_QHANDLE; return TSDB_CODE_INVALID_QHANDLE;
} }
if (pQInfo->killed) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
if (isQueryKilled(pQuery)) {
dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); dTrace("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
if (pQInfo->code == TSDB_CODE_SUCCESS) { if (pQInfo->code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_QUERY_CANCELLED; return TSDB_CODE_QUERY_CANCELLED;
} else { // in case of not TSDB_CODE_SUCCESS, return the code to client } else { // in case of not TSDB_CODE_SUCCESS, return the code to client
return abs(pQInfo->code); return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code);
} }
} }
sem_wait(&pQInfo->dataReady); sem_wait(&pQInfo->dataReady);
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
*numOfRows = pQInfo->rec.pointsRead; *numOfRows = pQInfo->rec.pointsRead;
*rowsize = pQuery->rowSize; *rowsize = pQuery->rowSize;
dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, *rowsize, *numOfRows, pQInfo->code); dTrace("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%d", pQInfo, *rowsize, *numOfRows, pQInfo->code);
if (pQInfo->code < 0) { // less than 0 means there are error existed. return (pQInfo->code >= 0)? pQInfo->code:(-pQInfo->code);
return -pQInfo->code;
}
} }
static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) { static size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows) {
...@@ -6250,6 +6230,11 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) { ...@@ -6250,6 +6230,11 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int32_t *size) {
pQInfo->rec.pointsTotal += pQInfo->rec.pointsRead; pQInfo->rec.pointsTotal += pQInfo->rec.pointsRead;
dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQInfo->rec.pointsRead, pQInfo->rec.pointsTotal); dTrace("QInfo:%p current:%d, total:%d", pQInfo, pQInfo->rec.pointsRead, pQInfo->rec.pointsTotal);
setQueryStatus(pQuery, QUERY_COMPLETED);
return TSDB_CODE_SUCCESS;
// todo if interpolation exists, the result may be dump to client by several rounds
} }
static void addToTaskQueue(SQInfo* pQInfo) { static void addToTaskQueue(SQInfo* pQInfo) {
...@@ -6261,11 +6246,7 @@ static void addToTaskQueue(SQInfo* pQInfo) { ...@@ -6261,11 +6246,7 @@ static void addToTaskQueue(SQInfo* pQInfo) {
dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:%s", pQInfo, pQInfo->signature, __FUNCTION__); dTrace("QInfo:%p set query flag, sig:%" PRIu64 ", func:%s", pQInfo, pQInfo->signature, __FUNCTION__);
#endif #endif
if (pQInfo->killed == 1) { // todo add to task queue
dTrace("%p freed or killed, abort query", pQInfo);
} else {
// todo add to task queue
}
} }
} }
...@@ -6293,12 +6274,20 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c ...@@ -6293,12 +6274,20 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c
} }
if (pQInfo->rec.pointsRead > 0 && code == TSDB_CODE_SUCCESS) { if (pQInfo->rec.pointsRead > 0 && code == TSDB_CODE_SUCCESS) {
doDumpQueryResult(pQInfo, (*pRsp)->data, NULL); code = doDumpQueryResult(pQInfo, (*pRsp)->data, NULL);
// has more data to return or need next round to execute
addToTaskQueue(pQInfo); addToTaskQueue(pQInfo);
return TSDB_CODE_SUCCESS; } else if (isQueryKilled(pQuery)) {
code = TSDB_CODE_QUERY_CANCELLED;
} }
assert(code != TSDB_CODE_ACTION_IN_PROGRESS); if (isQueryKilled(pQuery) || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
(*pRsp)->completed = 1; // notify no more result to client
vnodeFreeQInfo(pQInfo);
}
return code;
// if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { // if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) {
// dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); // dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code);
......
...@@ -162,8 +162,6 @@ void taosDeleteStrHash(void *handle, char *string) { ...@@ -162,8 +162,6 @@ void taosDeleteStrHash(void *handle, char *string) {
if (pObj == NULL || pObj->maxSessions == 0) return; if (pObj == NULL || pObj->maxSessions == 0) return;
if (string == NULL || string[0] == 0) return; if (string == NULL || string[0] == 0) return;
return;
hash = (*(pObj->hashFp))(pObj, string); hash = (*(pObj->hashFp))(pObj, string);
pthread_mutex_lock(&pObj->mutex); pthread_mutex_lock(&pObj->mutex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册