提交 06853043 编写于 作者: L Liu Jicong

feat(tmq): add config msg.with.table.name

上级 1dbe0650
...@@ -107,7 +107,7 @@ int32_t create_topic() { ...@@ -107,7 +107,7 @@ int32_t create_topic() {
taos_free_result(pRes); taos_free_result(pRes);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column with table as select ts, c1, c2, c3 from st1"); pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
...@@ -166,6 +166,7 @@ tmq_t* build_consumer() { ...@@ -166,6 +166,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/ /*tmq_conf_set(conf, "td.connect.db", "abc1");*/
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq); assert(tmq);
......
...@@ -2371,6 +2371,7 @@ typedef struct { ...@@ -2371,6 +2371,7 @@ typedef struct {
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
char subKey[TSDB_SUBSCRIBE_KEY_LEN]; char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t withTbName;
int32_t epoch; int32_t epoch;
uint64_t reqId; uint64_t reqId;
int64_t consumerId; int64_t consumerId;
......
...@@ -57,16 +57,17 @@ struct tmq_topic_vgroup_list_t { ...@@ -57,16 +57,17 @@ struct tmq_topic_vgroup_list_t {
}; };
struct tmq_conf_t { struct tmq_conf_t {
char clientId[256]; char clientId[256];
char groupId[TSDB_CGROUP_LEN]; char groupId[TSDB_CGROUP_LEN];
int8_t autoCommit; int8_t autoCommit;
int8_t resetOffset; int8_t resetOffset;
uint16_t port; int8_t withTbName;
int32_t autoCommitInterval; uint16_t port;
char* ip; int32_t autoCommitInterval;
char* user; char* ip;
char* pass; char* user;
char* db; char* pass;
/*char* db;*/
tmq_commit_cb* commitCb; tmq_commit_cb* commitCb;
void* commitCbUserParam; void* commitCbUserParam;
}; };
...@@ -75,6 +76,7 @@ struct tmq_t { ...@@ -75,6 +76,7 @@ struct tmq_t {
// conf // conf
char groupId[TSDB_CGROUP_LEN]; char groupId[TSDB_CGROUP_LEN];
char clientId[256]; char clientId[256];
int8_t withTbName;
int8_t autoCommit; int8_t autoCommit;
int32_t autoCommitInterval; int32_t autoCommitInterval;
int32_t resetOffsetCfg; int32_t resetOffsetCfg;
...@@ -187,6 +189,7 @@ typedef struct { ...@@ -187,6 +189,7 @@ typedef struct {
tmq_conf_t* tmq_conf_new() { tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
conf->withTbName = -1;
conf->autoCommit = true; conf->autoCommit = true;
conf->autoCommitInterval = 5000; conf->autoCommitInterval = 5000;
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
...@@ -240,6 +243,18 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value ...@@ -240,6 +243,18 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
} }
} }
if (strcmp(key, "msg.with.table.name") == 0) {
if (strcmp(value, "true") == 0) {
conf->withTbName = 1;
} else if (strcmp(value, "false") == 0) {
conf->withTbName = 0;
} else if (strcmp(value, "none") == 0) {
conf->withTbName = -1;
} else {
return TMQ_CONF_INVALID;
}
}
if (strcmp(key, "td.connect.ip") == 0) { if (strcmp(key, "td.connect.ip") == 0) {
conf->ip = strdup(value); conf->ip = strdup(value);
return TMQ_CONF_OK; return TMQ_CONF_OK;
...@@ -257,7 +272,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value ...@@ -257,7 +272,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcmp(key, "td.connect.db") == 0) { if (strcmp(key, "td.connect.db") == 0) {
conf->db = strdup(value); /*conf->db = strdup(value);*/
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
...@@ -485,6 +500,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -485,6 +500,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
// set conf // set conf
strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId); strcpy(pTmq->groupId, conf->groupId);
pTmq->withTbName = conf->withTbName;
pTmq->autoCommit = conf->autoCommit; pTmq->autoCommit = conf->autoCommit;
pTmq->autoCommitInterval = conf->autoCommitInterval; pTmq->autoCommitInterval = conf->autoCommitInterval;
pTmq->commitCb = conf->commitCb; pTmq->commitCb = conf->commitCb;
...@@ -1104,6 +1120,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* ...@@ -1104,6 +1120,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic*
pReq->subKey[tlen] = TMQ_SEPARATOR; pReq->subKey[tlen] = TMQ_SEPARATOR;
strcpy(pReq->subKey + tlen + 1, pTopic->topicName); strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
pReq->withTbName = tmq->withTbName;
pReq->waitTime = waitTime; pReq->waitTime = waitTime;
pReq->consumerId = tmq->consumerId; pReq->consumerId = tmq->consumerId;
pReq->epoch = tmq->epoch; pReq->epoch = tmq->epoch;
......
...@@ -427,13 +427,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -427,13 +427,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqDataBlkRsp rsp = {0}; SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pReq->currentOffset; rsp.reqOffset = pReq->currentOffset;
rsp.withSchema = pExec->withSchema; rsp.withSchema = pExec->withSchema;
rsp.withTbName = pExec->withTbName;
rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
rsp.blockSchema = taosArrayInit(0, sizeof(void*)); rsp.blockSchema = taosArrayInit(0, sizeof(void*));
rsp.blockTbName = taosArrayInit(0, sizeof(void*)); rsp.blockTbName = taosArrayInit(0, sizeof(void*));
int8_t withTbName = pExec->withTbName;
if (pReq->withTbName != -1) {
withTbName = pReq->withTbName;
}
rsp.withTbName = withTbName;
while (1) { while (1) {
consumerEpoch = atomic_load_32(&pExec->epoch); consumerEpoch = atomic_load_32(&pExec->epoch);
if (consumerEpoch > reqEpoch) { if (consumerEpoch > reqEpoch) {
...@@ -538,7 +543,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -538,7 +543,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosArrayPush(rsp.blockSchema, &pSW); taosArrayPush(rsp.blockSchema, &pSW);
} }
if (pExec->withTbName) { if (withTbName) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0); metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
...@@ -578,7 +583,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -578,7 +583,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT(actualLen <= dataStrLen); ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen); taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf); taosArrayPush(rsp.blockData, &buf);
if (pExec->withTbName) { if (withTbName) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0); metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) { if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) {
......
...@@ -155,9 +155,7 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) { ...@@ -155,9 +155,7 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
if (code < 0) return -1; if (code < 0) return -1;
} }
if (!taosValidFile(pRead->pReadLogTFile)) { ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
return -1;
}
code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalHead)); code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalHead));
if (code != sizeof(SWalHead)) { if (code != sizeof(SWalHead)) {
...@@ -256,9 +254,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { ...@@ -256,9 +254,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
} }
} }
/*if (!taosValidFile(pRead->pReadLogTFile)) {*/ ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
/*return -1;*/
/*}*/
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead)); code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead));
if (code != sizeof(SWalHead)) { if (code != sizeof(SWalHead)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册