diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 8394924caf00d56a5ed040149431d3a789a94614..b7914d68831f555609ea3848decb0513b8d1e2c0 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -429,6 +429,8 @@ void tscFreeSqlObj(SSqlObj *pObj); void tscCloseTscObj(STscObj *pObj); +void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen); + void tscProcessMultiVnodesInsert(SSqlObj *pSql); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql); void tscKillMetricQuery(SSqlObj *pSql); @@ -448,7 +450,7 @@ void tscQueueAsyncFreeResult(SSqlObj *pSql); extern void * pVnodeConn; extern void * pTscMgmtConn; extern void * tscCacheHandle; -extern uint8_t globalCode; +extern int32_t globalCode; extern int slaveIndex; extern void * tscTmr; extern void * tscConnCache; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 9528097d207f7d2546c3702f2c2712591fb81cc6..1c5c21e85280aac1fcd2dc0a86371027f6228031 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -40,47 +40,22 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); -// TODO return the correct error code to client in tscQueueAsyncError -void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) { - STscObj *pObj = (STscObj *)taos; - if (pObj == NULL || pObj->signature != pObj) { - tscError("bug!!! pObj:%p", pObj); - globalCode = TSDB_CODE_DISCONNECTED; - tscQueueAsyncError(fp, param); - return; - } - - int32_t sqlLen = strlen(sqlstr); - if (sqlLen > tsMaxSQLStringLen) { - tscError("sql string too long"); - tscQueueAsyncError(fp, param); - return; - } - - taosNotePrintTsc(sqlstr); - - SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); - if (pSql == NULL) { - tscError("failed to malloc sqlObj"); - tscQueueAsyncError(fp, param); - return; - } - +void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - + pSql->signature = pSql; pSql->pTscObj = pObj; pSql->fp = fp; pSql->param = param; - + if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { tscError("failed to malloc payload"); tfree(pSql); tscQueueAsyncError(fp, param); return; } - + pSql->sqlstr = malloc(sqlLen + 1); if (pSql->sqlstr == NULL) { tscError("%p failed to malloc sql string buffer", pSql); @@ -89,25 +64,54 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, free(pSql); return; } - + pRes->qhandle = 0; pRes->numOfRows = 1; - + strtolower(pSql->sqlstr, sqlstr); tscDump("%p pObj:%p, Async SQL: %s", pSql, pObj, pSql->sqlstr); - + int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; - + if (code != TSDB_CODE_SUCCESS) { pSql->res.code = (uint8_t)code; tscQueueAsyncRes(pSql); return; } - + tscDoQuery(pSql); } +// TODO return the correct error code to client in tscQueueAsyncError +void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) { + STscObj *pObj = (STscObj *)taos; + if (pObj == NULL || pObj->signature != pObj) { + tscError("bug!!! pObj:%p", pObj); + globalCode = TSDB_CODE_DISCONNECTED; + tscQueueAsyncError(fp, param); + return; + } + + int32_t sqlLen = strlen(sqlstr); + if (sqlLen > tsMaxSQLStringLen) { + tscError("sql string too long"); + tscQueueAsyncError(fp, param); + return; + } + + taosNotePrintTsc(sqlstr); + + SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); + if (pSql == NULL) { + tscError("failed to malloc sqlObj"); + tscQueueAsyncError(fp, param); + return; + } + + doAsyncQuery(pObj, pSql, fp, param, sqlstr, sqlLen); +} + static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) { if (tres == NULL) { return; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index c6efafe3329ff57e6d5c3f1dfadd1b49952c4beb..efeabf57f80cf7bda22f75860b780e387af98580 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -181,16 +181,22 @@ int tscSendMsgToServer(SSqlObj *pSql) { } pSql->ipList->ip[0] = inet_addr("192.168.0.1"); + SSqlCmd* pCmd = &pSql->cmd; + if (pSql->cmd.command < TSDB_SQL_MGMT) { pSql->ipList->port = tsVnodeShellPort; tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); - rpcSendRequest(pVnodeConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql); + + SRpcMsg msg = {.msgType = pCmd->msgType, .contLen = pCmd->payloadLen, .pCont = pMsg, .handle = pSql}; + rpcSendRequest(pVnodeConn, pSql->ipList, &msg); } else { pSql->ipList->port = tsMgmtShellPort; tscPrint("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList->port); memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); - rpcSendRequest(pTscMgmtConn, pSql->ipList, pSql->cmd.msgType, pMsg, pSql->cmd.payloadLen, pSql); + + SRpcMsg msg = {.msgType = pCmd->msgType, .contLen = pCmd->payloadLen, .pCont = pMsg, .handle = pSql}; + rpcSendRequest(pTscMgmtConn, pSql->ipList, &msg); } return TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 2f3312b0b6dca5d8ec5db584b4ca732b91fcdcdf..d86ca40554aef1f1690cbd789e5bbcd8e3ef2a5c 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -238,39 +238,36 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { return pRes->code; } +static void syncQueryCallback(void *param, TAOS_RES *tres, int code) { + STscObj *pObj = (STscObj *)param; + assert(pObj != NULL && pObj->pSql != NULL); + + sem_post(&pObj->pSql->rspSem); +} + int taos_query(TAOS *taos, const char *sqlstr) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) { globalCode = TSDB_CODE_DISCONNECTED; return TSDB_CODE_DISCONNECTED; } - - SSqlObj *pSql = pObj->pSql; - SSqlRes *pRes = &pSql->res; - - size_t sqlLen = strlen(sqlstr); - if (sqlLen > tsMaxSQLStringLen) { - pRes->code = - tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql - tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); - - return pRes->code; - } - - taosNotePrintTsc(sqlstr); - - void *sql = realloc(pSql->sqlstr, sqlLen + 1); - if (sql == NULL) { - pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; - tscError("%p failed to malloc sql string buffer, reason:%s", pSql, strerror(errno)); - - tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); - return pRes->code; + + SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); + if (pSql == NULL) { + tscError("failed to malloc sqlObj"); + return TSDB_CODE_CLI_OUT_OF_MEMORY; } + + pObj->pSql = pSql; + tsem_init(&pSql->rspSem, 0, 0); + + int32_t sqlLen = strlen(sqlstr); + doAsyncQuery(pObj, pObj->pSql, syncQueryCallback, taos, sqlstr, sqlLen); - pSql->sqlstr = sql; - strtolower(pSql->sqlstr, sqlstr); - return taos_query_imp(pObj, pSql); + // wait for the callback function to post the semaphore + sem_wait(&pSql->rspSem); + + return pSql->res.code; } TAOS_RES *taos_use_result(TAOS *taos) { @@ -683,33 +680,37 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { return doSetResultRowData(pSql); } +static void asyncFetchCallback(void *param, TAOS_RES *tres, int numOfRows) { + SSqlObj* pSql = (SSqlObj*) tres; + if (numOfRows < 0) { + // set the error code + pSql->res.code = -numOfRows; + } + + sem_post(&pSql->rspSem); +} + TAOS_ROW taos_fetch_row(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; - SSqlCmd *pCmd = &pSql->cmd; - if (pSql == NULL || pSql->signature != pSql) { globalCode = TSDB_CODE_DISCONNECTED; return NULL; } - - /* - * projection query on super table, access each virtual node sequentially retrieve data from vnode list, - * instead of two-stage merge - */ - TAOS_ROW rows = taos_fetch_row_impl(res); - if (rows != NULL) { - return rows; + + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; + + if (pRes->qhandle == 0 || pCmd->command == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pCmd->command == TSDB_SQL_INSERT) { + return NULL; } - - // current subclause is completed, try the next subclause - while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) { - tscTryQueryNextClause(pSql, NULL); - - // if the rows is not NULL, return immediately - rows = taos_fetch_row_impl(res); + + // current data are exhausted, fetch more data + if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pCmd->command == TSDB_SQL_RETRIEVE)) { + taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj); + sem_wait(&pSql->rspSem); } - - return rows; + + return doSetResultRowData(pSql); } int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 6e4653c2d442e58e6d8cfa58a4ed402dc44b725a..300adb7005b01e3e1fdc04e5f14dd76eb9931ff7 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -33,7 +33,7 @@ void * pVMeterConn; void * pTscMgmtConn; void * pSlaveConn; void * tscCacheHandle; -uint8_t globalCode = 0; +int32_t globalCode = 0; int initialized = 0; int slaveIndex; void * tscTmr;