提交 f36f1a82 编写于 作者: L Liu Jicong

temporary remove multi topic consume

上级 a038c466
......@@ -13,16 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <time.h>
#include "taos.h"
static int running = 1;
static void msg_process(tmq_message_t* message) {
tmqShowMsg(message);
}
static int running = 1;
static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -44,29 +42,28 @@ int32_t init_env() {
}
taos_free_result(pRes);
/*pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");*/
/*if (taos_errno(pRes) != 0) {*/
/*printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));*/
/*return -1;*/
/*}*/
/*taos_free_result(pRes);*/
/*pRes = taos_query(pConn, "create table if not exists tu using st1 tags(1)");*/
/*if (taos_errno(pRes) != 0) {*/
/*printf("failed to create child table tu, reason:%s\n", taos_errstr(pRes));*/
/*return -1;*/
/*}*/
/*taos_free_result(pRes);*/
/*pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");*/
/*if (taos_errno(pRes) != 0) {*/
/*printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));*/
/*return -1;*/
/*}*/
/*taos_free_result(pRes);*/
const char* sql = "select * from st1";
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
const char* sql = "select * from tu2";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
if (taos_errno(pRes) != 0) {
printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
......@@ -95,7 +92,7 @@ tmq_t* build_consumer() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
return NULL;
return NULL;
}
tmq_list_t* build_topic_list() {
......@@ -104,8 +101,7 @@ tmq_list_t* build_topic_list() {
return topic_list;
}
void basic_consume_loop(tmq_t *tmq,
tmq_list_t *topics) {
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
......@@ -116,12 +112,12 @@ void basic_consume_loop(tmq_t *tmq,
int32_t cnt = 0;
/*clock_t startTime = clock();*/
while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
cnt++;
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
/*} else {*/
/*} else {*/
/*break;*/
}
}
......@@ -135,11 +131,10 @@ void basic_consume_loop(tmq_t *tmq,
fprintf(stderr, "%% Consumer closed\n");
}
void sync_consume_loop(tmq_t *tmq,
tmq_list_t *topics) {
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1000;
int msg_count = 0;
int msg_count = 0;
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
......@@ -148,15 +143,14 @@ void sync_consume_loop(tmq_t *tmq,
}
while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0)
tmq_commit(tmq, NULL, 0);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
}
}
}
err = tmq_consumer_close(tmq);
if (err)
......@@ -168,7 +162,7 @@ void sync_consume_loop(tmq_t *tmq,
int main() {
int code;
code = init_env();
tmq_t* tmq = build_consumer();
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
basic_consume_loop(tmq, topic_list);
/*sync_consume_loop(tmq, topic_list);*/
......
......@@ -25,9 +25,9 @@ extern "C" {
#include "taoserror.h"
#include "tarray.h"
#include "tcoding.h"
#include "trow.h"
#include "thash.h"
#include "tlist.h"
#include "trow.h"
/* ------------------------ MESSAGE DEFINITIONS ------------------------ */
#define TD_MSG_NUMBER_
......@@ -69,7 +69,7 @@ typedef uint16_t tmsg_t;
#define TSDB_IE_TYPE_DNODE_STATE 7
typedef enum {
HEARTBEAT_TYPE_MQ = 0,
HEARTBEAT_TYPE_MQ = 0,
HEARTBEAT_TYPE_QUERY = 1,
// types can be added here
//
......@@ -82,7 +82,6 @@ enum {
HEARTBEAT_KEY_MQ_TMP,
};
typedef enum _mgmt_table {
TSDB_MGMT_TABLE_START,
TSDB_MGMT_TABLE_ACCT,
......@@ -192,14 +191,14 @@ typedef struct {
// Submit message for one table
typedef struct SSubmitBlk {
uint64_t uid; // table unique id
int32_t tid; // table id
int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block
char data[];
int64_t uid; // table unique id
int32_t tid; // table id
int32_t padding; // TODO just for padding here
int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block
char data[];
} SSubmitBlk;
typedef struct {
......@@ -226,7 +225,7 @@ typedef struct {
typedef struct {
int32_t totalLen;
int32_t len;
STSRow *row;
STSRow* row;
} SSubmitBlkIter;
typedef struct {
......@@ -302,9 +301,9 @@ typedef struct {
} SConnectReq;
typedef struct SEpSet {
int8_t inUse;
int8_t numOfEps;
SEp eps[TSDB_MAX_REPLICA];
int8_t inUse;
int8_t numOfEps;
SEp eps[TSDB_MAX_REPLICA];
} SEpSet;
static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) {
......@@ -689,9 +688,9 @@ typedef struct {
} SDnodeCfg;
typedef struct {
int32_t id;
int8_t isMnode;
SEp ep;
int32_t id;
int8_t isMnode;
SEp ep;
} SDnodeEp;
typedef struct {
......@@ -768,10 +767,10 @@ typedef struct {
// todo refactor
typedef struct SVgroupInfo {
int32_t vgId;
uint32_t hashBegin;
uint32_t hashEnd;
SEpSet epset;
int32_t vgId;
uint32_t hashBegin;
uint32_t hashEnd;
SEpSet epset;
} SVgroupInfo;
typedef struct {
......@@ -962,7 +961,7 @@ typedef struct SSubQueryMsg {
uint64_t queryId;
uint64_t taskId;
int8_t taskType;
uint32_t sqlLen; // the query sql,
uint32_t sqlLen; // the query sql,
uint32_t phyLen;
char msg[];
} SSubQueryMsg;
......@@ -981,7 +980,6 @@ typedef struct {
uint64_t taskId;
} SQueryContinueReq;
typedef struct {
SMsgHead header;
uint64_t sId;
......@@ -1182,10 +1180,10 @@ typedef struct {
} SMqTmrMsg;
typedef struct {
const char* key;
SArray* lostConsumers; //SArray<int64_t>
SArray* removedConsumers; //SArray<int64_t>
SArray* newConsumers; //SArray<int64_t>
const char* key;
SArray* lostConsumers; // SArray<int64_t>
SArray* removedConsumers; // SArray<int64_t>
SArray* newConsumers; // SArray<int64_t>
} SMqRebSubscribe;
static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
......@@ -1215,10 +1213,11 @@ _err:
return NULL;
}
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization / deserialization
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization /
// deserialization
typedef struct {
//SArray* rebSubscribes; //SArray<SMqRebSubscribe>
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
// SArray* rebSubscribes; //SArray<SMqRebSubscribe>
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
} SMqDoRebalanceMsg;
#if 0
......@@ -1391,9 +1390,9 @@ static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
}
typedef struct SMqHbRsp {
int8_t status; //idle or not
int8_t status; // idle or not
int8_t vnodeChanged;
int8_t epChanged; // should use new epset
int8_t epChanged; // should use new epset
int8_t reserved;
SEpSet epSet;
} SMqHbRsp;
......@@ -1416,7 +1415,7 @@ static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
}
typedef struct SMqHbOneTopicBatchRsp {
char topicName[TSDB_TOPIC_FNAME_LEN];
char topicName[TSDB_TOPIC_FNAME_LEN];
SArray* rsps; // SArray<SMqHbRsp>
} SMqHbOneTopicBatchRsp;
......@@ -1446,8 +1445,8 @@ static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTop
}
typedef struct SMqHbBatchRsp {
int64_t consumerId;
SArray* batchRsps; // SArray<SMqHbOneTopicBatchRsp>
int64_t consumerId;
SArray* batchRsps; // SArray<SMqHbOneTopicBatchRsp>
} SMqHbBatchRsp;
static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) {
......@@ -1456,7 +1455,7 @@ static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp*
int32_t sz;
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i);
SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*)taosArrayGet(pBatchRsp->batchRsps, i);
tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp);
}
return tlen;
......@@ -1468,7 +1467,7 @@ static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBat
buf = taosDecodeFixedI32(buf, &sz);
pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp));
for (int32_t i = 0; i < sz; i++) {
SMqHbOneTopicBatchRsp rsp;
SMqHbOneTopicBatchRsp rsp;
buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp);
buf = taosArrayPush(pBatchRsp->batchRsps, &rsp);
}
......@@ -1508,9 +1507,7 @@ typedef struct {
SArray* rsps; // SArray<SClientHbRsp>
} SClientHbBatchRsp;
static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) {
return taosIntHash_64(key, keyLen);
}
static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { return taosIntHash_64(key, keyLen); }
int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq);
void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq);
......@@ -1518,9 +1515,8 @@ void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq);
int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp);
void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp);
static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) {
void *pIter = taosHashIterate(info, NULL);
static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) {
void* pIter = taosHashIterate(info, NULL);
while (pIter != NULL) {
SKv* kv = (SKv*)pIter;
......@@ -1530,12 +1526,11 @@ static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) {
}
}
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
static FORCE_INLINE void tFreeClientHbReq(void* pReq) {
SClientHbReq* req = (SClientHbReq*)pReq;
if (req->info) {
tFreeReqKvHash(req->info);
taosHashCleanup(req->info);
}
}
......@@ -1544,7 +1539,7 @@ int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq);
void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq);
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
SClientHbBatchReq *req = (SClientHbBatchReq*)pReq;
SClientHbBatchReq* req = (SClientHbBatchReq*)pReq;
if (deep) {
taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
} else {
......@@ -1553,25 +1548,23 @@ static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
free(pReq);
}
static FORCE_INLINE void tFreeClientKv(void *pKv) {
SKv *kv = (SKv *)pKv;
static FORCE_INLINE void tFreeClientKv(void* pKv) {
SKv* kv = (SKv*)pKv;
if (kv) {
tfree(kv->value);
}
}
static FORCE_INLINE void tFreeClientHbRsp(void *pRsp) {
static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) {
SClientHbRsp* rsp = (SClientHbRsp*)pRsp;
if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv);
}
static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) {
SClientHbBatchRsp *rsp = (SClientHbBatchRsp*)pRsp;
SClientHbBatchRsp* rsp = (SClientHbBatchRsp*)pRsp;
taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp);
}
int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp);
void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp);
......@@ -1655,10 +1648,10 @@ static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo*
}
typedef struct SMqHbMsg {
int32_t status; // ask hb endpoint
int32_t epoch;
int64_t consumerId;
SArray* pTopics; // SArray<SMqHbTopicInfo>
int32_t status; // ask hb endpoint
int32_t epoch;
int64_t consumerId;
SArray* pTopics; // SArray<SMqHbTopicInfo>
} SMqHbMsg;
static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) {
......@@ -1691,15 +1684,15 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
}
typedef struct {
int64_t leftForVer;
int32_t vgId;
int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN];
char* sql;
char* logicalPlan;
char* physicalPlan;
char* qmsg;
int64_t leftForVer;
int32_t vgId;
int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN];
char* sql;
char* logicalPlan;
char* physicalPlan;
char* qmsg;
} SMqSetCVgReq;
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
......@@ -1730,16 +1723,16 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
}
typedef struct {
int64_t leftForVer;
int32_t vgId;
int64_t oldConsumerId;
int64_t newConsumerId;
//char topicName[TSDB_TOPIC_FNAME_LEN];
//char cgroup[TSDB_CONSUMER_GROUP_LEN];
//char* sql;
//char* logicalPlan;
//char* physicalPlan;
//char* qmsg;
int64_t leftForVer;
int32_t vgId;
int64_t oldConsumerId;
int64_t newConsumerId;
// char topicName[TSDB_TOPIC_FNAME_LEN];
// char cgroup[TSDB_CONSUMER_GROUP_LEN];
// char* sql;
// char* logicalPlan;
// char* physicalPlan;
// char* qmsg;
} SMqMVRebReq;
static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) {
......@@ -1748,13 +1741,13 @@ static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pR
tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
//tlen += taosEncodeString(buf, pReq->topicName);
//tlen += taosEncodeString(buf, pReq->cgroup);
//tlen += taosEncodeString(buf, pReq->sql);
//tlen += taosEncodeString(buf, pReq->logicalPlan);
//tlen += taosEncodeString(buf, pReq->physicalPlan);
//tlen += taosEncodeString(buf, pReq->qmsg);
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
// tlen += taosEncodeString(buf, pReq->topicName);
// tlen += taosEncodeString(buf, pReq->cgroup);
// tlen += taosEncodeString(buf, pReq->sql);
// tlen += taosEncodeString(buf, pReq->logicalPlan);
// tlen += taosEncodeString(buf, pReq->physicalPlan);
// tlen += taosEncodeString(buf, pReq->qmsg);
// tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen;
}
......@@ -1763,13 +1756,13 @@ static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) {
buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
//buf = taosDecodeStringTo(buf, pReq->topicName);
//buf = taosDecodeStringTo(buf, pReq->cgroup);
//buf = taosDecodeString(buf, &pReq->sql);
//buf = taosDecodeString(buf, &pReq->logicalPlan);
//buf = taosDecodeString(buf, &pReq->physicalPlan);
//buf = taosDecodeString(buf, &pReq->qmsg);
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
// buf = taosDecodeStringTo(buf, pReq->topicName);
// buf = taosDecodeStringTo(buf, pReq->cgroup);
// buf = taosDecodeString(buf, &pReq->sql);
// buf = taosDecodeString(buf, &pReq->logicalPlan);
// buf = taosDecodeString(buf, &pReq->physicalPlan);
// buf = taosDecodeString(buf, &pReq->qmsg);
// buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf;
}
......@@ -1791,7 +1784,7 @@ typedef struct {
typedef struct {
uint32_t nCols;
SSchema *pSchema;
SSchema* pSchema;
} SSchemaWrapper;
static FORCE_INLINE int32_t tEncodeSSchema(void** buf, const SSchema* pSchema) {
......@@ -1814,7 +1807,7 @@ static FORCE_INLINE void* tDecodeSSchema(void* buf, SSchema* pSchema) {
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
int32_t tlen = 0;
tlen += taosEncodeFixedU32(buf, pSW->nCols);
for (int32_t i = 0; i < pSW->nCols; i ++) {
for (int32_t i = 0; i < pSW->nCols; i++) {
tlen += tEncodeSSchema(buf, &pSW->pSchema[i]);
}
return tlen;
......@@ -1822,20 +1815,20 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapp
static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) {
buf = taosDecodeFixedU32(buf, &pSW->nCols);
pSW->pSchema = (SSchema*) calloc(pSW->nCols, sizeof(SSchema));
pSW->pSchema = (SSchema*)calloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) {
return NULL;
}
for (int32_t i = 0; i < pSW->nCols; i ++) {
for (int32_t i = 0; i < pSW->nCols; i++) {
buf = tDecodeSSchema(buf, &pSW->pSchema[i]);
}
return buf;
}
typedef struct {
int64_t uid;
int32_t numOfRows;
char* colData;
int64_t uid;
int32_t numOfRows;
char* colData;
} SMqTbData;
typedef struct {
......@@ -1857,24 +1850,24 @@ typedef struct {
int64_t rspOffset;
int32_t skipLogNum;
int32_t numOfTopics;
SArray* pBlockData; //SArray<SSDataBlock>
SArray* pBlockData; // SArray<SSDataBlock>
} SMqConsumeRsp;
// one req for one vg+topic
typedef struct {
SMsgHead head;
//0: commit only, current offset
//1: consume only, poll next offset
//2: commit current and consume next offset
int32_t reqType;
SMsgHead head;
// 0: commit only, current offset
// 1: consume only, poll next offset
// 2: commit current and consume next offset
int32_t reqType;
int64_t reqId;
int64_t consumerId;
int64_t blockingTime;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
int64_t reqId;
int64_t consumerId;
int64_t blockingTime;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
int64_t offset;
char topic[TSDB_TOPIC_FNAME_LEN];
int64_t offset;
char topic[TSDB_TOPIC_FNAME_LEN];
} SMqConsumeReq;
typedef struct {
......@@ -1884,7 +1877,7 @@ typedef struct {
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
SArray* vgs; // SArray<SMqSubVgEp>
SArray* vgs; // SArray<SMqSubVgEp>
} SMqSubTopicEp;
typedef struct {
......@@ -1894,9 +1887,7 @@ typedef struct {
SArray* topics; // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp;
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
taosArrayDestroy(pSubTopicEp->vgs);
}
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); }
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0;
......@@ -1912,7 +1903,7 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
}
static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) {
taosArrayDestroyEx(pRsp->topics, (void (*)(void*)) tDeleteSMqSubTopicEp);
taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp);
}
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
......
......@@ -205,6 +205,117 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
}
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
SMqConsumeReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t fetchOffset = pReq->offset;
/*int64_t blockingTime = pReq->blockingTime;*/
SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL};
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) {
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
rpcSendResponse(pMsg);
return 0;
}
int sz = taosArrayGetSize(pConsumer->topics);
ASSERT(sz == 1);
STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0);
ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0);
ASSERT(pConsumer->consumerId == consumerId);
if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) {
pTopic->committedOffset = pReq->offset;
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) {
pTopic->committedOffset = pReq->offset - 1;
}
rsp.committedOffset = pTopic->committedOffset;
rsp.reqOffset = pReq->offset;
rsp.skipLogNum = 0;
SWalHead* pHead;
while (1) {
int8_t pos = fetchOffset % TQ_BUFFER_SIZE;
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
break;
}
pHead = pTopic->pReadhandle->pHead;
if (pHead->head.msgType == TDMT_VND_SUBMIT) {
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
qTaskInfo_t task = pTopic->buffer.output[pos].task;
qSetStreamInput(task, pCont);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(false);
}
if (pDataBlock == NULL) {
fetchOffset++;
rsp.skipLogNum++;
break;
}
taosArrayPush(pRes, pDataBlock);
rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset;
pTopic->currentOffset = fetchOffset;
rsp.numOfTopics = 1;
rsp.pBlockData = pRes;
int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
void* abuf = buf;
tEncodeSMqConsumeRsp(&abuf, &rsp);
taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
} else {
fetchOffset++;
rsp.skipLogNum++;
}
}
int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
void* abuf = buf;
tEncodeSMqConsumeRsp(&abuf, &rsp);
rsp.pBlockData = NULL;
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
#if 0
int32_t tqProcessConsumeReqV0(STQ* pTq, SRpcMsg* pMsg) {
SMqConsumeReq* pReq = pMsg->pCont;
int64_t reqId = pReq->reqId;
int64_t consumerId = pReq->consumerId;
......@@ -265,6 +376,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
break;
}
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
printf("read offset %ld\n", fetchOffset);
// check err
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1;
......@@ -273,10 +385,10 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
// read until find TDMT_VND_SUBMIT
pHead = pTopic->pReadhandle->pHead;
if (pHead->head.msgType == TDMT_VND_SUBMIT) {
break;
}
rsp.skipLogNum++;
if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {
printf("read offset %ld\n", fetchOffset);
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
skip = 1;
break;
......@@ -288,6 +400,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
qTaskInfo_t task = pTopic->buffer.output[pos].task;
printf("current fetch offset %ld\n", fetchOffset);
qSetStreamInput(task, pCont);
// SArray<SSDataBlock>
......@@ -307,6 +420,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
// TODO copy
rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset;
pTopic->currentOffset = fetchOffset;
atomic_store_8(&pTopic->buffer.output[pos].status, 0);
......@@ -350,6 +464,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
rpcSendResponse(pMsg);
return 0;
}
#endif
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq req = {0};
......
......@@ -52,12 +52,15 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
ASSERT(pHandle->tbIdHash);
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
if (ret != NULL) {
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);
pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);
pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);
pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);
pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);
return true;
} else {
/*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/
}
}
return false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册