提交 0944c0cd 编写于 作者: H Haojun Liao

[td-314] [td-437] refactor C API.

上级 82d75cbd
......@@ -293,7 +293,6 @@ typedef struct STscObj {
char sversion[TSDB_VERSION_LEN];
char writeAuth : 1;
char superAuth : 1;
struct SSqlObj * pSql;
struct SSqlObj * pHb;
struct SSqlObj * sqlList;
struct SSqlStream *streamList;
......@@ -408,7 +407,7 @@ void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
void tscKillSTableQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(STscObj *pObj);
bool tscIsUpdateQuery(SSqlObj* pSql);
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
......
......@@ -268,18 +268,18 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp(J
}
jniTrace("jobj:%p, conn:%p, sql:%s", jobj, tscon, dst);
int code = taos_query(tscon, dst);
if (code != 0) {
jniError("jobj:%p, conn:%p, code:%s, msg:%s", jobj, tscon, tstrerror(code), taos_errstr(tscon));
SSqlObj *pSql = taos_query(tscon, dst);
if (pSql == NULL || pSql->res.code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s, msg:%s", jobj, tscon, tstrerror(pSql->res.code), taos_errstr(tscon));
free(dst);
return JNI_TDENGINE_ERROR;
} else {
int32_t affectRows = 0;
SSqlObj *pSql = ((STscObj *)tscon)->pSql;
int32_t affectRows = 0;
int32_t code = pSql->res.code;
if (pSql->cmd.command == TSDB_SQL_INSERT) {
affectRows = taos_affected_rows(tscon);
affectRows = taos_affected_rows(pSql);
jniTrace("jobj:%p, conn:%p, code:%s, affect rows:%d", jobj, tscon, tstrerror(code), affectRows);
} else {
jniTrace("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
......@@ -306,20 +306,20 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp(J
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp(JNIEnv *env, jobject jobj, jlong con) {
TAOS *tscon = (TAOS *)con;
if (tscon == NULL) {
SSqlObj *pSql = (TAOS_RES *) con;
if (pSql == NULL) {
jniError("jobj:%p, connection is closed", jobj);
return JNI_CONNECTION_NULL;
}
jlong ret = 0;
if (tscIsUpdateQuery(tscon)) {
STscObj* pObj = pSql->pTscObj;
if (tscIsUpdateQuery(pSql)) {
ret = 0; // for update query, no result pointer
jniTrace("jobj:%p, conn:%p, no resultset", jobj, tscon);
jniTrace("jobj:%p, conn:%p, no resultset", jobj, pObj);
} else {
ret = (jlong) taos_use_result(tscon);
jniTrace("jobj:%p, conn:%p, get resultset:%p", jobj, tscon, (void *) ret);
jniTrace("jobj:%p, conn:%p, get resultset:%p", jobj, pObj, (void *) ret);
}
return ret;
......
......@@ -220,9 +220,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
if (pObj->signature != pObj) {
tscTrace("%p sql is already released or DB connection is closed, freed:%d pObj:%p signature:%p", pSql, pSql->freed,
pObj, pObj->signature);
if (pSql != pObj->pSql) {
tscFreeSqlObj(pSql);
}
tscFreeSqlObj(pSql);
rpcFreeCont(rpcMsg->pCont);
return;
}
......@@ -257,6 +255,9 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
rpcMsg->code = TSDB_CODE_NOT_READY;
rpcFreeCont(rpcMsg->pCont);
return;
} else if (pCmd->command == TSDB_SQL_META) {
// rpcFreeCont(rpcMsg->pCont);
// return;
} else {
tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry);
......@@ -331,7 +332,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
if (rpcMsg->code != TSDB_CODE_ACTION_IN_PROGRESS) {
void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL;
rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows;
tscTrace("%p SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql);
......@@ -345,7 +345,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) {
* the tscShouldBeFreed will success and tscFreeSqlObj free it immediately.
*/
bool shouldFree = tscShouldBeFreed(pSql);
(*pSql->fp)(pSql->param, taosres, rpcMsg->code);
(*pSql->fp)(pSql->param, pSql, rpcMsg->code);
if (shouldFree) {
tscTrace("%p sqlObj is automatically freed", pSql);
......
......@@ -52,7 +52,7 @@ static bool validPassword(const char* passwd) {
return validImpl(passwd, TSDB_PASSWORD_LEN);
}
STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, const char *db, uint16_t port,
SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, const char *db, uint16_t port,
void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) {
taos_init();
......@@ -120,10 +120,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
pSql->pTscObj = pObj;
pSql->signature = pSql;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
tsem_init(&pSql->rspSem, 0, 0);
pObj->pSql = pSql;
pObj->pDnodeConn = pDnodeConn;
pSql->fp = fp;
......@@ -143,42 +141,38 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
// tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid
tsInsertHeadSize = tsRpcHeadSize + sizeof(SMsgDesc) + sizeof(SSubmitMsg);
return pObj;
return pSql;
}
static void syncConnCallback(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param;
assert(pObj != NULL && pObj->pSql != NULL);
if (code < 0) {
pObj->pSql->res.code = code;
}
SSqlObj *pSql = (SSqlObj *) tres;
assert(pSql != NULL);
sem_post(&pObj->pSql->rspSem);
sem_post(&pSql->rspSem);
}
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
tscTrace("try to create a connection to %s:%u, user:%s db:%s", ip, port, user, db);
STscObj *pObj = taosConnectImpl(ip, user, pass, db, port, NULL, NULL, NULL);
if (pObj != NULL) {
SSqlObj* pSql = pObj->pSql;
assert(pSql != NULL);
STscObj* pObj = NULL;
SSqlObj *pSql = taosConnectImpl(ip, user, pass, db, port, syncConnCallback, NULL, (void**) &pObj);
if (pSql != NULL) {
pSql->fp = syncConnCallback;
pSql->param = pObj;
pSql->param = pSql;
tscProcessSql(pSql);
sem_wait(&pSql->rspSem);
if (pSql->res.code != TSDB_CODE_SUCCESS) {
terrno = pSql->res.code;
taos_free_result(pSql);
taos_close(pObj);
return NULL;
}
tscTrace("%p DB connection is opening, dnodeConn:%p", pObj, pObj->pDnodeConn);
taos_free_result(pSql);
// version compare only requires the first 3 segments of the version string
int code = taosCheckVersion(version, taos_get_server_info(pObj), 3);
if (code != 0) {
......@@ -195,17 +189,14 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos) {
STscObj* pObj = taosConnectImpl(ip, user, pass, db, port, fp, param, taos);
if (pObj == NULL) {
SSqlObj* pSql = taosConnectImpl(ip, user, pass, db, port, fp, param, taos);
if (pSql == NULL) {
return NULL;
}
SSqlObj* pSql = pObj->pSql;
pSql->res.code = tscProcessSql(pSql);
tscTrace("%p DB async connection is opening", pObj);
return pObj;
tscTrace("%p DB async connection is opening", taos);
return taos;
}
void taos_close(TAOS *taos) {
......@@ -265,41 +256,32 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
}
static void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
assert(param != NULL);
SSqlObj *pSql = ((STscObj *)param)->pSql;
// valid error code is less than 0
if (code < 0) {
pSql->res.code = code;
}
assert(tres != NULL);
SSqlObj *pSql = (SSqlObj *) tres;
sem_post(&pSql->rspSem);
}
int taos_query(TAOS *taos, const char *sqlstr) {
TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
terrno = TSDB_CODE_DISCONNECTED;
return TSDB_CODE_DISCONNECTED;
return NULL;
}
SSqlObj* pSql = pObj->pSql;
size_t sqlLen = strlen(sqlstr);
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
tscError("failed to malloc sqlObj");
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
return NULL;
}
size_t sqlLen = strlen(sqlstr);
doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
// wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem);
return pSql->res.code;
}
TAOS_RES *taos_use_result(TAOS *taos) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
terrno = TSDB_CODE_DISCONNECTED;
return NULL;
}
return pObj->pSql;
return pSql;
}
int taos_result_precision(TAOS_RES *res) {
......@@ -332,18 +314,18 @@ int taos_num_fields(TAOS_RES *res) {
return num;
}
int taos_field_count(TAOS *taos) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return 0;
int taos_field_count(TAOS_RES *tres) {
SSqlObj* pSql = (SSqlObj*) tres;
if (pSql == NULL || pSql->signature != pSql) return 0;
return taos_num_fields(pObj->pSql);
return taos_num_fields(pSql);
}
int taos_affected_rows(TAOS *taos) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return 0;
int taos_affected_rows(TAOS_RES *tres) {
SSqlObj* pSql = (SSqlObj*) tres;
if (pSql == NULL || pSql->signature != pSql) return 0;
return (pObj->pSql->res.numOfRows);
return (pSql->res.numOfRows);
}
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
......@@ -385,9 +367,8 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
SSqlObj *pSql = (SSqlObj *)res;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
STscObj *pObj = pSql->pTscObj;
if (pRes->qhandle == 0 || pObj->pSql != pSql) {
if (pRes->qhandle == 0 || pSql->signature != pSql) {
*rows = NULL;
return 0;
}
......@@ -521,7 +502,11 @@ int taos_select_db(TAOS *taos, const char *db) {
}
snprintf(sql, tListLen(sql), "use %s", db);
return taos_query(taos, sql);
SSqlObj* pSql = taos_query(taos, sql);
int32_t code = pSql->res.code;
taos_free_result(pSql);
return code;
}
void taos_free_result(TAOS_RES *res) {
......@@ -533,83 +518,62 @@ void taos_free_result(TAOS_RES *res) {
tscTrace("%p start to free result", pSql);
if (pSql->signature != pSql) return;
STscObj* pObj = pSql->pTscObj;
if (pRes == NULL || pRes->qhandle == 0) {
/* Query rsp is not received from vnode, so the qhandle is NULL */
tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp);
// The semaphore can not be changed while freeing async sub query objects.
if (pObj->pSql != pSql) {
tscTrace("%p SqlObj is freed by app", pSql);
tscFreeSqlObj(pSql);
} else {
tscPartiallyFreeSqlObj(pSql);
}
if (pSql->signature != pSql) {
tscTrace("%p result has been freed", pSql);
return;
}
// The semaphore can not be changed while freeing async sub query objects.
if (pRes == NULL || pRes->qhandle == 0) {
tscTrace("%p SqlObj is freed by app, phandle is null", pSql);
tscFreeSqlObj(pSql);
return;
}
// set freeFlag to 1 in retrieve message if there are un-retrieved results data in node
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
tscPartiallyFreeSqlObj(pSql);
tscFreeSqlObj(pSql);
return;
}
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
STscObj* pTscObj = pSql->pTscObj;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
/*
* case 1. Partial data have been retrieved from vnodes, but not all data has been retrieved yet.
* We need to recycle the connection by noticing the vnode return 0 results.
* case 2. When the query response is received from vnodes and the numOfRows is set to 0, the user calls
* taos_free_result before the taos_fetch_row is called in non-stream computing,
* we need to recycle the connection.
* case 3. If the query process is cancelled by user in stable query, tscProcessSql should not be called
* for each subquery. Because the failure of execution tsProcessSql may trigger the callback function
* be executed, and the retry efforts may result in double free the resources, e.g.,SRetrieveSupport
* If the query process is cancelled by user in stable query, tscProcessSql should not be called
* for each subquery. Because the failure of execution tsProcessSql may trigger the callback function
* be executed, and the retry efforts may result in double free the resources, e.g.,SRetrieveSupport
*/
if ((pCmd->command == TSDB_SQL_SELECT ||
pCmd->command == TSDB_SQL_SHOW ||
pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH) && pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false &&
if (pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false &&
(pCmd->command == TSDB_SQL_SELECT || pCmd->command == TSDB_SQL_SHOW ||
pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_FETCH) &&
(pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscTrace("%p send msg to free qhandle in vnode, code:%d, numOfRows:%d, command:%s", pSql, pRes->code, pRes->numOfRows,
sqlCmd[pCmd->command]);
tscTrace("%p send msg to free qhandle in node, code:%d, command:%s", pSql, pRes->code, sqlCmd[pCmd->command]);
pSql->freed = 1;
tscProcessSql(pSql);
// waits for response and then goes on
if (pTscObj->pSql == pSql) {
// in case of sync model query, waits for response and then goes on
if (pSql->fp == waitForQueryRsp || pSql->fp == waitForRetrieveRsp) {
sem_wait(&pSql->rspSem);
}
} else { // if no free resource msg is sent to vnode, we free this object immediately.
if (pTscObj->pSql != pSql) {
tscFreeSqlObj(pSql);
tscTrace("%p sql result is freed by app", pSql);
} else {
tscPartiallyFreeSqlObj(pSql);
tscTrace("%p sql result is freed by app", pSql);
}
}
tscFreeSqlObj(pSql);
tscTrace("%p sql result is freed by app", pSql);
}
// todo should not be used in async query
int taos_errno(TAOS *taos) {
STscObj *pObj = (STscObj *)taos;
int taos_errno(TAOS_RES *tres) {
SSqlObj *pSql = (SSqlObj *) tres;
if (pObj == NULL || pObj->signature != pObj) {
if (pSql == NULL || pSql->signature != pSql) {
return terrno;
}
return pObj->pSql->res.code;
return pSql->res.code;
}
/*
......@@ -632,14 +596,13 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) {
}
// todo should not be used in async model
char *taos_errstr(TAOS *taos) {
STscObj *pObj = (STscObj *)taos;
char *taos_errstr(TAOS_RES *tres) {
SSqlObj *pSql = (SSqlObj *) tres;
if (pObj == NULL || pObj->signature != pObj)
return (char*)tstrerror(terrno);
if (pSql == NULL || pSql->signature != pSql) {
return (char*) tstrerror(terrno);
}
SSqlObj* pSql = pObj->pSql;
if (hasAdditionalErrorInfo(pSql->res.code, &pSql->cmd)) {
return pSql->cmd.payload;
} else {
......@@ -769,7 +732,8 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
return TSDB_CODE_DISCONNECTED;
}
SSqlObj *pSql = pObj->pSql;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
......@@ -902,7 +866,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return TSDB_CODE_DISCONNECTED;
}
SSqlObj *pSql = pObj->pSql;
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
SSqlRes *pRes = &pSql->res;
pRes->numOfTotal = 0; // the number of getting table meta from server
......
......@@ -459,14 +459,14 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer;
}
static void setErrorInfo(STscObj* pObj, int32_t code, char* info) {
if (pObj == NULL) {
static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) {
if (pSql == NULL) {
return;
}
SSqlCmd* pCmd = &pObj->pSql->cmd;
SSqlCmd* pCmd = &pSql->cmd;
pObj->pSql->res.code = code;
pSql->res.code = code;
if (info != NULL) {
strncpy(pCmd->payload, info, pCmd->payloadLen);
......@@ -480,7 +480,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
setErrorInfo(pObj, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
setErrorInfo(pSql, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
return NULL;
}
......@@ -490,14 +490,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
SSqlRes *pRes = &pSql->res;
int ret = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (TSDB_CODE_SUCCESS != ret) {
setErrorInfo(pObj, ret, NULL);
setErrorInfo(pSql, ret, NULL);
free(pSql);
return NULL;
}
pSql->sqlstr = strdup(sqlstr);
if (pSql->sqlstr == NULL) {
setErrorInfo(pObj, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
setErrorInfo(pSql, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
tfree(pSql);
return NULL;
......@@ -511,7 +511,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
tscResetSqlCmdObj(&pSql->cmd);
ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (TSDB_CODE_SUCCESS != ret) {
setErrorInfo(pObj, ret, NULL);
setErrorInfo(pSql, ret, NULL);
tscError("%p open stream failed, sql:%s, code:%d", pSql, sqlstr, TSDB_CODE_CLI_OUT_OF_MEMORY);
tscFreeSqlObj(pSql);
return NULL;
......@@ -521,7 +521,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
SQLInfoDestroy(&SQLInfo);
if (pRes->code != TSDB_CODE_SUCCESS) {
setErrorInfo(pObj, pRes->code, pCmd->payload);
setErrorInfo(pSql, pRes->code, pCmd->payload);
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql);
......@@ -530,7 +530,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
SSqlStream *pStream = (SSqlStream *)calloc(1, sizeof(SSqlStream));
if (pStream == NULL) {
setErrorInfo(pObj, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
setErrorInfo(pSql, TSDB_CODE_CLI_OUT_OF_MEMORY, NULL);
tscError("%p open stream failed, sql:%s, reason:%s, code:%d", pSql, sqlstr, pCmd->payload, pRes->code);
tscFreeSqlObj(pSql);
......
......@@ -181,21 +181,23 @@ static SArray* getTableList( SSqlObj* pSql ) {
const char* p = strstr( pSql->sqlstr, " from " );
char* sql = alloca(strlen(p) + 32);
sprintf(sql, "select tbid(tbname)%s", p);
int code = taos_query( pSql->pTscObj, sql );
if (code != TSDB_CODE_SUCCESS) {
tscError("failed to retrieve table id: %s", tstrerror(code));
SSqlObj* pSql1 = taos_query(pSql->pTscObj, sql);
if (terrno != TSDB_CODE_SUCCESS) {
tscError("failed to retrieve table id: %s", tstrerror(terrno));
return NULL;
}
TAOS_RES* res = taos_use_result( pSql->pTscObj );
TAOS_ROW row;
SArray* result = taosArrayInit( 128, sizeof(STidTags) );
while ((row = taos_fetch_row(res))) {
while ((row = taos_fetch_row(pSql1))) {
STidTags tags;
memcpy(&tags, row[0], sizeof(tags));
taosArrayPush(result, &tags);
}
taos_free_result(pSql1);
return result;
}
......
......@@ -409,8 +409,10 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
}
void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) return;
if (pSql == NULL || pSql->signature != pSql) {
return;
}
tscTrace("%p start to free sql object", pSql);
tscPartiallyFreeSqlObj(pSql);
......@@ -749,20 +751,7 @@ void tscCloseTscObj(STscObj* pObj) {
assert(pObj != NULL);
pObj->signature = NULL;
SSqlObj* pSql = pObj->pSql;
if (pSql) {
terrno = pSql->res.code;
sem_destroy(&pSql->rspSem);
}
taosTmrStopA(&(pObj->pTimer));
tscFreeSqlObj(pSql);
if (pSql) {
sem_destroy(&pSql->rspSem);
}
pthread_mutex_destroy(&pObj->mutex);
if (pObj->pDnodeConn != NULL) {
......@@ -1474,22 +1463,27 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb) {
* If connection need to be recycled, the SqlObj also should be freed.
*/
bool tscShouldBeFreed(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql || pSql->fp == NULL) {
if (pSql == NULL || pSql->signature != pSql) {
return false;
}
assert(pSql->fp != NULL);
STscObj* pTscObj = pSql->pTscObj;
if (pSql->pStream != NULL || pTscObj->pHb == pSql || pTscObj->pSql == pSql || pSql->pSubscription != NULL) {
if (pSql->pStream != NULL || pTscObj->pHb == pSql || pSql->pSubscription != NULL) {
return false;
}
int32_t command = pSql->cmd.command;
if (command == TSDB_SQL_CONNECT || command == TSDB_SQL_INSERT) {
if (command == TSDB_SQL_META || command == TSDB_SQL_STABLEVGROUP) {//TODO subquery should be freed here
return true;
} else {
return tscKeepConn[command] == 0 ||
(pSql->res.code != TSDB_CODE_ACTION_IN_PROGRESS && pSql->res.code != TSDB_CODE_SUCCESS);
}
// all subqueries should be automatically freed
// if (pSql->cmd.pQueryInfo != NULL && pSql->cmd.pQueryInfo[0]->type & TSDB_QUERY_TYPE_SUBQUERY) {
// return true;
// }
return false;
}
/**
......@@ -1952,15 +1946,14 @@ int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid) {
}
}
bool tscIsUpdateQuery(STscObj* pObj) {
if (pObj == NULL || pObj->signature != pObj) {
bool tscIsUpdateQuery(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
terrno = TSDB_CODE_DISCONNECTED;
return TSDB_CODE_DISCONNECTED;
}
SSqlCmd* pCmd = &pObj->pSql->cmd;
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) ||
TSDB_SQL_USE_DB == pCmd->command);
SSqlCmd* pCmd = &pSql->cmd;
return ((pCmd->command >= TSDB_SQL_INSERT && pCmd->command <= TSDB_SQL_DROP_DNODE) || TSDB_SQL_USE_DB == pCmd->command);
}
int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) {
......
......@@ -88,14 +88,13 @@ int taos_stmt_execute(TAOS_STMT *stmt);
TAOS_RES * taos_stmt_use_result(TAOS_STMT *stmt);
int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT int taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_RES *taos_use_result(TAOS *taos);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT int taos_field_count(TAOS *taos);
DLL_EXPORT int taos_field_count(TAOS_RES *tres);
DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS *taos);
DLL_EXPORT int taos_affected_rows(TAOS_RES *taos);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
......@@ -112,9 +111,9 @@ int* taos_fetch_lengths(TAOS_RES *res);
// TODO: the return value should be `const`
DLL_EXPORT char *taos_get_server_info(TAOS *taos);
DLL_EXPORT char *taos_get_client_info();
DLL_EXPORT char *taos_errstr(TAOS *taos);
DLL_EXPORT char *taos_errstr(TAOS_RES *tres);
DLL_EXPORT int taos_errno(TAOS *taos);
DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, int code), void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
......
......@@ -275,22 +275,28 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
st = taosGetTimestampUs();
if (taos_query(con, command)) {
taos_error(con);
TAOS_RES* pSql = taos_query(con, command);
if (taos_errno(pSql)) {
taos_error(pSql);
return;
}
if (regex_match(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) {
fprintf(stdout, "Database changed.\n\n");
fflush(stdout);
taos_free_result(pSql);
return;
}
int num_fields = taos_field_count(con);
int num_fields = taos_field_count(pSql);
if (num_fields != 0) { // select and show kinds of commands
int error_no = 0;
int numOfRows = shellDumpResult(con, fname, &error_no, printMode);
if (numOfRows < 0) return;
int numOfRows = shellDumpResult(pSql, fname, &error_no, printMode);
if (numOfRows < 0) {
taos_free_result(pSql);
return;
}
et = taosGetTimestampUs();
if (error_no == 0) {
......@@ -299,7 +305,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(con), numOfRows, (et - st) / 1E6);
}
} else {
int num_rows_affacted = taos_affected_rows(con);
int num_rows_affacted = taos_affected_rows(pSql);
et = taosGetTimestampUs();
printf("Query OK, %d row(s) affected (%.6fs)\n", num_rows_affacted, (et - st) / 1E6);
}
......@@ -309,6 +315,8 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
if (fname != NULL) {
wordfree(&full_path);
}
taos_free_result(pSql);
}
/* Function to do regular expression check */
......@@ -461,6 +469,7 @@ static int dumpResultToFile(const char* fname, TAOS_RES* result) {
} while( row != NULL);
fclose(fp);
taos_free_result(result);
return numOfRows;
}
......@@ -548,15 +557,15 @@ static void printField(const char* val, TAOS_FIELD* field, int width, int32_t le
}
static int verticalPrintResult(TAOS_RES* result) {
TAOS_ROW row = taos_fetch_row(result);
static int verticalPrintResult(TAOS_RES* tres) {
TAOS_ROW row = taos_fetch_row(tres);
if (row == NULL) {
return 0;
}
int num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
int precision = taos_result_precision(result);
int num_fields = taos_num_fields(tres);
TAOS_FIELD *fields = taos_fetch_fields(tres);
int precision = taos_result_precision(tres);
int maxColNameLen = 0;
for (int col = 0; col < num_fields; col++) {
......@@ -569,7 +578,7 @@ static int verticalPrintResult(TAOS_RES* result) {
int numOfRows = 0;
do {
printf("*************************** %d.row ***************************\n", numOfRows + 1);
int32_t* length = taos_fetch_lengths(result);
int32_t* length = taos_fetch_lengths(tres);
for (int i = 0; i < num_fields; i++) {
TAOS_FIELD* field = fields + i;
......@@ -581,7 +590,7 @@ static int verticalPrintResult(TAOS_RES* result) {
}
numOfRows++;
row = taos_fetch_row(result);
row = taos_fetch_row(tres);
} while(row != NULL);
return numOfRows;
......@@ -656,15 +665,15 @@ static void printHeader(TAOS_FIELD* fields, int* width, int num_fields) {
}
static int horizontalPrintResult(TAOS_RES* result) {
TAOS_ROW row = taos_fetch_row(result);
static int horizontalPrintResult(TAOS_RES* tres) {
TAOS_ROW row = taos_fetch_row(tres);
if (row == NULL) {
return 0;
}
int num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
int precision = taos_result_precision(result);
int num_fields = taos_num_fields(tres);
TAOS_FIELD *fields = taos_fetch_fields(tres);
int precision = taos_result_precision(tres);
int width[TSDB_MAX_COLUMNS];
for (int col = 0; col < num_fields; col++) {
......@@ -675,7 +684,7 @@ static int horizontalPrintResult(TAOS_RES* result) {
int numOfRows = 0;
do {
int32_t* length = taos_fetch_lengths(result);
int32_t* length = taos_fetch_lengths(tres);
for (int i = 0; i < num_fields; i++) {
putchar(' ');
printField(row[i], fields + i, width[i], length[i], precision);
......@@ -684,32 +693,24 @@ static int horizontalPrintResult(TAOS_RES* result) {
}
putchar('\n');
numOfRows++;
row = taos_fetch_row(result);
row = taos_fetch_row(tres);
} while(row != NULL);
return numOfRows;
}
int shellDumpResult(TAOS *con, char *fname, int *error_no, bool vertical) {
int shellDumpResult(TAOS_RES *tres, char *fname, int *error_no, bool vertical) {
int numOfRows = 0;
TAOS_RES* result = taos_use_result(con);
if (result == NULL) {
taos_error(con);
return -1;
}
if (fname != NULL) {
numOfRows = dumpResultToFile(fname, result);
numOfRows = dumpResultToFile(fname, tres);
} else if(vertical) {
numOfRows = verticalPrintResult(result);
numOfRows = verticalPrintResult(tres);
} else {
numOfRows = horizontalPrintResult(result);
numOfRows = horizontalPrintResult(tres);
}
*error_no = taos_errno(con);
taos_free_result(result);
*error_no = taos_errno(tres);
return numOfRows;
}
......@@ -771,12 +772,11 @@ void write_history() {
fclose(f);
}
void taos_error(TAOS *con) {
fprintf(stderr, "\nDB error: %s\n", taos_errstr(con));
void taos_error(TAOS_RES *tres) {
fprintf(stderr, "\nDB error: %s\n", taos_errstr(tres));
/* free local resouce: allocated memory/metric-meta refcnt */
TAOS_RES *pRes = taos_use_result(con);
taos_free_result(pRes);
taos_free_result(tres);
}
int isCommentLine(char *line) {
......@@ -858,8 +858,9 @@ void shellGetGrantInfo(void *con) {
char sql[] = "show grants";
int code = taos_query(con, sql);
TAOS_RES* pSql = taos_query(con, sql);
int code = taos_errno(pSql);
if (code != TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_OPS_NOT_SUPPORT) {
fprintf(stdout, "Server is Community Edition, version is %s\n\n", taos_get_server_info(con));
......@@ -869,12 +870,11 @@ void shellGetGrantInfo(void *con) {
return;
}
int num_fields = taos_field_count(con);
int num_fields = taos_field_count(result);
if (num_fields == 0) {
fprintf(stderr, "\nInvalid grant information.\n");
exit(0);
} else {
result = taos_use_result(con);
if (result == NULL) {
fprintf(stderr, "\nGrant information is null.\n");
exit(0);
......
......@@ -192,11 +192,14 @@ static void shellSourceFile(TAOS *con, char *fptr) {
}
memcpy(cmd + cmd_len, line, read_len);
if (taos_query(con, cmd)) {
TAOS_RES* pSql = taos_query(con, cmd);
int32_t code = taos_errno(pSql);
if (code != 0) {
fprintf(stderr, "DB error: %s: %s (%d)\n", taos_errstr(con), fname, lineNo);
/* free local resouce: allocated memory/metric-meta refcnt */
TAOS_RES *pRes = taos_use_result(con);
taos_free_result(pRes);
taos_free_result(pSql);
}
memset(cmd, 0, MAX_COMMAND_SIZE);
......
......@@ -18,21 +18,21 @@
#include "tsclient.h"
#include "tutil.h"
TAOS* con;
TAOS_RES* con;
pthread_t pid;
// TODO: IMPLEMENT INTERRUPT HANDLER.
void interruptHandler(int signum) {
#ifdef LINUX
TAOS_RES* res = taos_use_result(con);
taos_stop_query(res);
if (res != NULL) {
taos_stop_query(con);
if (con != NULL) {
/*
* we need to free result in async model, in order to avoid free
* results while the master thread is waiting for server response.
*/
tscQueueAsyncFreeResult(res);
tscQueueAsyncFreeResult(con);
}
result = NULL;
#else
printf("\nReceive ctrl+c or other signal, quit shell.\n");
......@@ -90,7 +90,6 @@ int main(int argc, char* argv[]) {
/* Initialize the shell */
con = shellInit(&args);
if (con == NULL) {
taos_error(con);
exit(EXIT_FAILURE);
}
......
......@@ -635,9 +635,13 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
SCmdLine *line = &script->lines[script->linePos];
int ret = -1;
TAOS_RES* pSql = NULL;
for (int attempt = 0; attempt < 3; ++attempt) {
simLogSql(rest);
ret = taos_query(script->taos, rest);
pSql = taos_query(script->taos, rest);
ret = terrno;
if (ret == TSDB_CODE_TABLE_ALREADY_EXIST ||
ret == TSDB_CODE_DB_ALREADY_EXIST) {
simTrace("script:%s, taos:%p, %s success, ret:%d:%s", script->fileName, script->taos, rest, ret, tstrerror(ret));
......@@ -663,10 +667,9 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
}
int numOfRows = 0;
int num_fields = taos_field_count(script->taos);
int num_fields = taos_field_count(pSql);
if (num_fields != 0) {
TAOS_RES *result = taos_use_result(script->taos);
if (result == NULL) {
if (pSql == NULL) {
simTrace("script:%s, taos:%p, %s failed, result is null", script->fileName, script->taos, rest);
if (line->errorJump == SQL_JUMP_TRUE) {
script->linePos = line->jump;
......@@ -679,10 +682,10 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
TAOS_ROW row;
while ((row = taos_fetch_row(result))) {
while ((row = taos_fetch_row(pSql))) {
if (numOfRows < MAX_QUERY_ROW_NUM) {
TAOS_FIELD *fields = taos_fetch_fields(result);
int* length = taos_fetch_lengths(result);
TAOS_FIELD *fields = taos_fetch_fields(pSql);
int* length = taos_fetch_lengths(pSql);
for (int i = 0; i < num_fields; i++) {
char *value = NULL;
......@@ -768,9 +771,9 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
}
}
taos_free_result(result);
taos_free_result(pSql);
} else {
numOfRows = taos_affected_rows(script->taos);
numOfRows = taos_affected_rows(pSql);
}
sprintf(script->rows, "%d", numOfRows);
......@@ -911,13 +914,17 @@ bool simExecuteSqlErrorCmd(SScript *script, char *rest) {
}
int ret;
TAOS_RES* pSql = NULL;
if (simAsyncQuery) {
char command[4096];
sprintf(command, "curl -H 'Authorization: Taosd %s' -d '%s' 127.0.0.1:6020/rest/sql", script->auth, rest);
ret = simExecuteRestFulCommand(script, command);
}
else {
ret = taos_query(script->taos, rest);
pSql = taos_query(script->taos, rest);
ret = terrno;
taos_free_result(pSql);
}
if (ret != TSDB_CODE_SUCCESS) {
......@@ -926,6 +933,7 @@ bool simExecuteSqlErrorCmd(SScript *script, char *rest) {
script->linePos++;
return true;
}
sprintf(script->error, "lineNum:%d. sql:%s expect failed, but success, ret:%d:%s", line->lineNum, rest, ret, tstrerror(ret));
return false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册