diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 043e78a9f3c85fa57f776c2212415e12565e3685..52b74f7502db139f18d8230eefb36d41bae4079f 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -157,7 +157,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* registerSqlObj(pSql); - code = tsParseSql(pSql, false); + code = tsParseSql(pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { tsem_wait(&pSub->sem); code = pSql->res.code; @@ -168,7 +168,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* goto fail; } - if (pSql->cmd.command != TSDB_SQL_SELECT) { + if (pSql->cmd.command != TSDB_SQL_SELECT && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) { line = __LINE__; code = TSDB_CODE_TSC_INVALID_SQL; goto fail; @@ -182,7 +182,7 @@ fail: if (pSql->self != 0) { taosReleaseRef(tscObjRef, pSql->self); } else { - tscFreeSqlObj(pSql); + tscFreeSqlObj(pSql); } pSql = NULL; @@ -401,9 +401,11 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char tscLoadSubscriptionProgress(pSub); } - if (!tscUpdateSubscription(pObj, pSub)) { - taos_unsubscribe(pSub, 1); - return NULL; + if (pSub->pSql->cmd.command == TSDB_SQL_SELECT) { + if (!tscUpdateSubscription(pObj, pSub)) { + taos_unsubscribe(pSub, 1); + return NULL; + } } pSub->interval = interval; @@ -417,10 +419,80 @@ TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char return pSub; } +SSqlObj* recreateSqlObj(SSub* pSub) { + SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + if (pSql == NULL) { + return NULL; + } + + pSql->signature = pSql; + pSql->pTscObj = pSub->taos; + + SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; + if (tsem_init(&pSql->rspSem, 0, 0) == -1) { + tscFreeSqlObj(pSql); + return NULL; + } + + pSql->param = pSub; + pSql->maxRetry = TSDB_MAX_REPLICA; + pSql->fp = asyncCallback; + pSql->fetchFp = asyncCallback; + pSql->sqlstr = strdup(pSub->pSql->sqlstr); + if (pSql->sqlstr == NULL) { + tscFreeSqlObj(pSql); + return NULL; + } + + pRes->qhandle = 0; + pRes->numOfRows = 1; + + int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); + if (code != TSDB_CODE_SUCCESS) { + tscFreeSqlObj(pSql); + return NULL; + } + + registerSqlObj(pSql); + + code = tsParseSql(pSql, true); + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + tsem_wait(&pSub->sem); + code = pSql->res.code; + } + + if (code != TSDB_CODE_SUCCESS) { + taosReleaseRef(tscObjRef, pSql->self); + return NULL; + } + + if (pSql->cmd.command != TSDB_SQL_SELECT) { + taosReleaseRef(tscObjRef, pSql->self); + return NULL; + } + + return pSql; +} + TAOS_RES *taos_consume(TAOS_SUB *tsub) { SSub *pSub = (SSub *)tsub; if (pSub == NULL) return NULL; + if (pSub->pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { + SSqlObj* pSql = recreateSqlObj(pSub); + if (pSql == NULL) { + return NULL; + } + if (pSub->pSql->self != 0) { + taosReleaseRef(tscObjRef, pSub->pSql->self); + } else { + tscFreeSqlObj(pSub->pSql); + } + pSub->pSql = pSql; + pSql->pSubscription = pSub; + } + tscSaveSubscriptionProgress(pSub); SSqlObj *pSql = pSub->pSql; @@ -512,10 +584,13 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { } if (pSub->pSql != NULL) { - taos_free_result(pSub->pSql); + if (pSub->pSql->self != 0) { + taosReleaseRef(tscObjRef, pSub->pSql->self); + } else { + tscFreeSqlObj(pSub->pSql); + } } - tscFreeSqlObj(pSub->pSql); taosArrayDestroy(pSub->progress); tsem_destroy(&pSub->sem); memset(pSub, 0, sizeof(*pSub));