未验证 提交 b82c54f8 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #4412 from taosdata/bugfix/td-2290

fix td-2290
...@@ -157,7 +157,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -157,7 +157,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
registerSqlObj(pSql); registerSqlObj(pSql);
code = tsParseSql(pSql, false); code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tsem_wait(&pSub->sem); tsem_wait(&pSub->sem);
code = pSql->res.code; code = pSql->res.code;
...@@ -168,7 +168,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -168,7 +168,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
goto fail; goto fail;
} }
if (pSql->cmd.command != TSDB_SQL_SELECT) { if (pSql->cmd.command != TSDB_SQL_SELECT && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
line = __LINE__; line = __LINE__;
code = TSDB_CODE_TSC_INVALID_SQL; code = TSDB_CODE_TSC_INVALID_SQL;
goto fail; goto fail;
...@@ -182,7 +182,7 @@ fail: ...@@ -182,7 +182,7 @@ fail:
if (pSql->self != 0) { if (pSql->self != 0) {
taosReleaseRef(tscObjRef, pSql->self); taosReleaseRef(tscObjRef, pSql->self);
} else { } else {
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
} }
pSql = NULL; pSql = NULL;
...@@ -401,9 +401,11 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char ...@@ -401,9 +401,11 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char
tscLoadSubscriptionProgress(pSub); tscLoadSubscriptionProgress(pSub);
} }
if (!tscUpdateSubscription(pObj, pSub)) { if (pSub->pSql->cmd.command == TSDB_SQL_SELECT) {
taos_unsubscribe(pSub, 1); if (!tscUpdateSubscription(pObj, pSub)) {
return NULL; taos_unsubscribe(pSub, 1);
return NULL;
}
} }
pSub->interval = interval; pSub->interval = interval;
...@@ -417,10 +419,80 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char ...@@ -417,10 +419,80 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char
return pSub; return pSub;
} }
SSqlObj* recreateSqlObj(SSub* pSub) {
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
return NULL;
}
pSql->signature = pSql;
pSql->pTscObj = pSub->taos;
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
if (tsem_init(&pSql->rspSem, 0, 0) == -1) {
tscFreeSqlObj(pSql);
return NULL;
}
pSql->param = pSub;
pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->fp = asyncCallback;
pSql->fetchFp = asyncCallback;
pSql->sqlstr = strdup(pSub->pSql->sqlstr);
if (pSql->sqlstr == NULL) {
tscFreeSqlObj(pSql);
return NULL;
}
pRes->qhandle = 0;
pRes->numOfRows = 1;
int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (code != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pSql);
return NULL;
}
registerSqlObj(pSql);
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tsem_wait(&pSub->sem);
code = pSql->res.code;
}
if (code != TSDB_CODE_SUCCESS) {
taosReleaseRef(tscObjRef, pSql->self);
return NULL;
}
if (pSql->cmd.command != TSDB_SQL_SELECT) {
taosReleaseRef(tscObjRef, pSql->self);
return NULL;
}
return pSql;
}
TAOS_RES *taos_consume(TAOS_SUB *tsub) { TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSub *pSub = (SSub *)tsub; SSub *pSub = (SSub *)tsub;
if (pSub == NULL) return NULL; if (pSub == NULL) return NULL;
if (pSub->pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
SSqlObj* pSql = recreateSqlObj(pSub);
if (pSql == NULL) {
return NULL;
}
if (pSub->pSql->self != 0) {
taosReleaseRef(tscObjRef, pSub->pSql->self);
} else {
tscFreeSqlObj(pSub->pSql);
}
pSub->pSql = pSql;
pSql->pSubscription = pSub;
}
tscSaveSubscriptionProgress(pSub); tscSaveSubscriptionProgress(pSub);
SSqlObj *pSql = pSub->pSql; SSqlObj *pSql = pSub->pSql;
...@@ -512,10 +584,13 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { ...@@ -512,10 +584,13 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
} }
if (pSub->pSql != NULL) { if (pSub->pSql != NULL) {
taos_free_result(pSub->pSql); if (pSub->pSql->self != 0) {
taosReleaseRef(tscObjRef, pSub->pSql->self);
} else {
tscFreeSqlObj(pSub->pSql);
}
} }
tscFreeSqlObj(pSub->pSql);
taosArrayDestroy(pSub->progress); taosArrayDestroy(pSub->progress);
tsem_destroy(&pSub->sem); tsem_destroy(&pSub->sem);
memset(pSub, 0, sizeof(*pSub)); memset(pSub, 0, sizeof(*pSub));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册