提交 c18dd031 编写于 作者: H Haojun Liao

[td-3299]

上级 6216f6c5
...@@ -59,7 +59,7 @@ typedef struct SJoinSupporter { ...@@ -59,7 +59,7 @@ typedef struct SJoinSupporter {
SArray* exprList; SArray* exprList;
SFieldInfo fieldsInfo; SFieldInfo fieldsInfo;
STagCond tagCond; STagCond tagCond;
SSqlGroupbyExpr groupInfo; // group by info SGroupbyExpr groupInfo; // group by info
struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array
FILE* f; // temporary file in order to create TSBuf FILE* f; // temporary file in order to create TSBuf
char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory
...@@ -92,11 +92,11 @@ typedef struct SVgroupTableInfo { ...@@ -92,11 +92,11 @@ typedef struct SVgroupTableInfo {
static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex) { static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex) {
assert(pCmd != NULL && subClauseIndex >= 0); assert(pCmd != NULL && subClauseIndex >= 0);
if (pCmd->pQueryInfo == NULL || subClauseIndex >= pCmd->numOfClause) { if (pCmd->pQueryInfo == NULL) {
return NULL; return NULL;
} }
return pCmd->pQueryInfo[subClauseIndex]; return pCmd->active;
} }
SQueryInfo* tscGetActiveQueryInfo(SSqlCmd* pCmd); SQueryInfo* tscGetActiveQueryInfo(SSqlCmd* pCmd);
...@@ -127,7 +127,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i ...@@ -127,7 +127,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
*/ */
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo); bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo); bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo); bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo); bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo); bool tscIsTopBotQuery(SQueryInfo* pQueryInfo);
bool hasTagValOutput(SQueryInfo* pQueryInfo); bool hasTagValOutput(SQueryInfo* pQueryInfo);
...@@ -136,13 +136,14 @@ bool isStabledev(SQueryInfo* pQueryInfo); ...@@ -136,13 +136,14 @@ bool isStabledev(SQueryInfo* pQueryInfo);
bool isTsCompQuery(SQueryInfo* pQueryInfo); bool isTsCompQuery(SQueryInfo* pQueryInfo);
bool isSimpleAggregate(SQueryInfo* pQueryInfo); bool isSimpleAggregate(SQueryInfo* pQueryInfo);
bool isBlockDistQuery(SQueryInfo* pQueryInfo); bool isBlockDistQuery(SQueryInfo* pQueryInfo);
int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo); bool isSimpleAggregateRv(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex); bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo); bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
bool tscHasColumnFilter(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryTags(SQueryInfo* pQueryInfo); bool tscQueryTags(SQueryInfo* pQueryInfo);
...@@ -175,26 +176,32 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo); ...@@ -175,26 +176,32 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo);
static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; } static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; }
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2); int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
int32_t tscGetResRowLength(SArray* pExprList); int32_t tscGetResRowLength(SArray* pExprList);
SExprInfo* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol); int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SExprInfo* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SExprInfo* tscExprCreate(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, int32_t colType);
void tscExprAddParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol); int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SExprInfo* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type, SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size); int16_t size);
size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
SExprInfo* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index); size_t tscNumOfExprs(SQueryInfo* pQueryInfo);
int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy); SExprInfo *tscExprGet(SQueryInfo* pQueryInfo, int32_t index);
void tscSqlExprAssign(SExprInfo* dst, const SExprInfo* src); int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
void tscSqlExprInfoDestroy(SArray* pExprInfo); void tscExprAssign(SExprInfo* dst, const SExprInfo* src);
void tscExprDestroy(SArray* pExprInfo);
int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SExprInfo*** pExpr, int32_t* num);
SColumn* tscColumnClone(const SColumn* src); SColumn* tscColumnClone(const SColumn* src);
bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid); bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid);
...@@ -243,7 +250,7 @@ SArray* tscVgroupTableInfoDup(SArray* pVgroupTables); ...@@ -243,7 +250,7 @@ SArray* tscVgroupTableInfoDup(SArray* pVgroupTables);
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index); void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo); void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex); int tscGetSTableVgroupInfo(SSqlObj* pSql, SQueryInfo* pQueryInfo);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo); int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists); int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists);
......
...@@ -203,10 +203,11 @@ typedef struct SQueryInfo { ...@@ -203,10 +203,11 @@ typedef struct SQueryInfo {
SInterval interval; // tumble time window SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window SSessionWindow sessionWindow; // session time window
SSqlGroupbyExpr groupbyExpr; // groupby tags info SGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*> SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo; SFieldInfo fieldsInfo;
SArray * exprList; // SArray<SExprInfo*> SArray * exprList; // SArray<SExprInfo*>
SArray * exprList1; // final exprlist in case of arithmetic expression exists
SLimitVal limit; SLimitVal limit;
SLimitVal slimit; SLimitVal slimit;
STagCond tagCond; STagCond tagCond;
...@@ -230,8 +231,6 @@ typedef struct SQueryInfo { ...@@ -230,8 +231,6 @@ typedef struct SQueryInfo {
int32_t bufLen; int32_t bufLen;
char* buf; char* buf;
SQInfo* pQInfo; // global merge operator SQInfo* pQInfo; // global merge operator
SArray* pDSOperator; // data source operator
SArray* pPhyOperator; // physical query execution plan
SQueryAttr* pQueryAttr; // query object SQueryAttr* pQueryAttr; // query object
struct SQueryInfo *sibling; // sibling struct SQueryInfo *sibling; // sibling
...@@ -244,6 +243,7 @@ typedef struct SQueryInfo { ...@@ -244,6 +243,7 @@ typedef struct SQueryInfo {
bool arithmeticOnAgg; bool arithmeticOnAgg;
bool projectionQuery; bool projectionQuery;
bool hasFilter; bool hasFilter;
bool onlyTagQuery;
} SQueryInfo; } SQueryInfo;
typedef struct { typedef struct {
...@@ -268,8 +268,8 @@ typedef struct { ...@@ -268,8 +268,8 @@ typedef struct {
char * payload; char * payload;
int32_t payloadLen; int32_t payloadLen;
SQueryInfo **pQueryInfo; SQueryInfo *pQueryInfo;
int32_t numOfClause; // int32_t numOfClause;
int32_t clauseIndex; // index of multiple subclause query int32_t clauseIndex; // index of multiple subclause query
SQueryInfo *active; // current active query info SQueryInfo *active; // current active query info
......
...@@ -127,7 +127,8 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) { ...@@ -127,7 +127,8 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
* all available virtual node has been checked already, now we need to check * all available virtual node has been checked already, now we need to check
* for the next subclause queries * for the next subclause queries
*/ */
if (pCmd->clauseIndex < pCmd->numOfClause - 1) { if (pCmd->active->sibling != NULL) {
pCmd->active = pCmd->active->sibling;
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode); tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return; return;
} }
...@@ -231,7 +232,8 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) { ...@@ -231,7 +232,8 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
* all available virtual nodes in current clause has been checked already, now try the * all available virtual nodes in current clause has been checked already, now try the
* next one in the following union subclause * next one in the following union subclause
*/ */
if (pCmd->clauseIndex < pCmd->numOfClause - 1) { if (pCmd->active->sibling != NULL) {
pCmd->active = pCmd->active->sibling; // todo refactor
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode); tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return; return;
} }
...@@ -317,13 +319,13 @@ static int32_t updateMetaBeforeRetryQuery(SSqlObj* pSql, STableMetaInfo* pTableM ...@@ -317,13 +319,13 @@ static int32_t updateMetaBeforeRetryQuery(SSqlObj* pSql, STableMetaInfo* pTableM
// update the pExpr info, colList info, number of table columns // update the pExpr info, colList info, number of table columns
// TODO Re-parse this sql and issue the corresponding subquery as an alternative for this case. // TODO Re-parse this sql and issue the corresponding subquery as an alternative for this case.
if (pSql->retryReason == TSDB_CODE_TDB_INVALID_TABLE_ID) { if (pSql->retryReason == TSDB_CODE_TDB_INVALID_TABLE_ID) {
int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo); int32_t numOfExprs = (int32_t) tscNumOfExprs(pQueryInfo);
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta); int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta); int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta);
SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
for (int32_t i = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr *pExpr = &(tscSqlExprGet(pQueryInfo, i)->base); SSqlExpr *pExpr = &(tscExprGet(pQueryInfo, i)->base);
pExpr->uid = pTableMetaInfo->pTableMeta->id.uid; pExpr->uid = pTableMetaInfo->pTableMeta->id.uid;
if (pExpr->colInfo.colIndex >= 0) { if (pExpr->colInfo.colIndex >= 0) {
...@@ -474,7 +476,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -474,7 +476,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} }
} else { // stream computing } else { // stream computing
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); SQueryInfo* pQueryInfo = tscGetActiveQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
code = tscGetTableMeta(pSql, pTableMetaInfo); code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
...@@ -485,7 +488,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -485,7 +488,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} }
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex); code = tscGetSTableVgroupInfo(pSql, pQueryInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self); taosReleaseRef(tscObjRef, pSql->self);
return; return;
......
...@@ -161,7 +161,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, ...@@ -161,7 +161,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Field", sizeof(f.name)); tstrncpy(f.name, "Field", sizeof(f.name));
SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, -1000, (TSDB_COL_NAME_LEN - 1), false); (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, -1000, (TSDB_COL_NAME_LEN - 1), false);
rowLen += ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE); rowLen += ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE);
...@@ -171,7 +171,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, ...@@ -171,7 +171,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Type", sizeof(f.name)); tstrncpy(f.name, "Type", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(typeColLength + VARSTR_HEADER_SIZE), pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(typeColLength + VARSTR_HEADER_SIZE),
-1000, typeColLength, false); -1000, typeColLength, false);
rowLen += typeColLength + VARSTR_HEADER_SIZE; rowLen += typeColLength + VARSTR_HEADER_SIZE;
...@@ -181,7 +181,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, ...@@ -181,7 +181,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Length", sizeof(f.name)); tstrncpy(f.name, "Length", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t), pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t),
-1000, sizeof(int32_t), false); -1000, sizeof(int32_t), false);
rowLen += sizeof(int32_t); rowLen += sizeof(int32_t);
...@@ -191,7 +191,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, ...@@ -191,7 +191,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Note", sizeof(f.name)); tstrncpy(f.name, "Note", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(noteColLength + VARSTR_HEADER_SIZE), pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(noteColLength + VARSTR_HEADER_SIZE),
-1000, noteColLength, false); -1000, noteColLength, false);
rowLen += noteColLength + VARSTR_HEADER_SIZE; rowLen += noteColLength + VARSTR_HEADER_SIZE;
...@@ -404,7 +404,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const ...@@ -404,7 +404,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
} }
SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, f.bytes, -1000, f.bytes - VARSTR_HEADER_SIZE, false); pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, f.bytes, -1000, f.bytes - VARSTR_HEADER_SIZE, false);
rowLen += f.bytes; rowLen += f.bytes;
...@@ -417,7 +417,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const ...@@ -417,7 +417,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
} }
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f); pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(int16_t)(ddlLen + VARSTR_HEADER_SIZE), -1000, ddlLen, false); (int16_t)(ddlLen + VARSTR_HEADER_SIZE), -1000, ddlLen, false);
rowLen += ddlLen + VARSTR_HEADER_SIZE; rowLen += ddlLen + VARSTR_HEADER_SIZE;
......
...@@ -341,13 +341,13 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo* ...@@ -341,13 +341,13 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo*
if (numOfGroupByCols > 0) { if (numOfGroupByCols > 0) {
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
int32_t numOfInternalOutput = (int32_t) tscSqlExprNumOfExprs(pQueryInfo); int32_t numOfInternalOutput = (int32_t) tscNumOfExprs(pQueryInfo);
// the last "pQueryInfo->groupbyExpr.numOfGroupCols" columns are order-by columns // the last "pQueryInfo->groupbyExpr.numOfGroupCols" columns are order-by columns
for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, i); SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, i);
for(int32_t j = 0; j < numOfInternalOutput; ++j) { for(int32_t j = 0; j < numOfInternalOutput; ++j) {
SExprInfo* pExprInfo = tscSqlExprGet(pQueryInfo, j); SExprInfo* pExprInfo = tscExprGet(pQueryInfo, j);
int32_t functionId = pExprInfo->base.functionId; int32_t functionId = pExprInfo->base.functionId;
if (pColIndex->colId == pExprInfo->base.colInfo.colId && (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAG)) { if (pColIndex->colId == pExprInfo->base.colInfo.colId && (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAG)) {
...@@ -369,9 +369,9 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo* ...@@ -369,9 +369,9 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo*
if (pQueryInfo->interval.interval != 0) { if (pQueryInfo->interval.interval != 0) {
orderColIndexList[0] = PRIMARYKEY_TIMESTAMP_COL_INDEX; orderColIndexList[0] = PRIMARYKEY_TIMESTAMP_COL_INDEX;
} else { } else {
size_t size = tscSqlExprNumOfExprs(pQueryInfo); size_t size = tscNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i); SExprInfo *pExpr = tscExprGet(pQueryInfo, i);
if (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { if (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
orderColIndexList[0] = i; orderColIndexList[0] = i;
} }
...@@ -405,7 +405,7 @@ int32_t tscLocalReducerEnvCreate(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu ...@@ -405,7 +405,7 @@ int32_t tscLocalReducerEnvCreate(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
size_t size = tscSqlExprNumOfExprs(pQueryInfo); size_t size = tscNumOfExprs(pQueryInfo);
pSchema = (SSchema *)calloc(1, sizeof(SSchema) * size); pSchema = (SSchema *)calloc(1, sizeof(SSchema) * size);
if (pSchema == NULL) { if (pSchema == NULL) {
...@@ -415,7 +415,7 @@ int32_t tscLocalReducerEnvCreate(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu ...@@ -415,7 +415,7 @@ int32_t tscLocalReducerEnvCreate(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu
int32_t rlen = 0; int32_t rlen = 0;
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i); SExprInfo *pExpr = tscExprGet(pQueryInfo, i);
pSchema[i].bytes = pExpr->base.resBytes; pSchema[i].bytes = pExpr->base.resBytes;
pSchema[i].type = (int8_t)pExpr->base.resType; pSchema[i].type = (int8_t)pExpr->base.resType;
...@@ -702,12 +702,12 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_ ...@@ -702,12 +702,12 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_
// todo refactor // todo refactor
arithSup.offset = 0; arithSup.offset = 0;
arithSup.numOfCols = (int32_t) tscSqlExprNumOfExprs(pQueryInfo); arithSup.numOfCols = (int32_t) tscNumOfExprs(pQueryInfo);
arithSup.exprList = pQueryInfo->exprList; arithSup.exprList = pQueryInfo->exprList;
arithSup.data = calloc(arithSup.numOfCols, POINTER_BYTES); arithSup.data = calloc(arithSup.numOfCols, POINTER_BYTES);
for(int32_t k = 0; k < arithSup.numOfCols; ++k) { for(int32_t k = 0; k < arithSup.numOfCols; ++k) {
SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, k); SExprInfo* pExpr = tscExprGet(pQueryInfo, k);
arithSup.data[k] = (pOutput->data + pOutput->num* pExpr->base.offset); arithSup.data[k] = (pOutput->data + pOutput->num* pExpr->base.offset);
} }
......
...@@ -1363,7 +1363,6 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock ...@@ -1363,7 +1363,6 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pSql->res.numOfRows = 0; pSql->res.numOfRows = 0;
assert(pCmd->numOfClause == 1);
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta; STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
SSubmitBlk *pBlocks = (SSubmitBlk *)(pTableDataBlocks->pData); SSubmitBlk *pBlocks = (SSubmitBlk *)(pTableDataBlocks->pData);
...@@ -1527,7 +1526,7 @@ void tscImportDataFromFile(SSqlObj *pSql) { ...@@ -1527,7 +1526,7 @@ void tscImportDataFromFile(SSqlObj *pSql) {
} }
assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && strlen(pCmd->payload) != 0); assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && strlen(pCmd->payload) != 0);
pCmd->active = pCmd->pQueryInfo[0]; pCmd->active = pCmd->pQueryInfo;
SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport)); SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport));
SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL); SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
......
...@@ -774,7 +774,6 @@ static int insertStmtExecute(STscStmt* stmt) { ...@@ -774,7 +774,6 @@ static int insertStmtExecute(STscStmt* stmt) {
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
assert(pCmd->numOfClause == 1);
if (taosHashGetSize(pCmd->pTableBlockHashList) == 0) { if (taosHashGetSize(pCmd->pTableBlockHashList) == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
此差异已折叠。
...@@ -624,7 +624,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) { ...@@ -624,7 +624,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) {
int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo)); int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfExprs = tscNumOfExprs(pQueryInfo);
int32_t exprSize = (int32_t)(sizeof(SSqlExpr) * numOfExprs * 2); int32_t exprSize = (int32_t)(sizeof(SSqlExpr) * numOfExprs * 2);
int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0; int32_t tsBufSize = (pQueryInfo->tsBuf != NULL) ? pQueryInfo->tsBuf->fileSize : 0;
...@@ -929,7 +929,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -929,7 +929,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
goto _end; goto _end;
} }
SSqlGroupbyExpr *pGroupbyExpr = query.pGroupbyExpr; SGroupbyExpr *pGroupbyExpr = query.pGroupbyExpr;
if (pGroupbyExpr->numOfGroupCols > 0) { if (pGroupbyExpr->numOfGroupCols > 0) {
pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex); pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
pQueryMsg->orderType = htons(pGroupbyExpr->orderType); pQueryMsg->orderType = htons(pGroupbyExpr->orderType);
...@@ -1050,7 +1050,7 @@ int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1050,7 +1050,7 @@ int32_t tscBuildCreateDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCreateDbMsg *pCreateDbMsg = (SCreateDbMsg *)pCmd->payload; SCreateDbMsg *pCreateDbMsg = (SCreateDbMsg *)pCmd->payload;
assert(pCmd->numOfClause == 1); // assert(pCmd->numOfClause == 1);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pCreateDbMsg->db); int32_t code = tNameExtractFullName(&pTableMetaInfo->name, pCreateDbMsg->db);
assert(code == TSDB_CODE_SUCCESS); assert(code == TSDB_CODE_SUCCESS);
...@@ -1632,9 +1632,9 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { ...@@ -1632,9 +1632,9 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
taosArrayPush(group, &tableKeyInfo); taosArrayPush(group, &tableKeyInfo);
taosArrayPush(tableGroupInfo.pGroupList, &group); taosArrayPush(tableGroupInfo.pGroupList, &group);
SExprInfo* list = calloc(tscSqlExprNumOfExprs(pQueryInfo), sizeof(SExprInfo)); SExprInfo* list = calloc(tscNumOfExprs(pQueryInfo), sizeof(SExprInfo));
for(int32_t i = 0; i < tscSqlExprNumOfExprs(pQueryInfo); ++i) { for(int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) {
SExprInfo* pExprInfo = tscSqlExprGet(pQueryInfo, i); SExprInfo* pExprInfo = tscExprGet(pQueryInfo, i);
list[i] = *pExprInfo; list[i] = *pExprInfo;
} }
...@@ -2166,7 +2166,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { ...@@ -2166,7 +2166,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes); TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes);
SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f); SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index,
pTableSchema[i].type, pTableSchema[i].bytes, getNewResColId(pQueryInfo), pTableSchema[i].bytes, false); pTableSchema[i].type, pTableSchema[i].bytes, getNewResColId(pQueryInfo), pTableSchema[i].bytes, false);
} }
...@@ -2411,7 +2411,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn ...@@ -2411,7 +2411,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
} }
STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo); STableMetaInfo *pNewMeterMetaInfo = tscAddEmptyMetaInfo(pNewQueryInfo);
assert(pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); assert(/*pNew->cmd.numOfClause == 1 && */pNewQueryInfo->numOfTables == 1);
tNameAssign(&pNewMeterMetaInfo->name, &pTableMetaInfo->name); tNameAssign(&pNewMeterMetaInfo->name, &pTableMetaInfo->name);
...@@ -2513,8 +2513,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { ...@@ -2513,8 +2513,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
return getTableMetaFromMnode(pSql, pTableMetaInfo); return getTableMetaFromMnode(pSql, pTableMetaInfo);
} }
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) { static bool allVgroupInfoRetrieved(SQueryInfo* pQueryInfo) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
if (pTableMetaInfo->vgroupList == NULL) { if (pTableMetaInfo->vgroupList == NULL) {
...@@ -2526,11 +2525,9 @@ static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) { ...@@ -2526,11 +2525,9 @@ static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
return true; return true;
} }
int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
int code = TSDB_CODE_RPC_NETWORK_UNAVAIL; int32_t code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
SSqlCmd *pCmd = &pSql->cmd; if (allVgroupInfoRetrieved(pQueryInfo)) {
if (allVgroupInfoRetrieved(pCmd, clauseIndex)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2547,7 +2544,6 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { ...@@ -2547,7 +2544,6 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
return code; return code;
} }
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i); STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
STableMeta* pTableMeta = tscTableMetaDup(pMInfo->pTableMeta); STableMeta* pTableMeta = tscTableMetaDup(pMInfo->pTableMeta);
......
...@@ -37,7 +37,7 @@ static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t l ...@@ -37,7 +37,7 @@ static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t l
static bool isProjectStream(SQueryInfo* pQueryInfo) { static bool isProjectStream(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i); SExprInfo *pExpr = tscExprGet(pQueryInfo, i);
if (pExpr->base.functionId != TSDB_FUNC_PRJ) { if (pExpr->base.functionId != TSDB_FUNC_PRJ) {
return false; return false;
} }
......
...@@ -417,14 +417,15 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) { ...@@ -417,14 +417,15 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
} }
if (pSupporter->exprList != NULL) { if (pSupporter->exprList != NULL) {
tscSqlExprInfoDestroy(pSupporter->exprList); tscExprDestroy(pSupporter->exprList);
pSupporter->exprList = NULL;
} }
if (pSupporter->colList != NULL) { if (pSupporter->colList != NULL) {
tscColumnListDestroy(pSupporter->colList); tscColumnListDestroy(pSupporter->colList);
} }
tscFieldInfoClear(&pSupporter->fieldsInfo); // tscFieldInfoClear(&pSupporter->fieldsInfo);
if (pSupporter->pTSBuf != NULL) { if (pSupporter->pTSBuf != NULL) {
tsBufDestroy(pSupporter->pTSBuf); tsBufDestroy(pSupporter->pTSBuf);
...@@ -554,7 +555,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -554,7 +555,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pSubQueryInfo->tsBuf = NULL; pSubQueryInfo->tsBuf = NULL;
// free result for async object will also free sqlObj // free result for async object will also free sqlObj
assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one result columns assert(tscNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one result columns
taos_free_result(pPrevSub); taos_free_result(pPrevSub);
SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL); SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
...@@ -582,7 +583,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -582,7 +583,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pQueryInfo->groupbyExpr = pSupporter->groupInfo; pQueryInfo->groupbyExpr = pSupporter->groupInfo;
pQueryInfo->pUpstream = taosArrayInit(4, sizeof(POINTER_BYTES)); pQueryInfo->pUpstream = taosArrayInit(4, sizeof(POINTER_BYTES));
assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1); assert(pNew->subState.numOfSub == 0 && pQueryInfo->numOfTables == 1);
tscFieldInfoUpdateOffset(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
...@@ -593,7 +594,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -593,7 +594,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
pSupporter->colList = NULL; pSupporter->colList = NULL;
pSupporter->pVgroupTables = NULL; pSupporter->pVgroupTables = NULL;
memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo)); memset(&pSupporter->fieldsInfo, 0, sizeof(SFieldInfo));
memset(&pSupporter->groupInfo, 0, sizeof(SSqlGroupbyExpr)); memset(&pSupporter->groupInfo, 0, sizeof(SGroupbyExpr));
/* /*
* When handling the projection query, the offset value will be modified for table-table join, which is changed * When handling the projection query, the offset value will be modified for table-table join, which is changed
...@@ -605,7 +606,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -605,7 +606,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; SColumnIndex index = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0); SSchema* s = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, 0);
SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, 0); SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
int16_t funcId = pExpr->base.functionId; int16_t funcId = pExpr->base.functionId;
// add the invisible timestamp column // add the invisible timestamp column
...@@ -618,7 +619,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { ...@@ -618,7 +619,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
tscPrintSelNodeList(pNew, 0); tscPrintSelNodeList(pNew, 0);
tscFieldInfoUpdateOffset(pQueryInfo); tscFieldInfoUpdateOffset(pQueryInfo);
pExpr = tscSqlExprGet(pQueryInfo, 0); pExpr = tscExprGet(pQueryInfo, 0);
} }
// set the join condition tag column info, todo extract method // set the join condition tag column info, todo extract method
...@@ -821,7 +822,7 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* ...@@ -821,7 +822,7 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
// set the tags value for ts_comp function // set the tags value for ts_comp function
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, 0); SExprInfo *pExpr = tscExprGet(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid); int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
pExpr->base.param[0].i64 = tagColId; pExpr->base.param[0].i64 = tagColId;
pExpr->base.param[0].nLen = sizeof(int64_t); pExpr->base.param[0].nLen = sizeof(int64_t);
...@@ -849,7 +850,7 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj* ...@@ -849,7 +850,7 @@ static void issueTsCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
"0x%"PRIx64" subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, ts_comp query to retrieve timestamps, " "0x%"PRIx64" subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, ts_comp query to retrieve timestamps, "
"numOfExpr:%" PRIzu ", colList:%" PRIzu ", numOfOutputFields:%d, name:%s", "numOfExpr:%" PRIzu ", colList:%" PRIzu ", numOfOutputFields:%d, name:%s",
pParent->self, pSql->self, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pQueryInfo->type, pParent->self, pSql->self, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pQueryInfo->type,
tscSqlExprNumOfExprs(pQueryInfo), numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name)); tscNumOfExprs(pQueryInfo), numOfCols, pQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));
tscBuildAndSendRequest(pSql, NULL); tscBuildAndSendRequest(pSql, NULL);
} }
...@@ -1688,7 +1689,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { ...@@ -1688,7 +1689,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
int32_t numOfExprs = (int32_t)tscSqlExprNumOfExprs(pQueryInfo); int32_t numOfExprs = (int32_t)tscNumOfExprs(pQueryInfo);
pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * numOfExprs); pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * numOfExprs);
if (pRes->pColumnIndex == NULL) { if (pRes->pColumnIndex == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -1696,7 +1697,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { ...@@ -1696,7 +1697,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
} }
for (int32_t i = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, i); SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
int32_t tableIndexOfSub = -1; int32_t tableIndexOfSub = -1;
for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) { for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) {
...@@ -1714,7 +1715,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { ...@@ -1714,7 +1715,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
size_t numOfSubExpr = taosArrayGetSize(pSubQueryInfo->exprList); size_t numOfSubExpr = taosArrayGetSize(pSubQueryInfo->exprList);
for (int32_t k = 0; k < numOfSubExpr; ++k) { for (int32_t k = 0; k < numOfSubExpr; ++k) {
SExprInfo* pSubExpr = tscSqlExprGet(pSubQueryInfo, k); SExprInfo* pSubExpr = tscExprGet(pSubQueryInfo, k);
if (pExpr->base.functionId == pSubExpr->base.functionId && pExpr->base.colInfo.colId == pSubExpr->base.colInfo.colId) { if (pExpr->base.functionId == pSubExpr->base.functionId && pExpr->base.colInfo.colId == pSubExpr->base.colInfo.colId) {
pRes->pColumnIndex[i] = (SColumnIndex){.tableIndex = tableIndexOfSub, .columnIndex = k}; pRes->pColumnIndex[i] = (SColumnIndex){.tableIndex = tableIndexOfSub, .columnIndex = k};
break; break;
...@@ -1737,7 +1738,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1737,7 +1738,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1); assert(pQueryInfo->numOfTables == 1);
// retrieve actual query results from vnode during the second stage join subquery // retrieve actual query results from vnode during the second stage join subquery
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
...@@ -1865,7 +1866,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1865,7 +1866,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
} }
pSupporter->groupInfo = pNewQueryInfo->groupbyExpr; pSupporter->groupInfo = pNewQueryInfo->groupbyExpr;
memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr)); memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SGroupbyExpr));
pNew->cmd.numOfCols = 0; pNew->cmd.numOfCols = 0;
pNewQueryInfo->interval.interval = 0; pNewQueryInfo->interval.interval = 0;
...@@ -1878,7 +1879,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1878,7 +1879,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
pNewQueryInfo->order.orderColId = INT32_MIN; pNewQueryInfo->order.orderColId = INT32_MIN;
// backup the data and clear it in the sqlcmd object // backup the data and clear it in the sqlcmd object
memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr)); memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SGroupbyExpr));
tscInitQueryInfo(pNewQueryInfo); tscInitQueryInfo(pNewQueryInfo);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
...@@ -1912,7 +1913,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1912,7 +1913,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
tscDebug( tscDebug(
"%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), " "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, transfer to tid_tag query to retrieve (tableId, tags), "
"exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, tagIndex:%d, name:%s", "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, tagIndex:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo), pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo),
numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, colIndex.columnIndex, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name)); numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, colIndex.columnIndex, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
} else { } else {
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1}; SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = 1};
...@@ -1920,7 +1921,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1920,7 +1921,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL); tscAddFuncInSelectClause(pNewQueryInfo, 0, TSDB_FUNC_TS_COMP, &colIndex, &colSchema, TSDB_COL_NORMAL);
// set the tags value for ts_comp function // set the tags value for ts_comp function
SExprInfo *pExpr = tscSqlExprGet(pNewQueryInfo, 0); SExprInfo *pExpr = tscExprGet(pNewQueryInfo, 0);
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid); int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->id.uid);
...@@ -1947,7 +1948,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1947,7 +1948,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
tscDebug( tscDebug(
"%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%u, transfer to ts_comp query to retrieve timestamps, " "%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%u, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s", "exprInfo:%" PRIzu ", colList:%" PRIzu ", fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscSqlExprNumOfExprs(pNewQueryInfo), pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type, tscNumOfExprs(pNewQueryInfo),
numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name)); numOfCols, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pNewQueryInfo->pTableMetaInfo[0]->name));
} }
} else { } else {
...@@ -2086,7 +2087,7 @@ typedef struct SFirstRoundQuerySup { ...@@ -2086,7 +2087,7 @@ typedef struct SFirstRoundQuerySup {
void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, SQueryInfo* pQueryInfo) { void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, SQueryInfo* pQueryInfo) {
TSKEY key = INT64_MIN; TSKEY key = INT64_MIN;
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, i); SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
if (TSDB_COL_IS_TAG(pExpr->base.colInfo.flag) || pExpr->base.functionId == TSDB_FUNC_PRJ) { if (TSDB_COL_IS_TAG(pExpr->base.colInfo.flag) || pExpr->base.functionId == TSDB_FUNC_PRJ) {
continue; continue;
} }
...@@ -2179,7 +2180,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -2179,7 +2180,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
int32_t offset = 0; int32_t offset = 0;
for (int32_t i = 0; i < numOfCols && offset < pSup->tagLen; ++i) { for (int32_t i = 0; i < numOfCols && offset < pSup->tagLen; ++i) {
SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, i); SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
// tag or group by column // tag or group by column
if (TSDB_COL_IS_TAG(pExpr->base.colInfo.flag) || pExpr->base.functionId == TSDB_FUNC_PRJ) { if (TSDB_COL_IS_TAG(pExpr->base.colInfo.flag) || pExpr->base.functionId == TSDB_FUNC_PRJ) {
...@@ -2321,11 +2322,11 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { ...@@ -2321,11 +2322,11 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
pNew->fp = tscFirstRoundCallback; pNew->fp = tscFirstRoundCallback;
int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo); int32_t numOfExprs = (int32_t) tscNumOfExprs(pQueryInfo);
int32_t index = 0; int32_t index = 0;
for(int32_t i = 0; i < numOfExprs; ++i) { for(int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, i); SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
if (pExpr->base.functionId == TSDB_FUNC_TS && pQueryInfo->interval.interval > 0) { if (pExpr->base.functionId == TSDB_FUNC_TS && pQueryInfo->interval.interval > 0) {
taosArrayPush(pSup->pColsInfo, &pExpr->base.resColId); taosArrayPush(pSup->pColsInfo, &pExpr->base.resColId);
...@@ -2385,7 +2386,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { ...@@ -2385,7 +2386,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
"0x%"PRIx64" first round subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, query to retrieve timestamps, " "0x%"PRIx64" first round subquery:0x%"PRIx64" tableIndex:%d, vgroupIndex:%d, numOfVgroups:%d, type:%d, query to retrieve timestamps, "
"numOfExpr:%" PRIzu ", colList:%d, numOfOutputFields:%d, name:%s", "numOfExpr:%" PRIzu ", colList:%d, numOfOutputFields:%d, name:%s",
pSql->self, pNew->self, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pNewQueryInfo->type, pSql->self, pNew->self, 0, pTableMetaInfo->vgroupIndex, pTableMetaInfo->vgroupList->numOfVgroups, pNewQueryInfo->type,
tscSqlExprNumOfExprs(pNewQueryInfo), index+1, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name)); tscNumOfExprs(pNewQueryInfo), index+1, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pTableMetaInfo->name));
tscHandleMasterSTableQuery(pNew); tscHandleMasterSTableQuery(pNew);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2897,7 +2898,7 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo ...@@ -2897,7 +2898,7 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo
pQueryInfo->limit.limit = -1; pQueryInfo->limit.limit = -1;
pQueryInfo->limit.offset = 0; pQueryInfo->limit.offset = 0;
assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->subState.numOfSub); assert(/*pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 &&*/ trsupport->subqueryIndex < pSql->subState.numOfSub);
// launch subquery for each vnode, so the subquery index equals to the vgroupIndex. // launch subquery for each vnode, so the subquery index equals to the vgroupIndex.
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index);
...@@ -2924,7 +2925,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -2924,7 +2925,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SSqlObj* pSql = (SSqlObj *) tres; SSqlObj* pSql = (SSqlObj *) tres;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1); assert(pQueryInfo->numOfTables == 1);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex]; SVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];
...@@ -3296,12 +3297,12 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { ...@@ -3296,12 +3297,12 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
continue; continue;
} }
SQueryInfo* pSubQueryInfo = pSub->cmd.pQueryInfo[0]; SQueryInfo* pSubQueryInfo = pSub->cmd.pQueryInfo;
tscRestoreFuncForSTableQuery(pSubQueryInfo); tscRestoreFuncForSTableQuery(pSubQueryInfo);
tscFieldInfoUpdateOffset(pSubQueryInfo); tscFieldInfoUpdateOffset(pSubQueryInfo);
} }
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfExprs = tscNumOfExprs(pQueryInfo);
for(int32_t i = 0; i < numOfExprs; ++i) { for(int32_t i = 0; i < numOfExprs; ++i) {
SColumnIndex* pIndex = &pRes->pColumnIndex[i]; SColumnIndex* pIndex = &pRes->pColumnIndex[i];
SSqlRes* pRes1 = &pSql->pSubs[pIndex->tableIndex]->res; SSqlRes* pRes1 = &pSql->pSubs[pIndex->tableIndex]->res;
...@@ -3348,7 +3349,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) { ...@@ -3348,7 +3349,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
if (pRes->tsrow == NULL) { if (pRes->tsrow == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
pRes->numOfCols = (int16_t) tscSqlExprNumOfExprs(pQueryInfo); pRes->numOfCols = (int16_t) tscNumOfExprs(pQueryInfo);
pRes->tsrow = calloc(pRes->numOfCols, POINTER_BYTES); pRes->tsrow = calloc(pRes->numOfCols, POINTER_BYTES);
pRes->urow = calloc(pRes->numOfCols, POINTER_BYTES); pRes->urow = calloc(pRes->numOfCols, POINTER_BYTES);
......
此差异已折叠。
...@@ -44,8 +44,8 @@ typedef struct SResPair { ...@@ -44,8 +44,8 @@ typedef struct SResPair {
// the structure for sql function in select clause // the structure for sql function in select clause
typedef struct SSqlExpr { typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
char token[TSDB_COL_NAME_LEN]; // original token
SColIndex colInfo; SColIndex colInfo;
uint64_t uid; // refactor use the pointer uint64_t uid; // refactor use the pointer
int16_t functionId; // function id in aAgg array int16_t functionId; // function id in aAgg array
...@@ -92,8 +92,6 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len); ...@@ -92,8 +92,6 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len);
void extractTableNameFromToken(SStrToken *pToken, SStrToken* pTable); void extractTableNameFromToken(SStrToken *pToken, SStrToken* pTable);
//SSchema tGetTbnameColumnSchema();
SSchema tGetBlockDistColumnSchema(); SSchema tGetBlockDistColumnSchema();
SSchema tGetUserSpecifiedColumnSchema(tVariant* pVal, SStrToken* exprStr, const char* name); SSchema tGetUserSpecifiedColumnSchema(tVariant* pVal, SStrToken* exprStr, const char* name);
......
...@@ -2569,6 +2569,7 @@ _arithmetic_operator_fn_t getArithmeticOperatorFn(int32_t arithmeticOptr) { ...@@ -2569,6 +2569,7 @@ _arithmetic_operator_fn_t getArithmeticOperatorFn(int32_t arithmeticOptr) {
case TSDB_BINARY_OP_REMAINDER: case TSDB_BINARY_OP_REMAINDER:
return vectorRemainder; return vectorRemainder;
default: default:
assert(0);
return NULL; return NULL;
} }
} }
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <texpr.h>
#include "os.h" #include "os.h"
#include "texpr.h" #include "texpr.h"
...@@ -465,27 +466,29 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) { ...@@ -465,27 +466,29 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
return expr; return expr;
} }
tExprNode* exprdup(tExprNode* pTree) { tExprNode* exprdup(tExprNode* pNode) {
if (pTree == NULL) { if (pNode == NULL) {
return NULL; return NULL;
} }
tExprNode* pNode = calloc(1, sizeof(tExprNode)); tExprNode* pCloned = calloc(1, sizeof(tExprNode));
if (pTree->nodeType == TSQL_NODE_EXPR) { if (pNode->nodeType == TSQL_NODE_EXPR) {
tExprNode* pLeft = exprdup(pTree->_node.pLeft); tExprNode* pLeft = exprdup(pNode->_node.pLeft);
tExprNode* pRight = exprdup(pTree->_node.pRight); tExprNode* pRight = exprdup(pNode->_node.pRight);
pNode->nodeType = TSQL_NODE_EXPR; pCloned->_node.pLeft = pLeft;
pNode->_node.pLeft = pLeft; pCloned->_node.pRight = pRight;
pNode->_node.pRight = pRight; pCloned->_node.optr = pNode->_node.optr;
} else if (pTree->nodeType == TSQL_NODE_VALUE) { pCloned->_node.hasPK = pNode->_node.hasPK;
pNode->pVal = calloc(1, sizeof(tVariant)); } else if (pNode->nodeType == TSQL_NODE_VALUE) {
tVariantAssign(pNode->pVal, pTree->pVal); pCloned->pVal = calloc(1, sizeof(tVariant));
} else if (pTree->nodeType == TSQL_NODE_COL) { tVariantAssign(pCloned->pVal, pNode->pVal);
pNode->pSchema = calloc(1, sizeof(SSchema)); } else if (pNode->nodeType == TSQL_NODE_COL) {
*pNode->pSchema = *pTree->pSchema; pCloned->pSchema = calloc(1, sizeof(SSchema));
*pCloned->pSchema = *pNode->pSchema;
} }
return pNode; pCloned->nodeType = pNode->nodeType;
return pCloned;
} }
...@@ -70,13 +70,13 @@ typedef struct SResultRowPool { ...@@ -70,13 +70,13 @@ typedef struct SResultRowPool {
SArray* pData; // SArray<void*> SArray* pData; // SArray<void*>
} SResultRowPool; } SResultRowPool;
typedef struct SSqlGroupbyExpr { typedef struct SGroupbyExpr {
int16_t tableIndex; int16_t tableIndex;
SArray* columnInfo; // SArray<SColIndex>, group by columns information SArray* columnInfo; // SArray<SColIndex>, group by columns information
int16_t numOfGroupCols; int16_t numOfGroupCols; // todo remove it
int16_t orderIndex; // order by column index int16_t orderIndex; // order by column index
int16_t orderType; // order by type: asc/desc int16_t orderType; // order by type: asc/desc
} SSqlGroupbyExpr; } SGroupbyExpr;
typedef struct SResultRow { typedef struct SResultRow {
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
...@@ -216,7 +216,7 @@ typedef struct SQueryAttr { ...@@ -216,7 +216,7 @@ typedef struct SQueryAttr {
int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query. int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query.
int32_t maxTableColumnWidth; int32_t maxTableColumnWidth;
int32_t tagLen; // tag value length of current query int32_t tagLen; // tag value length of current query
SSqlGroupbyExpr* pGroupbyExpr; SGroupbyExpr* pGroupbyExpr;
SExprInfo* pExpr1; SExprInfo* pExpr1;
SExprInfo* pExpr2; SExprInfo* pExpr2;
...@@ -362,7 +362,7 @@ typedef struct SQueryParam { ...@@ -362,7 +362,7 @@ typedef struct SQueryParam {
SColIndex *pGroupColIndex; SColIndex *pGroupColIndex;
SColumnInfo *pTagColumnInfo; SColumnInfo *pTagColumnInfo;
SSqlGroupbyExpr *pGroupbyExpr; SGroupbyExpr *pGroupbyExpr;
int32_t tableScanOperator; int32_t tableScanOperator;
SArray *pOperator; SArray *pOperator;
} SQueryParam; } SQueryParam;
...@@ -536,8 +536,8 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp ...@@ -536,8 +536,8 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo,
SSqlExpr **pExpr, SExprInfo *prevExpr); SSqlExpr **pExpr, SExprInfo *prevExpr);
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code); SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId, char* sql, uint64_t *qId); SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId, char* sql, uint64_t *qId);
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, SQueryParam* param, char* start, int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, SQueryParam* param, char* start,
......
...@@ -62,7 +62,7 @@ typedef struct SFillInfo { ...@@ -62,7 +62,7 @@ typedef struct SFillInfo {
SFillColInfo* pFillCol; // column info for fill operations SFillColInfo* pFillCol; // column info for fill operations
SFillTagColInfo* pTags; // tags value for filling gap SFillTagColInfo* pTags; // tags value for filling gap
void* handle; // for dubug purpose void* handle; // for debug purpose
} SFillInfo; } SFillInfo;
typedef struct SPoint { typedef struct SPoint {
......
...@@ -16,7 +16,38 @@ ...@@ -16,7 +16,38 @@
#ifndef TDENGINE_QPLAN_H #ifndef TDENGINE_QPLAN_H
#define TDENGINE_QPLAN_H #define TDENGINE_QPLAN_H
//TODO refactor struct SQueryInfo;
typedef struct SQueryNodeBasicInfo {
int32_t type;
char *name;
} SQueryNodeBasicInfo;
typedef struct SQueryTableInfo {
char *tableName;
STableId id;
} SQueryTableInfo;
typedef struct SQueryNode {
SQueryNodeBasicInfo info;
SQueryTableInfo tableInfo;
SSchema *pSchema; // the schema of the input SSDatablock
int32_t numOfCols; // number of input columns
SExprInfo *pExpr; // the query functions or sql aggregations
int32_t numOfOutput; // number of result columns, which is also the number of pExprs
void *pExtInfo; // additional information
// previous operator to generated result for current node to process
// in case of join, multiple prev nodes exist.
SArray *pPrevNodes;// upstream nodes
struct SQueryNode *nextNode;
} SQueryNode;
SQueryNode* qCreateQueryPlan(struct SQueryInfo* pQueryInfo);
void* qDestroyQueryPlan(SQueryNode* pQueryNode);
char* queryPlanToString(SQueryNode* pQueryNode);
SArray* createTableScanPlan(SQueryAttr* pQueryAttr); SArray* createTableScanPlan(SQueryAttr* pQueryAttr);
SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr); SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr);
SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr); SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr);
......
...@@ -47,6 +47,8 @@ void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, in ...@@ -47,6 +47,8 @@ void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, in
SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset); SResultRowCellInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset);
void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr);
static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size); assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
return pResultRowInfo->pResult[slot]; return pResultRowInfo->pResult[slot];
......
...@@ -3700,7 +3700,7 @@ char *getArithColumnData(void *param, const char* name, int32_t colId) { ...@@ -3700,7 +3700,7 @@ char *getArithColumnData(void *param, const char* name, int32_t colId) {
} }
} }
assert(index >= 0 /*&& colId >= 0*/); assert(index >= 0);
return pSupport->data[index] + pSupport->offset * pSupport->colList[index].bytes; return pSupport->data[index] + pSupport->offset * pSupport->colList[index].bytes;
} }
......
...@@ -189,7 +189,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator); ...@@ -189,7 +189,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator);
static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
static int32_t getGroupbyColumnIndex(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size); static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size);
...@@ -1418,7 +1418,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOp ...@@ -1418,7 +1418,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOp
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t getGroupbyColumnIndex(SSqlGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock) { static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock) {
for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) { for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, k); SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, k);
if (TSDB_COL_IS_TAG(pColIndex->flag)) { if (TSDB_COL_IS_TAG(pColIndex->flag)) {
...@@ -6603,13 +6603,13 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t nu ...@@ -6603,13 +6603,13 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t nu
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code) { SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code) {
if (pQueryMsg->numOfGroupCols == 0) { if (pQueryMsg->numOfGroupCols == 0) {
return NULL; return NULL;
} }
// using group by tag columns // using group by tag columns
SSqlGroupbyExpr *pGroupbyExpr = (SSqlGroupbyExpr *)calloc(1, sizeof(SSqlGroupbyExpr)); SGroupbyExpr *pGroupbyExpr = (SGroupbyExpr *)calloc(1, sizeof(SGroupbyExpr));
if (pGroupbyExpr == NULL) { if (pGroupbyExpr == NULL) {
*code = TSDB_CODE_QRY_OUT_OF_MEMORY; *code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL; return NULL;
...@@ -6770,7 +6770,7 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) { ...@@ -6770,7 +6770,7 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
return ((SQInfo *)qHandle)->qId == qId; return ((SQInfo *)qHandle)->qId == qId;
} }
SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs, SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs,
SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId, SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, int32_t vgId,
char* sql, uint64_t *qId) { char* sql, uint64_t *qId) {
int16_t numOfCols = pQueryMsg->numOfCols; int16_t numOfCols = pQueryMsg->numOfCols;
...@@ -7073,7 +7073,7 @@ static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo) { ...@@ -7073,7 +7073,7 @@ static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo) {
pTableqinfoGroupInfo->numOfTables = 0; pTableqinfoGroupInfo->numOfTables = 0;
} }
static void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr) { void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr) {
if (pExprInfo == NULL) { if (pExprInfo == NULL) {
assert(numOfExpr == 0); assert(numOfExpr == 0);
return NULL; return NULL;
......
#include <tscUtil.h>
#include "os.h" #include "os.h"
#include "qExecutor.h"
#include "qUtil.h" #include "qUtil.h"
#include "texpr.h" #include "texpr.h"
#include "qPlan.h"
#include "tsclient.h" #include "tsclient.h"
#include "tscUtil.h"
#define QNODE_PROJECT 1 #define QNODE_TAGSCAN 1
#define QNODE_FILTER 2 #define QNODE_TABLESCAN 2
#define QNODE_TABLESCAN 3 #define QNODE_PROJECT 3
#define QNODE_AGGREGATE 4 #define QNODE_AGGREGATE 4
#define QNODE_GROUPBY 5 #define QNODE_GROUPBY 5
#define QNODE_LIMIT 6 #define QNODE_LIMIT 6
...@@ -18,126 +20,492 @@ ...@@ -18,126 +20,492 @@
#define QNODE_SESSIONWINDOW 12 #define QNODE_SESSIONWINDOW 12
#define QNODE_FILL 13 #define QNODE_FILL 13
typedef struct SQueryNodeBasicInfo { typedef struct SFillEssInfo {
int32_t type; int32_t fillType; // fill type
char *name; int64_t *val; // fill value
} SQueryNodeBasicInfo; } SFillEssInfo;
typedef struct SQueryNode { static SQueryNode* createQueryNode(int32_t type, const char* name, SQueryNode** prev,
SQueryNodeBasicInfo info; int32_t numOfPrev, SExprInfo** pExpr, int32_t numOfOutput, SQueryTableInfo* pTableInfo,
// char *name; // the name of logic node void* pExtInfo) {
// int32_t type; // the type of logic node
SSchema *pSchema; // the schema of the input SSDatablock
int32_t numOfCols; // number of input columns
SExprInfo *pExpr; // the query functions or sql aggregations
int32_t numOfOutput; // number of result columns, which is also the number of pExprs
// previous operator to generated result for current node to process
// in case of join, multiple prev nodes exist.
SArray *pPrevNodes;// upstream nodes
struct SQueryNode *nextNode;
} SQueryNode;
static SQueryNode* createQueryNode(int32_t type, const char* name, SQueryNode** prev, int32_t numOfPrev) {
SQueryNode* pNode = calloc(1, sizeof(SQueryNode)); SQueryNode* pNode = calloc(1, sizeof(SQueryNode));
pNode->info.type = type; pNode->info.type = type;
pNode->info.name = strdup(name); pNode->info.name = strdup(name);
if (pTableInfo->id.uid != 0) { // it is a true table
pNode->tableInfo.id = pTableInfo->id;
pNode->tableInfo.tableName = strdup(pTableInfo->tableName);
}
pNode->numOfOutput = numOfOutput;
pNode->pExpr = calloc(numOfOutput, sizeof(SExprInfo));
for(int32_t i = 0; i < numOfOutput; ++i) {
tscExprAssign(&pNode->pExpr[i], pExpr[i]);
}
pNode->pPrevNodes = taosArrayInit(4, POINTER_BYTES); pNode->pPrevNodes = taosArrayInit(4, POINTER_BYTES);
for(int32_t i = 0; i < numOfPrev; ++i) { for(int32_t i = 0; i < numOfPrev; ++i) {
taosArrayPush(pNode->pPrevNodes, &prev[i]); taosArrayPush(pNode->pPrevNodes, &prev[i]);
} }
switch(type) {
case QNODE_TABLESCAN: {
STimeWindow* window = calloc(1, sizeof(STimeWindow));
memcpy(window, pExtInfo, sizeof(STimeWindow));
pNode->pExtInfo = window;
break;
}
case QNODE_TIMEWINDOW: {
SInterval* pInterval = calloc(1, sizeof(SInterval));
pNode->pExtInfo = pInterval;
memcpy(pInterval, pExtInfo, sizeof(SInterval));
break;
}
case QNODE_GROUPBY: {
SGroupbyExpr* p = (SGroupbyExpr*) pExtInfo;
SGroupbyExpr* pGroupbyExpr = calloc(1, sizeof(SGroupbyExpr));
pGroupbyExpr->tableIndex = p->tableIndex;
pGroupbyExpr->orderType = p->orderType;
pGroupbyExpr->orderIndex = p->orderIndex;
pGroupbyExpr->numOfGroupCols = p->numOfGroupCols;
pGroupbyExpr->columnInfo = taosArrayDup(p->columnInfo);
pNode->pExtInfo = pGroupbyExpr;
break;
}
case QNODE_FILL: { // todo !!
pNode->pExtInfo = pExtInfo;
break;
}
case QNODE_LIMIT: {
pNode->pExtInfo = calloc(1, sizeof(SLimitVal));
memcpy(pNode->pExtInfo, pExtInfo, sizeof(SLimitVal));
break;
}
}
return pNode; return pNode;
} }
static SQueryNode* doCreateQueryPlanForOneTable(SQueryInfo* pQueryInfo) { static SQueryNode* doAddTableColumnNode(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo,
SQueryNode* pNode = createQueryNode(QNODE_TABLESCAN, "", NULL, 0); SQueryTableInfo* info, SArray* pExprs, SArray* tableCols) {
if (pQueryInfo->onlyTagQuery) {
int32_t num = taosArrayGetSize(pExprs);
SQueryNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info, NULL);
if (pQueryInfo->distinctTag) {
pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, info, NULL);
}
// check for filter return pNode;
if (pQueryInfo->hasFilter) {
pNode = createQueryNode(QNODE_FILTER, "", &pNode, 1);
} }
if (pQueryInfo->distinctTag) { STimeWindow* window = &pQueryInfo->window;
pNode = createQueryNode(QNODE_DISTINCT, "", &pNode, 0); SQueryNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0,
info, window);
if (pQueryInfo->projectionQuery) {
int32_t numOfOutput = taosArrayGetSize(pExprs);
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, info, NULL);
} else {
// table source column projection, generate the projection expr
int32_t numOfCols = taosArrayGetSize(tableCols);
SExprInfo** pExpr = calloc(numOfCols, POINTER_BYTES);
SSchema* pSchema = pTableMetaInfo->pTableMeta->schema;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCol = taosArrayGetP(tableCols, i);
SColumnIndex index = {.tableIndex = 0, .columnIndex = pCol->columnIndex};
SExprInfo* p = tscExprCreate(pQueryInfo, TSDB_FUNC_PRJ, &index, pCol->info.type, pCol->info.bytes,
pCol->info.colId, 0, TSDB_COL_NORMAL);
strncpy(p->base.aliasName, pSchema[pCol->columnIndex].name, tListLen(p->base.aliasName));
} else if (pQueryInfo->projectionQuery) { pExpr[i] = p;
pNode = createQueryNode(QNODE_PROJECT, "", &pNode, 1); }
} else { // check for aggregation
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, info, NULL);
for (int32_t i = 0; i < numOfCols; ++i) {
destroyQueryFuncExpr(pExpr[i], 1);
}
tfree(pExpr);
}
return pNode;
}
static SQueryNode* doCreateQueryPlanForOneTableImpl(SQueryInfo* pQueryInfo, SQueryNode* pNode, SQueryTableInfo* info,
SArray* pExprs) {
// check for aggregation
if (pQueryInfo->interval.interval > 0) { if (pQueryInfo->interval.interval > 0) {
pNode = createQueryNode(QNODE_TIMEWINDOW, "", &pNode, 1); int32_t numOfOutput = taosArrayGetSize(pExprs);
pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, pExprs->pData, numOfOutput, info,
&pQueryInfo->interval);
} else if (pQueryInfo->groupbyColumn) { } else if (pQueryInfo->groupbyColumn) {
pNode = createQueryNode(QNODE_GROUPBY, "", &pNode, 1); int32_t numOfOutput = taosArrayGetSize(pExprs);
pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, pExprs->pData, numOfOutput, info,
&pQueryInfo->groupbyExpr);
} else if (pQueryInfo->sessionWindow.gap > 0) { } else if (pQueryInfo->sessionWindow.gap > 0) {
pNode = createQueryNode(QNODE_SESSIONWINDOW, "", &pNode, 1); pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, NULL, 0, info, NULL);
} else if (pQueryInfo->simpleAgg) { } else if (pQueryInfo->simpleAgg) {
pNode = createQueryNode(QNODE_AGGREGATE, "", &pNode, 1); int32_t numOfOutput = taosArrayGetSize(pExprs);
} pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, pExprs->pData, numOfOutput, info, NULL);
if (pQueryInfo->havingFieldNum > 0) {
pNode = createQueryNode(QNODE_FILTER, "", &pNode, 1);
} }
if (pQueryInfo->arithmeticOnAgg) { if (pQueryInfo->havingFieldNum > 0 || pQueryInfo->arithmeticOnAgg) {
pNode = createQueryNode(QNODE_PROJECT, "", &pNode, 1); int32_t numOfExpr = taosArrayGetSize(pQueryInfo->exprList1);
pNode =
createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, info, NULL);
} }
if (pQueryInfo->fillType != TSDB_FILL_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
pNode = createQueryNode(QNODE_FILL, "", &pNode, 1); SFillEssInfo* pInfo = calloc(1, sizeof(SFillEssInfo));
pInfo->fillType = pQueryInfo->fillType;
pInfo->val = calloc(pNode->numOfOutput, sizeof(int64_t));
memcpy(pInfo->val, pQueryInfo->fillVal, pNode->numOfOutput);
pNode = createQueryNode(QNODE_FILL, "Fill", &pNode, 1, NULL, 0, info, pInfo);
} }
}
if (pQueryInfo->limit.limit != -1 || pQueryInfo->limit.offset != 0) { if (pQueryInfo->limit.limit != -1 || pQueryInfo->limit.offset != 0) {
pNode = createQueryNode(QNODE_LIMIT, "", &pNode, 1); pNode = createQueryNode(QNODE_LIMIT, "Limit", &pNode, 1, NULL, 0, info, &pQueryInfo->limit);
} }
return pNode; return pNode;
} }
SArray* qCreateQueryPlan(SQueryInfo* pQueryInfo) { static SQueryNode* doCreateQueryPlanForOneTable(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SArray* pExprs,
// join and subquery SArray* tableCols) {
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name);
SQueryTableInfo info = {.tableName = strdup(name), .id = pTableMetaInfo->pTableMeta->id,};
// handle the only tag query
SQueryNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, pExprs, tableCols);
if (pQueryInfo->onlyTagQuery) {
tfree(info.tableName);
return pNode;
}
SQueryNode* pNode1 = doCreateQueryPlanForOneTableImpl(pQueryInfo, pNode, &info, pExprs);
tfree(info.tableName);
return pNode1;
}
SArray* createQueryPlanImpl(SQueryInfo* pQueryInfo) {
SArray* upstream = NULL; SArray* upstream = NULL;
if (pQueryInfo->pUpstream != NULL) { // subquery in the from clause
if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // subquery in the from clause
upstream = taosArrayInit(4, POINTER_BYTES); upstream = taosArrayInit(4, POINTER_BYTES);
size_t size = taosArrayGetSize(pQueryInfo->pUpstream); size_t size = taosArrayGetSize(pQueryInfo->pUpstream);
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
SQueryInfo* pq = taosArrayGet(pQueryInfo->pUpstream, i); SQueryInfo* pq = taosArrayGet(pQueryInfo->pUpstream, i);
SArray* p = qCreateQueryPlan(pq); SArray* p = createQueryPlanImpl(pq);
taosArrayPushBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p)); taosArrayPushBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p));
} }
} }
if (pQueryInfo->numOfTables > 1) { // it is a join query if (pQueryInfo->numOfTables > 1) { // it is a join query
// 1. separate the select clause according to table // 1. separate the select clause according to table
int32_t tableIndex = 0; upstream = taosArrayInit(5, POINTER_BYTES);
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[tableIndex];
uint64_t uid = pTableMetaInfo->pTableMeta->id.uid; for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[i];
uint64_t uid = pTableMetaInfo->pTableMeta->id.uid;
SArray* exprList = taosArrayInit(4, POINTER_BYTES);
if (tscExprCopy(exprList, pQueryInfo->exprList, uid, true) != 0) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
exit(-1);
}
SArray* tableColumnList = taosArrayInit(4, sizeof(SColumn));
tscColumnListCopy(tableColumnList, pQueryInfo->colList, uid);
// 2. create the query execution node
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name);
SQueryTableInfo info = {.tableName = strdup(name), .id = pTableMetaInfo->pTableMeta->id,};
SQueryNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, exprList, tableColumnList);
taosArrayPush(upstream, &pNode);
}
// 3. add the join node here
SQueryTableInfo info = {0};
int32_t num = taosArrayGetSize(pQueryInfo->exprList);
SQueryNode* pNode = createQueryNode(QNODE_JOIN, "Join", upstream->pData, pQueryInfo->numOfTables,
pQueryInfo->exprList->pData, num, &info, NULL);
// 4. add the aggregation or projection execution node
pNode = doCreateQueryPlanForOneTableImpl(pQueryInfo, pNode, &info, pQueryInfo->exprList);
upstream = taosArrayInit(5, POINTER_BYTES);
taosArrayPush(upstream, &pNode);
} else { // only one table, normal query process
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
SQueryNode* pNode = doCreateQueryPlanForOneTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList, pQueryInfo->colList);
upstream = taosArrayInit(5, POINTER_BYTES);
taosArrayPush(upstream, &pNode);
}
return upstream;
}
SQueryNode* qCreateQueryPlan(SQueryInfo* pQueryInfo) {
SArray* upstream = createQueryPlanImpl(pQueryInfo);
assert(taosArrayGetSize(upstream) == 1);
SQueryNode* p = taosArrayGetP(upstream, 0);
taosArrayDestroy(upstream);
return p;
}
static void doDestroyQueryNode(SQueryNode* pQueryNode) {
tfree(pQueryNode->pExtInfo);
tfree(pQueryNode->pSchema);
tfree(pQueryNode->info.name);
SArray* exprList = taosArrayInit(4, POINTER_BYTES); tfree(pQueryNode->tableInfo.tableName);
if (tscSqlExprCopy(exprList, pQueryInfo->exprList, uid, true) != 0) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; pQueryNode->pExpr = destroyQueryFuncExpr(pQueryNode->pExpr, pQueryNode->numOfOutput);
exit(-1);
if (pQueryNode->pPrevNodes != NULL) {
int32_t size = taosArrayGetSize(pQueryNode->pPrevNodes);
for(int32_t i = 0; i < size; ++i) {
SQueryNode* p = taosArrayGetP(pQueryNode->pPrevNodes, i);
doDestroyQueryNode(p);
} }
SArray* tableColumnList = taosArrayInit(4, sizeof(SColumn)); taosArrayDestroy(pQueryNode->pPrevNodes);
tscColumnListCopy(tableColumnList, pQueryInfo->colList, uid); }
tfree(pQueryNode);
}
// 2. void* qDestroyQueryPlan(SQueryNode* pQueryNode) {
SQueryNode* pNode = doCreateQueryPlanForOneTable(pQueryInfo); if (pQueryNode == NULL) {
UNUSED(pNode); return NULL;
} else { // only one table, normal query process
SQueryNode* pNode = doCreateQueryPlanForOneTable(pQueryInfo);
UNUSED(pNode);
} }
doDestroyQueryNode(pQueryNode);
return NULL; return NULL;
} }
char* queryPlanToString() { bool hasAliasName(SExprInfo* pExpr) {
return NULL; assert(pExpr != NULL);
return strncmp(pExpr->base.token, pExpr->base.aliasName, tListLen(pExpr->base.aliasName)) != 0;
}
static int32_t doPrintPlan(char* buf, SQueryNode* pQueryNode, int32_t level, int32_t totalLen) {
if (level > 0) {
sprintf(buf + totalLen, "%*c", level, ' ');
totalLen += level;
}
int32_t len1 = sprintf(buf + totalLen, "%s(", pQueryNode->info.name);
int32_t len = len1 + totalLen;
switch(pQueryNode->info.type) {
case QNODE_TABLESCAN: {
STimeWindow* win = (STimeWindow*) pQueryNode->pExtInfo;
len1 = sprintf(buf + len, "%s #0x%"PRIx64") time_range: %"PRId64" - %"PRId64"\n", pQueryNode->tableInfo.tableName,
pQueryNode->tableInfo.id.uid, win->skey, win->ekey);
len += len1;
break;
}
case QNODE_PROJECT: {
len1 = sprintf(buf + len, "cols: ");
len += len1;
for(int32_t i = 0; i < pQueryNode->numOfOutput; ++i) {
SSqlExpr* p = &pQueryNode->pExpr[i].base;
len1 = sprintf(buf + len, "[%s #%d]", p->aliasName, p->resColId);
len += len1;
if (i < pQueryNode->numOfOutput - 1) {
len1 = sprintf(buf + len, ", ");
len += len1;
}
}
len1 = sprintf(buf + len, ")");
len += len1;
//todo print filter info
len1 = sprintf(buf + len, " filters:(nil)\n");
len += len1;
break;
}
case QNODE_AGGREGATE: {
for(int32_t i = 0; i < pQueryNode->numOfOutput; ++i) {
SSqlExpr* pExpr = &pQueryNode->pExpr[i].base;
if (hasAliasName(&pQueryNode->pExpr[i])) {
len1 = sprintf(buf + len,"[%s #%s]", pExpr->token, pExpr->aliasName);
} else {
len1 = sprintf(buf + len,"[%s]", pExpr->token);
}
len += len1;
if (i < pQueryNode->numOfOutput - 1) {
len1 = sprintf(buf + len, ", ");
len += len1;
}
}
len1 = sprintf(buf + len, ")\n");
len += len1;
break;
}
case QNODE_TIMEWINDOW: {
for(int32_t i = 0; i < pQueryNode->numOfOutput; ++i) {
SSqlExpr* pExpr = &pQueryNode->pExpr[i].base;
if (hasAliasName(&pQueryNode->pExpr[i])) {
len1 = sprintf(buf + len,"[%s #%s]", pExpr->token, pExpr->aliasName);
} else {
len1 = sprintf(buf + len,"[%s]", pExpr->token);
}
len += len1;
if (i < pQueryNode->numOfOutput - 1) {
len1 = sprintf(buf + len,", ");
len += len1;
}
}
len1 = sprintf(buf + len,") ");
len += len1;
SInterval* pInterval = pQueryNode->pExtInfo;
len1 = sprintf(buf + len, "interval:%"PRId64"(%c), sliding:%"PRId64"(%c), offset:%"PRId64"\n", pInterval->interval,
pInterval->intervalUnit, pInterval->sliding, pInterval->slidingUnit, pInterval->offset);
len += len1;
break;
}
case QNODE_GROUPBY: { // todo hide the invisible column
for(int32_t i = 0; i < pQueryNode->numOfOutput; ++i) {
SSqlExpr* pExpr = &pQueryNode->pExpr[i].base;
if (hasAliasName(&pQueryNode->pExpr[i])) {
len1 = sprintf(buf + len,"[%s #%s]", pExpr->token, pExpr->aliasName);
} else {
len1 = sprintf(buf + len,"[%s]", pExpr->token);
}
len += len1;
if (i < pQueryNode->numOfOutput - 1) {
len1 = sprintf(buf + len,", ");
len += len1;
}
}
SGroupbyExpr* pGroupbyExpr = pQueryNode->pExtInfo;
SColIndex* pIndex = taosArrayGet(pGroupbyExpr->columnInfo, 0);
len1 = sprintf(buf + len,") groupby_col: [%s #%d]\n", pIndex->name, pIndex->colId);
len += len1;
break;
}
case QNODE_FILL: {
SFillEssInfo* pEssInfo = pQueryNode->pExtInfo;
len1 = sprintf(buf + len,"%d", pEssInfo->fillType);
len += len1;
if (pEssInfo->fillType == TSDB_FILL_SET_VALUE) {
len1 = sprintf(buf + len,", val:");
len += len1;
// todo get the correct fill data type
for(int32_t i = 0; i < pQueryNode->numOfOutput; ++i) {
len1 = sprintf(buf + len,"%"PRId64, pEssInfo->val[i]);
len += len1;
if (i < pQueryNode->numOfOutput - 1) {
len1 = sprintf(buf + len,", ");
len += len1;
}
}
}
len1 = sprintf(buf + len,")\n");
len += len1;
break;
}
case QNODE_LIMIT: {
SLimitVal* pVal = pQueryNode->pExtInfo;
len1 = sprintf(buf + len,"limit: %"PRId64", offset: %"PRId64")\n", pVal->limit, pVal->offset);
len += len1;
break;
}
case QNODE_DISTINCT:
case QNODE_TAGSCAN: {
len1 = sprintf(buf + len,"cols: ");
len += len1;
for(int32_t i = 0; i < pQueryNode->numOfOutput; ++i) {
SSqlExpr* p = &pQueryNode->pExpr[i].base;
len1 = sprintf(buf + len,"[%s #%d]", p->aliasName, p->resColId);
len += len1;
if (i < pQueryNode->numOfOutput - 1) {
len1 = sprintf(buf + len,", ");
len += len1;
}
}
len1 = sprintf(buf + len,")\n");
len += len1;
break;
}
case QNODE_JOIN: {
// print join condition
len1 = sprintf(buf + len, "\n");
len += len1;
break;
}
}
return len;
}
int32_t queryPlanToStringImpl(char* buf, SQueryNode* pQueryNode, int32_t level, int32_t totalLen) {
int32_t len = doPrintPlan(buf, pQueryNode, level, totalLen);
for(int32_t i = 0; i < taosArrayGetSize(pQueryNode->pPrevNodes); ++i) {
SQueryNode* p1 = taosArrayGetP(pQueryNode->pPrevNodes, i);
int32_t len1 = queryPlanToStringImpl(buf, p1, level + 1, len);
len = len1;
}
return len;
}
char* queryPlanToString(SQueryNode* pQueryNode) {
assert(pQueryNode);
char* buf = calloc(1, 4096);
int32_t len = sprintf(buf, "===== logic plan =====\n");
queryPlanToStringImpl(buf, pQueryNode, 0, len);
return buf;
} }
SQueryNode* queryPlanFromString() { SQueryNode* queryPlanFromString() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册