提交 42b23e64 编写于 作者: H Haojun Liao

fix(query): stop tsdb reader asap.

上级 dbf28b43
......@@ -21,10 +21,9 @@
#define getCurrentKeyInLastBlock(_r) ((_r)->currentKey)
typedef enum {
READER_STATUS_SUSPEND = 0x1,
READER_STATUS_SHOULD_STOP = 0x2,
READER_STATUS_NORMAL = 0x3,
} EReaderExecStatus;
READER_STATUS_SUSPEND = 0x1,
READER_STATUS_NORMAL = 0x2,
} EReaderStatus;
typedef enum {
EXTERNAL_ROWS_PREV = 0x1,
......@@ -184,6 +183,7 @@ typedef struct STsdbReaderAttr {
STimeWindow window;
bool freeBlock;
SVersionRange verRange;
int16_t order;
} STsdbReaderAttr;
typedef struct SResultBlockInfo {
......@@ -196,7 +196,8 @@ struct STsdbReader {
STsdb* pTsdb;
SVersionRange verRange;
TdThreadMutex readerMutex;
EReaderExecStatus flag;
EReaderStatus flag;
int32_t code;
uint64_t suid;
int16_t order;
EReadMode readMode;
......@@ -2995,9 +2996,9 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
while (1) {
// only check here, since the iterate data in memory is very fast.
if (pReader->flag == READER_STATUS_SHOULD_STOP) {
tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr);
return TSDB_CODE_SUCCESS;
if (pReader->code != TSDB_CODE_SUCCESS) {
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
return pReader->code;
}
bool hasNext = false;
......@@ -3093,9 +3094,9 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
while (1) {
if (pReader->flag == READER_STATUS_SHOULD_STOP) {
tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr);
return TSDB_CODE_SUCCESS;
if (pReader->code == TSDB_CODE_SUCCESS) {
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
return pReader->code;
}
// load the last data block of current table
......@@ -3246,7 +3247,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
}
}
return code;
return (pReader->code != TSDB_CODE_SUCCESS)? pReader->code:code;
}
static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) {
......@@ -3395,9 +3396,9 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
STableUidList* pUidList = &pStatus->uidList;
while (1) {
if (pReader->flag == READER_STATUS_SHOULD_STOP) {
tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr);
return TSDB_CODE_SUCCESS;
if (pReader->code == TSDB_CODE_SUCCESS) {
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
return pReader->code;
}
STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
......@@ -3493,7 +3494,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) {
terrno = 0;
code = doLoadLastBlockSequentially(pReader);
if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) {
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return TSDB_READ_RETURN;
}
......@@ -3507,8 +3508,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) {
code = initForFirstBlockInFile(pReader, pBlockIter);
// error happens or all the data files are completely checked
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false) ||
pReader->flag == READER_STATUS_SHOULD_STOP) {
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
terrno = code;
return TSDB_READ_RETURN;
}
......@@ -3536,13 +3536,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
}
code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) {
if (code != TSDB_CODE_SUCCESS || pResBlock->info.rows > 0) {
return code;
}
if (pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS;
}
}
while (1) {
......@@ -3581,13 +3577,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
code = doBuildDataBlock(pReader);
}
if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) {
if (code != TSDB_CODE_SUCCESS || pResBlock->info.rows > 0) {
return code;
}
if (pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS;
}
}
}
......@@ -4849,8 +4841,8 @@ int32_t tsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) {
*hasNext = false;
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) {
return code;
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT || pReader->code != TSDB_CODE_SUCCESS) {
return (pReader->code != TSDB_CODE_SUCCESS)? pReader->code:code;
}
SReaderStatus* pStatus = &pReader->status;
......@@ -5456,4 +5448,4 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
pReader->idStr = taosStrdup(idstr);
}
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->flag = READER_STATUS_SHOULD_STOP; }
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册