From 086a3a4a79cb1b17e62891e56abf61a5b90c6936 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 6 Jun 2020 15:47:28 +0800 Subject: [PATCH] [td-428] --- src/client/src/tscPrepare.c | 1 - src/client/src/tscSQLParser.c | 19 ++++--- src/client/src/tscServer.c | 2 +- src/client/src/tscSql.c | 42 -------------- src/client/src/tscUtil.c | 14 +++-- src/kit/shell/src/shellEngine.c | 6 +- src/kit/shell/src/shellMain.c | 11 ++-- src/query/src/qExecutor.c | 98 +++++++++++++++++++++------------ 8 files changed, 94 insertions(+), 99 deletions(-) diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 519d7da4fb..2093e8f741 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -24,7 +24,6 @@ #include "tscSubquery.h" int tsParseInsertSql(SSqlObj *pSql); -int taos_query_imp(STscObj* pObj, SSqlObj* pSql); //////////////////////////////////////////////////////////////////////////////// // functions for normal statement preparation diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 32ec0a0db1..d6136a2ad1 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -41,7 +41,7 @@ #define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_TBNAME_COLUMN_INDEX)) #define TBNAME_LIST_SEP "," -typedef struct SColumnList { +typedef struct SColumnList { // todo refactor int32_t num; SColumnIndex ids[TSDB_MAX_COLUMNS]; } SColumnList; @@ -1517,12 +1517,14 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); // count tag is equalled to count(tbname) - if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { + bool isTag = false; + if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta) || index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { index.columnIndex = TSDB_TBNAME_COLUMN_INDEX; + isTag = true; } int32_t size = tDataTypeDesc[TSDB_DATA_TYPE_BIGINT].nSize; - pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, false); + pExpr = tscSqlExprAppend(pQueryInfo, functionID, &index, TSDB_DATA_TYPE_BIGINT, size, size, isTag); } } else { // count(*) is equalled to count(primary_timestamp_key) index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; @@ -1543,10 +1545,13 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr tscColumnListInsert(pQueryInfo->colList, &(ids.ids[i])); } } - - SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscColumnListInsert(pQueryInfo->colList, &tsCol); - + + // the time stamp may be always needed + if (index.tableIndex > 0 && index.tableIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { + SColumnIndex tsCol = {.tableIndex = index.tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; + tscColumnListInsert(pQueryInfo->colList, &tsCol); + } + return TSDB_CODE_SUCCESS; } case TK_SUM: diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d2e6e0ac3f..39b9350284 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -605,7 +605,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { tscError("%p failed to malloc for query msg", pSql); - return -1; + return -1; // todo add test for this } SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index e5832bacfc..dd0b0627ac 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -213,48 +213,6 @@ void taos_close(TAOS *taos) { } } -int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { - SSqlRes *pRes = &pSql->res; - SSqlCmd *pCmd = &pSql->cmd; - - pRes->numOfRows = 1; - pRes->numOfTotal = 0; - pRes->numOfClauseTotal = 0; - - pCmd->curSql = NULL; - if (NULL != pCmd->pTableList) { - taosHashCleanup(pCmd->pTableList); - pCmd->pTableList = NULL; - } - - tscDump("%p pObj:%p, SQL: %s", pSql, pObj, pSql->sqlstr); - - pRes->code = (uint8_t)tsParseSql(pSql, false); - - /* - * set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query. - * If qhandle is NOT set 0, the function of taos_free_result() will send message to server by calling tscProcessSql() - * to free connection, which may cause segment fault, when the parse phrase is not even successfully executed. - */ - pRes->qhandle = 0; - - if (pRes->code == TSDB_CODE_SUCCESS) { - tscDoQuery(pSql); - } - - if (pRes->code == TSDB_CODE_SUCCESS) { - tscTrace("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj); - } else { - tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj); - } - - if (pRes->code != TSDB_CODE_SUCCESS) { - tscPartiallyFreeSqlObj(pSql); - } - - return pRes->code; -} - void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { assert(tres != NULL); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 811d0920f9..d494700715 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -79,8 +79,14 @@ bool tscQueryOnSTable(SSqlCmd* pCmd) { bool tscQueryTags(SQueryInfo* pQueryInfo) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { - int32_t functId = tscSqlExprGet(pQueryInfo, i)->functionId; - + SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); + int32_t functId = pExpr->functionId; + + // "select count(tbname)" query + if (functId == TSDB_FUNC_COUNT && pExpr->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { + continue; + } + if (functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_TID_TAG) { return false; } @@ -208,13 +214,14 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { return true; } +// not order by timestamp projection query on super table bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { if (!tscIsProjectionQueryOnSTable(pQueryInfo, tableIndex)) { return false; } // order by columnIndex exists, not a non-ordered projection query - return pQueryInfo->order.orderColId < 0; + return pQueryInfo->order.orderColId < 0 && pQueryInfo->order.orderColId != TSDB_TBNAME_COLUMN_INDEX; } bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { @@ -984,7 +991,6 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol pExpr->uid = pTableMetaInfo->pTableMeta->uid; } - return pExpr; } diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index a01decc6c6..6dfe424fbc 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -858,9 +858,9 @@ void shellGetGrantInfo(void *con) { char sql[] = "show grants"; - TAOS_RES* pSql = taos_query(con, sql); - int code = taos_errno(pSql); - + result = taos_query(con, sql); + + int code = taos_errno(result); if (code != TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_OPS_NOT_SUPPORT) { fprintf(stdout, "Server is Community Edition, version is %s\n\n", taos_get_server_info(con)); diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 1fbe04208c..92474bdd03 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -16,21 +16,19 @@ #include "os.h" #include "shell.h" #include "tsclient.h" -#include "tutil.h" -TAOS_RES* con; pthread_t pid; // TODO: IMPLEMENT INTERRUPT HANDLER. void interruptHandler(int signum) { #ifdef LINUX - taos_stop_query(con); - if (con != NULL) { + taos_stop_query(result); + if (result != NULL) { /* * we need to free result in async model, in order to avoid free * results while the master thread is waiting for server response. */ - tscQueueAsyncFreeResult(con); + tscQueueAsyncFreeResult(result); } result = NULL; @@ -88,7 +86,7 @@ int main(int argc, char* argv[]) { shellParseArgument(argc, argv, &args); /* Initialize the shell */ - con = shellInit(&args); + TAOS* con = shellInit(&args); if (con == NULL) { exit(EXIT_FAILURE); } @@ -109,5 +107,4 @@ int main(int argc, char* argv[]) { pthread_create(&pid, NULL, shellLoopQuery, con); pthread_join(pid, NULL); } - return 0; } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index fd718f4ac0..ce5286af58 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1590,8 +1590,11 @@ static bool needReverseScan(SQuery *pQuery) { static bool onlyQueryTags(SQuery* pQuery) { for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = pQuery->pSelectExpr[i].base.functionId; - if (functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TID_TAG) { + SExprInfo* pExprInfo = &pQuery->pSelectExpr[i]; + + int32_t functionId = pExprInfo->base.functionId; + if (functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TID_TAG && + (!(functionId == TSDB_FUNC_COUNT && pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX))) { return false; } } @@ -4885,6 +4888,10 @@ static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pE int32_t j = 0; if (TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) { + if (pExprMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { + return -1; + } + while(j < pQueryMsg->numOfTags) { if (pExprMsg->colInfo.colId == pTagCols[j].colId) { return j; @@ -4942,8 +4949,11 @@ static bool validateQuerySourceCols(SQueryTableMsg *pQueryMsg, SSqlFuncMsg** pEx return false; } else if (numOfTotal == 0) { for(int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { - if ((pExprMsg[i]->functionId == TSDB_FUNC_TAGPRJ) || - (pExprMsg[i]->functionId == TSDB_FUNC_TID_TAG && pExprMsg[i]->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX)) { + SSqlFuncMsg* pFuncMsg = pExprMsg[i]; + + if ((pFuncMsg->functionId == TSDB_FUNC_TAGPRJ) || + (pFuncMsg->functionId == TSDB_FUNC_TID_TAG && pFuncMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) || + (pFuncMsg->functionId == TSDB_FUNC_COUNT && pFuncMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX)) { continue; } @@ -5079,8 +5089,8 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, } } - if (pExprMsg->functionId == TSDB_FUNC_TAG || pExprMsg->functionId == TSDB_FUNC_TAGPRJ || - pExprMsg->functionId == TSDB_FUNC_TAG_DUMMY) { + int16_t functionId = pExprMsg->functionId; + if (functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG_DUMMY) { if (pExprMsg->colInfo.flag != TSDB_COL_TAG) { // ignore the column index check for arithmetic expression. return TSDB_CODE_INVALID_QUERY_MSG; } @@ -5192,12 +5202,12 @@ static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTable return TSDB_CODE_SUCCESS; } -static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, +static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, SColumnInfo* pTagCols) { *pExprInfo = NULL; int32_t code = TSDB_CODE_SUCCESS; - SExprInfo *pExprs = (SExprInfo *)calloc(1, sizeof(SExprInfo) * pQueryMsg->numOfOutput); + SExprInfo *pExprs = (SExprInfo *)calloc(pQueryMsg->numOfOutput, sizeof(SExprInfo)); if (pExprs == NULL) { return TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -5223,16 +5233,22 @@ static int32_t createSqlFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo type = TSDB_DATA_TYPE_DOUBLE; bytes = tDataTypeDesc[type].nSize; - } else if (pExprs[i].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { // parse the normal column + } else if (pExprs[i].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX && pExprs[i].base.functionId == TSDB_FUNC_TAGPRJ) { // parse the normal column type = TSDB_DATA_TYPE_BINARY; bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; - } else{ + } else { int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols); - assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags); + assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags || j == TSDB_TBNAME_COLUMN_INDEX); + + if (pExprs[i].base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX) { + SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j]; + type = pCol->type; + bytes = pCol->bytes; + } else { + type = TSDB_DATA_TYPE_BINARY; + bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; + } - SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j]; - type = pCol->type; - bytes = pCol->bytes; } int32_t param = pExprs[i].base.arg[0].argValue.i64; @@ -5485,7 +5501,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, } // set the output buffer capacity - pQuery->rec.capacity = 4096; + pQuery->rec.capacity = 2; pQuery->rec.threshold = 4000; for (int32_t col = 0; col < pQuery->numOfOutput; ++col) { @@ -5824,7 +5840,7 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi } SExprInfo *pExprs = NULL; - if ((code = createSqlFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) { + if ((code = createQFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) { goto _over; } @@ -5926,6 +5942,7 @@ void qTableQuery(qinfo_t qinfo) { qTrace("QInfo:%p query task is launched", pQInfo); if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) { + assert(pQInfo->runtimeEnv.pQueryHandle == NULL); buildTagQueryResult(pQInfo); // todo support the limit/offset } else if (pQInfo->runtimeEnv.stableQuery) { stableQueryImpl(pQInfo); @@ -6028,24 +6045,29 @@ static void buildTagQueryResult(SQInfo* pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - size_t num = taosArrayGetSize(pQInfo->groupInfo.pGroupList); - assert(num == 0 || num == 1); - if (num == 0) { + size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + assert(numOfGroup == 0 || numOfGroup == 1); + + if (numOfGroup == 0) { return; } SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); - num = taosArrayGetSize(pa); - + + size_t num = taosArrayGetSize(pa); assert(num == pQInfo->groupInfo.numOfTables); + + int32_t count = 0; int32_t functionId = pQuery->pSelectExpr[0].base.functionId; if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id assert(pQuery->numOfOutput == 1); SExprInfo* pExprInfo = &pQuery->pSelectExpr[0]; int32_t rsize = pExprInfo->bytes; - - for(int32_t i = 0; i < num; ++i) { + count = 0; + + while(pQInfo->tableIndex < num && count < pQuery->rec.capacity) { + int32_t i = pQInfo->tableIndex++; SGroupItem *item = taosArrayGet(pa, i); char *output = pQuery->sdata[0]->data + i * rsize; @@ -6085,30 +6107,38 @@ static void buildTagQueryResult(SQInfo* pQInfo) { } } } + + count += 1; } - pQInfo->tableIndex = pQInfo->groupInfo.numOfTables; - qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, num); + qTrace("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, count); + } else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query + *(int64_t*) pQuery->sdata[0]->data = num; + + count = 1; + pQInfo->tableIndex = num; //set query completed + qTrace("QInfo:%p create count(tbname) query, res:%d rows:1", pQInfo, count); } else { // return only the tags|table name etc. - for(int32_t i = 0; i < num; ++i) { + count = 0; + while(pQInfo->tableIndex < num && count < pQuery->rec.capacity) { + int32_t i = pQInfo->tableIndex++; + SExprInfo* pExprInfo = pQuery->pSelectExpr; SGroupItem* item = taosArrayGet(pa, i); for(int32_t j = 0; j < pQuery->numOfOutput; ++j) { - // todo check the return value, refactor codes if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { char* data = tsdbGetTableName(pQInfo->tsdb, &item->id); - - char* dst = pQuery->sdata[j]->data + i * (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE); + char* dst = pQuery->sdata[j]->data + count * (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE); memcpy(dst, data, varDataTLen(data)); } else {// todo refactor int16_t type = pExprInfo[j].type; int16_t bytes = pExprInfo[j].bytes; char* data = tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, type, bytes); + char* dst = pQuery->sdata[j]->data + count * pExprInfo[j].bytes; - char* dst = pQuery->sdata[j]->data + i * pExprInfo[j].bytes; if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (data == NULL) { setVardataNull(dst, type); @@ -6124,13 +6154,13 @@ static void buildTagQueryResult(SQInfo* pQInfo) { } } } + count += 1; } - - pQInfo->tableIndex = pQInfo->groupInfo.numOfTables; - qTrace("QInfo:%p create tag values results completed, rows:%d", pQInfo, num); + + qTrace("QInfo:%p create tag values results completed, rows:%d", pQInfo, count); } - pQuery->rec.rows = num; + pQuery->rec.rows = count; setQueryStatus(pQuery, QUERY_COMPLETED); } -- GitLab