提交 7466d787 编写于 作者: H Haojun Liao

[td-5573]<enhance>: support order clause in outer query.

上级 8ad04a51
......@@ -227,6 +227,8 @@ typedef int (*__col_compar_fn_t)(tOrderDescriptor *, int32_t numOfRows, int32_t
void tColDataQSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType);
void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn);
int32_t compare_sa(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data);
int32_t compare_sd(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data);
......
......@@ -5214,7 +5214,76 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
return pOperator;
}
SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) {
static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) {
assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols);
int32_t numOfCols = pSrc->info.numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock->pData, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock->pData, i);
int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes;
char* tmp = realloc(pCol2->pData, newSize);
if (tmp != NULL) {
pCol2->pData = tmp;
int32_t offset = pCol2->info.bytes * pDest->info.rows;
memcpy(pCol2->pData + offset, pCol1->pData, pSrc->info.rows * pCol2->info.bytes);
} else {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doSort(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SOrderOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pBlock = NULL;
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
// start to flush data into disk and try do multiway merge sort
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
break;
}
int32_t code = doMergeSDatablock(pInfo->pDataBlock, pBlock);
if (code != TSDB_CODE_SUCCESS) {
// todo handle error
}
}
int32_t numOfCols = pInfo->pDataBlock->info.numOfCols;
void** pCols = calloc(numOfCols, POINTER_BYTES);
SSchema* pSchema = calloc(numOfCols, sizeof(SSchema));
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p1 = taosArrayGet(pInfo->pDataBlock->pDataBlock, i);
pCols[i] = p1->pData;
pSchema[i].colId = p1->info.colId;
pSchema[i].bytes = p1->info.bytes;
pSchema[i].type = p1->info.type;
}
__compar_fn_t comp = getKeyComparFunc(pSchema[pInfo->colIndex].type, pInfo->order);
taoscQSort(pCols, pSchema, numOfCols, pInfo->pDataBlock->info.rows, pInfo->colIndex, comp);
tfree(pCols);
tfree(pSchema);
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
}
SOperatorInfo *createOrderOperatorInfo(SExprInfo* pExpr, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
{
......@@ -6011,106 +6080,6 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
}
}
static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) {
assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols);
int32_t numOfCols = pSrc->info.numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock->pData, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock->pData, i);
int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes;
char* tmp = realloc(pCol2->pData, newSize);
if (tmp != NULL) {
pCol2->pData = tmp;
int32_t offset = pCol2->info.bytes * pDest->info.rows;
memcpy(pCol2->pData + offset, pCol1->pData, pSrc->info.rows * pCol2->info.bytes);
} else {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doSort(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SOrderOperatorInfo* pInfo = pOperator->info;
// SSDataBlock* pRes = pInfo->pRes;
// pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
// start to flush data into disk and try do multiway merge sort
if (pBlock == NULL) {
// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
// pOperator->status = OP_EXEC_DONE;
break;
}
int32_t code = doMergeSDatablock(pInfo->pDataBlock, pBlock);
if (code != TSDB_CODE_SUCCESS) {
// return code;
}
/*SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex);
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
// ensure the output buffer size
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, 0);
if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) {
int32_t newSize = pRes->info.rows + pBlock->info.rows;
char* tmp = realloc(pResultColInfoData->pData, newSize * bytes);
if (tmp == NULL) {
return NULL;
} else {
pResultColInfoData->pData = tmp;
pInfo->outputCapacity = newSize;
}
}
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
char* val = ((char*)pColInfoData->pData) + bytes * i;
if (isNull(val, type)) {
continue;
}
size_t keyLen = 0;
if (IS_VAR_DATA_TYPE(pOperator->pExpr->base.colType)) {
tstr* var = (tstr*)(val);
keyLen = varDataLen(var);
} else {
keyLen = bytes;
}
int dummy;
void* res = taosHashGet(pInfo->pSet, val, keyLen);
if (res == NULL) {
taosHashPut(pInfo->pSet, val, keyLen, &dummy, sizeof(dummy));
char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows;
memcpy(start, val, bytes);
pRes->info.rows += 1;
}
}
if (pRes->info.rows >= pInfo->threshold) {
break;
}*/
}
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
}
// todo set the attribute of query scan count
static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr) {
for(int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
......
......@@ -12,7 +12,6 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "qExtbuffer.h"
#include "os.h"
#include "qAggMain.h"
#include "queryLog.h"
......@@ -21,6 +20,8 @@
#include "taosmsg.h"
#include "tulog.h"
#include "qExecutor.h"
#include "qExtbuffer.h"
#include "tcompare.h"
#define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \
(data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes)
......@@ -767,6 +768,60 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
free(buf);
}
void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn) {
assert(numOfRows > 0 && numOfCols > 0 && index >= 0 && index < numOfCols);
int32_t bytes = pSchema[index].bytes;
int32_t size = bytes + sizeof(int32_t);
char* buf = calloc(1, size * numOfRows);
for(int32_t i = 0; i < numOfRows; ++i) {
char* dest = buf + size * i;
memcpy(dest, pCols[index] + bytes * i, bytes);
*(int32_t*)(dest+bytes) = i;
}
qsort(buf, numOfRows, size, compareFn);
int32_t prevLength = 0;
char* p = NULL;
for(int32_t i = 0; i < numOfCols; ++i) {
int32_t bytes1 = pSchema[i].bytes;
if (i == index) {
for(int32_t j = 0; j < numOfRows; ++j){
char* src = buf + (j * size);
char* dest = pCols[i] + (j * bytes1);
memcpy(dest, src, bytes1);
}
} else {
// make sure memory buffer is enough
if (prevLength < bytes1) {
char *tmp = realloc(p, bytes1 * numOfRows);
assert(tmp);
p = tmp;
prevLength = bytes1;
}
memcpy(p, pCols[i], bytes1 * numOfRows);
for(int32_t j = 0; j < numOfRows; ++j){
char* dest = pCols[i] + bytes1 * j;
int32_t newPos = *(int32_t*)(buf + (j * size) + bytes);
char* src = p + (newPos * bytes1);
memcpy(dest, src, bytes1);
}
}
}
tfree(buf);
tfree(p);
}
/*
* deep copy of sschema
*/
......
......@@ -237,7 +237,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
}
pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(tFilePage))/pBucket->bytes;
pBucket->comparFn = getKeyComparFunc(pBucket->type);
pBucket->comparFn = getKeyComparFunc(pBucket->type, TSDB_ORDER_ASC);
pBucket->hashFunc = getHashFunc(pBucket->type);
if (pBucket->hashFunc == NULL) {
......
#include <gtest/gtest.h>
#include <iostream>
#include "taos.h"
#include "tsdb.h"
#include "qExtbuffer.h"
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
namespace {
int32_t comp(const void* p1, const void* p2) {
int32_t* x1 = (int32_t*) p1;
int32_t* x2 = (int32_t*) p2;
if (*x1 == *x2) {
return 0;
} else {
return (*x1 > *x2)? 1:-1;
}
}
int32_t comp1(const void* p1, const void* p2) {
int32_t ret = strncmp((char*) p1, (char*) p2, 20);
if (ret == 0) {
return 0;
} else {
return ret > 0 ? 1:-1;
}
}
}
TEST(testCase, colunmnwise_sort_test) {
// void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn)
void* pCols[2] = {0};
SSchema s[2] = {{0}};
s[0].type = TSDB_DATA_TYPE_INT;
s[0].bytes = 4;
s[0].colId = 0;
strcpy(s[0].name, "col1");
s[1].type = TSDB_DATA_TYPE_BINARY;
s[1].bytes = 20;
s[1].colId = 1;
strcpy(s[1].name, "col2");
int32_t* p = (int32_t*) calloc(5, sizeof(int32_t));
p[0] = 12;
p[1] = 8;
p[2] = 99;
p[3] = 7;
p[4] = 1;
char* t1 = (char*) calloc(5, 20);
strcpy(t1, "abc");
strcpy(t1 + 20, "def");
strcpy(t1 + 40, "xyz");
strcpy(t1 + 60, "klm");
strcpy(t1 + 80, "hij");
pCols[0] = (char*) p;
pCols[1] = (char*) t1;
taoscQSort(reinterpret_cast<void**>(pCols), s, 2, 5, 0, comp);
int32_t* px = (int32_t*) pCols[0];
ASSERT_EQ(px[0], 1);
ASSERT_EQ(px[1], 7);
ASSERT_EQ(px[2], 8);
ASSERT_EQ(px[3], 12);
ASSERT_EQ(px[4], 99);
char* px1 = (char*) pCols[1];
ASSERT_STRCASEEQ(px1 + 20 * 0, "hij");
ASSERT_STRCASEEQ(px1 + 20 * 1, "klm");
ASSERT_STRCASEEQ(px1 + 20 * 2, "def");
ASSERT_STRCASEEQ(px1 + 20 * 3, "abc");
ASSERT_STRCASEEQ(px1 + 20 * 4, "xyz");
taoscQSort(pCols, s, 2, 5, 1, comp1);
px = (int32_t*) pCols[0];
ASSERT_EQ(px[0], 12);
ASSERT_EQ(px[1], 8);
ASSERT_EQ(px[2], 1);
ASSERT_EQ(px[3], 7);
ASSERT_EQ(px[4], 99);
px1 = (char*) pCols[1];
ASSERT_STRCASEEQ(px1 + 20 * 0, "abc");
ASSERT_STRCASEEQ(px1 + 20 * 1, "def");
ASSERT_STRCASEEQ(px1 + 20 * 2, "hij");
ASSERT_STRCASEEQ(px1 + 20 * 3, "klm");
ASSERT_STRCASEEQ(px1 + 20 * 4, "xyz");
}
TEST(testCase, columnsort_test) {
SSchema field[1] = {
{TSDB_DATA_TYPE_INT, "k", sizeof(int32_t)},
};
const int32_t num = 2000;
int32_t *d = (int32_t *)malloc(sizeof(int32_t) * num);
for (int32_t i = 0; i < num; ++i) {
d[i] = i % 4;
}
const int32_t numOfOrderCols = 1;
int32_t orderColIdx = 0;
SColumnModel *pModel = createColumnModel(field, 1, 1000);
tOrderDescriptor *pDesc = tOrderDesCreate(&orderColIdx, numOfOrderCols, pModel, 1);
tColDataQSort(pDesc, num, 0, num - 1, (char *)d, 1);
for (int32_t i = 0; i < num; ++i) {
printf("%d\t", d[i]);
}
printf("\n");
destroyColumnModel(pModel);
}
\ No newline at end of file
#include "os.h"
#include <gtest/gtest.h>
#include <cassert>
#include <iostream>
#include "taos.h"
......
......@@ -47,7 +47,7 @@ int WCSPatternMatch(const wchar_t *pattern, const wchar_t *str, size_t size, con
int32_t doCompare(const char* a, const char* b, int32_t type, size_t size);
__compar_fn_t getKeyComparFunc(int32_t keyType);
__compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order);
__compar_fn_t getComparFunc(int32_t type, int32_t optr);
......
......@@ -16,37 +16,52 @@
#include "os.h"
#include "ttype.h"
#include "tcompare.h"
#include "tarray.h"
#include "hash.h"
int32_t compareInt32Val(const void *pLeft, const void *pRight) {
int32_t left = GET_INT32_VAL(pLeft), right = GET_INT32_VAL(pRight);
int32_t compareInt8Val(const void *pLeft, const void *pRight) {
int8_t left = GET_INT8_VAL(pLeft), right = GET_INT8_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareInt64Val(const void *pLeft, const void *pRight) {
int64_t left = GET_INT64_VAL(pLeft), right = GET_INT64_VAL(pRight);
int32_t compareInt8ValDesc(const void *pLeft, const void *pRight) {
return compareInt8Val(pRight, pLeft);
}
int32_t compareInt16Val(const void *pLeft, const void *pRight) {
int16_t left = GET_INT16_VAL(pLeft), right = GET_INT16_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareInt16Val(const void *pLeft, const void *pRight) {
int16_t left = GET_INT16_VAL(pLeft), right = GET_INT16_VAL(pRight);
int32_t compareInt16ValDesc(const void* pLeft, const void* pRight) {
return compareInt16Val(pRight, pLeft);
}
int32_t compareInt32Val(const void *pLeft, const void *pRight) {
int32_t left = GET_INT32_VAL(pLeft), right = GET_INT32_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareInt8Val(const void *pLeft, const void *pRight) {
int8_t left = GET_INT8_VAL(pLeft), right = GET_INT8_VAL(pRight);
int32_t compareInt32ValDesc(const void* pLeft, const void* pRight) {
return compareInt32Val(pRight, pLeft);
}
int32_t compareInt64Val(const void *pLeft, const void *pRight) {
int64_t left = GET_INT64_VAL(pLeft), right = GET_INT64_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareInt64ValDesc(const void* pLeft, const void* pRight) {
return compareInt64Val(pRight, pLeft);
}
int32_t compareUint32Val(const void *pLeft, const void *pRight) {
int32_t left = GET_UINT32_VAL(pLeft), right = GET_UINT32_VAL(pRight);
if (left > right) return 1;
......@@ -54,6 +69,10 @@ int32_t compareUint32Val(const void *pLeft, const void *pRight) {
return 0;
}
int32_t compareUint32ValDesc(const void* pLeft, const void* pRight) {
return compareUint32Val(pRight, pLeft);
}
int32_t compareUint64Val(const void *pLeft, const void *pRight) {
int64_t left = GET_UINT64_VAL(pLeft), right = GET_UINT64_VAL(pRight);
if (left > right) return 1;
......@@ -61,6 +80,10 @@ int32_t compareUint64Val(const void *pLeft, const void *pRight) {
return 0;
}
int32_t compareUint64ValDesc(const void* pLeft, const void* pRight) {
return compareUint64Val(pRight, pLeft);
}
int32_t compareUint16Val(const void *pLeft, const void *pRight) {
int16_t left = GET_UINT16_VAL(pLeft), right = GET_UINT16_VAL(pRight);
if (left > right) return 1;
......@@ -68,6 +91,10 @@ int32_t compareUint16Val(const void *pLeft, const void *pRight) {
return 0;
}
int32_t compareUint16ValDesc(const void* pLeft, const void* pRight) {
return compareUint16Val(pRight, pLeft);
}
int32_t compareUint8Val(const void* pLeft, const void* pRight) {
uint8_t left = GET_UINT8_VAL(pLeft), right = GET_UINT8_VAL(pRight);
if (left > right) return 1;
......@@ -75,6 +102,10 @@ int32_t compareUint8Val(const void* pLeft, const void* pRight) {
return 0;
}
int32_t compareUint8ValDesc(const void* pLeft, const void* pRight) {
return compareUint8Val(pRight, pLeft);
}
int32_t compareFloatVal(const void *pLeft, const void *pRight) {
float p1 = GET_FLOAT_VAL(pLeft);
float p2 = GET_FLOAT_VAL(pRight);
......@@ -96,6 +127,10 @@ int32_t compareFloatVal(const void *pLeft, const void *pRight) {
return FLT_GREATER(p1, p2) ? 1: -1;
}
int32_t compareFloatValDesc(const void* pLeft, const void* pRight) {
return compareFloatVal(pRight, pLeft);
}
int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
double p1 = GET_DOUBLE_VAL(pLeft);
double p2 = GET_DOUBLE_VAL(pRight);
......@@ -117,6 +152,10 @@ int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
return FLT_GREATER(p1, p2) ? 1: -1;
}
int32_t compareDoubleValDesc(const void* pLeft, const void* pRight) {
return compareDoubleVal(pRight, pLeft);
}
int32_t compareLenPrefixedStr(const void *pLeft, const void *pRight) {
int32_t len1 = varDataLen(pLeft);
int32_t len2 = varDataLen(pRight);
......@@ -133,6 +172,10 @@ int32_t compareLenPrefixedStr(const void *pLeft, const void *pRight) {
}
}
int32_t compareLenPrefixedStrDesc(const void* pLeft, const void* pRight) {
return compareLenPrefixedStr(pRight, pLeft);
}
int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) {
int32_t len1 = varDataLen(pLeft);
int32_t len2 = varDataLen(pRight);
......@@ -149,6 +192,10 @@ int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) {
}
}
int32_t compareLenPrefixedWStrDesc(const void* pLeft, const void* pRight) {
return compareLenPrefixedWStr(pRight, pLeft);
}
/*
* Compare two strings
* TSDB_MATCH: Match
......@@ -349,50 +396,50 @@ __compar_fn_t getComparFunc(int32_t type, int32_t optr) {
return comparFn;
}
__compar_fn_t getKeyComparFunc(int32_t keyType) {
__compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order) {
__compar_fn_t comparFn = NULL;
switch (keyType) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BOOL:
comparFn = compareInt8Val;
comparFn = (order == TSDB_ORDER_ASC)? compareInt8Val:compareInt8ValDesc;
break;
case TSDB_DATA_TYPE_SMALLINT:
comparFn = compareInt16Val;
comparFn = (order == TSDB_ORDER_ASC)? compareInt16Val:compareInt16ValDesc;
break;
case TSDB_DATA_TYPE_INT:
comparFn = compareInt32Val;
comparFn = (order == TSDB_ORDER_ASC)? compareInt32Val:compareInt32ValDesc;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
comparFn = compareInt64Val;
comparFn = (order == TSDB_ORDER_ASC)? compareInt64Val:compareInt64ValDesc;
break;
case TSDB_DATA_TYPE_FLOAT:
comparFn = compareFloatVal;
comparFn = (order == TSDB_ORDER_ASC)? compareFloatVal:compareFloatValDesc;
break;
case TSDB_DATA_TYPE_DOUBLE:
comparFn = compareDoubleVal;
comparFn = (order == TSDB_ORDER_ASC)? compareDoubleVal:compareDoubleValDesc;
break;
case TSDB_DATA_TYPE_UTINYINT:
comparFn = compareUint8Val;
comparFn = (order == TSDB_ORDER_ASC)? compareUint8Val:compareUint8ValDesc;
break;
case TSDB_DATA_TYPE_USMALLINT:
comparFn = compareUint16Val;
comparFn = (order == TSDB_ORDER_ASC)? compareUint16Val:compareUint16ValDesc;
break;
case TSDB_DATA_TYPE_UINT:
comparFn = compareUint32Val;
comparFn = (order == TSDB_ORDER_ASC)? compareUint32Val:compareUint32ValDesc;
break;
case TSDB_DATA_TYPE_UBIGINT:
comparFn = compareUint64Val;
comparFn = (order == TSDB_ORDER_ASC)? compareUint64Val:compareUint64ValDesc;
break;
case TSDB_DATA_TYPE_BINARY:
comparFn = compareLenPrefixedStr;
comparFn = (order == TSDB_ORDER_ASC)? compareLenPrefixedStr:compareLenPrefixedStrDesc;
break;
case TSDB_DATA_TYPE_NCHAR:
comparFn = compareLenPrefixedWStr;
comparFn = (order == TSDB_ORDER_ASC)? compareLenPrefixedWStr:compareLenPrefixedWStrDesc;
break;
default:
comparFn = compareInt32Val;
comparFn = (order == TSDB_ORDER_ASC)? compareInt32Val:compareInt32ValDesc;
break;
}
......
......@@ -54,7 +54,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _
pSkipList->keyFn = fn;
pSkipList->seed = rand();
if (comparFn == NULL) {
pSkipList->comparFn = getKeyComparFunc(keyType);
pSkipList->comparFn = getKeyComparFunc(keyType, TSDB_ORDER_ASC);
} else {
pSkipList->comparFn = comparFn;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册