提交 66791ff3 编写于 作者: S slguan

Fix the bug when the stream calculation error occurs

上级 76ac9464
......@@ -423,7 +423,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
return ahandle;
}
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport);
static SSqlObj* tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj* pOld);
static int tscLaunchMetricSubQueries(SSqlObj *pSql);
int tscProcessSql(SSqlObj *pSql) {
......@@ -595,8 +595,9 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) {
pthread_mutex_init(&trs->queryMutex, &mutexattr);
pthread_mutexattr_destroy(&mutexattr);
SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs);
SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL);
tscTrace("%p sub:%p launch subquery.orderOfSub:%d", pSql, pNew, pNew->cmd.vnodeIdx);
tscProcessSql(pNew);
}
......@@ -656,6 +657,7 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d",
pPObj, pSql, idx, *trsupport->code);
} else {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && *(trsupport->code) == TSDB_CODE_SUCCESS) {
/*
* current query failed, and the retry count is less than the available count,
......@@ -665,12 +667,11 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
// clear local saved number of results
trsupport->localBuffer->numOfElems = 0;
pthread_mutex_unlock(&trsupport->queryMutex);
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport);
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d, new SqlObj:%p",
trsupport->pParentSqlObj, pSql, numOfRows, idx, trsupport->numOfRetry, pNew);
trsupport->pParentSqlObj, pSql, numOfRows, idx, trsupport->numOfRetry, pNew);
tscProcessSql(pNew);
return;
......@@ -680,6 +681,7 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d",
pPObj, pSql, numOfRows, idx, *trsupport->code);
}
}
if (__sync_add_and_fetch_32(trsupport->numOfFinished, 1) < trsupport->numOfVnodes) {
......@@ -867,10 +869,15 @@ void tscKillMetricQuery(SSqlObj *pSql) {
tscTrace("%p metric query is cancelled", pSql);
}
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport) {
static SSqlObj* tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport,
SSqlObj* prevSqlObj) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlObj *pNew = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) {
return NULL;
}
pSql->pSubs[trsupport->vnodeIdx - 1] = pNew;
pNew->pTscObj = pSql->pTscObj;
......@@ -894,14 +901,22 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu
pNew->param = trsupport;
pNew->cmd.vnodeIdx = trsupport->vnodeIdx;
char key[TSDB_MAX_TAGS_LEN + 1] = {0};
tscGetMetricMetaCacheKey(&pNew->cmd, key);
pNew->cmd.pMetricMeta = taosGetDataFromCache(tscCacheHandle, key);
pNew->cmd.pMeterMeta = taosGetDataFromCache(tscCacheHandle, pCmd->name);
if (prevSqlObj == NULL) {
char key[TSDB_MAX_TAGS_LEN + 1] = {0};
tscGetMetricMetaCacheKey(&pNew->cmd, key);
pNew->cmd.pMetricMeta = taosGetDataFromCache(tscCacheHandle, key);
pNew->cmd.pMeterMeta = taosGetDataFromCache(tscCacheHandle, pCmd->name);
} else {
pNew->cmd.pMeterMeta = prevSqlObj->cmd.pMeterMeta;
pNew->cmd.pMetricMeta = prevSqlObj->cmd.pMetricMeta;
assert(pNew->cmd.pMeterMeta != NULL && pNew->cmd.pMetricMeta != NULL);
prevSqlObj->cmd.pMetricMeta = NULL;
prevSqlObj->cmd.pMeterMeta = NULL;
}
assert(pNew->cmd.pMeterMeta != NULL && pNew->cmd.pMetricMeta != NULL);
return pNew;
}
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode) {
......@@ -938,7 +953,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode) {
tscTrace("%p sub:%p reach the max retry count,set global code:%d", trsupport->pParentSqlObj, pSql, retCode);
__sync_val_compare_and_swap_32(trsupport->code, 0, retCode);
} else { // does not reach the maximum retry count, go on
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport);
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
tscTrace("%p sub:%p failed code:%d, retry:%d, new SqlObj:%p", trsupport->pParentSqlObj, pSql, retCode,
trsupport->numOfRetry, pNew);
......@@ -2731,10 +2746,7 @@ static int32_t tscDoGetMeterMeta(SSqlObj *pSql, char *meterId) {
pNew->fp = tscMeterMetaCallBack;
pNew->param = pSql;
int32_t len = strlen(pSql->sqlstr);
pNew->sqlstr = malloc(len + 1);
strcpy(pNew->sqlstr, pSql->sqlstr);
pNew->sqlstr[len] = 0;
pNew->sqlstr = strdup(pSql->sqlstr);
code = tscProcessSql(pNew);
if (code == TSDB_CODE_SUCCESS) {
......
......@@ -196,8 +196,8 @@ typename(A) ::= ids(X) LP signed(Y) RP. {
tSQLSetColumnType(&A, &X);
}
%type signed {int}
signed(A) ::= INTEGER(X). { A = atoi(X.z); }
%type signed {int64_t}
signed(A) ::= INTEGER(X). { A = strtol(X.z, NULL, 10); }
signed(A) ::= PLUS INTEGER(X). { A = strtol(X.z, NULL, 10); }
signed(A) ::= MINUS INTEGER(X). { A = -strtol(X.z, NULL, 10);}
......@@ -303,11 +303,6 @@ selcollist(A) ::= sclp(P) STAR. {
A = tSQLExprListAppend(P, pNode, 0);
}
selcollist(A) ::= sclp(P) ID(X) DOT STAR. {
tSQLExpr *pNode = tSQLExprIdValueCreate(NULL, TK_ALL);
A = tSQLExprListAppend(P, pNode, 0);
}
// An option "AS <id>" phrase that can follow one of the expressions that
// define the result set, or one of the tables in the FROM clause.
//
......
......@@ -593,10 +593,8 @@ void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExp
pQuery = &(pQInfo->query);
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
pQuery->order.order = pQueryMsg->order;
pQuery->skey = pQueryMsg->skey;
pQuery->ekey = pQueryMsg->ekey;
pQuery->lastKey = pQuery->skey;
pQInfo->fp = pQueryFunc[pQueryMsg->order];
......@@ -680,7 +678,6 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
pQuery = &(pQInfo->query);
dTrace("qmsg:%p create QInfo:%p, QInfo created", pQueryMsg, pQInfo);
pQuery->order.order = pQueryMsg->order;
pQuery->skey = pQueryMsg->skey;
pQuery->ekey = pQueryMsg->ekey;
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册