提交 9c510c90 编写于 作者: L Liu Jicong

enh(tmq): retry ask ep

上级 ee9084e4
...@@ -93,12 +93,12 @@ typedef struct taosField { ...@@ -93,12 +93,12 @@ typedef struct taosField {
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code); typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
typedef struct TAOS_BIND_v2 { typedef struct TAOS_BIND_v2 {
int buffer_type; int buffer_type;
void *buffer; void *buffer;
int32_t buffer_length; int32_t buffer_length;
int32_t *length; int32_t *length;
char *is_null; char *is_null;
int num; int num;
} TAOS_BIND_v2; } TAOS_BIND_v2;
typedef enum { typedef enum {
...@@ -128,35 +128,35 @@ DLL_EXPORT void taos_close(TAOS *taos); ...@@ -128,35 +128,35 @@ DLL_EXPORT void taos_close(TAOS *taos);
const char *taos_data_type(int type); const char *taos_data_type(int type);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND_v2 *tags); DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND_v2 *tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND_v2 *bind); DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND_v2 *bind);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind); DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind, int colIdx); DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result
DLL_EXPORT void taos_free_result(TAOS_RES *res); DLL_EXPORT void taos_free_result(TAOS_RES *res);
DLL_EXPORT int taos_field_count(TAOS_RES *res); DLL_EXPORT int taos_field_count(TAOS_RES *res);
DLL_EXPORT int taos_num_fields(TAOS_RES *res); DLL_EXPORT int taos_num_fields(TAOS_RES *res);
DLL_EXPORT int taos_affected_rows(TAOS_RES *res); DLL_EXPORT int taos_affected_rows(TAOS_RES *res);
DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
...@@ -234,7 +234,7 @@ DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errst ...@@ -234,7 +234,7 @@ DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errst
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */ /* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list); DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
......
...@@ -1333,7 +1333,7 @@ typedef struct { ...@@ -1333,7 +1333,7 @@ typedef struct {
typedef struct { typedef struct {
int64_t consumerId; int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
SArray* topicNames; // SArray<char*> SArray* topicNames; // SArray<char**>
} SCMSubscribeReq; } SCMSubscribeReq;
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
......
...@@ -145,7 +145,7 @@ enum { ...@@ -145,7 +145,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-mq-ask-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mnode-mq-consumer-lost", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mnode-mq-consumer-lost", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
......
...@@ -41,10 +41,6 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *han ...@@ -41,10 +41,6 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *han
void taosTmrCleanUp(void *handle); void taosTmrCleanUp(void *handle);
int32_t taosInitTimer(void (*callback)(int32_t), int32_t ms);
void taosUninitTimer();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -23,6 +23,9 @@ ...@@ -23,6 +23,9 @@
#include "tmsgtype.h" #include "tmsgtype.h"
#include "tqueue.h" #include "tqueue.h"
#include "tref.h" #include "tref.h"
#include "ttimer.h"
int32_t tmqAskEp(tmq_t* tmq, bool sync);
typedef struct { typedef struct {
int8_t tmqRspType; int8_t tmqRspType;
...@@ -61,29 +64,40 @@ struct tmq_conf_t { ...@@ -61,29 +64,40 @@ struct tmq_conf_t {
tmq_commit_cb* commit_cb; tmq_commit_cb* commit_cb;
}; };
typedef struct {
int8_t inited;
tmr_h timer;
} SMqMgmt;
static SMqMgmt tmqMgmt = {0};
struct tmq_t { struct tmq_t {
// conf // conf
char groupId[TSDB_CGROUP_LEN]; char groupId[TSDB_CGROUP_LEN];
char clientId[256]; char clientId[256];
int8_t autoCommit; int8_t autoCommit;
/*int8_t inWaiting;*/
int64_t consumerId; int64_t consumerId;
int32_t epoch;
int32_t resetOffsetCfg; int32_t resetOffsetCfg;
int64_t status;
STscObj* pTscObj;
tmq_commit_cb* commit_cb; tmq_commit_cb* commit_cb;
/*int32_t nextTopicIdx;*/
// status
int8_t status;
int8_t epStatus; int8_t epStatus;
int32_t epoch;
int32_t epSkipCnt; int32_t epSkipCnt;
/*int32_t waitingRequest;*/ int64_t pollCnt;
/*int32_t readyRequest;*/
// connection
STscObj* pTscObj;
// container
SArray* clientTopics; // SArray<SMqClientTopic> SArray* clientTopics; // SArray<SMqClientTopic>
STaosQueue* mqueue; // queue of tmq_message_t STaosQueue* mqueue; // queue of rsp
STaosQall* qall; STaosQall* qall;
tsem_t rspSem; STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit
// stat
int64_t pollCnt; // ctl
tsem_t rspSem;
}; };
enum { enum {
...@@ -93,6 +107,7 @@ enum { ...@@ -93,6 +107,7 @@ enum {
enum { enum {
TMQ_CONSUMER_STATUS__INIT = 0, TMQ_CONSUMER_STATUS__INIT = 0,
TMQ_CONSUMER_STATUS__SUBSCRIBED,
TMQ_CONSUMER_STATUS__READY, TMQ_CONSUMER_STATUS__READY,
}; };
...@@ -110,13 +125,11 @@ typedef struct { ...@@ -110,13 +125,11 @@ typedef struct {
typedef struct { typedef struct {
// subscribe info // subscribe info
int32_t sqlLen; char* topicName;
char* sql;
char* topicName; SArray* vgs; // SArray<SMqClientVg>
int64_t topicId;
SArray* vgs; // SArray<SMqClientVg>
int8_t isSchemaAdaptive; int8_t isSchemaAdaptive;
int32_t numOfFields;
SSchemaWrapper schema; SSchemaWrapper schema;
} SMqClientTopic; } SMqClientTopic;
...@@ -156,7 +169,6 @@ typedef struct { ...@@ -156,7 +169,6 @@ typedef struct {
int32_t async; int32_t async;
tsem_t rspSem; tsem_t rspSem;
tmq_resp_err_t rspErr; tmq_resp_err_t rspErr;
/*SMqClientVg* pVg;*/
} SMqCommitCbParam; } SMqCommitCbParam;
tmq_conf_t* tmq_conf_new() { tmq_conf_t* tmq_conf_new() {
...@@ -251,13 +263,7 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) { ...@@ -251,13 +263,7 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) {
void tmq_list_destroy(tmq_list_t* list) { void tmq_list_destroy(tmq_list_t* list) {
SArray* container = &list->container; SArray* container = &list->container;
/*taosArrayDestroy(container);*/ taosArrayDestroyP(container, taosMemoryFree);
int32_t sz = taosArrayGetSize(container);
for (int32_t i = 0; i < sz; i++) {
char* str = taosArrayGetP(container, i);
taosMemoryFree(str);
}
taosArrayDestroy(container);
} }
int32_t tmq_list_get_size(const tmq_list_t* list) { int32_t tmq_list_get_size(const tmq_list_t* list) {
...@@ -298,6 +304,8 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { ...@@ -298,6 +304,8 @@ void tmqClearUnhandleMsg(tmq_t* tmq) {
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
pParam->rspErr = code; pParam->rspErr = code;
tmq_t* tmq = pParam->tmq;
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__SUBSCRIBED);
tsem_post(&pParam->rspSem); tsem_post(&pParam->rspSem);
return 0; return 0;
} }
...@@ -335,12 +343,9 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs ...@@ -335,12 +343,9 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
return NULL; return NULL;
} }
pTmq->pTscObj = (STscObj*)conn; pTmq->pTscObj = (STscObj*)conn;
/*pTmq->inWaiting = 0;*/
pTmq->status = 0; pTmq->status = 0;
pTmq->pollCnt = 0; pTmq->pollCnt = 0;
pTmq->epoch = 0; pTmq->epoch = 0;
/*pTmq->waitingRequest = 0;*/
/*pTmq->readyRequest = 0;*/
pTmq->epStatus = 0; pTmq->epStatus = 0;
pTmq->epSkipCnt = 0; pTmq->epSkipCnt = 0;
// set conf // set conf
...@@ -367,26 +372,45 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs ...@@ -367,26 +372,45 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
#endif #endif
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
// init timer
int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
if (inited == 0) {
tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
if (tmqMgmt.timer == NULL) {
atomic_store_8(&tmqMgmt.inited, 0);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
}
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t)); tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
if (pTmq == NULL) { if (pTmq == NULL) {
return NULL; return NULL;
} }
const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user; const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass; const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
ASSERT(user); ASSERT(user);
ASSERT(pass); ASSERT(pass);
/*ASSERT(conf->db);*/
ASSERT(conf->groupId[0]); ASSERT(conf->groupId[0]);
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
if (pTmq->pTscObj == NULL) return NULL; pTmq->mqueue = taosOpenQueue();
pTmq->qall = taosAllocateQall();
pTmq->delayedTask = taosOpenQueue();
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
goto FAIL;
}
pTmq->status = 0; // init status
pTmq->status = TMQ_CONSUMER_STATUS__INIT;
pTmq->pollCnt = 0; pTmq->pollCnt = 0;
pTmq->epoch = 0; pTmq->epoch = 0;
pTmq->epStatus = 0; pTmq->epStatus = 0;
pTmq->epSkipCnt = 0; pTmq->epSkipCnt = 0;
// set conf // set conf
strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId); strcpy(pTmq->groupId, conf->groupId);
...@@ -394,19 +418,30 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -394,19 +418,30 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->commit_cb = conf->commit_cb; pTmq->commit_cb = conf->commit_cb;
pTmq->resetOffsetCfg = conf->resetOffset; pTmq->resetOffsetCfg = conf->resetOffset;
// assign consumerId
pTmq->consumerId = tGenIdPI64(); pTmq->consumerId = tGenIdPI64();
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
if (pTmq->clientTopics == NULL) {
taosMemoryFree(pTmq);
return NULL;
}
pTmq->mqueue = taosOpenQueue(); // init semaphore
pTmq->qall = taosAllocateQall(); if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
goto FAIL;
}
tsem_init(&pTmq->rspSem, 0, 0); // init connection
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) {
tsem_destroy(&pTmq->rspSem);
goto FAIL;
}
return pTmq; return pTmq;
FAIL:
if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
if (pTmq->qall) taosFreeQall(pTmq->qall);
taosMemoryFree(pTmq);
return NULL;
} }
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) { tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
...@@ -497,86 +532,64 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in ...@@ -497,86 +532,64 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
return resp; return resp;
} }
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SRequestObj* pRequest = NULL; const SArray* container = &topic_list->container;
SArray* container = &topic_list->container; int32_t sz = taosArrayGetSize(container);
int32_t sz = taosArrayGetSize(container); void* buf = NULL;
// destroy ex SCMSubscribeReq req = {0};
taosArrayDestroy(tmq->clientTopics); int32_t code = -1;
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
SCMSubscribeReq req;
req.consumerId = tmq->consumerId; req.consumerId = tmq->consumerId;
strcpy(req.cgroup, tmq->groupId); tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
req.topicNames = taosArrayInit(sz, sizeof(void*)); req.topicNames = taosArrayInit(sz, sizeof(void*));
if (req.topicNames == NULL) goto FAIL;
for (int i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
/*char* topicName = topic_list->elems[i];*/ char* topic = taosArrayGetP(container, i);
char* topicName = taosArrayGetP(container, i);
SName name = {0}; SName name = {0};
#if 0 tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
char* dbName = getDbOfConnection(tmq->pTscObj);
if (dbName == NULL) {
return TMQ_RESP_ERR__FAIL;
}
#endif
tNameSetDbName(&name, tmq->pTscObj->acctId, topicName, strlen(topicName));
#if 0
tNameFromString(&name, topicName, T_NAME_TABLE);
#endif
char* topicFname = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
if (topicFname == NULL) { if (topicFName == NULL) {
goto _return; goto FAIL;
} }
tNameExtractFullName(&name, topicFname); tNameExtractFullName(&name, topicFName);
tscDebug("subscribe topic: %s", topicFname);
SMqClientTopic topic = {
.sql = NULL,
.sqlLen = 0,
.topicId = 0,
.topicName = topicFname,
.vgs = NULL,
};
topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
taosArrayPush(tmq->clientTopics, &topic);
taosArrayPush(req.topicNames, &topicFname);
#if 0
taosMemoryFree(dbName);
#endif
}
int tlen = tSerializeSCMSubscribeReq(NULL, &req); tscDebug("subscribe topic: %s", topicFName);
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) { taosArrayPush(req.topicNames, &topicFName);
goto _return;
} }
int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
buf = taosMemoryMalloc(tlen);
if (buf == NULL) goto FAIL;
void* abuf = buf; void* abuf = buf;
tSerializeSCMSubscribeReq(&abuf, &req); tSerializeSCMSubscribeReq(&abuf, &req);
/*printf("formatted: %s\n", dagStr);*/
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE); SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
if (pRequest == NULL) { if (sendInfo == NULL) goto FAIL;
tscError("failed to malloc request");
}
SMqSubscribeCbParam param = { SMqSubscribeCbParam param = {
.rspErr = TMQ_RESP_ERR__SUCCESS, .rspErr = TMQ_RESP_ERR__SUCCESS,
.tmq = tmq, .tmq = tmq,
}; };
tsem_init(&param.rspSem, 0, 0);
pRequest->body.requestMsg = (SDataBuf){ if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;
sendInfo->msgInfo = (SDataBuf){
.pData = buf, .pData = buf,
.len = tlen, .len = tlen,
.handle = NULL, .handle = NULL,
}; };
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = &param; sendInfo->param = &param;
sendInfo->fp = tmqSubscribeCb; sendInfo->fp = tmqSubscribeCb;
sendInfo->msgType = TDMT_MND_SUBSCRIBE;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0; int64_t transporterId = 0;
...@@ -585,15 +598,28 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -585,15 +598,28 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
tsem_wait(&param.rspSem); tsem_wait(&param.rspSem);
tsem_destroy(&param.rspSem); tsem_destroy(&param.rspSem);
_return: code = param.rspErr;
/*if (sendInfo != NULL) {*/ if (code != 0) goto FAIL;
/*destroySendMsgInfo(sendInfo);*/
/*}*/ // TODO: add max retry cnt
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, true)) {
tscDebug("not ready, retry\n");
taosMsleep(500);
}
return param.rspErr; code = 0;
FAIL:
if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
if (code != 0) {
taosMemoryFree(buf);
}
return code;
} }
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; } void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
//
conf->commit_cb = cb;
}
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) { TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
STscObj* pTscObj = (STscObj*)taos; STscObj* pTscObj = (STscObj*)taos;
...@@ -627,9 +653,6 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa ...@@ -627,9 +653,6 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa
int32_t code = 0; int32_t code = 0;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return); CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return);
// todo check for invalid sql statement and return with error code
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return); CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
/*printf("%s\n", pStr);*/ /*printf("%s\n", pStr);*/
...@@ -653,7 +676,6 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa ...@@ -653,7 +676,6 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa
} }
tSerializeSCMCreateStreamReq(buf, tlen, &req); tSerializeSCMCreateStreamReq(buf, tlen, &req);
/*printf("formatted: %s\n", dagStr);*/
pRequest->body.requestMsg = (SDataBuf){ pRequest->body.requestMsg = (SDataBuf){
.pData = buf, .pData = buf,
...@@ -684,94 +706,6 @@ _return: ...@@ -684,94 +706,6 @@ _return:
return pRequest; return pRequest;
} }
#if 0
TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL;
SQuery* pQueryNode = NULL;
char* astStr = NULL;
terrno = TSDB_CODE_SUCCESS;
if (taos == NULL || topicName == NULL || sql == NULL) {
tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos, topicName, sql);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
goto _return;
}
if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) {
tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1);
terrno = TSDB_CODE_TSC_INVALID_INPUT;
goto _return;
}
if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
goto _return;
}
tscDebug("start to create topic: %s", topicName);
int32_t code = TSDB_CODE_SUCCESS;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, true, &pQueryNode), _return);
// todo check for invalid sql statement and return with error code
CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
/*printf("%s\n", pStr);*/
SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
strcpy(name.dbname, pRequest->pDb);
strcpy(name.tname, topicName);
SCMCreateTopicReq req = {
.igExists = 1,
.ast = astStr,
.sql = (char*)sql,
};
tNameExtractFullName(&name, req.name);
int tlen = tSerializeSCMCreateTopicReq(NULL, 0, &req);
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
goto _return;
}
tSerializeSCMCreateTopicReq(buf, tlen, &req);
/*printf("formatted: %s\n", dagStr);*/
pRequest->body.requestMsg = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
pRequest->type = TDMT_MND_CREATE_TOPIC;
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem);
_return:
taosMemoryFreeClear(astStr);
qDestroyQuery(pQueryNode);
/*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/
/*}*/
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
pRequest->code = terrno;
}
return pRequest;
}
#endif
#if 0 #if 0
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return 0; if (tmq_message == NULL) return 0;
...@@ -954,7 +888,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -954,7 +888,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
if (tmqUpdateEp(tmq, head->epoch, &rsp)) { if (tmqUpdateEp(tmq, head->epoch, &rsp)) {
atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
} }
tDeleteSMqCMGetSubEpRsp(&rsp); tDeleteSMqCMGetSubEpRsp(&rsp);
} else { } else {
...@@ -1189,7 +1123,6 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { ...@@ -1189,7 +1123,6 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int64_t transporterId = 0; int64_t transporterId = 0;
/*printf("send poll\n");*/ /*printf("send poll\n");*/
/*atomic_add_fetch_32(&tmq->waitingRequest, 1);*/
tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId); pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
...@@ -1268,12 +1201,14 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -1268,12 +1201,14 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
SMqRspObj* rspObj; SMqRspObj* rspObj;
int64_t startTime = taosGetTimestampMs(); int64_t startTime = taosGetTimestampMs();
// TODO: put into another thread or delayed queue // TODO: put into delayed queue
int64_t status = atomic_load_64(&tmq->status); #if 0
while (0 != tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT)) { int8_t status = atomic_load_8(&tmq->status);
while (0 != tmqAskEp(tmq, status != TMQ_CONSUMER_STATUS__READY)) {
tscDebug("not ready, retry\n"); tscDebug("not ready, retry\n");
taosSsleep(1); taosSsleep(1);
} }
#endif
rspObj = tmqHandleAllRsp(tmq, blocking_time, false); rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
if (rspObj) { if (rspObj) {
...@@ -1281,7 +1216,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -1281,7 +1216,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
} }
while (1) { while (1) {
/*printf("cycle\n");*/
tmqAskEp(tmq, false); tmqAskEp(tmq, false);
tmqPollImpl(tmq, blocking_time); tmqPollImpl(tmq, blocking_time);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册