提交 b9c46e37 编写于 作者: S shenglian zhou

fix sample group by tbname coredump

上级 ca0d6645
......@@ -648,7 +648,8 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
for(int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows);
if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM) {
if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM ||
pCtx[j].functionId == TSDB_FUNC_SAMPLE) {
if(j > 0) pCtx[j].ptsOutputBuf = pCtx[j - 1].pOutput;
}
}
......
......@@ -3343,7 +3343,8 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) ||
(functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_STDDEV_DST) ||
(functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE)) {
(functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE) ||
(functionId == TSDB_FUNC_SAMPLE)) {
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes,
&interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......
......@@ -74,9 +74,7 @@ extern "C" {
#define TSDB_FUNC_ROUND 35
#define TSDB_FUNC_CSUM 36
#define TSDB_FUNC_MAVG 37
#define TSDB_FUNC_SAMPLE 38
#define TSDB_FUNC_BLKINFO 39
......
......@@ -183,7 +183,8 @@ typedef struct {
typedef struct {
int32_t totalPoints;
int32_t numSampled;
tVariant *values;
int16_t colBytes;
char *values;
int64_t *timeStamps;
} SSampleFuncInfo;
......@@ -276,14 +277,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_SAMPLE) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
size_t size = sizeof(SSampleFuncInfo) + sizeof(tVariant) * param + sizeof(int64_t) * param;
*interBytes = (int32_t)size;
return TSDB_CODE_SUCCESS;
}
if (isSuperTable) {
if (functionId < 0) {
if (pUdfInfo->bufSize > 0) {
......@@ -328,6 +321,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_SAMPLE) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(sizeof(SSampleFuncInfo) + sizeof(dataBytes) * param + sizeof(int64_t) * param);
*interBytes = *bytes;
} else if (functionId == TSDB_FUNC_SPREAD) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SSpreadInfo);
......@@ -436,6 +433,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
// the output column may be larger than sizeof(STopBotInfo)
*interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_SAMPLE) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
size_t size = sizeof(SSampleFuncInfo) + sizeof(dataBytes) * param + sizeof(int64_t) * param;
*interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_LAST_ROW) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
......@@ -4669,15 +4671,19 @@ static void mavg_function(SQLFunctionCtx *pCtx) {
//////////////////////////////////////////////////////////////////////////////////
// Sample function with reservoir sampling algorithm
static void assignResultSample(SSampleFuncInfo *pInfo, int32_t index, int64_t ts, void *pData, uint16_t type, int16_t bytes) {
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
int32_t maxLen = bytes - VARSTR_HEADER_SIZE;
int32_t len = (varDataLen(pData) > maxLen)? maxLen:varDataLen(pData);
tVariantCreateFromBinary(pInfo->values + index, varDataVal(pData), len, type);
} else {
tVariantCreateFromBinary(pInfo->values + index, pData, bytes, type);
static SSampleFuncInfo* getSampleFuncOutputInfo(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
// only the first_stage stable is directly written data into final output buffer
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
return (SSampleFuncInfo *) pCtx->pOutput;
} else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return GET_ROWCELL_INTERBUF(pResInfo);
}
}
static void assignResultSample(SSampleFuncInfo *pInfo, int32_t index, int64_t ts, void *pData, uint16_t type, int16_t bytes) {
assignVal(pInfo->values + index*bytes, pData, bytes, type);
*(pInfo->timeStamps + index) = ts;
return;
}
......@@ -4702,8 +4708,7 @@ static void copySampleFuncRes(SQLFunctionCtx *pCtx, int32_t type) {
TSKEY* pTimestamp = pCtx->ptsOutputBuf;
char* pOutput = pCtx->pOutput;
for (int32_t i = 0; i < pRes->numSampled; ++i) {
tVariantDump(pRes->values + i, (char*)pOutput, type, true);
assignVal(pOutput, pRes->values + i*pCtx->inputBytes, pRes->colBytes, type);
*pTimestamp = *(pRes->timeStamps + i);
pOutput += pCtx->outputBytes;
......@@ -4719,11 +4724,12 @@ static bool sample_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes
srand(taosSafeRand());
SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
SSampleFuncInfo *pRes = getSampleFuncOutputInfo(pCtx);
pRes->totalPoints = 0;
pRes->numSampled = 0;
pRes->values = (tVariant*) ((char*)pRes + sizeof(SSampleFuncInfo));
pRes->timeStamps = (int64_t*)((char*)pRes->values + sizeof(tVariant) * pCtx->param[0].i64);
pRes->values = ((char*)pRes + sizeof(SSampleFuncInfo));
pRes->colBytes = (pCtx->currentStage != MERGE_STAGE) ? pCtx->inputBytes : pCtx->outputBytes;
pRes->timeStamps = (int64_t *)((char *)pRes->values + pRes->colBytes * pCtx->param[0].i64);
return true;
}
......@@ -4731,11 +4737,11 @@ static void sample_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
SSampleFuncInfo *pRes = getSampleFuncOutputInfo(pCtx);
if (pRes->values != (tVariant*) ((char*)pRes + sizeof(SSampleFuncInfo))) {
pRes->values = (tVariant*) ((char*)pRes + sizeof(SSampleFuncInfo));
pRes->timeStamps = (int64_t*)((char*)pRes->values + sizeof(tVariant) * pCtx->param[0].i64);
if (pRes->values != ((char*)pRes + sizeof(SSampleFuncInfo))) {
pRes->values = ((char*)pRes + sizeof(SSampleFuncInfo));
pRes->timeStamps = (int64_t*)((char*)pRes->values + pRes->colBytes * pCtx->param[0].i64);
}
for (int32_t i = 0; i < pCtx->size; ++i) {
......@@ -4747,7 +4753,7 @@ static void sample_function(SQLFunctionCtx *pCtx) {
notNullElems++;
TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0;
do_reservoir_sample(pRes, (int32_t)pCtx->param[0].i64, ts, data, pCtx->inputType, pCtx->inputBytes);
do_reservoir_sample(pRes, (int32_t)pCtx->param[0].i64, ts, data, pCtx->inputType, pRes->colBytes);
}
if (!pCtx->hasNull) {
......@@ -4762,6 +4768,23 @@ static void sample_function(SQLFunctionCtx *pCtx) {
}
}
static void sample_func_merge(SQLFunctionCtx *pCtx) {
SSampleFuncInfo* pInput = (SSampleFuncInfo*)GET_INPUT_DATA_LIST(pCtx);
pInput->values = ((char*)pInput + sizeof(SSampleFuncInfo));
pInput->timeStamps = (int64_t*)((char*)pInput->values + pInput->colBytes * pCtx->param[0].i64);
SSampleFuncInfo *pOutput = getSampleFuncOutputInfo(pCtx);
for (int32_t i = 0; i < pInput->numSampled; ++i) {
assignResultSample(pOutput, i, pInput->timeStamps[i],
pInput->values + i * pInput->colBytes, pCtx->outputType, pInput->colBytes);
}
SET_VAL(pCtx, pInput->numSampled, pOutput->numSampled);
if (pOutput->numSampled > 0) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
pResInfo->hasResult = DATA_SET_FLAG;
}
}
static void sample_func_finalizer(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
......@@ -4775,10 +4798,6 @@ static void sample_func_finalizer(SQLFunctionCtx *pCtx) {
GET_TRUE_DATA_TYPE();
copySampleFuncRes(pCtx, type);
for (int32_t i = 0; i < pRes->numSampled; ++i) {
tVariantDestroy(pRes->values + i);
}
doFinalizer(pCtx);
}
......@@ -5266,12 +5285,12 @@ SAggFunctionInfo aAggs[] = {{
// 38
"sample",
TSDB_FUNC_SAMPLE,
TSDB_FUNC_INVALID_ID,
TSDB_FUNC_SAMPLE,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY,
sample_function_setup,
sample_function,
sample_func_finalizer,
noop1,
sample_func_merge,
dataBlockRequired,
},
{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册