提交 f5eebabd 编写于 作者: S Shenglian Zhou

finish moveagg function

上级 3262f7e2
......@@ -71,14 +71,13 @@ extern "C" {
#define TSDB_FUNC_BLKINFO 33
#define TSDB_FUNC_CSUM 34
#define TSDB_FUNC_HLL 35
#define TSDB_FUNC_MODE 36
#define TSDB_FUNC_SAMPLE 37
#define TSDB_FUNC_MAVG 35
#define TSDB_FUNC_SAMPLE 36
#define TSDB_FUNC_MODE 37
#define TSDB_FUNC_CEIL 38
#define TSDB_FUNC_FLOOR 39
#define TSDB_FUNC_ROUND 40
#define TSDB_FUNC_MAVG 41
#define TSDB_FUNC_HLL 41
#define TSDB_FUNC_HISTOGRAM 42
#define TSDB_FUNCSTATE_SO 0x1u // single output
......
......@@ -4413,7 +4413,7 @@ typedef struct {
int32_t pos;
double sum;
int32_t numPoints;
int64_t* points;
double* points;
} SMovingAvgInfo;
static bool mavg_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
......@@ -4422,10 +4422,10 @@ static bool mavg_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn
}
SMovingAvgInfo* mavgInfo = GET_ROWCELL_INTERBUF(pResInfo);
mavgInfo->pos = -1;
mavgInfo->pos = 0;
mavgInfo->sum = 0;
mavgInfo->numPoints = (int32_t)pCtx->param[0].i64;
mavgInfo->points = (int64_t*)((char*)mavgInfo + sizeof(mavgInfo));
mavgInfo->points = (double*)((char*)mavgInfo + sizeof(mavgInfo));
return true;
}
......@@ -4445,96 +4445,169 @@ static void mavg_function(SQLFunctionCtx *pCtx) {
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_INT: {
int32_t *pData = (int32_t *)data;
int32_t *pOutput = (int32_t *)pCtx->pOutput;
double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
pCumSumInfo->cumSum += pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum);
if (mavgInfo->pos < mavgInfo->numPoints - 1) {
mavgInfo->points[mavgInfo->pos] = (double)pData[i];
mavgInfo->sum += pData[i];
} else {
int32_t pos = mavgInfo->pos % mavgInfo->numPoints;
if (mavgInfo->pos != mavgInfo->numPoints -1) {
mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos];
} else {
mavgInfo->sum += (double)pData[i];
}
++notNullElems;
pOutput += 1;
pTimestamp += 1;
mavgInfo->points[pos] = pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints)
++notNullElems;
pOutput += 1;
pTimestamp += 1;
}
++mavgInfo->pos;
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t *pData = (int64_t *)data;
int64_t *pOutput = (int64_t *)pCtx->pOutput;
double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
pCumSumInfo->cumSum += pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum);
if (mavgInfo->pos < mavgInfo->numPoints - 1) {
mavgInfo->points[mavgInfo->pos] = (double)pData[i];
mavgInfo->sum += pData[i];
} else {
int32_t pos = mavgInfo->pos % mavgInfo->numPoints;
if (mavgInfo->pos != mavgInfo->numPoints -1) {
mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos];
} else {
mavgInfo->sum += (double)pData[i];
}
++notNullElems;
pOutput += 1;
pTimestamp += 1;
mavgInfo->points[pos] = pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints)
++notNullElems;
pOutput += 1;
pTimestamp += 1;
}
++mavgInfo->pos;
}
break;
}
case TSDB_DATA_TYPE_TINYINT: {
int8_t *pData = (int8_t *)data;
int8_t *pOutput = (int8_t *)pCtx->pOutput;
double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
pCumSumInfo->cumSum += pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum);
if (mavgInfo->pos < mavgInfo->numPoints - 1) {
mavgInfo->points[mavgInfo->pos] = (double)pData[i];
mavgInfo->sum += pData[i];
} else {
int32_t pos = mavgInfo->pos % mavgInfo->numPoints;
if (mavgInfo->pos != mavgInfo->numPoints -1) {
mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos];
} else {
mavgInfo->sum += (double)pData[i];
}
++notNullElems;
pOutput += 1;
pTimestamp += 1;
mavgInfo->points[pos] = pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints)
++notNullElems;
pOutput += 1;
pTimestamp += 1;
}
++mavgInfo->pos;
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t *pData = (int16_t *)data;
int16_t *pOutput = (int16_t *)pCtx->pOutput;
double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
pCumSumInfo->cumSum += pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum);
if (mavgInfo->pos < mavgInfo->numPoints - 1) {
mavgInfo->points[mavgInfo->pos] = (double)pData[i];
mavgInfo->sum += pData[i];
} else {
int32_t pos = mavgInfo->pos % mavgInfo->numPoints;
if (mavgInfo->pos != mavgInfo->numPoints -1) {
mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos];
} else {
mavgInfo->sum += (double)pData[i];
}
++notNullElems;
pOutput += 1;
pTimestamp += 1;
mavgInfo->points[pos] = pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints)
++notNullElems;
pOutput += 1;
pTimestamp += 1;
}
++mavgInfo->pos;
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float *pData = (float *)data;
float *pOutput = (float *)pCtx->pOutput;
double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
pCumSumInfo->cumSum += pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum);
if (mavgInfo->pos < mavgInfo->numPoints - 1) {
mavgInfo->points[mavgInfo->pos] = (double)pData[i];
mavgInfo->sum += pData[i];
} else {
int32_t pos = mavgInfo->pos % mavgInfo->numPoints;
if (mavgInfo->pos != mavgInfo->numPoints -1) {
mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos];
} else {
mavgInfo->sum += (double)pData[i];
}
++notNullElems;
pOutput += 1;
pTimestamp += 1;
mavgInfo->points[pos] = pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints)
++notNullElems;
pOutput += 1;
pTimestamp += 1;
}
++mavgInfo->pos;
}
break;
}
......@@ -4546,16 +4619,31 @@ static void mavg_function(SQLFunctionCtx *pCtx) {
continue;
}
pCumSumInfo->cumSum += pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum);
if (mavgInfo->pos < mavgInfo->numPoints - 1) {
mavgInfo->points[mavgInfo->pos] = (double)pData[i];
mavgInfo->sum += pData[i];
} else {
int32_t pos = mavgInfo->pos % mavgInfo->numPoints;
if (mavgInfo->pos != mavgInfo->numPoints - 1) {
mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos];
} else {
mavgInfo->sum += (double)pData[i];
}
++notNullElems;
pOutput += 1;
pTimestamp += 1;
mavgInfo->points[pos] = pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints)
++notNullElems;
pOutput += 1;
pTimestamp += 1;
}
++mavgInfo->pos;
}
break;
}
default:
qError("error input type");
}
......@@ -4568,7 +4656,179 @@ static void mavg_function(SQLFunctionCtx *pCtx) {
}
}
//////////////////////////////////////////////////////////////////////////////////
typedef struct {
int32_t notNullElems;
int32_t num;
tValuePair **res;
} SSampleFuncInfo;
static void sampleValuePairAssign(tValuePair *dst, tVariant* srcVariant, int64_t tsKey, char *pTags, SExtTagsInfo *pTagInfo) {
dst->timestamp = tsKey;
tVariantAssign(&dst->v, srcVariant);
int32_t size = 0;
for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) {
SQLFunctionCtx* ctx = pTagInfo->pTagCtxList[i];
if (ctx->functionId == TSDB_FUNC_TS_DUMMY) {
ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
ctx->tag.i64 = tsKey;
}
tVariantDump(&ctx->tag, dst->pTags + size, ctx->tag.nType, true);
size += pTagInfo->pTagCtxList[i]->outputBytes;
}
}
static void do_sample_function_add(SSampleFuncInfo *pInfo, int32_t maxLen, void *pData, int64_t ts, uint16_t type, SExtTagsInfo *pTagInfo, char *pTags) {
tVariant val = {0};
tVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);
tValuePair **pList = pInfo->res;
assert(pList != NULL);
pInfo->notNullElems++;
if (pInfo->num < maxLen) {
sampleValuePairAssign(pList[pInfo->num], &val, ts, pTags, pTagInfo);
pInfo->num++;
} else {
int32_t j = rand() % (pInfo->notNullElems);
if (j < maxLen) {
sampleValuePairAssign(pList[j], &val, ts, pTags, pTagInfo);
}
}
}
static void copySampleFuncRes(SQLFunctionCtx *pCtx, int32_t type) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
tValuePair **tvp = pRes->res;
int32_t step = QUERY_ASC_FORWARD_STEP;
int32_t len = (int32_t)(GET_RES_INFO(pCtx)->numOfRes);
for (int32_t i = 0; i < len; ++i) {
char* output = pCtx->pOutput;
tVariantDump(&tvp[i]->v, (char*)output, type, true);
output += step * (pCtx->outputBytes);
}
// set the output timestamp of each record.
TSKEY *output = pCtx->ptsOutputBuf;
for (int32_t i = 0; i < len; ++i, output += step) {
*output = tvp[i]->timestamp;
}
// set the corresponding tag data for each record
// todo check malloc failure
char **pData = calloc(pCtx->tagInfo.numOfTagCols, POINTER_BYTES);
for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) {
pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput;
}
for (int32_t i = 0; i < len; ++i, output += step) {
int16_t offset = 0;
for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
memcpy(pData[j], tvp[i]->pTags + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes);
offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes;
pData[j] += pCtx->tagInfo.pTagCtxList[j]->outputBytes;
}
}
tfree(pData);
}
/*
* +-----------------------------------+-------------one value pair-----------+------------next value pair-----------+
* |-------------pointer area----------|----ts---+-----+-----n tags-----------|----ts---+-----+-----n tags-----------|
* +..[Value Pointer1][Value Pointer2].|timestamp|value|tags1|tags2|....|tagsn|timestamp|value|tags1|tags2|....|tagsn+
*/
static void buildSampleFuncStruct(SSampleFuncInfo *pSampleFuncInfo, SQLFunctionCtx *pCtx) {
char *tmp = (char *)pSampleFuncInfo + sizeof(SSampleFuncInfo);
pSampleFuncInfo->res = (tValuePair**) tmp;
tmp += POINTER_BYTES * pCtx->param[0].i64;
size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen;
for (int32_t i = 0; i < pCtx->param[0].i64; ++i) {
pSampleFuncInfo->res[i] = (tValuePair*) tmp;
pSampleFuncInfo->res[i]->pTags = tmp + sizeof(tValuePair);
tmp += size;
}
}
static bool sample_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
if (!function_setup(pCtx, pResInfo)) {
return false;
}
SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
buildSampleFuncStruct(pRes, pCtx);
srand(taosSafeRand());
pRes->notNullElems = 0;
return true;
}
static void sample_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0;
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
assert(pRes->num >= 0);
if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(SSampleFuncInfo) + POINTER_BYTES * pCtx->param[0].i64)) {
buildSampleFuncStruct(pRes, pCtx);
}
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
continue;
}
notNullElems++;
// NOTE: Set the default timestamp if it is missing [todo refactor]
TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0;
do_sample_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL);
}
if (!pCtx->hasNull) {
assert(pCtx->size == notNullElems);
}
// treat the result as only one result
SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) {
pResInfo->hasResult = DATA_SET_FLAG;
}
}
static void sample_func_finalizer(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
if (pRes->num == 0) { // no result
assert(pResInfo->hasResult != DATA_SET_FLAG);
}
GET_TRUE_DATA_TYPE();
copySampleFuncRes(pCtx, type);
doFinalizer(pCtx);
}
//////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////
/*
......@@ -5015,4 +5275,28 @@ SAggFunctionInfo aAggs[] = {{
noop1,
dataBlockRequired,
},
{
// 35
"mavg",
TSDB_FUNC_MAVG,
TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY,
mavg_function_setup,
mavg_function,
doFinalizer,
noop1,
dataBlockRequired,
},
{
// 36
"sample",
TSDB_FUNC_SAMPLE,
TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY,
sample_function_setup,
sample_function,
sample_func_finalizer,
noop1,
dataBlockRequired,
},
};
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册