未验证 提交 42bbe9d9 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #8735 from taosdata/szhou/hotfix/tbname-group-limit

TD-11087:fix csum/diff/derivate group by tbname limit bugs
......@@ -233,7 +233,7 @@ static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor
// sort before flush to disk, the data must be consecutively put on tFilePage.
if (pDesc->orderInfo.numOfCols > 0) {
tColDataQSort(pDesc, (int32_t)pPage->num, 0, (int32_t)pPage->num - 1, pPage->data, orderType);
tColDataMergeSort(pDesc, (int32_t)pPage->num, 0, (int32_t)pPage->num - 1, pPage->data, orderType);
}
#ifdef _DEBUG_VIEW
......@@ -364,7 +364,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo*
SExprInfo* pExprInfo = tscExprGet(pQueryInfo, j);
int32_t functionId = pExprInfo->base.functionId;
if (pColIndex->colId == pExprInfo->base.colInfo.colId && (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAG)) {
if (pColIndex->colId == pExprInfo->base.colInfo.colId && (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ)) {
orderColIndexList[i] = j;
break;
}
......
......@@ -268,10 +268,6 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
functionId != TSDB_FUNC_TS &&
functionId != TSDB_FUNC_ARITHM &&
functionId != TSDB_FUNC_TS_COMP &&
functionId != TSDB_FUNC_DIFF &&
functionId != TSDB_FUNC_DERIVATIVE &&
functionId != TSDB_FUNC_MAVG &&
functionId != TSDB_FUNC_CSUM &&
functionId != TSDB_FUNC_TS_DUMMY &&
functionId != TSDB_FUNC_TID_TAG &&
functionId != TSDB_FUNC_CEIL &&
......
......@@ -236,6 +236,9 @@ typedef int (*__col_compar_fn_t)(tOrderDescriptor *, int32_t numOfRows, int32_t
void tColDataQSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType);
void tColDataMergeSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType);
void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn);
int32_t compare_sa(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data);
......
......@@ -196,6 +196,14 @@ typedef struct {
char *taglists;
} SSampleFuncInfo;
typedef struct {
bool valueAssigned;
union {
int64_t i64Prev;
double d64Prev;
};
} SDiffFuncInfo;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
if (!isValidDataType(dataType)) {
......@@ -214,10 +222,13 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
if (functionId == TSDB_FUNC_INTERP) {
*interBytes = sizeof(SInterpInfoDetail);
} else if (functionId == TSDB_FUNC_DIFF) {
*interBytes = sizeof(SDiffFuncInfo);
} else {
*interBytes = 0;
}
return TSDB_CODE_SUCCESS;
}
......@@ -2984,18 +2995,16 @@ static void full_copy_function(SQLFunctionCtx *pCtx) {
}
}
enum {
INITIAL_VALUE_NOT_ASSIGNED = 0,
};
static bool diff_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
if (!function_setup(pCtx, pResInfo)) {
return false;
}
// diff function require the value is set to -1
pCtx->param[1].nType = INITIAL_VALUE_NOT_ASSIGNED;
return false;
SDiffFuncInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
pDiffInfo->valueAssigned = false;
pDiffInfo->i64Prev = 0;
return true;
}
static bool deriv_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) {
......@@ -3201,22 +3210,14 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
GET_RES_INFO(pCtx)->numOfRes += notNullElems;
}
#define DIFF_IMPL(ctx, d, type) \
do { \
if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \
(ctx)->param[1].nType = (ctx)->inputType; \
*(type *)&(ctx)->param[1].i64 = *(type *)(d); \
} else { \
*(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64)); \
*(type *)(&(ctx)->param[1].i64) = *(type *)(d); \
*(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \
} \
} while (0);
// TODO difference in date column
static void diff_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SDiffFuncInfo *pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
void *data = GET_INPUT_DATA_LIST(pCtx);
bool isFirstBlock = (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED);
bool isFirstBlock = (pDiffInfo->valueAssigned == false);
int32_t notNullElems = 0;
......@@ -3236,15 +3237,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
continue;
}
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null
if (pDiffInfo->valueAssigned) {
*pOutput = (int32_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].i64 = pData[i];
pCtx->param[1].nType = pCtx->inputType;
pDiffInfo->i64Prev = pData[i];
pDiffInfo->valueAssigned = true;
notNullElems++;
}
break;
......@@ -3258,15 +3259,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
continue;
}
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = pData[i] - pCtx->param[1].i64; // direct previous may be null
if (pDiffInfo->valueAssigned) {
*pOutput = pData[i] - pDiffInfo->i64Prev; // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].i64 = pData[i];
pCtx->param[1].nType = pCtx->inputType;
pDiffInfo->i64Prev = pData[i];
pDiffInfo->valueAssigned = true;
notNullElems++;
}
break;
......@@ -3280,15 +3281,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
continue;
}
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
SET_DOUBLE_VAL(pOutput, pData[i] - pCtx->param[1].dKey); // direct previous may be null
if (pDiffInfo->valueAssigned) { // initial value is not set yet
SET_DOUBLE_VAL(pOutput, pData[i] - pDiffInfo->d64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].dKey = pData[i];
pCtx->param[1].nType = pCtx->inputType;
pDiffInfo->d64Prev = pData[i];
pDiffInfo->valueAssigned = true;
notNullElems++;
}
break;
......@@ -3302,15 +3303,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
continue;
}
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (float)(pData[i] - pCtx->param[1].dKey); // direct previous may be null
if (pDiffInfo->valueAssigned) { // initial value is not set yet
*pOutput = (float)(pData[i] - pDiffInfo->d64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].dKey = pData[i];
pCtx->param[1].nType = pCtx->inputType;
pDiffInfo->d64Prev = pData[i];
pDiffInfo->valueAssigned = true;
notNullElems++;
}
break;
......@@ -3324,15 +3325,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
continue;
}
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (int16_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null
if (pDiffInfo->valueAssigned) { // initial value is not set yet
*pOutput = (int16_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].i64 = pData[i];
pCtx->param[1].nType = pCtx->inputType;
pDiffInfo->i64Prev = pData[i];
pDiffInfo->valueAssigned = true;
notNullElems++;
}
break;
......@@ -3347,15 +3348,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
continue;
}
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (int8_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null
if (pDiffInfo->valueAssigned) { // initial value is not set yet
*pOutput = (int8_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].i64 = pData[i];
pCtx->param[1].nType = pCtx->inputType;
pDiffInfo->i64Prev = pData[i];
pDiffInfo->valueAssigned = true;
notNullElems++;
}
break;
......@@ -3365,7 +3366,7 @@ static void diff_function(SQLFunctionCtx *pCtx) {
}
// initial value is not set yet
if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED || notNullElems <= 0) {
if (!pDiffInfo->valueAssigned || notNullElems <= 0) {
/*
* 1. current block and blocks before are full of null
* 2. current block may be null value
......
......@@ -641,6 +641,89 @@ static UNUSED_FUNC void tRowModelDisplay(tOrderDescriptor *pDescriptor, int32_t
printf("\n");
}
static void mergeSortIndicesByOrderColumns(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data,
int32_t orderType, __col_compar_fn_t compareFn, int32_t* indices, int32_t* aux) {
if (end <= start) {
return;
}
int32_t mid = start + (end-start)/2;
mergeSortIndicesByOrderColumns(pDescriptor, numOfRows, start, mid, data, orderType, compareFn, indices, aux);
mergeSortIndicesByOrderColumns(pDescriptor, numOfRows, mid+1, end, data, orderType, compareFn, indices, aux);
int32_t left = start;
int32_t right = mid + 1;
int32_t k;
for (k = start; k <= end; ++k) {
if (left == mid+1) {
aux[k] = indices[right];
++right;
} else if (right == end+1) {
aux[k] = indices[left];
++left;
} else {
int32_t ret = compareFn(pDescriptor, numOfRows, indices[left], indices[right], data);
if (ret <= 0) {
aux[k] = indices[left];
++left;
} else {
aux[k] = indices[right];
++right;
}
}
}
for (k = start; k <= end; ++k) {
indices[k] = aux[k];
}
}
static void columnwiseMergeSortImpl(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char* data,
int32_t orderType, __col_compar_fn_t compareFn) {
int32_t* indices = malloc(numOfRows * sizeof(int32_t));
int32_t* aux = malloc(numOfRows * sizeof(int32_t));
for (int32_t i = 0; i < numOfRows; ++i) {
indices[i] = i;
}
mergeSortIndicesByOrderColumns(pDescriptor, numOfRows, 0, numOfRows-1, data, orderType, compareFn, indices, aux);
int32_t numOfCols = pDescriptor->pColumnModel->numOfCols;
int32_t prevLength = 0;
char* p = NULL;
for(int32_t i = 0; i < numOfCols; ++i) {
int16_t colOffset = getColumnModelOffset(pDescriptor->pColumnModel, i);
int32_t colBytes = pDescriptor->pColumnModel->pFields[i].field.bytes;
// make sure memory buffer is enough
if (prevLength < colBytes) {
char *tmp = realloc(p, colBytes * numOfRows);
assert(tmp);
p = tmp;
prevLength = colBytes;
}
char* colData = data + colOffset * numOfRows;
memcpy(p, colData, colBytes * numOfRows);
for(int32_t j = 0; j < numOfRows; ++j){
char* dest = colData + colBytes * j;
int32_t newPos = indices[j];
char* src = p + (newPos * colBytes);
memcpy(dest, src, colBytes);
}
}
tfree(p);
tfree(aux);
tfree(indices);
}
static void columnwiseQSortImpl(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data,
int32_t orderType, __col_compar_fn_t compareFn, void* buf) {
#ifdef _DEBUG_VIEW
......@@ -742,9 +825,35 @@ static void columnwiseQSortImpl(tOrderDescriptor *pDescriptor, int32_t numOfRows
}
}
void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t order) {
void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType) {
// short array sort, incur another sort procedure instead of quick sort process
__col_compar_fn_t compareFn = (orderType == TSDB_ORDER_ASC) ? compare_sa : compare_sd;
SColumnModel* pModel = pDescriptor->pColumnModel;
size_t width = 0;
for(int32_t i = 0; i < pModel->numOfCols; ++i) {
SSchema1* pSchema = &pModel->pFields[i].field;
if (width < pSchema->bytes) {
width = pSchema->bytes;
}
}
char* buf = malloc(width);
assert(width > 0 && buf != NULL);
if (end - start + 1 <= 8) {
tColDataInsertSort(pDescriptor, numOfRows, start, end, data, compareFn, buf);
} else {
columnwiseQSortImpl(pDescriptor, numOfRows, start, end, data, orderType, compareFn, buf);
}
free(buf);
}
void tColDataMergeSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType) {
// short array sort, incur another sort procedure instead of quick sort process
__col_compar_fn_t compareFn = (order == TSDB_ORDER_ASC) ? compare_sa : compare_sd;
__col_compar_fn_t compareFn = (orderType == TSDB_ORDER_ASC) ? compare_sa : compare_sd;
SColumnModel* pModel = pDescriptor->pColumnModel;
......@@ -762,12 +871,14 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
if (end - start + 1 <= 8) {
tColDataInsertSort(pDescriptor, numOfRows, start, end, data, compareFn, buf);
} else {
columnwiseQSortImpl(pDescriptor, numOfRows, start, end, data, order, compareFn, buf);
columnwiseMergeSortImpl(pDescriptor, numOfRows, start, end, data, orderType, compareFn);
}
free(buf);
}
/*
* deep copy of sschema
*/
......
......@@ -38,6 +38,7 @@ run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/block_dist.sim
run general/compute/table_group.sim
run general/db/alter_option.sim
run general/db/alter_tables_d2.sim
run general/db/alter_tables_v1.sim
......
此差异已折叠。
......@@ -20,3 +20,4 @@ run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/block_dist.sim
run general/compute/table_group.sim
......@@ -38,6 +38,7 @@ run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/block_dist.sim
run general/compute/table_group.sim
run general/db/alter_option.sim
run general/db/alter_tables_d2.sim
run general/db/alter_tables_v1.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册