提交 5e14c856 编写于 作者: L Liu Jicong

add tqconsume and tqserialize

上级 05613886
......@@ -23,64 +23,79 @@ extern "C" {
#endif
typedef struct tmqMsgHead {
int32_t headLen;
int32_t protoVer;
int32_t msgType;
int64_t cgId;
int64_t topicId;
int64_t clientId;
int32_t checksum;
int32_t msgType;
} tmqMsgHead;
typedef struct tmqOneAck {
int64_t topicId;
int64_t consumeOffset;
} tmqOneAck;
typedef struct tmqAcks {
int32_t ackNum;
//should be sorted
tmqOneAck acks[];
} tmqAcks;
//TODO: put msgs into common
typedef struct tmqConnectReq {
tmqMsgHead head;
tmqAcks acks;
} tmqConnectReq;
typedef struct tmqConnectResp {
typedef struct tmqConnectRsp {
tmqMsgHead head;
int8_t status;
} tmqConnectResp;
} tmqConnectRsp;
typedef struct tmqDisconnectReq {
tmqMsgHead head;
} tmqDisconnectReq;
typedef struct tmqDisconnectResp {
typedef struct tmqDisconnectRsp {
tmqMsgHead head;
int8_t status;
} tmqDiconnectResp;
} tmqDiconnectRsp;
typedef struct tmqConsumeReq {
tmqMsgHead head;
int64_t commitOffset;
tmqAcks acks;
} tmqConsumeReq;
typedef struct tmqConsumeResp {
tmqMsgHead head;
char content[];
} tmqConsumeResp;
typedef struct tmqMsgContent {
int64_t topicId;
int64_t msgLen;
char msg[];
} tmqMsgContent;
typedef struct tmqConsumeRsp {
tmqMsgHead head;
int64_t bodySize;
tmqMsgContent msgs[];
} tmqConsumeRsp;
//
typedef struct tmqMnodeSubscribeReq {
tmqMsgHead head;
int64_t topicLen;
char topic[];
} tmqSubscribeReq;
typedef struct tmqMnodeSubscribeResp {
typedef struct tmqMnodeSubscribeRsp {
tmqMsgHead head;
int64_t vgId;
char ep[]; //TSDB_EP_LEN
} tmqSubscribeResp;
} tmqSubscribeRsp;
typedef struct tmqHeartbeatReq {
} tmqHeartbeatReq;
typedef struct tmqHeartbeatResp {
typedef struct tmqHeartbeatRsp {
} tmqHeartbeatResp;
} tmqHeartbeatRsp;
typedef struct tqTopicVhandle {
//name
......@@ -92,33 +107,57 @@ typedef struct tqTopicVhandle {
} tqTopicVhandle;
typedef struct STQ {
//the set for topics
//key=topicName: str
//value=tqTopicVhandle
//the collection of group handle
//a map
//key=<topic: str, cgId: int64_t>
//value=consumeOffset: int64_t
} STQ;
#define TQ_BUFFER_SIZE 8
//TODO: define a serializer and deserializer
typedef struct tqBufferItem {
int64_t offset;
//executors are identical but not concurrent
//so it must be a copy in each item
void* executor;
int64_t size;
void* content;
} tqBufferItem;
typedef struct tqGroupHandle {
char* topic; //c style, end with '\0'
int64_t cgId;
void* ahandle;
int64_t consumeOffset;
typedef struct tqBufferHandle {
//char* topic; //c style, end with '\0'
//int64_t cgId;
//void* ahandle;
int64_t nextConsumeOffset;
int64_t topicId;
int32_t head;
int32_t tail;
tqBufferItem buffer[TQ_BUFFER_SIZE];
} tqBufferHandle;
typedef struct tqListHandle {
tqBufferHandle* bufHandle;
struct tqListHandle* next;
} tqListHandle;
typedef struct tqGroupHandle {
int64_t cId;
int64_t cgId;
void* ahandle;
int32_t topicNum;
tqListHandle *head;
} tqGroupHandle;
typedef struct tqQueryExec {
void* src;
tqBufferItem* dest;
void* executor;
} tqQueryExec;
typedef struct tqQueryMsg {
tqQueryExec *exec;
struct tqQueryMsg *next;
} tqQueryMsg;
//init in each vnode
STQ* tqInit(void* ref_func(void*), void* unref_func(void*));
void tqCleanUp(STQ*);
......@@ -127,20 +166,33 @@ void tqCleanUp(STQ*);
int tqPushMsg(STQ*, void* msg, int64_t version);
int tqCommit(STQ*);
//void* will be replace by a msg type
int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg);
int tqConsume(STQ*, tmqConsumeReq*);
tqGroupHandle* tqFindGHandleBycId(STQ*, int64_t cId);
tqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId);
int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqMoveOffsetToNext(tqGroupHandle*);
int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
int tqFetchMsg(tqGroupHandle*, void*);
int tqRegisterContext(tqGroupHandle*, void*);
int tqLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query);
int tqLaunchQuery(tqGroupHandle*);
int tqSendLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query);
int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset);
int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset);
int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset);
int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset);
int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle);
int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle);
int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle);
int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem);
int tqGetGHandleSSize(const tqGroupHandle *gHandle);
int tqListHandleSSize(const tqListHandle *listHandle);
int tqBufHandleSSize(const tqBufferHandle *bufHandle);
int tqBufItemSSize(const tqBufferItem *bufItem);
#ifdef __cplusplus
}
#endif
......
......@@ -26,13 +26,13 @@ extern "C" {
//create persistent storage for meta info such as consuming offset
//return value > 0: cgId
//return value <= 0: error code
int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle);
//int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqBufferHandle** handle);
//create ring buffer in memory and load consuming offset
//int tqOpenTCGroup(STQ*, const char* topic, int cgId);
//destroy ring buffer and persist consuming offset
//int tqCloseTCGroup(STQ*, const char* topic, int cgId);
//delete persistent storage for meta info
int tqDropTCGroup(STQ*, const char* topic, int cgId);
//int tqDropTCGroup(STQ*, const char* topic, int cgId);
#ifdef __cplusplus
}
......
......@@ -21,65 +21,137 @@
//send to fetch queue
//
//handle management message
tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) {
//look in memory
//
//not found, try to restore from disk
//
//still not found
return NULL;
}
static int tqCommitTCGroup(tqGroupHandle* handle) {
//persist into disk
return 0;
//
static int tqProtoCheck(tmqMsgHead *pMsg) {
return pMsg->protoVer == 0;
}
int tqCreateTCGroup(STQ *pTq, const char* topic, int cgId, tqGroupHandle** handle) {
static int tqAckOneTopic(tqBufferHandle *bhandle, tmqOneAck *pAck, tqQueryMsg** ppQuery) {
//clean old item and move forward
int32_t consumeOffset = pAck->consumeOffset;
int idx = consumeOffset % TQ_BUFFER_SIZE;
ASSERT(bhandle->buffer[idx].content && bhandle->buffer[idx].executor);
tfree(bhandle->buffer[idx].content);
if( 1 /* TODO: need to launch new query */) {
tqQueryMsg* pNewQuery = malloc(sizeof(tqQueryMsg));
if(pNewQuery == NULL) {
//TODO: memory insufficient
return -1;
}
//TODO: lock executor
pNewQuery->exec->executor = bhandle->buffer[idx].executor;
//TODO: read from wal and assign to src
pNewQuery->exec->src = 0;
pNewQuery->exec->dest = &bhandle->buffer[idx];
pNewQuery->next = *ppQuery;
*ppQuery = pNewQuery;
}
return 0;
}
int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) {
int code;
tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);
if(handle == NULL) {
code = tqCreateTCGroup(pTq, topic, cgId, &handle);
if(code != 0) {
return code;
static int tqAck(tqGroupHandle* ghandle, tmqAcks* pAcks) {
int32_t ackNum = pAcks->ackNum;
tmqOneAck *acks = pAcks->acks;
//double ptr for acks and list
int i = 0;
tqListHandle* node = ghandle->head;
int ackCnt = 0;
tqQueryMsg *pQuery = NULL;
while(i < ackNum && node->next) {
if(acks[i].topicId == node->next->bufHandle->topicId) {
ackCnt++;
tqAckOneTopic(node->next->bufHandle, &acks[i], &pQuery);
} else if(acks[i].topicId < node->next->bufHandle->topicId) {
i++;
} else {
node = node->next;
}
}
ASSERT(handle != NULL);
if(pQuery) {
//post message
}
return ackCnt;
}
//put into STQ
static int tqCommitTCGroup(tqGroupHandle* handle) {
//persist modification into disk
return 0;
}
/*int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) {*/
/*tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);*/
/*return tqCommitTCGroup(handle);*/
/*}*/
int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) {
//delete from disk
int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, tqGroupHandle** handle) {
//create in disk
return 0;
}
int tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
//look up in disk
//create
//open
return 0;
}
int tqFetchMsg(tqGroupHandle* handle, void* msg) {
int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
return 0;
}
int tqMoveOffsetToNext(tqGroupHandle* handle) {
int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
//delete from disk
return 0;
}
static int tqFetch(tqGroupHandle* ghandle, void** msg) {
tqListHandle* head = ghandle->head;
tqListHandle* node = head;
int totSize = 0;
//TODO: make it a macro
int sizeLimit = 4 * 1024;
tmqMsgContent* buffer = malloc(sizeLimit);
if(buffer == NULL) {
//TODO:memory insufficient
return -1;
}
//iterate the list to get msgs of all topics
//until all topic iterated or msgs over sizeLimit
while(node->next) {
node = node->next;
tqBufferHandle* bufHandle = node->bufHandle;
int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE;
if(bufHandle->buffer[idx].content != NULL &&
bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset
) {
totSize += bufHandle->buffer[idx].size;
if(totSize > sizeLimit) {
void *ptr = realloc(buffer, totSize);
if(ptr == NULL) {
totSize -= bufHandle->buffer[idx].size;
//TODO:memory insufficient
//return msgs already copied
break;
}
}
*((int64_t*)buffer) = bufHandle->topicId;
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
*((int64_t*)buffer) = bufHandle->buffer[idx].size;
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
memcpy(buffer, bufHandle->buffer[idx].content, bufHandle->buffer[idx].size);
buffer = POINTER_SHIFT(buffer, bufHandle->buffer[idx].size);
if(totSize > sizeLimit) {
break;
}
}
}
if(totSize == 0) {
//no msg
return -1;
}
tqGroupHandle* tqFindGHandleBycId(STQ* pTq, int64_t cId) {
return NULL;
return totSize;
}
/*int tqMoveOffsetToNext(tqGroupHandle* ghandle) {*/
/*return 0;*/
/*}*/
int tqPushMsg(STQ* pTq , void* p, int64_t version) {
//add reference
//judge and launch new query
......@@ -91,10 +163,108 @@ int tqCommit(STQ* pTq) {
return 0;
}
int tqHandleConsumeMsg(STQ* pTq, tmqConsumeReq* msg) {
//parse msg and extract topic and cgId
//lookup handle
//confirm message and send to consumer
int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) {
if(!tqProtoCheck((tmqMsgHead *)pMsg)) {
//proto version invalid
return -1;
}
int64_t clientId = pMsg->head.clientId;
tqGroupHandle *ghandle = tqGetGroupHandle(pTq, clientId);
if(ghandle == NULL) {
//client not connect
return -1;
}
if(pMsg->acks.ackNum != 0) {
if(tqAck(ghandle, &pMsg->acks) != 0) {
//ack not success
return -1;
}
}
tmqConsumeRsp *pRsp = (tmqConsumeRsp*) pMsg;
if(tqFetch(ghandle, (void**)&pRsp->msgs) < 0) {
//fetch error
return -1;
}
//judge and launch new query
if(tqLaunchQuery(ghandle)) {
//launch query error
return -1;
}
return 0;
}
int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset) {
//calculate size
int sz = tqGetGHandleSSize(gHandle);
if(sz <= 0) {
//TODO: err
return -1;
}
void* ptr = realloc(*ppBytes, sz);
if(ptr == NULL) {
free(ppBytes);
//TODO: memory err
return -1;
}
*ppBytes = ptr;
//do serialize
*(int64_t*)ptr = gHandle->cId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = gHandle->cgId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int32_t*)ptr = gHandle->topicNum;
if(gHandle->topicNum > 0) {
tqSerializeListHandle(gHandle->head, ppBytes, ptr - *ppBytes);
}
return 0;
}
int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset) {
void* ptr = POINTER_SHIFT(*ppBytes, offset);
tqListHandle *node = listHandle;
while(node->next) {
node = node->next;
offset = tqSerializeBufHandle(node->bufHandle, ppBytes, offset);
}
return offset;
}
int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset) {
void *ptr = POINTER_SHIFT(*ppBytes, offset);
*(int64_t*)ptr = bufHandle->nextConsumeOffset;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = bufHandle->topicId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int32_t*)ptr = bufHandle->head;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
*(int32_t*)ptr = bufHandle->tail;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
for(int i = 0; i < TQ_BUFFER_SIZE; i++) {
int sz = tqSerializeBufItem(&bufHandle->buffer[i], ppBytes, ptr - *ppBytes);
ptr = POINTER_SHIFT(ptr, sz);
}
return ptr - *ppBytes;
}
int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset) {
void *ptr = POINTER_SHIFT(*ppBytes, offset);
//TODO: do we need serialize this?
return 0;
}
int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle) {
return 0;
}
int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle) {
return 0;
}
int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle) {
return 0;
}
int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem) {
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册