提交 c29ff049 编写于 作者: H Haojun Liao

[td-90] support multi-version SDataRow in cache

上级 73075efb
......@@ -269,8 +269,7 @@ void dataColSetNullAt(SDataCol *pCol, int index) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->dataOff[index] = pCol->len;
char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
varDataLen(ptr) = (pCol->type == TSDB_DATA_TYPE_BINARY) ? sizeof(char) : TSDB_NCHAR_SIZE;
setNull(varDataVal(ptr), pCol->type, pCol->bytes);
setVardataNull(ptr, pCol->type);
pCol->len += varDataTLen(ptr);
} else {
setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
......
......@@ -233,8 +233,6 @@ TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TsdbRepoT *tsdb, STsdbQueryCond*
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
// pQueryHandle->outputCapacity = 2; // only allowed two rows to be loaded
changeQueryHandleForInterpQuery(pQueryHandle);
return pQueryHandle;
}
......@@ -618,54 +616,19 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock, STableCheckInfo* pCheckInfo){
SQueryFilePos* cur = &pQueryHandle->cur;
SDataBlockInfo binfo = getTrueDataBlockInfo(pCheckInfo, pBlock);
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL;
if (pCheckInfo->iter != NULL && tSkipListIterGet(pCheckInfo->iter) != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
SDataRow row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
if (k1 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iter)) {
node = tSkipListIterGet(pCheckInfo->iter);
row = SL_GET_NODE_DATA(node);
k1 = dataRowKey(row);
} else {
k1 = TSKEY_INITIAL_VAL;
}
}
}
if (pCheckInfo->iiter != NULL && tSkipListIterGet(pCheckInfo->iiter) != NULL) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
SDataRow row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
if (k2 == binfo.window.skey) {
if (tSkipListIterNext(pCheckInfo->iiter)) {
node = tSkipListIterGet(pCheckInfo->iiter);
row = SL_GET_NODE_DATA(node);
k2 = dataRowKey(row);
} else {
k2 = TSKEY_INITIAL_VAL;
}
}
}
SDataRow row = getSDataRowInTableMem(pCheckInfo);
TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(binfo.rows-1);
if ((ASCENDING_TRAVERSE(pQueryHandle->order) &&
((k1 != TSKEY_INITIAL_VAL && k1 <= binfo.window.ekey) || (k2 != TSKEY_INITIAL_VAL && k2 <= binfo.window.ekey))) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) &&
((k1 != TSKEY_INITIAL_VAL && k1 >= binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 >= binfo.window.skey)))) {
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
if ((ASCENDING_TRAVERSE(pQueryHandle->order) &&
((k1 != TSKEY_INITIAL_VAL && k1 < binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 < binfo.window.skey))) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) &&
(((k1 != TSKEY_INITIAL_VAL && k1 > binfo.window.skey) || (k2 != TSKEY_INITIAL_VAL && k2 > binfo.window.skey))))) {
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
// do not load file block into buffer
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
......@@ -756,7 +719,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
return pQueryHandle->realNumOfRows > 0;
}
static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
int firstPos, lastPos, midPos = -1;
int numOfRows;
TSKEY* keyList;
......@@ -868,37 +831,63 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
return numOfRows + num;
}
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t capacity,
int32_t numOfRows, SDataRow row, STSchema* pSchema) {
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
int32_t numOfTableCols = schemaNCols(pSchema);
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row,
STsdbMeta *pMeta, int32_t numOfCols, STable* pTable) {
char* pData = NULL;
for (int32_t i = 0; i < numOfCols; ++i) {
// the schema version info is embeded in SDataRow
STSchema* pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, dataRowVersion(row));
int32_t numOfRowCols = schemaNCols(pSchema);
int32_t i = 0, j = 0;
while(i < numOfCols && j < numOfRowCols) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (pSchema->columns[j].colId < pColInfo->info.colId) {
j++;
continue;
}
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
}
int32_t offset = 0;
for (int32_t j = 0; j < numOfTableCols; ++j) {
if (pColInfo->info.colId == pSchema->columns[j].colId) {
offset = pSchema->columns[j].offset;
break;
if (pSchema->columns[j].colId == pColInfo->info.colId) {
void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
memcpy(pData, value, varDataTLen(value));
} else {
memcpy(pData, value, pColInfo->info.bytes);
}
j++;
i++;
} else { // pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
i++;
}
assert(offset != -1); // todo handle error
void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + offset);
}
while (i < numOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
}
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
memcpy(pData, value, varDataTLen(value));
setVardataNull(pData, pColInfo->info.type);
} else {
memcpy(pData, value, pColInfo->info.bytes);
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
i++;
}
}
......@@ -911,7 +900,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
initTableMemIterator(pQueryHandle, pCheckInfo);
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
// for search the endPos, so the order needs to reverse
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
STable* pTable = pCheckInfo->pTableObj;
int32_t endPos = cur->pos;
if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
endPos = blockInfo.rows - 1;
......@@ -920,8 +918,8 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
endPos = 0;
cur->mixBlock = (cur->pos != blockInfo.rows - 1);
} else {
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order);
assert(pCols->numOfRows > 0);
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order);
cur->mixBlock = true;
}
......@@ -933,8 +931,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t numOfRows = 0;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
// no data in buffer, load data from file directly
if (pCheckInfo->iiter == NULL && pCheckInfo->iter == NULL) {
int32_t start = cur->pos;
......@@ -950,12 +947,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
// todo opt in case of no data in buffer
numOfRows = copyDataFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_TRAVERSE(pQueryHandle->order) && numOfRows < pQueryHandle->outputCapacity) {
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
int32_t reqNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
for(int32_t i = 0; i < reqNumOfCols; ++i) {
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
}
......@@ -969,20 +965,15 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
pQueryHandle->realNumOfRows = numOfRows;
cur->rows = numOfRows;
return;
} else if (pCheckInfo->iter != NULL && pCheckInfo->iiter == NULL) {
// } else if (pCheckInfo->iter == NULL && pCheckInfo->iiter != NULL) {
// } else { // iter and iiter are all not NULL, three-way merge data block
STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj);
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
SSkipListNode* node = NULL;
do {
node = tSkipListIterGet(pCheckInfo->iter);
if (node == NULL) {
SDataRow row = getSDataRowInTableMem(pCheckInfo);
if (row == NULL) {
break;
}
SDataRow row = SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row);
TSKEY key = dataRowKey(row);
if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
break;
......@@ -995,7 +986,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
copyOneRowFromMem(pQueryHandle, pCheckInfo, pQueryHandle->outputCapacity, numOfRows, row, pSchema);
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, pMeta, numOfCols, pTable);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
......@@ -1005,17 +996,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
cur->lastKey = key + step;
cur->mixBlock = true;
tSkipListIterNext(pCheckInfo->iter);
moveToNextRow(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
tSkipListIterNext(pCheckInfo->iter);
moveToNextRow(pCheckInfo);
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos];
}
int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
tSkipListIterNext(pCheckInfo->iter);
}
......@@ -1093,9 +1083,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (numOfRows < pQueryHandle->outputCapacity) {
int32_t emptySize = pQueryHandle->outputCapacity - numOfRows;
int32_t requiredNumOfCols = taosArrayGetSize(pQueryHandle->pColumns);
for(int32_t i = 0; i < requiredNumOfCols; ++i) {
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
}
......@@ -1567,9 +1555,6 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) {
for(int32_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
if (pCheckInfo->pTableObj->tableId.uid == 12094628167747) {
printf("abc\n");
}
if (pCheckInfo->pTableObj->lastKey > key) {
key = pCheckInfo->pTableObj->lastKey;
index = i;
......@@ -1652,9 +1637,9 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
*skey = TSKEY_INITIAL_VAL;
int64_t st = taosGetTimestampUs();
STSchema* pSchema = tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj);
int32_t numOfTableCols = schemaNCols(pSchema);
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
STable* pTable = pCheckInfo->pTableObj;
do {
SDataRow row = getSDataRowInTableMem(pCheckInfo);
if (row == NULL) {
......@@ -1662,10 +1647,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
}
TSKEY key = dataRowKey(row);
if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
tsdbTrace("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey,
pQueryHandle->window.ekey);
......@@ -1677,59 +1659,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
}
*ekey = key;
char* pData = NULL;
int32_t i = 0, j = 0;
while(i < numOfCols && j < numOfTableCols) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (pSchema->columns[j].colId < pColInfo->info.colId) {
j++;
continue;
}
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes;
}
if (pSchema->columns[j].colId == pColInfo->info.colId) {
void* value = tdGetRowDataOfCol(row, pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
memcpy(pData, value, varDataTLen(value));
} else {
memcpy(pData, value, pColInfo->info.bytes);
}
j++;
i++;
} else { // pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
i++;
}
}
while (i < numOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes;
}
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
i++;
}
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable);
if (++numOfRows >= maxRowsToRead) {
moveToNextRow(pCheckInfo);
break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册