提交 df08ffd2 编写于 作者: G Ganlin Zhao

Merge branch 'develop' into fix/TD-11248

......@@ -468,8 +468,8 @@ function install_service_on_systemd() {
function install_taosadapter_service() {
if ((${service_mod}==0)); then
[ -f ${script_dir}/cfg/taosadapter.service ] &&\
${csudo} cp ${script_dir}/cfg/taosadapter.service \
[ -f ${script_dir}/../cfg/taosadapter.service ] &&\
${csudo} cp ${script_dir}/../cfg/taosadapter.service \
${service_config_dir}/ || :
${csudo} systemctl daemon-reload
fi
......
......@@ -62,7 +62,7 @@ typedef struct SRetrieveSupport {
uint32_t numOfRetry; // record the number of retry times
} SRetrieveSupport;
int32_t tscCreateGlobalMergerEnv(SQueryInfo* pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub, tOrderDescriptor **pDesc, uint32_t nBufferSize, int64_t id);
int32_t tscCreateGlobalMergerEnv(SQueryInfo* pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub, tOrderDescriptor **pDesc, uint32_t* nBufferSize, int64_t id);
void tscDestroyGlobalMergerEnv(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, int32_t numOfVnodes);
......
......@@ -407,8 +407,8 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo*
}
int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub,
tOrderDescriptor **pOrderDesc, uint32_t nBufferSizes, int64_t id) {
SSchema *pSchema = NULL;
tOrderDescriptor **pOrderDesc, uint32_t* nBufferSizes, int64_t id) {
SSchema1 *pSchema = NULL;
SColumnModel *pModel = NULL;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -421,7 +421,7 @@ int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu
size_t size = tscNumOfExprs(pQueryInfo);
pSchema = (SSchema *)calloc(1, sizeof(SSchema) * size);
pSchema = (SSchema1 *)calloc(1, sizeof(SSchema1) * size);
if (pSchema == NULL) {
tscError("0x%"PRIx64" failed to allocate memory", id);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -440,7 +440,10 @@ int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu
int32_t capacity = 0;
if (rlen != 0) {
capacity = nBufferSizes / rlen;
if ((*nBufferSizes) < rlen) {
(*nBufferSizes) = rlen * 2;
}
capacity = (*nBufferSizes) / rlen;
}
pModel = createColumnModel(pSchema, (int32_t)size, capacity);
......@@ -457,7 +460,7 @@ int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu
assert(numOfSub <= pTableMetaInfo->vgroupList->numOfVgroups);
for (int32_t i = 0; i < numOfSub; ++i) {
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pg, pModel);
(*pMemBuffer)[i] = createExtMemBuffer(*nBufferSizes, rlen, pg, pModel);
(*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
}
......
......@@ -2399,7 +2399,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
}
int16_t resType = 0;
int16_t resBytes = 0;
int32_t resBytes = 0;
int32_t interBufSize = 0;
getResultDataInfo(pSchema->type, pSchema->bytes, f, 0, &resType, &resBytes, &interBufSize, 0, false, pUdfInfo);
......@@ -2638,7 +2638,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
int16_t resultType = 0;
int16_t resultSize = 0;
int32_t resultSize = 0;
int32_t intermediateResSize = 0;
if (getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &resultType, &resultSize,
......@@ -2897,7 +2897,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tVariant* pVariant = &pParamElem[1].pNode->value;
int16_t resultType = pSchema->type;
int16_t resultSize = pSchema->bytes;
int32_t resultSize = pSchema->bytes;
int32_t interResult = 0;
char val[8] = {0};
......@@ -3079,7 +3079,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
s = pTagSchema[index.columnIndex];
}
int16_t bytes = 0;
int32_t bytes = 0;
int16_t type = 0;
int32_t inter = 0;
......@@ -3106,7 +3106,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
int32_t inter = 0;
int16_t resType = 0;
int16_t bytes = 0;
int32_t bytes = 0;
getResultDataInfo(TSDB_DATA_TYPE_INT, 4, TSDB_FUNC_BLKINFO, 0, &resType, &bytes, &inter, 0, 0, NULL);
......@@ -3159,7 +3159,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
int32_t inter = 0;
int16_t resType = 0;
int16_t bytes = 0;
int32_t bytes = 0;
getResultDataInfo(TSDB_DATA_TYPE_INT, 4, functionId, 0, &resType, &bytes, &inter, 0, false, pUdfInfo);
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resType, bytes, getNewResColId(pCmd), inter, false);
......@@ -3479,7 +3479,7 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
assert(tscGetNumOfTags(pTableMetaInfo->pTableMeta) >= 0);
int16_t bytes = 0;
int32_t bytes = 0;
int16_t type = 0;
int32_t interBytes = 0;
......
......@@ -2000,7 +2000,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
colIndex.columnIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, tagColId);
int16_t bytes = 0;
int32_t bytes = 0;
int16_t type = 0;
int32_t inter = 0;
......@@ -2636,7 +2636,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
pRes->qId = 0x1; // hack the qhandle check
const uint32_t nBufferSize = (1u << 18u); // 256KB, default buffer size
uint32_t nBufferSize = (1u << 18u); // 256KB, default buffer size
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -2652,7 +2652,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
return ret;
}
ret = tscCreateGlobalMergerEnv(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, nBufferSize, pSql->self);
ret = tscCreateGlobalMergerEnv(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, &nBufferSize, pSql->self);
if (ret != 0) {
pRes->code = ret;
tscAsyncResultOnError(pSql);
......
......@@ -51,7 +51,7 @@ typedef struct SSqlExpr {
int16_t functionId; // function id in aAgg array
int16_t resType; // return value type
int16_t resBytes; // length of return value
int32_t resBytes; // length of return value
int32_t interBytes; // inter result buffer size
int16_t colType; // table column type
......
......@@ -438,7 +438,7 @@ typedef struct SColumnFilterList {
typedef struct SColumnInfo {
int16_t colId;
int16_t type;
int16_t bytes;
int32_t bytes;
SColumnFilterList flist;
} SColumnInfo;
......
......@@ -141,6 +141,8 @@ extern char configDir[];
{ TSDB_DATA_TYPE_FLOAT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_FLOAT }
#define DEFAULT_DATATYPE \
{ "FLOAT", "INT", "FLOAT" }
#define DEFAULT_DATALENGTH \
{ 4, 4, 4 }
#define DEFAULT_BINWIDTH 64
#define DEFAULT_COL_COUNT 4
#define DEFAULT_LEN_ONE_ROW 76
......@@ -306,6 +308,7 @@ typedef struct SArguments_S {
bool async_mode;
char data_type[MAX_NUM_COLUMNS + 1];
char * dataType[MAX_NUM_COLUMNS + 1];
int32_t data_length[MAX_NUM_COLUMNS + 1];
uint32_t binwidth;
uint32_t columnCount;
uint64_t lenOfOneRow;
......
......@@ -1338,9 +1338,10 @@ void setParaFromArg() {
g_args.prepared_rand = min(g_args.insertRows, MAX_PREPARED_RAND);
g_Dbs.aggr_func = g_args.aggr_func;
char dataString[TSDB_MAX_BYTES_PER_ROW];
char * data_type = g_args.data_type;
char **dataType = g_args.dataType;
char dataString[TSDB_MAX_BYTES_PER_ROW];
char * data_type = g_args.data_type;
char ** dataType = g_args.dataType;
int32_t *data_length = g_args.data_length;
memset(dataString, 0, TSDB_MAX_BYTES_PER_ROW);
......@@ -1469,6 +1470,47 @@ void setParaFromArg() {
} else {
g_Dbs.threadCountForCreateTbl = g_args.nthreads;
g_Dbs.db[0].superTbls[0].tagCount = 0;
for (int i = 0; i < MAX_NUM_COLUMNS; i++) {
if (data_type[i] == TSDB_DATA_TYPE_NULL) {
break;
}
if (1 == regexMatch(dataType[i],
"^(NCHAR|BINARY)(\\([1-9][0-9]*\\))$",
REG_ICASE | REG_EXTENDED)) {
sscanf(dataType[i], "%[^(](%[^)]", type, length);
data_length[i] = atoi(length);
} else {
switch (data_type[i]) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT:
data_length[i] = sizeof(char);
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
data_length[i] = sizeof(int16_t);
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
data_length[i] = sizeof(int32_t);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
data_length[i] = sizeof(int64_t);
break;
case TSDB_DATA_TYPE_FLOAT:
data_length[i] = sizeof(float);
break;
case TSDB_DATA_TYPE_DOUBLE:
data_length[i] = sizeof(double);
break;
default:
data_length[i] = g_args.binwidth;
break;
}
}
}
}
}
......@@ -1698,8 +1740,15 @@ void *queryNtableAggrFunc(void *sarg) {
double totalT = 0;
uint64_t count = 0;
for (int64_t i = 0; i < ntables; i++) {
sprintf(command, "SELECT %s FROM %s%" PRId64 " WHERE ts>= %" PRIu64,
aggreFunc[j], tb_prefix, i, startTime);
if (g_args.escapeChar) {
sprintf(command,
"SELECT %s FROM `%s%" PRId64 "` WHERE ts>= %" PRIu64,
aggreFunc[j], tb_prefix, i, startTime);
} else {
sprintf(command,
"SELECT %s FROM %s%" PRId64 " WHERE ts>= %" PRIu64,
aggreFunc[j], tb_prefix, i, startTime);
}
double t = (double)taosGetTimestampUs();
debugPrint("%s() LN%d, sql command: %s\n", __func__, __LINE__,
......@@ -1708,9 +1757,9 @@ void *queryNtableAggrFunc(void *sarg) {
int32_t code = taos_errno(pSql);
if (code != 0) {
errorPrint("Failed to query:%s\n", taos_errstr(pSql));
errorPrint("Failed to query <%s>, reason:%s\n", command,
taos_errstr(pSql));
taos_free_result(pSql);
taos_close(taos);
fclose(fp);
free(command);
return NULL;
......
......@@ -868,8 +868,8 @@ int64_t generateStbRowData(SSuperTable *stbInfo, char *recBuf,
return strlen(recBuf);
}
static int64_t generateData(char *recBuf, char *data_type, int64_t timestamp,
int lenOfBinary) {
static int64_t generateData(char *recBuf, char *data_type, int32_t *data_length,
int64_t timestamp) {
memset(recBuf, 0, MAX_DATA_SIZE);
char *pstr = recBuf;
pstr += sprintf(pstr, "(%" PRId64 "", timestamp);
......@@ -915,13 +915,13 @@ static int64_t generateData(char *recBuf, char *data_type, int64_t timestamp,
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
s = calloc(1, lenOfBinary + 1);
s = calloc(1, data_length[i] + 1);
if (NULL == s) {
errorPrint("%s", "failed to allocate memory\n");
return -1;
}
rand_string(s, lenOfBinary);
rand_string(s, data_length[i]);
pstr += sprintf(pstr, ",\"%s\"", s);
free(s);
break;
......@@ -1156,20 +1156,18 @@ static int32_t generateDataTailWithoutStb(
int64_t retLen = 0;
char *data_type = g_args.data_type;
int lenOfBinary = g_args.binwidth;
char * data_type = g_args.data_type;
int32_t *data_length = g_args.data_length;
if (g_args.disorderRatio) {
retLen =
generateData(data, data_type,
generateData(data, data_type, data_length,
startTime + getTSRandTail(g_args.timestamp_step, k,
g_args.disorderRatio,
g_args.disorderRange),
lenOfBinary);
g_args.disorderRange));
} else {
retLen = generateData(data, data_type,
startTime + g_args.timestamp_step * k,
lenOfBinary);
retLen = generateData(data, data_type, data_length,
startTime + g_args.timestamp_step * k);
}
if (len > remainderBufLen) break;
......
......@@ -1043,10 +1043,8 @@ int createChildTables() {
// normal table
len = snprintf(tblColsBuf, TSDB_MAX_BYTES_PER_ROW, "(TS TIMESTAMP");
for (int j = 0; j < g_args.columnCount; j++) {
if ((strncasecmp(g_args.dataType[j], "BINARY",
strlen("BINARY")) == 0) ||
(strncasecmp(g_args.dataType[j], "NCHAR",
strlen("NCHAR")) == 0)) {
if ((strcasecmp(g_args.dataType[j], "BINARY") == 0) ||
(strcasecmp(g_args.dataType[j], "NCHAR") == 0)) {
snprintf(tblColsBuf + len, TSDB_MAX_BYTES_PER_ROW - len,
",C%d %s(%d)", j, g_args.dataType[j],
g_args.binwidth);
......
......@@ -45,6 +45,7 @@ SArguments g_args = {
DEFAULT_SYNC_MODE, // mode : sync or async
DEFAULT_DATA_TYPE, // data_type
DEFAULT_DATATYPE, // dataType
DEFAULT_DATALENGTH, // data_length
DEFAULT_BINWIDTH, // binwidth
DEFAULT_COL_COUNT, // columnCount, timestamp + float + int + float
DEFAULT_LEN_ONE_ROW, // lenOfOneRow
......
......@@ -183,7 +183,7 @@ typedef struct SQLFunctionCtx {
int16_t inputBytes;
int16_t outputType;
int16_t outputBytes; // size of results, determined by function and input column data type
int32_t outputBytes; // size of results, determined by function and input column data type
int32_t interBufBytes; // internal buffer size
bool hasNull; // null value exist in current block
bool requireNull; // require null in some function
......@@ -227,7 +227,7 @@ typedef struct SAggFunctionInfo {
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *len, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo);
int32_t *len, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo);
int32_t isValidFunction(const char* name, int32_t len);
#define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0)
......
......@@ -75,8 +75,15 @@ typedef struct tFilePagesItem {
tFilePage item;
} tFilePagesItem;
typedef struct SSchema1 {
uint8_t type;
char name[TSDB_COL_NAME_LEN];
int16_t colId;
int32_t bytes;
} SSchema1;
typedef struct SSchemaEx {
struct SSchema field;
SSchema1 field;
int32_t offset;
} SSchemaEx;
......@@ -178,7 +185,7 @@ bool tExtMemBufferIsAllDataInMem(tExtMemBuffer *pMemBuffer);
* @param blockCapacity
* @return
*/
SColumnModel *createColumnModel(SSchema *fields, int32_t numOfCols, int32_t blockCapacity);
SColumnModel *createColumnModel(SSchema1 *fields, int32_t numOfCols, int32_t blockCapacity);
/**
*
......@@ -199,7 +206,7 @@ void destroyColumnModel(SColumnModel *pModel);
void tColModelCompact(SColumnModel *pModel, tFilePage *inputBuffer, int32_t maxElemsCapacity);
void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t maxCapacity, int32_t s, int32_t e);
SSchema *getColumnModelSchema(SColumnModel *pColumnModel, int32_t index);
SSchema1 *getColumnModelSchema(SColumnModel *pColumnModel, int32_t index);
int16_t getColumnModelOffset(SColumnModel *pColumnModel, int32_t index);
......
......@@ -197,7 +197,7 @@ typedef struct {
} SSampleFuncInfo;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
if (!isValidDataType(dataType)) {
qError("Illegal data type %d or data type length %d", dataType, dataBytes);
return TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -210,7 +210,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
functionId == TSDB_FUNC_FLOOR || functionId == TSDB_FUNC_ROUND)
{
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*bytes = dataBytes;
if (functionId == TSDB_FUNC_INTERP) {
*interBytes = sizeof(SInterpInfoDetail);
......@@ -224,7 +224,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
// (uid, tid) + VGID + TAGSIZE + VARSTR_HEADER_SIZE
if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE);
*bytes = (dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE);
*interBytes = 0;
return TSDB_CODE_SUCCESS;
}
......@@ -302,7 +302,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(dataBytes + DATA_SET_FLAG_SIZE);
*bytes = (dataBytes + DATA_SET_FLAG_SIZE);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
......@@ -325,13 +325,13 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param);
*bytes = (sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_SAMPLE) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param);
*bytes = (sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
......@@ -344,14 +344,14 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
} else if (functionId == TSDB_FUNC_APERCT) {
*type = TSDB_DATA_TYPE_BINARY;
int16_t bytesHist = sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo) + sizeof(SAPercentileInfo);
int16_t bytesDigest = (int16_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
int32_t bytesDigest = (int32_t) (sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
*bytes = MAX(bytesHist, bytesDigest);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_LAST_ROW) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(sizeof(SLastrowInfo) + dataBytes);
*bytes = (sizeof(SLastrowInfo) + dataBytes);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
......@@ -379,7 +379,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
int16_t bytesHist = sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1);
int16_t bytesDigest = (int16_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
int32_t bytesDigest = (int32_t) (sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
*interBytes = MAX(bytesHist, bytesDigest);
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TWA) {
......@@ -416,31 +416,31 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*interBytes = sizeof(SStddevInfo);
} else if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*bytes = dataBytes;
*interBytes = dataBytes + DATA_SET_FLAG_SIZE;
} else if (functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_LAST) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*interBytes = (int16_t)(dataBytes + sizeof(SFirstLastInfo));
*bytes = dataBytes;
*interBytes = (dataBytes + sizeof(SFirstLastInfo));
} else if (functionId == TSDB_FUNC_SPREAD) {
*type = (int16_t)TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*interBytes = sizeof(SSpreadInfo);
} else if (functionId == TSDB_FUNC_PERCT) {
*type = (int16_t)TSDB_DATA_TYPE_DOUBLE;
*bytes = (int16_t)sizeof(double);
*interBytes = (int16_t)sizeof(SPercentileInfo);
*bytes = sizeof(double);
*interBytes = sizeof(SPercentileInfo);
} else if (functionId == TSDB_FUNC_LEASTSQR) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = MAX(TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE, sizeof(SLeastsquaresInfo)); // string
*interBytes = *bytes;
} else if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = (int16_t)(dataBytes + sizeof(SFirstLastInfo));
*bytes = (dataBytes + sizeof(SFirstLastInfo));
*interBytes = *bytes;
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*bytes = dataBytes;
size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param;
......@@ -448,12 +448,12 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_SAMPLE) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*bytes = dataBytes;
size_t size = sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param;
*interBytes = (int32_t)size;
} else if (functionId == TSDB_FUNC_LAST_ROW) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*bytes = dataBytes;
*interBytes = dataBytes;
} else if (functionId == TSDB_FUNC_STDDEV_DST) {
*type = TSDB_DATA_TYPE_BINARY;
......
......@@ -2639,6 +2639,14 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
while(((*rowsize) * MIN_ROWS_PER_PAGE) > (*ps) - overhead) {
*ps = ((*ps) << 1u);
}
if (*ps > 5 * 1024 * 1024) {
MIN_ROWS_PER_PAGE = 2;
*ps = DEFAULT_INTERN_BUF_PAGE_SIZE;
while(((*rowsize) * MIN_ROWS_PER_PAGE) > (*ps) - overhead) {
*ps = ((*ps) << 1u);
}
}
}
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
......@@ -4792,8 +4800,8 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
int32_t ps = DEFAULT_PAGE_SIZE;
getIntermediateBufInfo(pRuntimeEnv, &ps, &pQueryAttr->intermediateResultRowSize);
int32_t TENMB = 1024*1024*10;
int32_t code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo->qId);
int32_t TWENTYMB = 1024*1024*20;
int32_t code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, ps, TWENTYMB, pQInfo->qId);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -521,7 +521,7 @@ static void swap(SColumnModel *pColumnModel, int32_t count, int32_t s1, char *da
void *first = COLMODEL_GET_VAL(data1, pColumnModel, count, s1, i);
void *second = COLMODEL_GET_VAL(data1, pColumnModel, count, s2, i);
SSchema* pSchema = &pColumnModel->pFields[i].field;
SSchema1* pSchema = &pColumnModel->pFields[i].field;
tsDataSwap(first, second, pSchema->type, pSchema->bytes, buf);
}
}
......@@ -750,7 +750,7 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
size_t width = 0;
for(int32_t i = 0; i < pModel->numOfCols; ++i) {
SSchema* pSchema = &pModel->pFields[i].field;
SSchema1* pSchema = &pModel->pFields[i].field;
if (width < pSchema->bytes) {
width = pSchema->bytes;
}
......@@ -771,7 +771,7 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
/*
* deep copy of sschema
*/
SColumnModel *createColumnModel(SSchema *fields, int32_t numOfCols, int32_t blockCapacity) {
SColumnModel *createColumnModel(SSchema1 *fields, int32_t numOfCols, int32_t blockCapacity) {
SColumnModel *pColumnModel = (SColumnModel *)calloc(1, sizeof(SColumnModel) + numOfCols * sizeof(SSchemaEx));
if (pColumnModel == NULL) {
return NULL;
......@@ -1023,7 +1023,7 @@ void tColModelCompact(SColumnModel *pModel, tFilePage *inputBuffer, int32_t maxE
}
}
SSchema* getColumnModelSchema(SColumnModel *pColumnModel, int32_t index) {
SSchema1* getColumnModelSchema(SColumnModel *pColumnModel, int32_t index) {
assert(pColumnModel != NULL && index >= 0 && index < pColumnModel->numOfCols);
return &pColumnModel->pFields[index].field;
}
......@@ -1045,7 +1045,7 @@ void tColModelErase(SColumnModel *pModel, tFilePage *inputBuffer, int32_t blockC
/* start from the second column */
for (int32_t i = 0; i < pModel->numOfCols; ++i) {
int16_t offset = getColumnModelOffset(pModel, i);
SSchema* pSchema = getColumnModelSchema(pModel, i);
SSchema1* pSchema = getColumnModelSchema(pModel, i);
char *startPos = inputBuffer->data + offset * blockCapacity + s * pSchema->bytes;
char *endPos = startPos + pSchema->bytes * removed;
......
......@@ -97,7 +97,7 @@ TEST(testCase, colunmnwise_sort_test) {
}
TEST(testCase, columnsort_test) {
SSchema field[1] = {
SSchema1 field[1] = {
{TSDB_DATA_TYPE_INT, "k", sizeof(int32_t)},
};
......
......@@ -192,7 +192,7 @@ void largeDataTest() {
void qsortTest() {
printf("running : %s\n", __FUNCTION__);
SSchema field[1] = {
SSchema1 field[1] = {
{TSDB_DATA_TYPE_INT, "k", sizeof(int32_t)},
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册