未验证 提交 a986d70b 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19260 from taosdata/fix/long_query

fix(query): long query fix to avoid buffer pool locking too long
...@@ -216,6 +216,7 @@ int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver); ...@@ -216,6 +216,7 @@ int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qStreamRestoreParam(qTaskInfo_t tinfo); int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
void qStreamCloseTsdbReader(void* task);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -54,7 +54,7 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); ...@@ -54,7 +54,7 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode); void vnodePreClose(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode* pVnode); void vnodeSyncCheckTimeout(SVnode *pVnode);
void vnodeClose(SVnode *pVnode); void vnodeClose(SVnode *pVnode);
int32_t vnodeStart(SVnode *pVnode); int32_t vnodeStart(SVnode *pVnode);
...@@ -175,7 +175,8 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL ...@@ -175,7 +175,8 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL
void tsdbReaderClose(STsdbReader *pReader); void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow); void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock* pDataBlock, bool *allHave); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
void tsdbReleaseDataBlock(STsdbReader *pReader);
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
...@@ -185,7 +186,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta); ...@@ -185,7 +186,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader); uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void **pReader, const char* idstr); uint64_t suid, void **pReader, const char *idstr);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader); void *tsdbCacherowsReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
......
...@@ -65,6 +65,7 @@ typedef struct SSmaInfo SSmaInfo; ...@@ -65,6 +65,7 @@ typedef struct SSmaInfo SSmaInfo;
typedef struct SBlockCol SBlockCol; typedef struct SBlockCol SBlockCol;
typedef struct SVersionRange SVersionRange; typedef struct SVersionRange SVersionRange;
typedef struct SLDataIter SLDataIter; typedef struct SLDataIter SLDataIter;
typedef struct SQueryNode SQueryNode;
typedef struct SDiskCol SDiskCol; typedef struct SDiskCol SDiskCol;
typedef struct SDiskData SDiskData; typedef struct SDiskData SDiskData;
typedef struct SDiskDataBuilder SDiskDataBuilder; typedef struct SDiskDataBuilder SDiskDataBuilder;
...@@ -208,11 +209,13 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in ...@@ -208,11 +209,13 @@ int32_t tsdbDecmprColData(uint8_t *pIn, SBlockCol *pBlockCol, int8_t cmprAlg, in
uint8_t **ppBuf); uint8_t **ppBuf);
// tsdbMemTable ============================================================================================== // tsdbMemTable ==============================================================================================
// SMemTable // SMemTable
typedef int32_t (*_tsdb_reseek_func_t)(void *pQHandle);
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable); int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable);
void tsdbMemTableDestroy(SMemTable *pMemTable); void tsdbMemTableDestroy(SMemTable *pMemTable);
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid);
void tsdbRefMemTable(SMemTable *pMemTable); int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode);
void tsdbUnrefMemTable(SMemTable *pMemTable); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode);
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
// STbDataIter // STbDataIter
int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter); int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter **ppIter);
...@@ -290,8 +293,8 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader); ...@@ -290,8 +293,8 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
// tsdbRead.c ============================================================================================== // tsdbRead.c ==============================================================================================
int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap, const char *id); int32_t tsdbTakeReadSnap(STsdbReader *pReader, _tsdb_reseek_func_t reseek, STsdbReadSnap **ppSnap);
void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap, const char *id); void tsdbUntakeReadSnap(STsdbReader *pReader, STsdbReadSnap *pSnap);
// tsdbMerge.c ============================================================================================== // tsdbMerge.c ==============================================================================================
int32_t tsdbMerge(STsdb *pTsdb); int32_t tsdbMerge(STsdb *pTsdb);
...@@ -366,11 +369,20 @@ struct STbData { ...@@ -366,11 +369,20 @@ struct STbData {
STbData *next; STbData *next;
}; };
struct SQueryNode {
SQueryNode *pNext;
SQueryNode **ppNext;
void *pQHandle;
_tsdb_reseek_func_t reseek;
};
struct SMemTable { struct SMemTable {
SRWLatch latch; SRWLatch latch;
STsdb *pTsdb; STsdb *pTsdb;
SVBufPool *pPool; SVBufPool *pPool;
volatile int32_t nRef; volatile int32_t nRef;
int64_t minVer;
int64_t maxVer;
TSKEY minKey; TSKEY minKey;
TSKEY maxKey; TSKEY maxKey;
int64_t nRow; int64_t nRow;
...@@ -380,6 +392,7 @@ struct SMemTable { ...@@ -380,6 +392,7 @@ struct SMemTable {
int32_t nBucket; int32_t nBucket;
STbData **aBucket; STbData **aBucket;
}; };
SQueryNode qList;
}; };
struct TSDBROW { struct TSDBROW {
...@@ -605,9 +618,11 @@ struct SDelFWriter { ...@@ -605,9 +618,11 @@ struct SDelFWriter {
}; };
struct STsdbReadSnap { struct STsdbReadSnap {
SMemTable *pMem; SMemTable *pMem;
SMemTable *pIMem; SQueryNode *pNode;
STsdbFS fs; SMemTable *pIMem;
SQueryNode *pINode;
STsdbFS fs;
}; };
struct SDataFWriter { struct SDataFWriter {
...@@ -726,6 +741,9 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); ...@@ -726,6 +741,9 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
// tsdbCache ============================================================================================== // tsdbCache ==============================================================================================
typedef struct SCacheRowsReader { typedef struct SCacheRowsReader {
STsdb *pTsdb;
SVersionRange verRange;
TdThreadMutex readerMutex;
SVnode *pVnode; SVnode *pVnode;
STSchema *pSchema; STSchema *pSchema;
uint64_t uid; uint64_t uid;
......
...@@ -87,7 +87,8 @@ typedef struct SCommitInfo SCommitInfo; ...@@ -87,7 +87,8 @@ typedef struct SCommitInfo SCommitInfo;
#define VNODE_RSMA1_DIR "rsma1" #define VNODE_RSMA1_DIR "rsma1"
#define VNODE_RSMA2_DIR "rsma2" #define VNODE_RSMA2_DIR "rsma2"
#define VND_INFO_FNAME "vnode.json" #define VNODE_BUF_POOL_SEG 1 // TODO: change parameter here for sync/async commit
#define VND_INFO_FNAME "vnode.json"
// vnd.h // vnd.h
......
...@@ -725,6 +725,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -725,6 +725,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
taosWLockLatch(&pTq->pushLock); taosWLockLatch(&pTq->pushLock);
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
if (code != 0) { if (code != 0) {
...@@ -791,6 +793,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL ...@@ -791,6 +793,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
SMqRebVgReq req = {0}; SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req); tDecodeSMqRebVgReq(msg, &req);
// todo lock // todo lock
tqDebug("vgId:%d, tq process sub req %s", pTq->pVnode->config.vgId, req.subKey);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
if (req.oldConsumerId != -1) { if (req.oldConsumerId != -1) {
...@@ -881,6 +886,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL ...@@ -881,6 +886,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1); atomic_add_fetch_32(&pHandle->epoch, 1);
taosMemoryFree(req.qmsg); taosMemoryFree(req.qmsg);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
qStreamCloseTsdbReader(pHandle->execHandle.task);
}
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
} }
// close handle // close handle
......
...@@ -109,6 +109,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, ...@@ -109,6 +109,8 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
p->type = type; p->type = type;
p->pVnode = pVnode; p->pVnode = pVnode;
p->pTsdb = p->pVnode->pTsdb;
p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
p->numOfCols = numOfCols; p->numOfCols = numOfCols;
p->suid = suid; p->suid = suid;
...@@ -145,6 +147,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, ...@@ -145,6 +147,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
} }
p->idstr = taosMemoryStrDup(idstr); p->idstr = taosMemoryStrDup(idstr);
taosThreadMutexInit(&p->readerMutex, NULL);
*pReader = p; *pReader = p;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -164,7 +167,9 @@ void* tsdbCacherowsReaderClose(void* pReader) { ...@@ -164,7 +167,9 @@ void* tsdbCacherowsReaderClose(void* pReader) {
destroyLastBlockLoadInfo(p->pLoadInfo); destroyLastBlockLoadInfo(p->pLoadInfo);
taosMemoryFree((void*) p->idstr); taosMemoryFree((void*)p->idstr);
taosThreadMutexDestroy(&p->readerMutex);
taosMemoryFree(pReader); taosMemoryFree(pReader);
return NULL; return NULL;
} }
...@@ -199,6 +204,19 @@ static void freeItem(void* pItem) { ...@@ -199,6 +204,19 @@ static void freeItem(void* pItem) {
} }
} }
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
int32_t code = 0;
SCacheRowsReader* pReader = pQHandle;
taosThreadMutexLock(&pReader->readerMutex);
// pause current reader's state if not paused, save ts & version for resuming
// just wait for the big all tables' snapshot untaking for now
taosThreadMutexUnlock(&pReader->readerMutex);
return code;
}
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
if (pReader == NULL || pResBlock == NULL) { if (pReader == NULL || pResBlock == NULL) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
...@@ -241,7 +259,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -241,7 +259,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
taosArrayPush(pLastCols, &p); taosArrayPush(pLastCols, &p);
} }
tsdbTakeReadSnap(pr->pVnode->pTsdb, &pr->pReadSnap, "cache-l"); taosThreadMutexLock(&pr->readerMutex);
tsdbTakeReadSnap((STsdbReader*)pr, tsdbCacheQueryReseek, &pr->pReadSnap);
pr->pDataFReader = NULL; pr->pDataFReader = NULL;
pr->pDataFReaderLast = NULL; pr->pDataFReaderLast = NULL;
...@@ -355,8 +374,9 @@ _end: ...@@ -355,8 +374,9 @@ _end:
tsdbDataFReaderClose(&pr->pDataFReaderLast); tsdbDataFReaderClose(&pr->pDataFReaderLast);
tsdbDataFReaderClose(&pr->pDataFReader); tsdbDataFReaderClose(&pr->pDataFReader);
tsdbUntakeReadSnap(pr->pVnode->pTsdb, pr->pReadSnap, "cache-l");
resetLastBlockLoadInfo(pr->pLoadInfo); resetLastBlockLoadInfo(pr->pLoadInfo);
tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap);
taosThreadMutexUnlock(&pr->readerMutex);
for (int32_t j = 0; j < pr->numOfCols; ++j) { for (int32_t j = 0; j < pr->numOfCols; ++j) {
taosMemoryFree(pRes[j]); taosMemoryFree(pRes[j]);
......
...@@ -175,7 +175,7 @@ int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) { ...@@ -175,7 +175,7 @@ int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) {
pTsdb->imem = NULL; pTsdb->imem = NULL;
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbUnrefMemTable(pMemTable); tsdbUnrefMemTable(pMemTable, NULL);
goto _exit; goto _exit;
} }
...@@ -1664,7 +1664,7 @@ int32_t tsdbFinishCommit(STsdb *pTsdb) { ...@@ -1664,7 +1664,7 @@ int32_t tsdbFinishCommit(STsdb *pTsdb) {
// unlock // unlock
taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadRwlockUnlock(&pTsdb->rwLock);
if (pMemTable) { if (pMemTable) {
tsdbUnrefMemTable(pMemTable); tsdbUnrefMemTable(pMemTable, NULL);
} }
_exit: _exit:
......
...@@ -50,6 +50,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { ...@@ -50,6 +50,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
pMemTable->pTsdb = pTsdb; pMemTable->pTsdb = pTsdb;
pMemTable->pPool = pTsdb->pVnode->inUse; pMemTable->pPool = pTsdb->pVnode->inUse;
pMemTable->nRef = 1; pMemTable->nRef = 1;
pMemTable->minVer = VERSION_MAX;
pMemTable->maxVer = VERSION_MIN;
pMemTable->minKey = TSKEY_MAX; pMemTable->minKey = TSKEY_MAX;
pMemTable->maxKey = TSKEY_MIN; pMemTable->maxKey = TSKEY_MIN;
pMemTable->nRow = 0; pMemTable->nRow = 0;
...@@ -62,6 +64,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { ...@@ -62,6 +64,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
taosMemoryFree(pMemTable); taosMemoryFree(pMemTable);
goto _err; goto _err;
} }
pMemTable->qList.pNext = &pMemTable->qList;
pMemTable->qList.ppNext = &pMemTable->qList.pNext;
vnodeBufPoolRef(pMemTable->pPool); vnodeBufPoolRef(pMemTable->pPool);
*ppMemTable = pMemTable; *ppMemTable = pMemTable;
...@@ -146,6 +150,10 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmi ...@@ -146,6 +150,10 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmi
} }
if (code) goto _err; if (code) goto _err;
// update
pMemTable->minVer = TMIN(pMemTable->minVer, version);
pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
return code; return code;
_err: _err:
...@@ -197,6 +205,8 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid ...@@ -197,6 +205,8 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
} }
pMemTable->nDel++; pMemTable->nDel++;
pMemTable->minVer = TMIN(pMemTable->minVer, version);
pMemTable->maxVer = TMIN(pMemTable->maxVer, version);
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) { if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey); tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);
...@@ -237,7 +247,6 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter) { ...@@ -237,7 +247,6 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
if (pIter) { if (pIter) {
taosMemoryFree(pIter); taosMemoryFree(pIter);
} }
return NULL; return NULL;
} }
...@@ -740,16 +749,45 @@ _exit: ...@@ -740,16 +749,45 @@ _exit:
int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; } int32_t tsdbGetNRowsInTbData(STbData *pTbData) { return pTbData->sl.size; }
void tsdbRefMemTable(SMemTable *pMemTable) { int32_t tsdbRefMemTable(SMemTable *pMemTable, void *pQHandle, _tsdb_reseek_func_t reseek, SQueryNode **ppNode) {
int32_t code = 0;
int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1); int32_t nRef = atomic_fetch_add_32(&pMemTable->nRef, 1);
ASSERT(nRef > 0); ASSERT(nRef > 0);
/*
// register handle (todo: take concurrency in consideration)
*ppNode = taosMemoryMalloc(sizeof(SQueryNode));
if (*ppNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
(*ppNode)->pQHandle = pQHandle;
(*ppNode)->reseek = reseek;
(*ppNode)->pNext = pMemTable->qList.pNext;
(*ppNode)->ppNext = &pMemTable->qList.pNext;
pMemTable->qList.pNext->ppNext = &(*ppNode)->pNext;
pMemTable->qList.pNext = *ppNode;
*/
_exit:
return code;
} }
void tsdbUnrefMemTable(SMemTable *pMemTable) { int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode) {
int32_t code = 0;
/*
// unregister handle (todo: take concurrency in consideration)
if (pNode) {
pNode->pNext->ppNext = pNode->ppNext;
*pNode->ppNext = pNode->pNext;
taosMemoryFree(pNode);
}
*/
int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1); int32_t nRef = atomic_sub_fetch_32(&pMemTable->nRef, 1);
if (nRef == 0) { if (nRef == 0) {
tsdbMemTableDestroy(pMemTable); tsdbMemTableDestroy(pMemTable);
} }
return code;
} }
static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) { static FORCE_INLINE int32_t tbDataPCmprFn(const void *p1, const void *p2) {
...@@ -789,3 +827,29 @@ SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) { ...@@ -789,3 +827,29 @@ SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) {
_exit: _exit:
return aTbDataP; return aTbDataP;
} }
int32_t tsdbRecycleMemTable(SMemTable *pMemTable) {
int32_t code = 0;
SQueryNode *pNode = pMemTable->qList.pNext;
while (1) {
ASSERT(pNode != &pMemTable->qList);
SQueryNode *pNextNode = pNode->pNext;
if (pNextNode == &pMemTable->qList) {
code = (*pNode->reseek)(pNode->pQHandle);
if (code) goto _exit;
break;
} else {
code = (*pNode->reseek)(pNode->pQHandle);
if (code) goto _exit;
pNode = pMemTable->qList.pNext;
ASSERT(pNode == pNextNode);
}
}
// NOTE: Take care here, pMemTable is destroyed
_exit:
return code;
}
...@@ -74,7 +74,7 @@ int vnodeOpenBufPool(SVnode *pVnode) { ...@@ -74,7 +74,7 @@ int vnodeOpenBufPool(SVnode *pVnode) {
ASSERT(pVnode->pPool == NULL); ASSERT(pVnode->pPool == NULL);
for (int i = 0; i < 3; i++) { for (int i = 0; i < VNODE_BUF_POOL_SEG; i++) {
// create pool // create pool
if (vnodeBufPoolCreate(pVnode, size, &pPool)) { if (vnodeBufPoolCreate(pVnode, size, &pPool)) {
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
......
...@@ -1207,3 +1207,4 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { ...@@ -1207,3 +1207,4 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo); destroySendMsgInfo(pSendInfo);
} }
...@@ -2654,3 +2654,24 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta ...@@ -2654,3 +2654,24 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void qStreamCloseTsdbReader(void* task) {
if (task == NULL) return;
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
SOperatorInfo* pOp = pTaskInfo->pRoot;
qDebug("stream close tsdb reader, reset status uid %" PRId64 " ts %" PRId64, pTaskInfo->streamInfo.lastStatus.uid,
pTaskInfo->streamInfo.lastStatus.ts);
pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0};
while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
if (pDownstreamOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pInfo = pDownstreamOp->info;
if (pInfo->pTableScanOp) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL;
return;
}
}
}
}
...@@ -173,7 +173,7 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro ...@@ -173,7 +173,7 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
if (NULL == *pPage) { if (NULL == *pPage) {
return NULL; return NULL;
} }
return (SResultRow*)((char*)(*pPage) + p1->offset); return (SResultRow*)((char*)(*pPage) + p1->offset);
} }
...@@ -306,12 +306,14 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca ...@@ -306,12 +306,14 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1; pCost->filterOutBlocks += 1;
pCost->totalRows += pBlock->info.rows; pCost->totalRows += pBlock->info.rows;
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
pCost->skipBlocks += 1; pCost->skipBlocks += 1;
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) { } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
pCost->loadBlockStatis += 1; pCost->loadBlockStatis += 1;
...@@ -321,6 +323,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca ...@@ -321,6 +323,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo)); qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
...@@ -342,6 +345,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca ...@@ -342,6 +345,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pCost->filterOutBlocks += 1; pCost->filterOutBlocks += 1;
(*status) = FUNC_DATA_REQUIRED_FILTEROUT; (*status) = FUNC_DATA_REQUIRED_FILTEROUT;
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -356,7 +360,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca ...@@ -356,7 +360,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->skipBlocks += 1; pCost->skipBlocks += 1;
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
*status = FUNC_DATA_REQUIRED_FILTEROUT; *status = FUNC_DATA_REQUIRED_FILTEROUT;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1584,7 +1588,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1584,7 +1588,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) { if (pResult && pResult->info.rows > 0) {
qDebug("queue scan tsdb return %d rows", pResult->info.rows); qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64, pResult->info.rows,
pResult->info.window.skey, pResult->info.window.ekey);
pTaskInfo->streamInfo.returned = 1; pTaskInfo->streamInfo.returned = 1;
return pResult; return pResult;
} else { } else {
...@@ -2554,6 +2559,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { ...@@ -2554,6 +2559,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
} }
STsdbReader* reader = pInfo->base.dataReader; STsdbReader* reader = pInfo->base.dataReader;
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
while (tsdbNextDataBlock(reader)) { while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
...@@ -2588,6 +2594,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { ...@@ -2588,6 +2594,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
qTrace("tsdb/read-table-data: %p, close reader", reader);
tsdbReaderClose(pInfo->base.dataReader); tsdbReaderClose(pInfo->base.dataReader);
pInfo->base.dataReader = NULL; pInfo->base.dataReader = NULL;
return pBlock; return pBlock;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册