提交 409ad8c4 编写于 作者: H Haojun Liao

[td-3318]

上级 22698b8d
......@@ -88,9 +88,8 @@ typedef struct SVgroupTableInfo {
SArray* itemList; //SArray<STableIdInfo>
} SVgroupTableInfo;
static FORCE_INLINE SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex) {
static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex) {
assert(pCmd != NULL && subClauseIndex >= 0);
if (pCmd->pQueryInfo == NULL || subClauseIndex >= pCmd->numOfClause) {
return NULL;
}
......@@ -98,6 +97,8 @@ static FORCE_INLINE SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t sub
return pCmd->pQueryInfo[subClauseIndex];
}
SQueryInfo* tscGetActiveQueryInfo(SSqlCmd* pCmd);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
......@@ -208,8 +209,11 @@ bool tscShouldBeFreed(SSqlObj* pSql);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex);
SQueryInfo *tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex);
void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
int32_t tscAddQueryInfo(SSqlCmd *pCmd);
SQueryInfo *tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex);
SQueryInfo *tscGetQueryInfoS(SSqlCmd *pCmd, int32_t subClauseIndex);
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo);
......@@ -217,11 +221,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, SName* name, STableM
SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables);
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo);
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscFreeVgroupTableInfo(SArray* pVgroupTables);
SArray* tscVgroupTableInfoDup(SArray* pVgroupTables);
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
......@@ -233,6 +233,8 @@ int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool creat
void tscResetForNextRetrieve(SSqlRes* pRes);
void tscDoQuery(SSqlObj* pSql);
void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo);
void* tscVgroupInfoClear(SVgroupsInfo *pInfo);
......@@ -266,7 +268,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql);
bool hasMoreClauseToTry(SSqlObj* pSql);
......
......@@ -228,6 +228,9 @@ typedef struct SQueryInfo {
int32_t round; // 0/1/....
int32_t bufLen;
char* buf;
struct SQueryInfo *sibling; // sibling
SArray *pUpstream; //SArray<struct SQueryInfo>
} SQueryInfo;
typedef struct {
......@@ -242,8 +245,6 @@ typedef struct {
};
uint32_t insertType; // TODO remove it
int32_t clauseIndex; // index of multiple subclause query
char * curSql; // current sql, resume position of sql after parsing paused
int8_t parseFinished;
char reserve2[3]; // fix bus error on arm32
......@@ -253,22 +254,26 @@ typedef struct {
uint32_t allocSize;
char * payload;
int32_t payloadLen;
SQueryInfo **pQueryInfo;
int32_t numOfClause;
int32_t clauseIndex; // index of multiple subclause query
SQueryInfo *active; // current active query info
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams;
int8_t dataSourceType; // load data from file or not
char reserve4[3]; // fix bus error on arm32
char reserve4[3]; // fix bus error on arm32
int8_t submitSchema; // submit block is built with table schema
char reserve5[3]; // fix bus error on arm32
char reserve5[3]; // fix bus error on arm32
STagData tagData; // NOTE: pTagData->data is used as a variant length array
SName **pTableNameList; // all involved tableMeta list of current insert sql statement.
int32_t numOfTables;
SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
} SSqlCmd;
typedef struct SResRec {
......@@ -410,7 +415,7 @@ void tscInitMsgsFp();
int tsParseSql(SSqlObj *pSql, bool initial);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet);
int tscProcessSql(SSqlObj *pSql);
int tscProcessSql(SSqlObj *pSql, SQueryInfo* pQueryInfo);
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex);
void tscAsyncResultOnError(SSqlObj *pSql);
......@@ -425,6 +430,7 @@ void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
......
......@@ -69,7 +69,8 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
return;
}
tscDoQuery(pSql);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
executeQuery(pSql, pQueryInfo);
}
// TODO return the correct error code to client in tscQueueAsyncError
......@@ -179,7 +180,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockForSubquery(pSql);
} else {
tscProcessSql(pSql);
tscProcessSql(pSql, NULL);
}
}
......@@ -193,8 +194,8 @@ static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOf
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy);
}
void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
SSqlObj *pSql = (SSqlObj *)taosa;
void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
SSqlObj *pSql = (SSqlObj *)tres;
if (pSql == NULL || pSql->signature != pSql) {
tscError("sql object is NULL");
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
......@@ -206,18 +207,17 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
// user-defined callback function is stored in fetchFp
pSql->fetchFp = fp;
pSql->fp = tscAsyncFetchRowsProxy;
pSql->fp = tscAsyncFetchRowsProxy;
pSql->param = param;
if (pRes->qhandle == 0) {
tscError("qhandle is NULL");
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
pSql->param = param;
tscAsyncResultOnError(pSql);
return;
}
pSql->param = param;
tscResetForNextRetrieve(pRes);
// handle the sub queries of join query
......@@ -255,8 +255,9 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscProcessSql(pSql);
SQueryInfo* pQueryInfo1 = tscGetActiveQueryInfo(&pSql->cmd);
tscProcessSql(pSql, pQueryInfo1);
}
}
......@@ -330,7 +331,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscDebug("%p get %s successfully", pSql, msg);
if (pSql->pStream == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
// check if it is a sub-query of super table query first, if true, enter another routine
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
......@@ -348,7 +349,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0));
// tscProcessSql can add error into async res
tscProcessSql(pSql);
tscProcessSql(pSql, NULL);
taosReleaseRef(tscObjRef, pSql->self);
return;
} else { // continue to process normal async query
......@@ -379,9 +380,9 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto _error;
}
tscProcessSql(pSql);
tscProcessSql(pSql, NULL);
} else { // in all other cases, simple retry
tscProcessSql(pSql);
tscProcessSql(pSql, NULL);
}
taosReleaseRef(tscObjRef, pSql->self);
......@@ -408,11 +409,15 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
(*pSql->fp)(pSql->param, pSql, code);
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
tscHandleMultivnodeInsert(pSql);
} else {
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
executeQuery(pSql, pQueryInfo1);
}
// proceed to invoke the tscDoQuery();
taosReleaseRef(tscObjRef, pSql->self);
return;
}
}
......@@ -449,7 +454,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
return;
}
tscDoQuery(pSql);
// tscDoQuery(pSql);
taosReleaseRef(tscObjRef, pSql->self);
......
......@@ -53,7 +53,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SSqlRes *pRes = &pSql->res;
// one column for each row
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -154,7 +154,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
pSql->cmd.numOfCols = numOfCols;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
pQueryInfo->order.order = TSDB_ORDER_ASC;
TAOS_FIELD f = {.type = TSDB_DATA_TYPE_BINARY, .bytes = (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE};
......@@ -199,7 +199,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
}
static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
assert(tscGetMetaInfo(pQueryInfo, 0)->pTableMeta != NULL);
......@@ -389,7 +389,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
SColumnIndex index = {0};
pSql->cmd.numOfCols = 2;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
pQueryInfo->order.order = TSDB_ORDER_ASC;
TAOS_FIELD f;
......@@ -427,7 +427,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const char *tableName, const char *ddl) {
SSqlRes *pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
int32_t numOfRows = 1;
if (strlen(ddl) == 0) {
......@@ -444,7 +444,7 @@ static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const c
return 0;
}
static int32_t tscSCreateBuildResult(SSqlObj *pSql, BuildType type, const char *str, const char *result) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
int32_t rowLen = tscSCreateBuildResultFields(pSql, type, result);
tscFieldInfoUpdateOffset(pQueryInfo);
......@@ -552,7 +552,7 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) {
return TSDB_CODE_SUCCESS;
}
static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, char *ddl) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -606,7 +606,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
}
static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName, char *ddl) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -633,7 +633,7 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName,
}
static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName, char *ddl) {
char *result = ddl;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -674,7 +674,7 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
}
static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pTableMetaInfo->pTableMeta != NULL);
......@@ -700,7 +700,7 @@ static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
}
static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -727,7 +727,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
return TSDB_CODE_TSC_ACTION_IN_PROGRESS;
}
static int32_t tscProcessCurrentUser(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resBytes = TSDB_USER_LEN + TSDB_DATA_TYPE_BINARY;
......@@ -754,7 +754,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
extractDBName(pSql->pTscObj->db, db);
pthread_mutex_unlock(&pSql->pTscObj->mutex);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -781,7 +781,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
static int32_t tscProcessServerVer(SSqlObj *pSql) {
const char* v = pSql->pTscObj->sversion;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -804,7 +804,7 @@ static int32_t tscProcessServerVer(SSqlObj *pSql) {
}
static int32_t tscProcessClientVer(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -856,7 +856,7 @@ static int32_t tscProcessServStatus(SSqlObj *pSql) {
return pSql->res.code;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
int32_t val = 1;
......@@ -870,7 +870,7 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
pCmd->numOfCols = 1;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
pQueryInfo->order.order = TSDB_ORDER_ASC;
tscFieldInfoClear(&pQueryInfo->fieldsInfo);
......
......@@ -61,7 +61,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalMerger *pReducer, tOrderDescr
* the fields and offset attributes in pCmd and pModel may be different due to
* merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object.
*/
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
......@@ -262,7 +262,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
#ifdef _DEBUG_VIEW
printf("load data page into mem for build loser tree: %" PRIu64 " rows\n", ds->filePage.num);
SSrcColumnInfo colInfo[256] = {0};
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
tscGetSrcColumnInfo(colInfo, pQueryInfo);
......@@ -297,7 +297,7 @@ void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrde
param->pLocalData = pReducer->pLocalDataSrc;
param->pDesc = pReducer->pDesc;
param->num = pReducer->pLocalDataSrc[0]->pMemBuffer->numOfElemsPerPage;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
param->groupOrderType = pQueryInfo->groupbyExpr.orderType;
pReducer->orderPrjOnSTable = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
......@@ -491,7 +491,7 @@ void tscDestroyLocalMerger(SSqlObj *pSql) {
}
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
// there is no more result, so we release all allocated resource
SLocalMerger *pLocalMerge = (SLocalMerger *)atomic_exchange_ptr(&pRes->pLocalMerger, NULL);
......@@ -545,7 +545,7 @@ void tscDestroyLocalMerger(SSqlObj *pSql) {
static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCmd, SColumnModel *pModel) {
int32_t numOfGroupByCols = 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
numOfGroupByCols = pQueryInfo->groupbyExpr.numOfGroupCols;
......@@ -608,7 +608,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
}
bool isSameGroup(SSqlCmd *pCmd, SLocalMerger *pReducer, char *pPrev, tFilePage *tmpBuffer) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
// disable merge procedure for column projection query
int16_t functionId = pReducer->pCtx[0].functionId;
......@@ -659,7 +659,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
SColumnModel *pModel = NULL;
*pFinalModel = NULL;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pSql->subState.numOfSub);
......@@ -949,7 +949,7 @@ static void doFillResult(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool doneOutp
SSqlRes *pRes = &pSql->res;
tFilePage *pBeforeFillData = pLocalMerge->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SFillInfo *pFillInfo = pLocalMerge->pFillInfo;
// todo extract function
......@@ -1048,7 +1048,7 @@ static void savePreviousRow(SLocalMerger *pLocalMerge, tFilePage *tmpBuffer) {
static void doExecuteFinalMerge(SSqlCmd *pCmd, SLocalMerger *pLocalMerge, bool needInit) {
// the tag columns need to be set before all functions execution
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t j = 0; j < size; ++j) {
......@@ -1215,7 +1215,7 @@ static bool saveGroupResultInfo(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
if (pRes->numOfRowsGroup > 0) {
pRes->numOfGroups += 1;
......@@ -1244,7 +1244,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurren
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
tFilePage * pResBuf = pLocalMerge->pResultBuf;
SColumnModel *pModel = pLocalMerge->resColModel;
......@@ -1310,7 +1310,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalMerger *
pRes->numOfRows = 0;
pRes->numOfRowsGroup = 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
pQueryInfo->limit.offset = pLocalMerge->offset;
......@@ -1333,7 +1333,7 @@ static bool doBuildFilledResultForGroup(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SLocalMerger *pLocalMerge = pRes->pLocalMerger;
SFillInfo *pFillInfo = pLocalMerge->pFillInfo;
......@@ -1364,7 +1364,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
bool prevGroupCompleted = (!pLocalMerge->discard) && pLocalMerge->hasUnprocessedRow;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
if ((isAllSourcesCompleted(pLocalMerge) && !pLocalMerge->hasPrevRow) || pLocalMerge->pLocalDataSrc[0] == NULL ||
prevGroupCompleted) {
......@@ -1405,7 +1405,7 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
SSqlRes *pRes = &pSql->res;
SLocalMerger *pLocalMerge = pRes->pLocalMerger;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t k = 0; k < size; ++k) {
......@@ -1437,7 +1437,7 @@ int32_t tscDoLocalMerge(SSqlObj *pSql) {
}
SLocalMerger *pLocalMerge = pRes->pLocalMerger;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
tFilePage *tmpBuffer = pLocalMerge->pTempBuffer;
if (doHandleLastRemainData(pSql)) {
......
......@@ -759,7 +759,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
const int32_t STABLE_INDEX = 1;
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
char *sql = *sqlstr;
......@@ -1055,7 +1055,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
int32_t totalNum = 0;
int32_t code = TSDB_CODE_SUCCESS;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
assert(pQueryInfo != NULL);
STableMetaInfo *pTableMetaInfo = (pQueryInfo->numOfTables == 0)? tscAddEmptyMetaInfo(pQueryInfo):tscGetMetaInfo(pQueryInfo, 0);
......@@ -1313,7 +1313,7 @@ int tsInsertInitialCheck(SSqlObj *pSql) {
pCmd->count = 0;
pCmd->command = TSDB_SQL_INSERT;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType);
......@@ -1403,7 +1403,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
return code;
}
return tscProcessSql(pSql);
return tscProcessSql(pSql, NULL);
}
typedef struct SImportFileSupport {
......
......@@ -815,7 +815,7 @@ static int insertStmtExecute(STscStmt* stmt) {
pRes->numOfRows = 0;
pRes->numOfTotal = 0;
tscProcessSql(pSql);
tscProcessSql(pSql, NULL);
// wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem);
......
此差异已折叠。
......@@ -237,7 +237,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
assert(pHB->self == pObj->hbrid);
pHB->retry = 0;
int32_t code = tscProcessSql(pHB);
int32_t code = tscProcessSql(pHB, NULL);
taosReleaseRef(tscObjRef, pObj->hbrid);
if (code != TSDB_CODE_SUCCESS) {
......@@ -302,7 +302,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
......@@ -469,13 +469,16 @@ int doProcessSql(SSqlObj *pSql) {
return TSDB_CODE_SUCCESS;
}
int tscProcessSql(SSqlObj *pSql) {
int tscProcessSql(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
char name[TSDB_TABLE_FNAME_LEN] = {0};
SSqlCmd *pCmd = &pSql->cmd;
uint32_t type = 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pQueryInfo == NULL) {
pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
}
STableMetaInfo *pTableMetaInfo = NULL;
if (pQueryInfo != NULL) {
......@@ -506,7 +509,7 @@ int tscProcessSql(SSqlObj *pSql) {
int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(&pSql->cmd);
pRetrieveMsg->free = htons(pQueryInfo->type);
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
......@@ -545,7 +548,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
char* pMsg = pSql->cmd.payload;
......@@ -584,7 +587,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) {
const static int32_t MIN_QUERY_MSG_PKT_SIZE = TSDB_MAX_BYTES_PER_ROW * 5;
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex);
int32_t srcColListSize = (int32_t)(taosArrayGetSize(pQueryInfo->colList) * sizeof(SColumnInfo));
......@@ -614,7 +617,9 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) {
}
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
SQueryInfo* pQueryInfo = tscGetActiveQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
TSKEY dfltKey = htobe64(pQueryMsg->window.skey);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
......@@ -698,7 +703,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_TSC_INVALID_SQL; // todo add test for this
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
......@@ -1371,7 +1376,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSchema *pSchema;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// Reallocate the payload size
......@@ -1460,7 +1465,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int tscEstimateAlterTableMsgLength(SSqlCmd *pCmd) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
return minMsgSize() + sizeof(SAlterTableMsg) + sizeof(SSchema) * tscNumOfFields(pQueryInfo) + TSDB_EXTRA_PAYLOAD_SIZE;
}
......@@ -1469,7 +1474,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int msgLen = 0;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -1518,7 +1523,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
pCmd->payloadLen = htonl(pUpdateMsg->head.contLen);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
SNewVgroupInfo vgroupInfo = {.vgId = -1};
......@@ -1554,7 +1559,7 @@ int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
pRetrieveMsg->free = htons(pQueryInfo->type);
......@@ -1578,7 +1583,7 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
pRes->row = 0;
pRes->rspType = 1;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
return pRes->code;
}
......@@ -1629,7 +1634,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
pRes->code = tscDoLocalMerge(pSql);
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetActiveQueryInfo(pCmd);
tscCreateResPointerInfo(pRes, pQueryInfo);
tscSetResRawPtr(pRes, pQueryInfo);
}
......@@ -1683,7 +1688,7 @@ int tscBuildConnectMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableInfoMsg *pInfoMsg = (STableInfoMsg *)pCmd->payload;
......@@ -1753,7 +1758,7 @@ int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
char* pMsg = pCmd->payload;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SSTableVgroupMsg *pStableVgroupMsg = (SSTableVgroupMsg *)pMsg;
pStableVgroupMsg->numOfTables = htonl(pQueryInfo->numOfTables);
......@@ -2096,7 +2101,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -2155,7 +2160,7 @@ static void createHbObj(STscObj* pObj) {
pSql->fp = tscProcessHeartBeatRsp;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoS(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tfree(pSql);
......@@ -2311,7 +2316,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
pRes->completed = (pRetrieve->completed == 1);
pRes->data = pRetrieve->data;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetActiveQueryInfo(pCmd);
if (tscCreateResPointerInfo(pRes, pQueryInfo) != TSDB_CODE_SUCCESS) {
return pRes->code;
}
......@@ -2325,6 +2330,7 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
tscSetResRawPtr(pRes, pQueryInfo);
}
prepareInputDataFromUpstream(pRes, pQueryInfo);
if (pSql->pSubscription != NULL) {
int32_t numOfCols = pQueryInfo->fieldsInfo.numOfOutput;
......@@ -2364,9 +2370,9 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
pNew->signature = pNew;
pNew->cmd.command = TSDB_SQL_META;
tscAddSubqueryInfo(&pNew->cmd);
tscAddQueryInfo(&pNew->cmd);
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd, 0);
pNew->cmd.autoCreated = pSql->cmd.autoCreated; // create table if not exists
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE + pSql->cmd.payloadLen)) {
......@@ -2400,7 +2406,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
pSql->metaRid = pNew->self;
int32_t code = tscProcessSql(pNew);
int32_t code = tscProcessSql(pNew, NULL);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify application that current process needs to be terminated
}
......@@ -2455,7 +2461,7 @@ int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
char name[TSDB_TABLE_FNAME_LEN] = {0};
......@@ -2479,7 +2485,7 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
}
static bool allVgroupInfoRetrieved(SSqlCmd* pCmd, int32_t clauseIndex) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
if (pTableMetaInfo->vgroupList == NULL) {
......@@ -2506,13 +2512,13 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
// TODO TEST IT
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetailSafely(&pNew->cmd, 0);
SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd, 0);
if (pNewQueryInfo == NULL) {
tscFreeSqlObj(pNew);
return code;
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
STableMeta* pTableMeta = tscTableMetaDup(pMInfo->pTableMeta);
......@@ -2536,7 +2542,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
pNew->fp = tscTableMetaCallBack;
pNew->param = (void *)pSql->self;
code = tscProcessSql(pNew);
code = tscProcessSql(pNew, NULL);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
}
......
......@@ -191,7 +191,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
pSql->fp = syncConnCallback;
pSql->param = pSql;
tscProcessSql(pSql);
tscProcessSql(pSql, NULL);
tsem_wait(&pSql->rspSem);
if (pSql->res.code != TSDB_CODE_SUCCESS) {
......@@ -265,7 +265,7 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
if (taos) *taos = pObj;
pSql->fetchFp = fp;
pSql->res.code = tscProcessSql(pSql);
pSql->res.code = tscProcessSql(pSql, NULL);
tscDebug("%p DB async connection is opening", taos);
return pObj;
}
......@@ -373,7 +373,7 @@ int taos_num_fields(TAOS_RES *res) {
if (pSql == NULL || pSql->signature != pSql) return 0;
int32_t num = 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
return num;
}
......@@ -407,7 +407,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res;
if (pSql == NULL || pSql->signature != pSql) return 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
return NULL;
}
......@@ -558,7 +558,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
return true;
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return true;
......@@ -577,7 +577,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscDebug("%p send msg to dnode to free qhandle ASAP before free sqlObj, command:%s", pSql, sqlCmd[pCmd->command]);
tscProcessSql(pSql);
tscProcessSql(pSql, NULL);
return false;
}
......@@ -671,7 +671,7 @@ char *taos_get_client_info() { return version; }
static void tscKillSTableQuery(SSqlObj *pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return;
......@@ -722,7 +722,7 @@ void taos_stop_query(TAOS_RES *res) {
// set the error code for master pSqlObj firstly
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
assert(pSql->rpcRid <= 0);
......@@ -752,7 +752,7 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
return true;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
return true;
}
......@@ -932,7 +932,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
int code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
char *str = (char *)tblNameList;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex);
if (pQueryInfo == NULL) {
pSql->res.code = terrno;
return terrno;
......
......@@ -89,7 +89,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
......@@ -130,7 +130,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
pStream->numOfRes = 0; // reset the numOfRes.
SSqlObj *pSql = pStream->pSql;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
tscDebug("%p add into timer", pSql);
if (pStream->isProject) {
......@@ -208,7 +208,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) {
#if 0
SSqlObj * pSql = pStream->pSql;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
if (pQueryInfo->fillType != TSDB_FILL_SET_VALUE && pQueryInfo->fillType != TSDB_FILL_NULL) {
return;
......@@ -421,7 +421,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
int64_t minIntervalTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
if (!pStream->isProject && pQueryInfo->interval.interval == 0) {
sprintf(pSql->cmd.payload, "the interval value is 0");
......@@ -471,7 +471,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
}
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
if (pStream->isProject) {
// no data in table, flush all data till now to destination meter, 10sec delay
......@@ -530,7 +530,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......
......@@ -284,7 +284,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
}
size_t numOfTables = taosArrayGetSize(tables);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress));
for( size_t i = 0; i < numOfTables; i++ ) {
STidTags* tt = taosArrayGet( tables, i );
......@@ -304,7 +304,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
}
taosArrayDestroy(tables);
TSDB_QUERY_SET_TYPE(tscGetQueryInfoDetail(pCmd, 0)->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
TSDB_QUERY_SET_TYPE(tscGetQueryInfo(pCmd, 0)->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
return 1;
}
......@@ -502,7 +502,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single table subscription
size_t size = taosArrayGetSize(pSub->progress);
......
此差异已折叠。
......@@ -371,6 +371,10 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
}
}
void prepareInputDataFromUpstream(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
printf("abc\n");
}
static void tscDestroyResPointerInfo(SSqlRes* pRes) {
if (pRes->buffer != NULL) { // free all buffers containing the multibyte string
for (int i = 0; i < pRes->numOfCols; i++) {
......@@ -404,7 +408,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
}
for (int32_t i = 0; i < pCmd->numOfClause; ++i) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, i);
freeQueryInfoImpl(pQueryInfo);
clearAllTableMetaInfo(pQueryInfo, removeMeta);
......@@ -669,6 +673,10 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
return TSDB_CODE_SUCCESS;
}
SQueryInfo* tscGetActiveQueryInfo(SSqlCmd* pCmd) {
return pCmd->active;
}
/**
* create the in-memory buffer for each table to keep the submitted data block
* @param initialSize
......@@ -1697,7 +1705,7 @@ STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd* pCmd, int32_t clauseIndex, i
assert(clauseIndex >= 0 && clauseIndex < pCmd->numOfClause);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, clauseIndex);
return tscGetMetaInfo(pQueryInfo, tableIndex);
}
......@@ -1714,17 +1722,17 @@ STableMetaInfo* tscGetMetaInfo(SQueryInfo* pQueryInfo, int32_t tableIndex) {
return pQueryInfo->pTableMetaInfo[tableIndex];
}
SQueryInfo* tscGetQueryInfoDetailSafely(SSqlCmd* pCmd, int32_t subClauseIndex) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
SQueryInfo* tscGetQueryInfoS(SSqlCmd* pCmd, int32_t subClauseIndex) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, subClauseIndex);
int32_t ret = TSDB_CODE_SUCCESS;
while ((pQueryInfo) == NULL) {
if ((ret = tscAddSubqueryInfo(pCmd)) != TSDB_CODE_SUCCESS) {
if ((ret = tscAddQueryInfo(pCmd)) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
pQueryInfo = tscGetQueryInfoDetail(pCmd, subClauseIndex);
pQueryInfo = tscGetQueryInfo(pCmd, subClauseIndex);
}
return pQueryInfo;
......@@ -1753,18 +1761,20 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->fieldsInfo.internalField = taosArrayInit(4, sizeof(SInternalField));
assert(pQueryInfo->exprList == NULL);
pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX;
pQueryInfo->resColumnId = -1000;
pQueryInfo->limit.limit = -1;
pQueryInfo->limit.offset = 0;
pQueryInfo->slimit.limit = -1;
pQueryInfo->slimit.offset = 0;
pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX;
pQueryInfo->resColumnId = -1000;
pQueryInfo->limit.limit = -1;
pQueryInfo->limit.offset = 0;
pQueryInfo->slimit.limit = -1;
pQueryInfo->slimit.offset = 0;
pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES);
}
int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) {
int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
assert(pCmd != NULL);
// todo refactor: remove this structure
......@@ -1814,7 +1824,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
void tscClearSubqueryInfo(SSqlCmd* pCmd) {
for (int32_t i = 0; i < pCmd->numOfClause; ++i) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, i);
freeQueryInfoImpl(pQueryInfo);
}
}
......@@ -2001,7 +2011,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
return NULL;
}
if (tscAddSubqueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
#ifdef __APPLE__
// to satisfy later tsem_destroy in taos_free_result
tsem_init(&pNew->rspSem, 0, 0);
......@@ -2016,7 +2026,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
pNew->sqlstr = NULL;
pNew->maxRetry = TSDB_MAX_REPLICA;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetailSafely(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfoS(pCmd, 0);
assert(pSql->cmd.clauseIndex == 0);
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
......@@ -2091,7 +2101,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pnCmd->payload = NULL;
pnCmd->allocSize = 0;
pnCmd->pQueryInfo = NULL;
pnCmd->pQueryInfo = NULL;
pnCmd->numOfClause = 0;
pnCmd->clauseIndex = 0;
pnCmd->pDataBlocks = NULL;
......@@ -2103,13 +2113,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pnCmd->tagData.data = NULL;
pnCmd->tagData.dataLen = 0;
if (tscAddSubqueryInfo(pnCmd) != TSDB_CODE_SUCCESS) {
if (tscAddQueryInfo(pnCmd) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(pnCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pnCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
pNewQueryInfo->command = pQueryInfo->command;
memcpy(&pNewQueryInfo->interval, &pQueryInfo->interval, sizeof(pNewQueryInfo->interval));
......@@ -2171,7 +2181,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
// set the correct query type
if (pPrevSql != NULL) {
SQueryInfo* pPrevQueryInfo = tscGetQueryInfoDetail(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex);
SQueryInfo* pPrevQueryInfo = tscGetQueryInfo(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex);
pNewQueryInfo->type = pPrevQueryInfo->type;
} else {
TSDB_QUERY_SET_TYPE(pNewQueryInfo->type, TSDB_QUERY_TYPE_SUBQUERY);// it must be the subquery
......@@ -2241,7 +2251,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
size, pNewQueryInfo->fieldsInfo.numOfOutput, tNameGetTableName(&pFinalInfo->name), pNewQueryInfo->window.skey,
pNewQueryInfo->window.ekey, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit);
tscPrintSelectClause(pNew, 0);
tscPrintSelNodeList(pNew, 0);
} else {
tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex);
}
......@@ -2254,6 +2264,41 @@ _error:
return NULL;
}
void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
uint16_t type = pQueryInfo->type;
if (QUERY_IS_JOIN_QUERY(type) && !TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_SUBQUERY)) {
tscHandleMasterJoinQuery(pSql);
} else if (tscMultiRoundQuery(pQueryInfo, 0) && pQueryInfo->round == 0) {
tscHandleFirstRoundStableQuery(pSql); // todo lock?
} else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
tscLockByThread(&pSql->squeryLock);
tscHandleMasterSTableQuery(pSql);
tscUnlockByThread(&pSql->squeryLock);
} else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
tscHandleMultivnodeInsert(pSql);
} else if (pSql->cmd.command > TSDB_SQL_LOCAL) {
tscProcessLocalCmd(pSql);
} else { // send request to server directly
tscProcessSql(pSql, pQueryInfo);
}
}
// do execute the query according to the query execution plan
void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
if (taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // nest query. do execute it firstly
SQueryInfo* pq = taosArrayGetP(pQueryInfo->pUpstream, 0);
pSql->cmd.active = pq;
executeQuery(pSql, pq);
// merge nest query result and generate final results
return;
}
pSql->cmd.active = pQueryInfo;
doExecuteQuery(pSql, pQueryInfo);
}
/**
* To decide if current is a two-stage super table query, join query, or insert. And invoke different
* procedure accordingly
......@@ -2277,27 +2322,22 @@ void tscDoQuery(SSqlObj* pSql) {
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
tscImportDataFromFile(pSql);
} else {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
uint16_t type = pQueryInfo->type;
if (TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_INSERT)) { // multi-vnodes insertion
tscHandleMultivnodeInsert(pSql);
return;
}
if (QUERY_IS_JOIN_QUERY(type)) {
if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_SUBQUERY)) {
tscHandleMasterJoinQuery(pSql);
} else { // for first stage sub query, iterate all vnodes to get all timestamp
if (!TSDB_QUERY_HAS_TYPE(type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
tscProcessSql(pSql);
tscProcessSql(pSql, NULL);
} else { // secondary stage join query.
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query
tscLockByThread(&pSql->squeryLock);
tscHandleMasterSTableQuery(pSql);
tscUnlockByThread(&pSql->squeryLock);
} else {
tscProcessSql(pSql);
tscProcessSql(pSql, NULL);
}
}
}
......@@ -2313,7 +2353,7 @@ void tscDoQuery(SSqlObj* pSql) {
return;
}
tscProcessSql(pSql);
tscProcessSql(pSql, NULL);
}
}
......@@ -2368,7 +2408,7 @@ bool tscIsQueryWithLimit(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
for (int32_t i = 0; i < pCmd->numOfClause; ++i) {
SQueryInfo* pqi = tscGetQueryInfoDetailSafely(pCmd, i);
SQueryInfo* pqi = tscGetQueryInfoS(pCmd, i);
if (pqi == NULL) {
continue;
}
......@@ -2453,7 +2493,7 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
}
assert(pRes->completed);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// for normal table, no need to try any more if results are all retrieved from one vnode
......@@ -2478,7 +2518,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
/*
* no result returned from the current virtual node anymore, try the next vnode if exists
......@@ -2524,7 +2564,7 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
// set the callback function
pSql->fp = fp;
tscProcessSql(pSql);
tscProcessSql(pSql, NULL);
} else {
tscDebug("%p try all %d vnodes, query complete. current numOfRes:%" PRId64, pSql, totalVgroups, pRes->numOfClauseTotal);
}
......@@ -2538,7 +2578,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
assert(pCmd->clauseIndex < pCmd->numOfClause - 1);
pCmd->clauseIndex++;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
pSql->cmd.command = pQueryInfo->command;
......@@ -2556,7 +2596,7 @@ void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp) {
if (pCmd->command > TSDB_SQL_LOCAL) {
tscProcessLocalCmd(pSql);
} else {
tscDoQuery(pSql);
executeQuery(pSql, pQueryInfo);
}
}
......
......@@ -388,9 +388,10 @@ typedef enum {
typedef enum {
TSDB_SUPER_TABLE = 0, // super table
TSDB_CHILD_TABLE = 1, // table created from super table
TSDB_NORMAL_TABLE = 2, // ordinary table
TSDB_STREAM_TABLE = 3, // table created from stream computing
TSDB_TABLE_MAX = 4
TSDB_NORMAL_TABLE = 2, // ordinary table
TSDB_STREAM_TABLE = 3, // table created from stream computing
TSDB_TEMP_TABLE = 4, // temp table created by nest query
TSDB_TABLE_MAX = 5
} ETableType;
typedef enum {
......
......@@ -86,7 +86,7 @@ typedef struct SSessionWindowVal {
struct SFromInfo;
typedef struct SQuerySqlNode {
struct SArray *pSelectList; // select clause
struct SArray *pSelNodeList; // select clause
struct SFromInfo *from; // from clause SArray<SQuerySqlNode>
struct tSqlExpr *pWhere; // where clause [optional]
SArray *pGroupby; // groupby clause, only for tags[optional], SArray<tVariantListItem>
......@@ -113,7 +113,7 @@ typedef struct SSubclauseInfo { // "UNION" multiple select sub-clause
typedef struct SFromInfo {
int32_t type; // nested query|table name list
union {
SSubclauseInfo *pNode;
SSubclauseInfo pNode;
SArray *tableList; // SArray<STableNamePair>
};
} SFromInfo;
......@@ -254,7 +254,7 @@ SArray *tVariantListInsert(SArray *pList, tVariant *pVar, uint8_t sortOrder, int
SArray *tVariantListAppendToken(SArray *pList, SStrToken *pAliasToken, uint8_t sortOrder);
SFromInfo *setTableNameList(SFromInfo* pFromInfo, SStrToken *pName, SStrToken* pAlias);
SFromInfo *setSubquery(SFromInfo* pFromInfo, SQuerySqlNode *pSqlNode);
SFromInfo *setSubquery(SFromInfo* pFromInfo, SSubclauseInfo* pSqlNode);
void *destroyFromInfo(SFromInfo* pFromInfo);
// sql expr leaf node
......@@ -270,7 +270,7 @@ void tSqlExprDestroy(tSqlExpr *pExpr);
SArray *tSqlExprListAppend(SArray *pList, tSqlExpr *pNode, SStrToken *pDistinct, SStrToken *pToken);
void tSqlExprListDestroy(SArray *pList);
SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelectList, SFromInfo *pFrom, tSqlExpr *pWhere,
SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SFromInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit);
......
......@@ -507,33 +507,26 @@ distinct(X) ::= . { X.n = 0;}
// A complete FROM clause.
%type from {SFromInfo*}
from(A) ::= FROM tablelist(X). {A = X;}
from(A) ::= FROM LP union(Y) RP. {A = Y;}
from(A) ::= FROM LP union(Y) RP. {A = setSubquery(NULL, Y);}
%type tablelist {SArray*}
%type tablelist {SFromInfo*}
tablelist(A) ::= ids(X) cpxName(Y). {
toTSDBType(X.type);
X.n += Y.n;
A = setTableNameList(NULL, &X, NULL);
}
tablelist(A) ::= ids(X) cpxName(Y) ids(Z). {
toTSDBType(X.type);
toTSDBType(Z.type);
X.n += Y.n;
A = setTableNameList(NULL, &X, &Z);
}
tablelist(A) ::= tablelist(Y) COMMA ids(X) cpxName(Z). {
toTSDBType(X.type);
X.n += Z.n;
A = setTableNameList(Y, &X, NULL);
}
tablelist(A) ::= tablelist(Y) COMMA ids(X) cpxName(Z) ids(F). {
toTSDBType(X.type);
toTSDBType(F.type);
X.n += Z.n;
A = setTableNameList(Y, &X, &F);
}
......
......@@ -462,13 +462,13 @@ SFromInfo *setTableNameList(SFromInfo* pFromInfo, SStrToken *pName, SStrToken* p
return pFromInfo;
}
SFromInfo *setSubquery(SFromInfo* pFromInfo, SQuerySqlNode* pSqlNode) {
SFromInfo *setSubquery(SFromInfo* pFromInfo, SSubclauseInfo* pSqlNode) {
if (pFromInfo == NULL) {
pFromInfo = calloc(1, sizeof(SFromInfo));
}
pFromInfo->type = SQL_NODE_FROM_SUBQUERY;
pFromInfo->pNode->pClause[pFromInfo->pNode->numOfClause - 1] = pSqlNode;
pFromInfo->pNode = *pSqlNode;
return pFromInfo;
}
......@@ -481,7 +481,7 @@ void* destroyFromInfo(SFromInfo* pFromInfo) {
if (pFromInfo->type == SQL_NODE_FROM_NAMELIST) {
taosArrayDestroy(pFromInfo->tableList);
} else {
destroyAllSelectClause(pFromInfo->pNode);
destroyAllSelectClause(&pFromInfo->pNode);
}
tfree(pFromInfo);
......@@ -628,11 +628,11 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
/*
* extract the select info out of sql string
*/
SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelectList, SFromInfo *pFrom, tSqlExpr *pWhere,
SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SFromInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval,
SSessionWindowVal *pSession, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit,
SLimitVal *psLimit) {
assert(pSelectList != NULL);
assert(pSelNodeList != NULL);
SQuerySqlNode *pSqlNode = calloc(1, sizeof(SQuerySqlNode));
......@@ -640,7 +640,7 @@ SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelectList, SF
pSqlNode->sqlstr = *pSelectToken;
pSqlNode->sqlstr.n = (uint32_t)strlen(pSqlNode->sqlstr.z);
pSqlNode->pSelectList = pSelectList;
pSqlNode->pSelNodeList = pSelNodeList;
pSqlNode->from = pFrom;
pSqlNode->pGroupby = pGroupby;
pSqlNode->pSortOrder = pSortOrder;
......@@ -702,9 +702,9 @@ void destroyQuerySqlNode(SQuerySqlNode *pQuerySql) {
return;
}
tSqlExprListDestroy(pQuerySql->pSelectList);
tSqlExprListDestroy(pQuerySql->pSelNodeList);
pQuerySql->pSelectList = NULL;
pQuerySql->pSelNodeList = NULL;
tSqlExprDestroy(pQuerySql->pWhere);
pQuerySql->pWhere = NULL;
......
......@@ -2649,45 +2649,38 @@ static void yy_reduce(
yymsp[0].minor.yy0 = yylhsminor.yy0;
break;
case 171: /* from ::= FROM tablelist */
{yymsp[-1].minor.yy70 = yymsp[0].minor.yy429;}
{yymsp[-1].minor.yy70 = yymsp[0].minor.yy70;}
break;
case 172: /* from ::= FROM LP union RP */
{yymsp[-3].minor.yy70 = yymsp[-1].minor.yy141;}
{yymsp[-3].minor.yy70 = setSubquery(NULL, yymsp[-1].minor.yy141);}
break;
case 173: /* tablelist ::= ids cpxName */
{
toTSDBType(yymsp[-1].minor.yy0.type);
yymsp[-1].minor.yy0.n += yymsp[0].minor.yy0.n;
yylhsminor.yy429 = setTableNameList(NULL, &yymsp[-1].minor.yy0, NULL);
yylhsminor.yy70 = setTableNameList(NULL, &yymsp[-1].minor.yy0, NULL);
}
yymsp[-1].minor.yy429 = yylhsminor.yy429;
yymsp[-1].minor.yy70 = yylhsminor.yy70;
break;
case 174: /* tablelist ::= ids cpxName ids */
{
toTSDBType(yymsp[-2].minor.yy0.type);
toTSDBType(yymsp[0].minor.yy0.type);
yymsp[-2].minor.yy0.n += yymsp[-1].minor.yy0.n;
yylhsminor.yy429 = setTableNameList(NULL, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0);
yylhsminor.yy70 = setTableNameList(NULL, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0);
}
yymsp[-2].minor.yy429 = yylhsminor.yy429;
yymsp[-2].minor.yy70 = yylhsminor.yy70;
break;
case 175: /* tablelist ::= tablelist COMMA ids cpxName */
{
toTSDBType(yymsp[-1].minor.yy0.type);
yymsp[-1].minor.yy0.n += yymsp[0].minor.yy0.n;
yylhsminor.yy429 = setTableNameList(yymsp[-3].minor.yy429, &yymsp[-1].minor.yy0, NULL);
yylhsminor.yy70 = setTableNameList(yymsp[-3].minor.yy70, &yymsp[-1].minor.yy0, NULL);
}
yymsp[-3].minor.yy429 = yylhsminor.yy429;
yymsp[-3].minor.yy70 = yylhsminor.yy70;
break;
case 176: /* tablelist ::= tablelist COMMA ids cpxName ids */
{
toTSDBType(yymsp[-2].minor.yy0.type);
toTSDBType(yymsp[0].minor.yy0.type);
yymsp[-2].minor.yy0.n += yymsp[-1].minor.yy0.n;
yylhsminor.yy429 = setTableNameList(yymsp[-4].minor.yy429, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0);
yylhsminor.yy70 = setTableNameList(yymsp[-4].minor.yy70, &yymsp[-2].minor.yy0, &yymsp[0].minor.yy0);
}
yymsp[-4].minor.yy429 = yylhsminor.yy429;
yymsp[-4].minor.yy70 = yylhsminor.yy70;
break;
case 177: /* tmvar ::= VARIABLE */
{yylhsminor.yy0 = yymsp[0].minor.yy0;}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册