未验证 提交 5fdaa434 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #12626 from taosdata/feature/3.0_liaohj

fix(query): fix the invalid column length value during spill out the data into disk
...@@ -600,10 +600,11 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { ...@@ -600,10 +600,11 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
} }
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) { int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) {
pBlock->info.rows = *(int32_t*)buf; pBlock->info.rows = *(int32_t*) buf;
pBlock->info.groupId = *(uint64_t*) (buf + sizeof(int32_t));
int32_t numOfCols = pBlock->info.numOfCols; int32_t numOfCols = pBlock->info.numOfCols;
const char* pStart = buf + sizeof(uint32_t); const char* pStart = buf + sizeof(uint32_t) + sizeof(uint64_t);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
...@@ -669,7 +670,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) { ...@@ -669,7 +670,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) {
return sizeof(int32_t) + sizeof(uint64_t) + pBlock->info.numOfCols * sizeof(int32_t); return sizeof(int32_t) + sizeof(uint64_t) + pBlock->info.numOfCols * sizeof(int32_t);
} }
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
ASSERT(pBlock != NULL); ASSERT(pBlock != NULL);
double rowSize = 0; double rowSize = 0;
...@@ -1224,7 +1225,27 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { ...@@ -1224,7 +1225,27 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
} }
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
return (int32_t)((pageSize - blockDataGetSerialMetaSize(pBlock)) / blockDataGetSerialRowSize(pBlock)); int32_t payloadSize = pageSize - blockDataGetSerialMetaSize(pBlock);
int32_t rowSize = pBlock->info.rowSize;
int32_t nRows = payloadSize / rowSize;
// the true value must be less than the value of nRows
int32_t additional = 0;
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
additional += nRows * sizeof(int32_t);
} else {
additional += BitmapLen(nRows);
}
}
int32_t newRows = (payloadSize - additional) / rowSize;
ASSERT(newRows <= nRows && newRows > 1);
return newRows;
} }
void colDataDestroy(SColumnInfoData* pColData) { void colDataDestroy(SColumnInfoData* pColData) {
......
...@@ -396,8 +396,11 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -396,8 +396,11 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
pGInfo->groupId = calcGroupId(pInfo->keyBuf, len); pGInfo->groupId = calcGroupId(pInfo->keyBuf, len);
} }
// number of rows
int32_t* rows = (int32_t*) pPage; int32_t* rows = (int32_t*) pPage;
// group id
size_t numOfCols = pOperator->numOfExprs; size_t numOfCols = pOperator->numOfExprs;
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = &pOperator->pExpr[i]; SExprInfo* pExpr = &pOperator->pExpr[i];
...@@ -408,13 +411,13 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -408,13 +411,13 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t bytes = pColInfoData->info.bytes; int32_t bytes = pColInfoData->info.bytes;
int32_t startOffset = pInfo->columnOffset[i]; int32_t startOffset = pInfo->columnOffset[i];
char* columnLen = NULL; int32_t* columnLen = NULL;
int32_t contentLen = 0; int32_t contentLen = 0;
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
int32_t* offset = (int32_t*)((char*)pPage + startOffset); int32_t* offset = (int32_t*)((char*)pPage + startOffset);
columnLen = (char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity; columnLen = (int32_t*) ((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
char* data = (char*)(columnLen + sizeof(int32_t)); char* data = (char*)((char*) columnLen + sizeof(int32_t));
if (colDataIsNull_s(pColInfoData, j)) { if (colDataIsNull_s(pColInfoData, j)) {
offset[(*rows)] = -1; offset[(*rows)] = -1;
...@@ -423,11 +426,15 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -423,11 +426,15 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
offset[*rows] = (*columnLen); offset[*rows] = (*columnLen);
char* src = colDataGetData(pColInfoData, j); char* src = colDataGetData(pColInfoData, j);
memcpy(data + (*columnLen), src, varDataTLen(src)); memcpy(data + (*columnLen), src, varDataTLen(src));
int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
ASSERT(v > 0);
printf("len:%d\n", v);
contentLen = varDataTLen(src); contentLen = varDataTLen(src);
} }
} else { } else {
char* bitmap = (char*)pPage + startOffset; char* bitmap = (char*)pPage + startOffset;
columnLen = (char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity); columnLen = (int32_t*) ((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
char* data = (char*) columnLen + sizeof(int32_t); char* data = (char*) columnLen + sizeof(int32_t);
bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j); bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
...@@ -440,6 +447,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -440,6 +447,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
} }
(*columnLen) += contentLen; (*columnLen) += contentLen;
ASSERT(*columnLen >= 0);
} }
(*rows) += 1; (*rows) += 1;
...@@ -476,7 +484,12 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf ...@@ -476,7 +484,12 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
taosArrayPush(p->pPageList, &pageId); taosArrayPush(p->pPageList, &pageId);
*(int32_t*) pPage = 0; // // number of rows
// *(int32_t*) pPage = 0;
//
// uint64_t* groupId = (pPage + sizeof(int32_t));
// *groupId = 0;
memset(pPage, 0, getBufPageSize(pInfo->pBuf));
} }
} }
...@@ -500,7 +513,7 @@ int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) { ...@@ -500,7 +513,7 @@ int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
size_t numOfCols = pBlock->info.numOfCols; size_t numOfCols = pBlock->info.numOfCols;
int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t)); int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t));
offset[0] = sizeof(int32_t); // the number of rows in current page, ref to SSDataBlock paged serialization format offset[0] = sizeof(int32_t) + sizeof(uint64_t); // the number of rows in current page, ref to SSDataBlock paged serialization format
for(int32_t i = 0; i < numOfCols - 1; ++i) { for(int32_t i = 0; i < numOfCols - 1; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
...@@ -571,7 +584,6 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { ...@@ -571,7 +584,6 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
break; break;
} }
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs);
doHashPartition(pOperator, pBlock); doHashPartition(pOperator, pBlock);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册