提交 01fb092e 编写于 作者: S shenglian zhou

stable sort to merge data from one vnode and make diff/derivate/csum/mavg not...

stable sort to merge data from one vnode and make diff/derivate/csum/mavg not project query on stable
上级 5b379d5a
...@@ -233,7 +233,7 @@ static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor ...@@ -233,7 +233,7 @@ static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor
// sort before flush to disk, the data must be consecutively put on tFilePage. // sort before flush to disk, the data must be consecutively put on tFilePage.
if (pDesc->orderInfo.numOfCols > 0) { if (pDesc->orderInfo.numOfCols > 0) {
tColDataQSort(pDesc, (int32_t)pPage->num, 0, (int32_t)pPage->num - 1, pPage->data, orderType); tColDataSort(pDesc, (int32_t)pPage->num, 0, (int32_t)pPage->num - 1, pPage->data, orderType, true);
} }
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
...@@ -364,7 +364,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo* ...@@ -364,7 +364,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo*
SExprInfo* pExprInfo = tscExprGet(pQueryInfo, j); SExprInfo* pExprInfo = tscExprGet(pQueryInfo, j);
int32_t functionId = pExprInfo->base.functionId; int32_t functionId = pExprInfo->base.functionId;
if (pColIndex->colId == pExprInfo->base.colInfo.colId && (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAG)) { if (pColIndex->colId == pExprInfo->base.colInfo.colId && (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ)) {
orderColIndexList[i] = j; orderColIndexList[i] = j;
break; break;
} }
......
...@@ -268,10 +268,6 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { ...@@ -268,10 +268,6 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_TS &&
functionId != TSDB_FUNC_ARITHM && functionId != TSDB_FUNC_ARITHM &&
functionId != TSDB_FUNC_TS_COMP && functionId != TSDB_FUNC_TS_COMP &&
functionId != TSDB_FUNC_DIFF &&
functionId != TSDB_FUNC_DERIVATIVE &&
functionId != TSDB_FUNC_MAVG &&
functionId != TSDB_FUNC_CSUM &&
functionId != TSDB_FUNC_TS_DUMMY && functionId != TSDB_FUNC_TS_DUMMY &&
functionId != TSDB_FUNC_TID_TAG && functionId != TSDB_FUNC_TID_TAG &&
functionId != TSDB_FUNC_CEIL && functionId != TSDB_FUNC_CEIL &&
......
...@@ -227,7 +227,7 @@ void tColModelAppend(SColumnModel *dstModel, tFilePage *dstPage, void *srcData, ...@@ -227,7 +227,7 @@ void tColModelAppend(SColumnModel *dstModel, tFilePage *dstPage, void *srcData,
typedef int (*__col_compar_fn_t)(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data); typedef int (*__col_compar_fn_t)(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data);
void tColDataQSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType); void tColDataSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType, bool stableSort);
void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn); void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn);
......
...@@ -641,6 +641,89 @@ static UNUSED_FUNC void tRowModelDisplay(tOrderDescriptor *pDescriptor, int32_t ...@@ -641,6 +641,89 @@ static UNUSED_FUNC void tRowModelDisplay(tOrderDescriptor *pDescriptor, int32_t
printf("\n"); printf("\n");
} }
static void mergeSortIndicesByOrderColumns(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data,
int32_t orderType, __col_compar_fn_t compareFn, int32_t* indices, int32_t* aux) {
if (end <= start) {
return;
}
int32_t mid = start + (end-start)/2;
mergeSortIndicesByOrderColumns(pDescriptor, numOfRows, start, mid, data, orderType, compareFn, indices, aux);
mergeSortIndicesByOrderColumns(pDescriptor, numOfRows, mid+1, end, data, orderType, compareFn, indices, aux);
int32_t left = start;
int32_t right = mid + 1;
int32_t k;
for (k = start; k <= end; ++k) {
if (left == mid+1) {
aux[k] = indices[right];
++right;
} else if (right == end+1) {
aux[k] = indices[left];
++left;
} else {
int32_t ret = compareFn(pDescriptor, numOfRows, indices[left], indices[right], data);
if (ret <= 0) {
aux[k] = indices[left];
++left;
} else {
aux[k] = indices[right];
++right;
}
}
}
for (k = start; k <= end; ++k) {
indices[k] = aux[k];
}
}
static void columnwiseMergeSortImpl(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char* data,
int32_t orderType, __col_compar_fn_t compareFn) {
int32_t* indices = malloc(numOfRows * sizeof(int32_t));
int32_t* aux = malloc(numOfRows * sizeof(int32_t));
for (int32_t i = 0; i <= numOfRows; ++i) {
indices[i] = i;
}
mergeSortIndicesByOrderColumns(pDescriptor, numOfRows, 0, numOfRows-1, data, orderType, compareFn, indices, aux);
int32_t numOfCols = pDescriptor->pColumnModel->numOfCols;
int32_t prevLength = 0;
char* p = NULL;
for(int32_t i = 0; i < numOfCols; ++i) {
int16_t colOffset = getColumnModelOffset(pDescriptor->pColumnModel, i);
int32_t colBytes = pDescriptor->pColumnModel->pFields[i].field.bytes;
// make sure memory buffer is enough
if (prevLength < colBytes) {
char *tmp = realloc(p, colBytes * numOfRows);
assert(tmp);
p = tmp;
prevLength = colBytes;
}
char* colData = data + colOffset * numOfRows;
memcpy(p, colData, colBytes * numOfRows);
for(int32_t j = 0; j < numOfRows; ++j){
char* dest = colData + colBytes * j;
int32_t newPos = indices[j];
char* src = p + (newPos * colBytes);
memcpy(dest, src, colBytes);
}
}
tfree(p);
tfree(aux);
tfree(indices);
}
static void columnwiseQSortImpl(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data, static void columnwiseQSortImpl(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data,
int32_t orderType, __col_compar_fn_t compareFn, void* buf) { int32_t orderType, __col_compar_fn_t compareFn, void* buf) {
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
...@@ -742,9 +825,9 @@ static void columnwiseQSortImpl(tOrderDescriptor *pDescriptor, int32_t numOfRows ...@@ -742,9 +825,9 @@ static void columnwiseQSortImpl(tOrderDescriptor *pDescriptor, int32_t numOfRows
} }
} }
void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t order) { void tColDataSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType, bool stableSort) {
// short array sort, incur another sort procedure instead of quick sort process // short array sort, incur another sort procedure instead of quick sort process
__col_compar_fn_t compareFn = (order == TSDB_ORDER_ASC) ? compare_sa : compare_sd; __col_compar_fn_t compareFn = (orderType == TSDB_ORDER_ASC) ? compare_sa : compare_sd;
SColumnModel* pModel = pDescriptor->pColumnModel; SColumnModel* pModel = pDescriptor->pColumnModel;
...@@ -761,8 +844,10 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta ...@@ -761,8 +844,10 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
if (end - start + 1 <= 8) { if (end - start + 1 <= 8) {
tColDataInsertSort(pDescriptor, numOfRows, start, end, data, compareFn, buf); tColDataInsertSort(pDescriptor, numOfRows, start, end, data, compareFn, buf);
} else if (stableSort) {
columnwiseMergeSortImpl(pDescriptor, numOfRows, start, end, data, orderType, compareFn);
} else { } else {
columnwiseQSortImpl(pDescriptor, numOfRows, start, end, data, order, compareFn, buf); columnwiseQSortImpl(pDescriptor, numOfRows, start, end, data, orderType, compareFn, buf);
} }
free(buf); free(buf);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册