提交 a6295cf4 编写于 作者: A Alex Duan

memory and file data merge function refactor

上级 eba76d84
...@@ -1190,66 +1190,63 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, ...@@ -1190,66 +1190,63 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock,
return code; return code;
} }
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { // search last keyList[ret] < key order asc and keyList[ret] > key order desc
int firstPos, lastPos, midPos = -1; static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
int numOfRows; // start and end position
TSKEY* keyList; int s, e;
s = pos;
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
// check
if (num <= 0) return -1; assert(pos >=0 && pos < num);
assert(num > 0);
keyList = (TSKEY*)pValue;
firstPos = 0; if (order == TSDB_ORDER_ASC) {
lastPos = num - 1;
if (order == TSDB_ORDER_DESC) {
// find the first position which is smaller than the key // find the first position which is smaller than the key
e = num - 1;
if (key < keyList[pos])
return -1;
while (1) { while (1) {
if (key >= keyList[lastPos]) return lastPos; // check can return
if (key == keyList[firstPos]) return firstPos; if (key >= keyList[e])
if (key < keyList[firstPos]) return firstPos - 1; return e;
if (key <= keyList[s])
numOfRows = lastPos - firstPos + 1; return s;
midPos = (numOfRows >> 1) + firstPos; if (e - s <= 1)
return s;
if (key < keyList[midPos]) {
lastPos = midPos - 1; // change start or end position
} else if (key > keyList[midPos]) { int mid = s + (e - s + 1)/2;
firstPos = midPos + 1; if (keyList[mid] > key)
} else { e = mid;
break; else if(keyList[mid] < key)
} s = mid;
} else
return mid;
} else { }
} else { // DESC
// find the first position which is bigger than the key // find the first position which is bigger than the key
while (1) { e = 0;
if (key <= keyList[firstPos]) return firstPos; if (key > keyList[pos])
if (key == keyList[lastPos]) return lastPos; return -1;
while (1) {
if (key > keyList[lastPos]) { // check can return
lastPos = lastPos + 1; if (key <= keyList[e])
if (lastPos >= num) return e;
return -1; if (key >= keyList[s])
return s;
if (s - e <= 1)
return s;
// change start or end position
int mid = s - (s - e + 1)/2;
if (keyList[mid] < key)
e = mid;
else if(keyList[mid] > key)
s = mid;
else else
return lastPos; return mid;
} }
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
} }
}
return midPos;
} }
int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) { int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
...@@ -1558,7 +1555,6 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl ...@@ -1558,7 +1555,6 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl
int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo) { int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo) {
// NOTE: reverse the order to find the end position in data block // NOTE: reverse the order to find the end position in data block
int32_t endPos = -1; int32_t endPos = -1;
int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order)? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataCols* pCols = pQueryHandle->rhelper.pDCols[0]; SDataCols* pCols = pQueryHandle->rhelper.pDCols[0];
...@@ -1571,7 +1567,9 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl ...@@ -1571,7 +1567,9 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl
cur->mixBlock = (cur->pos != pBlockInfo->rows - 1); cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
} else { } else {
assert(pCols->numOfRows > 0); assert(pCols->numOfRows > 0);
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order); int pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0 : pBlockInfo->rows - 1;
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pos, pQueryHandle->window.ekey, pQueryHandle->order);
assert(endPos != -1);
cur->mixBlock = true; cur->mixBlock = true;
} }
...@@ -1591,17 +1589,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1591,17 +1589,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX && assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
cur->pos >= 0 && cur->pos < pBlock->numOfRows); cur->pos >= 0 && cur->pos < pBlock->numOfRows);
TSKEY* tsArray = pCols->cols[0].pData; // key read from file
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast); TSKEY* keyFile = pCols->cols[0].pData;
assert(pCols->numOfRows == pBlock->numOfRows && keyFile[0] == pBlock->keyFirst && keyFile[pBlock->numOfRows-1] == pBlock->keyLast);
// searching for the endPos, so the order needs to be reversed
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)); int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
STable* pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
int32_t endPos = getEndPosInDataBlock(pQueryHandle, &blockInfo); int32_t endPos = getEndPosInDataBlock(pQueryHandle, &blockInfo);
tsdbDebug("%p uid:%" PRIu64",tid:%d start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d," tsdbDebug("%p uid:%" PRIu64",tid:%d start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
"end:%d, 0x%"PRIx64, "end:%d, 0x%"PRIx64,
...@@ -1614,6 +1611,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1614,6 +1611,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int16_t rv = -1; int16_t rv = -1;
STSchema* pSchema = NULL; STSchema* pSchema = NULL;
// position in file ->fpos
int32_t pos = cur->pos; int32_t pos = cur->pos;
cur->win = TSWINDOW_INITIALIZER; cur->win = TSWINDOW_INITIALIZER;
...@@ -1629,70 +1627,84 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1629,70 +1627,84 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
break; break;
} }
TSKEY key = dataRowKey(row); TSKEY keyMem = dataRowKey(row);
if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) || // key in memory is larger than the query end key (ascending order) or smaller than it (descending order): no overlap, so no merge is needed
(key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { if ((keyMem > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(keyMem < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
break; break;
} }
if (((pos > endPos || tsArray[pos] > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || // above condition same with key in file
((pos < endPos || tsArray[pos] < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) { if (((pos > endPos || keyFile[pos] > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos < endPos || keyFile[pos] < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
break; break;
} }
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || // put keyMem row
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { if ((keyMem < keyFile[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(keyMem > keyFile[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// update new version
if (rv != dataRowVersion(row)) { if (rv != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
rv = dataRowVersion(row); rv = dataRowVersion(row);
pSchema = tsdbGetTableSchemaByVersion(pTable, rv);
} }
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema); copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema);
numOfRows += 1; numOfRows += 1;
// record start key with memory key if not
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = keyMem;
} }
cur->win.ekey = key; cur->win.ekey = keyMem;
cur->lastKey = key + step; cur->lastKey = keyMem + step;
cur->mixBlock = true; cur->mixBlock = true;
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it // same select mem key if update is true
} else if (keyMem == keyFile[pos]) {
if (pCfg->update) { if (pCfg->update) {
// update new version
if (rv != dataRowVersion(row)) { if (rv != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
rv = dataRowVersion(row); rv = dataRowVersion(row);
pSchema = tsdbGetTableSchemaByVersion(pTable, rv);
} }
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema); copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema);
numOfRows += 1; numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = keyMem;
} }
cur->win.ekey = key; cur->win.ekey = keyMem;
cur->lastKey = key + step; cur->lastKey = keyMem + step;
cur->mixBlock = true; cur->mixBlock = true;
//mem move next
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
//file move next, discard file row
pos += step; pos += step;
} else { } else {
// update not enabled: only advance the memory cursor, discarding the memory row
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} }
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || // put file row
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { } else if ((keyMem > keyFile[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(keyMem < keyFile[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos]; cur->win.skey = keyFile[pos];
} }
int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order); int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pos, keyMem, pQueryHandle->order);
assert(end != -1); assert(end != -1);
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it // same, if update
if (keyFile[end] == keyMem) {
if (!pCfg->update) { if (!pCfg->update) {
//can't update, ignore current mem row skip next
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} else { } else {
// update enabled: do not copy this row here; it is handled on the next loop iteration, where keyMem == keyFile[pos]
end -= step; end -= step;
} }
} }
...@@ -1700,10 +1712,17 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1700,10 +1712,17 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t qstart = 0, qend = 0; int32_t qstart = 0, qend = 0;
getQualifiedRowsPos(pQueryHandle, pos, end, numOfRows, &qstart, &qend); getQualifiedRowsPos(pQueryHandle, pos, end, numOfRows, &qstart, &qend);
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend); if(qend >= qstart) {
pos += (qend - qstart + 1) * step; // copy qend - qstart + 1 rows from file
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[qend]:tsArray[qstart]; int32_t num = qend - qstart + 1;
pos += num * step;
} else {
// nothing to copy from file
pos += step;
}
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? keyFile[qend] : keyFile[qstart];
cur->lastKey = cur->win.ekey + step; cur->lastKey = cur->win.ekey + step;
} }
} while (numOfRows < pQueryHandle->outputCapacity); } while (numOfRows < pQueryHandle->outputCapacity);
...@@ -1718,7 +1737,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1718,7 +1737,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
((dataRowKey((SDataRow)SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) { ((dataRowKey((SDataRow)SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// no data in cache or data in cache is greater than the ekey of time window, load data from file block // no data in cache or data in cache is greater than the ekey of time window, load data from file block
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos]; cur->win.skey = keyFile[pos];
} }
int32_t start = -1, end = -1; int32_t start = -1, end = -1;
...@@ -1727,7 +1746,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1727,7 +1746,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end); numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step; pos += (end - start + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[end]:tsArray[start]; cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? keyFile[end] : keyFile[start];
cur->lastKey = cur->win.ekey + step; cur->lastKey = cur->win.ekey + step;
cur->mixBlock = true; cur->mixBlock = true;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册