提交 9d3ce4d5 编写于 作者: H hjxilinx

[td-99]

上级 0019048a
......@@ -408,7 +408,7 @@ int tscProcessSql(SSqlObj *pSql);
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId);
void tscQueueAsyncRes(SSqlObj *pSql);
void tscQueueAsyncError(void(*fp), void *param);
void tscQueueAsyncError(void(*fp), void *param, int32_t code);
int tscProcessLocalCmd(SSqlObj *pSql);
int tscCfgDynamicOptions(char *msg);
......@@ -449,7 +449,7 @@ void tscFreeSqlObj(SSqlObj *pObj);
void tscCloseTscObj(STscObj *pObj);
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen);
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen);
void tscProcessMultiVnodesInsert(SSqlObj *pSql);
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
......
......@@ -40,7 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen) {
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......@@ -51,14 +51,14 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
tscError("failed to malloc payload");
tscQueueAsyncError(fp, param);
tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
return;
}
pSql->sqlstr = realloc(pSql->sqlstr, sqlLen + 1);
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
tscQueueAsyncError(fp, param);
tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
free(pCmd->payload);
return;
}
......@@ -73,7 +73,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = (uint8_t)code;
pSql->res.code = code;
tscQueueAsyncRes(pSql);
return;
}
......@@ -86,15 +86,16 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
tscError("bug!!! pObj:%p", pObj);
globalCode = TSDB_CODE_DISCONNECTED;
tscQueueAsyncError(fp, param);
terrno = TSDB_CODE_DISCONNECTED;
tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED);
return;
}
int32_t sqlLen = strlen(sqlstr);
if (sqlLen > tsMaxSQLStringLen) {
tscError("sql string too long");
tscQueueAsyncError(fp, param);
terrno = TSDB_CODE_INVALID_SQL;
tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_SQL);
return;
}
......@@ -103,7 +104,8 @@ void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *pa
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
tscError("failed to malloc sqlObj");
tscQueueAsyncError(fp, param);
terrno = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscQueueAsyncError(fp, param, TSDB_CODE_CLI_OUT_OF_MEMORY);
return;
}
......@@ -168,7 +170,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
pRes->code = numOfRows;
}
tscQueueAsyncError(pSql->fetchFp, param);
tscQueueAsyncError(pSql->fetchFp, param, pRes->code);
return;
}
......@@ -198,8 +200,8 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
SSqlObj *pSql = (SSqlObj *)taosa;
if (pSql == NULL || pSql->signature != pSql) {
tscError("sql object is NULL");
globalCode = TSDB_CODE_DISCONNECTED;
tscQueueAsyncError(fp, param);
// globalCode = TSDB_CODE_DISCONNECTED;
tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED);
return;
}
......@@ -208,7 +210,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
if (pRes->qhandle == 0) {
tscError("qhandle is NULL");
tscQueueAsyncError(fp, param);
tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE);
return;
}
......@@ -230,8 +232,8 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
SSqlObj *pSql = (SSqlObj *)taosa;
if (pSql == NULL || pSql->signature != pSql) {
tscError("sql object is NULL");
globalCode = TSDB_CODE_DISCONNECTED;
tscQueueAsyncError(fp, param);
// globalCode = TSDB_CODE_DISCONNECTED;
tscQueueAsyncError(fp, param, TSDB_CODE_DISCONNECTED);
return;
}
......@@ -240,7 +242,7 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
if (pRes->qhandle == 0) {
tscError("qhandle is NULL");
tscQueueAsyncError(fp, param);
tscQueueAsyncError(fp, param, TSDB_CODE_INVALID_QHANDLE);
return;
}
......@@ -329,7 +331,7 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
// pCmd may be released, so cache pCmd->command
int cmd = pCmd->command;
int code = pRes->code ? -pRes->code : pRes->numOfRows;
int code = pRes->code;// ? -pRes->code : pRes->numOfRows;
// in case of async insert, restore the user specified callback function
bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
......@@ -347,18 +349,20 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
}
}
void tscProcessAsyncError(SSchedMsg *pMsg) {
static void tscProcessAsyncError(SSchedMsg *pMsg) {
void (*fp)() = pMsg->ahandle;
(*fp)(pMsg->thandle, NULL, -1);
(*fp)(pMsg->thandle, NULL, *(int32_t*)pMsg->msg);
}
void tscQueueAsyncError(void(*fp), void *param) {
void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
int32_t* c = malloc(sizeof(int32_t));
*c = code;
SSchedMsg schedMsg;
schedMsg.fp = tscProcessAsyncError;
schedMsg.ahandle = fp;
schedMsg.thandle = param;
schedMsg.msg = NULL;
schedMsg.msg = c;
taosScheduleTask(tscQhandle, &schedMsg);
}
......
......@@ -329,7 +329,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
void *taosres = tscKeepConn[pCmd->command] ? pSql : NULL;
rpcMsg->code = pRes->code ? pRes->code : pRes->numOfRows;
tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), taosres);
tscTrace("%p Async SQL result:%s res:%p", pSql, tstrerror(pRes->code), pSql);
/*
* Whether to free sqlObj or not should be decided before call the user defined function, since this SqlObj
......
......@@ -267,25 +267,25 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
return pRes->code;
}
static void syncQueryCallback(void *param, TAOS_RES *tres, int code) {
static void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
assert(param != NULL);
STscObj *pObj = (STscObj *)param;
SSqlObj *pSql = ((STscObj *)param)->pSql;
assert(pObj->pSql != NULL);
sem_post(&pObj->pSql->rspSem);
pSql->res.code = code;
sem_post(&pSql->rspSem);
}
int taos_query(TAOS *taos, const char *sqlstr) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
globalCode = TSDB_CODE_DISCONNECTED;
terrno = TSDB_CODE_DISCONNECTED;
return TSDB_CODE_DISCONNECTED;
}
SSqlObj* pSql = pObj->pSql;
int32_t sqlLen = strlen(sqlstr);
doAsyncQuery(pObj, pObj->pSql, syncQueryCallback, taos, sqlstr, sqlLen);
size_t sqlLen = strlen(sqlstr);
doAsyncQuery(pObj, pObj->pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
// wait for the callback function to post the semaphore
sem_wait(&pSql->rspSem);
......@@ -643,7 +643,7 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
return pRes->tsrow;
}
static void asyncFetchCallback(void *param, TAOS_RES *tres, int numOfRows) {
static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) {
SSqlObj* pSql = (SSqlObj*) tres;
if (numOfRows < 0) { // set the error code
......@@ -671,7 +671,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
// current data are exhausted, fetch more data
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pRes->completed != true &&
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) {
taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj);
taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj);
sem_wait(&pSql->rspSem);
}
......@@ -848,22 +848,17 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
void taos_free_result(TAOS_RES *res) { taos_free_result_imp(res, 0); }
// todo should not be used in async query
int taos_errno(TAOS *taos) {
STscObj *pObj = (STscObj *)taos;
int code;
if (pObj == NULL || pObj->signature != pObj) return globalCode;
if ((int8_t)(pObj->pSql->res.code) == -1)
code = TSDB_CODE_OTHERS;
else
code = pObj->pSql->res.code;
if (pObj == NULL || pObj->signature != pObj) {
return terrno;
}
return code;
return pObj->pSql->res.code;
}
//static bool validErrorCode(int32_t code) { return code >= TSDB_CODE_SUCCESS && code < TSDB_CODE_MAX_ERROR_CODE; }
/*
* In case of invalid sql error, additional information is attached to explain
* why the sql is invalid
......@@ -883,6 +878,7 @@ static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) {
return z != NULL;
}
// todo should not be used in async model
char *taos_errstr(TAOS *taos) {
STscObj *pObj = (STscObj *)taos;
......
......@@ -236,7 +236,7 @@ void *taosProcessAlarmSignal(void *tharg) {
void (*callback)(int) = tharg;
timer_t timerId;
struct sigevent sevent;
struct sigevent sevent = {0};
#ifdef _ALPINE
sevent.sigev_notify = SIGEV_THREAD;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册