提交 cacd0b71 编写于 作者: H hjxilinx

remove the sync function at the client side

上级 bb1a615c
...@@ -429,6 +429,8 @@ void tscFreeSqlObj(SSqlObj *pObj); ...@@ -429,6 +429,8 @@ void tscFreeSqlObj(SSqlObj *pObj);
void tscCloseTscObj(STscObj *pObj); void tscCloseTscObj(STscObj *pObj);
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen);
void tscProcessMultiVnodesInsert(SSqlObj *pSql); void tscProcessMultiVnodesInsert(SSqlObj *pSql);
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql); void tscKillMetricQuery(SSqlObj *pSql);
...@@ -448,7 +450,7 @@ void tscQueueAsyncFreeResult(SSqlObj *pSql); ...@@ -448,7 +450,7 @@ void tscQueueAsyncFreeResult(SSqlObj *pSql);
extern void * pVnodeConn; extern void * pVnodeConn;
extern void * pTscMgmtConn; extern void * pTscMgmtConn;
extern void * tscCacheHandle; extern void * tscCacheHandle;
extern uint8_t globalCode; extern int32_t globalCode;
extern int slaveIndex; extern int slaveIndex;
extern void * tscTmr; extern void * tscTmr;
extern void * tscConnCache; extern void * tscConnCache;
......
...@@ -40,32 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo ...@@ -40,32 +40,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
// TODO return the correct error code to client in tscQueueAsyncError void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen) {
void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
tscError("bug!!! pObj:%p", pObj);
globalCode = TSDB_CODE_DISCONNECTED;
tscQueueAsyncError(fp, param);
return;
}
int32_t sqlLen = strlen(sqlstr);
if (sqlLen > tsMaxSQLStringLen) {
tscError("sql string too long");
tscQueueAsyncError(fp, param);
return;
}
taosNotePrintTsc(sqlstr);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
tscError("failed to malloc sqlObj");
tscQueueAsyncError(fp, param);
return;
}
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
...@@ -108,6 +83,35 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, ...@@ -108,6 +83,35 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *,
tscDoQuery(pSql); tscDoQuery(pSql);
} }
// TODO return the correct error code to client in tscQueueAsyncError
void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
tscError("bug!!! pObj:%p", pObj);
globalCode = TSDB_CODE_DISCONNECTED;
tscQueueAsyncError(fp, param);
return;
}
int32_t sqlLen = strlen(sqlstr);
if (sqlLen > tsMaxSQLStringLen) {
tscError("sql string too long");
tscQueueAsyncError(fp, param);
return;
}
taosNotePrintTsc(sqlstr);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
tscError("failed to malloc sqlObj");
tscQueueAsyncError(fp, param);
return;
}
doAsyncQuery(pObj, pSql, fp, param, sqlstr, sqlLen);
}
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) { static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
if (tres == NULL) { if (tres == NULL) {
return; return;
......
...@@ -181,16 +181,22 @@ int tscSendMsgToServer(SSqlObj *pSql) { ...@@ -181,16 +181,22 @@ int tscSendMsgToServer(SSqlObj *pSql) {
} }
pSql->ipList->ip[0] = inet_addr("192.168.0.1"); pSql->ipList->ip[0] = inet_addr("192.168.0.1");
SSqlCmd* pCmd = &pSql->cmd;
if (pSql->cmd.command < TSDB_SQL_MGMT) { if (pSql->cmd.command < TSDB_SQL_MGMT) {
pSql->ipList->port = tsVnodeShellPort; pSql->ipList->port = tsVnodeShellPort;
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen);
rpcSendRequest(pVnodeConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql);
SRpcMsg msg = {.msgType = pCmd->msgType, .contLen = pCmd->payloadLen, .pCont = pMsg, .handle = pSql};
rpcSendRequest(pVnodeConn, pSql->ipList, &msg);
} else { } else {
pSql->ipList->port = tsMgmtShellPort; pSql->ipList->port = tsMgmtShellPort;
tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port);
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
rpcSendRequest(pTscMgmtConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql);
SRpcMsg msg = {.msgType = pCmd->msgType, .contLen = pCmd->payloadLen, .pCont = pMsg, .handle = pSql};
rpcSendRequest(pTscMgmtConn, pSql->ipList, &msg);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -238,6 +238,13 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { ...@@ -238,6 +238,13 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
return pRes->code; return pRes->code;
} }
static void syncQueryCallback(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param;
assert(pObj != NULL && pObj->pSql != NULL);
sem_post(&pObj->pSql->rspSem);
}
int taos_query(TAOS *taos, const char *sqlstr) { int taos_query(TAOS *taos, const char *sqlstr) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) { if (pObj == NULL || pObj->signature != pObj) {
...@@ -245,32 +252,22 @@ int taos_query(TAOS *taos, const char *sqlstr) { ...@@ -245,32 +252,22 @@ int taos_query(TAOS *taos, const char *sqlstr) {
return TSDB_CODE_DISCONNECTED; return TSDB_CODE_DISCONNECTED;
} }
SSqlObj *pSql = pObj->pSql; SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
SSqlRes *pRes = &pSql->res; if (pSql == NULL) {
tscError("failed to malloc sqlObj");
size_t sqlLen = strlen(sqlstr); return TSDB_CODE_CLI_OUT_OF_MEMORY;
if (sqlLen > tsMaxSQLStringLen) {
pRes->code =
tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
return pRes->code;
} }
taosNotePrintTsc(sqlstr); pObj->pSql = pSql;
tsem_init(&pSql->rspSem, 0, 0);
void *sql = realloc(pSql->sqlstr, sqlLen + 1); int32_t sqlLen = strlen(sqlstr);
if (sql == NULL) { doAsyncQuery(pObj, pObj->pSql, syncQueryCallback, taos, sqlstr, sqlLen);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscError("%p failed to malloc sql string buffer, reason:%s", pSql, strerror(errno));
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); // wait for the callback function to post the semaphore
return pRes->code; sem_wait(&pSql->rspSem);
}
pSql->sqlstr = sql; return pSql->res.code;
strtolower(pSql->sqlstr, sqlstr);
return taos_query_imp(pObj, pSql);
} }
TAOS_RES *taos_use_result(TAOS *taos) { TAOS_RES *taos_use_result(TAOS *taos) {
...@@ -683,33 +680,37 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { ...@@ -683,33 +680,37 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
return doSetResultRowData(pSql); return doSetResultRowData(pSql);
} }
static void asyncFetchCallback(void *param, TAOS_RES *tres, int numOfRows) {
SSqlObj* pSql = (SSqlObj*) tres;
if (numOfRows < 0) {
// set the error code
pSql->res.code = -numOfRows;
}
sem_post(&pSql->rspSem);
}
TAOS_ROW taos_fetch_row(TAOS_RES *res) { TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res; SSqlObj *pSql = (SSqlObj *)res;
SSqlCmd *pCmd = &pSql->cmd;
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
globalCode = TSDB_CODE_DISCONNECTED; globalCode = TSDB_CODE_DISCONNECTED;
return NULL; return NULL;
} }
/* SSqlCmd *pCmd = &pSql->cmd;
* projection query on super table, access each virtual node sequentially retrieve data from vnode list, SSqlRes *pRes = &pSql->res;
* instead of two-stage merge
*/
TAOS_ROW rows = taos_fetch_row_impl(res);
if (rows != NULL) {
return rows;
}
// current subclause is completed, try the next subclause if (pRes->qhandle == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) {
while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) { return NULL;
tscTryQueryNextClause(pSql, NULL); }
// if the rows is not NULL, return immediately // current data are exhausted, fetch more data
rows = taos_fetch_row_impl(res); if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pCmd->command == TSDB_SQL_RETRIEVE)) {
taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj);
sem_wait(&pSql->rspSem);
} }
return rows; return doSetResultRowData(pSql);
} }
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
......
...@@ -33,7 +33,7 @@ void * pVMeterConn; ...@@ -33,7 +33,7 @@ void * pVMeterConn;
void * pTscMgmtConn; void * pTscMgmtConn;
void * pSlaveConn; void * pSlaveConn;
void * tscCacheHandle; void * tscCacheHandle;
uint8_t globalCode = 0; int32_t globalCode = 0;
int initialized = 0; int initialized = 0;
int slaveIndex; int slaveIndex;
void * tscTmr; void * tscTmr;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册