提交 82523908 编写于 作者: L Liu Jicong

refact: change struct name

上级 9dbb925a
......@@ -22,89 +22,89 @@
extern "C" {
#endif
typedef struct tmqMsgHead {
typedef struct TmqMsgHead {
int32_t protoVer;
int32_t msgType;
int64_t cgId;
int64_t clientId;
} tmqMsgHead;
} TmqMsgHead;
typedef struct tmqOneAck {
typedef struct TmqOneAck {
int64_t topicId;
int64_t consumeOffset;
} tmqOneAck;
} TmqOneAck;
typedef struct tmqAcks {
typedef struct TmqAcks {
int32_t ackNum;
//should be sorted
tmqOneAck acks[];
} tmqAcks;
TmqOneAck acks[];
} TmqAcks;
//TODO: put msgs into common
typedef struct tmqConnectReq {
tmqMsgHead head;
tmqAcks acks;
} tmqConnectReq;
typedef struct TmqConnectReq {
TmqMsgHead head;
TmqAcks acks;
} TmqConnectReq;
typedef struct tmqConnectRsp {
tmqMsgHead head;
typedef struct TmqConnectRsp {
TmqMsgHead head;
int8_t status;
} tmqConnectRsp;
} TmqConnectRsp;
typedef struct tmqDisconnectReq {
tmqMsgHead head;
} tmqDisconnectReq;
typedef struct TmqDisconnectReq {
TmqMsgHead head;
} TmqDiscconectReq;
typedef struct tmqDisconnectRsp {
tmqMsgHead head;
typedef struct TmqDisconnectRsp {
TmqMsgHead head;
int8_t status;
} tmqDiconnectRsp;
} TmqDisconnectRsp;
typedef struct tmqConsumeReq {
tmqMsgHead head;
tmqAcks acks;
} tmqConsumeReq;
typedef struct TmqConsumeReq {
TmqMsgHead head;
TmqAcks acks;
} TmqConsumeReq;
typedef struct tmqMsgContent {
typedef struct TmqMsgContent {
int64_t topicId;
int64_t msgLen;
char msg[];
} tmqMsgContent;
} TmqMsgContent;
typedef struct tmqConsumeRsp {
tmqMsgHead head;
typedef struct TmqConsumeRsp {
TmqMsgHead head;
int64_t bodySize;
tmqMsgContent msgs[];
} tmqConsumeRsp;
TmqMsgContent msgs[];
} TmqConsumeRsp;
typedef struct tmqMnodeSubscribeReq {
tmqMsgHead head;
typedef struct TmqSubscribeReq {
TmqMsgHead head;
int64_t topicLen;
char topic[];
} tmqSubscribeReq;
} TmqSubscribeReq;
typedef struct tmqMnodeSubscribeRsp {
tmqMsgHead head;
typedef struct tmqSubscribeRsp {
TmqMsgHead head;
int64_t vgId;
char ep[]; //TSDB_EP_LEN
} tmqSubscribeRsp;
} TmqSubscribeRsp;
typedef struct tmqHeartbeatReq {
typedef struct TmqHeartbeatReq {
} tmqHeartbeatReq;
} TmqHeartbeatReq;
typedef struct tmqHeartbeatRsp {
typedef struct TmqHeartbeatRsp {
} tmqHeartbeatRsp;
} TmqHeartbeatRsp;
typedef struct tqTopicVhandle {
typedef struct TqTopicVhandle {
//name
//
//executor for filter
//
//callback for mnode
//
} tqTopicVhandle;
} TqTopicVhandle;
typedef struct STQ {
//the collection of group handle
......@@ -114,16 +114,16 @@ typedef struct STQ {
#define TQ_BUFFER_SIZE 8
//TODO: define a serializer and deserializer
typedef struct tqBufferItem {
typedef struct TqBufferItem {
int64_t offset;
//executors are identical but not concurrent
//so it must be a copy in each item
void* executor;
int64_t size;
void* content;
} tqBufferItem;
} TqBufferItem;
typedef struct tqBufferHandle {
typedef struct TqBufferHandle {
//char* topic; //c style, end with '\0'
//int64_t cgId;
//void* ahandle;
......@@ -131,32 +131,32 @@ typedef struct tqBufferHandle {
int64_t topicId;
int32_t head;
int32_t tail;
tqBufferItem buffer[TQ_BUFFER_SIZE];
} tqBufferHandle;
TqBufferItem buffer[TQ_BUFFER_SIZE];
} TqBufferHandle;
typedef struct tqListHandle {
tqBufferHandle bufHandle;
struct tqListHandle* next;
} tqListHandle;
typedef struct TqListHandle {
TqBufferHandle bufHandle;
struct TqListHandle* next;
} TqListHandle;
typedef struct tqGroupHandle {
typedef struct TqGroupHandle {
int64_t cId;
int64_t cgId;
void* ahandle;
int32_t topicNum;
tqListHandle *head;
} tqGroupHandle;
TqListHandle *head;
} TqGroupHandle;
typedef struct tqQueryExec {
typedef struct TqQueryExec {
void* src;
tqBufferItem* dest;
TqBufferItem* dest;
void* executor;
} tqQueryExec;
} TqQueryExec;
typedef struct tqQueryMsg {
tqQueryExec *exec;
struct tqQueryMsg *next;
} tqQueryMsg;
typedef struct TqQueryMsg {
TqQueryExec *exec;
struct TqQueryMsg *next;
} TqQueryMsg;
//init in each vnode
STQ* tqInit(void* ref_func(void*), void* unref_func(void*));
......@@ -166,28 +166,28 @@ void tqCleanUp(STQ*);
int tqPushMsg(STQ*, void* msg, int64_t version);
int tqCommit(STQ*);
int tqConsume(STQ*, tmqConsumeReq*);
int tqConsume(STQ*, TmqConsumeReq*);
tqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId);
TqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId);
int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqMoveOffsetToNext(tqGroupHandle*);
int tqMoveOffsetToNext(TqGroupHandle*);
int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
int tqRegisterContext(tqGroupHandle*, void*);
int tqLaunchQuery(tqGroupHandle*);
int tqSendLaunchQuery(tqGroupHandle*);
int tqRegisterContext(TqGroupHandle*, void* ahandle);
int tqLaunchQuery(TqGroupHandle*);
int tqSendLaunchQuery(TqGroupHandle*);
int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes);
void* tqSerializeListHandle(tqListHandle *listHandle, void* ptr);
void* tqSerializeBufHandle(tqBufferHandle *bufHandle, void* ptr);
void* tqSerializeBufItem(tqBufferItem *bufItem, void* ptr);
int tqSerializeGroupHandle(TqGroupHandle *gHandle, void** ppBytes);
void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr);
void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr);
void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr);
const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *ghandle);
const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle *bufHandle);
const void* tqDeserializeBufItem(const void* pBytes, tqBufferItem *bufItem);
const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle *ghandle);
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle);
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem);
int tqGetGHandleSSize(const tqGroupHandle *gHandle);
int tqGetGHandleSSize(const TqGroupHandle *gHandle);
int tqBufHandleSSize();
int tqBufItemSSize();
......
......@@ -221,8 +221,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) {
//parse message and optionally move offset
void* pMsg = pRead->pCont;
tmqConsumeReq *pConsumeMsg = (tmqConsumeReq*) pMsg;
tmqMsgHead msgHead = pConsumeMsg->head;
TmqConsumeReq *pConsumeMsg = (TmqConsumeReq*) pMsg;
TmqMsgHead msgHead = pConsumeMsg->head;
//extract head
STQ *pTq = pVnode->pTQ;
/*tqBufferHandle *pHandle = tqGetHandle(pTq, msgHead.clientId);*/
......
......@@ -22,18 +22,18 @@
//
//handle management message
//
static int tqProtoCheck(tmqMsgHead *pMsg) {
static int tqProtoCheck(TmqMsgHead *pMsg) {
return pMsg->protoVer == 0;
}
static int tqAckOneTopic(tqBufferHandle *bhandle, tmqOneAck *pAck, tqQueryMsg** ppQuery) {
static int tqAckOneTopic(TqBufferHandle *bhandle, TmqOneAck *pAck, TqQueryMsg** ppQuery) {
//clean old item and move forward
int32_t consumeOffset = pAck->consumeOffset;
int idx = consumeOffset % TQ_BUFFER_SIZE;
ASSERT(bhandle->buffer[idx].content && bhandle->buffer[idx].executor);
tfree(bhandle->buffer[idx].content);
if( 1 /* TODO: need to launch new query */) {
tqQueryMsg* pNewQuery = malloc(sizeof(tqQueryMsg));
TqQueryMsg* pNewQuery = malloc(sizeof(TqQueryMsg));
if(pNewQuery == NULL) {
//TODO: memory insufficient
return -1;
......@@ -49,14 +49,14 @@ static int tqAckOneTopic(tqBufferHandle *bhandle, tmqOneAck *pAck, tqQueryMsg**
return 0;
}
static int tqAck(tqGroupHandle* ghandle, tmqAcks* pAcks) {
static int tqAck(TqGroupHandle* ghandle, TmqAcks* pAcks) {
int32_t ackNum = pAcks->ackNum;
tmqOneAck *acks = pAcks->acks;
TmqOneAck *acks = pAcks->acks;
//double ptr for acks and list
int i = 0;
tqListHandle* node = ghandle->head;
TqListHandle* node = ghandle->head;
int ackCnt = 0;
tqQueryMsg *pQuery = NULL;
TqQueryMsg *pQuery = NULL;
while(i < ackNum && node->next) {
if(acks[i].topicId == node->next->bufHandle.topicId) {
ackCnt++;
......@@ -73,12 +73,12 @@ static int tqAck(tqGroupHandle* ghandle, tmqAcks* pAcks) {
return ackCnt;
}
static int tqCommitTCGroup(tqGroupHandle* handle) {
static int tqCommitTCGroup(TqGroupHandle* handle) {
//persist modification into disk
return 0;
}
int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, tqGroupHandle** handle) {
int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, TqGroupHandle** handle) {
//create in disk
return 0;
}
......@@ -99,13 +99,13 @@ int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
return 0;
}
static int tqFetch(tqGroupHandle* ghandle, void** msg) {
tqListHandle* head = ghandle->head;
tqListHandle* node = head;
static int tqFetch(TqGroupHandle* ghandle, void** msg) {
TqListHandle* head = ghandle->head;
TqListHandle* node = head;
int totSize = 0;
//TODO: make it a macro
int sizeLimit = 4 * 1024;
tmqMsgContent* buffer = malloc(sizeLimit);
TmqMsgContent* buffer = malloc(sizeLimit);
if(buffer == NULL) {
//TODO:memory insufficient
return -1;
......@@ -114,7 +114,7 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) {
//until all topic iterated or msgs over sizeLimit
while(node->next) {
node = node->next;
tqBufferHandle* bufHandle = &node->bufHandle;
TqBufferHandle* bufHandle = &node->bufHandle;
int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE;
if(bufHandle->buffer[idx].content != NULL &&
bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset
......@@ -144,19 +144,19 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) {
}
tqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) {
TqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) {
return NULL;
}
int tqLaunchQuery(tqGroupHandle* ghandle) {
int tqLaunchQuery(TqGroupHandle* ghandle) {
return 0;
}
int tqSendLaunchQuery(tqGroupHandle* gHandle) {
int tqSendLaunchQuery(TqGroupHandle* gHandle) {
return 0;
}
/*int tqMoveOffsetToNext(tqGroupHandle* ghandle) {*/
/*int tqMoveOffsetToNext(TqGroupHandle* ghandle) {*/
/*return 0;*/
/*}*/
......@@ -171,13 +171,13 @@ int tqCommit(STQ* pTq) {
return 0;
}
int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) {
if(!tqProtoCheck((tmqMsgHead *)pMsg)) {
int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) {
if(!tqProtoCheck((TmqMsgHead *)pMsg)) {
//proto version invalid
return -1;
}
int64_t clientId = pMsg->head.clientId;
tqGroupHandle *ghandle = tqGetGroupHandle(pTq, clientId);
TqGroupHandle *ghandle = tqGetGroupHandle(pTq, clientId);
if(ghandle == NULL) {
//client not connect
return -1;
......@@ -189,7 +189,7 @@ int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) {
}
}
tmqConsumeRsp *pRsp = (tmqConsumeRsp*) pMsg;
TmqConsumeRsp *pRsp = (TmqConsumeRsp*) pMsg;
if(tqFetch(ghandle, (void**)&pRsp->msgs) <= 0) {
//fetch error
......@@ -204,7 +204,7 @@ int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) {
return 0;
}
int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes) {
int tqSerializeGroupHandle(TqGroupHandle *gHandle, void** ppBytes) {
//calculate size
int sz = tqGetGHandleSSize(gHandle);
void* ptr = realloc(*ppBytes, sz);
......@@ -227,8 +227,8 @@ int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes) {
return 0;
}
void* tqSerializeListHandle(tqListHandle *listHandle, void* ptr) {
tqListHandle *node = listHandle;
void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr) {
TqListHandle *node = listHandle;
ASSERT(node != NULL);
while(node) {
ptr = tqSerializeBufHandle(&node->bufHandle, ptr);
......@@ -237,7 +237,7 @@ void* tqSerializeListHandle(tqListHandle *listHandle, void* ptr) {
return ptr;
}
void* tqSerializeBufHandle(tqBufferHandle *bufHandle, void* ptr) {
void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr) {
*(int64_t*)ptr = bufHandle->nextConsumeOffset;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = bufHandle->topicId;
......@@ -252,13 +252,13 @@ void* tqSerializeBufHandle(tqBufferHandle *bufHandle, void* ptr) {
return ptr;
}
void* tqSerializeBufItem(tqBufferItem *bufItem, void* ptr) {
void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr) {
//TODO: do we need serialize this?
//mainly for executor
return ptr;
}
const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *gHandle) {
const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle *gHandle) {
const void* ptr = pBytes;
gHandle->cId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
......@@ -268,10 +268,10 @@ const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *gHandle)
gHandle->topicNum = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
gHandle->head = NULL;
tqListHandle *node = gHandle->head;
TqListHandle *node = gHandle->head;
for(int i = 0; i < gHandle->topicNum; i++) {
if(gHandle->head == NULL) {
if((node = malloc(sizeof(tqListHandle))) == NULL) {
if((node = malloc(sizeof(TqListHandle))) == NULL) {
//TODO: error
return NULL;
}
......@@ -279,7 +279,7 @@ const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *gHandle)
ptr = tqDeserializeBufHandle(ptr, &node->bufHandle);
gHandle->head = node;
} else {
node->next = malloc(sizeof(tqListHandle));
node->next = malloc(sizeof(TqListHandle));
if(node->next == NULL) {
//TODO: error
return NULL;
......@@ -292,7 +292,7 @@ const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *gHandle)
return ptr;
}
const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle *bufHandle) {
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle) {
const void* ptr = pBytes;
bufHandle->nextConsumeOffset = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
......@@ -308,12 +308,12 @@ const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle *bufHandle
return ptr;
}
const void* tqDeserializeBufItem(const void* pBytes, tqBufferItem *bufItem) {
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem) {
return pBytes;
}
//TODO: make this a macro
int tqGetGHandleSSize(const tqGroupHandle *gHandle) {
int tqGetGHandleSSize(const TqGroupHandle *gHandle) {
return sizeof(int64_t) * 2
+ sizeof(int32_t)
+ gHandle->topicNum * tqBufHandleSSize();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册