提交 8e3f5135 编写于 作者: L Liu Jicong

fix(tsdb): close fd

上级 0fccdace
......@@ -63,15 +63,15 @@ typedef struct SBlockLoadSuppInfo {
} SBlockLoadSuppInfo;
typedef struct SFilesetIter {
int32_t numOfFiles; // number of total files
int32_t index; // current accessed index in the list
SArray* pFileList; // data file list
int32_t order;
int32_t numOfFiles; // number of total files
int32_t index; // current accessed index in the list
SArray* pFileList; // data file list
int32_t order;
} SFilesetIter;
typedef struct SFileDataBlockInfo {
int32_t
tbBlockIdx; // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
tbBlockIdx; // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
uint64_t uid;
} SFileDataBlockInfo;
......@@ -119,10 +119,10 @@ struct STsdbReader {
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo suppInfo;
SIOCostSummary cost;
STSchema* pSchema;
SDataFReader* pFileReader;
SVersionRange verRange;
SIOCostSummary cost;
STSchema* pSchema;
SDataFReader* pFileReader;
SVersionRange verRange;
};
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
......@@ -287,9 +287,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, const STsdbFSState* pFSt
return TSDB_CODE_SUCCESS;
}
static void cleanupFilesetIterator(SFilesetIter* pIter) {
taosArrayDestroy(pIter->pFileList);
}
static void cleanupFilesetIterator(SFilesetIter* pIter) { taosArrayDestroy(pIter->pFileList); }
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
bool asc = ASCENDING_TRAVERSE(pIter->order);
......@@ -304,6 +302,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
STimeWindow win = {0};
while (1) {
/*if (pReader->pFileReader != NULL) tsdbDataFReaderClose(&pReader->pFileReader);*/
pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);
int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
......@@ -349,9 +348,7 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
}
}
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
taosArrayDestroy(pIter->blockList);
}
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); }
static void initReaderStatus(SReaderStatus* pStatus) {
pStatus->pTableIter = NULL;
......@@ -392,8 +389,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
initReaderStatus(&pReader->status);
pReader->pTsdb =
getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
pReader->suid = pCond->suid;
pReader->order = pCond->order;
pReader->capacity = 4096;
......@@ -833,7 +829,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
uint8_t *pb = NULL, *pb1 = NULL;
int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols,
pBlockData, &pb, &pb1);
pBlockData, &pb, &pb1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -1459,18 +1455,18 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBl
}
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
// ts is not overlap
if (pBlock->minKey.ts > pLast->ts || pBlock->maxKey.ts < pFirst->ts) {
return false;
}
int32_t step = ASCENDING_TRAVERSE(order)? 1:-1;
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
// version is not overlap
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
for(int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += step) {
for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += step) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
if (p->version >= pBlock->minVersion) {
......@@ -1502,8 +1498,8 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
}
// has duplicated ts of different version in this block
bool hasDup = (pBlock->nSubBlock == 1)? pBlock->hasDup:true;
bool overlapWithDel= overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) ||
keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) || overlapWithDel);
......@@ -2220,17 +2216,18 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
}
SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
int64_t startVer = (pCond->startVersion == -1)? 0:pCond->startVersion;
int64_t startVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion;
if (VND_IS_RSMA(pVnode)) {
return (SVersionRange){.minVer = startVer, .maxVer = tdRSmaGetMaxSubmitVer(pVnode->pSma, level)};
}
int64_t endVer = 0;
if (pCond->endVersion == -1) { // user not specified end version, set current maximum version of vnode as the endVersion
if (pCond->endVersion ==
-1) { // user not specified end version, set current maximum version of vnode as the endVersion
endVer = pVnode->state.applied;
} else {
endVer = (pCond->endVersion > pVnode->state.applied)? pVnode->state.applied:pCond->endVersion;
endVer = (pCond->endVersion > pVnode->state.applied) ? pVnode->state.applied : pCond->endVersion;
}
return (SVersionRange){.minVer = startVer, .maxVer = endVer};
......@@ -2274,9 +2271,9 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
if (pDelList == NULL) {
return false;
}
size_t num = taosArrayGetSize(pDelList);
bool asc = ASCENDING_TRAVERSE(order);
int32_t step = asc? 1:-1;
size_t num = taosArrayGetSize(pDelList);
bool asc = ASCENDING_TRAVERSE(order);
int32_t step = asc ? 1 : -1;
if (asc) {
if (*index >= num - 1) {
......@@ -2823,7 +2820,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree(pSupInfo->colIds);
taosArrayDestroy(pSupInfo->pColAgg);
for(int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
for (int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
if (pSupInfo->buildBuf[i] != NULL) {
taosMemoryFreeClear(pSupInfo->buildBuf[i]);
}
......@@ -2835,7 +2832,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
destroyBlockScanInfo(pReader->status.pTableMap);
blockDataDestroy(pReader->pResBlock);
if (pReader->pFileReader != NULL) tsdbDataFReaderClose(&pReader->pFileReader);
#if 0
// if (pReader->status.pTableScanInfo != NULL) {
// pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo);
......@@ -3011,8 +3008,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
return TSDB_CODE_SUCCESS;
}
pReader->order = pCond->order;
pReader->type = BLOCK_LOAD_OFFSET_ORDER;
pReader->order = pCond->order;
pReader->type = BLOCK_LOAD_OFFSET_ORDER;
pReader->status.loadFromFile = true;
pReader->status.pTableIter = NULL;
......@@ -3028,6 +3025,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
int32_t numOfTables = 1;
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
tsdbDataFReaderClose(&pReader->pFileReader);
STsdbFSState* pFState = pReader->pTsdb->fs->cState;
initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
......@@ -3114,13 +3113,12 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
}
/*
hasNext = blockIteratorNext(&pStatus->blockIter);
*/
/*
hasNext = blockIteratorNext(&pStatus->blockIter);
*/
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr);
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr);
}
return code;
......@@ -3158,7 +3156,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
return rows;
}
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t *suid) {
int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) {
int32_t sversion = 1;
SMetaReader mr = {0};
......@@ -3171,7 +3169,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
}
*suid = 0;
if (mr.me.type == TSDB_CHILD_TABLE) {
*suid = mr.me.ctbEntry.suid;
code = metaGetTableEntryByUid(&mr, *suid);
......@@ -3188,8 +3186,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
metaReaderClear(&mr);
*pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion);
return TSDB_CODE_SUCCESS;
}
......@@ -288,7 +288,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
if (!tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
while (1) {
uint8_t type = pOperator->operatorType;
/*pOperator->status = OP_OPENED;*/
pOperator->status = OP_OPENED;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pInfo = pOperator->info;
if (pOffset->type == TMQ_OFFSET__LOG) {
......@@ -326,6 +326,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
if (pTableInfo->uid == uid) {
found = true;
pTableScanInfo->currentTable = i;
break;
}
}
......
......@@ -1221,6 +1221,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
// TODO clean data block
if (pInfo->pRes->info.rows > 0) {
qDebug("stream scan log return %d rows", pInfo->pRes->info.rows);
return pInfo->pRes;
}
} else if (ret.fetchType == FETCH_TYPE__META) {
......@@ -1231,7 +1232,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} else if (ret.fetchType == FETCH_TYPE__NONE) {
pTaskInfo->streamInfo.lastStatus = ret.offset;
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 >= pTaskInfo->streamInfo.prepareStatus.version);
qDebug("stream scan return null");
qDebug("stream scan log return null");
return NULL;
} else {
ASSERT(0);
......@@ -1239,7 +1240,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
return pResult && pResult->info.rows > 0 ? pResult : NULL;
if (pResult && pResult->info.rows > 0) {
qDebug("stream scan tsdb return %d rows", pResult->info.rows);
return pResult;
}
qDebug("stream scan tsdb return null");
return NULL;
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
// TODO scan meta
ASSERT(0);
......@@ -1292,7 +1298,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
return pBlock;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
qInfo("scan mode %d", pInfo->scanMode);
qDebug("scan mode %d", pInfo->scanMode);
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
blockDataDestroy(pInfo->pUpdateRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
......@@ -1387,7 +1393,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
}
}
qInfo("scan rows: %d", pBlockInfo->rows);
qDebug("scan rows: %d", pBlockInfo->rows);
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
#if 0
......
......@@ -209,7 +209,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
qInfo("task %d receive dispatch rsp", pTask->taskId);
qDebug("task %d receive dispatch rsp", pTask->taskId);
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
......
......@@ -303,7 +303,7 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
}
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
qInfo("stream continue dispatching: task %d", pTask->taskId);
qDebug("stream continue dispatching: task %d", pTask->taskId);
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
......
......@@ -26,12 +26,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->isDataScan);
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
qInfo("task %d %p set submit input %p %p %d", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
qDebug("task %d %p set submit input %p %p %d", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false);
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
SArray* blocks = pBlock->blocks;
qInfo("task %d %p set ssdata input", pTask->taskId, pTask);
qDebug("task %d %p set ssdata input", pTask->taskId, pTask);
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DROP) {
// TODO exec drop
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册