From f569bb4babb1341c4128e24cfe2c20005cca011e Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Fri, 17 Jul 2020 14:27:02 +0800 Subject: [PATCH] td-909: fix subscription bugs --- src/client/src/tscSub.c | 162 ++++++++++++++++++++++---------------- src/query/src/qExecutor.c | 2 +- 2 files changed, 96 insertions(+), 68 deletions(-) diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 2c5035c2ef..73614fa0a5 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -34,6 +34,7 @@ typedef struct SSubscriptionProgress { typedef struct SSub { void * signature; char topic[32]; + sem_t sem; int64_t lastSyncTime; int64_t lastConsumeTime; TAOS * taos; @@ -83,84 +84,107 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { static void asyncCallback(void *param, TAOS_RES *tres, int code) { assert(param != NULL); - SSqlObj *pSql = ((SSqlObj *)param); - - pSql->res.code = code; - sem_post(&pSql->rspSem); + SSub *pSub = ((SSub *)param); + pSub->pSql->res.code = code; + sem_post(&pSub->sem); } static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* sql) { - SSub* pSub = NULL; - - TRY( 8 ) { - SSqlObj* pSql = calloc_throw(1, sizeof(SSqlObj)); - CLEANUP_PUSH_FREE(true, pSql); - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - if (tsem_init(&pSql->rspSem, 0, 0) == -1) { - THROW(TAOS_SYSTEM_ERROR(errno)); - } - CLEANUP_PUSH_INT_PTR(true, tsem_destroy, &pSql->rspSem); + int code = TSDB_CODE_SUCCESS, line = __LINE__; - pSql->signature = pSql; - pSql->param = pSql; - pSql->pTscObj = pObj; - pSql->maxRetry = TSDB_MAX_REPLICA; - pSql->fp = asyncCallback; - - int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); - if (code != TSDB_CODE_SUCCESS) { - THROW(code); - } - CLEANUP_PUSH_FREE(true, pCmd->payload); + SSub* pSub = calloc(1, sizeof(SSub)); + if (pSub == NULL) { + line = __LINE__; + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto fail; + } + pSub->signature = pSub; + if (tsem_init(&pSub->sem, 0, 0) == -1) { + line = __LINE__; + code = TAOS_SYSTEM_ERROR(errno); + goto fail; + } + tstrncpy(pSub->topic, topic, sizeof(pSub->topic)); + pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress)); + if (pSub->progress == NULL) { + line = __LINE__; + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto fail; + } - pRes->qhandle = 0; - pRes->numOfRows = 1; + SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + if (pSql == NULL) { + line = __LINE__; + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto fail; + } + pSql->signature = pSql; + pSql->pTscObj = pObj; + pSql->pSubscription = pSub; + pSub->pSql = pSql; - pSql->sqlstr = strdup_throw(sql); - CLEANUP_PUSH_FREE(true, pSql->sqlstr); - strtolower(pSql->sqlstr, pSql->sqlstr); + SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; + if (tsem_init(&pSql->rspSem, 0, 0) == -1) { + line = __LINE__; + code = TAOS_SYSTEM_ERROR(errno); + goto fail; + } - code = tsParseSql(pSql, false); - if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - // wait for the callback function to post the semaphore - sem_wait(&pSql->rspSem); - code = pSql->res.code; - } - if (code != TSDB_CODE_SUCCESS) { - tscError("failed to parse sql statement: %s, error: %s", pSub->topic, tstrerror(code)); - THROW( code ); - } + pSql->param = pSub; + pSql->maxRetry = TSDB_MAX_REPLICA; + pSql->fp = asyncCallback; + pSql->fetchFp = asyncCallback; + pSql->sqlstr = strdup(sql); + if (pSql->sqlstr == NULL) { + line = __LINE__; + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto fail; + } + strtolower(pSql->sqlstr, pSql->sqlstr); + pRes->qhandle = 0; + pRes->numOfRows = 1; + + code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); + if (code != TSDB_CODE_SUCCESS) { + line = __LINE__; + goto fail; + } - if (pSql->cmd.command != TSDB_SQL_SELECT) { - tscError("only 'select' statement is allowed in subscription: %s", pSub->topic); - THROW( -1 ); // TODO - } + code = tsParseSql(pSql, false); + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + sem_wait(&pSub->sem); + code = pSql->res.code; + } + if (code != TSDB_CODE_SUCCESS) { + line = __LINE__; + goto fail; + } - pSub = calloc_throw(1, sizeof(SSub)); - CLEANUP_PUSH_FREE(true, pSub); - pSql->pSubscription = pSub; - pSub->pSql = pSql; - pSub->signature = pSub; - strncpy(pSub->topic, topic, sizeof(pSub->topic)); - pSub->topic[sizeof(pSub->topic) - 1] = 0; - pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress)); - if (pSub->progress == NULL) { - THROW(TSDB_CODE_TSC_OUT_OF_MEMORY); - } + if (pSql->cmd.command != TSDB_SQL_SELECT) { + line = __LINE__; + code = TSDB_CODE_TSC_INVALID_SQL; + goto fail; + } - CLEANUP_EXECUTE(); + return pSub; - } CATCH( code ) { - tscError("failed to create subscription object: %s", tstrerror(code)); - CLEANUP_EXECUTE(); +fail: + tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code)); + if (pSql != NULL) { + tscFreeSqlObj(pSql); + pSql = NULL; + } + if (pSub != NULL) { + taosArrayDestroy(pSub->progress); + tsem_destroy(&pSub->sem); + free(pSub); pSub = NULL; + } - } END_TRY - - return pSub; + terrno = code; + return NULL; } @@ -405,9 +429,10 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0; pSql->fp = asyncCallback; - pSql->param = pSql; + pSql->fetchFp = asyncCallback; + pSql->param = pSub; tscDoQuery(pSql); - sem_wait(&pSql->rspSem); + sem_wait(&pSub->sem); if (pRes->code != TSDB_CODE_SUCCESS) { continue; @@ -437,7 +462,9 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { } if (keepProgress) { - tscSaveSubscriptionProgress(pSub); + if (pSub->progress != NULL) { + tscSaveSubscriptionProgress(pSub); + } } else { char path[256]; sprintf(path, "%s/subscribe/%s", tsDataDir, pSub->topic); @@ -448,6 +475,7 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { tscFreeSqlObj(pSub->pSql); taosArrayDestroy(pSub->progress); + tsem_destroy(&pSub->sem); memset(pSub, 0, sizeof(*pSub)); free(pSub); } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index cf13ace11f..3b10967e33 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4918,7 +4918,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) pQuery->current->lastKey, pQuery->window.ekey); } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { STableIdInfo tidInfo; - STableId* id = TSDB_TABLEID(pQuery->current); + STableId* id = TSDB_TABLEID(pQuery->current->pTable); tidInfo.uid = id->uid; tidInfo.tid = id->tid; -- GitLab