未验证 提交 6fbf8c6c 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #13031 from taosdata/feature/3_liaohj

fix(query): prepare enough buffer before convert string.
...@@ -1246,7 +1246,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { ...@@ -1246,7 +1246,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
} }
int32_t newRows = (payloadSize - additional) / rowSize; int32_t newRows = (payloadSize - additional) / rowSize;
ASSERT(newRows <= nRows && newRows > 1); ASSERT(newRows <= nRows && newRows >= 1);
return newRows; return newRows;
} }
......
...@@ -257,6 +257,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { ...@@ -257,6 +257,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
terrno = rowsRead; terrno = rowsRead;
mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id); mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
mndReleaseShowObj(pShow, true); mndReleaseShowObj(pShow, true);
blockDataDestroy(pBlock);
return -1; return -1;
} }
......
...@@ -94,10 +94,8 @@ typedef struct SLimit { ...@@ -94,10 +94,8 @@ typedef struct SLimit {
typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder; typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder;
typedef struct STaskCostInfo { typedef struct STaskCostInfo {
int64_t created; int64_t created;
int64_t start; int64_t start;
int64_t end;
uint64_t loadStatisTime; uint64_t loadStatisTime;
uint64_t loadFileBlockTime; uint64_t loadFileBlockTime;
uint64_t loadDataInCacheTime; uint64_t loadDataInCacheTime;
...@@ -185,7 +183,7 @@ typedef struct SExecTaskInfo { ...@@ -185,7 +183,7 @@ typedef struct SExecTaskInfo {
STaskCostInfo cost; STaskCostInfo cost;
int64_t owner; // if it is in execution int64_t owner; // if it is in execution
int32_t code; int32_t code;
uint64_t totalRows; // total number of rows // uint64_t totalRows; // total number of rows
struct { struct {
char *tablename; char *tablename;
char *dbname; char *dbname;
...@@ -676,6 +674,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI ...@@ -676,6 +674,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
SArray* pColList); SArray* pColList);
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win); void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
void doSetOperatorCompleted(SOperatorInfo* pOperator); void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo);
......
...@@ -126,8 +126,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { ...@@ -126,8 +126,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
pTaskInfo->code = ret; pTaskInfo->code = ret;
cleanUpUdfs(); cleanUpUdfs();
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
tstrerror(pTaskInfo->code));
return pTaskInfo->code; return pTaskInfo->code;
} }
...@@ -142,12 +141,13 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { ...@@ -142,12 +141,13 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
*useconds = pTaskInfo->cost.elapsedTime; *useconds = pTaskInfo->cost.elapsedTime;
} }
cleanUpUdfs();
int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0; int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
pTaskInfo->totalRows += current; uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
cleanUpUdfs();
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0); GET_TASKID(pTaskInfo), current, total, 0, el/1000.0);
atomic_store_64(&pTaskInfo->owner, 0); atomic_store_64(&pTaskInfo->owner, 0);
return pTaskInfo->code; return pTaskInfo->code;
...@@ -197,7 +197,7 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) { ...@@ -197,7 +197,7 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
void qDestroyTask(qTaskInfo_t qTaskHandle) { void qDestroyTask(qTaskInfo_t qTaskHandle) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows); qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
queryCostStatis(pTaskInfo); // print the query cost summary queryCostStatis(pTaskInfo); // print the query cost summary
doDestroyTask(pTaskInfo); doDestroyTask(pTaskInfo);
......
...@@ -3938,6 +3938,21 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { ...@@ -3938,6 +3938,21 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
} }
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
*defaultPgsz = 4096;
while (*defaultPgsz < rowSize * 4) {
*defaultPgsz <<= 1u;
}
// at least four pages need to be in buffer
*defaultBufsz = 4096 * 256;
if ((*defaultBufsz) <= (*defaultPgsz)) {
(*defaultBufsz) = (*defaultPgsz) * 4;
}
return 0;
}
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey) { const char* pKey) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
...@@ -3950,18 +3965,11 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n ...@@ -3950,18 +3965,11 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
uint32_t defaultPgsz = 4096; uint32_t defaultPgsz = 0;
while (defaultPgsz < pAggSup->resultRowSize * 4) { uint32_t defaultBufsz = 0;
defaultPgsz <<= 1u; getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
}
// at least four pages need to be in buffer
int32_t defaultBufsz = 4096 * 256;
if (defaultBufsz <= defaultPgsz) {
defaultBufsz = defaultPgsz * 4;
}
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH); int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
......
...@@ -439,7 +439,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { ...@@ -439,7 +439,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
memcpy(data + (*columnLen), src, varDataTLen(src)); memcpy(data + (*columnLen), src, varDataTLen(src));
int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage); int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
ASSERT(v > 0); ASSERT(v > 0);
printf("len:%d\n", v);
contentLen = varDataTLen(src); contentLen = varDataTLen(src);
} }
...@@ -490,16 +489,13 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf ...@@ -490,16 +489,13 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
int32_t *rows = (int32_t*) pPage; int32_t *rows = (int32_t*) pPage;
if (*rows >= pInfo->rowCapacity) { if (*rows >= pInfo->rowCapacity) {
// release buffer
releaseBufPage(pInfo->pBuf, pPage);
// add a new page for current group // add a new page for current group
int32_t pageId = 0; int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
taosArrayPush(p->pPageList, &pageId); taosArrayPush(p->pPageList, &pageId);
// // number of rows
// *(int32_t*) pPage = 0;
//
// uint64_t* groupId = (pPage + sizeof(int32_t));
// *groupId = 0;
memset(pPage, 0, getBufPageSize(pInfo->pBuf)); memset(pPage, 0, getBufPageSize(pInfo->pBuf));
} }
} }
...@@ -566,6 +562,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { ...@@ -566,6 +562,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity); blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
pInfo->pageIndex += 1; pInfo->pageIndex += 1;
releaseBufPage(pInfo->pBuf, page);
blockDataUpdateTsWindow(pInfo->binfo.pRes, 0); blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId; pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
...@@ -631,7 +628,11 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -631,7 +628,11 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error; goto _error;
} }
int32_t code = createDiskbasedBuf(&pInfo->pBuf, 4096, 4096 * 256, pTaskInfo->id.str, TD_TMP_DIR_PATH); uint32_t defaultPgsz = 0;
uint32_t defaultBufsz = 0;
getBufferPgSize(pResultBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
......
...@@ -707,6 +707,7 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu ...@@ -707,6 +707,7 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
int16_t inputType = GET_PARAM_TYPE(&pInput[0]); int16_t inputType = GET_PARAM_TYPE(&pInput[0]);
int16_t inputLen = GET_PARAM_BYTES(&pInput[0]);
int16_t outputType = GET_PARAM_TYPE(&pOutput[0]); int16_t outputType = GET_PARAM_TYPE(&pOutput[0]);
int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]); int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]);
...@@ -718,15 +719,15 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp ...@@ -718,15 +719,15 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
colDataAppendNULL(pOutput->columnData, i); colDataAppendNULL(pOutput->columnData, i);
continue; continue;
} }
char *input = colDataGetData(pInput[0].columnData, i); char *input = colDataGetData(pInput[0].columnData, i);
switch(outputType) { switch(outputType) {
case TSDB_DATA_TYPE_BIGINT: { case TSDB_DATA_TYPE_BIGINT: {
if (inputType == TSDB_DATA_TYPE_BINARY) { if (inputType == TSDB_DATA_TYPE_BINARY) {
memcpy(output, varDataVal(input), varDataLen(input)); *(int64_t *)output = taosStr2Int64(varDataVal(input), NULL, 10);
*(int64_t *)output = taosStr2Int64(output, NULL, 10);
} else if (inputType == TSDB_DATA_TYPE_NCHAR) { } else if (inputType == TSDB_DATA_TYPE_NCHAR) {
char *newBuf = taosMemoryCalloc(1, outputLen * TSDB_NCHAR_SIZE + 1); char *newBuf = taosMemoryCalloc(1, inputLen);
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
if (len < 0) { if (len < 0) {
taosMemoryFree(newBuf); taosMemoryFree(newBuf);
...@@ -742,10 +743,9 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp ...@@ -742,10 +743,9 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
} }
case TSDB_DATA_TYPE_UBIGINT: { case TSDB_DATA_TYPE_UBIGINT: {
if (inputType == TSDB_DATA_TYPE_BINARY) { if (inputType == TSDB_DATA_TYPE_BINARY) {
memcpy(output, varDataVal(input), varDataLen(input)); *(uint64_t *)output = taosStr2UInt64(varDataVal(input), NULL, 10);
*(uint64_t *)output = taosStr2UInt64(output, NULL, 10);
} else if (inputType == TSDB_DATA_TYPE_NCHAR) { } else if (inputType == TSDB_DATA_TYPE_NCHAR) {
char *newBuf = taosMemoryCalloc(1, outputLen * TSDB_NCHAR_SIZE + 1); char *newBuf = taosMemoryCalloc(1, inputLen);
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
if (len < 0) { if (len < 0) {
taosMemoryFree(newBuf); taosMemoryFree(newBuf);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册