提交 b411156e 编写于 作者: H hzcheng

Merge branch 'develop' into feature/2.0tsdb

......@@ -48,7 +48,7 @@ ELSEIF (TD_WINDOWS_64)
IF (NOT TD_GODLL)
SET_TARGET_PROPERTIES(taos PROPERTIES LINK_FLAGS /DEF:${TD_COMMUNITY_DIR}/src/client/src/taos.def)
ENDIF ()
TARGET_LINK_LIBRARIES(taos trpc)
TARGET_LINK_LIBRARIES(taos trpc tutil)
ELSEIF (TD_DARWIN_64)
SET(CMAKE_MACOSX_RPATH 1)
......
......@@ -120,7 +120,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
void tscDestroyLocalReducer(SSqlObj *pSql);
int32_t tscDoLocalreduce(SSqlObj *pSql);
int32_t tscDoLocalMerge(SSqlObj *pSql);
#ifdef __cplusplus
}
......
......@@ -29,8 +29,8 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql);
int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter);
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
void tscDestroyJoinSupporter(SJoinSupporter* pSupporter);
int32_t tscHandleMasterJoinQuery(SSqlObj* pSql);
......@@ -38,6 +38,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql);
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
void tscBuildResFromSubqueries(SSqlObj *pSql);
void **doSetResultRowData(SSqlObj *pSql, bool finalResult);
#ifdef __cplusplus
}
#endif
......
......@@ -51,22 +51,38 @@ typedef struct SParsedDataColInfo {
bool hasVal[TSDB_MAX_COLUMNS];
} SParsedDataColInfo;
typedef struct SJoinSubquerySupporter {
typedef struct STidTags {
int64_t uid;
int32_t tid;
int32_t vgId;
char tag[];
} STidTags;
typedef struct SJoinSupporter {
SSubqueryState* pState;
SSqlObj* pObj; // parent SqlObj
int32_t subqueryIndex; // index of sub query
int64_t interval; // interval time
SLimitVal limit; // limit info
uint64_t uid; // query meter uid
SArray* colList; // previous query information
SArray* exprsInfo;
SArray* colList; // previous query information, no need to use this attribute, and the corresponding attribution
SArray* exprList;
SFieldInfo fieldsInfo;
STagCond tagCond;
SSqlGroupbyExpr groupbyExpr;
struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array
FILE* f; // temporary file in order to create TSBuf
char path[PATH_MAX]; // temporary file path
} SJoinSubquerySupporter;
char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory
int32_t tagSize; // the length of each in the first filter stage
char* pIdTagList; // result of first stage tags
int32_t totalLen;
int32_t num;
} SJoinSupporter;
typedef struct SVgroupTableInfo {
SCMVgroupInfo vgInfo;
SArray* itemList; //SArray<STableIdInfo>
} SVgroupTableInfo;
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
......@@ -87,7 +103,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks);
UNUSED_FUNC STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
//UNUSED_FUNC STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
/**
*
......@@ -120,7 +136,7 @@ void addRequiredTagColumn(STableMetaInfo* pTableMetaInfo, SColumnIndex* index);
int32_t tscSetTableId(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql);
void tscClearInterpInfo(SQueryInfo* pQueryInfo);
bool tscIsInsertOrImportData(char* sqlstr);
bool tscIsInsertData(char* sqlstr);
/* use for keep current db info temporarily, for handle table with db prefix */
// todo remove it
......@@ -160,7 +176,7 @@ SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functi
int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
SArray* tscSqlExprCopy(const SArray* src, uint64_t uid, bool deepcopy);
void tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
void tscSqlExprInfoDestroy(SArray* pExprInfo);
SColumn* tscColumnClone(const SColumn* src);
......@@ -190,7 +206,6 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb);
void tscCleanSqlCmd(SSqlCmd* pCmd);
bool tscShouldBeFreed(SSqlObj* pSql);
void tscClearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
......@@ -204,7 +219,10 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo);
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
void tscFreeQueryInfo(SSqlCmd* pCmd);
void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
......
......@@ -45,8 +45,6 @@ enum {
DATA_FROM_DATA_FILE = 2,
};
typedef SCMSTableVgroupRspMsg SVgroupsInfo;
typedef struct STableComInfo {
uint8_t numOfTags;
uint8_t precision;
......@@ -69,12 +67,13 @@ typedef struct STableMeta {
} STableMeta;
typedef struct STableMetaInfo {
STableMeta * pTableMeta; // table meta, cached in client side and acquried by name
STableMeta * pTableMeta; // table meta, cached in client side and acquired by name
SVgroupsInfo *vgroupList;
SArray *pVgroupTables; // SArray<SVgroupTableInfo>
/*
* 1. keep the vnode index during the multi-vnode super table projection query
* 2. keep the vnode index for multi-vnode insertion
* 1. keep the vgroup index during the multi-vnode super table projection query
* 2. keep the vgroup index for multi-vnode insertion
*/
int32_t vgroupIndex;
char name[TSDB_TABLE_ID_LEN]; // (super) table name
......@@ -102,7 +101,7 @@ typedef struct SColumnIndex {
typedef struct SFieldSupInfo {
bool visible;
SArithExprInfo *pArithExprInfo;
SExprInfo *pArithExprInfo;
SSqlExpr * pSqlExpr;
} SFieldSupInfo;
......@@ -206,7 +205,7 @@ typedef struct SQueryInfo {
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray * exprsInfo; // SArray<SSqlExpr*>
SArray * exprList; // SArray<SSqlExpr*>
SLimitVal limit;
SLimitVal slimit;
STagCond tagCond;
......@@ -382,7 +381,6 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscDestroyResPointerInfo(SSqlRes *pRes);
void tscResetSqlCmdObj(SSqlCmd *pCmd);
void tscFreeResData(SSqlObj *pSql);
/**
* free query result of the sql object
......@@ -395,7 +393,7 @@ void tscFreeSqlResult(SSqlObj *pSql);
* Note: this function is multi-thread safe.
* @param pObj
*/
void tscFreeSqlObjPartial(SSqlObj *pObj);
void tscPartiallyFreeSqlObj(SSqlObj *pObj);
/**
* free sql object, release allocated resource
......@@ -423,7 +421,7 @@ int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *s
void tscQueueAsyncFreeResult(SSqlObj *pSql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
char * tscGetResultColumnChr(SSqlRes *pRes, SQueryInfo *pQueryInfo, int32_t column);
char * tscGetResultColumnChr(SSqlRes *pRes, SQueryInfo *pQueryInfo, int32_t column, int16_t bytes);
extern void * pVnodeConn;
extern void * tscCacheHandle;
......
......@@ -14,17 +14,18 @@
*/
#include "os.h"
#include "tutil.h"
#include "tnote.h"
#include "trpc.h"
#include "tscLog.h"
#include "tscProfile.h"
#include "tscSubquery.h"
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tsocket.h"
#include "tutil.h"
#include "tnote.h"
#include "tsched.h"
#include "tschemautil.h"
#include "tsclient.h"
static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
......@@ -44,9 +45,9 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
SSqlRes *pRes = &pSql->res;
pSql->signature = pSql;
pSql->param = param;
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pSql->param = param;
pSql->pTscObj = pObj;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pSql->fp = fp;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
......@@ -145,7 +146,7 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
}
// local reducer has handle this situation during super table non-projection query.
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) {
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
pRes->numOfTotalInCurrentClause += pRes->numOfRows;
}
......@@ -175,7 +176,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
}
pSql->fp = fp;
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) {
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscProcessSql(pSql);
......@@ -219,12 +220,17 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
pSql->param = param;
tscResetForNextRetrieve(pRes);
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
// handle the sub queries of join query
if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql);
} else {
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscProcessSql(pSql);
}
tscProcessSql(pSql);
}
void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), void *param) {
......@@ -251,7 +257,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
tscResetForNextRetrieve(pRes);
pSql->fp = tscAsyncFetchSingleRowProxy;
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) {
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
......@@ -311,7 +317,7 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);
if (pSup->pSqlExpr != NULL) {
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i);
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i, pSup->pSqlExpr->resBytes);
} else {
// todo add
}
......
......@@ -153,7 +153,7 @@ typedef struct SRateInfo {
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *bytes, int16_t *intermediateResBytes, int16_t extLength, bool isSuperTable) {
int16_t *bytes, int16_t *interResBytes, int16_t extLength, bool isSuperTable) {
if (!isValidDataType(dataType, dataBytes)) {
tscError("Illegal data type %d or data type length %d", dataType, dataBytes);
return TSDB_CODE_INVALID_SQL;
......@@ -164,28 +164,35 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_INTERP) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*intermediateResBytes = *bytes + sizeof(SResultInfo);
*interResBytes = *bytes + sizeof(SResultInfo);
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_TID_TAG) { // todo use struct
*type = TSDB_DATA_TYPE_BINARY;
*bytes = dataBytes + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t); // (uid, tid) + VGID + TAGSIZE
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_COUNT) {
*type = TSDB_DATA_TYPE_BIGINT;
*bytes = sizeof(int64_t);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_ARITHM) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_TS_COMP) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(int32_t); // this results is compressed ts data
*intermediateResBytes = POINTER_BYTES;
*interResBytes = POINTER_BYTES;
return TSDB_CODE_SUCCESS;
}
......@@ -193,54 +200,54 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = dataBytes + DATA_SET_FLAG_SIZE;
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_SUM) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SSumInfo);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_AVG) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SAvgInfo);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(SRateInfo);
*intermediateResBytes = sizeof(SRateInfo);
*interResBytes = sizeof(SRateInfo);
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param;
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_SPREAD) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SSpreadInfo);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_APERCT) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo) + sizeof(SAPercentileInfo);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_LAST_ROW) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SLastrowInfo) + dataBytes;
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TWA) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(STwaInfo);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
return TSDB_CODE_SUCCESS;
}
}
......@@ -253,57 +260,57 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
}
*bytes = sizeof(int64_t);
*intermediateResBytes = sizeof(SSumInfo);
*interResBytes = sizeof(SSumInfo);
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_APERCT) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*intermediateResBytes =
*interResBytes =
sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1);
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TWA) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*intermediateResBytes = sizeof(STwaInfo);
*interResBytes = sizeof(STwaInfo);
return TSDB_CODE_SUCCESS;
}
if (functionId == TSDB_FUNC_AVG) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*intermediateResBytes = sizeof(SAvgInfo);
*interResBytes = sizeof(SAvgInfo);
} else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*intermediateResBytes = sizeof(SRateInfo);
*interResBytes = sizeof(SRateInfo);
} else if (functionId == TSDB_FUNC_STDDEV) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*intermediateResBytes = sizeof(SStddevInfo);
*interResBytes = sizeof(SStddevInfo);
} else if (functionId == TSDB_FUNC_MIN || functionId == TSDB_FUNC_MAX) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*intermediateResBytes = dataBytes + DATA_SET_FLAG_SIZE;
*interResBytes = dataBytes + DATA_SET_FLAG_SIZE;
} else if (functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_LAST) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*intermediateResBytes = dataBytes + sizeof(SResultInfo);
*interResBytes = dataBytes + sizeof(SResultInfo);
} else if (functionId == TSDB_FUNC_SPREAD) {
*type = (int16_t)TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*intermediateResBytes = sizeof(SSpreadInfo);
*interResBytes = sizeof(SSpreadInfo);
} else if (functionId == TSDB_FUNC_PERCT) {
*type = (int16_t)TSDB_DATA_TYPE_DOUBLE;
*bytes = (int16_t)sizeof(double);
*intermediateResBytes = (int16_t)sizeof(double);
*interResBytes = (int16_t)sizeof(double);
} else if (functionId == TSDB_FUNC_LEASTSQR) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE; // string
*intermediateResBytes = *bytes + sizeof(SResultInfo);
*interResBytes = *bytes + sizeof(SResultInfo);
} else if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = dataBytes + sizeof(SFirstLastInfo);
*intermediateResBytes = *bytes;
*interResBytes = *bytes;
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
......@@ -311,11 +318,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param;
// the output column may be larger than sizeof(STopBotInfo)
*intermediateResBytes = size;
*interResBytes = size;
} else if (functionId == TSDB_FUNC_LAST_ROW) {
*type = (int16_t)dataType;
*bytes = (int16_t)dataBytes;
*intermediateResBytes = dataBytes + sizeof(SLastrowInfo);
*interResBytes = dataBytes + sizeof(SLastrowInfo);
} else {
return TSDB_CODE_INVALID_SQL;
}
......@@ -1837,6 +1844,7 @@ static void last_row_function(SQLFunctionCtx *pCtx) {
assignVal(pCtx->aOutputBuf, pData, pCtx->inputBytes, pCtx->inputType);
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
pResInfo->hasResult = DATA_SET_FLAG;
SLastrowInfo *pInfo = (SLastrowInfo *)pResInfo->interResultBuf;
pInfo->ts = pCtx->param[0].i64Key;
......@@ -1856,14 +1864,17 @@ static void last_row_function(SQLFunctionCtx *pCtx) {
static void last_row_finalizer(SQLFunctionCtx *pCtx) {
// do nothing at the first stage
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
if (pCtx->currentStage == SECONDARY_STAGE_MERGE) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
if (pResInfo->hasResult != DATA_SET_FLAG) {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
return;
}
} else {
// do nothing
if (pResInfo->hasResult != DATA_SET_FLAG) {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
return;
}
}
GET_RES_INFO(pCtx)->numOfRes = 1;
......@@ -2954,14 +2965,28 @@ static void tag_project_function(SQLFunctionCtx *pCtx) {
assert(pCtx->inputBytes == pCtx->outputBytes);
for (int32_t i = 0; i < pCtx->size; ++i) {
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->outputType);
char* output = pCtx->aOutputBuf;
if (pCtx->tag.nType == TSDB_DATA_TYPE_BINARY || pCtx->tag.nType == TSDB_DATA_TYPE_NCHAR) {
*(int16_t*) output = pCtx->tag.nLen;
output += VARSTR_HEADER_SIZE;
}
tVariantDump(&pCtx->tag, output, pCtx->outputType);
pCtx->aOutputBuf += pCtx->outputBytes;
}
}
static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
INC_INIT_VAL(pCtx, 1);
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType);
char* output = pCtx->aOutputBuf;
if (pCtx->tag.nType == TSDB_DATA_TYPE_BINARY || pCtx->tag.nType == TSDB_DATA_TYPE_NCHAR) {
*(int16_t*) output = pCtx->tag.nLen;
output += VARSTR_HEADER_SIZE;
}
tVariantDump(&pCtx->tag, output, pCtx->tag.nType);
pCtx->aOutputBuf += pCtx->outputBytes;
}
......@@ -2974,12 +2999,30 @@ static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
*/
static void tag_function(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, 1, 1);
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType);
char* output = pCtx->aOutputBuf;
// todo refactor to dump length presented string(var string)
if (pCtx->tag.nType == TSDB_DATA_TYPE_BINARY || pCtx->tag.nType == TSDB_DATA_TYPE_NCHAR) {
*(int16_t*) output = pCtx->tag.nLen;
output += VARSTR_HEADER_SIZE;
}
tVariantDump(&pCtx->tag, output, pCtx->tag.nType);
}
static void tag_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SET_VAL(pCtx, 1, 1);
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType);
char* output = pCtx->aOutputBuf;
// todo refactor to dump length presented string(var string)
if (pCtx->tag.nType == TSDB_DATA_TYPE_BINARY || pCtx->tag.nType == TSDB_DATA_TYPE_NCHAR) {
*(int16_t*) output = pCtx->tag.nLen;
output += VARSTR_HEADER_SIZE;
}
tVariantDump(&pCtx->tag, output, pCtx->tag.nType);
}
static void copy_function(SQLFunctionCtx *pCtx) {
......@@ -4836,7 +4879,7 @@ SQLAggFuncElem aAggs[] = {{
"apercentile",
TSDB_FUNC_APERCT,
TSDB_FUNC_APERCT,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE,
apercentile_function_setup,
apercentile_function,
apercentile_function_f,
......@@ -4881,7 +4924,7 @@ SQLAggFuncElem aAggs[] = {{
"last_row",
TSDB_FUNC_LAST_ROW,
TSDB_FUNC_LAST_ROW,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_SELECTIVITY,
first_last_function_setup,
last_row_function,
......@@ -4897,7 +4940,7 @@ SQLAggFuncElem aAggs[] = {{
"top",
TSDB_FUNC_TOP,
TSDB_FUNC_TOP,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_SELECTIVITY,
top_bottom_function_setup,
top_function,
......@@ -4913,7 +4956,7 @@ SQLAggFuncElem aAggs[] = {{
"bottom",
TSDB_FUNC_BOTTOM,
TSDB_FUNC_BOTTOM,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_NEED_TS |
TSDB_FUNCSTATE_SELECTIVITY,
top_bottom_function_setup,
bottom_function,
......@@ -5079,7 +5122,7 @@ SQLAggFuncElem aAggs[] = {{
"arithmetic",
TSDB_FUNC_ARITHM,
TSDB_FUNC_ARITHM,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
function_setup,
arithmetic_function,
arithmetic_function_f,
......@@ -5140,7 +5183,7 @@ SQLAggFuncElem aAggs[] = {{
"interp",
TSDB_FUNC_INTERP,
TSDB_FUNC_INTERP,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_NEED_TS,
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_OF | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
function_setup,
interp_function,
do_sum_f, // todo filter handle
......@@ -5239,4 +5282,19 @@ SQLAggFuncElem aAggs[] = {{
sumrate_func_merge,
sumrate_func_second_merge,
data_req_load_info,
},
{
// 34
"tid_tag", // return table id and the corresponding tags for join match
TSDB_FUNC_TID_TAG,
TSDB_FUNC_TID_TAG,
TSDB_FUNCSTATE_MO,
function_setup,
noop1,
noop2,
no_next_step,
noop1,
noop1,
noop1,
data_req_load_info,
}};
......@@ -131,17 +131,23 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
for (int32_t i = 0; i < numOfRows; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i, pSchema[i].name,
TSDB_COL_NAME_LEN);
char* dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i;
STR_WITH_MAXSIZE_TO_VARSTR(dst, pSchema[i].name, TSDB_COL_NAME_LEN);
char *type = tDataTypeDesc[pSchema[i].type].aName;
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1);
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i, type, pField->bytes);
dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i;
STR_TO_VARSTR(dst, type);
int32_t bytes = pSchema[i].bytes;
if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes / TSDB_NCHAR_SIZE;
if (pSchema[i].type == TSDB_DATA_TYPE_BINARY || pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
bytes -= VARSTR_HEADER_SIZE;
if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes / TSDB_NCHAR_SIZE;
}
}
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 2);
......@@ -149,8 +155,8 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 3);
if (i >= tscGetNumOfColumns(pMeta) && tscGetNumOfTags(pMeta) != 0) {
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i, "tag",
strlen("tag") + 1);
char* output = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i;
STR_WITH_SIZE_TO_VARSTR(output, "TAG", 3);
}
}
......@@ -163,13 +169,15 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
for (int32_t i = numOfRows; i < totalNumOfRows; ++i) {
// field name
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i, pSchema[i].name,
TSDB_COL_NAME_LEN);
char* output = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i;
STR_WITH_MAXSIZE_TO_VARSTR(output, pSchema[i].name, TSDB_COL_NAME_LEN);
// type name
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1);
char *type = tDataTypeDesc[pSchema[i].type].aName;
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i, type, pField->bytes);
output = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i;
STR_WITH_MAXSIZE_TO_VARSTR(output, type, pField->bytes);
// type length
int32_t bytes = pSchema[i].bytes;
......@@ -183,49 +191,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
// tag value
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 3);
char *target = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i;
if (isNull(pTagValue, pSchema[i].type)) {
sprintf(target, "%s", TSDB_DATA_NULL_STR);
} else {
switch (pSchema[i].type) {
case TSDB_DATA_TYPE_BINARY:
/* binary are not null-terminated string */
strncpy(target, pTagValue, pSchema[i].bytes);
break;
case TSDB_DATA_TYPE_NCHAR:
taosUcs4ToMbs(pTagValue, pSchema[i].bytes, target);
break;
case TSDB_DATA_TYPE_FLOAT: {
float fv = 0;
fv = GET_FLOAT_VAL(pTagValue);
sprintf(target, "%f", fv);
} break;
case TSDB_DATA_TYPE_DOUBLE: {
double dv = 0;
dv = GET_DOUBLE_VAL(pTagValue);
sprintf(target, "%lf", dv);
} break;
case TSDB_DATA_TYPE_TINYINT:
sprintf(target, "%d", *(int8_t *)pTagValue);
break;
case TSDB_DATA_TYPE_SMALLINT:
sprintf(target, "%d", *(int16_t *)pTagValue);
break;
case TSDB_DATA_TYPE_INT:
sprintf(target, "%d", *(int32_t *)pTagValue);
break;
case TSDB_DATA_TYPE_BIGINT:
sprintf(target, "%" PRId64 "", *(int64_t *)pTagValue);
break;
case TSDB_DATA_TYPE_BOOL: {
char *val = (*((int8_t *)pTagValue) == 0) ? "false" : "true";
sprintf(target, "%s", val);
break;
}
default:
break;
}
}
STR_WITH_SIZE_TO_VARSTR(target, "TAG", 3);
pTagValue += pSchema[i].bytes;
}
......@@ -233,7 +199,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
return 0;
}
static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, int32_t typeColLength,
static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, int32_t typeColLength,
int32_t noteColLength) {
int32_t rowLen = 0;
SColumnIndex index = {0};
......@@ -243,14 +209,14 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
pQueryInfo->order.order = TSDB_ORDER_ASC;
TAOS_FIELD f = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_COL_NAME_LEN};
TAOS_FIELD f = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE};
strncpy(f.name, "Field", TSDB_COL_NAME_LEN);
SFieldSupInfo* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, TSDB_COL_NAME_LEN,
TSDB_COL_NAME_LEN, false);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE, TSDB_COL_NAME_LEN, false);
rowLen += TSDB_COL_NAME_LEN;
rowLen += (TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE);
f.bytes = typeColLength;
f.type = TSDB_DATA_TYPE_BINARY;
......@@ -289,137 +255,24 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
assert(tscGetMetaInfo(pQueryInfo, 0)->pTableMeta != NULL);
const int32_t NUM_OF_DESCRIBE_TABLE_COLUMNS = 4;
const int32_t NUM_OF_DESC_TABLE_COLUMNS = 4;
const int32_t TYPE_COLUMN_LENGTH = 16;
const int32_t NOTE_COLUMN_MIN_LENGTH = 8;
int32_t note_field_length = tscMaxLengthOfTagsFields(pSql);
if (note_field_length == 0) {
note_field_length = NOTE_COLUMN_MIN_LENGTH;
int32_t noteFieldLen = tscMaxLengthOfTagsFields(pSql);
if (noteFieldLen == 0) {
noteFieldLen = NOTE_COLUMN_MIN_LENGTH;
}
int32_t rowLen =
tscBuildMeterSchemaResultFields(pSql, NUM_OF_DESCRIBE_TABLE_COLUMNS, TYPE_COLUMN_LENGTH, note_field_length);
int32_t rowLen = tscBuildTableSchemaResultFields(pSql, NUM_OF_DESC_TABLE_COLUMNS, TYPE_COLUMN_LENGTH, noteFieldLen);
tscFieldInfoUpdateOffset(pQueryInfo);
return tscSetValueToResObj(pSql, rowLen);
}
// todo add order support
static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
#if 0
// the result structure has been completed in sql parse, so we
// only need to reorganize the results in the column format
SSqlCmd * pCmd = &pSql->cmd;
SSqlRes * pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
SSchema * pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
int32_t vOffset[TSDB_MAX_COLUMNS] = {0};
for (int32_t f = 1; f < pTableMetaInfo->numOfTags; ++f) {
int16_t tagColumnIndex = pTableMetaInfo->tagColumnIndex[f - 1];
if (tagColumnIndex == -1) {
vOffset[f] = vOffset[f - 1] + TSDB_TABLE_NAME_LEN;
} else {
vOffset[f] = vOffset[f - 1] + pSchema[tagColumnIndex].bytes;
}
}
int32_t totalNumOfResults = pMetricMeta->numOfTables;
int32_t rowLen = tscGetResRowLength(pQueryInfo->exprsInfo);
tscInitResObjForLocalQuery(pSql, totalNumOfResults, rowLen);
int32_t rowIdx = 0;
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; ++i) {
SVnodeSidList *pSidList = (SVnodeSidList *)((char *)pMetricMeta + pMetricMeta->list[i]);
for (int32_t j = 0; j < pSidList->numOfSids; ++j) {
STableIdInfo *pSidExt = tscGetMeterSidInfo(pSidList, j);
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutput; ++k) {
SColIndex *pColIndex = &tscSqlExprGet(pQueryInfo, k)->colInfo;
int16_t offsetId = pColIndex->colIdx;
assert((pColIndex->flag & TSDB_COL_TAG) != 0);
assert(0);
char * val = NULL;//pSidExt->tags + vOffset[offsetId];
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, k);
memcpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, k) * totalNumOfResults + pField->bytes * rowIdx, val,
(size_t)pField->bytes);
}
rowIdx++;
}
}
#endif
return 0;
}
static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) {
// SSqlCmd *pCmd = &pSql->cmd;
// SSqlRes *pRes = &pSql->res;
// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
#if 0
SSuperTableMeta *pMetricMeta = tscGetMetaInfo(pQueryInfo, 0)->pMetricMeta;
int32_t totalNumOfResults = 1; // count function only produce one result
int32_t rowLen = tscGetResRowLength(pQueryInfo->exprsInfo);
tscInitResObjForLocalQuery(pSql, totalNumOfResults, rowLen);
int32_t rowIdx = 0;
for (int32_t i = 0; i < totalNumOfResults; ++i) {
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutput; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->colInfo.colIdx == -1 && pExpr->functionId == TSDB_FUNC_COUNT) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, k);
memcpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, i) * totalNumOfResults + pField->bytes * rowIdx,
&pMetricMeta->numOfTables, sizeof(pMetricMeta->numOfTables));
} else {
tscError("not support operations");
continue;
}
}
rowIdx++;
}
#endif
return 0;
}
static int tscProcessQueryTags(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
if (pTableMeta == NULL || tscGetNumOfTags(pTableMeta) == 0 || tscGetNumOfColumns(pTableMeta) == 0) {
strcpy(pCmd->payload, "invalid table");
pSql->res.code = TSDB_CODE_INVALID_TABLE;
return pSql->res.code;
}
SSqlExpr *pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
if (pExpr->functionId == TSDB_FUNC_COUNT) {
return tscBuildMetricTagSqlFunctionResult(pSql);
} else {
return tscBuildMetricTagProjectionResult(pSql);
}
}
static void tscProcessCurrentUser(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
tscSetLocalQueryResult(pSql, pSql->pTscObj->user, pExpr->aliasName, TSDB_USER_LEN);
}
......@@ -434,7 +287,7 @@ static void tscProcessCurrentDB(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
tscSetLocalQueryResult(pSql, db, pExpr->aliasName, TSDB_DB_NAME_LEN);
}
......@@ -442,14 +295,14 @@ static void tscProcessServerVer(SSqlObj *pSql) {
const char* v = pSql->pTscObj->sversion;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
tscSetLocalQueryResult(pSql, v, pExpr->aliasName, tListLen(pSql->pTscObj->sversion));
}
static void tscProcessClientVer(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
tscSetLocalQueryResult(pSql, version, pExpr->aliasName, strlen(version));
}
......@@ -469,7 +322,7 @@ static void tscProcessServStatus(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
tscSetLocalQueryResult(pSql, "1", pExpr->aliasName, 2);
}
......@@ -491,7 +344,7 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, 0);
pInfo->pSqlExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
pInfo->pSqlExpr = taosArrayGetP(pQueryInfo->exprList, 0);
strncpy(pRes->data, val, pField->bytes);
}
......@@ -503,8 +356,6 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
pSql->res.code = (uint8_t)taosCfgDynamicOptions(pCmd->payload);
} else if (pCmd->command == TSDB_SQL_DESCRIBE_TABLE) {
pSql->res.code = (uint8_t)tscProcessDescribeTable(pSql);
} else if (pCmd->command == TSDB_SQL_RETRIEVE_TAGS) {
pSql->res.code = (uint8_t)tscProcessQueryTags(pSql);
} else if (pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
/*
* set the qhandle to be 1 in order to pass the qhandle check, and to call partial release function to
......
......@@ -305,32 +305,37 @@ int32_t tsParseOneColumnData(SSchema *pSchema, SSQLToken *pToken, char *payload,
case TSDB_DATA_TYPE_BINARY:
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost
if (pToken->type == TK_NULL) {
*(int16_t*) payload = sizeof(int8_t);
payload += VARSTR_HEADER_SIZE;
*payload = TSDB_DATA_BINARY_NULL;
} else { // too long values will return invalid sql, not be truncated automatically
if (pToken->n > pSchema->bytes) {
return tscInvalidSQLErrMsg(msg, "string data overflow", pToken->z);
}
strncpy(payload, pToken->z, pToken->n);
if (pToken->n < pSchema->bytes) {
payload[pToken->n] = 0; // add the null-terminated char if the length of the string is shorter than the available space
}
STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
}
break;
case TSDB_DATA_TYPE_NCHAR:
if (pToken->type == TK_NULL) {
*(uint32_t *)payload = TSDB_DATA_NCHAR_NULL;
*(int16_t*) payload = sizeof(int32_t);
payload += VARSTR_HEADER_SIZE;
*(uint32_t*) payload = TSDB_DATA_NCHAR_NULL;
} else {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
if (!taosMbsToUcs4(pToken->z, pToken->n, payload, pSchema->bytes)) {
int32_t resLen = -1;
if (!taosMbsToUcs4(pToken->z, pToken->n, payload + VARSTR_HEADER_SIZE, pSchema->bytes, &resLen)) {
char buf[512] = {0};
snprintf(buf, 512, "%s", strerror(errno));
return tscInvalidSQLErrMsg(msg, buf, pToken->z);
}
*(uint16_t*)payload = (uint16_t) (resLen * TSDB_NCHAR_SIZE);
}
break;
......@@ -1301,13 +1306,13 @@ int tsParseSql(SSqlObj *pSql, bool initialParse) {
char* p = pSql->sqlstr;
pSql->sqlstr = NULL;
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
pSql->sqlstr = p;
} else {
tscTrace("continue parse sql: %s", pSql->cmd.curSql);
}
if (tscIsInsertOrImportData(pSql->sqlstr)) {
if (tscIsInsertData(pSql->sqlstr)) {
/*
* Set the fp before parse the sql string, in case of getTableMeta failed, in which
* the error handle callback function can rightfully restore the user-defined callback function (fp).
......
......@@ -300,7 +300,7 @@ static int doBindParam(char* data, SParamInfo* param, TAOS_BIND* bind) {
break;
case TSDB_DATA_TYPE_NCHAR:
if (!taosMbsToUcs4(bind->buffer, *bind->length, data + param->offset, param->bytes)) {
if (!taosMbsToUcs4(bind->buffer, *bind->length, data + param->offset, param->bytes, NULL)) {
return TSDB_CODE_INVALID_VALUE;
}
return TSDB_CODE_SUCCESS;
......@@ -455,7 +455,7 @@ static int insertStmtExecute(STscStmt* stmt) {
// tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
if (pRes->code != TSDB_CODE_SUCCESS) {
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
}
return pRes->code;
......@@ -510,7 +510,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
strtolower(sqlstr, sqlstr);
pStmt->pSql->sqlstr = sqlstr;
if (tscIsInsertOrImportData(sqlstr)) {
if (tscIsInsertData(sqlstr)) {
pStmt->isInsert = true;
return insertStmtPrepare(pStmt);
}
......
此差异已折叠。
......@@ -83,7 +83,6 @@ STableComInfo tscGetTableInfo(const STableMeta* pTableMeta) {
return pTableMeta->tableInfo;
}
bool isValidSchema(struct SSchema* pSchema, int32_t numOfCols) {
if (!VALIDNUMOFCOLS(numOfCols)) {
return false;
......@@ -184,7 +183,6 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
return pTableMeta;
}
/**
* the TableMeta data format in memory is as follows:
*
......
......@@ -288,7 +288,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16;
pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage));
int32_t finalRowLength = tscGetResRowLength(pQueryInfo->exprsInfo);
int32_t finalRowLength = tscGetResRowLength(pQueryInfo->exprList);
pReducer->resColModel = finalmodel;
pReducer->resColModel->capacity = pReducer->nResultBufSize / finalRowLength;
assert(finalRowLength <= pReducer->rowSize);
......@@ -810,7 +810,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
// todo merge with following function
// static void reversedCopyResultToDstBuf(SQueryInfo* pQueryInfo, SSqlRes *pRes, tFilePage *pFinalDataPage) {
//
// for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
// for (int32_t i = 0; i < pQueryInfo->exprList.numOfExprs; ++i) {
// TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
//
// int32_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
......@@ -907,7 +907,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, &pLocalReducer->interpolationInfo);
}
int32_t rowSize = tscGetResRowLength(pQueryInfo->exprsInfo);
int32_t rowSize = tscGetResRowLength(pQueryInfo->exprList);
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize);
pFinalDataPage->numOfElems = 0;
......@@ -1422,15 +1422,14 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
doExecuteSecondaryMerge(pCmd, pLocalReducer, true);
}
int32_t tscDoLocalreduce(SSqlObj *pSql) {
int32_t tscDoLocalMerge(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
tscResetForNextRetrieve(pRes);
if (pSql->signature != pSql || pRes == NULL || pRes->pLocalReducer == NULL) { // all data has been processed
tscTrace("%s call the drop local reducer", __FUNCTION__);
tscTrace("%p %s call the drop local reducer", pSql, __FUNCTION__);
tscDestroyLocalReducer(pSql);
return 0;
}
......@@ -1441,7 +1440,7 @@ int32_t tscDoLocalreduce(SSqlObj *pSql) {
// set the data merge in progress
int32_t prevStatus =
atomic_val_compare_exchange_32(&pLocalReducer->status, TSC_LOCALREDUCE_READY, TSC_LOCALREDUCE_IN_PROGRESS);
if (prevStatus != TSC_LOCALREDUCE_READY || pLocalReducer == NULL) {
if (prevStatus != TSC_LOCALREDUCE_READY) {
assert(prevStatus == TSC_LOCALREDUCE_TOBE_FREED); // it is in tscDestroyLocalReducer function already
return TSDB_CODE_SUCCESS;
}
......
此差异已折叠。
......@@ -220,8 +220,9 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
void taos_close(TAOS *taos) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL) return;
if (pObj->signature != pObj) return;
if (pObj == NULL || pObj->signature != pObj) {
return;
}
if (pObj->pHb != NULL) {
tscSetFreeHeatBeat(pObj);
......@@ -266,7 +267,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
}
if (pRes->code != TSDB_CODE_SUCCESS) {
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
}
return pRes->code;
......@@ -414,7 +415,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
}
// secondary merge has handle this situation
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) {
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
pRes->numOfTotalInCurrentClause += pRes->numOfRows;
}
......@@ -422,8 +423,9 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
if (pQueryInfo == NULL)
return 0;
assert(0);
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i);
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i, 0);
}
*rows = pRes->tsrow;
......@@ -431,32 +433,9 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
return (pQueryInfo->order.order == TSDB_ORDER_DESC) ? pRes->numOfRows : -pRes->numOfRows;
}
static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
SSqlRes *pRes = &pSql->res;
if (isNull(pRes->tsrow[columnIndex], pField->type)) {
pRes->tsrow[columnIndex] = NULL;
} else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
if (pRes->buffer[columnIndex] == NULL) {
pRes->buffer[columnIndex] = malloc(pField->bytes + TSDB_NCHAR_SIZE);
}
/* string terminated char for binary data*/
memset(pRes->buffer[columnIndex], 0, pField->bytes + TSDB_NCHAR_SIZE);
if (taosUcs4ToMbs(pRes->tsrow[columnIndex], pField->bytes, pRes->buffer[columnIndex])) {
pRes->tsrow[columnIndex] = pRes->buffer[columnIndex];
} else {
tscError("%p charset:%s to %s. val:%ls convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, pRes->tsrow);
pRes->tsrow[columnIndex] = NULL;
}
}
}
static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
static UNUSED_FUNC char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
// SArithmeticSupport *pSupport = (SArithmeticSupport *)param;
// SArithExprInfo * pExpr = pSupport->pArithExpr;
// SExprInfo * pExpr = pSupport->pArithExpr;
// int32_t index = -1;
// for (int32_t i = 0; i < pExpr->numOfCols; ++i) {
......@@ -471,210 +450,6 @@ static char *getArithemicInputSrc(void *param, const char *name, int32_t colId)
return 0;
}
static void **doSetResultRowData(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker
tfree(pRes->tsrow);
return pRes->tsrow;
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
//todo refactor move away
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for(int32_t k = 0; k < numOfExprs; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
if (k > 0) {
SSqlExpr* pPrev = tscSqlExprGet(pQueryInfo, k - 1);
pExpr->offset = pPrev->offset + pPrev->resBytes;
}
}
int32_t num = 0;
for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, i);
if (pInfo->pSqlExpr != NULL) {
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i) + pInfo->pSqlExpr->resBytes * pRes->row;
} else {
assert(0);
}
// primary key column cannot be null in interval query, no need to check
if (i == 0 && pQueryInfo->intervalTime > 0) {
continue;
}
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
transferNcharData(pSql, i, pField);
// calculate the result from several other columns
if (pInfo->pArithExprInfo != NULL) {
SArithmeticSupport *sas = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport));
sas->offset = 0;
sas->pArithExpr = pInfo->pArithExprInfo;
// sas->numOfCols = sas->pArithExpr->numOfCols;
if (pRes->buffer[i] == NULL) {
pRes->buffer[i] = malloc(tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i)->bytes);
}
for(int32_t k = 0; k < sas->numOfCols; ++k) {
// int32_t columnIndex = sas->pArithExpr->colList[k].colIndex;
// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex);
//
// sas->elemSize[k] = pExpr->resBytes;
// sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
}
tExprTreeCalcTraverse(sas->pArithExpr->pExpr, 1, pRes->buffer[i], sas, TSDB_ORDER_ASC, getArithemicInputSrc);
pRes->tsrow[i] = pRes->buffer[i];
free(sas); //todo optimization
}
}
assert(num <= pQueryInfo->fieldsInfo.numOfOutput);
pRes->row++; // index increase one-step
return pRes->tsrow;
}
static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
bool hasData = true;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
bool allSubqueryExhausted = true;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] == NULL) {
continue;
}
// SSqlRes *pRes1 = &pSql->pSubs[i]->res;
SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd;
SQueryInfo * pQueryInfo1 = tscGetQueryInfoDetail(pCmd1, pCmd1->clauseIndex);
// STableMetaInfo *pMetaInfo = tscGetMetaInfo(pQueryInfo1, 0);
assert(pQueryInfo1->numOfTables == 1);
/*
* if the global limitation is not reached, and current result has not exhausted, or next more vnodes are
* available, goes on
*/
// if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows &&
// (!tscHasReachLimitation(pQueryInfo1, pRes1))) {
// allSubqueryExhausted = false;
// break;
// }
}
hasData = !allSubqueryExhausted;
} else { // otherwise, in case inner join, if any subquery exhausted, query completed.
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] == 0) {
continue;
}
SSqlRes * pRes1 = &pSql->pSubs[i]->res;
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pQueryInfo1, pRes1) &&
tscProjectionQueryOnTable(pQueryInfo1)) ||
(pRes1->numOfRows == 0)) {
hasData = false;
break;
}
}
}
return hasData;
}
static UNUSED_FUNC void **tscBuildResFromSubqueries(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
while (1) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
if (pRes->tsrow == NULL) {
pRes->tsrow = calloc(numOfExprs, POINTER_BYTES);
}
bool success = false;
int32_t numOfTableHasRes = 0;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] != 0) {
numOfTableHasRes++;
}
}
if (numOfTableHasRes >= 2) { // do merge result
success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL);
} else { // only one subquery
SSqlObj *pSub = pSql->pSubs[0];
if (pSub == NULL) {
pSub = pSql->pSubs[1];
}
success = (doSetResultRowData(pSub) != NULL);
}
if (success) { // current row of final output has been built, return to app
for (int32_t i = 0; i < numOfExprs; ++i) {
int32_t tableIndex = pRes->pColumnIndex[i].tableIndex;
int32_t columnIndex = pRes->pColumnIndex[i].columnIndex;
SSqlRes *pRes1 = &pSql->pSubs[tableIndex]->res;
pRes->tsrow[i] = pRes1->tsrow[columnIndex];
}
pRes->numOfTotalInCurrentClause++;
break;
} else { // continue retrieve data from vnode
if (!tscHashRemainDataInSubqueryResultSet(pSql)) {
tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1);
SSubqueryState *pState = NULL;
// free all sub sqlobj
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj *pChildObj = pSql->pSubs[i];
if (pChildObj == NULL) {
continue;
}
SJoinSubquerySupporter *pSupporter = (SJoinSubquerySupporter *)pChildObj->param;
pState = pSupporter->pState;
tscDestroyJoinSupporter(pChildObj->param);
taos_free_result(pChildObj);
}
free(pState);
return NULL;
}
tscFetchDatablockFromSubquery(pSql);
if (pRes->code != TSDB_CODE_SUCCESS) {
return NULL;
}
}
}
return pRes->tsrow;
}
static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) {
SSqlObj* pSql = (SSqlObj*) tres;
......@@ -701,15 +476,19 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
}
// current data are exhausted, fetch more data
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true &&
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC ||
pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_DESCRIBE_TABLE))) {
if (pRes->row >= pRes->numOfRows && pRes->completed != true &&
(pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_SHOW ||
pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_DESCRIBE_TABLE)) {
taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
sem_wait(&pSql->rspSem);
}
return doSetResultRowData(pSql);
return doSetResultRowData(pSql, true);
}
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
......@@ -795,7 +574,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
if (keepCmd) {
tscFreeSqlResult(pSql);
} else {
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
}
}
......@@ -805,7 +584,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
// set freeFlag to 1 in retrieve message if there are un-retrieved results
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
return;
}
......@@ -857,7 +636,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
tscFreeSqlResult(pSql);
tscTrace("%p sql result is freed by app while sql command is kept", pSql);
} else {
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
tscTrace("%p sql result is freed by app", pSql);
}
} else { // for async release, remove its link
......@@ -878,7 +657,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
tscFreeSqlResult(pSql);
tscTrace("%p sql result is freed while sql command is kept", pSql);
} else {
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
tscTrace("%p sql result is freed by app", pSql);
}
}
......@@ -1223,7 +1002,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
tscTrace("%p load multi metermeta result:%d %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
if (pRes->code != TSDB_CODE_SUCCESS) {
tscFreeSqlObjPartial(pSql);
tscPartiallyFreeSqlObj(pSql);
}
return pRes->code;
......
此差异已折叠。
此差异已折叠。
......@@ -26,6 +26,20 @@
extern "C" {
#endif
#define STR_TO_VARSTR(x, str) do {VarDataLenT __len = strlen(str); \
*(VarDataLenT*)(x) = __len; \
strncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), __len);} while(0);
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) do {\
char* _e = stpncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), (_maxs));\
*(VarDataLenT*)(x) = _e - (x);\
} while(0)
#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) do {\
*(VarDataLenT*)(x) = (_size); \
strncpy((char*)(x) + VARSTR_HEADER_SIZE, (str), (_size));\
} while(0);
// ----------------- TSDB COLUMN DEFINITION
typedef struct {
int8_t type; // Column type
......@@ -182,7 +196,7 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
void tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols);
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop);
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!!
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows);
......
......@@ -172,28 +172,9 @@ int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_
switch (type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
// set offset
*(VarDataOffsetT *)POINTER_DRIFT(row, toffset) = dataRowLen(row);
// set length
VarDataLenT slen = 0;
if (isNull(value, type)) {
slen = (type == TSDB_DATA_TYPE_BINARY) ? sizeof(int8_t) : sizeof(int32_t);
} else {
if (type == TSDB_DATA_TYPE_BINARY) {
slen = strnlen((char *)value, bytes);
} else {
slen = wcsnlen((wchar_t *)value, (bytes) / TSDB_NCHAR_SIZE) * TSDB_NCHAR_SIZE;
}
}
ASSERT(slen <= bytes);
*(VarDataLenT *)ptr = slen;
ptr = POINTER_DRIFT(ptr, sizeof(VarDataLenT));
memcpy((void *)ptr, value, slen);
dataRowLen(row) += (sizeof(int16_t) + slen);
memcpy(ptr, value, varDataTLen(value));
dataRowLen(row) += varDataTLen(value);
break;
default:
memcpy(POINTER_DRIFT(row, toffset), value, TYPE_BYTES[type]);
......@@ -443,8 +424,8 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfPoints,
target->maxPoints);
}
target->numOfPoints++;
}
target->numOfPoints++;
} else {
pTarget = tdDupDataCols(target, true);
if (pTarget == NULL) goto _err;
......
......@@ -25,13 +25,10 @@ __attribute__((unused)) static FORCE_INLINE size_t copy(char* dst, const char* s
}
void extractTableName(const char* tableId, char* name) {
size_t offset = strcspn(tableId, &TS_PATH_DELIMITER[0]);
offset = strcspn(&tableId[offset], &TS_PATH_DELIMITER[0]);
size_t s1 = strcspn(tableId, &TS_PATH_DELIMITER[0]);
size_t s2 = strcspn(&tableId[s1 + 1], &TS_PATH_DELIMITER[0]);
strncpy(name, &tableId[offset], TSDB_TABLE_NAME_LEN);
// char* r = skipSegments(tableId, TS_PATH_DELIMITER[0], 2);
// return copy(name, r, TS_PATH_DELIMITER[0]);
strncpy(name, &tableId[s1 + s2 + 2], TSDB_TABLE_NAME_LEN);
}
char* extractDBName(const char* tableId, char* name) {
......
......@@ -198,7 +198,7 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) {
break;
};
case TSDB_DATA_TYPE_BINARY: {
strncpy(val, src, len);
varDataCopy(val, src);
break;
};
case TSDB_DATA_TYPE_NCHAR: {
......
......@@ -228,7 +228,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
int32_t num = taosGetQueueNumber(pWorker->qset);
if (num > 0) {
usleep(30000);
usleep(30);
sched_yield();
} else {
taosFreeQall(pWorker->qall);
......
......@@ -23,12 +23,12 @@ typedef void* qinfo_t;
/**
* create the qinfo object according to QueryTableMsg
* @param pVnode
* @param tsdb
* @param pQueryTableMsg
* @param qinfo
* @return
*/
int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo);
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo);
/**
* Destroy QInfo object
......
......@@ -35,12 +35,15 @@ extern "C" {
// ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR
typedef int32_t VarDataOffsetT;
typedef int16_t VarDataLenT;
#define varDataLen(v) ((VarDataLenT *)(v))[0]
#define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v))
#define varDataVal(v) ((void *)((char *)v + sizeof(VarDataLenT)))
#define varDataLen(v) ((VarDataLenT *)(v))[0]
#define varDataTLen(v) (sizeof(VarDataLenT) + varDataLen(v))
#define varDataVal(v) ((void *)((char *)v + sizeof(VarDataLenT)))
#define varDataCopy(dst, v) memcpy((dst), (void*) (v), varDataTLen(v))
// this data type is internally used only in 'in' query to hold the values
#define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1)
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
// Bytes for each type.
extern const int32_t TYPE_BYTES[11];
......@@ -209,7 +212,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_SHELL_VNODE_BITS 24
#define TSDB_SHELL_SID_MASK 0xFF
#define TSDB_HTTP_TOKEN_LEN 20
#define TSDB_SHOW_SQL_LEN 32
#define TSDB_SHOW_SQL_LEN 512
#define TSDB_METER_STATE_OFFLINE 0
#define TSDB_METER_STATE_ONLLINE 1
......@@ -290,27 +293,30 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_RPC_THREADS 5
#define TSDB_QUERY_TYPE_NON_TYPE 0x00U // none type
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01U // free qhandle at vnode
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
/*
* 1. ordinary sub query for select * from super_table
* 2. all sqlobj generated by createSubqueryObj with this flag
*/
#define TSDB_QUERY_TYPE_SUBQUERY 0x02U
#define TSDB_QUERY_TYPE_STABLE_SUBQUERY 0x04U // two-stage subquery for super table
#define TSDB_QUERY_TYPE_SUBQUERY 0x02u
#define TSDB_QUERY_TYPE_STABLE_SUBQUERY 0x04u // two-stage subquery for super table
#define TSDB_QUERY_TYPE_TABLE_QUERY 0x08U // query ordinary table; below only apply to client side
#define TSDB_QUERY_TYPE_STABLE_QUERY 0x10U // query on super table
#define TSDB_QUERY_TYPE_JOIN_QUERY 0x20U // join query
#define TSDB_QUERY_TYPE_PROJECTION_QUERY 0x40U // select *,columns... query
#define TSDB_QUERY_TYPE_JOIN_SEC_STAGE 0x80U // join sub query at the second stage
#define TSDB_QUERY_TYPE_TABLE_QUERY 0x08u // query ordinary table; below only apply to client side
#define TSDB_QUERY_TYPE_STABLE_QUERY 0x10u // query on super table
#define TSDB_QUERY_TYPE_JOIN_QUERY 0x20u // join query
#define TSDB_QUERY_TYPE_PROJECTION_QUERY 0x40u // select *,columns... query
#define TSDB_QUERY_TYPE_JOIN_SEC_STAGE 0x80u // join sub query at the second stage
#define TSDB_QUERY_TYPE_INSERT 0x100U // insert type
#define TSDB_QUERY_TYPE_IMPORT 0x200U // import data
#define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
#define TSDB_QUERY_TYPE_IMPORT 0x200u // import data
#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x800u
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
#define TSDB_QUERY_SET_TYPE(x, _type) ((x) |= (_type))
#define TSDB_QUERY_CLEAR_TYPE(x, _type) ((x) &= (~_type))
#define TSDB_QUERY_RESET_TYPE(x) ((x) = TSDB_QUERY_TYPE_NON_TYPE)
#define TSDB_ORDER_ASC 1
......
此差异已折叠。
此差异已折叠。
......@@ -22,7 +22,7 @@ extern "C" {
typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT,
TAOS_VN_STATUS_CREATING,
TAOS_VN_STATUS_UPDATING,
TAOS_VN_STATUS_READY,
TAOS_VN_STATUS_CLOSING,
TAOS_VN_STATUS_DELETING,
......
......@@ -66,6 +66,7 @@ typedef struct SMnodeObj {
SDnodeObj *pDnode;
} SMnodeObj;
// todo use dynamic length string
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
......
此差异已折叠。
此差异已折叠。
......@@ -22,6 +22,7 @@
#include "tutil.h"
#include "ttime.h"
#include "tsocket.h"
#include "tdataformat.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtMnode.h"
......@@ -288,13 +289,13 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 40;
pShow->bytes[cols] = 40 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "end point");
strcpy(pSchema[cols].name, "end_point");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10;
pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "role");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
......@@ -302,7 +303,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create time");
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
......@@ -339,11 +340,12 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strncpy(pWrite, pMnode->pDnode->dnodeEp, pShow->bytes[cols]-1);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pMnode->pDnode->dnodeEp, pShow->bytes[cols] - VARSTR_HEADER_SIZE);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, mgmtGetMnodeRoleStr(pMnode->role));
char* roles = mgmtGetMnodeRoleStr(pMnode->role);
STR_TO_VARSTR(pWrite, roles);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......
......@@ -458,6 +458,10 @@ static int sdbWrite(void *param, void *data, int type) {
// for data from WAL or forward, version may be smaller
if (pHead->version <= tsSdbObj.version) {
pthread_mutex_unlock(&tsSdbObj.mutex);
if (type == TAOS_QTYPE_FWD && tsSdbObj.sync != NULL) {
sdbTrace("forward request is received, version:%" PRIu64 " confirm it", pHead->version);
syncConfirmForward(tsSdbObj.sync, pHead->version, TSDB_CODE_SUCCESS);
}
return TSDB_CODE_SUCCESS;
} else if (pHead->version != tsSdbObj.version + 1) {
pthread_mutex_unlock(&tsSdbObj.mutex);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
#run unique/column/replica3.sim
run unique/column/replica3.sim
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册