提交 d0a61176 编写于 作者: L Liu Jicong

add auto commit

上级 684558f9
......@@ -5,6 +5,7 @@ AccessModifierOffset: -1
AlignAfterOpenBracket: Align
AlignConsecutiveAssignments: false
AlignConsecutiveDeclarations: true
AlignConsecutiveMacros: true
AlignEscapedNewlinesLeft: true
AlignOperands: true
AlignTrailingComments: true
......@@ -86,6 +87,5 @@ SpacesInSquareBrackets: false
Standard: Auto
TabWidth: 8
UseTab: Never
AlignConsecutiveDeclarations: true
...
......@@ -88,11 +88,6 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "group.id", "tg2");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
return tmq;
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
return NULL;
}
tmq_list_t* build_topic_list() {
......@@ -109,12 +104,12 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
printf("subscribe err\n");
return;
}
int32_t cnt = 0;
/*int32_t cnt = 0;*/
/*clock_t startTime = clock();*/
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
cnt++;
/*cnt++;*/
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
/*} else {*/
......@@ -192,12 +187,15 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
fprintf(stderr, "%% Consumer closed\n");
}
int main() {
int main(int argc, char* argv[]) {
int code;
if (argc > 1) {
printf("env init\n");
code = init_env();
}
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
perf_loop(tmq, topic_list);
/*basic_consume_loop(tmq, topic_list);*/
/*perf_loop(tmq, topic_list);*/
basic_consume_loop(tmq, topic_list);
/*sync_consume_loop(tmq, topic_list);*/
}
......@@ -1274,28 +1274,12 @@ _err:
return NULL;
}
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization /
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or
// deserialization
typedef struct {
// SArray* rebSubscribes; //SArray<SMqRebSubscribe>
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
} SMqDoRebalanceMsg;
#if 0
static FORCE_INLINE SMqDoRebalanceMsg* tNewSMqDoRebalanceMsg() {
SMqDoRebalanceMsg *pMsg = malloc(sizeof(SMqDoRebalanceMsg));
if (pMsg == NULL) {
return NULL;
}
pMsg->rebSubscribes = taosArrayInit(0, sizeof(SMqRebSubscribe));
if (pMsg->rebSubscribes == NULL) {
free(pMsg);
return NULL;
}
return pMsg;
}
#endif
typedef struct {
int64_t status;
} SMVSubscribeRsp;
......@@ -1779,12 +1763,6 @@ typedef struct {
int32_t vgId;
int64_t oldConsumerId;
int64_t newConsumerId;
// char topicName[TSDB_TOPIC_FNAME_LEN];
// char cgroup[TSDB_CONSUMER_GROUP_LEN];
// char* sql;
// char* logicalPlan;
// char* physicalPlan;
// char* qmsg;
} SMqMVRebReq;
static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) {
......@@ -1793,13 +1771,6 @@ static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pR
tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
// tlen += taosEncodeString(buf, pReq->topicName);
// tlen += taosEncodeString(buf, pReq->cgroup);
// tlen += taosEncodeString(buf, pReq->sql);
// tlen += taosEncodeString(buf, pReq->logicalPlan);
// tlen += taosEncodeString(buf, pReq->physicalPlan);
// tlen += taosEncodeString(buf, pReq->qmsg);
// tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen;
}
......@@ -1808,13 +1779,6 @@ static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) {
buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
// buf = taosDecodeStringTo(buf, pReq->topicName);
// buf = taosDecodeStringTo(buf, pReq->cgroup);
// buf = taosDecodeString(buf, &pReq->sql);
// buf = taosDecodeString(buf, &pReq->logicalPlan);
// buf = taosDecodeString(buf, &pReq->physicalPlan);
// buf = taosDecodeString(buf, &pReq->qmsg);
// buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf;
}
......
......@@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// clang-format off
#if 0
#undef TD_MSG_INFO_
#undef TD_MSG_NUMBER_
......@@ -191,3 +193,4 @@ enum {
TDMT_MAX
#endif
};
// clang-format on
......@@ -33,6 +33,7 @@ struct tmq_list_t {
int32_t tot;
char* elems[];
};
struct tmq_topic_vgroup_t {
char* topic;
int32_t vgId;
......@@ -48,14 +49,17 @@ struct tmq_topic_vgroup_list_t {
struct tmq_conf_t {
char clientId[256];
char groupId[256];
bool auto_commit;
/*char* ip;*/
/*uint16_t port;*/
tmq_commit_cb* commit_cb;
};
struct tmq_t {
// conf
char groupId[256];
char clientId[256];
bool autoCommit;
SRWLatch lock;
int64_t consumerId;
int32_t epoch;
......@@ -94,25 +98,25 @@ typedef struct SMqClientTopic {
SArray* vgs; // SArray<SMqClientVg>
} SMqClientTopic;
typedef struct SMqSubscribeCbParam {
typedef struct {
tmq_t* tmq;
tsem_t rspSem;
tmq_resp_err_t rspErr;
} SMqSubscribeCbParam;
typedef struct SMqAskEpCbParam {
typedef struct {
tmq_t* tmq;
int32_t wait;
} SMqAskEpCbParam;
typedef struct SMqConsumeCbParam {
typedef struct {
tmq_t* tmq;
SMqClientVg* pVg;
tmq_message_t** retMsg;
tsem_t rspSem;
} SMqConsumeCbParam;
typedef struct SMqCommitCbParam {
typedef struct {
tmq_t* tmq;
SMqClientVg* pVg;
int32_t async;
......@@ -121,6 +125,7 @@ typedef struct SMqCommitCbParam {
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
conf->auto_commit = false;
return conf;
}
......@@ -131,11 +136,24 @@ void tmq_conf_destroy(tmq_conf_t* conf) {
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
if (strcmp(key, "group.id") == 0) {
strcpy(conf->groupId, value);
return TMQ_CONF_OK;
}
if (strcmp(key, "client.id") == 0) {
strcpy(conf->clientId, value);
return TMQ_CONF_OK;
}
if (strcmp(key, "enable.auto.commit") == 0) {
if (strcmp(value, "true") == 0) {
conf->auto_commit = true;
return TMQ_CONF_OK;
} else if (strcmp(value, "false") == 0) {
conf->auto_commit = false;
return TMQ_CONF_OK;
} else {
return TMQ_CONF_INVALID;
}
}
return TMQ_CONF_UNKNOWN;
}
tmq_list_t* tmq_list_new() {
......@@ -182,11 +200,14 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
pTmq->pollCnt = 0;
pTmq->epoch = 0;
taosInitRWLatch(&pTmq->lock);
// set conf
strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId);
pTmq->autoCommit = conf->auto_commit;
pTmq->commit_cb = conf->commit_cb;
tsem_init(&pTmq->rspSem, 0, 0);
pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1);
pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
return pTmq;
}
......@@ -563,8 +584,15 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgSz; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
// clang-format off
SMqClientVg clientVg = {
.pollCnt = 0, .committedOffset = -1, .currentOffset = -1, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet};
.pollCnt = 0,
.committedOffset = -1,
.currentOffset = -1,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet
};
// clang-format on
taosArrayPush(topic.vgs, &clientVg);
set = true;
}
......@@ -655,7 +683,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
int64_t status = atomic_load_64(&tmq->status);
tmqAskEp(tmq, status == 0);
if (blocking_time < 0) blocking_time = 1;
if (blocking_time <= 0) blocking_time = 1;
if (blocking_time > 1000) blocking_time = 1000;
/*blocking_time = 1;*/
......@@ -676,7 +704,8 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
/*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, TMQ_REQ_TYPE_CONSUME_ONLY, pTopic, pVg);
int32_t reqType = tmq->autoCommit ? TMQ_REQ_TYPE_CONSUME_AND_COMMIT : TMQ_REQ_TYPE_COMMIT_ONLY;
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, reqType, pTopic, pVg);
if (pReq == NULL) {
ASSERT(false);
usleep(blocking_time * 1000);
......
......@@ -248,7 +248,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
switch (pReq->type) {
case TD_SUPER_TABLE:
tlen += taosEncodeFixedU64(buf, pReq->stbCfg.suid);
tlen += taosEncodeFixedI64(buf, pReq->stbCfg.suid);
tlen += taosEncodeFixedU32(buf, pReq->stbCfg.nCols);
for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) {
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pSchema[i].type);
......@@ -265,7 +265,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
}
break;
case TD_CHILD_TABLE:
tlen += taosEncodeFixedU64(buf, pReq->ctbCfg.suid);
tlen += taosEncodeFixedI64(buf, pReq->ctbCfg.suid);
tlen += tdEncodeKVRow(buf, pReq->ctbCfg.pTag);
break;
case TD_NORMAL_TABLE:
......@@ -293,7 +293,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
switch (pReq->type) {
case TD_SUPER_TABLE:
buf = taosDecodeFixedU64(buf, &(pReq->stbCfg.suid));
buf = taosDecodeFixedI64(buf, &(pReq->stbCfg.suid));
buf = taosDecodeFixedU32(buf, &(pReq->stbCfg.nCols));
pReq->stbCfg.pSchema = (SSchema *)malloc(pReq->stbCfg.nCols * sizeof(SSchema));
for (uint32_t i = 0; i < pReq->stbCfg.nCols; i++) {
......@@ -312,7 +312,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
}
break;
case TD_CHILD_TABLE:
buf = taosDecodeFixedU64(buf, &pReq->ctbCfg.suid);
buf = taosDecodeFixedI64(buf, &pReq->ctbCfg.suid);
buf = tdDecodeKVRow(buf, &pReq->ctbCfg.pTag);
break;
case TD_NORMAL_TABLE:
......@@ -385,14 +385,14 @@ int32_t tSerializeSMCreateStbReq(void **buf, SMCreateStbReq *pReq) {
for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
SField *pField = taosArrayGet(pReq->pColumns, i);
tlen += taosEncodeFixedI8(buf, pField->type);
tlen += taosEncodeFixedU8(buf, pField->type);
tlen += taosEncodeFixedI32(buf, pField->bytes);
tlen += taosEncodeString(buf, pField->name);
}
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField *pField = taosArrayGet(pReq->pTags, i);
tlen += taosEncodeFixedI8(buf, pField->type);
tlen += taosEncodeFixedU8(buf, pField->type);
tlen += taosEncodeFixedI32(buf, pField->bytes);
tlen += taosEncodeString(buf, pField->name);
}
......@@ -416,7 +416,7 @@ void *tDeserializeSMCreateStbReq(void *buf, SMCreateStbReq *pReq) {
for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
SField field = {0};
buf = taosDecodeFixedI8(buf, &field.type);
buf = taosDecodeFixedU8(buf, &field.type);
buf = taosDecodeFixedI32(buf, &field.bytes);
buf = taosDecodeStringTo(buf, field.name);
if (taosArrayPush(pReq->pColumns, &field) == NULL) {
......@@ -427,7 +427,7 @@ void *tDeserializeSMCreateStbReq(void *buf, SMCreateStbReq *pReq) {
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
SField field = {0};
buf = taosDecodeFixedI8(buf, &field.type);
buf = taosDecodeFixedU8(buf, &field.type);
buf = taosDecodeFixedI32(buf, &field.bytes);
buf = taosDecodeStringTo(buf, field.name);
if (taosArrayPush(pReq->pTags, &field) == NULL) {
......
......@@ -228,6 +228,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) {
pTopic->committedOffset = pReq->offset;
printf("offset %ld committed\n", pTopic->committedOffset);
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = 0;
......@@ -236,17 +237,27 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
}
if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) {
if (pTopic->committedOffset < pReq->offset - 1) {
pTopic->committedOffset = pReq->offset - 1;
printf("offset %ld committed\n", pTopic->committedOffset);
}
}
rsp.committedOffset = pTopic->committedOffset;
rsp.reqOffset = pReq->offset;
rsp.skipLogNum = 0;
if (fetchOffset <= pTopic->committedOffset) {
fetchOffset = pTopic->committedOffset + 1;
}
SWalHead* pHead;
while (1) {
int8_t pos = fetchOffset % TQ_BUFFER_SIZE;
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// rsponse to user
break;
}
pHead = pTopic->pReadhandle->pHead;
......@@ -263,6 +274,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
}
if (pDataBlock == NULL) {
fetchOffset++;
pos = fetchOffset % TQ_BUFFER_SIZE;
rsp.skipLogNum++;
break;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册