diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 83a30b512d318e3fd85b6e1384e796ecb6c3cbd1..5e3b536a8f8b33cf20a409e3a89c1390aef812bf 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -45,9 +45,9 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const SSqlRes *pRes = &pSql->res; pSql->signature = pSql; - pSql->param = param; - pSql->pTscObj = pObj; - pSql->maxRetry = TSDB_MAX_REPLICA_NUM; + pSql->param = param; + pSql->pTscObj = pObj; + pSql->maxRetry = TSDB_MAX_REPLICA_NUM; pSql->fp = fp; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { @@ -146,7 +146,7 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) { } // local reducer has handle this situation during super table non-projection query. - if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) { + if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) { pRes->numOfTotalInCurrentClause += pRes->numOfRows; } @@ -176,7 +176,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo } pSql->fp = fp; - if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) { + if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } tscProcessSql(pSql); @@ -225,7 +225,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) { tscFetchDatablockFromSubquery(pSql); } else { - if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) { + if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } @@ -257,7 +257,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), tscResetForNextRetrieve(pRes); pSql->fp = tscAsyncFetchSingleRowProxy; - if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) { + if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index fb7174d9881547366621074cdfd4bbf6b8744413..00ffd2ac68df7e760351ac3dba5473fb309db21a 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -403,7 +403,6 @@ int doProcessSql(SSqlObj *pSql) { int tscProcessSql(SSqlObj *pSql) { char * name = NULL; - SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); @@ -423,36 +422,35 @@ int tscProcessSql(SSqlObj *pSql) { } tscTrace("%p SQL cmd:%d will be processed, name:%s, type:%d", pSql, pCmd->command, name, type); - if (pSql->cmd.command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL + if (pCmd->command < TSDB_SQL_MGMT) { // the pTableMetaInfo cannot be NULL if (pTableMetaInfo == NULL) { pSql->res.code = TSDB_CODE_OTHERS; return pSql->res.code; } - } else if (pSql->cmd.command < TSDB_SQL_LOCAL) { - pSql->ipList = tscMgmtIpSet; + } else if (pCmd->command < TSDB_SQL_LOCAL) { + pSql->ipList = tscMgmtIpSet; //? } else { // local handler return (*tscProcessMsgRsp[pCmd->command])(pSql); } - // todo handle async situation - if (QUERY_IS_JOIN_QUERY(type)) { - if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) { - return tscHandleMasterJoinQuery(pSql); - } else { - // for first stage sub query, iterate all vnodes to get all timestamp - if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { - return doProcessSql(pSql); - } - } - } - - if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query - tscHandleMasterSTableQuery(pSql); - return pRes->code; - } else if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) { // multi-vnodes insertion - tscHandleMultivnodeInsert(pSql); - return pSql->res.code; - } +// if (QUERY_IS_JOIN_QUERY(type)) { +// if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) { +// return tscHandleMasterJoinQuery(pSql); +// } else { +// // for first stage sub query, iterate all vnodes to get all timestamp +// if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { +// return doProcessSql(pSql); +// } +// } +// } +// +// if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query +// tscHandleMasterSTableQuery(pSql); +// return pRes->code; +// } else if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) { // multi-vnodes insertion +// tscHandleMultivnodeInsert(pSql); +// return pRes->code; +// } return doProcessSql(pSql); } @@ -489,7 +487,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { const int64_t MAX_WAITING_TIME = 10000; // 10 Sec. int64_t stime = taosGetTimestampMs(); - while (pSql->cmd.command != TSDB_SQL_RETRIEVE_METRIC && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) { + while (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) { taosMsleep(100); if (taosGetTimestampMs() - stime > MAX_WAITING_TIME) { break; @@ -1461,7 +1459,7 @@ int tscProcessTagRetrieveRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder(pSql, numOfRes); } -int tscProcessRetrieveMetricRsp(SSqlObj *pSql) { +int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; @@ -2257,7 +2255,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { strcpy(pObj->sversion, pConnect->serverVersion); pObj->writeAuth = pConnect->writeAuth; pObj->superAuth = pConnect->superAuth; - taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); +// taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); return 0; } @@ -2637,7 +2635,7 @@ void tscInitMsgsFp() { tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp; - tscProcessMsgRsp[TSDB_SQL_RETRIEVE_METRIC] = tscProcessRetrieveMetricRsp; + tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp; tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp; tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index f73052ec38645a250341f4fac10f73cb0ab91fd3..45b1cd8c1092be055f2b48500781ab90ba4d6e42 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -414,7 +414,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) { } // secondary merge has handle this situation - if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) { + if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) { pRes->numOfTotalInCurrentClause += pRes->numOfRows; } @@ -476,7 +476,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { // current data are exhausted, fetch more data if (pRes->row >= pRes->numOfRows && pRes->completed != true && (pCmd->command == TSDB_SQL_RETRIEVE || - pCmd->command == TSDB_SQL_RETRIEVE_METRIC || + pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE || pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_SHOW || diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 052a472bcfa111edfcb92d41c62a20ba0b1cab39..364c183832168834715ec9e132f26c4416ff547f 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1254,7 +1254,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { // pRes->code check only serves in launching metric sub-queries if (pRes->code == TSDB_CODE_QUERY_CANCELLED) { - pCmd->command = TSDB_SQL_RETRIEVE_METRIC; // enable the abort of kill super table function. + pCmd->command = TSDB_SQL_RETRIEVE_LOCALMERGE; // enable the abort of kill super table function. return pRes->code; } @@ -1564,7 +1564,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p tscFreeSubSqlObj(trsupport, pSql); // set the command flag must be after the semaphore been correctly set. - pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; + pPObj->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; if (pPObj->res.code == TSDB_CODE_SUCCESS) { (*pPObj->fp)(pPObj->param, pPObj, 0); } else { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d1cf21e863a13e448076083b41142bfe2573391d..d59e24bcc38bf1062ddb48597b1bf0c0078ac154 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -352,7 +352,7 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { STscObj* pObj = pSql->pTscObj; int32_t cmd = pCmd->command; - if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_METRIC || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT || + if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT || cmd == TSDB_SQL_METRIC_JOIN_RETRIEVE) { tscRemoveFromSqlList(pSql); } @@ -1819,6 +1819,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void // todo handle the agg arithmetic expression for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) { TAOS_FIELD* field = tscFieldInfoGetField(&pNewQueryInfo->fieldsInfo, f); + numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo); + for(int32_t k1 = 0; k1 < numOfExprs; ++k1) { SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1); @@ -1875,24 +1877,54 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void return pNew; } +/** + * To decide if current is a two-stage super table query, join query, or insert. And invoke different + * procedure accordingly + * @param pSql + */ void tscDoQuery(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; - pSql->res.code = TSDB_CODE_SUCCESS; + pRes->code = TSDB_CODE_SUCCESS; if (pCmd->command > TSDB_SQL_LOCAL) { tscProcessLocalCmd(pSql); + return; + } + + if (pCmd->command == TSDB_SQL_SELECT) { + tscAddIntoSqlList(pSql); + } + + if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) { + tscProcessMultiVnodesInsertFromFile(pSql); } else { - if (pCmd->command == TSDB_SQL_SELECT) { - tscAddIntoSqlList(pSql); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + uint16_t type = pQueryInfo->type; + + if (pSql->fp == (void(*)())tscHandleMultivnodeInsert) { // multi-vnodes insertion + tscHandleMultivnodeInsert(pSql); + return; } - - if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) { - tscProcessMultiVnodesInsertFromFile(pSql); - } else { - // pSql may be released in this function if it is a async insertion. - tscProcessSql(pSql); + + if (QUERY_IS_JOIN_QUERY(type)) { + if ((pQueryInfo->type & TSDB_QUERY_TYPE_SUBQUERY) == 0) { + tscHandleMasterJoinQuery(pSql); + return; + } else { + // for first stage sub query, iterate all vnodes to get all timestamp + if ((pQueryInfo->type & TSDB_QUERY_TYPE_JOIN_SEC_STAGE) != TSDB_QUERY_TYPE_JOIN_SEC_STAGE) { +// doProcessSql(pSql); + assert(0); + } + } + } else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query + tscHandleMasterSTableQuery(pSql); + return; } + + tscProcessSql(pSql); } } diff --git a/src/query/inc/qsqltype.h b/src/query/inc/qsqltype.h index 6933f9936e208ab4ea259c49926f428b18dd7db1..08d30be925796654ec127de997e6a236ea39f6fc 100644 --- a/src/query/inc/qsqltype.h +++ b/src/query/inc/qsqltype.h @@ -60,16 +60,16 @@ enum _sql_type { TSDB_SQL_LOCAL, // SQL below for client local TSDB_SQL_DESCRIBE_TABLE, - TSDB_SQL_RETRIEVE_METRIC, + TSDB_SQL_RETRIEVE_LOCALMERGE, TSDB_SQL_METRIC_JOIN_RETRIEVE, /* * build empty result instead of accessing dnode to fetch result * reset the client cache */ - TSDB_SQL_RETRIEVE_EMPTY_RESULT, // 40 + TSDB_SQL_RETRIEVE_EMPTY_RESULT, - TSDB_SQL_RESET_CACHE, + TSDB_SQL_RESET_CACHE, // 40 TSDB_SQL_SERV_STATUS, TSDB_SQL_CURRENT_DB, TSDB_SQL_SERV_VERSION, @@ -77,7 +77,7 @@ enum _sql_type { TSDB_SQL_CURRENT_USER, TSDB_SQL_CFG_LOCAL, - TSDB_SQL_MAX // 48 + TSDB_SQL_MAX // 47 };