提交 f6c0f746 编写于 作者: S Shengliang Guan

merge from 2.0 into master

......@@ -121,7 +121,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
}
if (!tsMnodeShowMetaFp[pShowMsg->type] || !tsMnodeShowRetrieveFp[pShowMsg->type]) {
mError("show type:%s is not support", mnodeGetShowType(pShowMsg->type));
mWarn("show type:%s is not support", mnodeGetShowType(pShowMsg->type));
return TSDB_CODE_COM_OPS_NOT_SUPPORT;
}
......
......@@ -4246,7 +4246,7 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
"5th=[%d], 10th=[%d], 20th=[%d], 30th=[%d], 40th=[%d], 50th=[%d]\n\t "
"60th=[%d], 70th=[%d], 80th=[%d], 90th=[%d], 95th=[%d], 99th=[%d]\n\t "
"Min=[%"PRId64"(Rows)] Max=[%"PRId64"(Rows)] Avg=[%"PRId64"(Rows)] Stddev=[%.2f] \n\t "
"Rows=[%"PRIu64"], Blocks=[%"PRId64"], SmallBlocks=[%d], Size=[%.3f(Kb)] Comp=[%.2f]\n\t "
"Rows=[%"PRIu64"], Blocks=[%"PRId64"], SmallBlocks=[%d], Size=[%.3f(Kb)] Comp=[%.5g]\n\t "
"RowsInMem=[%d] \n\t",
percentiles[0], percentiles[1], percentiles[2], percentiles[3], percentiles[4], percentiles[5],
percentiles[6], percentiles[7], percentiles[8], percentiles[9], percentiles[10], percentiles[11],
......
......@@ -1385,66 +1385,63 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock,
return code;
}
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
int firstPos, lastPos, midPos = -1;
int numOfRows;
TSKEY* keyList;
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
if (num <= 0) return -1;
keyList = (TSKEY*)pValue;
firstPos = 0;
lastPos = num - 1;
if (order == TSDB_ORDER_DESC) {
// search last keyList[ret] < key order asc and keyList[ret] > key order desc
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
// start end posistion
int s, e;
s = pos;
// check
assert(pos >=0 && pos < num);
assert(num > 0);
if (order == TSDB_ORDER_ASC) {
// find the first position which is smaller than the key
e = num - 1;
if (key < keyList[pos])
return -1;
while (1) {
if (key >= keyList[lastPos]) return lastPos;
if (key == keyList[firstPos]) return firstPos;
if (key < keyList[firstPos]) return firstPos - 1;
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
}
} else {
// check can return
if (key >= keyList[e])
return e;
if (key <= keyList[s])
return s;
if (e - s <= 1)
return s;
// change start or end position
int mid = s + (e - s + 1)/2;
if (keyList[mid] > key)
e = mid;
else if(keyList[mid] < key)
s = mid;
else
return mid;
}
} else { // DESC
// find the first position which is bigger than the key
while (1) {
if (key <= keyList[firstPos]) return firstPos;
if (key == keyList[lastPos]) return lastPos;
if (key > keyList[lastPos]) {
lastPos = lastPos + 1;
if (lastPos >= num)
return -1;
e = 0;
if (key > keyList[pos])
return -1;
while (1) {
// check can return
if (key <= keyList[e])
return e;
if (key >= keyList[s])
return s;
if (s - e <= 1)
return s;
// change start or end position
int mid = s - (s - e + 1)/2;
if (keyList[mid] < key)
e = mid;
else if(keyList[mid] > key)
s = mid;
else
return lastPos;
}
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
return mid;
}
}
}
return midPos;
}
static int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
......@@ -1852,7 +1849,6 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl
int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo) {
// NOTE: reverse the order to find the end position in data block
int32_t endPos = -1;
int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order)? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
SQueryFilePos* cur = &pQueryHandle->cur;
SDataCols* pCols = pQueryHandle->rhelper.pDCols[0];
......@@ -1865,7 +1861,9 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl
cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
} else {
assert(pCols->numOfRows > 0);
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order);
int pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0 : pBlockInfo->rows - 1;
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pos, pQueryHandle->window.ekey, pQueryHandle->order);
assert(endPos != -1);
cur->mixBlock = true;
}
......@@ -1885,17 +1883,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
cur->pos >= 0 && cur->pos < pBlock->numOfRows);
TSKEY* tsArray = pCols->cols[0].pData;
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast);
// for search the endPos, so the order needs to reverse
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
// key read from file
TSKEY* keyFile = pCols->cols[0].pData;
assert(pCols->numOfRows == pBlock->numOfRows && keyFile[0] == pBlock->keyFirst && keyFile[pBlock->numOfRows-1] == pBlock->keyLast);
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
STable* pTable = pCheckInfo->pTableObj;
int32_t endPos = getEndPosInDataBlock(pQueryHandle, &blockInfo);
tsdbDebug("%p uid:%" PRIu64",tid:%d start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
"end:%d, 0x%"PRIx64,
......@@ -1910,6 +1907,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
STSchema* pSchema1 = NULL;
STSchema* pSchema2 = NULL;
// position in file ->fpos
int32_t pos = cur->pos;
cur->win = TSWINDOW_INITIALIZER;
......@@ -1932,9 +1930,13 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
break;
}
if (((pos > endPos || tsArray[pos] > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos < endPos || tsArray[pos] < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
break;
// break if pos not in this block endPos range. note old code when pos is -1 can crash.
if(ASCENDING_TRAVERSE(pQueryHandle->order)) { //ASC
if(pos > endPos || keyFile[pos] > pQueryHandle->window.ekey)
break;
} else { //DESC
if(pos < endPos || keyFile[pos] < pQueryHandle->window.ekey)
break;
}
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
......@@ -1950,16 +1952,18 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
mergeTwoRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, true);
numOfRows += 1;
// record start key with memory key if not
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
cur->win.skey = keyMem;
}
cur->win.ekey = key;
cur->lastKey = key + step;
cur->win.ekey = keyMem;
cur->lastKey = keyMem + step;
cur->mixBlock = true;
moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
// same select mem key if update is true
} else if (keyMem == keyFile[pos]) {
if (pCfg->update) {
if(pCfg->update == TD_ROW_PARTIAL_UPDATE) {
doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, pos, pos);
......@@ -1977,31 +1981,36 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
mergeTwoRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, forceSetNull);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
cur->win.skey = keyMem;
}
cur->win.ekey = key;
cur->lastKey = key + step;
cur->win.ekey = keyMem;
cur->lastKey = keyMem + step;
cur->mixBlock = true;
//mem move next
moveToNextRowInMem(pCheckInfo);
//file move next, discard file row
pos += step;
} else {
// not update, only mem move to next, discard mem row
moveToNextRowInMem(pCheckInfo);
}
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// put file row
} else if ((keyMem > keyFile[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(keyMem < keyFile[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos];
cur->win.skey = keyFile[pos];
}
int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pos, keyMem, pQueryHandle->order);
assert(end != -1);
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
moveToNextRowInMem(pCheckInfo);
} else {
// can update, don't copy then deal on next loop with keyMem == keyFile[pos]
end -= step;
}
}
......@@ -2009,10 +2018,17 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t qstart = 0, qend = 0;
getQualifiedRowsPos(pQueryHandle, pos, end, numOfRows, &qstart, &qend);
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
pos += (qend - qstart + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[qend]:tsArray[qstart];
if(qend >= qstart) {
// copy qend - qstart + 1 rows from file
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
int32_t num = qend - qstart + 1;
pos += num * step;
} else {
// nothing copy from file
pos += step;
}
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? keyFile[qend] : keyFile[qstart];
cur->lastKey = cur->win.ekey + step;
}
} while (numOfRows < pQueryHandle->outputCapacity);
......@@ -2029,7 +2045,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
!ASCENDING_TRAVERSE(pQueryHandle->order))) {
// no data in cache or data in cache is greater than the ekey of time window, load data from file block
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos];
cur->win.skey = keyFile[pos];
}
int32_t start = -1, end = -1;
......@@ -2038,7 +2054,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[end]:tsArray[start];
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? keyFile[end] : keyFile[start];
cur->lastKey = cur->win.ekey + step;
cur->mixBlock = true;
}
......@@ -2969,6 +2985,9 @@ static bool loadDataBlockFromTableSeq(STsdbQueryHandle* pQueryHandle) {
// handle data in cache situation
bool tsdbNextDataBlock(TsdbQueryHandleT pHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
if (pQueryHandle == NULL) {
return false;
}
if (emptyQueryTimewindow(pQueryHandle)) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, 0x%"PRIx64, pQueryHandle, pQueryHandle->qId);
......@@ -3088,6 +3107,9 @@ static int32_t doGetExternalRow(STsdbQueryHandle* pQueryHandle, int16_t type, SM
pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qId, pMemRef);
tfree(cond.colList);
if (pSecQueryHandle == NULL) {
goto out_of_memory;
}
// current table, only one table
STableCheckInfo* pCurrent = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册