未验证 提交 c638f9d5 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #14889 from taosdata/feature/3_liaohj

fix(query):fix invalid write in sample query processing.
...@@ -542,8 +542,10 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { ...@@ -542,8 +542,10 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
} }
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
pBlock->info.rows = *(int32_t*)buf; int32_t numOfRows = *(int32_t*) buf;
blockDataEnsureCapacity(pBlock, numOfRows);
pBlock->info.rows = numOfRows;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
const char* pStart = buf + sizeof(uint32_t); const char* pStart = buf + sizeof(uint32_t);
...@@ -589,6 +591,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { ...@@ -589,6 +591,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// todo remove this
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) { int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) {
pBlock->info.rows = *(int32_t*)buf; pBlock->info.rows = *(int32_t*)buf;
pBlock->info.groupId = *(uint64_t*)(buf + sizeof(int32_t)); pBlock->info.groupId = *(uint64_t*)(buf + sizeof(int32_t));
......
...@@ -53,7 +53,7 @@ static void setNullRow(SSDataBlock* pBlock, int64_t ts, int32_t rowIndex) { ...@@ -53,7 +53,7 @@ static void setNullRow(SSDataBlock* pBlock, int64_t ts, int32_t rowIndex) {
// the first are always the timestamp column, so start from the second column. // the first are always the timestamp column, so start from the second column.
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i);
if (p->info.type == TSDB_DATA_TYPE_TIMESTAMP) { if (p->info.type == TSDB_DATA_TYPE_TIMESTAMP) { // handle timestamp
colDataAppend(p, rowIndex, (const char*)&ts, false); colDataAppend(p, rowIndex, (const char*)&ts, false);
} else { } else {
colDataAppendNULL(p, rowIndex); colDataAppendNULL(p, rowIndex);
...@@ -83,16 +83,21 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* ...@@ -83,16 +83,21 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
if (pFillInfo->type == TSDB_FILL_PREV) { if (pFillInfo->type == TSDB_FILL_PREV) {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next; SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev : pFillInfo->next;
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (TSDB_COL_IS_TAG(pCol->flag)) { if (TSDB_COL_IS_TAG(pCol->flag)) {
continue; continue;
} }
SGroupKeys* pKey = taosArrayGet(p, i);
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol)); SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
if (pDstColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDstColInfoData, index, (const char*)&ts, false);
} else {
SGroupKeys* pKey = taosArrayGet(p, i);
doSetVal(pDstColInfoData, index, pKey); doSetVal(pDstColInfoData, index, pKey);
} }
}
} else if (pFillInfo->type == TSDB_FILL_NEXT) { } else if (pFillInfo->type == TSDB_FILL_NEXT) {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev; SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev;
// todo refactor: start from 0 not 1 // todo refactor: start from 0 not 1
...@@ -264,9 +269,8 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t ...@@ -264,9 +269,8 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
assert(pFillInfo->currentKey == ts); assert(pFillInfo->currentKey == ts);
if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) { if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) {
++pFillInfo->index; int32_t nextRowIndex = pFillInfo->index + 1;
copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next); copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, pFillInfo->next);
--pFillInfo->index;
} }
// assign rows to dst buffer // assign rows to dst buffer
......
...@@ -3461,9 +3461,16 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData ...@@ -3461,9 +3461,16 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
} }
} }
/*
* +------------------------------------+--------------+--------------+
* | null bitmap | | |
* |(n columns, one bit for each column)| src column #1| src column #2|
* +------------------------------------+--------------+--------------+
*/
void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
SFilePage* pPage = NULL; SFilePage* pPage = NULL;
// todo refactor: move away
int32_t completeRowSize = pCtx->subsidiaries.num * sizeof(bool); int32_t completeRowSize = pCtx->subsidiaries.num * sizeof(bool);
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
...@@ -3476,12 +3483,15 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS ...@@ -3476,12 +3483,15 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
} else { } else {
pPage = getBufPage(pCtx->pBuf, pCtx->curBufPage); pPage = getBufPage(pCtx->pBuf, pCtx->curBufPage);
if (pPage->num + completeRowSize > getBufPageSize(pCtx->pBuf)) { if (pPage->num + completeRowSize > getBufPageSize(pCtx->pBuf)) {
// current page is all used, let's prepare a new buffer page
releaseBufPage(pCtx->pBuf, pPage);
pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage); pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
pPage->num = sizeof(SFilePage); pPage->num = sizeof(SFilePage);
} }
} }
pPos->pageId = pCtx->curBufPage; pPos->pageId = pCtx->curBufPage;
pPos->offset = pPage->num;
// keep the current row data, extract method // keep the current row data, extract method
int32_t offset = 0; int32_t offset = 0;
...@@ -3509,7 +3519,6 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS ...@@ -3509,7 +3519,6 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
offset += pCol->info.bytes; offset += pCol->info.bytes;
} }
pPos->offset = pPage->num;
pPage->num += completeRowSize; pPage->num += completeRowSize;
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
...@@ -4839,7 +4848,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da ...@@ -4839,7 +4848,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
if (pInfo->numSampled < pInfo->samples) { if (pInfo->numSampled < pInfo->samples) {
sampleAssignResult(pInfo, data, pInfo->numSampled); sampleAssignResult(pInfo, data, pInfo->numSampled);
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, index, pCtx->pSrcBlock, pInfo->tuplePos + pInfo->numSampled * sizeof(STuplePos)); saveTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[pInfo->numSampled]);
} }
pInfo->numSampled++; pInfo->numSampled++;
} else { } else {
...@@ -4847,7 +4856,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da ...@@ -4847,7 +4856,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
if (j < pInfo->samples) { if (j < pInfo->samples) {
sampleAssignResult(pInfo, data, j); sampleAssignResult(pInfo, data, j);
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
copyTupleData(pCtx, index, pCtx->pSrcBlock, pInfo->tuplePos + j * sizeof(STuplePos)); copyTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]);
} }
} }
} }
...@@ -4885,7 +4894,7 @@ int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -4885,7 +4894,7 @@ int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t currentRow = pBlock->info.rows; int32_t currentRow = pBlock->info.rows;
for (int32_t i = 0; i < pInfo->numSampled; ++i) { for (int32_t i = 0; i < pInfo->numSampled; ++i) {
colDataAppend(pCol, currentRow + i, pInfo->data + i * pInfo->colBytes, false); colDataAppend(pCol, currentRow + i, pInfo->data + i * pInfo->colBytes, false);
setSelectivityValue(pCtx, pBlock, pInfo->tuplePos + i * sizeof(STuplePos), currentRow + i); setSelectivityValue(pCtx, pBlock, &pInfo->tuplePos[i], currentRow + i);
} }
return pInfo->numSampled; return pInfo->numSampled;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册