提交 8303603a 编写于 作者: Y yihaoDeng

Merge branch 'develop' of https://github.com/taosdata/TDengine into develop

...@@ -273,7 +273,8 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) { ...@@ -273,7 +273,8 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
taosScheduleTask(tscQhandle, &schedMsg); taosScheduleTask(tscQhandle, &schedMsg);
} }
void tscAsyncResultOnError(SSqlObj *pSql) { static void tscAsyncResultCallback(SSchedMsg *pMsg) {
SSqlObj* pSql = pMsg->ahandle;
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
tscDebug("%p SqlObj is freed, not add into queue async res", pSql); tscDebug("%p SqlObj is freed, not add into queue async res", pSql);
return; return;
...@@ -291,6 +292,16 @@ void tscAsyncResultOnError(SSqlObj *pSql) { ...@@ -291,6 +292,16 @@ void tscAsyncResultOnError(SSqlObj *pSql) {
(*pSql->fp)(pSql->param, pSql, pRes->code); (*pSql->fp)(pSql->param, pSql, pRes->code);
} }
void tscAsyncResultOnError(SSqlObj* pSql) {
SSchedMsg schedMsg = {0};
schedMsg.fp = tscAsyncResultCallback;
schedMsg.ahandle = pSql;
schedMsg.thandle = (void *)1;
schedMsg.msg = 0;
taosScheduleTask(tscQhandle, &schedMsg);
}
int tscSendMsgToServer(SSqlObj *pSql); int tscSendMsgToServer(SSqlObj *pSql);
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
......
...@@ -609,7 +609,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) { ...@@ -609,7 +609,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlObj *pSql, int32_t clauseIndex) {
} }
return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + tsBufSize + return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize + tsBufSize +
tableSerialize + sqlLen + 4096; tableSerialize + sqlLen + 4096 + pQueryInfo->bufLen;
} }
static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) { static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char *pMsg) {
......
...@@ -95,11 +95,21 @@ static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { ...@@ -95,11 +95,21 @@ static bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
pthread_mutex_lock(&subState->mutex); pthread_mutex_lock(&subState->mutex);
bool done = allSubqueryDone(pParentSql);
if (done) {
tscDebug("%p subquery:%p,%d all subs already done", pParentSql, pSql, idx);
pthread_mutex_unlock(&subState->mutex);
return false;
}
tscDebug("%p subquery:%p,%d state set to 1", pParentSql, pSql, idx); tscDebug("%p subquery:%p,%d state set to 1", pParentSql, pSql, idx);
subState->states[idx] = 1; subState->states[idx] = 1;
bool done = allSubqueryDone(pParentSql); done = allSubqueryDone(pParentSql);
pthread_mutex_unlock(&subState->mutex); pthread_mutex_unlock(&subState->mutex);
...@@ -2245,7 +2255,9 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES ...@@ -2245,7 +2255,9 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
* current query failed, and the retry count is less than the available * current query failed, and the retry count is less than the available
* count, retry query clear previous retrieved data, then launch a new sub query * count, retry query clear previous retrieved data, then launch a new sub query
*/ */
static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32_t code) { static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32_t code, int32_t *sent) {
*sent = 0;
SRetrieveSupport *trsupport = malloc(sizeof(SRetrieveSupport)); SRetrieveSupport *trsupport = malloc(sizeof(SRetrieveSupport));
if (trsupport == NULL) { if (trsupport == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -2277,21 +2289,28 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32 ...@@ -2277,21 +2289,28 @@ static int32_t tscReissueSubquery(SRetrieveSupport *oriTrs, SSqlObj *pSql, int32
SSqlObj *pNew = tscCreateSTableSubquery(trsupport->pParentSql, trsupport, pSql); SSqlObj *pNew = tscCreateSTableSubquery(trsupport->pParentSql, trsupport, pSql);
if (pNew == NULL) { if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d", tscError("%p sub:%p failed to create new subquery due to error:%s, abort retry, vgId:%d, orderOfSub:%d",
trsupport->pParentSql, pSql, tstrerror(terrno), pVgroup->vgId, trsupport->subqueryIndex); oriTrs->pParentSql, pSql, tstrerror(terrno), pVgroup->vgId, oriTrs->subqueryIndex);
pParentSql->res.code = terrno; pParentSql->res.code = terrno;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; oriTrs->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
tfree(trsupport);
return pParentSql->res.code; return pParentSql->res.code;
} }
int32_t ret = tscProcessSql(pNew); int32_t ret = tscProcessSql(pNew);
*sent = 1;
// if failed to process sql, let following code handle the pSql // if failed to process sql, let following code handle the pSql
if (ret == TSDB_CODE_SUCCESS) { if (ret == TSDB_CODE_SUCCESS) {
tscFreeRetrieveSup(pSql);
taos_free_result(pSql); taos_free_result(pSql);
return ret; return ret;
} else { } else {
pSql->pSubs[trsupport->subqueryIndex] = pSql;
tscFreeRetrieveSup(pNew);
taos_free_result(pNew);
return ret; return ret;
} }
} }
...@@ -2328,7 +2347,10 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -2328,7 +2347,10 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
subqueryIndex, tstrerror(pParentSql->res.code)); subqueryIndex, tstrerror(pParentSql->res.code));
} else { } else {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) {
if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) { int32_t sent = 0;
tscReissueSubquery(trsupport, pSql, numOfRows, &sent);
if (sent) {
return; return;
} }
} else { // reach the maximum retry count, abort } else { // reach the maximum retry count, abort
...@@ -2482,7 +2504,10 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR ...@@ -2482,7 +2504,10 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry); tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry);
if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) { int32_t sent = 0;
tscReissueSubquery(trsupport, pSql, numOfRows, &sent);
if (sent) {
return; return;
} }
} else { } else {
...@@ -2604,7 +2629,11 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -2604,7 +2629,11 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry); tscError("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) {
int32_t sent = 0;
tscReissueSubquery(trsupport, pSql, code, &sent);
if (sent) {
return; return;
} }
} else { } else {
......
...@@ -1569,7 +1569,9 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) { ...@@ -1569,7 +1569,9 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) {
avg = p->avg; avg = p->avg;
} else { // todo opt performance by using iterator since the timestamp lsit is matched with the output result } else { // todo opt performance by using iterator since the timestamp lsit is matched with the output result
SResPair* p = bsearch(&pCtx->startTs, resList->pData, len, sizeof(SResPair), tsCompare); SResPair* p = bsearch(&pCtx->startTs, resList->pData, len, sizeof(SResPair), tsCompare);
assert(p != NULL); if (p == NULL) {
return;
}
avg = p->avg; avg = p->avg;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册