提交 fbeb0098 编写于 作者: S shenglian zhou

finish the csum/mavg/sample functions implementation

上级 ce7c99ff
...@@ -2499,6 +2499,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2499,6 +2499,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_MAX: case TSDB_FUNC_MAX:
case TSDB_FUNC_DIFF: case TSDB_FUNC_DIFF:
case TSDB_FUNC_DERIVATIVE: case TSDB_FUNC_DERIVATIVE:
case TSDB_FUNC_CSUM:
case TSDB_FUNC_CEIL: case TSDB_FUNC_CEIL:
case TSDB_FUNC_FLOOR: case TSDB_FUNC_FLOOR:
case TSDB_FUNC_ROUND: case TSDB_FUNC_ROUND:
...@@ -2551,7 +2552,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2551,7 +2552,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
} }
// set the first column ts for diff query // set the first column ts for diff query
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) { if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE || functionId == TSDB_FUNC_CSUM) {
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0}; SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0};
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP,
TSDB_KEYSIZE, getNewResColId(pCmd), TSDB_KEYSIZE, false); TSDB_KEYSIZE, getNewResColId(pCmd), TSDB_KEYSIZE, false);
...@@ -2747,6 +2748,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2747,6 +2748,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_TOP: case TSDB_FUNC_TOP:
case TSDB_FUNC_BOTTOM: case TSDB_FUNC_BOTTOM:
case TSDB_FUNC_MAVG:
case TSDB_FUNC_SAMPLE:
case TSDB_FUNC_PERCT: case TSDB_FUNC_PERCT:
case TSDB_FUNC_APERCT: { case TSDB_FUNC_APERCT: {
// 1. valid the number of parameters // 1. valid the number of parameters
...@@ -2778,7 +2781,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2778,7 +2781,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
} }
// 2. valid the column type // 2. valid the column type
if (!IS_NUMERIC_TYPE(pSchema->type)) { if (functionId != TSDB_FUNC_SAMPLE && !IS_NUMERIC_TYPE(pSchema->type)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} }
...@@ -2820,13 +2823,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2820,13 +2823,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
} else { } else {
tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true); tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true);
int64_t nTop = GET_INT32_VAL(val); int64_t numRowsSelected = GET_INT32_VAL(val);
if (nTop <= 0 || nTop > 100) { // todo use macro if (numRowsSelected <= 0 || numRowsSelected > 100) { // todo use macro
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg12); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg12);
} }
// todo REFACTOR // todo REFACTOR
// set the first column ts for top/bottom query // set the first column ts for top/bottom/mavg/sample query
SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX};
pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, getNewResColId(pCmd), pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, getNewResColId(pCmd),
0, false); 0, false);
...@@ -6284,7 +6287,9 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQu ...@@ -6284,7 +6287,9 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQu
} }
int32_t f = pExpr->base.functionId; int32_t f = pExpr->base.functionId;
if ((f == TSDB_FUNC_PRJ && pExpr->base.numOfParams == 0) || f == TSDB_FUNC_DIFF || f == TSDB_FUNC_ARITHM || f == TSDB_FUNC_DERIVATIVE || if ((f == TSDB_FUNC_PRJ && pExpr->base.numOfParams == 0) ||
f == TSDB_FUNC_DIFF || f == TSDB_FUNC_ARITHM || f == TSDB_FUNC_DERIVATIVE ||
f == TSDB_FUNC_CSUM || f == TSDB_FUNC_MAVG ||
f == TSDB_FUNC_CEIL || f == TSDB_FUNC_FLOOR || f == TSDB_FUNC_ROUND) f == TSDB_FUNC_CEIL || f == TSDB_FUNC_FLOOR || f == TSDB_FUNC_ROUND)
{ {
isProjectionFunction = true; isProjectionFunction = true;
......
...@@ -169,6 +169,24 @@ typedef struct SDerivInfo { ...@@ -169,6 +169,24 @@ typedef struct SDerivInfo {
bool valueSet; // the value has been set already bool valueSet; // the value has been set already
} SDerivInfo; } SDerivInfo;
typedef struct {
double cumSum;
} SCumSumInfo;
typedef struct {
int32_t pos;
double sum;
int32_t numPointsK;
double* points;
} SMovingAvgInfo;
typedef struct {
int32_t totalPoints;
int32_t numSampled;
tVariant *values;
int64_t *timeStamps;
} SSampleFuncInfo;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) { int16_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
if (!isValidDataType(dataType)) { if (!isValidDataType(dataType)) {
...@@ -237,6 +255,35 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -237,6 +255,35 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (functionId == TSDB_FUNC_CSUM) {
if (IS_SIGNED_NUMERIC_TYPE(dataType)) {
*type = TSDB_DATA_TYPE_BIGINT;
} else if (IS_UNSIGNED_NUMERIC_TYPE(dataType)) {
*type = TSDB_DATA_TYPE_UBIGINT;
} else {
*type = TSDB_DATA_TYPE_DOUBLE;
}
*bytes = sizeof(int64_t);
*interBytes = sizeof(SCumSumInfo);
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_MAVG) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*interBytes = sizeof(SMovingAvgInfo) + sizeof(double) * param;
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_SAMPLE) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
size_t size = sizeof(SSampleFuncInfo) + sizeof(tVariant) * param + sizeof(int64_t) * param;
*interBytes = (int32_t)size;
return TSDB_CODE_SUCCESS;
}
if (isSuperTable) { if (isSuperTable) {
if (functionId < 0) { if (functionId < 0) {
if (pUdfInfo->bufSize > 0) { if (pUdfInfo->bufSize > 0) {
...@@ -4085,6 +4132,8 @@ static void irate_function(SQLFunctionCtx *pCtx) { ...@@ -4085,6 +4132,8 @@ static void irate_function(SQLFunctionCtx *pCtx) {
} }
} }
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
void blockInfo_func(SQLFunctionCtx* pCtx) { void blockInfo_func(SQLFunctionCtx* pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo); STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
...@@ -4258,6 +4307,8 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) { ...@@ -4258,6 +4307,8 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) {
doFinalizer(pCtx); doFinalizer(pCtx);
} }
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
#define CFR_SET_VAL(type, data, pCtx, func, i, step, notNullElems) \ #define CFR_SET_VAL(type, data, pCtx, func, i, step, notNullElems) \
do { \ do { \
type *pData = (type *) data; \ type *pData = (type *) data; \
...@@ -4484,10 +4535,7 @@ static void round_function(SQLFunctionCtx *pCtx) { ...@@ -4484,10 +4535,7 @@ static void round_function(SQLFunctionCtx *pCtx) {
#undef CFR_SET_VAL_DOUBLE #undef CFR_SET_VAL_DOUBLE
////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////
//cumulative_sum function
typedef struct {
double cumSum;
} SCumSumInfo;
static bool csum_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { static bool csum_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
if (!function_setup(pCtx, pResInfo)) { if (!function_setup(pCtx, pResInfo)) {
...@@ -4503,8 +4551,6 @@ static void csum_function(SQLFunctionCtx *pCtx) { ...@@ -4503,8 +4551,6 @@ static void csum_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SCumSumInfo* pCumSumInfo = GET_ROWCELL_INTERBUF(pResInfo); SCumSumInfo* pCumSumInfo = GET_ROWCELL_INTERBUF(pResInfo);
void* data = GET_INPUT_DATA_LIST(pCtx);
int32_t notNullElems = 0; int32_t notNullElems = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = (pCtx->order = TSDB_ORDER_ASC) ? 0 : pCtx->size -1; int32_t i = (pCtx->order = TSDB_ORDER_ASC) ? 0 : pCtx->size -1;
...@@ -4512,125 +4558,37 @@ static void csum_function(SQLFunctionCtx *pCtx) { ...@@ -4512,125 +4558,37 @@ static void csum_function(SQLFunctionCtx *pCtx) {
TSKEY* pTimestamp = pCtx->ptsOutputBuf; TSKEY* pTimestamp = pCtx->ptsOutputBuf;
TSKEY* tsList = GET_TS_LIST(pCtx); TSKEY* tsList = GET_TS_LIST(pCtx);
switch (pCtx->inputType) { qDebug("%p csum_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull);
case TSDB_DATA_TYPE_INT: {
int32_t *pData = (int32_t *)data;
int32_t *pOutput = (int32_t *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
pCumSumInfo->cumSum += pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum);
++notNullElems;
pOutput += 1;
pTimestamp += 1;
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t *pData = (int64_t *)data;
int64_t *pOutput = (int64_t *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
pCumSumInfo->cumSum += pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum);
++notNullElems;
pOutput += 1;
pTimestamp += 1;
}
break;
}
case TSDB_DATA_TYPE_TINYINT: {
int8_t *pData = (int8_t *)data;
int8_t *pOutput = (int8_t *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
pCumSumInfo->cumSum += pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum);
++notNullElems;
pOutput += 1;
pTimestamp += 1;
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t *pData = (int16_t *)data;
int16_t *pOutput = (int16_t *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) { for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { char* pData = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
qDebug("%p csum_function() index of null data:%d", pCtx, i);
continue; continue;
} }
pCumSumInfo->cumSum += pData[i]; double v = 0;
*pTimestamp = (tsList != NULL) ? tsList[i] : 0; GET_TYPED_DATA(v, double, pCtx->inputType, pData);
SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum); pCumSumInfo->cumSum += v;
++notNullElems;
pOutput += 1;
pTimestamp += 1;
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float *pData = (float *)data;
float *pOutput = (float *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
pCumSumInfo->cumSum += pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0; *pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum); if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
int64_t *retVal = (int64_t *)pCtx->pOutput;
++notNullElems; *retVal = (int64_t)(pCumSumInfo->cumSum);
pOutput += 1; } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
pTimestamp += 1; uint64_t *retVal = (uint64_t *)pCtx->pOutput;
} *retVal = (uint64_t)(pCumSumInfo->cumSum);
break; } else if (IS_FLOAT_TYPE(pCtx->inputType)) {
} double *retVal = (double*) pCtx->pOutput;
SET_DOUBLE_VAL(retVal, pCumSumInfo->cumSum);
case TSDB_DATA_TYPE_DOUBLE: {
double *pData = (double *)data;
double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
} }
pCumSumInfo->cumSum += pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, pCumSumInfo->cumSum);
++notNullElems; ++notNullElems;
pOutput += 1; pCtx->pOutput += pCtx->outputBytes;
pTimestamp += 1; pTimestamp++;
}
break;
}
default:
qError("error input type");
} }
if (notNullElems <= 0) { if (notNullElems == 0) {
assert(pCtx->hasNull); assert(pCtx->hasNull);
} else { } else {
GET_RES_INFO(pCtx)->numOfRes += notNullElems; GET_RES_INFO(pCtx)->numOfRes += notNullElems;
...@@ -4639,14 +4597,7 @@ static void csum_function(SQLFunctionCtx *pCtx) { ...@@ -4639,14 +4597,7 @@ static void csum_function(SQLFunctionCtx *pCtx) {
} }
////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////
// Simple Moving_average function
typedef struct {
int32_t pos;
double sum;
int32_t numPoints;
double* points;
} SMovingAvgInfo;
static bool mavg_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { static bool mavg_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
if (!function_setup(pCtx, pResInfo)) { if (!function_setup(pCtx, pResInfo)) {
...@@ -4656,7 +4607,7 @@ static bool mavg_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn ...@@ -4656,7 +4607,7 @@ static bool mavg_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResIn
SMovingAvgInfo* mavgInfo = GET_ROWCELL_INTERBUF(pResInfo); SMovingAvgInfo* mavgInfo = GET_ROWCELL_INTERBUF(pResInfo);
mavgInfo->pos = 0; mavgInfo->pos = 0;
mavgInfo->sum = 0; mavgInfo->sum = 0;
mavgInfo->numPoints = (int32_t)pCtx->param[0].i64; mavgInfo->numPointsK = (int32_t)pCtx->param[0].i64;
mavgInfo->points = (double*)((char*)mavgInfo + sizeof(mavgInfo)); mavgInfo->points = (double*)((char*)mavgInfo + sizeof(mavgInfo));
return true; return true;
} }
...@@ -4665,8 +4616,6 @@ static void mavg_function(SQLFunctionCtx *pCtx) { ...@@ -4665,8 +4616,6 @@ static void mavg_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SMovingAvgInfo* mavgInfo = GET_ROWCELL_INTERBUF(pResInfo); SMovingAvgInfo* mavgInfo = GET_ROWCELL_INTERBUF(pResInfo);
void* data = GET_INPUT_DATA_LIST(pCtx);
int32_t notNullElems = 0; int32_t notNullElems = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = (pCtx->order = TSDB_ORDER_ASC) ? 0 : pCtx->size -1; int32_t i = (pCtx->order = TSDB_ORDER_ASC) ? 0 : pCtx->size -1;
...@@ -4674,211 +4623,39 @@ static void mavg_function(SQLFunctionCtx *pCtx) { ...@@ -4674,211 +4623,39 @@ static void mavg_function(SQLFunctionCtx *pCtx) {
TSKEY* pTimestamp = pCtx->ptsOutputBuf; TSKEY* pTimestamp = pCtx->ptsOutputBuf;
TSKEY* tsList = GET_TS_LIST(pCtx); TSKEY* tsList = GET_TS_LIST(pCtx);
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_INT: {
int32_t *pData = (int32_t *)data;
double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) { for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { char* pData = GET_INPUT_DATA(pCtx, i);
continue; if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
} qDebug("%p mavg_function() index of null data:%d", pCtx, i);
if (mavgInfo->pos < mavgInfo->numPoints - 1) {
mavgInfo->points[mavgInfo->pos] = (double)pData[i];
mavgInfo->sum += pData[i];
} else {
int32_t pos = mavgInfo->pos % mavgInfo->numPoints;
if (mavgInfo->pos != mavgInfo->numPoints -1) {
mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos];
} else {
mavgInfo->sum += (double)pData[i];
}
mavgInfo->points[pos] = pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints)
++notNullElems;
pOutput += 1;
pTimestamp += 1;
}
++mavgInfo->pos;
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t *pData = (int64_t *)data;
double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
if (mavgInfo->pos < mavgInfo->numPoints - 1) {
mavgInfo->points[mavgInfo->pos] = (double)pData[i];
mavgInfo->sum += pData[i];
} else {
int32_t pos = mavgInfo->pos % mavgInfo->numPoints;
if (mavgInfo->pos != mavgInfo->numPoints -1) {
mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos];
} else {
mavgInfo->sum += (double)pData[i];
}
mavgInfo->points[pos] = pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints)
++notNullElems;
pOutput += 1;
pTimestamp += 1;
}
++mavgInfo->pos;
}
break;
}
case TSDB_DATA_TYPE_TINYINT: {
int8_t *pData = (int8_t *)data;
double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
if (mavgInfo->pos < mavgInfo->numPoints - 1) {
mavgInfo->points[mavgInfo->pos] = (double)pData[i];
mavgInfo->sum += pData[i];
} else {
int32_t pos = mavgInfo->pos % mavgInfo->numPoints;
if (mavgInfo->pos != mavgInfo->numPoints -1) {
mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos];
} else {
mavgInfo->sum += (double)pData[i];
}
mavgInfo->points[pos] = pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints)
++notNullElems;
pOutput += 1;
pTimestamp += 1;
}
++mavgInfo->pos;
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t *pData = (int16_t *)data;
double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue; continue;
} }
if (mavgInfo->pos < mavgInfo->numPoints - 1) { double v = 0;
mavgInfo->points[mavgInfo->pos] = (double)pData[i]; GET_TYPED_DATA(v, double, pCtx->inputType, pData);
mavgInfo->sum += pData[i];
} else {
int32_t pos = mavgInfo->pos % mavgInfo->numPoints;
if (mavgInfo->pos != mavgInfo->numPoints -1) {
mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos];
} else {
mavgInfo->sum += (double)pData[i];
}
mavgInfo->points[pos] = pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints)
++notNullElems;
pOutput += 1;
pTimestamp += 1;
}
++mavgInfo->pos;
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float *pData = (float *)data;
double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
if (mavgInfo->pos < mavgInfo->numPoints - 1) { if (mavgInfo->pos < mavgInfo->numPointsK - 1) {
mavgInfo->points[mavgInfo->pos] = (double)pData[i]; mavgInfo->points[mavgInfo->pos] = v;
mavgInfo->sum += pData[i]; mavgInfo->sum += v;
} else { } else {
int32_t pos = mavgInfo->pos % mavgInfo->numPoints; int32_t pos = mavgInfo->pos % mavgInfo->numPointsK;
if (mavgInfo->pos != mavgInfo->numPoints -1) { if (mavgInfo->pos != mavgInfo->numPointsK -1) {
mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos]; mavgInfo->sum = mavgInfo->sum + v - mavgInfo->points[pos];
} else { } else {
mavgInfo->sum += (double)pData[i]; mavgInfo->sum += v;
} }
mavgInfo->points[pos] = pData[i]; mavgInfo->points[pos] = v;
*pTimestamp = (tsList != NULL) ? tsList[i] : 0; *pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints) SET_DOUBLE_VAL(pCtx->pOutput, mavgInfo->sum / mavgInfo->numPointsK)
++notNullElems;
pOutput += 1;
pTimestamp += 1;
}
++mavgInfo->pos;
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double *pData = (double *)data;
double *pOutput = (double *)pCtx->pOutput;
for (; i < pCtx->size && i >= 0; i += step) {
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
continue;
}
if (mavgInfo->pos < mavgInfo->numPoints - 1) {
mavgInfo->points[mavgInfo->pos] = (double)pData[i];
mavgInfo->sum += pData[i];
} else {
int32_t pos = mavgInfo->pos % mavgInfo->numPoints;
if (mavgInfo->pos != mavgInfo->numPoints - 1) {
mavgInfo->sum = mavgInfo->sum + (double)pData[i] - mavgInfo->points[pos];
} else {
mavgInfo->sum += (double)pData[i];
}
mavgInfo->points[pos] = pData[i];
*pTimestamp = (tsList != NULL) ? tsList[i] : 0;
SET_DOUBLE_VAL(pOutput, mavgInfo->sum / mavgInfo->numPoints)
++notNullElems; ++notNullElems;
pOutput += 1; pCtx->pOutput += pCtx->outputBytes;
pTimestamp += 1; pCtx->ptsOutputBuf++;
} }
++mavgInfo->pos; ++mavgInfo->pos;
} }
break;
}
default:
qError("error input type");
}
if (notNullElems <= 0) { if (notNullElems <= 0) {
assert(pCtx->hasNull); assert(pCtx->hasNull);
...@@ -4888,123 +4665,64 @@ static void mavg_function(SQLFunctionCtx *pCtx) { ...@@ -4888,123 +4665,64 @@ static void mavg_function(SQLFunctionCtx *pCtx) {
} }
} }
////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////
typedef struct { // Sample function with reservoir sampling algorithm
int32_t notNullElems;
int32_t num;
tValuePair **res;
} SSampleFuncInfo;
static void sampleValuePairAssign(tValuePair *dst, tVariant* srcVariant, int64_t tsKey, char *pTags, SExtTagsInfo *pTagInfo) {
dst->timestamp = tsKey;
tVariantAssign(&dst->v, srcVariant);
int32_t size = 0;
for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) { static void assignResultSample(SSampleFuncInfo *pInfo, int32_t index, int64_t ts, void *pData, uint16_t type, int16_t bytes) {
SQLFunctionCtx* ctx = pTagInfo->pTagCtxList[i]; if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
if (ctx->functionId == TSDB_FUNC_TS_DUMMY) { int32_t maxLen = bytes - VARSTR_HEADER_SIZE;
ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; int32_t len = (varDataLen(pData) > maxLen)? maxLen:varDataLen(pData);
ctx->tag.i64 = tsKey; tVariantCreateFromBinary(pInfo->values + index, varDataVal(pData), len, type);
} } else {
tVariantCreateFromBinary(pInfo->values + index, pData, bytes, type);
tVariantDump(&ctx->tag, dst->pTags + size, ctx->tag.nType, true);
size += pTagInfo->pTagCtxList[i]->outputBytes;
} }
*(pInfo->timeStamps + pInfo->numSampled) = ts;
return;
} }
static void do_reservoir_sample(SSampleFuncInfo *pInfo, int32_t samplesK, int64_t ts, void *pData, uint16_t type, int16_t bytes) {
static void do_sample_function_add(SSampleFuncInfo *pInfo, int32_t maxLen, void *pData, int64_t ts, uint16_t type, SExtTagsInfo *pTagInfo, char *pTags) { pInfo->totalPoints++;
tVariant val = {0}; if (pInfo->numSampled < samplesK) {
tVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type); assignResultSample(pInfo, pInfo->numSampled, ts, pData, type, bytes);
pInfo->numSampled++;
tValuePair **pList = pInfo->res;
assert(pList != NULL);
pInfo->notNullElems++;
if (pInfo->num < maxLen) {
sampleValuePairAssign(pList[pInfo->num], &val, ts, pTags, pTagInfo);
pInfo->num++;
} else { } else {
int32_t j = rand() % (pInfo->notNullElems); int32_t j = rand() % (pInfo->totalPoints);
if (j < maxLen) { if (j < samplesK) {
sampleValuePairAssign(pList[j], &val, ts, pTags, pTagInfo); assignResultSample(pInfo, j, ts, pData, bytes, type);
*(pInfo->timeStamps + j) = ts;
} }
} }
} }
static void copySampleFuncRes(SQLFunctionCtx *pCtx, int32_t type) { static void copySampleFuncRes(SQLFunctionCtx *pCtx, int32_t type) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
tValuePair **tvp = pRes->res; TSKEY* pTimestamp = pCtx->ptsOutputBuf;
for (int32_t i = 0; i < pRes->numSampled; ++i) {
int32_t step = QUERY_ASC_FORWARD_STEP;
int32_t len = (int32_t)(GET_RES_INFO(pCtx)->numOfRes);
for (int32_t i = 0; i < len; ++i) {
char* output = pCtx->pOutput; char* output = pCtx->pOutput;
tVariantDump(&tvp[i]->v, (char*)output, type, true); tVariantDump(pRes->values + i, (char*)output, type, true);
output += step * (pCtx->outputBytes); *pTimestamp = *(pRes->timeStamps + i);
}
// set the output timestamp of each record.
TSKEY *output = pCtx->ptsOutputBuf;
for (int32_t i = 0; i < len; ++i, output += step) {
*output = tvp[i]->timestamp;
}
// set the corresponding tag data for each record
// todo check malloc failure
char **pData = calloc(pCtx->tagInfo.numOfTagCols, POINTER_BYTES);
for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) {
pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput;
}
for (int32_t i = 0; i < len; ++i, output += step) { pCtx->pOutput += pCtx->outputBytes;
int16_t offset = 0; pTimestamp++;
for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) {
memcpy(pData[j], tvp[i]->pTags + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes);
offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes;
pData[j] += pCtx->tagInfo.pTagCtxList[j]->outputBytes;
}
} }
tfree(pData);
} }
/*
* +-----------------------------------+-------------one value pair-----------+------------next value pair-----------+
* |-------------pointer area----------|----ts---+-----+-----n tags-----------|----ts---+-----+-----n tags-----------|
* +..[Value Pointer1][Value Pointer2].|timestamp|value|tags1|tags2|....|tagsn|timestamp|value|tags1|tags2|....|tagsn+
*/
static void buildSampleFuncStruct(SSampleFuncInfo *pSampleFuncInfo, SQLFunctionCtx *pCtx) {
char *tmp = (char *)pSampleFuncInfo + sizeof(SSampleFuncInfo);
pSampleFuncInfo->res = (tValuePair**) tmp;
tmp += POINTER_BYTES * pCtx->param[0].i64;
size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen;
for (int32_t i = 0; i < pCtx->param[0].i64; ++i) {
pSampleFuncInfo->res[i] = (tValuePair*) tmp;
pSampleFuncInfo->res[i]->pTags = tmp + sizeof(tValuePair);
tmp += size;
}
}
static bool sample_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { static bool sample_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
if (!function_setup(pCtx, pResInfo)) { if (!function_setup(pCtx, pResInfo)) {
return false; return false;
} }
SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
buildSampleFuncStruct(pRes, pCtx);
srand(taosSafeRand()); srand(taosSafeRand());
pRes->notNullElems = 0;
SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
pRes->totalPoints = 0;
pRes->numSampled = 0;
pRes->values = (tVariant*) ((char*)pRes + sizeof(SSampleFuncInfo));
pRes->timeStamps = (int64_t*)((char*)pRes->values + sizeof(tVariant) * pCtx->param[0].i64);
return true; return true;
} }
...@@ -5014,10 +4732,9 @@ static void sample_function(SQLFunctionCtx *pCtx) { ...@@ -5014,10 +4732,9 @@ static void sample_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
assert(pRes->num >= 0); if (pRes->values != (tVariant*) ((char*)pRes + sizeof(SSampleFuncInfo))) {
pRes->values = (tVariant*) ((char*)pRes + sizeof(SSampleFuncInfo));
if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(SSampleFuncInfo) + POINTER_BYTES * pCtx->param[0].i64)) { pRes->timeStamps = (int64_t*)((char*)pRes->values + sizeof(tVariant) * pCtx->param[0].i64);
buildSampleFuncStruct(pRes, pCtx);
} }
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
...@@ -5028,9 +4745,8 @@ static void sample_function(SQLFunctionCtx *pCtx) { ...@@ -5028,9 +4745,8 @@ static void sample_function(SQLFunctionCtx *pCtx) {
notNullElems++; notNullElems++;
// NOTE: Set the default timestamp if it is missing [todo refactor]
TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0;
do_sample_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts, pCtx->inputType, &pCtx->tagInfo, NULL); do_reservoir_sample(pRes, (int32_t)pCtx->param[0].i64, ts, data, pCtx->inputType, pCtx->inputBytes);
} }
if (!pCtx->hasNull) { if (!pCtx->hasNull) {
...@@ -5050,14 +4766,17 @@ static void sample_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -5050,14 +4766,17 @@ static void sample_func_finalizer(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); SSampleFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo);
if (pRes->num == 0) { // no result if (pRes->numSampled == 0) { // no result
assert(pResInfo->hasResult != DATA_SET_FLAG); assert(pResInfo->hasResult != DATA_SET_FLAG);
} }
GET_TRUE_DATA_TYPE(); GET_TRUE_DATA_TYPE();
copySampleFuncRes(pCtx, type); copySampleFuncRes(pCtx, type);
for (int32_t i = 0; i < pRes->numSampled; ++i) {
tVariantDestroy(pRes->values + i);
}
doFinalizer(pCtx); doFinalizer(pCtx);
} }
...@@ -5079,8 +4798,10 @@ int32_t functionCompatList[] = { ...@@ -5079,8 +4798,10 @@ int32_t functionCompatList[] = {
4, -1, -1, 1, 1, 1, 1, 1, 1, -1, 4, -1, -1, 1, 1, 1, 1, 1, 1, -1,
// tag, colprj, tagprj, arithm, diff, first_dist, last_dist, stddev_dst, interp rate, irate // tag, colprj, tagprj, arithm, diff, first_dist, last_dist, stddev_dst, interp rate, irate
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1, 1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, ceil, floor, round, csum, mavg, sample, block_info // tid_tag, deriv, ceil, floor, round, csum, mavg, sample,
6, 8, 1, 1, 1, -1, -1, -1, 7 6, 8, 1, 1, 1, -1, -1, -1,
// block_info
7
}; };
SAggFunctionInfo aAggs[] = {{ SAggFunctionInfo aAggs[] = {{
...@@ -5544,7 +5265,7 @@ SAggFunctionInfo aAggs[] = {{ ...@@ -5544,7 +5265,7 @@ SAggFunctionInfo aAggs[] = {{
"sample", "sample",
TSDB_FUNC_SAMPLE, TSDB_FUNC_SAMPLE,
TSDB_FUNC_INVALID_ID, TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY, TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY,
sample_function_setup, sample_function_setup,
sample_function, sample_function,
sample_func_finalizer, sample_func_finalizer,
......
...@@ -2060,7 +2060,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr ...@@ -2060,7 +2060,7 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr
int32_t functionId = pCtx->functionId; int32_t functionId = pCtx->functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
int32_t f = pExpr[0].base.functionId; int32_t f = pExpr[i-1].base.functionId;
assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY); assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY);
pCtx->param[2].i64 = pQueryAttr->order.order; pCtx->param[2].i64 = pQueryAttr->order.order;
...@@ -3653,7 +3653,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i ...@@ -3653,7 +3653,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
// set the timestamp output buffer for top/bottom/diff query // set the timestamp output buffer for top/bottom/diff query
int32_t fid = pCtx[i].functionId; int32_t fid = pCtx[i].functionId;
if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE) { if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE ||
fid == TSDB_FUNC_SAMPLE || fid == TSDB_FUNC_MAVG || fid == TSDB_FUNC_CSUM) {
if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
} }
} }
...@@ -3690,7 +3691,10 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf ...@@ -3690,7 +3691,10 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
// set the correct pointer after the memory buffer reallocated. // set the correct pointer after the memory buffer reallocated.
int32_t functionId = pBInfo->pCtx[i].functionId; int32_t functionId = pBInfo->pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) { if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM ||
functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE ||
functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG ||
functionId == TSDB_FUNC_SAMPLE ) {
if (i > 0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput; if (i > 0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
} }
} }
...@@ -3702,7 +3706,9 @@ void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput) ...@@ -3702,7 +3706,9 @@ void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput)
char *src = NULL; char *src = NULL;
for (int32_t i = 0; i < numOfOutput; i++) { for (int32_t i = 0; i < numOfOutput; i++) {
int32_t functionId = pCtx[i].functionId; int32_t functionId = pCtx[i].functionId;
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) { if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE ||
functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_CSUM ||
functionId == TSDB_FUNC_SAMPLE) {
needCopyTs = true; needCopyTs = true;
if (i > 0 && pCtx[i-1].functionId == TSDB_FUNC_TS_DUMMY){ if (i > 0 && pCtx[i-1].functionId == TSDB_FUNC_TS_DUMMY){
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i - 1); // find ts data SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i - 1); // find ts data
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册