提交 f569bb4b 编写于 作者: B Bomin Zhang

td-909: fix subscription bugs

上级 3fb51717
...@@ -34,6 +34,7 @@ typedef struct SSubscriptionProgress { ...@@ -34,6 +34,7 @@ typedef struct SSubscriptionProgress {
typedef struct SSub { typedef struct SSub {
void * signature; void * signature;
char topic[32]; char topic[32];
sem_t sem;
int64_t lastSyncTime; int64_t lastSyncTime;
int64_t lastConsumeTime; int64_t lastConsumeTime;
TAOS * taos; TAOS * taos;
...@@ -83,84 +84,107 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { ...@@ -83,84 +84,107 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
static void asyncCallback(void *param, TAOS_RES *tres, int code) { static void asyncCallback(void *param, TAOS_RES *tres, int code) {
assert(param != NULL); assert(param != NULL);
SSqlObj *pSql = ((SSqlObj *)param); SSub *pSub = ((SSub *)param);
pSub->pSql->res.code = code;
pSql->res.code = code; sem_post(&pSub->sem);
sem_post(&pSql->rspSem);
} }
static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* sql) { static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* sql) {
SSub* pSub = NULL; int code = TSDB_CODE_SUCCESS, line = __LINE__;
TRY( 8 ) {
SSqlObj* pSql = calloc_throw(1, sizeof(SSqlObj));
CLEANUP_PUSH_FREE(true, pSql);
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if (tsem_init(&pSql->rspSem, 0, 0) == -1) {
THROW(TAOS_SYSTEM_ERROR(errno));
}
CLEANUP_PUSH_INT_PTR(true, tsem_destroy, &pSql->rspSem);
pSql->signature = pSql; SSub* pSub = calloc(1, sizeof(SSub));
pSql->param = pSql; if (pSub == NULL) {
pSql->pTscObj = pObj; line = __LINE__;
pSql->maxRetry = TSDB_MAX_REPLICA; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
pSql->fp = asyncCallback; goto fail;
}
int code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE); pSub->signature = pSub;
if (code != TSDB_CODE_SUCCESS) { if (tsem_init(&pSub->sem, 0, 0) == -1) {
THROW(code); line = __LINE__;
} code = TAOS_SYSTEM_ERROR(errno);
CLEANUP_PUSH_FREE(true, pCmd->payload); goto fail;
}
tstrncpy(pSub->topic, topic, sizeof(pSub->topic));
pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress));
if (pSub->progress == NULL) {
line = __LINE__;
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto fail;
}
pRes->qhandle = 0; SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pRes->numOfRows = 1; if (pSql == NULL) {
line = __LINE__;
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto fail;
}
pSql->signature = pSql;
pSql->pTscObj = pObj;
pSql->pSubscription = pSub;
pSub->pSql = pSql;
pSql->sqlstr = strdup_throw(sql); SSqlCmd* pCmd = &pSql->cmd;
CLEANUP_PUSH_FREE(true, pSql->sqlstr); SSqlRes* pRes = &pSql->res;
strtolower(pSql->sqlstr, pSql->sqlstr); if (tsem_init(&pSql->rspSem, 0, 0) == -1) {
line = __LINE__;
code = TAOS_SYSTEM_ERROR(errno);
goto fail;
}
code = tsParseSql(pSql, false); pSql->param = pSub;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { pSql->maxRetry = TSDB_MAX_REPLICA;
// wait for the callback function to post the semaphore pSql->fp = asyncCallback;
sem_wait(&pSql->rspSem); pSql->fetchFp = asyncCallback;
code = pSql->res.code; pSql->sqlstr = strdup(sql);
} if (pSql->sqlstr == NULL) {
if (code != TSDB_CODE_SUCCESS) { line = __LINE__;
tscError("failed to parse sql statement: %s, error: %s", pSub->topic, tstrerror(code)); code = TSDB_CODE_TSC_OUT_OF_MEMORY;
THROW( code ); goto fail;
} }
strtolower(pSql->sqlstr, pSql->sqlstr);
pRes->qhandle = 0;
pRes->numOfRows = 1;
code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (code != TSDB_CODE_SUCCESS) {
line = __LINE__;
goto fail;
}
if (pSql->cmd.command != TSDB_SQL_SELECT) { code = tsParseSql(pSql, false);
tscError("only 'select' statement is allowed in subscription: %s", pSub->topic); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
THROW( -1 ); // TODO sem_wait(&pSub->sem);
} code = pSql->res.code;
}
if (code != TSDB_CODE_SUCCESS) {
line = __LINE__;
goto fail;
}
pSub = calloc_throw(1, sizeof(SSub)); if (pSql->cmd.command != TSDB_SQL_SELECT) {
CLEANUP_PUSH_FREE(true, pSub); line = __LINE__;
pSql->pSubscription = pSub; code = TSDB_CODE_TSC_INVALID_SQL;
pSub->pSql = pSql; goto fail;
pSub->signature = pSub; }
strncpy(pSub->topic, topic, sizeof(pSub->topic));
pSub->topic[sizeof(pSub->topic) - 1] = 0;
pSub->progress = taosArrayInit(32, sizeof(SSubscriptionProgress));
if (pSub->progress == NULL) {
THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
}
CLEANUP_EXECUTE(); return pSub;
} CATCH( code ) { fail:
tscError("failed to create subscription object: %s", tstrerror(code)); tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code));
CLEANUP_EXECUTE(); if (pSql != NULL) {
tscFreeSqlObj(pSql);
pSql = NULL;
}
if (pSub != NULL) {
taosArrayDestroy(pSub->progress);
tsem_destroy(&pSub->sem);
free(pSub);
pSub = NULL; pSub = NULL;
}
} END_TRY terrno = code;
return NULL;
return pSub;
} }
...@@ -405,9 +429,10 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -405,9 +429,10 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0; tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vgroupIndex = 0;
pSql->fp = asyncCallback; pSql->fp = asyncCallback;
pSql->param = pSql; pSql->fetchFp = asyncCallback;
pSql->param = pSub;
tscDoQuery(pSql); tscDoQuery(pSql);
sem_wait(&pSql->rspSem); sem_wait(&pSub->sem);
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
continue; continue;
...@@ -437,7 +462,9 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { ...@@ -437,7 +462,9 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
} }
if (keepProgress) { if (keepProgress) {
tscSaveSubscriptionProgress(pSub); if (pSub->progress != NULL) {
tscSaveSubscriptionProgress(pSub);
}
} else { } else {
char path[256]; char path[256];
sprintf(path, "%s/subscribe/%s", tsDataDir, pSub->topic); sprintf(path, "%s/subscribe/%s", tsDataDir, pSub->topic);
...@@ -448,6 +475,7 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { ...@@ -448,6 +475,7 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
tscFreeSqlObj(pSub->pSql); tscFreeSqlObj(pSub->pSql);
taosArrayDestroy(pSub->progress); taosArrayDestroy(pSub->progress);
tsem_destroy(&pSub->sem);
memset(pSub, 0, sizeof(*pSub)); memset(pSub, 0, sizeof(*pSub));
free(pSub); free(pSub);
} }
...@@ -4918,7 +4918,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) ...@@ -4918,7 +4918,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
pQuery->current->lastKey, pQuery->window.ekey); pQuery->current->lastKey, pQuery->window.ekey);
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
STableIdInfo tidInfo; STableIdInfo tidInfo;
STableId* id = TSDB_TABLEID(pQuery->current); STableId* id = TSDB_TABLEID(pQuery->current->pTable);
tidInfo.uid = id->uid; tidInfo.uid = id->uid;
tidInfo.tid = id->tid; tidInfo.tid = id->tid;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册