未验证 提交 ab08e5cf 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #8744 from taosdata/feature/query

[td-11169]<FIX>: fix query caused taosd crash.
......@@ -62,7 +62,7 @@ typedef struct SRetrieveSupport {
uint32_t numOfRetry; // record the number of retry times
} SRetrieveSupport;
int32_t tscCreateGlobalMergerEnv(SQueryInfo* pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub, tOrderDescriptor **pDesc, uint32_t nBufferSize, int64_t id);
int32_t tscCreateGlobalMergerEnv(SQueryInfo* pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub, tOrderDescriptor **pDesc, uint32_t* nBufferSize, int64_t id);
void tscDestroyGlobalMergerEnv(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, int32_t numOfVnodes);
......
......@@ -407,8 +407,8 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo*
}
int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub,
tOrderDescriptor **pOrderDesc, uint32_t nBufferSizes, int64_t id) {
SSchema *pSchema = NULL;
tOrderDescriptor **pOrderDesc, uint32_t* nBufferSizes, int64_t id) {
SSchema1 *pSchema = NULL;
SColumnModel *pModel = NULL;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -421,7 +421,7 @@ int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu
size_t size = tscNumOfExprs(pQueryInfo);
pSchema = (SSchema *)calloc(1, sizeof(SSchema) * size);
pSchema = (SSchema1 *)calloc(1, sizeof(SSchema1) * size);
if (pSchema == NULL) {
tscError("0x%"PRIx64" failed to allocate memory", id);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -440,7 +440,10 @@ int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu
int32_t capacity = 0;
if (rlen != 0) {
capacity = nBufferSizes / rlen;
if ((*nBufferSizes) < rlen) {
(*nBufferSizes) = rlen * 2;
}
capacity = (*nBufferSizes) / rlen;
}
pModel = createColumnModel(pSchema, (int32_t)size, capacity);
......@@ -457,7 +460,7 @@ int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu
assert(numOfSub <= pTableMetaInfo->vgroupList->numOfVgroups);
for (int32_t i = 0; i < numOfSub; ++i) {
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pg, pModel);
(*pMemBuffer)[i] = createExtMemBuffer(*nBufferSizes, rlen, pg, pModel);
(*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
}
......
......@@ -2399,7 +2399,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
}
int16_t resType = 0;
int16_t resBytes = 0;
int32_t resBytes = 0;
int32_t interBufSize = 0;
getResultDataInfo(pSchema->type, pSchema->bytes, f, 0, &resType, &resBytes, &interBufSize, 0, false, pUdfInfo);
......@@ -2638,7 +2638,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
int16_t resultType = 0;
int16_t resultSize = 0;
int32_t resultSize = 0;
int32_t intermediateResSize = 0;
if (getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &resultType, &resultSize,
......@@ -2897,7 +2897,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tVariant* pVariant = &pParamElem[1].pNode->value;
int16_t resultType = pSchema->type;
int16_t resultSize = pSchema->bytes;
int32_t resultSize = pSchema->bytes;
int32_t interResult = 0;
char val[8] = {0};
......@@ -3079,7 +3079,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
s = pTagSchema[index.columnIndex];
}
int16_t bytes = 0;
int32_t bytes = 0;
int16_t type = 0;
int32_t inter = 0;
......@@ -3106,7 +3106,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
int32_t inter = 0;
int16_t resType = 0;
int16_t bytes = 0;
int32_t bytes = 0;
getResultDataInfo(TSDB_DATA_TYPE_INT, 4, TSDB_FUNC_BLKINFO, 0, &resType, &bytes, &inter, 0, 0, NULL);
......@@ -3159,7 +3159,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
int32_t inter = 0;
int16_t resType = 0;
int16_t bytes = 0;
int32_t bytes = 0;
getResultDataInfo(TSDB_DATA_TYPE_INT, 4, functionId, 0, &resType, &bytes, &inter, 0, false, pUdfInfo);
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resType, bytes, getNewResColId(pCmd), inter, false);
......@@ -3479,7 +3479,7 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
assert(tscGetNumOfTags(pTableMetaInfo->pTableMeta) >= 0);
int16_t bytes = 0;
int32_t bytes = 0;
int16_t type = 0;
int32_t interBytes = 0;
......
......@@ -2000,7 +2000,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
colIndex.columnIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, tagColId);
int16_t bytes = 0;
int32_t bytes = 0;
int16_t type = 0;
int32_t inter = 0;
......@@ -2636,7 +2636,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
pRes->qId = 0x1; // hack the qhandle check
const uint32_t nBufferSize = (1u << 18u); // 256KB, default buffer size
uint32_t nBufferSize = (1u << 18u); // 256KB, default buffer size
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -2652,7 +2652,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return ret;
}
ret = tscCreateGlobalMergerEnv(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, nBufferSize, pSql->self);
ret = tscCreateGlobalMergerEnv(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, &nBufferSize, pSql->self);
if (ret != 0) {
pRes->code = ret;
tscAsyncResultOnError(pSql);
......
......@@ -51,7 +51,7 @@ typedef struct SSqlExpr {
int16_t functionId; // function id in aAgg array
int16_t resType; // return value type
int16_t resBytes; // length of return value
int32_t resBytes; // length of return value
int32_t interBytes; // inter result buffer size
int16_t colType; // table column type
......
......@@ -438,7 +438,7 @@ typedef struct SColumnFilterList {
typedef struct SColumnInfo {
int16_t colId;
int16_t type;
int16_t bytes;
int32_t bytes;
SColumnFilterList flist;
} SColumnInfo;
......
......@@ -183,7 +183,7 @@ typedef struct SQLFunctionCtx {
int16_t inputBytes;
int16_t outputType;
int16_t outputBytes; // size of results, determined by function and input column data type
int32_t outputBytes; // size of results, determined by function and input column data type
int32_t interBufBytes; // internal buffer size
bool hasNull; // null value exist in current block
bool requireNull; // require null in some function
......@@ -227,7 +227,7 @@ typedef struct SAggFunctionInfo {
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *len, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo);
int32_t *len, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo);
int32_t isValidFunction(const char* name, int32_t len);
#define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0)
......
......@@ -75,8 +75,15 @@ typedef struct tFilePagesItem {
tFilePage item;
} tFilePagesItem;
typedef struct SSchema1 {
uint8_t type;
char name[TSDB_COL_NAME_LEN];
int16_t colId;
int32_t bytes;
} SSchema1;
typedef struct SSchemaEx {
struct SSchema field;
SSchema1 field;
int32_t offset;
} SSchemaEx;
......@@ -178,7 +185,7 @@ bool tExtMemBufferIsAllDataInMem(tExtMemBuffer *pMemBuffer);
* @param blockCapacity
* @return
*/
SColumnModel *createColumnModel(SSchema *fields, int32_t numOfCols, int32_t blockCapacity);
SColumnModel *createColumnModel(SSchema1 *fields, int32_t numOfCols, int32_t blockCapacity);
/**
*
......@@ -199,7 +206,7 @@ void destroyColumnModel(SColumnModel *pModel);
void tColModelCompact(SColumnModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity);
void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t maxCapacity, int32_t s, int32_t e);
SSchema *getColumnModelSchema(SColumnModel *pColumnModel, int32_t index);
SSchema1 *getColumnModelSchema(SColumnModel *pColumnModel, int32_t index);
int16_t getColumnModelOffset(SColumnModel *pColumnModel, int32_t index);
......
......@@ -197,7 +197,7 @@ typedef struct {
} SSampleFuncInfo;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
if (!isValidDataType(dataType)) {
qError("Illegal data type %d or data type length %d", dataType, dataBytes);
return TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -210,7 +210,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
functionId == TSDB_FUNC_FLOOR || functionId == TSDB_FUNC_ROUND)
{
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*bytes = dataBytes;
if (functionId == TSDB_FUNC_INTERP) {
*interBytes = sizeof(SInterpInfoDetail);
......@@ -224,7 +224,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
// (uid, tid) + VGID + TAGSIZE + VARSTR_HEADER_SIZE
if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE);
*bytes = (dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE);
*interBytes = 0;
return TSDB_CODE_SUCCESS;
}
......@@ -302,7 +302,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(dataBytes + DATA_SET_FLAG_SIZE);
*bytes = (dataBytes + DATA_SET_FLAG_SIZE);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
......@@ -325,13 +325,13 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param);
*bytes = (sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_SAMPLE) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param);
*bytes = (sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
......@@ -344,14 +344,14 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
} else if (functionId == TSDB_FUNC_APERCT) {
*type = TSDB_DATA_TYPE_BINARY;
int16_t bytesHist = sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo) + sizeof(SAPercentileInfo);
int16_t bytesDigest = (int16_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
int32_t bytesDigest = (int32_t) (sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
*bytes = MAX(bytesHist, bytesDigest);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_LAST_ROW) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(sizeof(SLastrowInfo) + dataBytes);
*bytes = (sizeof(SLastrowInfo) + dataBytes);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
......@@ -379,7 +379,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
int16_t bytesHist = sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1);
int16_t bytesDigest = (int16_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
int32_t bytesDigest = (int32_t) (sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
*interBytes = MAX(bytesHist, bytesDigest);
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TWA) {
......@@ -416,31 +416,31 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*interBytes = sizeof(SStddevInfo);
} else if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*bytes = dataBytes;
*interBytes = dataBytes + DATA_SET_FLAG_SIZE;
} else if (functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_LAST) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*interBytes = (int16_t)(dataBytes + sizeof(SFirstLastInfo));
*bytes = dataBytes;
*interBytes = (dataBytes + sizeof(SFirstLastInfo));
} else if (functionId == TSDB_FUNC_SPREAD) {
*type = (int16_t)TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*interBytes = sizeof(SSpreadInfo);
} else if (functionId == TSDB_FUNC_PERCT) {
*type = (int16_t)TSDB_DATA_TYPE_DOUBLE;
*bytes = (int16_t)sizeof(double);
*interBytes = (int16_t)sizeof(SPercentileInfo);
*bytes = sizeof(double);
*interBytes = sizeof(SPercentileInfo);
} else if (functionId == TSDB_FUNC_LEASTSQR) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = MAX(TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE, sizeof(SLeastsquaresInfo)); // string
*interBytes = *bytes;
} else if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(dataBytes + sizeof(SFirstLastInfo));
*bytes = (dataBytes + sizeof(SFirstLastInfo));
*interBytes = *bytes;
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*bytes = dataBytes;
size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param;
......@@ -448,12 +448,12 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_SAMPLE) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*bytes = dataBytes;
size_t size = sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param;
*interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_LAST_ROW) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*bytes = dataBytes;
*interBytes = dataBytes;
} else if (functionId == TSDB_FUNC_STDDEV_DST) {
*type = TSDB_DATA_TYPE_BINARY;
......
......@@ -2639,6 +2639,14 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
while(((*rowsize) * MIN_ROWS_PER_PAGE) > (*ps) - overhead) {
*ps = ((*ps) << 1u);
}
if (*ps > 5 * 1024 * 1024) {
MIN_ROWS_PER_PAGE = 2;
*ps = DEFAULT_INTERN_BUF_PAGE_SIZE;
while(((*rowsize) * MIN_ROWS_PER_PAGE) > (*ps) - overhead) {
*ps = ((*ps) << 1u);
}
}
}
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
......@@ -4792,8 +4800,8 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
int32_t ps = DEFAULT_PAGE_SIZE;
getIntermediateBufInfo(pRuntimeEnv, &ps, &pQueryAttr->intermediateResultRowSize);
int32_t TENMB = 1024*1024*10;
int32_t code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo->qId);
int32_t TWENTYMB = 1024*1024*20;
int32_t code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, ps, TWENTYMB, pQInfo->qId);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -521,7 +521,7 @@ static void swap(SColumnModel *pColumnModel, int32_t count, int32_t s1, char *da
void *first = COLMODEL_GET_VAL(data1, pColumnModel, count, s1, i);
void *second = COLMODEL_GET_VAL(data1, pColumnModel, count, s2, i);
SSchema* pSchema = &pColumnModel->pFields[i].field;
SSchema1* pSchema = &pColumnModel->pFields[i].field;
tsDataSwap(first, second, pSchema->type, pSchema->bytes, buf);
}
}
......@@ -750,7 +750,7 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
size_t width = 0;
for(int32_t i = 0; i < pModel->numOfCols; ++i) {
SSchema* pSchema = &pModel->pFields[i].field;
SSchema1* pSchema = &pModel->pFields[i].field;
if (width < pSchema->bytes) {
width = pSchema->bytes;
}
......@@ -771,7 +771,7 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
/*
* deep copy of sschema
*/
SColumnModel *createColumnModel(SSchema *fields, int32_t numOfCols, int32_t blockCapacity) {
SColumnModel *createColumnModel(SSchema1 *fields, int32_t numOfCols, int32_t blockCapacity) {
SColumnModel *pColumnModel = (SColumnModel *)calloc(1, sizeof(SColumnModel) + numOfCols * sizeof(SSchemaEx));
if (pColumnModel == NULL) {
return NULL;
......@@ -1023,7 +1023,7 @@ void tColModelCompact(SColumnModel *pModel, tFilePage *inputBuffer, int32_t maxE
}
}
SSchema* getColumnModelSchema(SColumnModel *pColumnModel, int32_t index) {
SSchema1* getColumnModelSchema(SColumnModel *pColumnModel, int32_t index) {
assert(pColumnModel != NULL && index >= 0 && index < pColumnModel->numOfCols);
return &pColumnModel->pFields[index].field;
}
......@@ -1045,7 +1045,7 @@ void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t blockC
/* start from the second column */
for (int32_t i = 0; i < pModel->numOfCols; ++i) {
int16_t offset = getColumnModelOffset(pModel, i);
SSchema* pSchema = getColumnModelSchema(pModel, i);
SSchema1* pSchema = getColumnModelSchema(pModel, i);
char *startPos = inputBuffer->data + offset * blockCapacity + s * pSchema->bytes;
char *endPos = startPos + pSchema->bytes * removed;
......
......@@ -97,7 +97,7 @@ TEST(testCase, colunmnwise_sort_test) {
}
TEST(testCase, columnsort_test) {
SSchema field[1] = {
SSchema1 field[1] = {
{TSDB_DATA_TYPE_INT, "k", sizeof(int32_t)},
};
......
......@@ -192,7 +192,7 @@ void largeDataTest() {
void qsortTest() {
printf("running : %s\n", __FUNCTION__);
SSchema field[1] = {
SSchema1 field[1] = {
{TSDB_DATA_TYPE_INT, "k", sizeof(int32_t)},
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册