未验证 提交 dd2ed98f 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #5918 from taosdata/feature/query

Feature/query
......@@ -44,23 +44,15 @@ typedef struct SLocalMerger {
int32_t numOfCompleted;
int32_t numOfVnode;
SLoserTreeInfo * pLoserTree;
char * prevRowOfInput;
tFilePage * pResultBuf;
int32_t nResultBufSize;
tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result.
bool hasPrevRow; // cannot be released
bool hasUnprocessedRow;
tOrderDescriptor * pDesc;
SColumnModel * resColModel;
SColumnModel* finalModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
SFillInfo* pFillInfo; // interpolation support structure
char* pFinalRes; // result data after interpo
tFilePage* discardData;
bool discard;
int32_t offset; // limit offset value
bool orderPrjOnSTable; // projection query on stable
} SLocalMerger;
......@@ -94,7 +86,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
void tscDestroyLocalMerger(SSqlObj *pSql);
int32_t tscDoLocalMerge(SSqlObj *pSql);
//int32_t tscDoLocalMerge(SSqlObj *pSql);
#ifdef __cplusplus
}
......
......@@ -85,15 +85,13 @@ typedef struct SMergeTsCtx {
int8_t compared;
}SMergeTsCtx;
typedef struct SVgroupTableInfo {
SVgroupInfo vgInfo;
SArray* itemList; //SArray<STableIdInfo>
SArray *itemList; // SArray<STableIdInfo>
} SVgroupTableInfo;
static FORCE_INLINE SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) {
static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex) {
assert(pCmd != NULL && subClauseIndex >= 0);
if (pCmd->pQueryInfo == NULL || subClauseIndex >= pCmd->numOfClause) {
return NULL;
}
......@@ -101,6 +99,8 @@ static FORCE_INLINE SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t sub
return pCmd->pQueryInfo[subClauseIndex];
}
SQueryInfo* tscGetActiveQueryInfo(SSqlCmd* pCmd);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
......@@ -129,7 +129,13 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscIsTopbotQuery(SQueryInfo* pQueryInfo);
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo);
bool hasTagValOutput(SQueryInfo* pQueryInfo);
bool timeWindowInterpoRequired(SQueryInfo *pQueryInfo);
bool isStabledev(SQueryInfo* pQueryInfo);
bool isTsCompQuery(SQueryInfo* pQueryInfo);
bool isSimpleAggregate(SQueryInfo* pQueryInfo);
bool isBlockDistQuery(SQueryInfo* pQueryInfo);
int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
......@@ -143,7 +149,7 @@ bool tscQueryTags(SQueryInfo* pQueryInfo);
bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryBlockInfo(SQueryInfo* pQueryInfo);
SSqlExpr* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SExprInfo* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType);
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableName, SSqlObj* pSql);
......@@ -174,27 +180,29 @@ void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes)
int32_t tscGetResRowLength(SArray* pExprList);
SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
SExprInfo* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
SExprInfo* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
SExprInfo* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size);
size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
SExprInfo* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
void tscSqlExprAssign(SExprInfo* dst, const SExprInfo* src);
void tscSqlExprInfoDestroy(SArray* pExprInfo);
SColumn* tscColumnClone(const SColumn* src);
bool tscColumnExists(SArray* pColumnList, SColumnIndex* pColIndex);
SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex);
SArray* tscColumnListClone(const SArray* src, int16_t tableIndex);
bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid);
SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema);
void tscColumnListDestroy(SArray* pColList);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscDequoteAndTrimToken(SStrToken* pToken);
int32_t tscValidateName(SStrToken* pToken);
......@@ -216,8 +224,11 @@ bool tscShouldBeFreed(SSqlObj* pSql);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex);
SQueryInfo *tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex);
void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
int32_t tscAddQueryInfo(SSqlCmd *pCmd);
SQueryInfo *tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex);
SQueryInfo *tscGetQueryInfoS(SSqlCmd *pCmd, int32_t subClauseIndex);
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo);
......@@ -225,11 +236,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, SName* name, STableM
SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables);
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo);
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscFreeVgroupTableInfo(SArray* pVgroupTables);
SArray* tscVgroupTableInfoDup(SArray* pVgroupTables);
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
......@@ -241,6 +248,8 @@ int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool creat
void tscResetForNextRetrieve(SSqlRes* pRes);
void tscDoQuery(SSqlObj* pSql);
void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo);
void* tscVgroupInfoClear(SVgroupsInfo *pInfo);
......@@ -274,7 +283,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql);
bool hasMoreClauseToTry(SSqlObj* pSql);
......@@ -299,7 +308,10 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
uint32_t tscGetTableMetaMaxSize();
int32_t tscCreateTableMetaFromCChildMeta(STableMeta* pChild, const char* name);
STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema);
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
......
......@@ -35,6 +35,7 @@ extern "C" {
#include "qExecutor.h"
#include "qSqlparser.h"
#include "qTsbuf.h"
#include "qUtil.h"
#include "tcmdtype.h"
// forward declaration
......@@ -96,56 +97,29 @@ typedef struct STableMetaInfo {
SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
} SColumnIndex;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
TAOS_FIELD* final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SColumn {
SColumnIndex colIndex;
int32_t numOfFilters;
SColumnFilterInfo *filterInfo;
uint64_t tableUid;
int32_t columnIndex;
SColumnInfo info;
} SColumn;
/* the structure for sql function in select clause */
typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
SColIndex colInfo;
uint64_t uid; // refactor use the pointer
int16_t functionId; // function id in aAgg array
int16_t resType; // return value type
int16_t resBytes; // length of return value
int32_t interBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression.
int16_t resColId; // result column id
SColumn *pFilter; // expr filter
} SSqlExpr;
typedef struct SExprFilter {
tSqlExpr *pExpr; //used for having parse
SSqlExpr *pSqlExpr;
SArray *fp;
SColumn *pFilters; //having filter info
}SExprFilter;
typedef struct SInternalField {
TAOS_FIELD field;
bool visible;
SExprInfo *pArithExprInfo;
SSqlExpr *pSqlExpr;
SExprFilter *pFieldFilters;
SExprInfo *pExpr;
} SInternalField;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
TAOS_FIELD* final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SCond {
uint64_t uid;
int32_t len; // length of tag query condition data
......@@ -160,7 +134,7 @@ typedef struct SJoinNode {
} SJoinNode;
typedef struct SJoinInfo {
bool hasJoin;
bool hasJoin;
SJoinNode* joinTables[TSDB_MAX_JOIN_TABLE_NUM];
} SJoinInfo;
......@@ -229,13 +203,14 @@ typedef struct SQueryInfo {
SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window
SSqlGroupbyExpr groupbyExpr; // group by tags info
SSqlGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray * exprList; // SArray<SSqlExpr*>
SArray * exprList; // SArray<SExprInfo*>
SLimitVal limit;
SLimitVal slimit;
STagCond tagCond;
SOrderVal order;
int16_t fillType; // final result fill type
int16_t numOfTables;
......@@ -254,7 +229,15 @@ typedef struct SQueryInfo {
int32_t round; // 0/1/....
int32_t bufLen;
char* buf;
int32_t havingFieldNum;
SQInfo* pQInfo; // global merge operator
SArray* pDSOperator; // data source operator
SArray* pPhyOperator; // physical query execution plan
SQueryAttr* pQueryAttr; // query object
struct SQueryInfo *sibling; // sibling
SArray *pUpstream; // SArray<struct SQueryInfo>
struct SQueryInfo *pDownstream;
int32_t havingFieldNum;
} SQueryInfo;
typedef struct {
......@@ -269,8 +252,6 @@ typedef struct {
};
uint32_t insertType; // TODO remove it
int32_t clauseIndex; // index of multiple subclause query
char * curSql; // current sql, resume position of sql after parsing paused
int8_t parseFinished;
char reserve2[3]; // fix bus error on arm32
......@@ -280,22 +261,26 @@ typedef struct {
uint32_t allocSize;
char * payload;
int32_t payloadLen;
SQueryInfo **pQueryInfo;
int32_t numOfClause;
int32_t clauseIndex; // index of multiple subclause query
SQueryInfo *active; // current active query info
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams;
int8_t dataSourceType; // load data from file or not
char reserve4[3]; // fix bus error on arm32
char reserve4[3]; // fix bus error on arm32
int8_t submitSchema; // submit block is built with table schema
char reserve5[3]; // fix bus error on arm32
char reserve5[3]; // fix bus error on arm32
STagData tagData; // NOTE: pTagData->data is used as a variant length array
SName **pTableNameList; // all involved tableMeta list of current insert sql statement.
int32_t numOfTables;
SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
} SSqlCmd;
typedef struct SResRec {
......@@ -438,7 +423,7 @@ void tscInitMsgsFp();
int tsParseSql(SSqlObj *pSql, bool initial);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet);
int tscProcessSql(SSqlObj *pSql);
int tscBuildAndSendRequest(SSqlObj *pSql, SQueryInfo* pQueryInfo);
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex);
void tscAsyncResultOnError(SSqlObj *pSql);
......@@ -453,6 +438,9 @@ void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock);
void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void destroyTableNameList(SSqlCmd* pCmd);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
......@@ -500,47 +488,6 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex, int32_t offset) {
SInternalField* pInfo = (SInternalField*) TARRAY_GET_ELEM(pFieldInfo->internalField, columnIndex);
int32_t type = pInfo->field.type;
int32_t bytes = pInfo->field.bytes;
char* pData = pRes->data + (int32_t)(offset * pRes->numOfRows + bytes * pRes->row);
UNUSED(pData);
// user defined constant value output columns
if (pInfo->pSqlExpr != NULL && TSDB_COL_IS_UD_COL(pInfo->pSqlExpr->colInfo.flag)) {
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
pData = pInfo->pSqlExpr->param[1].pz;
pRes->length[columnIndex] = pInfo->pSqlExpr->param[1].nLen;
pRes->tsrow[columnIndex] = (pInfo->pSqlExpr->param[1].nType == TSDB_DATA_TYPE_NULL) ? NULL : (unsigned char*)pData;
} else {
assert(bytes == tDataTypes[type].bytes);
pRes->tsrow[columnIndex] = isNull(pData, type) ? NULL : (unsigned char*)&pInfo->pSqlExpr->param[1].i64;
pRes->length[columnIndex] = bytes;
}
} else {
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) {
int32_t realLen = varDataLen(pData);
assert(realLen <= bytes - VARSTR_HEADER_SIZE);
pRes->tsrow[columnIndex] = (isNull(pData, type)) ? NULL : (unsigned char*)((tstr *)pData)->data;
if (realLen < pInfo->pSqlExpr->resBytes - VARSTR_HEADER_SIZE) { // todo refactor
*(pData + realLen + VARSTR_HEADER_SIZE) = 0;
}
pRes->length[columnIndex] = realLen;
} else {
assert(bytes == tDataTypes[type].bytes);
pRes->tsrow[columnIndex] = isNull(pData, type) ? NULL : (unsigned char*)pData;
pRes->length[columnIndex] = bytes;
}
}
}
extern int32_t sentinel;
extern SHashObj *tscVgroupMap;
extern SHashObj *tscTableMetaInfo;
......
......@@ -49,7 +49,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscAsyncResultOnError(pSql);
return;
......@@ -69,7 +69,8 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
return;
}
tscDoQuery(pSql);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
executeQuery(pSql, pQueryInfo);
}
// TODO return the correct error code to client in tscQueueAsyncError
......@@ -80,7 +81,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
tscError("bug!!! pObj:%p", pObj);
tscError("pObj:%p is NULL or freed", pObj);
terrno = TSDB_CODE_TSC_DISCONNECTED;
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
return NULL;
......@@ -179,7 +180,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockForSubquery(pSql);
} else {
tscProcessSql(pSql);
tscBuildAndSendRequest(pSql, NULL);
}
}
......@@ -193,8 +194,8 @@ static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOf
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy);
}
void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
SSqlObj *pSql = (SSqlObj *)taosa;
void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
SSqlObj *pSql = (SSqlObj *)tres;
if (pSql == NULL || pSql->signature != pSql) {
tscError("sql object is NULL");
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
......@@ -206,18 +207,16 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
// user-defined callback function is stored in fetchFp
pSql->fetchFp = fp;
pSql->fp = tscAsyncFetchRowsProxy;
pSql->fp = tscAsyncFetchRowsProxy;
pSql->param = param;
if (pRes->qId == 0) {
tscError("qhandle is NULL");
tscError("qhandle is invalid");
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
pSql->param = param;
tscAsyncResultOnError(pSql);
return;
}
pSql->param = param;
tscResetForNextRetrieve(pRes);
// handle the sub queries of join query
......@@ -255,8 +254,9 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscProcessSql(pSql);
SQueryInfo* pQueryInfo1 = tscGetActiveQueryInfo(&pSql->cmd);
tscBuildAndSendRequest(pSql, pQueryInfo1);
}
}
......@@ -288,7 +288,7 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) {
}
assert(pSql->res.code != TSDB_CODE_SUCCESS);
tscError("%p invoke user specified function due to error occurred, code:%s", pSql, tstrerror(pSql->res.code));
tscError("0x%"PRIx64" async result callback, code:%s", pSql->self, tstrerror(pSql->res.code));
SSqlRes *pRes = &pSql->res;
if (pSql->fp == NULL || pSql->fetchFp == NULL){
......@@ -323,7 +323,7 @@ static int32_t updateMetaBeforeRetryQuery(SSqlObj* pSql, STableMetaInfo* pTableM
SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
SSqlExpr *pExpr = &(tscSqlExprGet(pQueryInfo, i)->base);
pExpr->uid = pTableMetaInfo->pTableMeta->id.uid;
if (pExpr->colInfo.colIndex >= 0) {
......@@ -344,7 +344,7 @@ static int32_t updateMetaBeforeRetryQuery(SSqlObj* pSql, STableMetaInfo* pTableM
// validate the table columns information
for (int32_t i = 0; i < taosArrayGetSize(pQueryInfo->colList); ++i) {
SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i);
if (pCol->colIndex.columnIndex >= numOfCols) {
if (pCol->columnIndex >= numOfCols) {
return pSql->retryReason;
}
}
......@@ -368,13 +368,13 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlObj *sub = (SSqlObj*) res;
const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"table-meta";
if (code != TSDB_CODE_SUCCESS) {
tscError("%p get %s failed, code:%s", pSql, msg, tstrerror(code));
tscError("0x%"PRIx64" get %s failed, code:%s", pSql->self, msg, tstrerror(code));
goto _error;
}
tscDebug("0x%"PRIx64" get %s successfully", pSql->self, msg);
if (pSql->pStream == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
// check if it is a sub-query of super table query first, if true, enter another routine
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
......@@ -396,8 +396,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto _error;
}
// tscProcessSql can add error into async res
tscProcessSql(pSql);
// tscBuildAndSendRequest can add error into async res
tscBuildAndSendRequest(pSql, NULL);
taosReleaseRef(tscObjRef, pSql->self);
return;
} else { // continue to process normal async query
......@@ -428,9 +428,9 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto _error;
}
tscProcessSql(pSql);
tscBuildAndSendRequest(pSql, NULL);
} else { // in all other cases, simple retry
tscProcessSql(pSql);
tscBuildAndSendRequest(pSql, NULL);
}
taosReleaseRef(tscObjRef, pSql->self);
......@@ -447,21 +447,29 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
if (pCmd->insertType == TSDB_QUERY_TYPE_STMT_INSERT) {
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else {
assert(code == TSDB_CODE_SUCCESS);
assert(code == TSDB_CODE_SUCCESS);
}
(*pSql->fp)(pSql->param, pSql, code);
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
tscImportDataFromFile(pSql);
} else {
tscHandleMultivnodeInsert(pSql);
}
} else {
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
executeQuery(pSql, pQueryInfo1);
}
// proceed to invoke the tscDoQuery();
taosReleaseRef(tscObjRef, pSql->self);
return;
}
}
......@@ -498,7 +506,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
return;
}
tscDoQuery(pSql);
// tscDoQuery(pSql);
taosReleaseRef(tscObjRef, pSql->self);
......
......@@ -53,7 +53,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SSqlRes *pRes = &pSql->res;
// one column for each row
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -154,14 +154,14 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
pSql->cmd.numOfCols = numOfCols;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
pQueryInfo->order.order = TSDB_ORDER_ASC;
TAOS_FIELD f = {.type = TSDB_DATA_TYPE_BINARY, .bytes = (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE};
tstrncpy(f.name, "Field", sizeof(f.name));
SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, -1000, (TSDB_COL_NAME_LEN - 1), false);
rowLen += ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE);
......@@ -171,7 +171,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Type", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(typeColLength + VARSTR_HEADER_SIZE),
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(typeColLength + VARSTR_HEADER_SIZE),
-1000, typeColLength, false);
rowLen += typeColLength + VARSTR_HEADER_SIZE;
......@@ -181,7 +181,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Length", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t),
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t),
-1000, sizeof(int32_t), false);
rowLen += sizeof(int32_t);
......@@ -191,7 +191,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Note", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(noteColLength + VARSTR_HEADER_SIZE),
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(noteColLength + VARSTR_HEADER_SIZE),
-1000, noteColLength, false);
rowLen += noteColLength + VARSTR_HEADER_SIZE;
......@@ -199,7 +199,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
}
static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
assert(tscGetMetaInfo(pQueryInfo, 0)->pTableMeta != NULL);
......@@ -389,7 +389,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
SColumnIndex index = {0};
pSql->cmd.numOfCols = 2;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
pQueryInfo->order.order = TSDB_ORDER_ASC;
TAOS_FIELD f;
......@@ -404,7 +404,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
}
SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, f.bytes, -1000, f.bytes - VARSTR_HEADER_SIZE, false);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, f.bytes, -1000, f.bytes - VARSTR_HEADER_SIZE, false);
rowLen += f.bytes;
......@@ -417,7 +417,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
}
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(int16_t)(ddlLen + VARSTR_HEADER_SIZE), -1000, ddlLen, false);
rowLen += ddlLen + VARSTR_HEADER_SIZE;
......@@ -427,7 +427,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const char *tableName, const char *ddl) {
SSqlRes *pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
int32_t numOfRows = 1;
if (strlen(ddl) == 0) {
......@@ -444,7 +444,7 @@ static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const c
return 0;
}
static int32_t tscSCreateBuildResult(SSqlObj *pSql, BuildType type, const char *str, const char *result) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
int32_t rowLen = tscSCreateBuildResultFields(pSql, type, result);
tscFieldInfoUpdateOffset(pQueryInfo);
......@@ -552,7 +552,7 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) {
return TSDB_CODE_SUCCESS;
}
static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, char *ddl) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -606,7 +606,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
}
static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName, char *ddl) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -633,7 +633,7 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName,
}
static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName, char *ddl) {
char *result = ddl;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -674,7 +674,7 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
}
static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pTableMetaInfo->pTableMeta != NULL);
......@@ -700,7 +700,7 @@ static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
}
static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -727,7 +727,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
return TSDB_CODE_TSC_ACTION_IN_PROGRESS;
}
static int32_t tscProcessCurrentUser(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resBytes = TSDB_USER_LEN + TSDB_DATA_TYPE_BINARY;
......@@ -754,7 +754,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
extractDBName(pSql->pTscObj->db, db);
pthread_mutex_unlock(&pSql->pTscObj->mutex);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -781,7 +781,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
static int32_t tscProcessServerVer(SSqlObj *pSql) {
const char* v = pSql->pTscObj->sversion;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -804,7 +804,7 @@ static int32_t tscProcessServerVer(SSqlObj *pSql) {
}
static int32_t tscProcessClientVer(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -856,7 +856,7 @@ static int32_t tscProcessServStatus(SSqlObj *pSql) {
return pSql->res.code;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
int32_t val = 1;
......@@ -870,7 +870,7 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
pCmd->numOfCols = 1;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
pQueryInfo->order.order = TSDB_ORDER_ASC;
tscFieldInfoClear(&pQueryInfo->fieldsInfo);
......@@ -882,7 +882,7 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
tscInitResObjForLocalQuery(pSql, 1, (int32_t)valueLength);
SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, 0);
pInfo->pSqlExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pInfo->pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
memcpy(pRes->data, val, pInfo->field.bytes);
}
......@@ -926,7 +926,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
pRes->code = tscProcessServStatus(pSql);
} else {
pRes->code = TSDB_CODE_TSC_INVALID_SQL;
tscError("%p not support command:%d", pSql, pCmd->command);
tscError("0x%"PRIx64" not support command:%d", pSql->self, pCmd->command);
}
// keep the code in local variable in order to avoid invalid read in case of async query
......
此差异已折叠。
......@@ -748,7 +748,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
const int32_t STABLE_INDEX = 1;
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
char *sql = *sqlstr;
......@@ -829,6 +829,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
index = 0;
sToken = tStrGetToken(sql, &index, false);
if (sToken.type != TK_TAGS && sToken.type != TK_LP) {
tscDestroyBoundColumnInfo(&spd);
return tscInvalidSQLErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z);
}
......@@ -841,6 +842,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
char* end = NULL;
code = parseBoundColumns(pCmd, &spd, pTagSchema, sql, &end);
if (code != TSDB_CODE_SUCCESS) {
tscDestroyBoundColumnInfo(&spd);
return code;
}
......@@ -858,11 +860,13 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
sql += index;
if (sToken.type != TK_LP) {
tscDestroyBoundColumnInfo(&spd);
return tscInvalidSQLErrMsg(pCmd->payload, "( is expected", sToken.z);
}
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
tscDestroyBoundColumnInfo(&spd);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -875,6 +879,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
if (TK_ILLEGAL == sToken.type) {
tdDestroyKVRowBuilder(&kvRowBuilder);
tscDestroyBoundColumnInfo(&spd);
return TSDB_CODE_TSC_INVALID_SQL;
}
......@@ -892,6 +897,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
code = tsParseOneColumn(pSchema, &sToken, tagVal, pCmd->payload, &sql, false, tinfo.precision);
if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
tscDestroyBoundColumnInfo(&spd);
return code;
}
......@@ -1065,7 +1071,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
int32_t totalNum = 0;
int32_t code = TSDB_CODE_SUCCESS;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
assert(pQueryInfo != NULL);
STableMetaInfo *pTableMetaInfo = (pQueryInfo->numOfTables == 0)? tscAddEmptyMetaInfo(pQueryInfo):tscGetMetaInfo(pQueryInfo, 0);
......@@ -1141,7 +1147,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
return code;
}
tscError("%p async insert parse error, code:%s", pSql, tstrerror(code));
tscError("0x%"PRIx64" async insert parse error, code:%s", pSql->self, tstrerror(code));
pCmd->curSql = NULL;
goto _clean;
}
......@@ -1285,7 +1291,7 @@ int tsInsertInitialCheck(SSqlObj *pSql) {
pCmd->count = 0;
pCmd->command = TSDB_SQL_INSERT;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType);
......@@ -1375,7 +1381,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
return code;
}
return tscProcessSql(pSql);
return tscBuildAndSendRequest(pSql, NULL);
}
typedef struct SImportFileSupport {
......@@ -1409,7 +1415,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
assert(pSql->res.numOfRows == 0);
int32_t ret = fseek(fp, 0, SEEK_SET);
if (ret < 0) {
tscError("%p failed to seek SEEK_SET since:%s", pSql, tstrerror(errno));
tscError("0x%"PRIx64" failed to seek SEEK_SET since:%s", pSql->self, tstrerror(errno));
code = TAOS_SYSTEM_ERROR(errno);
goto _error;
}
......@@ -1521,6 +1527,7 @@ void tscImportDataFromFile(SSqlObj *pSql) {
}
assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && strlen(pCmd->payload) != 0);
pCmd->active = pCmd->pQueryInfo[0];
SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport));
SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
......@@ -1529,7 +1536,7 @@ void tscImportDataFromFile(SSqlObj *pSql) {
FILE *fp = fopen(pCmd->payload, "rb");
if (fp == NULL) {
pSql->res.code = TAOS_SYSTEM_ERROR(errno);
tscError("%p failed to open file %s to load data from file, code:%s", pSql, pCmd->payload, tstrerror(pSql->res.code));
tscError("0x%"PRIx64" failed to open file %s to load data from file, code:%s", pSql->self, pCmd->payload, tstrerror(pSql->res.code));
tfree(pSupporter);
taos_free_result(pNew);
......
......@@ -815,7 +815,7 @@ static int insertStmtExecute(STscStmt* stmt) {
pRes->numOfRows = 0;
pRes->numOfTotal = 0;
tscProcessSql(pSql);
tscBuildAndSendRequest(pSql, NULL);
// wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem);
......
......@@ -104,7 +104,7 @@ void tscSaveSlowQuery(SSqlObj *pSql) {
char *sql = malloc(sqlSize);
if (sql == NULL) {
tscError("%p failed to allocate memory to sent slow query to dnode", pSql);
tscError("0x%"PRIx64" failed to allocate memory to sent slow query to dnode", pSql->self);
return;
}
......@@ -249,8 +249,8 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
pQdesc->stime = htobe64(pSql->stime);
pQdesc->queryId = htonl(pSql->queryId);
//pQdesc->useconds = htobe64(pSql->res.useconds);
pQdesc->useconds = htobe64(now - pSql->stime); // use local time instead of sever rsp elapsed time
pQdesc->qHandle = htobe64(pSql->res.qId);
pQdesc->useconds = htobe64(now - pSql->stime);
pQdesc->qId = htobe64(pSql->res.qId);
pHeartbeat->numOfQueries++;
pQdesc++;
......
此差异已折叠。
此差异已折叠。
......@@ -191,7 +191,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
pSql->fp = syncConnCallback;
pSql->param = pSql;
tscProcessSql(pSql);
tscBuildAndSendRequest(pSql, NULL);
tsem_wait(&pSql->rspSem);
if (pSql->res.code != TSDB_CODE_SUCCESS) {
......@@ -265,7 +265,7 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
if (taos) *taos = pObj;
pSql->fetchFp = fp;
pSql->res.code = tscProcessSql(pSql);
pSql->res.code = tscBuildAndSendRequest(pSql, NULL);
tscDebug("%p DB async connection is opening", taos);
return pObj;
}
......@@ -373,7 +373,7 @@ int taos_num_fields(TAOS_RES *res) {
if (pSql == NULL || pSql->signature != pSql) return 0;
int32_t num = 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
return num;
}
......@@ -408,7 +408,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
SSqlRes *pRes = &pSql->res;
if (pSql == NULL || pSql->signature != pSql) return 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
return NULL;
}
......@@ -559,7 +559,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
return true;
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return true;
......@@ -578,7 +578,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscDebug("0x%"PRIx64" send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql->self, sqlCmd[pCmd->command]);
tscProcessSql(pSql);
tscBuildAndSendRequest(pSql, NULL);
return false;
}
......@@ -588,7 +588,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
void taos_free_result(TAOS_RES *res) {
SSqlObj* pSql = (SSqlObj*) res;
if (pSql == NULL || pSql->signature != pSql) {
tscError("%p already released sqlObj", res);
tscError("0x%"PRIx64" already released sqlObj", pSql ? pSql->self : -1);
return;
}
......@@ -672,7 +672,7 @@ char *taos_get_client_info() { return version; }
static void tscKillSTableQuery(SSqlObj *pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return;
......@@ -723,7 +723,7 @@ void taos_stop_query(TAOS_RES *res) {
// set the error code for master pSqlObj firstly
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
assert(pSql->rpcRid <= 0);
......@@ -753,7 +753,7 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
return true;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
return true;
}
......@@ -881,15 +881,14 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
int32_t sqlLen = (int32_t)strlen(sql);
if (sqlLen > tsMaxSQLStringLen) {
tscError("%p sql too long", pSql);
tscError("0x%"PRIx64" sql too long", pSql->self);
tfree(pSql);
return TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
}
pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1);
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
tscDebug("0x%"PRIx64" Valid SQL result:%d, %s pObj:%p", pSql->self, pRes->code, taos_errstr(pSql), pObj);
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
tfree(pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -914,7 +913,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
}
if (code != TSDB_CODE_SUCCESS) {
tscDebug("0x%"PRIx64" Valid SQL result:%d, %s pObj:%p", pSql->self, code, taos_errstr(pSql), pObj);
tscError("0x%"PRIx64" invalid SQL result:%d, %s pObj:%p", pSql->self, code, taos_errstr(pSql), pObj);
}
taos_free_result(pSql);
......@@ -933,7 +932,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
int code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
char *str = (char *)tblNameList;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex);
if (pQueryInfo == NULL) {
pSql->res.code = terrno;
return terrno;
......@@ -1031,14 +1030,14 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
int32_t tblListLen = (int32_t)strlen(tableNameList);
if (tblListLen > MAX_TABLE_NAME_LENGTH) {
tscError("%p tableNameList too long, length:%d, maximum allowed:%d", pSql, tblListLen, MAX_TABLE_NAME_LENGTH);
tscError("0x%"PRIx64" tableNameList too long, length:%d, maximum allowed:%d", pSql->self, tblListLen, MAX_TABLE_NAME_LENGTH);
tscFreeSqlObj(pSql);
return TSDB_CODE_TSC_INVALID_SQL;
}
char *str = calloc(1, tblListLen + 1);
if (str == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
tscFreeSqlObj(pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -1048,7 +1047,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
/*
* set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query.
* If qhandle is NOT set 0, the function of taos_free_result() will send message to server by calling tscProcessSql()
* If qhandle is NOT set 0, the function of taos_free_result() will send message to server by calling tscBuildAndSendRequest()
* to free connection, which may cause segment fault, when the parse phrase is not even successfully executed.
*/
pRes->qId = 0;
......@@ -1061,7 +1060,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
tscDoQuery(pSql);
tscDebug("0x%"PRIx64" load multi table meta result:%d %s pObj:%p", pSql->self, pRes->code, taos_errstr(pSql), pObj);
tscDebug("0x%"PRIx64" load multi-table meta result:%d %s pObj:%p", pSql->self, pRes->code, taos_errstr(pSql), pObj);
if ((code = pRes->code) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pSql);
}
......
......@@ -37,8 +37,8 @@ static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t l
static bool isProjectStream(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId != TSDB_FUNC_PRJ) {
SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->base.functionId != TSDB_FUNC_PRJ) {
return false;
}
}
......@@ -89,7 +89,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
......@@ -102,7 +102,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
}
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) && (pTableMetaInfo->pVgroupTables == NULL) && (pTableMetaInfo->vgroupList == NULL || pTableMetaInfo->vgroupList->numOfVgroups <= 0)) {
tscDebug("%p empty vgroup list", pSql);
tscDebug("0x%"PRIx64" empty vgroup list", pSql->self);
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
code = TSDB_CODE_TSC_APP_ERROR;
}
......@@ -110,13 +110,13 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
// failed to get table Meta or vgroup list, retry in 10sec.
if (code == TSDB_CODE_SUCCESS) {
tscTansformFuncForSTableQuery(pQueryInfo);
tscDebug("0x%"PRIx64" stream:%p, start stream query on:%s", pSql->self, pStream, tNameGetTableName(&pTableMetaInfo->name));
tscDebug("0x%"PRIx64" stream:%p started to query table:%s", pSql->self, pStream, tNameGetTableName(&pTableMetaInfo->name));
pQueryInfo->command = TSDB_SQL_SELECT;
pSql->fp = tscProcessStreamQueryCallback;
pSql->fetchFp = tscProcessStreamQueryCallback;
tscDoQuery(pSql);
executeQuery(pSql, pQueryInfo);
tscIncStreamExecutionCount(pStream);
} else {
setRetryInfo(pStream, code);
......@@ -138,8 +138,8 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
pStream->numOfRes = 0; // reset the numOfRes.
SSqlObj *pSql = pStream->pSql;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
tscDebug("0x%"PRIx64" add into timer", pSql->self);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
tscDebug("0x%"PRIx64" timer launch query", pSql->self);
if (pStream->isProject) {
/*
......@@ -194,8 +194,8 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
SSqlStream *pStream = (SSqlStream *)param;
if (tres == NULL || numOfRows < 0) {
int64_t retryDelay = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
tscError("%p stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql, pStream, numOfRows,
retryDelay);
tscError("0x%"PRIx64" stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql->self,
pStream, numOfRows, retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
......@@ -203,6 +203,14 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
tNameExtractFullName(&pTableMetaInfo->name, name);
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
tfree(pTableMetaInfo->pTableMeta);
tscFreeSqlResult(pStream->pSql);
tscFreeSubobj(pStream->pSql);
tfree(pStream->pSql->pSubs);
pStream->pSql->subState.numOfSub = 0;
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
......@@ -216,7 +224,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) {
#if 0
SSqlObj * pSql = pStream->pSql;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
if (pQueryInfo->fillType != TSDB_FILL_SET_VALUE && pQueryInfo->fillType != TSDB_FILL_NULL) {
return;
......@@ -259,13 +267,14 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
if (pSql == NULL || numOfRows < 0) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream, pStream->interval.sliding, pStream->precision);
tscError("%p stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
tscError("0x%"PRIx64" stream:%p, retrieve data failed, code:0x%08x, retry in %" PRId64 " ms", pSql->self, pStream, numOfRows, retryDelayTime);
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
return;
}
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful.
for(int32_t i = 0; i < numOfRows; ++i) {
......@@ -292,7 +301,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
/* no resuls in the query range, retry */
// todo set retry dynamic time
int32_t retry = tsProjectExecInterval;
tscError("%p stream:%p, retrieve no data, code:0x%08x, retry in %" PRId32 "ms", pSql, pStream, numOfRows, retry);
tscError("0x%"PRIx64" stream:%p, retrieve no data, code:0x%08x, retry in %" PRId32 "ms", pSql->self, pStream, numOfRows, retry);
tscSetRetryTimer(pStream, pStream->pSql, retry);
return;
......@@ -305,6 +314,10 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
pStream->numOfRes);
tfree(pTableMetaInfo->pTableMeta);
if (pQueryInfo->pQInfo != NULL) {
qDestroyQueryInfo(pQueryInfo->pQInfo);
pQueryInfo->pQInfo = NULL;
}
tscFreeSqlResult(pSql);
tscFreeSubobj(pSql);
......@@ -337,10 +350,10 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
return;
}
tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream,
tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 "(ts window ekey), in %" PRId64 " ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream,
now + timer, timer, delay, pStream->stime, etime);
} else {
tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 ", in %" PRId64 "ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream,
tscDebug("0x%"PRIx64" stream:%p, next start at %" PRId64 "(ts window ekey), in %" PRId64 " ms. delay:%" PRId64 "ms qrange %" PRId64 "-%" PRId64, pStream->pSql->self, pStream,
pStream->stime, timer, delay, pStream->stime - pStream->interval.interval, pStream->stime - 1);
}
......@@ -398,7 +411,6 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
}
} else {
int64_t stime = taosTimeTruncate(pStream->stime - 1, &pStream->interval, pStream->precision);
//int64_t stime = taosGetIntervalStartTimestamp(pStream->stime - 1, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
if (stime >= pStream->etime) {
tscDebug("0x%"PRIx64" stream:%p, stime:%" PRId64 " is larger than end time: %" PRId64 ", stop the stream", pStream->pSql->self, pStream,
pStream->stime, pStream->etime);
......@@ -432,7 +444,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
int64_t minIntervalTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
if (!pStream->isProject && pQueryInfo->interval.interval == 0) {
sprintf(pSql->cmd.payload, "the interval value is 0");
......@@ -440,7 +452,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
}
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit!= 'y' && pQueryInfo->interval.interval < minIntervalTime) {
tscWarn("%p stream:%p, original sample interval:%" PRId64 " too small, reset to:%" PRId64, pSql, pStream,
tscWarn("0x%"PRIx64" stream:%p, original sample interval:%" PRId64 " too small, reset to:%" PRId64, pSql->self, pStream,
(int64_t)pQueryInfo->interval.interval, minIntervalTime);
pQueryInfo->interval.interval = minIntervalTime;
}
......@@ -457,14 +469,14 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime;
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit!= 'y' && pQueryInfo->interval.sliding < minSlidingTime) {
tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64, pSql, pStream,
tscWarn("0x%"PRIx64" stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64, pSql->self, pStream,
pQueryInfo->interval.sliding, minSlidingTime);
pQueryInfo->interval.sliding = minSlidingTime;
}
if (pQueryInfo->interval.sliding > pQueryInfo->interval.interval) {
tscWarn("%p stream:%p, sliding value:%" PRId64 " can not be larger than interval range, reset to:%" PRId64, pSql, pStream,
tscWarn("0x%"PRIx64" stream:%p, sliding value:%" PRId64 " can not be larger than interval range, reset to:%" PRId64, pSql->self, pStream,
pQueryInfo->interval.sliding, pQueryInfo->interval.interval);
pQueryInfo->interval.sliding = pQueryInfo->interval.interval;
......@@ -482,7 +494,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
}
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
if (pStream->isProject) {
// no data in table, flush all data till now to destination meter, 10sec delay
......@@ -502,12 +514,12 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
} else {
return stime;
}
stime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
} else {
int64_t newStime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
if (newStime != stime) {
tscWarn("%p stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql, pStream, stime, newStime);
tscWarn("0x%"PRIx64" stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql->self, pStream, stime, newStime);
stime = newStime;
}
}
......@@ -538,13 +550,13 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscError("%p open stream failed, sql:%s, reason:%s, code:%s", pSql, pSql->sqlstr, pCmd->payload, tstrerror(code));
tscError("0x%"PRIx64" open stream failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code));
pStream->fp(pStream->param, NULL, NULL);
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -557,7 +569,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
if (tscSetSlidingWindowInfo(pSql, pStream) != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscError("%p stream %p open failed, since the interval value is incorrect", pSql, pStream);
tscError("0x%"PRIx64" stream %p open failed, since the interval value is incorrect", pSql->self, pStream);
pStream->fp(pStream->param, NULL, NULL);
return;
}
......@@ -597,7 +609,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
if (pStream == NULL) {
tscError("%p open stream failed, sql:%s, reason:%s, code:0x%08x", pSql, sqlstr, pCmd->payload, pRes->code);
tscError("0x%"PRIx64" open stream failed, sql:%s, reason:%s, code:0x%08x", pSql->self, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql);
return NULL;
}
......@@ -613,26 +625,26 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
tscFreeSqlObj(pSql);
return NULL;
}
strtolower(pSql->sqlstr, sqlstr);
tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
registerSqlObj(pSql);
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
tsem_init(&pSql->rspSem, 0, 0);
pSql->fp = tscCreateStream;
pSql->fetchFp = tscCreateStream;
registerSqlObj(pSql);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) {
tscCreateStream(pStream, pSql, code);
} else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tscError("%p open stream failed, sql:%s, code:%s", pSql, sqlstr, tstrerror(code));
tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code));
taosReleaseRef(tscObjRef, pSql->self);
free(pStream);
return NULL;
......
......@@ -224,11 +224,11 @@ static SArray* getTableList( SSqlObj* pSql ) {
SSqlObj* pNew = taos_query(pSql->pTscObj, sql);
if (pNew == NULL) {
tscError("failed to retrieve table id: cannot create new sql object.");
tscError("0x%"PRIx64"failed to retrieve table id: cannot create new sql object.", pSql->self);
return NULL;
} else if (taos_errno(pNew) != TSDB_CODE_SUCCESS) {
tscError("failed to retrieve table id: %s", tstrerror(taos_errno(pNew)));
tscError("0x%"PRIx64"failed to retrieve table id,error: %s", pSql->self, tstrerror(taos_errno(pNew)));
return NULL;
}
......@@ -284,7 +284,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
}
size_t numOfTables = taosArrayGetSize(tables);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress));
for( size_t i = 0; i < numOfTables; i++ ) {
STidTags* tt = taosArrayGet( tables, i );
......@@ -304,7 +304,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
}
taosArrayDestroy(tables);
TSDB_QUERY_SET_TYPE(tscGetQueryInfoDetail(pCmd, 0)->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
TSDB_QUERY_SET_TYPE(tscGetQueryInfo(pCmd, 0)->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
return 1;
}
......@@ -487,11 +487,13 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
if (pSql == NULL) {
return NULL;
}
if (pSub->pSql->self != 0) {
taosReleaseRef(tscObjRef, pSub->pSql->self);
} else {
tscFreeSqlObj(pSub->pSql);
}
pSub->pSql = pSql;
pSql->pSubscription = pSub;
}
......@@ -502,7 +504,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single table subscription
size_t size = taosArrayGetSize(pSub->progress);
......@@ -555,7 +557,10 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pSql->fp = asyncCallback;
pSql->fetchFp = asyncCallback;
pSql->param = pSub;
tscDoQuery(pSql);
pSql->cmd.active = pQueryInfo;
executeQuery(pSql, pQueryInfo);
tsem_wait(&pSub->sem);
if (pRes->code != TSDB_CODE_SUCCESS) {
......
此差异已折叠。
此差异已折叠。
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TAST_H
#define TDENGINE_TAST_H
#ifndef TDENGINE_TEXPR_H
#define TDENGINE_TEXPR_H
#ifdef __cplusplus
extern "C" {
......@@ -62,32 +62,32 @@ typedef struct tExprNode {
uint8_t nodeType;
union {
struct {
uint8_t optr; // filter operator
uint8_t hasPK; // 0: do not contain primary filter, 1: contain
void * info; // support filter operation on this expression only available for leaf node
uint8_t optr; // filter operator
uint8_t hasPK; // 0: do not contain primary filter, 1: contain
void *info; // support filter operation on this expression only available for leaf node
struct tExprNode *pLeft; // left child pointer
struct tExprNode *pRight; // right child pointer
} _node;
struct SSchema *pSchema;
tVariant * pVal;
struct SSchema *pSchema;
tVariant *pVal;
};
} tExprNode;
typedef struct SExprTraverseSupp {
__result_filter_fn_t nodeFilterFn;
__do_filter_suppl_fn_t setupInfoFn;
void * pExtInfo;
void *pExtInfo;
} SExprTraverseSupp;
void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *));
void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree);
tExprNode* exprTreeFromBinary(const void* data, size_t size);
tExprNode* exprTreeFromTableName(const char* tbnameCond);
tExprNode* exprdup(tExprNode* pTree);
void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree);
bool exprTreeApplayFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param);
bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param);
typedef void (*_arithmetic_operator_fn_t)(void *left, int32_t numLeft, int32_t leftType, void *right, int32_t numRight,
int32_t rightType, void *output, int32_t order);
......@@ -99,4 +99,4 @@ void arithmeticTreeTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
}
#endif
#endif // TDENGINE_TAST_H
#endif // TDENGINE_TEXPR_H
......@@ -41,6 +41,35 @@ typedef struct SResPair {
double avg;
} SResPair;
// the structure for sql function in select clause
typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
SColIndex colInfo;
uint64_t uid; // refactor use the pointer
int16_t functionId; // function id in aAgg array
int16_t resType; // return value type
int16_t resBytes; // length of return value
int32_t interBytes; // inter result buffer size
int16_t colType; // table column type
int16_t colBytes; // table column bytes
int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression.
int16_t resColId; // result column id
SColumnFilterList flist;
} SSqlExpr;
typedef struct SExprInfo {
SSqlExpr base;
struct tExprNode *pExpr;
} SExprInfo;
#define TSDB_DB_NAME_T 1
#define TSDB_TABLE_NAME_T 2
......
......@@ -15,6 +15,7 @@
#include "os.h"
#include "texpr.h"
#include "exception.h"
#include "taosdef.h"
#include "taosmsg.h"
......@@ -145,25 +146,25 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
*pExpr = NULL;
}
bool exprTreeApplayFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param) {
bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param) {
tExprNode *pLeft = pExpr->_node.pLeft;
tExprNode *pRight = pExpr->_node.pRight;
//non-leaf nodes, recursively traverse the expression tree in the post-root order
if (pLeft->nodeType == TSQL_NODE_EXPR && pRight->nodeType == TSQL_NODE_EXPR) {
if (pExpr->_node.optr == TSDB_RELATION_OR) { // or
if (exprTreeApplayFilter(pLeft, pItem, param)) {
if (exprTreeApplyFilter(pLeft, pItem, param)) {
return true;
}
// left child does not satisfy the query condition, try right child
return exprTreeApplayFilter(pRight, pItem, param);
return exprTreeApplyFilter(pRight, pItem, param);
} else { // and
if (!exprTreeApplayFilter(pLeft, pItem, param)) {
if (!exprTreeApplyFilter(pLeft, pItem, param)) {
return false;
}
return exprTreeApplayFilter(pRight, pItem, param);
return exprTreeApplyFilter(pRight, pItem, param);
}
}
......@@ -463,3 +464,28 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
CLEANUP_EXECUTE_TO(anchor, false);
return expr;
}
tExprNode* exprdup(tExprNode* pTree) {
if (pTree == NULL) {
return NULL;
}
tExprNode* pNode = calloc(1, sizeof(tExprNode));
if (pTree->nodeType == TSQL_NODE_EXPR) {
tExprNode* pLeft = exprdup(pTree->_node.pLeft);
tExprNode* pRight = exprdup(pTree->_node.pRight);
pNode->nodeType = TSQL_NODE_EXPR;
pNode->_node.pLeft = pLeft;
pNode->_node.pRight = pRight;
} else if (pTree->nodeType == TSQL_NODE_VALUE) {
pNode->pVal = calloc(1, sizeof(tVariant));
tVariantAssign(pNode->pVal, pTree->pVal);
} else if (pTree->nodeType == TSQL_NODE_COL) {
pNode->pSchema = calloc(1, sizeof(SSchema));
*pNode->pSchema = *pTree->pSchema;
}
return pNode;
}
......@@ -68,6 +68,7 @@ bool tscValidateTableNameLength(size_t len) {
return len < TSDB_TABLE_NAME_LEN;
}
// TODO refactor
SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFilters) {
if (numOfFilters == 0) {
assert(src == NULL);
......
......@@ -48,6 +48,13 @@ void tVariantCreate(tVariant *pVar, SStrToken *token) {
case TSDB_DATA_TYPE_INT:{
ret = tStrToInteger(token->z, token->type, token->n, &pVar->i64, true);
if (ret != 0) {
SStrToken t = {0};
tSQLGetToken(token->z, &t.type);
if (t.type == TK_MINUS) { // it is a signed number which is greater than INT64_MAX or less than INT64_MIN
pVar->nType = -1; // -1 means error type
return;
}
// data overflow, try unsigned parse the input number
ret = tStrToInteger(token->z, token->type, token->n, &pVar->i64, false);
if (ret != 0) {
......
......@@ -243,8 +243,9 @@ do { \
#define TSDB_MAX_REPLICA 5
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_BLOCK_DIST_COLUMN_INDEX (-2)
#define TSDB_UD_COLUMN_INDEX (-100)
#define TSDB_BLOCK_DIST_COLUMN_INDEX (-2)
#define TSDB_UD_COLUMN_INDEX (-1000)
#define TSDB_RES_COL_ID (-5000)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
......@@ -388,9 +389,10 @@ typedef enum {
typedef enum {
TSDB_SUPER_TABLE = 0, // super table
TSDB_CHILD_TABLE = 1, // table created from super table
TSDB_NORMAL_TABLE = 2, // ordinary table
TSDB_STREAM_TABLE = 3, // table created from stream computing
TSDB_TABLE_MAX = 4
TSDB_NORMAL_TABLE = 2, // ordinary table
TSDB_STREAM_TABLE = 3, // table created from stream computing
TSDB_TEMP_TABLE = 4, // temp table created by nest query
TSDB_TABLE_MAX = 5
} ETableType;
typedef enum {
......
......@@ -399,7 +399,6 @@ typedef struct SColIndex {
char name[TSDB_COL_NAME_LEN]; // TODO remove it
} SColIndex;
typedef struct SColumnFilterInfo {
int16_t lowerRelOptr;
int16_t upperRelOptr;
......@@ -421,42 +420,13 @@ typedef struct SColumnFilterInfo {
};
} SColumnFilterInfo;
/* sql function msg, to describe the message to vnode about sql function
* operations in select clause */
typedef struct SSqlFuncMsg {
int16_t functionId;
int16_t numOfParams;
int16_t resColId; // result column id, id of the current output column
int16_t colType;
int16_t colBytes;
SColIndex colInfo;
struct ArgElem {
int16_t argType;
int16_t argBytes;
union {
double d;
int64_t i64;
char * pz;
} argValue;
} arg[3];
int32_t filterNum;
SColumnFilterInfo filterInfo[];
} SSqlFuncMsg;
typedef struct SExprInfo {
SColumnFilterInfo * pFilter;
struct tExprNode* pExpr;
int16_t bytes;
int16_t type;
int32_t interBytes;
int64_t uid;
SSqlFuncMsg base;
} SExprInfo;
typedef struct SColumnFilterList {
int16_t numOfFilters;
union{
int64_t placeholder;
SColumnFilterInfo *filterInfo;
};
} SColumnFilterList;
/*
* for client side struct, we only need the column id, type, bytes are not necessary
* But for data in vnode side, we need all the following information.
......@@ -465,11 +435,7 @@ typedef struct SColumnInfo {
int16_t colId;
int16_t type;
int16_t bytes;
int16_t numOfFilters;
union{
int64_t placeholder;
SColumnFilterInfo *filters;
};
SColumnFilterList flist;
} SColumnInfo;
typedef struct STableIdInfo {
......@@ -483,10 +449,29 @@ typedef struct STimeWindow {
TSKEY ekey;
} STimeWindow;
typedef struct {
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block
int32_t tsNumOfBlocks; // ts comp block numbers
int32_t tsOrder; // ts comp block order
} STsBufInfo;
typedef struct {
SMsgHead head;
char version[TSDB_VERSION_LEN];
bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag
bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryBlockDist; // if query data block distribution
bool stabledev; // super table stddev query
bool tsCompQuery; // is tscomp query
bool simpleAgg;
bool pointInterpQuery; // point interpolation query
bool needReverseScan; // need reverse scan
STimeWindow window;
int32_t numOfTables;
int16_t order;
......@@ -509,14 +494,13 @@ typedef struct {
int16_t fillType; // interpolate type
uint64_t fillVal; // default value array list
int32_t secondStageOutput;
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block
int32_t tsNumOfBlocks; // ts comp block numbers
int32_t tsOrder; // ts comp block order
STsBufInfo tsBuf; // tsBuf info
int32_t numOfTags; // number of tags columns involved
int32_t sqlstrLen; // sql query string
int32_t prevResultLen; // previous result length
SColumnInfo colList[];
int32_t numOfOperator;
int32_t tableScanOperator;// table scan operator. -1 means no scan operator
SColumnInfo tableCols[];
} SQueryTableMsg;
typedef struct {
......@@ -827,7 +811,7 @@ typedef struct {
uint32_t queryId;
int64_t useconds;
int64_t stime;
uint64_t qHandle;
uint64_t qId;
} SQueryDesc;
typedef struct {
......
......@@ -221,7 +221,7 @@ typedef struct {
typedef struct {
uint32_t numOfTables;
SArray * pGroupList;
SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
......
......@@ -205,13 +205,6 @@
#define TK_VALUES 186
#define TK_SPACE 300
#define TK_COMMENT 301
#define TK_ILLEGAL 302
......
......@@ -344,7 +344,7 @@ static int32_t mnodeGetQueryMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow->bytes[cols] = 24;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "qhandle");
strcpy(pSchema[cols].name, "qid");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
......@@ -420,7 +420,7 @@ static int32_t mnodeRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, v
cols++;
char handleBuf[24] = {0};
snprintf(handleBuf, tListLen(handleBuf), "%p", (void*)htobe64(pDesc->qHandle));
snprintf(handleBuf, tListLen(handleBuf), "%"PRIu64, htobe64(pDesc->qId));
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->bytes[cols]);
......
......@@ -18,7 +18,7 @@ TEST(testCase, parse_time) {
deltaToUtcInitOnce();
// window: 1500000001000, 1500002000000
// pQuery->interval: interval: 86400000, sliding:3600000
// pQueryAttr->interval: interval: 86400000, sliding:3600000
int64_t key = 1500000001000;
SInterval interval = {0};
interval.interval = 86400000;
......
......@@ -122,7 +122,7 @@ enum {
#define QUERY_IS_FREE_RESOURCE(type) (((type)&TSDB_QUERY_TYPE_FREE_RESOURCE) != 0)
typedef struct SArithmeticSupport {
SExprInfo *pArithExpr;
SExprInfo *pExprInfo;
int32_t numOfCols;
SColumnInfo *colList;
void *exprList; // client side used
......@@ -210,9 +210,6 @@ typedef struct SAggFunctionInfo {
void (*xFunction)(SQLFunctionCtx *pCtx); // blocks version function
void (*xFunctionF)(SQLFunctionCtx *pCtx, int32_t position); // single-row function version, todo merge with blockwise function
// some sql function require scan data twice or more, e.g.,stddev, percentile
void (*xNextStep)(SQLFunctionCtx *pCtx);
// finalizer must be called after all xFunction has been executed to generated final result.
void (*xFinalize)(SQLFunctionCtx *pCtx);
void (*mergeFunc)(SQLFunctionCtx *pCtx);
......
此差异已折叠。
......@@ -237,6 +237,11 @@ int32_t compare_a(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1
int32_t compare_d(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1, int32_t numOfRow2, int32_t s2,
char *data2);
struct SSDataBlock;
int32_t compare_aRv(struct SSDataBlock* pBlock, SArray* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order);
int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes);
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QPLAN_H
#define TDENGINE_QPLAN_H
//TODO refactor
SArray* createTableScanPlan(SQueryAttr* pQueryAttr);
SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr);
SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr);
#endif // TDENGINE_QPLAN_H
此差异已折叠。
......@@ -28,9 +28,9 @@
#define GET_QID(_r) (((SQInfo*)((_r)->qinfo))->qId)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!(sq)))? (_q)->pExpr1[1].base.arg->argValue.i64:1)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!(sq)))? (_q)->pExpr1[1].base.param[0].i64:1)
int32_t getOutputInterResultBufSize(SQuery* pQuery);
int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr);
size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv);
int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type);
......@@ -52,20 +52,11 @@ static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int
return pResultRowInfo->pResult[slot];
}
static FORCE_INLINE char* getPosInResultPage(SQueryRuntimeEnv* pRuntimeEnv, tFilePage* page, int32_t rowOffset,
int16_t offset, int32_t size) {
assert(rowOffset >= 0 && pRuntimeEnv != NULL);
static FORCE_INLINE char *getPosInResultPage(SQueryAttr *pQueryAttr, tFilePage* page, int32_t rowOffset, int16_t offset) {
assert(rowOffset >= 0 && pQueryAttr != NULL);
SQuery* pQuery = pRuntimeEnv->pQuery;
int64_t pageSize = pRuntimeEnv->pResultBuf->pageSize;
int32_t numOfRows = (int32_t)GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery);
// buffer overflow check
int64_t bufEnd = (rowOffset + offset * numOfRows + size);
assert(page->num <= pageSize && bufEnd <= page->num);
return ((char*)page->data) + rowOffset + offset * numOfRows;
int32_t numOfRows = (int32_t)GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
return ((char *)page->data) + rowOffset + offset * numOfRows;
}
bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
......
......@@ -450,16 +450,16 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). {
}
//////////////////////// The SELECT statement /////////////////////////////////
%type select {SQuerySqlNode*}
%destructor select {destroyQuerySqlNode($$);}
%type select {SSqlNode*}
%destructor select {destroySqlNode($$);}
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G, N);
}
select(A) ::= LP select(B) RP. {A = B;}
%type union {SSubclauseInfo*}
%destructor union {destroyAllSelectClause($$);}
%type union {SArray*}
%destructor union {destroyAllSqlNode($$);}
union(Y) ::= select(X). { Y = setSubclause(NULL, X); }
union(Y) ::= union(Z) UNION ALL select(X). { Y = appendSelectClause(Z, X); }
......@@ -505,35 +505,30 @@ distinct(X) ::= DISTINCT(Y). { X = Y; }
distinct(X) ::= . { X.n = 0;}
// A complete FROM clause.
%type from {SFromInfo*}
%type from {SRelationInfo*}
%destructor from {destroyRelationInfo($$);}
from(A) ::= FROM tablelist(X). {A = X;}
from(A) ::= FROM LP union(Y) RP. {A = Y;}
from(A) ::= FROM LP union(Y) RP. {A = setSubquery(NULL, Y);}
%type tablelist {SArray*}
%type tablelist {SRelationInfo*}
%destructor tablelist {destroyRelationInfo($$);}
tablelist(A) ::= ids(X) cpxName(Y). {
toTSDBType(X.type);
X.n += Y.n;
A = setTableNameList(NULL, &X, NULL);
}
tablelist(A) ::= ids(X) cpxName(Y) ids(Z). {
toTSDBType(X.type);
toTSDBType(Z.type);
X.n += Y.n;
A = setTableNameList(NULL, &X, &Z);
}
tablelist(A) ::= tablelist(Y) COMMA ids(X) cpxName(Z). {
toTSDBType(X.type);
X.n += Z.n;
A = setTableNameList(Y, &X, NULL);
}
tablelist(A) ::= tablelist(Y) COMMA ids(X) cpxName(Z) ids(F). {
toTSDBType(X.type);
toTSDBType(F.type);
X.n += Z.n;
A = setTableNameList(Y, &X, &F);
}
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -239,6 +239,8 @@ python3 ./test.py -f stream/history.py
python3 ./test.py -f stream/sys.py
python3 ./test.py -f stream/table_1.py
python3 ./test.py -f stream/table_n.py
python3 ./test.py -f stream/showStreamExecTimeisNull.py
python3 ./test.py -f stream/cqSupportBefore1970.py
#alter table
python3 ./test.py -f alter/alter_table_crash.py
......
此差异已折叠。
此差异已折叠。
......@@ -129,8 +129,8 @@ if $rows != 3 then
return -1
endi
sql drop database $db
sql show databases
if $rows != 0 then
return -1
endi
#sql drop database $db
#sql show databases
#if $rows != 0 then
# return -1
#endi
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册