diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index 8e0ca0ab328de6f1b765e3e1f7ea457438cb9cae..f84ed0af9ba0979dad42b17958a715790214df13 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -233,7 +233,7 @@ static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor // sort before flush to disk, the data must be consecutively put on tFilePage. if (pDesc->orderInfo.numOfCols > 0) { - tColDataQSort(pDesc, (int32_t)pPage->num, 0, (int32_t)pPage->num - 1, pPage->data, orderType); + tColDataSort(pDesc, (int32_t)pPage->num, 0, (int32_t)pPage->num - 1, pPage->data, orderType, true); } #ifdef _DEBUG_VIEW @@ -364,7 +364,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo* SExprInfo* pExprInfo = tscExprGet(pQueryInfo, j); int32_t functionId = pExprInfo->base.functionId; - if (pColIndex->colId == pExprInfo->base.colInfo.colId && (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAG)) { + if (pColIndex->colId == pExprInfo->base.colInfo.colId && (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ)) { orderColIndexList[i] = j; break; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 50b9a8fd7eea703dffe34f8ad5eb204b310f0d7e..5489f3ba67d87a9edd73961bcae1440698fe6b4f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -268,10 +268,6 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM && functionId != TSDB_FUNC_TS_COMP && - functionId != TSDB_FUNC_DIFF && - functionId != TSDB_FUNC_DERIVATIVE && - functionId != TSDB_FUNC_MAVG && - functionId != TSDB_FUNC_CSUM && functionId != TSDB_FUNC_TS_DUMMY && functionId != TSDB_FUNC_TID_TAG && functionId != TSDB_FUNC_CEIL && diff --git a/src/query/inc/qExtbuffer.h b/src/query/inc/qExtbuffer.h index d4a9ed0cbcef0a52085dcd60569270037fb57908..bce90ae732a0501f8525fa6826d4213e801cb7e8 100644 --- a/src/query/inc/qExtbuffer.h +++ b/src/query/inc/qExtbuffer.h @@ -227,7 +227,7 @@ void tColModelAppend(SColumnModel *dstModel, tFilePage *dstPage, void *srcData, typedef int (*__col_compar_fn_t)(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data); -void tColDataQSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType); +void tColDataSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType, bool stableSort); void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn); diff --git a/src/query/src/qExtbuffer.c b/src/query/src/qExtbuffer.c index 5994099a0d1ad6b1a87aa19edb6151680128f6df..ea1c8bf8a1dab605837d8ea840b076c36f96a729 100644 --- a/src/query/src/qExtbuffer.c +++ b/src/query/src/qExtbuffer.c @@ -641,6 +641,89 @@ static UNUSED_FUNC void tRowModelDisplay(tOrderDescriptor *pDescriptor, int32_t printf("\n"); } +static void mergeSortIndicesByOrderColumns(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data, + int32_t orderType, __col_compar_fn_t compareFn, int32_t* indices, int32_t* aux) { + if (end <= start) { + return; + } + + + int32_t mid = start + (end-start)/2; + mergeSortIndicesByOrderColumns(pDescriptor, numOfRows, start, mid, data, orderType, compareFn, indices, aux); + mergeSortIndicesByOrderColumns(pDescriptor, numOfRows, mid+1, end, data, orderType, compareFn, indices, aux); + int32_t left = start; + int32_t right = mid + 1; + int32_t k; + for (k = start; k <= end; ++k) { + if (left == mid+1) { + aux[k] = indices[right]; + ++right; + } else if (right == end+1) { + aux[k] = indices[left]; + ++left; + } else { + int32_t ret = compareFn(pDescriptor, numOfRows, indices[left], indices[right], data); + if (ret <= 0) { + aux[k] = indices[left]; + ++left; + } else { + aux[k] = indices[right]; + ++right; + } + } + } + + for (k = start; k <= end; ++k) { + indices[k] = aux[k]; + } +} + +static void columnwiseMergeSortImpl(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char* data, + int32_t orderType, __col_compar_fn_t compareFn) { + int32_t* indices = malloc(numOfRows * sizeof(int32_t)); + int32_t* aux = malloc(numOfRows * sizeof(int32_t)); + + for (int32_t i = 0; i <= numOfRows; ++i) { + indices[i] = i; + } + + mergeSortIndicesByOrderColumns(pDescriptor, numOfRows, 0, numOfRows-1, data, orderType, compareFn, indices, aux); + + int32_t numOfCols = pDescriptor->pColumnModel->numOfCols; + + int32_t prevLength = 0; + char* p = NULL; + + for(int32_t i = 0; i < numOfCols; ++i) { + int16_t colOffset = getColumnModelOffset(pDescriptor->pColumnModel, i); + int32_t colBytes = pDescriptor->pColumnModel->pFields[i].field.bytes; + // make sure memory buffer is enough + if (prevLength < colBytes) { + char *tmp = realloc(p, colBytes * numOfRows); + assert(tmp); + + p = tmp; + prevLength = colBytes; + } + + char* colData = data + colOffset * numOfRows; + memcpy(p, colData, colBytes * numOfRows); + + for(int32_t j = 0; j < numOfRows; ++j){ + char* dest = colData + colBytes * j; + + int32_t newPos = indices[j]; + char* src = p + (newPos * colBytes); + memcpy(dest, src, colBytes); + } + + } + + tfree(p); + tfree(aux); + tfree(indices); +} + static void columnwiseQSortImpl(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType, __col_compar_fn_t compareFn, void* buf) { #ifdef _DEBUG_VIEW @@ -742,9 +825,9 @@ static void columnwiseQSortImpl(tOrderDescriptor *pDescriptor, int32_t numOfRows } } -void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t order) { +void tColDataSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType, bool stableSort) { // short array sort, incur another sort procedure instead of quick sort process - __col_compar_fn_t compareFn = (order == TSDB_ORDER_ASC) ? compare_sa : compare_sd; + __col_compar_fn_t compareFn = (orderType == TSDB_ORDER_ASC) ? compare_sa : compare_sd; SColumnModel* pModel = pDescriptor->pColumnModel; @@ -761,8 +844,10 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta if (end - start + 1 <= 8) { tColDataInsertSort(pDescriptor, numOfRows, start, end, data, compareFn, buf); + } else if (stableSort) { + columnwiseMergeSortImpl(pDescriptor, numOfRows, start, end, data, orderType, compareFn); } else { - columnwiseQSortImpl(pDescriptor, numOfRows, start, end, data, order, compareFn, buf); + columnwiseQSortImpl(pDescriptor, numOfRows, start, end, data, orderType, compareFn, buf); } free(buf);