提交 f700e474 编写于 作者: H Haojun Liao

[td-14393] fix bug.

上级 b984e12e
...@@ -574,11 +574,13 @@ typedef struct SPartitionOperatorInfo { ...@@ -574,11 +574,13 @@ typedef struct SPartitionOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SArray* pGroupCols; SArray* pGroupCols;
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys> SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width int32_t groupKeyLen; // total group by column width
SHashObj* pGroupSet; // quick locate the window object for each result SHashObj* pGroupSet; // quick locate the window object for each result
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
int32_t rowCapacity; // maximum number of rows for each buffer page
int32_t* columnOffset; // start position for each column data
void* pGroupIter; // group iterator void* pGroupIter; // group iterator
int32_t pageIndex; // page index of current group int32_t pageIndex; // page index of current group
......
...@@ -298,6 +298,10 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { ...@@ -298,6 +298,10 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
idata.info.precision = pDescNode->dataType.precision; idata.info.precision = pDescNode->dataType.precision;
taosArrayPush(pBlock->pDataBlock, &idata); taosArrayPush(pBlock->pDataBlock, &idata);
if (IS_VAR_DATA_TYPE(idata.info.type)) {
pBlock->info.hasVarCol = true;
}
} }
return pBlock; return pBlock;
......
...@@ -25,6 +25,8 @@ ...@@ -25,6 +25,8 @@
#include "thash.h" #include "thash.h"
#include "ttypes.h" #include "ttypes.h"
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
...@@ -357,15 +359,13 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -357,15 +359,13 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols); int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
for (int32_t j = 0; j < pBlock->info.rows; ++j) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t numOfRows = blockDataGetCapacityInRow(pInfo->binfo.pRes, 4096);
SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len); SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
void* pPage = NULL;
void* pPage = NULL;
if (p == NULL) { // it is a new group if (p == NULL) { // it is a new group
SDataGroupInfo gi = {0}; SDataGroupInfo gi = {0};
gi.pPageList = taosArrayInit(100, sizeof(int32_t)); gi.pPageList = taosArrayInit(100, sizeof(int32_t));
...@@ -383,7 +383,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -383,7 +383,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
pPage = getBufPage(pInfo->pBuf, *curId); pPage = getBufPage(pInfo->pBuf, *curId);
int32_t *rows = (int32_t*) pPage; int32_t *rows = (int32_t*) pPage;
if (*rows >= numOfRows) { if (*rows >= pInfo->rowCapacity) {
// add a new page for current group // add a new page for current group
int32_t pageId = 0; int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
...@@ -393,43 +393,54 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -393,43 +393,54 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
} }
} }
int32_t* rows = (int32_t*) pPage; // add one for this group
(*rows) += 1; p->numOfRows += 1;
int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t)); int32_t* rows = (int32_t*) pPage;
offset[0] = sizeof(int32_t);
int32_t numOfCols = pInfo->binfo.pRes->info.numOfCols; size_t numOfCols = pInfo->binfo.pRes->info.numOfCols;
for(int32_t i = 0; i < numOfCols - 1; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { int32_t bytes = pColInfoData->info.bytes;
offset[i + 1] = pColInfoData->info.bytes * numOfRows + numOfRows * sizeof(int32_t) + sizeof(int32_t) + offset[i]; int32_t startOffset = pInfo->columnOffset[i];
} else {
offset[i + 1] = pColInfoData->info.bytes * numOfRows + BitmapLen(numOfRows) + sizeof(int32_t) + offset[i];
}
}
for(int32_t i = 0; i < numOfCols; ++i) { char* columnLen = NULL;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); int32_t contentLen = 0;
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
// todo int32_t* offset = pPage + startOffset;
columnLen = pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity;
char* data = (char*)(columnLen + sizeof(int32_t));
if (colDataIsNull_s(pColInfoData, j)) {
offset[(*rows)] = -1;
contentLen = 0;
} else {
offset[*rows] = (*columnLen);
char* src = colDataGetData(pColInfoData, j);
memcpy(data + (*columnLen), src, varDataTLen(src));
contentLen = varDataTLen(src);
}
} else { } else {
char* bitmap = pPage + offset[i]; char* bitmap = pPage + startOffset;
int32_t* lenx = pPage + offset[i] + BitmapLen(numOfRows); columnLen = pPage + startOffset + BitmapLen(pInfo->rowCapacity);
char* data = (char*) lenx + sizeof(int32_t); char* data = (char*) columnLen + sizeof(int32_t);
(*lenx )+= pColInfoData->info.bytes;
bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j); bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
if (isNull) { if (isNull) {
colDataSetNull_f(bitmap, (*rows) - 1); colDataSetNull_f(bitmap, (*rows));
} else { } else {
memcpy(data + ((*rows) - 1)* pColInfoData->info.bytes, colDataGetData(pColInfoData, j), pColInfoData->info.bytes); memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
} }
contentLen = bytes;
} }
(*columnLen) += contentLen;
} }
(*rows) += 1;
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
releaseBufPage(pInfo->pBuf, pPage); releaseBufPage(pInfo->pBuf, pPage);
} }
...@@ -437,6 +448,30 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -437,6 +448,30 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
// todo set the consistent group id according to the group keys // todo set the consistent group id according to the group keys
} }
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
size_t numOfCols = pBlock->info.numOfCols;
int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t));
offset[0] = sizeof(int32_t); // the number of rows in current page, ref to SSDataBlock paged serialization format
for(int32_t i = 0; i < numOfCols - 1; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int32_t bytes = pColInfoData->info.bytes;
int32_t payloadLen = bytes * rowCapacity;
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
// offset segment + content length + payload
offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i];
} else {
// bitmap + content length + payload
offset[i + 1] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i];
}
}
return offset;
}
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
SPartitionOperatorInfo* pInfo = pOperator->info; SPartitionOperatorInfo* pInfo = pOperator->info;
...@@ -445,6 +480,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { ...@@ -445,6 +480,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
// try next group data // try next group data
pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter); pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter);
if (pInfo->pGroupIter == NULL) { if (pInfo->pGroupIter == NULL) {
pOperator->status = OP_EXEC_DONE;
return NULL; return NULL;
} }
...@@ -455,8 +491,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { ...@@ -455,8 +491,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex); int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
void* page = getBufPage(pInfo->pBuf, *pageId); void* page = getBufPage(pInfo->pBuf, *pageId);
int32_t numOfRows = blockDataGetCapacityInRow(pInfo->binfo.pRes, 4096); blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
blockDataFromBuf1(pInfo->binfo.pRes, page, numOfRows);
pInfo->pageIndex += 1; pInfo->pageIndex += 1;
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
...@@ -495,11 +530,12 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -495,11 +530,12 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) {
} }
static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
taosMemoryFreeClear(pInfo->keyBuf);
taosArrayDestroy(pInfo->pGroupCols); taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroy(pInfo->pGroupColVals); taosArrayDestroy(pInfo->pGroupColVals);
taosMemoryFree(pInfo->keyBuf);
taosMemoryFree(pInfo->columnOffset);
} }
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pGroupColList, SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pGroupColList,
...@@ -523,6 +559,9 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBloc ...@@ -523,6 +559,9 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBloc
goto _error; goto _error;
} }
pInfo->rowCapacity = blockDataGetCapacityInRow(pResultBlock, getBufPageSize(pInfo->pBuf));
pInfo->columnOffset = setupColumnOffset(pResultBlock, pInfo->rowCapacity);
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
......
...@@ -48,10 +48,8 @@ struct SDiskbasedBuf { ...@@ -48,10 +48,8 @@ struct SDiskbasedBuf {
}; };
static int32_t createDiskFile(SDiskbasedBuf* pBuf) { static int32_t createDiskFile(SDiskbasedBuf* pBuf) {
// pBuf->file = fopen(pBuf->path, "wb+"); pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
if (pBuf->pFile == NULL) { if (pBuf->pFile == NULL) {
// qError("failed to create tmp file: %s on disk. %s", pBuf->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册