未验证 提交 e3e6f783 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #13634 from taosdata/feature/3_liaohj

refactor: do some internal refactor, and add a new api
...@@ -194,6 +194,7 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres); ...@@ -194,6 +194,7 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param); DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param); DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param);
// Shuduo: temporary enable for app build // Shuduo: temporary enable for app build
#if 1 #if 1
......
...@@ -171,6 +171,7 @@ typedef struct SReqResultInfo { ...@@ -171,6 +171,7 @@ typedef struct SReqResultInfo {
uint32_t current; uint32_t current;
bool completed; bool completed;
int32_t precision; int32_t precision;
bool convertUcs4;
int32_t payloadLen; int32_t payloadLen;
} SReqResultInfo; } SReqResultInfo;
...@@ -222,7 +223,7 @@ typedef struct SSyncQueryParam { ...@@ -222,7 +223,7 @@ typedef struct SSyncQueryParam {
SRequestObj* pRequest; SRequestObj* pRequest;
} SSyncQueryParam; } SSyncQueryParam;
void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void doSetOneRowPtr(SReqResultInfo* pResultInfo); void doSetOneRowPtr(SReqResultInfo* pResultInfo);
......
...@@ -191,6 +191,8 @@ void *createRequest(STscObj *pObj, int32_t type) { ...@@ -191,6 +191,8 @@ void *createRequest(STscObj *pObj, int32_t type) {
pRequest->requestId = generateRequestId(); pRequest->requestId = generateRequestId();
pRequest->metric.start = taosGetTimestampUs(); pRequest->metric.start = taosGetTimestampUs();
pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default
pRequest->type = type; pRequest->type = type;
pRequest->pTscObj = pObj; pRequest->pTscObj = pObj;
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
......
...@@ -1114,7 +1114,7 @@ static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) { ...@@ -1114,7 +1114,7 @@ static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
tsem_post(&pParam->sem); tsem_post(&pParam->sem);
} }
void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
assert(pRequest != NULL); assert(pRequest != NULL);
SReqResultInfo* pResultInfo = &pRequest->body.resInfo; SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
...@@ -1126,6 +1126,10 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc ...@@ -1126,6 +1126,10 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc
} }
SSyncQueryParam* pParam = pRequest->body.param; SSyncQueryParam* pParam = pRequest->body.param;
// convert ucs4 to native multi-bytes string
pResultInfo->convertUcs4 = convertUcs4;
taos_fetch_rows_a(pRequest, syncFetchFn, pParam); taos_fetch_rows_a(pRequest, syncFetchFn, pParam);
tsem_wait(&pParam->sem); tsem_wait(&pParam->sem);
} }
......
...@@ -219,14 +219,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -219,14 +219,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
if (TD_RES_QUERY(res)) { if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res; SRequestObj *pRequest = (SRequestObj *)res;
#if SYNC_ON_TOP_OF_ASYNC
return doAsyncFetchRows(pRequest, true, true);
#else
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
return NULL; return NULL;
} }
#if SYNC_ON_TOP_OF_ASYNC
return doAsyncFetchRow(pRequest, true, true);
#else
return doFetchRows(pRequest, true, true); return doFetchRows(pRequest, true, true);
#endif #endif
...@@ -489,6 +488,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { ...@@ -489,6 +488,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
if (res == NULL) { if (res == NULL) {
return 0; return 0;
} }
if (TD_RES_QUERY(res)) { if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res; SRequestObj *pRequest = (SRequestObj *)res;
...@@ -501,7 +501,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { ...@@ -501,7 +501,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
} }
#if SYNC_ON_TOP_OF_ASYNC #if SYNC_ON_TOP_OF_ASYNC
doAsyncFetchRow(pRequest, false, true); doAsyncFetchRows(pRequest, false, true);
#else #else
doFetchRows(pRequest, true, true); doFetchRows(pRequest, true, true);
#endif #endif
...@@ -552,7 +552,11 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { ...@@ -552,7 +552,11 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
return 0; return 0;
} }
#if SYNC_ON_TOP_OF_ASYNC
doAsyncFetchRows(pRequest, false, false);
#else
doFetchRows(pRequest, false, false); doFetchRows(pRequest, false, false);
#endif
SReqResultInfo *pResultInfo = &pRequest->body.resInfo; SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
...@@ -771,11 +775,11 @@ static void fetchCallback(void* pResult, void* param, int32_t code) { ...@@ -771,11 +775,11 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
} }
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); pRequest->body.fetchFp(pRequest->body.param, pRequest, 0);
return;
} }
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResultInfo->pData, true, false); pRequest->code = setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, false);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0; pResultInfo->numOfRows = 0;
pRequest->code = code; pRequest->code = code;
...@@ -815,6 +819,13 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { ...@@ -815,6 +819,13 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest); schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest);
} }
void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param) {
ASSERT(res != NULL && fp != NULL);
SRequestObj *pRequest = res;
pRequest->body.resInfo.convertUcs4 = false;
taos_fetch_rows_a(res, fp, param);
}
TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp,
void *param, int interval) { void *param, int interval) {
// TODO // TODO
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册