提交 151fd39d 编写于 作者: H Hongze Cheng

more code

上级 44915af5
......@@ -384,6 +384,8 @@ struct SMemTable {
STsdb *pTsdb;
SVBufPool *pPool;
volatile int32_t nRef;
int64_t minVer;
int64_t maxVer;
TSKEY minKey;
TSKEY maxKey;
int64_t nRow;
......@@ -393,6 +395,7 @@ struct SMemTable {
int32_t nBucket;
STbData **aBucket;
};
STsdbReader *pReaderList;
};
struct TSDBROW {
......
......@@ -44,6 +44,8 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
pMemTable->pTsdb = pTsdb;
pMemTable->pPool = pTsdb->pVnode->inUse;
pMemTable->nRef = 1;
pMemTable->minVer = VERSION_MAX;
pMemTable->maxVer = VERSION_MIN;
pMemTable->minKey = TSKEY_MAX;
pMemTable->maxKey = TSKEY_MIN;
pMemTable->nRow = 0;
......@@ -130,6 +132,10 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
goto _err;
}
// update
pMemTable->minVer = TMIN(pMemTable->minVer, version);
pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
return code;
_err:
......@@ -179,6 +185,8 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
}
pMemTable->nDel++;
pMemTable->minVer = TMIN(pMemTable->minVer, version);
pMemTable->maxVer = TMIN(pMemTable->maxVer, version);
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);
......@@ -219,7 +227,6 @@ void *tsdbTbDataIterDestroy(STbDataIter *pIter) {
if (pIter) {
taosMemoryFree(pIter);
}
return NULL;
}
......
......@@ -758,51 +758,43 @@ static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int or
s = pos;
// check
assert(pos >=0 && pos < num);
assert(pos >= 0 && pos < num);
assert(num > 0);
if (order == TSDB_ORDER_ASC) {
// find the first position which is smaller than the key
e = num - 1;
if (key < keyList[pos])
return -1;
e = num - 1;
if (key < keyList[pos]) return -1;
while (1) {
// check can return
if (key >= keyList[e])
return e;
if (key <= keyList[s])
return s;
if (e - s <= 1)
return s;
if (key >= keyList[e]) return e;
if (key <= keyList[s]) return s;
if (e - s <= 1) return s;
// change start or end position
int mid = s + (e - s + 1)/2;
int mid = s + (e - s + 1) / 2;
if (keyList[mid] > key)
e = mid;
else if(keyList[mid] < key)
else if (keyList[mid] < key)
s = mid;
else
return mid;
}
} else { // DESC
} else { // DESC
// find the first position which is bigger than the key
e = 0;
if (key > keyList[pos])
return -1;
e = 0;
if (key > keyList[pos]) return -1;
while (1) {
// check can return
if (key <= keyList[e])
return e;
if (key >= keyList[s])
return s;
if (s - e <= 1)
return s;
if (key <= keyList[e]) return e;
if (key >= keyList[s]) return s;
if (s - e <= 1) return s;
// change start or end position
int mid = s - (s - e + 1)/2;
int mid = s - (s - e + 1) / 2;
if (keyList[mid] < key)
e = mid;
else if(keyList[mid] > key)
else if (keyList[mid] > key)
s = mid;
else
return mid;
......@@ -813,7 +805,7 @@ static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int or
int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
// NOTE: reverse the order to find the end position in data block
int32_t endPos = -1;
bool asc = ASCENDING_TRAVERSE(pReader->order);
bool asc = ASCENDING_TRAVERSE(pReader->order);
if (asc && pReader->window.ekey >= pBlock->maxKey.ts) {
endPos = pBlock->nRow - 1;
......@@ -849,8 +841,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
} else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts) {
pDumpInfo->rowIndex = pBlock->nRow - 1;
} else {
int32_t pos = asc? pBlock->nRow-1:0;
int32_t order = (pReader->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
int32_t pos = asc ? pBlock->nRow - 1 : 0;
int32_t order = (pReader->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, order);
}
......@@ -863,13 +855,13 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
endIndex += step;
int32_t remain = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
if (remain > pReader->capacity) { // output buffer check
if (remain > pReader->capacity) { // output buffer check
remain = pReader->capacity;
}
int32_t rowIndex = 0;
int32_t i = 0;
int32_t i = 0;
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
if (asc) {
......@@ -938,7 +930,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
setBlockAllDumped(pDumpInfo, ts, pReader->order);
} else {
int64_t k = asc? pBlock->maxKey.ts:pBlock->minKey.ts;
int64_t k = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
setBlockAllDumped(pDumpInfo, k, pReader->order);
}
......@@ -948,8 +940,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows,
pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain,
unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
......@@ -2242,7 +2234,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
if (pBlockInfo != NULL) {
pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
// it is a clean block, load it directly
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) {
......@@ -2257,7 +2249,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SBlockData* pBlockData = &pReader->status.fileBlockData;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
while (1) {
// todo check the validate of row in file block
bool hasBlockData = false;
......@@ -2299,12 +2290,12 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
}
}
_end:
_end:
pResBlock->info.uid = pBlockScanInfo->uid;
blockDataUpdateTsWindow(pResBlock, 0);
setComposedBlockFlag(pReader, true);
double el = (taosGetTimestampUs() - st)/1000.0;
double el = (taosGetTimestampUs() - st) / 1000.0;
pReader->cost.composedBlocks += 1;
pReader->cost.buildComposedBlockTime += el;
......@@ -3366,7 +3357,8 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
tColDataGetValue(pData, rowIndex, &cv);
doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
j += 1;
} else if (pData->cid > pCol->info.colId) { // the specified column does not exist in file block, fill with null data
} else if (pData->cid >
pCol->info.colId) { // the specified column does not exist in file block, fill with null data
colDataAppendNULL(pCol, outputRowIndex);
}
......@@ -3492,13 +3484,14 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
if (pCond->suid != 0) {
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, /*pCond->endVersion*/ -1);
if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, suid:%"PRIu64", ver:%"PRId64" , %s", pReader->suid, -1, pReader->idStr);
tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:%" PRId64 " , %s", pReader->suid, -1,
pReader->idStr);
}
} else if (taosArrayGetSize(pTableList) > 0) {
STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, /*pCond->endVersion*/ -1);
if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, uid:%"PRIu64", ver:%"PRId64" , %s", pKey->uid, -1, pReader->idStr);
tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:%" PRId64 " , %s", pKey->uid, -1, pReader->idStr);
}
}
......@@ -3611,7 +3604,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
tsdbDebug(
"%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
" SMA-time:%.2f ms, fileBlocks:%" PRId64 ", fileBlocks-load-time:%.2f ms, "
" SMA-time:%.2f ms, fileBlocks:%" PRId64
", fileBlocks-load-time:%.2f ms, "
"build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb %s",
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册