未验证 提交 498721a4 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #8717 from taosdata/feature/d7

merge from 2.0 into master
...@@ -121,7 +121,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) { ...@@ -121,7 +121,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
} }
if (!tsMnodeShowMetaFp[pShowMsg->type] || !tsMnodeShowRetrieveFp[pShowMsg->type]) { if (!tsMnodeShowMetaFp[pShowMsg->type] || !tsMnodeShowRetrieveFp[pShowMsg->type]) {
mError("show type:%s is not support", mnodeGetShowType(pShowMsg->type)); mWarn("show type:%s is not support", mnodeGetShowType(pShowMsg->type));
return TSDB_CODE_COM_OPS_NOT_SUPPORT; return TSDB_CODE_COM_OPS_NOT_SUPPORT;
} }
......
...@@ -4246,7 +4246,7 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) { ...@@ -4246,7 +4246,7 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
"5th=[%d], 10th=[%d], 20th=[%d], 30th=[%d], 40th=[%d], 50th=[%d]\n\t " "5th=[%d], 10th=[%d], 20th=[%d], 30th=[%d], 40th=[%d], 50th=[%d]\n\t "
"60th=[%d], 70th=[%d], 80th=[%d], 90th=[%d], 95th=[%d], 99th=[%d]\n\t " "60th=[%d], 70th=[%d], 80th=[%d], 90th=[%d], 95th=[%d], 99th=[%d]\n\t "
"Min=[%"PRId64"(Rows)] Max=[%"PRId64"(Rows)] Avg=[%"PRId64"(Rows)] Stddev=[%.2f] \n\t " "Min=[%"PRId64"(Rows)] Max=[%"PRId64"(Rows)] Avg=[%"PRId64"(Rows)] Stddev=[%.2f] \n\t "
"Rows=[%"PRIu64"], Blocks=[%"PRId64"], SmallBlocks=[%d], Size=[%.3f(Kb)] Comp=[%.2f]\n\t " "Rows=[%"PRIu64"], Blocks=[%"PRId64"], SmallBlocks=[%d], Size=[%.3f(Kb)] Comp=[%.5g]\n\t "
"RowsInMem=[%d] \n\t", "RowsInMem=[%d] \n\t",
percentiles[0], percentiles[1], percentiles[2], percentiles[3], percentiles[4], percentiles[5], percentiles[0], percentiles[1], percentiles[2], percentiles[3], percentiles[4], percentiles[5],
percentiles[6], percentiles[7], percentiles[8], percentiles[9], percentiles[10], percentiles[11], percentiles[6], percentiles[7], percentiles[8], percentiles[9], percentiles[10], percentiles[11],
......
...@@ -1385,66 +1385,63 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, ...@@ -1385,66 +1385,63 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock,
return code; return code;
} }
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) { // search last keyList[ret] < key order asc and keyList[ret] > key order desc
int firstPos, lastPos, midPos = -1; static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
int numOfRows; // start end posistion
TSKEY* keyList; int s, e;
s = pos;
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
// check
if (num <= 0) return -1; assert(pos >=0 && pos < num);
assert(num > 0);
keyList = (TSKEY*)pValue;
firstPos = 0; if (order == TSDB_ORDER_ASC) {
lastPos = num - 1;
if (order == TSDB_ORDER_DESC) {
// find the first position which is smaller than the key // find the first position which is smaller than the key
e = num - 1;
if (key < keyList[pos])
return -1;
while (1) { while (1) {
if (key >= keyList[lastPos]) return lastPos; // check can return
if (key == keyList[firstPos]) return firstPos; if (key >= keyList[e])
if (key < keyList[firstPos]) return firstPos - 1; return e;
if (key <= keyList[s])
numOfRows = lastPos - firstPos + 1; return s;
midPos = (numOfRows >> 1) + firstPos; if (e - s <= 1)
return s;
if (key < keyList[midPos]) {
lastPos = midPos - 1; // change start or end position
} else if (key > keyList[midPos]) { int mid = s + (e - s + 1)/2;
firstPos = midPos + 1; if (keyList[mid] > key)
} else { e = mid;
break; else if(keyList[mid] < key)
} s = mid;
} else
return mid;
} else { }
} else { // DESC
// find the first position which is bigger than the key // find the first position which is bigger than the key
while (1) { e = 0;
if (key <= keyList[firstPos]) return firstPos; if (key > keyList[pos])
if (key == keyList[lastPos]) return lastPos; return -1;
while (1) {
if (key > keyList[lastPos]) { // check can return
lastPos = lastPos + 1; if (key <= keyList[e])
if (lastPos >= num) return e;
return -1; if (key >= keyList[s])
return s;
if (s - e <= 1)
return s;
// change start or end position
int mid = s - (s - e + 1)/2;
if (keyList[mid] < key)
e = mid;
else if(keyList[mid] > key)
s = mid;
else else
return lastPos; return mid;
} }
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
} }
}
return midPos;
} }
static int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) { static int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
...@@ -1852,7 +1849,6 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl ...@@ -1852,7 +1849,6 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl
int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo) { int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo) {
// NOTE: reverse the order to find the end position in data block // NOTE: reverse the order to find the end position in data block
int32_t endPos = -1; int32_t endPos = -1;
int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order)? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataCols* pCols = pQueryHandle->rhelper.pDCols[0]; SDataCols* pCols = pQueryHandle->rhelper.pDCols[0];
...@@ -1865,7 +1861,9 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl ...@@ -1865,7 +1861,9 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl
cur->mixBlock = (cur->pos != pBlockInfo->rows - 1); cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
} else { } else {
assert(pCols->numOfRows > 0); assert(pCols->numOfRows > 0);
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order); int pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0 : pBlockInfo->rows - 1;
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pos, pQueryHandle->window.ekey, pQueryHandle->order);
assert(endPos != -1);
cur->mixBlock = true; cur->mixBlock = true;
} }
...@@ -1885,17 +1883,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1885,17 +1883,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX && assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
cur->pos >= 0 && cur->pos < pBlock->numOfRows); cur->pos >= 0 && cur->pos < pBlock->numOfRows);
TSKEY* tsArray = pCols->cols[0].pData; // key read from file
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast); TSKEY* keyFile = pCols->cols[0].pData;
assert(pCols->numOfRows == pBlock->numOfRows && keyFile[0] == pBlock->keyFirst && keyFile[pBlock->numOfRows-1] == pBlock->keyLast);
// for search the endPos, so the order needs to reverse
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)); int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
STable* pTable = pCheckInfo->pTableObj; STable* pTable = pCheckInfo->pTableObj;
int32_t endPos = getEndPosInDataBlock(pQueryHandle, &blockInfo); int32_t endPos = getEndPosInDataBlock(pQueryHandle, &blockInfo);
tsdbDebug("%p uid:%" PRIu64",tid:%d start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d," tsdbDebug("%p uid:%" PRIu64",tid:%d start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
"end:%d, 0x%"PRIx64, "end:%d, 0x%"PRIx64,
...@@ -1910,6 +1907,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1910,6 +1907,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
STSchema* pSchema1 = NULL; STSchema* pSchema1 = NULL;
STSchema* pSchema2 = NULL; STSchema* pSchema2 = NULL;
// position in file ->fpos
int32_t pos = cur->pos; int32_t pos = cur->pos;
cur->win = TSWINDOW_INITIALIZER; cur->win = TSWINDOW_INITIALIZER;
...@@ -1926,19 +1924,23 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1926,19 +1924,23 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
break; break;
} }
TSKEY key = memRowKey(row1); TSKEY keyMem = memRowKey(row1);
if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) || if ((keyMem > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { (keyMem < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
break; break;
} }
if (((pos > endPos || tsArray[pos] > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || // break if pos not in this block endPos range. note old code when pos is -1 can crash.
((pos < endPos || tsArray[pos] < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) { if(ASCENDING_TRAVERSE(pQueryHandle->order)) { //ASC
break; if(pos > endPos || keyFile[pos] > pQueryHandle->window.ekey)
break;
} else { //DESC
if(pos < endPos || keyFile[pos] < pQueryHandle->window.ekey)
break;
} }
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || if ((keyMem < keyFile[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { (keyMem > keyFile[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (rv1 != memRowVersion(row1)) { if (rv1 != memRowVersion(row1)) {
pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1), (int8_t)memRowType(row1)); pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1), (int8_t)memRowType(row1));
rv1 = memRowVersion(row1); rv1 = memRowVersion(row1);
...@@ -1950,16 +1952,18 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1950,16 +1952,18 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
mergeTwoRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, true); mergeTwoRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, true);
numOfRows += 1; numOfRows += 1;
// record start key with memory key if not
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = keyMem;
} }
cur->win.ekey = key; cur->win.ekey = keyMem;
cur->lastKey = key + step; cur->lastKey = keyMem + step;
cur->mixBlock = true; cur->mixBlock = true;
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it // same select mem key if update is true
} else if (keyMem == keyFile[pos]) {
if (pCfg->update) { if (pCfg->update) {
if(pCfg->update == TD_ROW_PARTIAL_UPDATE) { if(pCfg->update == TD_ROW_PARTIAL_UPDATE) {
doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, pos, pos); doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, pos, pos);
...@@ -1977,31 +1981,36 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1977,31 +1981,36 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
mergeTwoRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, forceSetNull); mergeTwoRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, forceSetNull);
numOfRows += 1; numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = keyMem;
} }
cur->win.ekey = key; cur->win.ekey = keyMem;
cur->lastKey = key + step; cur->lastKey = keyMem + step;
cur->mixBlock = true; cur->mixBlock = true;
//mem move next
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
//file move next, discard file row
pos += step; pos += step;
} else { } else {
// not update, only mem move to next, discard mem row
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} }
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || // put file row
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { } else if ((keyMem > keyFile[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(keyMem < keyFile[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos]; cur->win.skey = keyFile[pos];
} }
int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order); int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pos, keyMem, pQueryHandle->order);
assert(end != -1); assert(end != -1);
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it if (keyFile[end] == keyMem) { // the value of key in cache equals to the end timestamp value, ignore it
if (pCfg->update == TD_ROW_DISCARD_UPDATE) { if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} else { } else {
// can update, don't copy then deal on next loop with keyMem == keyFile[pos]
end -= step; end -= step;
} }
} }
...@@ -2009,10 +2018,17 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -2009,10 +2018,17 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t qstart = 0, qend = 0; int32_t qstart = 0, qend = 0;
getQualifiedRowsPos(pQueryHandle, pos, end, numOfRows, &qstart, &qend); getQualifiedRowsPos(pQueryHandle, pos, end, numOfRows, &qstart, &qend);
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend); if(qend >= qstart) {
pos += (qend - qstart + 1) * step; // copy qend - qstart + 1 rows from file
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[qend]:tsArray[qstart]; int32_t num = qend - qstart + 1;
pos += num * step;
} else {
// nothing copy from file
pos += step;
}
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? keyFile[qend] : keyFile[qstart];
cur->lastKey = cur->win.ekey + step; cur->lastKey = cur->win.ekey + step;
} }
} while (numOfRows < pQueryHandle->outputCapacity); } while (numOfRows < pQueryHandle->outputCapacity);
...@@ -2029,7 +2045,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -2029,7 +2045,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
!ASCENDING_TRAVERSE(pQueryHandle->order))) { !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// no data in cache or data in cache is greater than the ekey of time window, load data from file block // no data in cache or data in cache is greater than the ekey of time window, load data from file block
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos]; cur->win.skey = keyFile[pos];
} }
int32_t start = -1, end = -1; int32_t start = -1, end = -1;
...@@ -2038,7 +2054,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -2038,7 +2054,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end); numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step; pos += (end - start + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[end]:tsArray[start]; cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? keyFile[end] : keyFile[start];
cur->lastKey = cur->win.ekey + step; cur->lastKey = cur->win.ekey + step;
cur->mixBlock = true; cur->mixBlock = true;
} }
...@@ -2969,6 +2985,9 @@ static bool loadDataBlockFromTableSeq(STsdbQueryHandle* pQueryHandle) { ...@@ -2969,6 +2985,9 @@ static bool loadDataBlockFromTableSeq(STsdbQueryHandle* pQueryHandle) {
// handle data in cache situation // handle data in cache situation
bool tsdbNextDataBlock(TsdbQueryHandleT pHandle) { bool tsdbNextDataBlock(TsdbQueryHandleT pHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle; STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
if (pQueryHandle == NULL) {
return false;
}
if (emptyQueryTimewindow(pQueryHandle)) { if (emptyQueryTimewindow(pQueryHandle)) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, 0x%"PRIx64, pQueryHandle, pQueryHandle->qId); tsdbDebug("%p query window not overlaps with the data set, no result returned, 0x%"PRIx64, pQueryHandle, pQueryHandle->qId);
...@@ -3088,6 +3107,9 @@ static int32_t doGetExternalRow(STsdbQueryHandle* pQueryHandle, int16_t type, SM ...@@ -3088,6 +3107,9 @@ static int32_t doGetExternalRow(STsdbQueryHandle* pQueryHandle, int16_t type, SM
pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qId, pMemRef); pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qId, pMemRef);
tfree(cond.colList); tfree(cond.colList);
if (pSecQueryHandle == NULL) {
goto out_of_memory;
}
// current table, only one table // current table, only one table
STableCheckInfo* pCurrent = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex); STableCheckInfo* pCurrent = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册