未验证 提交 14aa15ea 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #8510 from taosdata/release/ver-2.0.20.19

memory and file data merge function refactor
......@@ -1197,66 +1197,63 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock,
return code;
}
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
int firstPos, lastPos, midPos = -1;
int numOfRows;
TSKEY* keyList;
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
if (num <= 0) return -1;
keyList = (TSKEY*)pValue;
firstPos = 0;
lastPos = num - 1;
if (order == TSDB_ORDER_DESC) {
// search last keyList[ret] < key order asc and keyList[ret] > key order desc
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
// start end posistion
int s, e;
s = pos;
// check
assert(pos >=0 && pos < num);
assert(num > 0);
if (order == TSDB_ORDER_ASC) {
// find the first position which is smaller than the key
e = num - 1;
if (key < keyList[pos])
return -1;
while (1) {
if (key >= keyList[lastPos]) return lastPos;
if (key == keyList[firstPos]) return firstPos;
if (key < keyList[firstPos]) return firstPos - 1;
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
}
} else {
// check can return
if (key >= keyList[e])
return e;
if (key <= keyList[s])
return s;
if (e - s <= 1)
return s;
// change start or end position
int mid = s + (e - s + 1)/2;
if (keyList[mid] > key)
e = mid;
else if(keyList[mid] < key)
s = mid;
else
return mid;
}
} else { // DESC
// find the first position which is bigger than the key
while (1) {
if (key <= keyList[firstPos]) return firstPos;
if (key == keyList[lastPos]) return lastPos;
if (key > keyList[lastPos]) {
lastPos = lastPos + 1;
if (lastPos >= num)
return -1;
e = 0;
if (key > keyList[pos])
return -1;
while (1) {
// check can return
if (key <= keyList[e])
return e;
if (key >= keyList[s])
return s;
if (s - e <= 1)
return s;
// change start or end position
int mid = s - (s - e + 1)/2;
if (keyList[mid] < key)
e = mid;
else if(keyList[mid] > key)
s = mid;
else
return lastPos;
}
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
return mid;
}
}
}
return midPos;
}
int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
......@@ -1565,7 +1562,6 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl
int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo) {
// NOTE: reverse the order to find the end position in data block
int32_t endPos = -1;
int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order)? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
SQueryFilePos* cur = &pQueryHandle->cur;
SDataCols* pCols = pQueryHandle->rhelper.pDCols[0];
......@@ -1578,7 +1574,9 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl
cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
} else {
assert(pCols->numOfRows > 0);
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order);
int pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0 : pBlockInfo->rows - 1;
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pos, pQueryHandle->window.ekey, pQueryHandle->order);
assert(endPos != -1);
cur->mixBlock = true;
}
......@@ -1598,17 +1596,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
cur->pos >= 0 && cur->pos < pBlock->numOfRows);
TSKEY* tsArray = pCols->cols[0].pData;
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast);
// for search the endPos, so the order needs to reverse
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
// key read from file
TSKEY* keyFile = pCols->cols[0].pData;
assert(pCols->numOfRows == pBlock->numOfRows && keyFile[0] == pBlock->keyFirst && keyFile[pBlock->numOfRows-1] == pBlock->keyLast);
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
STable* pTable = pCheckInfo->pTableObj;
int32_t endPos = getEndPosInDataBlock(pQueryHandle, &blockInfo);
tsdbDebug("%p uid:%" PRIu64",tid:%d start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
"end:%d, 0x%"PRIx64,
......@@ -1621,6 +1618,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int16_t rv = -1;
STSchema* pSchema = NULL;
// position in file ->fpos
int32_t pos = cur->pos;
cur->win = TSWINDOW_INITIALIZER;
......@@ -1636,70 +1634,84 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
break;
}
TSKEY key = dataRowKey(row);
if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
TSKEY keyMem = dataRowKey(row);
// key in memory large query endKey order by asc or litter query endKey order by desc, no overlap so not need merge
if ((keyMem > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(keyMem < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
break;
}
if (((pos > endPos || tsArray[pos] > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos < endPos || tsArray[pos] < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// above condition same with key in file
if (((pos > endPos || keyFile[pos] > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos < endPos || keyFile[pos] < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
break;
}
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// put keyMem row
if ((keyMem < keyFile[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(keyMem > keyFile[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// update new version
if (rv != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
rv = dataRowVersion(row);
pSchema = tsdbGetTableSchemaByVersion(pTable, rv);
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema);
numOfRows += 1;
// record start key with memory key if not
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
cur->win.skey = keyMem;
}
cur->win.ekey = key;
cur->lastKey = key + step;
cur->win.ekey = keyMem;
cur->lastKey = keyMem + step;
cur->mixBlock = true;
moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
// same select mem key if update is true
} else if (keyMem == keyFile[pos]) {
if (pCfg->update) {
// update new version
if (rv != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
rv = dataRowVersion(row);
pSchema = tsdbGetTableSchemaByVersion(pTable, rv);
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
cur->win.skey = keyMem;
}
cur->win.ekey = key;
cur->lastKey = key + step;
cur->win.ekey = keyMem;
cur->lastKey = keyMem + step;
cur->mixBlock = true;
//mem move next
moveToNextRowInMem(pCheckInfo);
//file move next, discard file row
pos += step;
} else {
// not update, only mem move to next, discard mem row
moveToNextRowInMem(pCheckInfo);
}
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// put file row
} else if ((keyMem > keyFile[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(keyMem < keyFile[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos];
cur->win.skey = keyFile[pos];
}
int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pos, keyMem, pQueryHandle->order);
assert(end != -1);
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
// same, if update
if (keyFile[end] == keyMem) {
if (!pCfg->update) {
//can't update, ignore current mem row skip next
moveToNextRowInMem(pCheckInfo);
} else {
// can update, don't copy then deal on next loop with keyMem == keyFile[pos]
end -= step;
}
}
......@@ -1707,10 +1719,17 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t qstart = 0, qend = 0;
getQualifiedRowsPos(pQueryHandle, pos, end, numOfRows, &qstart, &qend);
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
pos += (qend - qstart + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[qend]:tsArray[qstart];
if(qend >= qstart) {
// copy qend - qstart + 1 rows from file
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
int32_t num = qend - qstart + 1;
pos += num * step;
} else {
// nothing copy from file
pos += step;
}
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? keyFile[qend] : keyFile[qstart];
cur->lastKey = cur->win.ekey + step;
}
} while (numOfRows < pQueryHandle->outputCapacity);
......@@ -1725,7 +1744,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
((dataRowKey((SDataRow)SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// no data in cache or data in cache is greater than the ekey of time window, load data from file block
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos];
cur->win.skey = keyFile[pos];
}
int32_t start = -1, end = -1;
......@@ -1734,7 +1753,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[end]:tsArray[start];
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? keyFile[end] : keyFile[start];
cur->lastKey = cur->win.ekey + step;
cur->mixBlock = true;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册