提交 ff57f57c 编写于 作者: L Liu Jicong

refactor tmq

上级 a703b702
......@@ -92,17 +92,6 @@ typedef struct taosField {
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
typedef struct tmq_t tmq_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef struct tmq_message_t tmq_message_t;
typedef struct tmq_message_topic_t tmq_message_topic_t;
typedef struct tmq_message_tb_t tmq_message_tb_t;
typedef struct tmq_tb_iter_t tmq_tb_iter_t;
typedef struct tmq_message_col_t tmq_message_col_t;
typedef struct tmq_col_iter_t tmq_col_iter_t;
typedef struct TAOS_BIND {
int buffer_type;
void * buffer;
......@@ -205,27 +194,59 @@ DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
/* --------------------------TMQ INTERFACE------------------------------- */
typedef struct tmq_resp_err_t tmq_resp_err_t;
enum tmq_resp_err_t {
TMQ_RESP_ERR__SUCCESS = 0,
TMQ_RESP_ERR__FAIL = 1,
};
typedef enum tmq_resp_err_t tmq_resp_err_t;
typedef struct tmq_t tmq_t;
typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t;
typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t;
typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef struct tmq_message_t tmq_message_t;
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));
DLL_EXPORT tmq_list_t* tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*);
DLL_EXPORT tmq_conf_t* tmq_conf_new();
DLL_EXPORT int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb);
DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen);
DLL_EXPORT tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen);
DLL_EXPORT TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics);
#endif
DLL_EXPORT tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time);
DLL_EXPORT tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups);
DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups);
#endif
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
#endif
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
};
typedef enum tmq_conf_res_t tmq_conf_res_t;
DLL_EXPORT tmq_conf_t* tmq_conf_new();
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t* conf);
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb);
#ifdef __cplusplus
}
......
......@@ -26,6 +26,51 @@
#include "tpagedfile.h"
#include "tref.h"
struct tmq_list_t {
int32_t cnt;
int32_t tot;
char* elems[];
};
struct tmq_topic_vgroup_t {
char* topic;
int32_t vgId;
int64_t commitOffset;
};
struct tmq_topic_vgroup_list_t {
int32_t cnt;
int32_t size;
tmq_topic_vgroup_t* elems;
};
struct tmq_conf_t {
char clientId[256];
char groupId[256];
/*char* ip;*/
/*uint16_t port;*/
tmq_commit_cb* commit_cb;
};
struct tmq_t {
char groupId[256];
char clientId[256];
SRWLatch lock;
int64_t consumerId;
int64_t epoch;
int64_t status;
tsem_t rspSem;
STscObj* pTscObj;
tmq_commit_cb* commit_cb;
int32_t nextTopicIdx;
SArray* clientTopics; //SArray<SMqClientTopic>
//stat
int64_t pollCnt;
};
struct tmq_message_t {
SMqConsumeRsp rsp;
};
typedef struct SMqClientVg {
// statistics
int64_t pollCnt;
......@@ -47,83 +92,43 @@ typedef struct SMqClientTopic {
SArray* vgs; //SArray<SMqClientVg>
} SMqClientTopic;
typedef struct SMqAskEpCbParam {
tmq_t* tmq;
int32_t wait;
} SMqAskEpCbParam;
struct tmq_resp_err_t {
int32_t code;
};
struct tmq_topic_vgroup_t {
char* topic;
int32_t vgId;
int64_t commitOffset;
};
struct tmq_topic_vgroup_list_t {
int32_t cnt;
int32_t size;
tmq_topic_vgroup_t* elems;
};
typedef struct SMqConsumeCbParam {
tmq_t* tmq;
SMqClientVg* pVg;
tmq_message_t** retMsg;
} SMqConsumeCbParam;
struct tmq_conf_t {
char clientId[256];
char groupId[256];
char* ip;
uint16_t port;
tmq_commit_cb* commit_cb;
};
struct tmq_message_t {
SMqConsumeRsp rsp;
};
typedef struct SMqSubscribeCbParam {
tmq_t* tmq;
tsem_t rspSem;
tmq_resp_err_t rspErr;
} SMqSubscribeCbParam;
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
return conf;
}
int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
void tmq_conf_destroy(tmq_conf_t* conf) {
if(conf) free(conf);
}
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
if (strcmp(key, "group.id") == 0) {
strcpy(conf->groupId, value);
}
if (strcmp(key, "client.id") == 0) {
strcpy(conf->clientId, value);
}
return 0;
return TMQ_CONF_OK;
}
struct tmq_t {
char groupId[256];
char clientId[256];
SRWLatch lock;
int64_t consumerId;
int64_t epoch;
int64_t status;
tsem_t rspSem;
STscObj* pTscObj;
tmq_commit_cb* commit_cb;
int32_t nextTopicIdx;
SArray* clientTopics; //SArray<SMqClientTopic>
//stat
int64_t pollCnt;
};
struct tmq_list_t {
int32_t cnt;
int32_t tot;
char* elems[];
};
tmq_list_t* tmq_list_new() {
tmq_list_t *ptr = malloc(sizeof(tmq_list_t) + 8 * sizeof(char*));
if (ptr == NULL) {
......@@ -141,6 +146,12 @@ int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
return 0;
}
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
pParam->rspErr = code;
tsem_post(&pParam->rspSem);
return 0;
}
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = calloc(sizeof(tmq_t), 1);
......@@ -161,7 +172,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
return pTmq;
}
TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SRequestObj *pRequest = NULL;
int32_t sz = topic_list->cnt;
//destroy ex
......@@ -219,27 +230,31 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
tscError("failed to malloc sqlObj");
}
SMqSubscribeCbParam param = {
.rspErr = TMQ_RESP_ERR__SUCCESS,
.tmq = tmq
};
tsem_init(&param.rspSem, 0, 0);
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
/*sendInfo->fp*/
sendInfo->param = &param;
sendInfo->fp = tmqSubscribeCb;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem);
tsem_wait(&param.rspSem);
tsem_destroy(&param.rspSem);
_return:
/*if (sendInfo != NULL) {*/
/*destroySendMsgInfo(sendInfo);*/
/*}*/
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
pRequest->code = terrno;
}
return pRequest;
return param.rspErr;
}
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
......@@ -611,10 +626,10 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* tmq_message = NULL;
int64_t status = atomic_load_64(&tmq->status);
tmqAsyncAskEp(tmq, status == 0 || taosArrayGetSize(tmq->clientTopics));
tmqAsyncAskEp(tmq, taosArrayGetSize(tmq->clientTopics));
/*if (blocking_time < 0) blocking_time = 500;*/
blocking_time = 1000;
blocking_time = 1;
if (taosArrayGetSize(tmq->clientTopics) == 0) {
tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
......@@ -674,16 +689,15 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
/*return pRequest;*/
}
tmq_resp_err_t* tmq_commit(tmq_t* tmq, tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
SMqConsumeReq req = {0};
return NULL;
return 0;
}
void tmq_message_destroy(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return;
}
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
assert(pMsgBody != NULL);
tfree(pMsgBody->msgInfo.pData);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册