From 0944c0cd84134b9513832047d1c05ea6a5c55209 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 1 Jun 2020 23:30:10 +0800 Subject: [PATCH] [td-314] [td-437] refactor C API. --- src/client/inc/tsclient.h | 3 +- src/client/src/TSDBJNIConnector.c | 30 ++--- src/client/src/tscServer.c | 10 +- src/client/src/tscSql.c | 190 ++++++++++++------------------ src/client/src/tscStream.c | 20 ++-- src/client/src/tscSub.c | 12 +- src/client/src/tscUtil.c | 47 ++++---- src/inc/taos.h | 11 +- src/kit/shell/src/shellEngine.c | 82 ++++++------- src/kit/shell/src/shellImport.c | 9 +- src/kit/shell/src/shellMain.c | 11 +- tests/tsim/src/simExe.c | 28 +++-- 12 files changed, 210 insertions(+), 243 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index f1b620176d..d40f8782f4 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -293,7 +293,6 @@ typedef struct STscObj { char sversion[TSDB_VERSION_LEN]; char writeAuth : 1; char superAuth : 1; - struct SSqlObj * pSql; struct SSqlObj * pHb; struct SSqlObj * sqlList; struct SSqlStream *streamList; @@ -408,7 +407,7 @@ void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); void tscKillSTableQuery(SSqlObj *pSql); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); -bool tscIsUpdateQuery(STscObj *pObj); +bool tscIsUpdateQuery(SSqlObj* pSql); bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes); char *tscGetErrorMsgPayload(SSqlCmd *pCmd); diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c index 6ab1b73d1e..69e045eca4 100644 --- a/src/client/src/TSDBJNIConnector.c +++ b/src/client/src/TSDBJNIConnector.c @@ -268,18 +268,18 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp(J } jniTrace("jobj:%p, conn:%p, sql:%s", jobj, tscon, dst); - - int code = taos_query(tscon, dst); - if (code != 0) { - jniError("jobj:%p, conn:%p, code:%s, msg:%s", jobj, tscon, tstrerror(code), taos_errstr(tscon)); + + SSqlObj *pSql = taos_query(tscon, dst); + if (pSql == NULL || pSql->res.code != TSDB_CODE_SUCCESS) { + jniError("jobj:%p, conn:%p, code:%s, msg:%s", jobj, tscon, tstrerror(pSql->res.code), taos_errstr(tscon)); free(dst); return JNI_TDENGINE_ERROR; } else { - int32_t affectRows = 0; - SSqlObj *pSql = ((STscObj *)tscon)->pSql; - + int32_t affectRows = 0; + int32_t code = pSql->res.code; + if (pSql->cmd.command == TSDB_SQL_INSERT) { - affectRows = taos_affected_rows(tscon); + affectRows = taos_affected_rows(pSql); jniTrace("jobj:%p, conn:%p, code:%s, affect rows:%d", jobj, tscon, tstrerror(code), affectRows); } else { jniTrace("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code)); @@ -306,20 +306,20 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(J } JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(JNIEnv *env, jobject jobj, jlong con) { - TAOS *tscon = (TAOS *)con; - if (tscon == NULL) { + SSqlObj *pSql = (TAOS_RES *) con; + if (pSql == NULL) { jniError("jobj:%p, connection is closed", jobj); return JNI_CONNECTION_NULL; } jlong ret = 0; - - if (tscIsUpdateQuery(tscon)) { + STscObj* pObj = pSql->pTscObj; + + if (tscIsUpdateQuery(pSql)) { ret = 0; // for update query, no result pointer - jniTrace("jobj:%p, conn:%p, no resultset", jobj, tscon); + jniTrace("jobj:%p, conn:%p, no resultset", jobj, pObj); } else { - ret = (jlong) taos_use_result(tscon); - jniTrace("jobj:%p, conn:%p, get resultset:%p", jobj, tscon, (void *) ret); + jniTrace("jobj:%p, conn:%p, get resultset:%p", jobj, pObj, (void *) ret); } return ret; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8ca590a1f6..5b7009f958 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -220,9 +220,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { if (pObj->signature != pObj) { tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed, pObj, pObj->signature); - if (pSql != pObj->pSql) { - tscFreeSqlObj(pSql); - } + tscFreeSqlObj(pSql); rpcFreeCont(rpcMsg->pCont); return; } @@ -257,6 +255,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { rpcMsg->code = TSDB_CODE_NOT_READY; rpcFreeCont(rpcMsg->pCont); return; + } else if (pCmd->command == TSDB_SQL_META) { +// rpcFreeCont(rpcMsg->pCont); +// return; } else { tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry); @@ -331,7 +332,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) { - void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL; rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows; tscTrace("%p SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql); @@ -345,7 +345,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { * the tscShouldBeFreed will success and tscFreeSqlObj free it immediately. */ bool shouldFree = tscShouldBeFreed(pSql); - (*pSql->fp)(pSql->param, taosres, rpcMsg->code); + (*pSql->fp)(pSql->param, pSql, rpcMsg->code); if (shouldFree) { tscTrace("%p sqlObj is automatically freed", pSql); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index a4cbd7f7ec..e491678e58 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -52,7 +52,7 @@ static bool validPassword(const char* passwd) { return validImpl(passwd, TSDB_PASSWORD_LEN); } -STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, const char *db, uint16_t port, +SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, const char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) { taos_init(); @@ -120,10 +120,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con pSql->pTscObj = pObj; pSql->signature = pSql; pSql->maxRetry = TSDB_MAX_REPLICA_NUM; - tsem_init(&pSql->rspSem, 0, 0); - pObj->pSql = pSql; pObj->pDnodeConn = pDnodeConn; pSql->fp = fp; @@ -143,42 +141,38 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con // tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid tsInsertHeadSize = tsRpcHeadSize + sizeof(SMsgDesc) + sizeof(SSubmitMsg); - return pObj; + return pSql; } static void syncConnCallback(void *param, TAOS_RES *tres, int code) { - STscObj *pObj = (STscObj *)param; - assert(pObj != NULL && pObj->pSql != NULL); - - if (code < 0) { - pObj->pSql->res.code = code; - } + SSqlObj *pSql = (SSqlObj *) tres; + assert(pSql != NULL); - sem_post(&pObj->pSql->rspSem); + sem_post(&pSql->rspSem); } TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) { tscTrace("try to create a connection to %s:%u, user:%s db:%s", ip, port, user, db); - STscObj *pObj = taosConnectImpl(ip, user, pass, db, port, NULL, NULL, NULL); - if (pObj != NULL) { - SSqlObj* pSql = pObj->pSql; - assert(pSql != NULL); - + STscObj* pObj = NULL; + SSqlObj *pSql = taosConnectImpl(ip, user, pass, db, port, syncConnCallback, NULL, (void**) &pObj); + if (pSql != NULL) { pSql->fp = syncConnCallback; - pSql->param = pObj; + pSql->param = pSql; tscProcessSql(pSql); sem_wait(&pSql->rspSem); if (pSql->res.code != TSDB_CODE_SUCCESS) { terrno = pSql->res.code; + taos_free_result(pSql); taos_close(pObj); return NULL; } tscTrace("%p DB connection is opening, dnodeConn:%p", pObj, pObj->pDnodeConn); - + taos_free_result(pSql); + // version compare only requires the first 3 segments of the version string int code = taosCheckVersion(version, taos_get_server_info(pObj), 3); if (code != 0) { @@ -195,17 +189,14 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) { - STscObj* pObj = taosConnectImpl(ip, user, pass, db, port, fp, param, taos); - if (pObj == NULL) { + SSqlObj* pSql = taosConnectImpl(ip, user, pass, db, port, fp, param, taos); + if (pSql == NULL) { return NULL; } - SSqlObj* pSql = pObj->pSql; - pSql->res.code = tscProcessSql(pSql); - tscTrace("%p DB async connection is opening", pObj); - - return pObj; + tscTrace("%p DB async connection is opening", taos); + return taos; } void taos_close(TAOS *taos) { @@ -265,41 +256,32 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { } static void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { - assert(param != NULL); - SSqlObj *pSql = ((STscObj *)param)->pSql; - - // valid error code is less than 0 - if (code < 0) { - pSql->res.code = code; - } + assert(tres != NULL); + SSqlObj *pSql = (SSqlObj *) tres; sem_post(&pSql->rspSem); } -int taos_query(TAOS *taos, const char *sqlstr) { +TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) { terrno = TSDB_CODE_DISCONNECTED; - return TSDB_CODE_DISCONNECTED; + return NULL; } - SSqlObj* pSql = pObj->pSql; - size_t sqlLen = strlen(sqlstr); + SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + if (pSql == NULL) { + tscError("failed to malloc sqlObj"); + terrno = TSDB_CODE_CLI_OUT_OF_MEMORY; + return NULL; + } + + size_t sqlLen = strlen(sqlstr); doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen); // wait for the callback function to post the semaphore tsem_wait(&pSql->rspSem); - return pSql->res.code; -} - -TAOS_RES *taos_use_result(TAOS *taos) { - STscObj *pObj = (STscObj *)taos; - if (pObj == NULL || pObj->signature != pObj) { - terrno = TSDB_CODE_DISCONNECTED; - return NULL; - } - - return pObj->pSql; + return pSql; } int taos_result_precision(TAOS_RES *res) { @@ -332,18 +314,18 @@ int taos_num_fields(TAOS_RES *res) { return num; } -int taos_field_count(TAOS *taos) { - STscObj *pObj = (STscObj *)taos; - if (pObj == NULL || pObj->signature != pObj) return 0; +int taos_field_count(TAOS_RES *tres) { + SSqlObj* pSql = (SSqlObj*) tres; + if (pSql == NULL || pSql->signature != pSql) return 0; - return taos_num_fields(pObj->pSql); + return taos_num_fields(pSql); } -int taos_affected_rows(TAOS *taos) { - STscObj *pObj = (STscObj *)taos; - if (pObj == NULL || pObj->signature != pObj) return 0; +int taos_affected_rows(TAOS_RES *tres) { + SSqlObj* pSql = (SSqlObj*) tres; + if (pSql == NULL || pSql->signature != pSql) return 0; - return (pObj->pSql->res.numOfRows); + return (pSql->res.numOfRows); } TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { @@ -385,9 +367,8 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) { SSqlObj *pSql = (SSqlObj *)res; SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - STscObj *pObj = pSql->pTscObj; - if (pRes->qhandle == 0 || pObj->pSql != pSql) { + if (pRes->qhandle == 0 || pSql->signature != pSql) { *rows = NULL; return 0; } @@ -521,7 +502,11 @@ int taos_select_db(TAOS *taos, const char *db) { } snprintf(sql, tListLen(sql), "use %s", db); - return taos_query(taos, sql); + SSqlObj* pSql = taos_query(taos, sql); + int32_t code = pSql->res.code; + taos_free_result(pSql); + + return code; } void taos_free_result(TAOS_RES *res) { @@ -533,83 +518,62 @@ void taos_free_result(TAOS_RES *res) { tscTrace("%p start to free result", pSql); - if (pSql->signature != pSql) return; - - STscObj* pObj = pSql->pTscObj; - if (pRes == NULL || pRes->qhandle == 0) { - /* Query rsp is not received from vnode, so the qhandle is NULL */ - tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp); - - // The semaphore can not be changed while freeing async sub query objects. - if (pObj->pSql != pSql) { - tscTrace("%p SqlObj is freed by app", pSql); - tscFreeSqlObj(pSql); - } else { - tscPartiallyFreeSqlObj(pSql); - } + if (pSql->signature != pSql) { + tscTrace("%p result has been freed", pSql); + return; + } + // The semaphore can not be changed while freeing async sub query objects. + if (pRes == NULL || pRes->qhandle == 0) { + tscTrace("%p SqlObj is freed by app, phandle is null", pSql); + tscFreeSqlObj(pSql); return; } // set freeFlag to 1 in retrieve message if there are un-retrieved results data in node SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); if (pQueryInfo == NULL) { - tscPartiallyFreeSqlObj(pSql); + tscFreeSqlObj(pSql); return; } pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; - STscObj* pTscObj = pSql->pTscObj; - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); /* - * case 1. Partial data have been retrieved from vnodes, but not all data has been retrieved yet. - * We need to recycle the connection by noticing the vnode return 0 results. - * case 2. When the query response is received from vnodes and the numOfRows is set to 0, the user calls - * taos_free_result before the taos_fetch_row is called in non-stream computing, - * we need to recycle the connection. - * case 3. If the query process is cancelled by user in stable query, tscProcessSql should not be called - * for each subquery. Because the failure of execution tsProcessSql may trigger the callback function - * be executed, and the retry efforts may result in double free the resources, e.g.,SRetrieveSupport + * If the query process is cancelled by user in stable query, tscProcessSql should not be called + * for each subquery. Because the failure of execution tsProcessSql may trigger the callback function + * be executed, and the retry efforts may result in double free the resources, e.g.,SRetrieveSupport */ - if ((pCmd->command == TSDB_SQL_SELECT || - pCmd->command == TSDB_SQL_SHOW || - pCmd->command == TSDB_SQL_RETRIEVE || - pCmd->command == TSDB_SQL_FETCH) && pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && + if (pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && + (pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_SHOW || + pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_FETCH) && (pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; - tscTrace("%p send msg to free qhandle in vnode, code:%d, numOfRows:%d, command:%s", pSql, pRes->code, pRes->numOfRows, - sqlCmd[pCmd->command]); - + tscTrace("%p send msg to free qhandle in node, code:%d, command:%s", pSql, pRes->code, sqlCmd[pCmd->command]); pSql->freed = 1; tscProcessSql(pSql); - - // waits for response and then goes on - if (pTscObj->pSql == pSql) { + + // in case of sync model query, waits for response and then goes on + if (pSql->fp == waitForQueryRsp || pSql->fp == waitForRetrieveRsp) { sem_wait(&pSql->rspSem); } - } else { // if no free resource msg is sent to vnode, we free this object immediately. - if (pTscObj->pSql != pSql) { - tscFreeSqlObj(pSql); - tscTrace("%p sql result is freed by app", pSql); - } else { - tscPartiallyFreeSqlObj(pSql); - tscTrace("%p sql result is freed by app", pSql); - } } + + tscFreeSqlObj(pSql); + tscTrace("%p sql result is freed by app", pSql); } // todo should not be used in async query -int taos_errno(TAOS *taos) { - STscObj *pObj = (STscObj *)taos; +int taos_errno(TAOS_RES *tres) { + SSqlObj *pSql = (SSqlObj *) tres; - if (pObj == NULL || pObj->signature != pObj) { + if (pSql == NULL || pSql->signature != pSql) { return terrno; } - return pObj->pSql->res.code; + return pSql->res.code; } /* @@ -632,14 +596,13 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) { } // todo should not be used in async model -char *taos_errstr(TAOS *taos) { - STscObj *pObj = (STscObj *)taos; +char *taos_errstr(TAOS_RES *tres) { + SSqlObj *pSql = (SSqlObj *) tres; - if (pObj == NULL || pObj->signature != pObj) - return (char*)tstrerror(terrno); + if (pSql == NULL || pSql->signature != pSql) { + return (char*) tstrerror(terrno); + } - SSqlObj* pSql = pObj->pSql; - if (hasAdditionalErrorInfo(pSql->res.code, &pSql->cmd)) { return pSql->cmd.payload; } else { @@ -769,7 +732,8 @@ int taos_validate_sql(TAOS *taos, const char *sql) { return TSDB_CODE_DISCONNECTED; } - SSqlObj *pSql = pObj->pSql; + SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; @@ -902,7 +866,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { return TSDB_CODE_DISCONNECTED; } - SSqlObj *pSql = pObj->pSql; + SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); SSqlRes *pRes = &pSql->res; pRes->numOfTotal = 0; // the number of getting table meta from server diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 1b1aaa54c9..9e1628bb9b 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -459,14 +459,14 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer; } -static void setErrorInfo(STscObj* pObj, int32_t code, char* info) { - if (pObj == NULL) { +static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) { + if (pSql == NULL) { return; } - SSqlCmd* pCmd = &pObj->pSql->cmd; + SSqlCmd* pCmd = &pSql->cmd; - pObj->pSql->res.code = code; + pSql->res.code = code; if (info != NULL) { strncpy(pCmd->payload, info, pCmd->payloadLen); @@ -480,7 +480,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); if (pSql == NULL) { - setErrorInfo(pObj, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL); + setErrorInfo(pSql, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL); return NULL; } @@ -490,14 +490,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p SSqlRes *pRes = &pSql->res; int ret = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); if (TSDB_CODE_SUCCESS != ret) { - setErrorInfo(pObj, ret, NULL); + setErrorInfo(pSql, ret, NULL); free(pSql); return NULL; } pSql->sqlstr = strdup(sqlstr); if (pSql->sqlstr == NULL) { - setErrorInfo(pObj, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL); + setErrorInfo(pSql, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL); tfree(pSql); return NULL; @@ -511,7 +511,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p tscResetSqlCmdObj(&pSql->cmd); ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE); if (TSDB_CODE_SUCCESS != ret) { - setErrorInfo(pObj, ret, NULL); + setErrorInfo(pSql, ret, NULL); tscError("%p open stream failed, sql:%s, code:%d", pSql, sqlstr, TSDB_CODE_CLI_OUT_OF_MEMORY); tscFreeSqlObj(pSql); return NULL; @@ -521,7 +521,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p SQLInfoDestroy(&SQLInfo); if (pRes->code != TSDB_CODE_SUCCESS) { - setErrorInfo(pObj, pRes->code, pCmd->payload); + setErrorInfo(pSql, pRes->code, pCmd->payload); tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code); tscFreeSqlObj(pSql); @@ -530,7 +530,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream)); if (pStream == NULL) { - setErrorInfo(pObj, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL); + setErrorInfo(pSql, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL); tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code); tscFreeSqlObj(pSql); diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 5831ddad4a..f6cbe4a42e 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -181,21 +181,23 @@ static SArray* getTableList( SSqlObj* pSql ) { const char* p = strstr( pSql->sqlstr, " from " ); char* sql = alloca(strlen(p) + 32); sprintf(sql, "select tbid(tbname)%s", p); - int code = taos_query( pSql->pTscObj, sql ); - if (code != TSDB_CODE_SUCCESS) { - tscError("failed to retrieve table id: %s", tstrerror(code)); + + SSqlObj* pSql1 = taos_query(pSql->pTscObj, sql); + if (terrno != TSDB_CODE_SUCCESS) { + tscError("failed to retrieve table id: %s", tstrerror(terrno)); return NULL; } - TAOS_RES* res = taos_use_result( pSql->pTscObj ); TAOS_ROW row; SArray* result = taosArrayInit( 128, sizeof(STidTags) ); - while ((row = taos_fetch_row(res))) { + while ((row = taos_fetch_row(pSql1))) { STidTags tags; memcpy(&tags, row[0], sizeof(tags)); taosArrayPush(result, &tags); } + taos_free_result(pSql1); + return result; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 4b9d2b920f..e85dd578ab 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -409,8 +409,10 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { } void tscFreeSqlObj(SSqlObj* pSql) { - if (pSql == NULL || pSql->signature != pSql) return; - + if (pSql == NULL || pSql->signature != pSql) { + return; + } + tscTrace("%p start to free sql object", pSql); tscPartiallyFreeSqlObj(pSql); @@ -749,20 +751,7 @@ void tscCloseTscObj(STscObj* pObj) { assert(pObj != NULL); pObj->signature = NULL; - SSqlObj* pSql = pObj->pSql; - - if (pSql) { - terrno = pSql->res.code; - sem_destroy(&pSql->rspSem); - } - taosTmrStopA(&(pObj->pTimer)); - tscFreeSqlObj(pSql); - - if (pSql) { - sem_destroy(&pSql->rspSem); - } - pthread_mutex_destroy(&pObj->mutex); if (pObj->pDnodeConn != NULL) { @@ -1474,22 +1463,27 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb) { * If connection need to be recycled, the SqlObj also should be freed. */ bool tscShouldBeFreed(SSqlObj* pSql) { - if (pSql == NULL || pSql->signature != pSql || pSql->fp == NULL) { + if (pSql == NULL || pSql->signature != pSql) { return false; } - + + assert(pSql->fp != NULL); + STscObj* pTscObj = pSql->pTscObj; - if (pSql->pStream != NULL || pTscObj->pHb == pSql || pTscObj->pSql == pSql || pSql->pSubscription != NULL) { + if (pSql->pStream != NULL || pTscObj->pHb == pSql || pSql->pSubscription != NULL) { return false; } int32_t command = pSql->cmd.command; - if (command == TSDB_SQL_CONNECT || command == TSDB_SQL_INSERT) { + if (command == TSDB_SQL_META || command == TSDB_SQL_STABLEVGROUP) {//TODO subquery should be freed here return true; - } else { - return tscKeepConn[command] == 0 || - (pSql->res.code != TSDB_CODE_ACTION_IN_PROGRESS && pSql->res.code != TSDB_CODE_SUCCESS); } + + // all subqueries should be automatically freed +// if (pSql->cmd.pQueryInfo != NULL && pSql->cmd.pQueryInfo[0]->type & TSDB_QUERY_TYPE_SUBQUERY) { +// return true; +// } + return false; } /** @@ -1952,15 +1946,14 @@ int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid) { } } -bool tscIsUpdateQuery(STscObj* pObj) { - if (pObj == NULL || pObj->signature != pObj) { +bool tscIsUpdateQuery(SSqlObj* pSql) { + if (pSql == NULL || pSql->signature != pSql) { terrno = TSDB_CODE_DISCONNECTED; return TSDB_CODE_DISCONNECTED; } - SSqlCmd* pCmd = &pObj->pSql->cmd; - return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || - TSDB_SQL_USE_DB == pCmd->command); + SSqlCmd* pCmd = &pSql->cmd; + return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || TSDB_SQL_USE_DB == pCmd->command); } int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) { diff --git a/src/inc/taos.h b/src/inc/taos.h index 2f23b10a61..947cbe6759 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -88,14 +88,13 @@ int taos_stmt_execute(TAOS_STMT *stmt); TAOS_RES * taos_stmt_use_result(TAOS_STMT *stmt); int taos_stmt_close(TAOS_STMT *stmt); -DLL_EXPORT int taos_query(TAOS *taos, const char *sql); -DLL_EXPORT TAOS_RES *taos_use_result(TAOS *taos); +DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result DLL_EXPORT void taos_free_result(TAOS_RES *res); -DLL_EXPORT int taos_field_count(TAOS *taos); +DLL_EXPORT int taos_field_count(TAOS_RES *tres); DLL_EXPORT int taos_num_fields(TAOS_RES *res); -DLL_EXPORT int taos_affected_rows(TAOS *taos); +DLL_EXPORT int taos_affected_rows(TAOS_RES *taos); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields); @@ -112,9 +111,9 @@ int* taos_fetch_lengths(TAOS_RES *res); // TODO: the return value should be `const` DLL_EXPORT char *taos_get_server_info(TAOS *taos); DLL_EXPORT char *taos_get_client_info(); -DLL_EXPORT char *taos_errstr(TAOS *taos); +DLL_EXPORT char *taos_errstr(TAOS_RES *tres); -DLL_EXPORT int taos_errno(TAOS *taos); +DLL_EXPORT int taos_errno(TAOS_RES *tres); DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, int code), void *param); DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param); diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 5b4da875de..a01decc6c6 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -275,22 +275,28 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { st = taosGetTimestampUs(); - if (taos_query(con, command)) { - taos_error(con); + TAOS_RES* pSql = taos_query(con, command); + if (taos_errno(pSql)) { + taos_error(pSql); return; } if (regex_match(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) { fprintf(stdout, "Database changed.\n\n"); fflush(stdout); + + taos_free_result(pSql); return; } - int num_fields = taos_field_count(con); + int num_fields = taos_field_count(pSql); if (num_fields != 0) { // select and show kinds of commands int error_no = 0; - int numOfRows = shellDumpResult(con, fname, &error_no, printMode); - if (numOfRows < 0) return; + int numOfRows = shellDumpResult(pSql, fname, &error_no, printMode); + if (numOfRows < 0) { + taos_free_result(pSql); + return; + } et = taosGetTimestampUs(); if (error_no == 0) { @@ -299,7 +305,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(con), numOfRows, (et - st) / 1E6); } } else { - int num_rows_affacted = taos_affected_rows(con); + int num_rows_affacted = taos_affected_rows(pSql); et = taosGetTimestampUs(); printf("Query OK, %d row(s) affected (%.6fs)\n", num_rows_affacted, (et - st) / 1E6); } @@ -309,6 +315,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { if (fname != NULL) { wordfree(&full_path); } + + taos_free_result(pSql); } /* Function to do regular expression check */ @@ -461,6 +469,7 @@ static int dumpResultToFile(const char* fname, TAOS_RES* result) { } while( row != NULL); fclose(fp); + taos_free_result(result); return numOfRows; } @@ -548,15 +557,15 @@ static void printField(const char* val, TAOS_FIELD* field, int width, int32_t le } -static int verticalPrintResult(TAOS_RES* result) { - TAOS_ROW row = taos_fetch_row(result); +static int verticalPrintResult(TAOS_RES* tres) { + TAOS_ROW row = taos_fetch_row(tres); if (row == NULL) { return 0; } - int num_fields = taos_num_fields(result); - TAOS_FIELD *fields = taos_fetch_fields(result); - int precision = taos_result_precision(result); + int num_fields = taos_num_fields(tres); + TAOS_FIELD *fields = taos_fetch_fields(tres); + int precision = taos_result_precision(tres); int maxColNameLen = 0; for (int col = 0; col < num_fields; col++) { @@ -569,7 +578,7 @@ static int verticalPrintResult(TAOS_RES* result) { int numOfRows = 0; do { printf("*************************** %d.row ***************************\n", numOfRows + 1); - int32_t* length = taos_fetch_lengths(result); + int32_t* length = taos_fetch_lengths(tres); for (int i = 0; i < num_fields; i++) { TAOS_FIELD* field = fields + i; @@ -581,7 +590,7 @@ static int verticalPrintResult(TAOS_RES* result) { } numOfRows++; - row = taos_fetch_row(result); + row = taos_fetch_row(tres); } while(row != NULL); return numOfRows; @@ -656,15 +665,15 @@ static void printHeader(TAOS_FIELD* fields, int* width, int num_fields) { } -static int horizontalPrintResult(TAOS_RES* result) { - TAOS_ROW row = taos_fetch_row(result); +static int horizontalPrintResult(TAOS_RES* tres) { + TAOS_ROW row = taos_fetch_row(tres); if (row == NULL) { return 0; } - int num_fields = taos_num_fields(result); - TAOS_FIELD *fields = taos_fetch_fields(result); - int precision = taos_result_precision(result); + int num_fields = taos_num_fields(tres); + TAOS_FIELD *fields = taos_fetch_fields(tres); + int precision = taos_result_precision(tres); int width[TSDB_MAX_COLUMNS]; for (int col = 0; col < num_fields; col++) { @@ -675,7 +684,7 @@ static int horizontalPrintResult(TAOS_RES* result) { int numOfRows = 0; do { - int32_t* length = taos_fetch_lengths(result); + int32_t* length = taos_fetch_lengths(tres); for (int i = 0; i < num_fields; i++) { putchar(' '); printField(row[i], fields + i, width[i], length[i], precision); @@ -684,32 +693,24 @@ static int horizontalPrintResult(TAOS_RES* result) { } putchar('\n'); numOfRows++; - row = taos_fetch_row(result); + row = taos_fetch_row(tres); } while(row != NULL); return numOfRows; } -int shellDumpResult(TAOS *con, char *fname, int *error_no, bool vertical) { +int shellDumpResult(TAOS_RES *tres, char *fname, int *error_no, bool vertical) { int numOfRows = 0; - - TAOS_RES* result = taos_use_result(con); - if (result == NULL) { - taos_error(con); - return -1; - } - if (fname != NULL) { - numOfRows = dumpResultToFile(fname, result); + numOfRows = dumpResultToFile(fname, tres); } else if(vertical) { - numOfRows = verticalPrintResult(result); + numOfRows = verticalPrintResult(tres); } else { - numOfRows = horizontalPrintResult(result); + numOfRows = horizontalPrintResult(tres); } - *error_no = taos_errno(con); - taos_free_result(result); + *error_no = taos_errno(tres); return numOfRows; } @@ -771,12 +772,11 @@ void write_history() { fclose(f); } -void taos_error(TAOS *con) { - fprintf(stderr, "\nDB error: %s\n", taos_errstr(con)); +void taos_error(TAOS_RES *tres) { + fprintf(stderr, "\nDB error: %s\n", taos_errstr(tres)); /* free local resouce: allocated memory/metric-meta refcnt */ - TAOS_RES *pRes = taos_use_result(con); - taos_free_result(pRes); + taos_free_result(tres); } int isCommentLine(char *line) { @@ -858,8 +858,9 @@ void shellGetGrantInfo(void *con) { char sql[] = "show grants"; - int code = taos_query(con, sql); - + TAOS_RES* pSql = taos_query(con, sql); + int code = taos_errno(pSql); + if (code != TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_OPS_NOT_SUPPORT) { fprintf(stdout, "Server is Community Edition, version is %s\n\n", taos_get_server_info(con)); @@ -869,12 +870,11 @@ void shellGetGrantInfo(void *con) { return; } - int num_fields = taos_field_count(con); + int num_fields = taos_field_count(result); if (num_fields == 0) { fprintf(stderr, "\nInvalid grant information.\n"); exit(0); } else { - result = taos_use_result(con); if (result == NULL) { fprintf(stderr, "\nGrant information is null.\n"); exit(0); diff --git a/src/kit/shell/src/shellImport.c b/src/kit/shell/src/shellImport.c index 70660254a5..1dea6bca70 100644 --- a/src/kit/shell/src/shellImport.c +++ b/src/kit/shell/src/shellImport.c @@ -192,11 +192,14 @@ static void shellSourceFile(TAOS *con, char *fptr) { } memcpy(cmd + cmd_len, line, read_len); - if (taos_query(con, cmd)) { + + TAOS_RES* pSql = taos_query(con, cmd); + int32_t code = taos_errno(pSql); + + if (code != 0) { fprintf(stderr, "DB error: %s: %s (%d)\n", taos_errstr(con), fname, lineNo); /* free local resouce: allocated memory/metric-meta refcnt */ - TAOS_RES *pRes = taos_use_result(con); - taos_free_result(pRes); + taos_free_result(pSql); } memset(cmd, 0, MAX_COMMAND_SIZE); diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 3485a964d7..1fbe04208c 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -18,21 +18,21 @@ #include "tsclient.h" #include "tutil.h" -TAOS* con; +TAOS_RES* con; pthread_t pid; // TODO: IMPLEMENT INTERRUPT HANDLER. void interruptHandler(int signum) { #ifdef LINUX - TAOS_RES* res = taos_use_result(con); - taos_stop_query(res); - if (res != NULL) { + taos_stop_query(con); + if (con != NULL) { /* * we need to free result in async model, in order to avoid free * results while the master thread is waiting for server response. */ - tscQueueAsyncFreeResult(res); + tscQueueAsyncFreeResult(con); } + result = NULL; #else printf("\nReceive ctrl+c or other signal, quit shell.\n"); @@ -90,7 +90,6 @@ int main(int argc, char* argv[]) { /* Initialize the shell */ con = shellInit(&args); if (con == NULL) { - taos_error(con); exit(EXIT_FAILURE); } diff --git a/tests/tsim/src/simExe.c b/tests/tsim/src/simExe.c index 0c35e3668c..1be446fa08 100644 --- a/tests/tsim/src/simExe.c +++ b/tests/tsim/src/simExe.c @@ -635,9 +635,13 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) { SCmdLine *line = &script->lines[script->linePos]; int ret = -1; + TAOS_RES* pSql = NULL; + for (int attempt = 0; attempt < 3; ++attempt) { simLogSql(rest); - ret = taos_query(script->taos, rest); + pSql = taos_query(script->taos, rest); + ret = terrno; + if (ret == TSDB_CODE_TABLE_ALREADY_EXIST || ret == TSDB_CODE_DB_ALREADY_EXIST) { simTrace("script:%s, taos:%p, %s success, ret:%d:%s", script->fileName, script->taos, rest, ret, tstrerror(ret)); @@ -663,10 +667,9 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) { } int numOfRows = 0; - int num_fields = taos_field_count(script->taos); + int num_fields = taos_field_count(pSql); if (num_fields != 0) { - TAOS_RES *result = taos_use_result(script->taos); - if (result == NULL) { + if (pSql == NULL) { simTrace("script:%s, taos:%p, %s failed, result is null", script->fileName, script->taos, rest); if (line->errorJump == SQL_JUMP_TRUE) { script->linePos = line->jump; @@ -679,10 +682,10 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) { TAOS_ROW row; - while ((row = taos_fetch_row(result))) { + while ((row = taos_fetch_row(pSql))) { if (numOfRows < MAX_QUERY_ROW_NUM) { - TAOS_FIELD *fields = taos_fetch_fields(result); - int* length = taos_fetch_lengths(result); + TAOS_FIELD *fields = taos_fetch_fields(pSql); + int* length = taos_fetch_lengths(pSql); for (int i = 0; i < num_fields; i++) { char *value = NULL; @@ -768,9 +771,9 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) { } } - taos_free_result(result); + taos_free_result(pSql); } else { - numOfRows = taos_affected_rows(script->taos); + numOfRows = taos_affected_rows(pSql); } sprintf(script->rows, "%d", numOfRows); @@ -911,13 +914,17 @@ bool simExecuteSqlErrorCmd(SScript *script, char *rest) { } int ret; + TAOS_RES* pSql = NULL; if (simAsyncQuery) { char command[4096]; sprintf(command, "curl -H 'Authorization: Taosd %s' -d '%s' 127.0.0.1:6020/rest/sql", script->auth, rest); ret = simExecuteRestFulCommand(script, command); } else { - ret = taos_query(script->taos, rest); + pSql = taos_query(script->taos, rest); + ret = terrno; + + taos_free_result(pSql); } if (ret != TSDB_CODE_SUCCESS) { @@ -926,6 +933,7 @@ bool simExecuteSqlErrorCmd(SScript *script, char *rest) { script->linePos++; return true; } + sprintf(script->error, "lineNum:%d. sql:%s expect failed, but success, ret:%d:%s", line->lineNum, rest, ret, tstrerror(ret)); return false; -- GitLab