未验证 提交 28338ec6 编写于 作者: X xiao-yu-wang 提交者: GitHub

Merge branch 'develop' into feature/TD-10986

......@@ -233,7 +233,7 @@ static int32_t tscFlushTmpBufferImpl(tExtMemBuffer *pMemoryBuf, tOrderDescriptor
// sort before flush to disk, the data must be consecutively put on tFilePage.
if (pDesc->orderInfo.numOfCols > 0) {
tColDataQSort(pDesc, (int32_t)pPage->num, 0, (int32_t)pPage->num - 1, pPage->data, orderType);
tColDataMergeSort(pDesc, (int32_t)pPage->num, 0, (int32_t)pPage->num - 1, pPage->data, orderType);
}
#ifdef _DEBUG_VIEW
......@@ -364,7 +364,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo*
SExprInfo* pExprInfo = tscExprGet(pQueryInfo, j);
int32_t functionId = pExprInfo->base.functionId;
if (pColIndex->colId == pExprInfo->base.colInfo.colId && (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAG)) {
if (pColIndex->colId == pExprInfo->base.colInfo.colId && (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ)) {
orderColIndexList[i] = j;
break;
}
......
......@@ -436,7 +436,7 @@ int32_t readFromFile(char *name, uint32_t *len, void **buf) {
int32_t handleUserDefinedFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) {
const char *msg1 = "invalidate function name";
const char *msg1 = "invalid function name or length";
const char *msg2 = "path is too long";
const char *msg3 = "invalid outputtype";
#ifdef LUA_EMBEDDED
......@@ -1488,7 +1488,7 @@ static bool validateTableColumnInfo(SArray* pFieldList, SSqlCmd* pCmd) {
const char* msg3 = "duplicated column names";
const char* msg4 = "invalid data type";
const char* msg5 = "invalid binary/nchar column length";
const char* msg6 = "invalid column name";
const char* msg6 = "invalid column name or length";
const char* msg7 = "too many columns";
// number of fields no less than 2
......@@ -1559,7 +1559,7 @@ static bool validateTagParams(SArray* pTagsList, SArray* pFieldList, SSqlCmd* pC
const char* msg3 = "duplicated column names";
//const char* msg4 = "timestamp not allowed in tags";
const char* msg5 = "invalid data type in tags";
const char* msg6 = "invalid tag name";
const char* msg6 = "invalid tag name or length";
const char* msg7 = "invalid binary/nchar tag length";
// number of fields at least 1
......@@ -1628,7 +1628,7 @@ static bool validateTagParams(SArray* pTagsList, SArray* pFieldList, SSqlCmd* pC
*/
int32_t validateOneTag(SSqlCmd* pCmd, TAOS_FIELD* pTagField) {
const char* msg3 = "tag length too long";
const char* msg4 = "invalid tag name";
const char* msg4 = "invalid tag name or length";
const char* msg5 = "invalid binary/nchar tag length";
const char* msg6 = "invalid data type in tags";
const char* msg7 = "too many columns";
......@@ -1701,7 +1701,7 @@ int32_t validateOneColumn(SSqlCmd* pCmd, TAOS_FIELD* pColField) {
const char* msg1 = "too many columns";
const char* msg3 = "column length too long";
const char* msg4 = "invalid data type";
const char* msg5 = "invalid column name";
const char* msg5 = "invalid column name or length";
const char* msg6 = "invalid column length";
// assert(pCmd->numOfClause == 1);
......@@ -2059,7 +2059,7 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS
const char* msg8 = "not support distinct in nest query";
const char* msg9 = "_block_dist not support subquery, only support stable/table";
const char* msg10 = "not support group by in block func";
const char* msg11 = "invalid alias name";
const char* msg11 = "invalid alias name or length";
// too many result columns not support order by in query
if (taosArrayGetSize(pSelNodeList) > TSDB_MAX_COLUMNS) {
......@@ -6803,6 +6803,10 @@ int32_t validateLocalConfig(SMiscInfo* pOptions) {
}
int32_t validateColumnName(char* name) {
if (strlen(name) == 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
bool ret = taosIsKeyWordToken(name, (int32_t)strlen(name));
if (ret) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......
......@@ -268,10 +268,6 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
functionId != TSDB_FUNC_TS &&
functionId != TSDB_FUNC_ARITHM &&
functionId != TSDB_FUNC_TS_COMP &&
functionId != TSDB_FUNC_DIFF &&
functionId != TSDB_FUNC_DERIVATIVE &&
functionId != TSDB_FUNC_MAVG &&
functionId != TSDB_FUNC_CSUM &&
functionId != TSDB_FUNC_TS_DUMMY &&
functionId != TSDB_FUNC_TID_TAG &&
functionId != TSDB_FUNC_CEIL &&
......@@ -3465,6 +3461,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
pQueryInfo->sessionWindow = pSrc->sessionWindow;
pQueryInfo->pTableMetaInfo = NULL;
pQueryInfo->multigroupResult = pSrc->multigroupResult;
pQueryInfo->stateWindow = pSrc->stateWindow;
pQueryInfo->bufLen = pSrc->bufLen;
pQueryInfo->orderProjectQuery = pSrc->orderProjectQuery;
......
......@@ -141,6 +141,8 @@ extern char configDir[];
{ TSDB_DATA_TYPE_FLOAT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_FLOAT }
#define DEFAULT_DATATYPE \
{ "FLOAT", "INT", "FLOAT" }
#define DEFAULT_DATALENGTH \
{ 4, 4, 4 }
#define DEFAULT_BINWIDTH 64
#define DEFAULT_COL_COUNT 4
#define DEFAULT_LEN_ONE_ROW 76
......@@ -306,6 +308,7 @@ typedef struct SArguments_S {
bool async_mode;
char data_type[MAX_NUM_COLUMNS + 1];
char * dataType[MAX_NUM_COLUMNS + 1];
int32_t data_length[MAX_NUM_COLUMNS + 1];
uint32_t binwidth;
uint32_t columnCount;
uint64_t lenOfOneRow;
......
......@@ -1338,9 +1338,10 @@ void setParaFromArg() {
g_args.prepared_rand = min(g_args.insertRows, MAX_PREPARED_RAND);
g_Dbs.aggr_func = g_args.aggr_func;
char dataString[TSDB_MAX_BYTES_PER_ROW];
char * data_type = g_args.data_type;
char **dataType = g_args.dataType;
char dataString[TSDB_MAX_BYTES_PER_ROW];
char * data_type = g_args.data_type;
char ** dataType = g_args.dataType;
int32_t *data_length = g_args.data_length;
memset(dataString, 0, TSDB_MAX_BYTES_PER_ROW);
......@@ -1469,6 +1470,47 @@ void setParaFromArg() {
} else {
g_Dbs.threadCountForCreateTbl = g_args.nthreads;
g_Dbs.db[0].superTbls[0].tagCount = 0;
for (int i = 0; i < MAX_NUM_COLUMNS; i++) {
if (data_type[i] == TSDB_DATA_TYPE_NULL) {
break;
}
if (1 == regexMatch(dataType[i],
"^(NCHAR|BINARY)(\\([1-9][0-9]*\\))$",
REG_ICASE | REG_EXTENDED)) {
sscanf(dataType[i], "%[^(](%[^)]", type, length);
data_length[i] = atoi(length);
} else {
switch (data_type[i]) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT:
data_length[i] = sizeof(char);
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
data_length[i] = sizeof(int16_t);
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
data_length[i] = sizeof(int32_t);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
data_length[i] = sizeof(int64_t);
break;
case TSDB_DATA_TYPE_FLOAT:
data_length[i] = sizeof(float);
break;
case TSDB_DATA_TYPE_DOUBLE:
data_length[i] = sizeof(double);
break;
default:
data_length[i] = g_args.binwidth;
break;
}
}
}
}
}
......@@ -1698,8 +1740,15 @@ void *queryNtableAggrFunc(void *sarg) {
double totalT = 0;
uint64_t count = 0;
for (int64_t i = 0; i < ntables; i++) {
sprintf(command, "SELECT %s FROM %s%" PRId64 " WHERE ts>= %" PRIu64,
aggreFunc[j], tb_prefix, i, startTime);
if (g_args.escapeChar) {
sprintf(command,
"SELECT %s FROM `%s%" PRId64 "` WHERE ts>= %" PRIu64,
aggreFunc[j], tb_prefix, i, startTime);
} else {
sprintf(command,
"SELECT %s FROM %s%" PRId64 " WHERE ts>= %" PRIu64,
aggreFunc[j], tb_prefix, i, startTime);
}
double t = (double)taosGetTimestampUs();
debugPrint("%s() LN%d, sql command: %s\n", __func__, __LINE__,
......@@ -1708,9 +1757,9 @@ void *queryNtableAggrFunc(void *sarg) {
int32_t code = taos_errno(pSql);
if (code != 0) {
errorPrint("Failed to query:%s\n", taos_errstr(pSql));
errorPrint("Failed to query <%s>, reason:%s\n", command,
taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
fclose(fp);
free(command);
return NULL;
......
......@@ -868,8 +868,8 @@ int64_t generateStbRowData(SSuperTable *stbInfo, char *recBuf,
return strlen(recBuf);
}
static int64_t generateData(char *recBuf, char *data_type, int64_t timestamp,
int lenOfBinary) {
static int64_t generateData(char *recBuf, char *data_type, int32_t *data_length,
int64_t timestamp) {
memset(recBuf, 0, MAX_DATA_SIZE);
char *pstr = recBuf;
pstr += sprintf(pstr, "(%" PRId64 "", timestamp);
......@@ -915,13 +915,13 @@ static int64_t generateData(char *recBuf, char *data_type, int64_t timestamp,
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
s = calloc(1, lenOfBinary + 1);
s = calloc(1, data_length[i] + 1);
if (NULL == s) {
errorPrint("%s", "failed to allocate memory\n");
return -1;
}
rand_string(s, lenOfBinary);
rand_string(s, data_length[i]);
pstr += sprintf(pstr, ",\"%s\"", s);
free(s);
break;
......@@ -1156,20 +1156,18 @@ static int32_t generateDataTailWithoutStb(
int64_t retLen = 0;
char *data_type = g_args.data_type;
int lenOfBinary = g_args.binwidth;
char * data_type = g_args.data_type;
int32_t *data_length = g_args.data_length;
if (g_args.disorderRatio) {
retLen =
generateData(data, data_type,
generateData(data, data_type, data_length,
startTime + getTSRandTail(g_args.timestamp_step, k,
g_args.disorderRatio,
g_args.disorderRange),
lenOfBinary);
g_args.disorderRange));
} else {
retLen = generateData(data, data_type,
startTime + g_args.timestamp_step * k,
lenOfBinary);
retLen = generateData(data, data_type, data_length,
startTime + g_args.timestamp_step * k);
}
if (len > remainderBufLen) break;
......
......@@ -1043,10 +1043,8 @@ int createChildTables() {
// normal table
len = snprintf(tblColsBuf, TSDB_MAX_BYTES_PER_ROW, "(TS TIMESTAMP");
for (int j = 0; j < g_args.columnCount; j++) {
if ((strncasecmp(g_args.dataType[j], "BINARY",
strlen("BINARY")) == 0) ||
(strncasecmp(g_args.dataType[j], "NCHAR",
strlen("NCHAR")) == 0)) {
if ((strcasecmp(g_args.dataType[j], "BINARY") == 0) ||
(strcasecmp(g_args.dataType[j], "NCHAR") == 0)) {
snprintf(tblColsBuf + len, TSDB_MAX_BYTES_PER_ROW - len,
",C%d %s(%d)", j, g_args.dataType[j],
g_args.binwidth);
......
......@@ -45,6 +45,7 @@ SArguments g_args = {
DEFAULT_SYNC_MODE, // mode : sync or async
DEFAULT_DATA_TYPE, // data_type
DEFAULT_DATATYPE, // dataType
DEFAULT_DATALENGTH, // data_length
DEFAULT_BINWIDTH, // binwidth
DEFAULT_COL_COUNT, // columnCount, timestamp + float + int + float
DEFAULT_LEN_ONE_ROW, // lenOfOneRow
......
......@@ -53,7 +53,7 @@ static void httpStopThread(HttpThread *pThread) {
break;
}
} while (0);
if (r) {
if (r && taosCheckPthreadValid(pThread->thread)) {
pthread_cancel(pThread->thread);
}
#else
......@@ -63,15 +63,21 @@ static void httpStopThread(HttpThread *pThread) {
httpError("%s, failed to create eventfd, will call pthread_cancel instead, which may result in data corruption: %s",
pThread->label, strerror(errno));
pThread->stop = true;
pthread_cancel(pThread->thread);
if (taosCheckPthreadValid(pThread->thread)) {
pthread_cancel(pThread->thread);
}
} else if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
httpError("%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s",
pThread->label, strerror(errno));
pthread_cancel(pThread->thread);
if (taosCheckPthreadValid(pThread->thread)) {
pthread_cancel(pThread->thread);
}
}
#endif // __APPLE__
pthread_join(pThread->thread, NULL);
if (taosCheckPthreadValid(pThread->thread)) {
pthread_join(pThread->thread, NULL);
}
#ifdef __APPLE__
if (sv[0] != -1) {
......
......@@ -236,6 +236,9 @@ typedef int (*__col_compar_fn_t)(tOrderDescriptor *, int32_t numOfRows, int32_t
void tColDataQSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType);
void tColDataMergeSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType);
void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn);
int32_t compare_sa(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data);
......
......@@ -202,6 +202,14 @@ typedef struct SElapsedInfo {
TSKEY max;
} SElapsedInfo;
typedef struct {
bool valueAssigned;
union {
int64_t i64Prev;
double d64Prev;
};
} SDiffFuncInfo;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
if (!isValidDataType(dataType)) {
......@@ -220,10 +228,13 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
if (functionId == TSDB_FUNC_INTERP) {
*interBytes = sizeof(SInterpInfoDetail);
} else if (functionId == TSDB_FUNC_DIFF) {
*interBytes = sizeof(SDiffFuncInfo);
} else {
*interBytes = 0;
}
return TSDB_CODE_SUCCESS;
}
......@@ -2999,18 +3010,16 @@ static void full_copy_function(SQLFunctionCtx *pCtx) {
}
}
enum {
INITIAL_VALUE_NOT_ASSIGNED = 0,
};
static bool diff_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
if (!function_setup(pCtx, pResInfo)) {
return false;
}
// diff function require the value is set to -1
pCtx->param[1].nType = INITIAL_VALUE_NOT_ASSIGNED;
return false;
SDiffFuncInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
pDiffInfo->valueAssigned = false;
pDiffInfo->i64Prev = 0;
return true;
}
static bool deriv_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResultInfo) {
......@@ -3216,22 +3225,14 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
GET_RES_INFO(pCtx)->numOfRes += notNullElems;
}
#define DIFF_IMPL(ctx, d, type) \
do { \
if ((ctx)->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED) { \
(ctx)->param[1].nType = (ctx)->inputType; \
*(type *)&(ctx)->param[1].i64 = *(type *)(d); \
} else { \
*(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i64)); \
*(type *)(&(ctx)->param[1].i64) = *(type *)(d); \
*(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \
} \
} while (0);
// TODO difference in date column
static void diff_function(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SDiffFuncInfo *pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
void *data = GET_INPUT_DATA_LIST(pCtx);
bool isFirstBlock = (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED);
bool isFirstBlock = (pDiffInfo->valueAssigned == false);
int32_t notNullElems = 0;
......@@ -3251,15 +3252,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
continue;
}
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (int32_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null
if (pDiffInfo->valueAssigned) {
*pOutput = (int32_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].i64 = pData[i];
pCtx->param[1].nType = pCtx->inputType;
pDiffInfo->i64Prev = pData[i];
pDiffInfo->valueAssigned = true;
notNullElems++;
}
break;
......@@ -3273,15 +3274,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
continue;
}
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = pData[i] - pCtx->param[1].i64; // direct previous may be null
if (pDiffInfo->valueAssigned) {
*pOutput = pData[i] - pDiffInfo->i64Prev; // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].i64 = pData[i];
pCtx->param[1].nType = pCtx->inputType;
pDiffInfo->i64Prev = pData[i];
pDiffInfo->valueAssigned = true;
notNullElems++;
}
break;
......@@ -3295,15 +3296,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
continue;
}
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
SET_DOUBLE_VAL(pOutput, pData[i] - pCtx->param[1].dKey); // direct previous may be null
if (pDiffInfo->valueAssigned) { // initial value is not set yet
SET_DOUBLE_VAL(pOutput, pData[i] - pDiffInfo->d64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].dKey = pData[i];
pCtx->param[1].nType = pCtx->inputType;
pDiffInfo->d64Prev = pData[i];
pDiffInfo->valueAssigned = true;
notNullElems++;
}
break;
......@@ -3317,15 +3318,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
continue;
}
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (float)(pData[i] - pCtx->param[1].dKey); // direct previous may be null
if (pDiffInfo->valueAssigned) { // initial value is not set yet
*pOutput = (float)(pData[i] - pDiffInfo->d64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].dKey = pData[i];
pCtx->param[1].nType = pCtx->inputType;
pDiffInfo->d64Prev = pData[i];
pDiffInfo->valueAssigned = true;
notNullElems++;
}
break;
......@@ -3339,15 +3340,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
continue;
}
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (int16_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null
if (pDiffInfo->valueAssigned) { // initial value is not set yet
*pOutput = (int16_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].i64 = pData[i];
pCtx->param[1].nType = pCtx->inputType;
pDiffInfo->i64Prev = pData[i];
pDiffInfo->valueAssigned = true;
notNullElems++;
}
break;
......@@ -3362,15 +3363,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
continue;
}
if (pCtx->param[1].nType != INITIAL_VALUE_NOT_ASSIGNED) { // initial value is not set yet
*pOutput = (int8_t)(pData[i] - pCtx->param[1].i64); // direct previous may be null
if (pDiffInfo->valueAssigned) { // initial value is not set yet
*pOutput = (int8_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].i64 = pData[i];
pCtx->param[1].nType = pCtx->inputType;
pDiffInfo->i64Prev = pData[i];
pDiffInfo->valueAssigned = true;
notNullElems++;
}
break;
......@@ -3380,7 +3381,7 @@ static void diff_function(SQLFunctionCtx *pCtx) {
}
// initial value is not set yet
if (pCtx->param[1].nType == INITIAL_VALUE_NOT_ASSIGNED || notNullElems <= 0) {
if (!pDiffInfo->valueAssigned || notNullElems <= 0) {
/*
* 1. current block and blocks before are full of null
* 2. current block may be null value
......
......@@ -641,6 +641,89 @@ static UNUSED_FUNC void tRowModelDisplay(tOrderDescriptor *pDescriptor, int32_t
printf("\n");
}
static void mergeSortIndicesByOrderColumns(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data,
int32_t orderType, __col_compar_fn_t compareFn, int32_t* indices, int32_t* aux) {
if (end <= start) {
return;
}
int32_t mid = start + (end-start)/2;
mergeSortIndicesByOrderColumns(pDescriptor, numOfRows, start, mid, data, orderType, compareFn, indices, aux);
mergeSortIndicesByOrderColumns(pDescriptor, numOfRows, mid+1, end, data, orderType, compareFn, indices, aux);
int32_t left = start;
int32_t right = mid + 1;
int32_t k;
for (k = start; k <= end; ++k) {
if (left == mid+1) {
aux[k] = indices[right];
++right;
} else if (right == end+1) {
aux[k] = indices[left];
++left;
} else {
int32_t ret = compareFn(pDescriptor, numOfRows, indices[left], indices[right], data);
if (ret <= 0) {
aux[k] = indices[left];
++left;
} else {
aux[k] = indices[right];
++right;
}
}
}
for (k = start; k <= end; ++k) {
indices[k] = aux[k];
}
}
static void columnwiseMergeSortImpl(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char* data,
int32_t orderType, __col_compar_fn_t compareFn) {
int32_t* indices = malloc(numOfRows * sizeof(int32_t));
int32_t* aux = malloc(numOfRows * sizeof(int32_t));
for (int32_t i = 0; i < numOfRows; ++i) {
indices[i] = i;
}
mergeSortIndicesByOrderColumns(pDescriptor, numOfRows, 0, numOfRows-1, data, orderType, compareFn, indices, aux);
int32_t numOfCols = pDescriptor->pColumnModel->numOfCols;
int32_t prevLength = 0;
char* p = NULL;
for(int32_t i = 0; i < numOfCols; ++i) {
int16_t colOffset = getColumnModelOffset(pDescriptor->pColumnModel, i);
int32_t colBytes = pDescriptor->pColumnModel->pFields[i].field.bytes;
// make sure memory buffer is enough
if (prevLength < colBytes) {
char *tmp = realloc(p, colBytes * numOfRows);
assert(tmp);
p = tmp;
prevLength = colBytes;
}
char* colData = data + colOffset * numOfRows;
memcpy(p, colData, colBytes * numOfRows);
for(int32_t j = 0; j < numOfRows; ++j){
char* dest = colData + colBytes * j;
int32_t newPos = indices[j];
char* src = p + (newPos * colBytes);
memcpy(dest, src, colBytes);
}
}
tfree(p);
tfree(aux);
tfree(indices);
}
static void columnwiseQSortImpl(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data,
int32_t orderType, __col_compar_fn_t compareFn, void* buf) {
#ifdef _DEBUG_VIEW
......@@ -742,9 +825,9 @@ static void columnwiseQSortImpl(tOrderDescriptor *pDescriptor, int32_t numOfRows
}
}
void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t order) {
void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType) {
// short array sort, incur another sort procedure instead of quick sort process
__col_compar_fn_t compareFn = (order == TSDB_ORDER_ASC) ? compare_sa : compare_sd;
__col_compar_fn_t compareFn = (orderType == TSDB_ORDER_ASC) ? compare_sa : compare_sd;
SColumnModel* pModel = pDescriptor->pColumnModel;
......@@ -762,12 +845,40 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
if (end - start + 1 <= 8) {
tColDataInsertSort(pDescriptor, numOfRows, start, end, data, compareFn, buf);
} else {
columnwiseQSortImpl(pDescriptor, numOfRows, start, end, data, order, compareFn, buf);
columnwiseQSortImpl(pDescriptor, numOfRows, start, end, data, orderType, compareFn, buf);
}
free(buf);
}
void tColDataMergeSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType) {
// short array sort, incur another sort procedure instead of quick sort process
__col_compar_fn_t compareFn = (orderType == TSDB_ORDER_ASC) ? compare_sa : compare_sd;
SColumnModel* pModel = pDescriptor->pColumnModel;
size_t width = 0;
for(int32_t i = 0; i < pModel->numOfCols; ++i) {
SSchema1* pSchema = &pModel->pFields[i].field;
if (width < pSchema->bytes) {
width = pSchema->bytes;
}
}
char* buf = malloc(width);
assert(width > 0 && buf != NULL);
if (end - start + 1 <= 8) {
tColDataInsertSort(pDescriptor, numOfRows, start, end, data, compareFn, buf);
} else {
columnwiseMergeSortImpl(pDescriptor, numOfRows, start, end, data, orderType, compareFn);
}
free(buf);
}
/*
* deep copy of sschema
*/
......
......@@ -680,7 +680,7 @@ void tSetColumnInfo(TAOS_FIELD *pField, SStrToken *pName, TAOS_FIELD *pType) {
// column name is too long, set the it to be invalid.
if ((int32_t) pName->n >= maxLen) {
pName->n = -1;
pField->name[0] = 0;
} else {
strncpy(pField->name, pName->z, pName->n);
pField->name[pName->n] = 0;
......
......@@ -38,6 +38,7 @@ run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/block_dist.sim
run general/compute/table_group.sim
run general/db/alter_option.sim
run general/db/alter_tables_d2.sim
run general/db/alter_tables_v1.sim
......
此差异已折叠。
......@@ -20,3 +20,4 @@ run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/block_dist.sim
run general/compute/table_group.sim
......@@ -38,6 +38,7 @@ run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/compute/block_dist.sim
run general/compute/table_group.sim
run general/db/alter_option.sim
run general/db/alter_tables_d2.sim
run general/db/alter_tables_v1.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册