提交 9dbb925a 编写于 作者: L Liu Jicong

add serialization and deserialization for tq

上级 e2622c7c
......@@ -135,7 +135,7 @@ typedef struct tqBufferHandle {
} tqBufferHandle;
typedef struct tqListHandle {
tqBufferHandle* bufHandle;
tqBufferHandle bufHandle;
struct tqListHandle* next;
} tqListHandle;
......@@ -176,22 +176,20 @@ int tqMoveOffsetToNext(tqGroupHandle*);
int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
int tqRegisterContext(tqGroupHandle*, void*);
int tqLaunchQuery(tqGroupHandle*);
int tqSendLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query);
int tqSendLaunchQuery(tqGroupHandle*);
int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset);
int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset);
int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset);
int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset);
int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes);
void* tqSerializeListHandle(tqListHandle *listHandle, void* ptr);
void* tqSerializeBufHandle(tqBufferHandle *bufHandle, void* ptr);
void* tqSerializeBufItem(tqBufferItem *bufItem, void* ptr);
int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle);
int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle);
int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle);
int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem);
const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *ghandle);
const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle *bufHandle);
const void* tqDeserializeBufItem(const void* pBytes, tqBufferItem *bufItem);
int tqGetGHandleSSize(const tqGroupHandle *gHandle);
int tqListHandleSSize(const tqListHandle *listHandle);
int tqBufHandleSSize(const tqBufferHandle *bufHandle);
int tqBufItemSSize(const tqBufferItem *bufItem);
int tqBufHandleSSize();
int tqBufItemSSize();
#ifdef __cplusplus
}
......
......@@ -58,10 +58,10 @@ static int tqAck(tqGroupHandle* ghandle, tmqAcks* pAcks) {
int ackCnt = 0;
tqQueryMsg *pQuery = NULL;
while(i < ackNum && node->next) {
if(acks[i].topicId == node->next->bufHandle->topicId) {
if(acks[i].topicId == node->next->bufHandle.topicId) {
ackCnt++;
tqAckOneTopic(node->next->bufHandle, &acks[i], &pQuery);
} else if(acks[i].topicId < node->next->bufHandle->topicId) {
tqAckOneTopic(&node->next->bufHandle, &acks[i], &pQuery);
} else if(acks[i].topicId < node->next->bufHandle.topicId) {
i++;
} else {
node = node->next;
......@@ -114,7 +114,7 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) {
//until all topic iterated or msgs over sizeLimit
while(node->next) {
node = node->next;
tqBufferHandle* bufHandle = node->bufHandle;
tqBufferHandle* bufHandle = &node->bufHandle;
int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE;
if(bufHandle->buffer[idx].content != NULL &&
bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset
......@@ -140,11 +140,6 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) {
}
}
}
if(totSize == 0) {
//no msg
return -1;
}
return totSize;
}
......@@ -157,7 +152,7 @@ int tqLaunchQuery(tqGroupHandle* ghandle) {
return 0;
}
int tqSendLaunchQuery(STQ* pTq, int64_t topicId, int64_t cgId, void* query) {
int tqSendLaunchQuery(tqGroupHandle* gHandle) {
return 0;
}
......@@ -196,7 +191,7 @@ int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) {
tmqConsumeRsp *pRsp = (tmqConsumeRsp*) pMsg;
if(tqFetch(ghandle, (void**)&pRsp->msgs) < 0) {
if(tqFetch(ghandle, (void**)&pRsp->msgs) <= 0) {
//fetch error
return -1;
}
......@@ -209,14 +204,9 @@ int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) {
return 0;
}
int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset) {
int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes) {
//calculate size
int sz = tqGetGHandleSSize(gHandle);
if(sz <= 0) {
//TODO: err
return -1;
}
void* ptr = realloc(*ppBytes, sz);
if(ptr == NULL) {
free(ppBytes);
......@@ -224,29 +214,30 @@ int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offse
return -1;
}
*ppBytes = ptr;
//do serialize
//do serialization
*(int64_t*)ptr = gHandle->cId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = gHandle->cgId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int32_t*)ptr = gHandle->topicNum;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
if(gHandle->topicNum > 0) {
tqSerializeListHandle(gHandle->head, ppBytes, ptr - *ppBytes);
tqSerializeListHandle(gHandle->head, ptr);
}
return 0;
}
int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset) {
void* ptr = POINTER_SHIFT(*ppBytes, offset);
void* tqSerializeListHandle(tqListHandle *listHandle, void* ptr) {
tqListHandle *node = listHandle;
while(node->next) {
ASSERT(node != NULL);
while(node) {
ptr = tqSerializeBufHandle(&node->bufHandle, ptr);
node = node->next;
offset = tqSerializeBufHandle(node->bufHandle, ppBytes, offset);
}
return offset;
return ptr;
}
int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset) {
void *ptr = POINTER_SHIFT(*ppBytes, offset);
void* tqSerializeBufHandle(tqBufferHandle *bufHandle, void* ptr) {
*(int64_t*)ptr = bufHandle->nextConsumeOffset;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = bufHandle->topicId;
......@@ -256,41 +247,87 @@ int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offs
*(int32_t*)ptr = bufHandle->tail;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
for(int i = 0; i < TQ_BUFFER_SIZE; i++) {
int sz = tqSerializeBufItem(&bufHandle->buffer[i], ppBytes, ptr - *ppBytes);
ptr = POINTER_SHIFT(ptr, sz);
ptr = tqSerializeBufItem(&bufHandle->buffer[i], ptr);
}
return ptr - *ppBytes;
return ptr;
}
int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset) {
void *ptr = POINTER_SHIFT(*ppBytes, offset);
void* tqSerializeBufItem(tqBufferItem *bufItem, void* ptr) {
//TODO: do we need serialize this?
return 0;
//mainly for executor
return ptr;
}
int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle) {
return 0;
}
int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle) {
return 0;
}
int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle) {
return 0;
const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *gHandle) {
const void* ptr = pBytes;
gHandle->cId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
gHandle->cgId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
gHandle->ahandle = NULL;
gHandle->topicNum = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
gHandle->head = NULL;
tqListHandle *node = gHandle->head;
for(int i = 0; i < gHandle->topicNum; i++) {
if(gHandle->head == NULL) {
if((node = malloc(sizeof(tqListHandle))) == NULL) {
//TODO: error
return NULL;
}
node->next= NULL;
ptr = tqDeserializeBufHandle(ptr, &node->bufHandle);
gHandle->head = node;
} else {
node->next = malloc(sizeof(tqListHandle));
if(node->next == NULL) {
//TODO: error
return NULL;
}
node->next->next = NULL;
ptr = tqDeserializeBufHandle(ptr, &node->next->bufHandle);
node = node->next;
}
}
return ptr;
}
int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem) {
return 0;
const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle *bufHandle) {
const void* ptr = pBytes;
bufHandle->nextConsumeOffset = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
bufHandle->topicId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
bufHandle->head = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
bufHandle->tail = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
for(int i = 0; i < TQ_BUFFER_SIZE; i++) {
ptr = tqDeserializeBufItem(ptr, &bufHandle->buffer[i]);
}
return ptr;
}
const void* tqDeserializeBufItem(const void* pBytes, tqBufferItem *bufItem) {
return pBytes;
}
//TODO: make this a macro
int tqGetGHandleSSize(const tqGroupHandle *gHandle) {
return 0;
return sizeof(int64_t) * 2
+ sizeof(int32_t)
+ gHandle->topicNum * tqBufHandleSSize();
}
int tqListHandleSSize(const tqListHandle *listHandle) {
return 0;
}
int tqBufHandleSSize(const tqBufferHandle *bufHandle) {
return 0;
//TODO: make this a macro
int tqBufHandleSSize() {
return sizeof(int64_t) * 2
+ sizeof(int32_t) * 2
+ TQ_BUFFER_SIZE * tqBufItemSSize();
}
int tqBufItemSSize(const tqBufferItem *bufItem) {
int tqBufItemSSize() {
//TODO: do this need serialization?
//mainly for executor
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册