From bfcdbf76d63ae1b63ed0b50f0bdc4f8547ae0274 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 28 May 2020 14:43:03 +0800 Subject: [PATCH] [td-225] fix bugs in interp query --- src/client/inc/tsclient.h | 3 +- src/client/src/tscServer.c | 15 +------ src/client/src/tscSql.c | 49 +++++++++++++--------- src/client/src/tscUtil.c | 7 ++-- src/query/src/qExecutor.c | 86 +++++++++++++++++++------------------- 5 files changed, 81 insertions(+), 79 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 08536a505d..6758438625 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -318,9 +318,10 @@ typedef struct SSqlObj { char freed : 4; char listed : 4; tsem_t rspSem; + pthread_mutex_t inUse; // make sure that one connection can only be utilized by one thread/process SSqlCmd cmd; SSqlRes res; - uint8_t numOfSubs; + uint16_t numOfSubs; struct SSqlObj **pSubs; struct SSqlObj * prev, *next; } SSqlObj; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a1c50b1518..a672bec080 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -653,7 +653,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->order = htons(pQueryInfo->order.order); pQueryMsg->orderColId = htons(pQueryInfo->order.orderColId); - pQueryMsg->fillType = htons(pQueryInfo->fillType); + pQueryMsg->fillType = htons(pQueryInfo->fillType); pQueryMsg->limit = htobe64(pQueryInfo->limit.limit); pQueryMsg->offset = htobe64(pQueryInfo->limit.offset); pQueryMsg->numOfCols = htons(taosArrayGetSize(pQueryInfo->colList)); @@ -1845,17 +1845,6 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { size_t size = 0; STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg, &size); - -#if 0 - // if current table is created according to super table, get the table meta of super table - if (pTableMeta->tableType == TSDB_CHILD_TABLE) { - char id[TSDB_TABLE_ID_LEN + 1] = {0}; - strncpy(id, pMetaMsg->stableId, TSDB_TABLE_ID_LEN); - - // NOTE: if the table meta of super table is not cached at client side yet, the pSTable is NULL - pTableMeta->pSTable = taosCacheAcquireByName(tscCacheHandle, id); - } -#endif // todo add one more function: taosAddDataIfNotExists(); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); @@ -1978,7 +1967,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { pSql->res.code = TSDB_CODE_SUCCESS; pSql->res.numOfTotal = i; - tscTrace("%p load multi-metermeta resp complete num:%d", pSql, pSql->res.numOfTotal); + tscTrace("%p load multi-metermeta resp from complete num:%d", pSql, pSql->res.numOfTotal); #endif return TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 4650951078..c60d80a93f 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -123,6 +123,13 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con tsem_init(&pSql->rspSem, 0, 0); + pthread_mutexattr_t mutexattr; + memset(&mutexattr, 0, sizeof(pthread_mutexattr_t)); + + pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE_NP); + pthread_mutex_init(&pSql->inUse, &mutexattr); + pthread_mutexattr_destroy(&mutexattr); + pObj->pSql = pSql; pObj->pDnodeConn = pDnodeConn; @@ -284,12 +291,23 @@ int taos_query(TAOS *taos, const char *sqlstr) { } SSqlObj* pSql = pObj->pSql; + SSqlCmd* pCmd = &pSql->cmd; + + // now this TAOS_CONN object is in use by one thread + pthread_mutex_lock(&pSql->inUse); size_t sqlLen = strlen(sqlstr); doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); // wait for the callback function to post the semaphore - sem_wait(&pSql->rspSem); + tsem_wait(&pSql->rspSem); + + if (pCmd->command != TSDB_SQL_SELECT && + pCmd->command != TSDB_SQL_SHOW && + pCmd->command != TSDB_SQL_DESCRIBE_TABLE) { + pthread_mutex_unlock(&pSql->inUse); + } + return pSql->res.code; } @@ -525,7 +543,7 @@ int taos_select_db(TAOS *taos, const char *db) { return taos_query(taos, sql); } -void taos_free_result_imp(TAOS_RES *res, int keepCmd) { +void taos_free_result(TAOS_RES *res) { if (res == NULL) return; SSqlObj *pSql = (SSqlObj *)res; @@ -536,26 +554,24 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { if (pSql->signature != pSql) return; + STscObj* pObj = pSql->pTscObj; if (pRes == NULL || pRes->qhandle == 0) { /* Query rsp is not received from vnode, so the qhandle is NULL */ tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp); - STscObj* pTscObj = pSql->pTscObj; - if (pTscObj->pSql != pSql) { + // The semaphore can not be changed while freeing async sub query objects. + if (pObj->pSql != pSql) { tscTrace("%p SqlObj is freed by app", pSql); tscFreeSqlObj(pSql); } else { - if (keepCmd) { - tscFreeSqlResult(pSql); - } else { - tscPartiallyFreeSqlObj(pSql); - } + tscPartiallyFreeSqlObj(pSql); + pthread_mutex_unlock(&pSql->inUse); // now this TAOS_CONN can be used by other threads } - + return; } - // set freeFlag to 1 in retrieve message if there are un-retrieved results + // set freeFlag to 1 in retrieve message if there are un-retrieved results data in node SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); if (pQueryInfo == NULL) { tscPartiallyFreeSqlObj(pSql); @@ -600,19 +616,12 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) { tscFreeSqlObj(pSql); tscTrace("%p sql result is freed by app", pSql); } else { - if (keepCmd) { - tscFreeSqlResult(pSql); - tscTrace("%p sql result is freed while sql command is kept", pSql); - } else { - tscPartiallyFreeSqlObj(pSql); - tscTrace("%p sql result is freed by app", pSql); - } + tscPartiallyFreeSqlObj(pSql); + tscTrace("%p sql result is freed by app", pSql); } } } -void taos_free_result(TAOS_RES *res) { taos_free_result_imp(res, 0); } - // todo should not be used in async query int taos_errno(TAOS *taos) { STscObj *pObj = (STscObj *)taos; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 97ce0fbe23..44cfa1de96 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1767,11 +1767,12 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pNewQueryInfo->limit = pQueryInfo->limit; pNewQueryInfo->slimit = pQueryInfo->slimit; pNewQueryInfo->order = pQueryInfo->order; - pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit; - pNewQueryInfo->pTableMetaInfo = NULL; + pNewQueryInfo->tsBuf = NULL; + pNewQueryInfo->fillType = pQueryInfo->fillType; pNewQueryInfo->fillVal = NULL; + pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit; pNewQueryInfo->numOfTables = 0; - pNewQueryInfo->tsBuf = NULL; + pNewQueryInfo->pTableMetaInfo = NULL; pNewQueryInfo->groupbyExpr = pQueryInfo->groupbyExpr; if (pQueryInfo->groupbyExpr.columnInfo != NULL) { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 97f3a82964..30eb291f42 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4525,57 +4525,59 @@ static void sequentialTableProcess(SQInfo *pQInfo) { size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); - if (isPointInterpoQuery(pQuery)) { + if (isPointInterpoQuery(pQuery) || isFirstLastRowQuery(pQuery)) { resetCtxOutputBuf(pRuntimeEnv); assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); while (pQInfo->groupIndex < numOfGroups) { SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); + qTrace("QInfo:%p last_row query on group:%d, total group:%d, current group:%d", pQInfo, pQInfo->groupIndex, + numOfGroups); + + STsdbQueryCond cond = { + .twindow = pQuery->window, + .colList = pQuery->colList, + .order = pQuery->order.order, + .numOfCols = pQuery->numOfCols, + }; + + SArray *g1 = taosArrayInit(1, POINTER_BYTES); + SArray *tx = taosArrayClone(group); + taosArrayPush(g1, &tx); + + STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1}; + + // include only current table + if (pRuntimeEnv->pQueryHandle != NULL) { + tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); + pRuntimeEnv->pQueryHandle = NULL; + } + if (isFirstLastRowQuery(pQuery)) { - qTrace("QInfo:%p last_row query on group:%d, total group:%d, current group:%d", pQInfo, pQInfo->groupIndex, - numOfGroups); - - STsdbQueryCond cond = { - .twindow = pQuery->window, - .colList = pQuery->colList, - .order = pQuery->order.order, - .numOfCols = pQuery->numOfCols, - }; - - SArray *g1 = taosArrayInit(1, POINTER_BYTES); - SArray *tx = taosArrayClone(group); - taosArrayPush(g1, &tx); - - STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1}; - - // include only current table - if (pRuntimeEnv->pQueryHandle != NULL) { - tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); - pRuntimeEnv->pQueryHandle = NULL; - } - pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(pQInfo->tsdb, &cond, &gp); - - initCtxOutputBuf(pRuntimeEnv); - setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(tx, 0), pQInfo->tsdb); - - // here we simply set the first table as current table - pQuery->current = ((SGroupItem*) taosArrayGet(group, 0))->info; - scanAllDataBlocks(pRuntimeEnv, pQuery->current->lastKey); - - int64_t numOfRes = getNumOfResult(pRuntimeEnv); - if (numOfRes > 0) { - pQuery->rec.rows += numOfRes; - forwardCtxOutputBuf(pRuntimeEnv, numOfRes); - } - - skipResults(pRuntimeEnv); - pQInfo->groupIndex += 1; - - // enable execution for next table, when handling the projection query - enableExecutionForNextTable(pRuntimeEnv); + } else { + pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(pQInfo->tsdb, &cond, &gp); + } + + initCtxOutputBuf(pRuntimeEnv); + setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(tx, 0), pQInfo->tsdb); + + // here we simply set the first table as current table + pQuery->current = ((SGroupItem*) taosArrayGet(group, 0))->info; + scanAllDataBlocks(pRuntimeEnv, pQuery->current->lastKey); + + int64_t numOfRes = getNumOfResult(pRuntimeEnv); + if (numOfRes > 0) { + pQuery->rec.rows += numOfRes; + forwardCtxOutputBuf(pRuntimeEnv, numOfRes); } + + skipResults(pRuntimeEnv); + pQInfo->groupIndex += 1; + + // enable execution for next table, when handling the projection query + enableExecutionForNextTable(pRuntimeEnv); } } else { /* -- GitLab