提交 35348528 编写于 作者: L Liu Jicong

improve mq consume

上级 a1d14338
......@@ -210,26 +210,28 @@ typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t;
typedef struct tmq_message_t tmq_message_t;
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));
DLL_EXPORT tmq_list_t* tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*);
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param));
DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen);
DLL_EXPORT tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen);
DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, char *);
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT void tmq_message_destroy(tmq_message_t* tmq_message);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list);
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics);
#endif
DLL_EXPORT tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time);
DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups);
DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups);
#endif
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async);
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
#endif
......@@ -243,10 +245,13 @@ enum tmq_conf_res_t {
typedef enum tmq_conf_res_t tmq_conf_res_t;
DLL_EXPORT tmq_conf_t* tmq_conf_new();
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t* conf);
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb);
DLL_EXPORT tmq_conf_t *tmq_conf_new();
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb);
//temporary used function for demo only
void tmqShowMsg(tmq_message_t* tmq_message);
#ifdef __cplusplus
}
......
......@@ -1570,6 +1570,7 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
}
typedef struct SMqSetCVgReq {
int64_t leftForVer;
int32_t vgId;
int64_t oldConsumerId;
int64_t newConsumerId;
......@@ -1604,6 +1605,7 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
tlen += taosEncodeFixedI32(buf, pReq->vgId);
tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId);
tlen += taosEncodeFixedI64(buf, pReq->newConsumerId);
......@@ -1612,12 +1614,13 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->logicalPlan);
tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += taosEncodeString(buf, (char*)pReq->qmsg);
tlen += taosEncodeString(buf, pReq->qmsg);
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
buf = taosDecodeFixedI32(buf, &pReq->vgId);
buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId);
buf = taosDecodeFixedI64(buf, &pReq->newConsumerId);
......@@ -1626,7 +1629,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeString(buf, &pReq->sql);
buf = taosDecodeString(buf, &pReq->logicalPlan);
buf = taosDecodeString(buf, &pReq->physicalPlan);
buf = taosDecodeString(buf, (char**)&pReq->qmsg);
buf = taosDecodeString(buf, &pReq->qmsg);
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf;
}
......
此差异已折叠。
......@@ -583,7 +583,7 @@ TEST(testCase, create_topic_ctb_Test) {
taos_free_result(pRes);
char* sql = "select * from tu";
pRes = taos_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql));
pRes = tmq_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
......@@ -607,7 +607,7 @@ TEST(testCase, create_topic_stb_Test) {
taos_free_result(pRes);
char* sql = "select * from st1";
pRes = taos_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
......@@ -633,11 +633,11 @@ TEST(testCase, tmq_subscribe_ctb_Test) {
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
tmq_message_destroy(msg);
//printf("get msg\n");
//if (msg == NULL) break;
}
}
#endif
TEST(testCase, tmq_subscribe_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -657,10 +657,17 @@ TEST(testCase, tmq_subscribe_stb_Test) {
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
int cnt = 1;
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
if (msg == NULL) continue;
tmqShowMsg(msg);
if (cnt++ % 10 == 0){
tmq_commit(tmq, NULL, 0);
}
//tmq_commit(tmq, NULL, 0);
tmq_message_destroy(msg);
//printf("get msg\n");
//if (msg == NULL) break;
}
}
......@@ -669,6 +676,7 @@ TEST(testCase, tmq_consume_Test) {
TEST(testCase, tmq_commit_TEST) {
}
#endif
#if 0
TEST(testCase, projection_query_tables) {
......
......@@ -633,7 +633,7 @@ typedef struct SMqConsumerTopic {
} SMqConsumerTopic;
static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqTopicObj* pTopic,
SMqSubscribeObj* pSub) {
SMqSubscribeObj* pSub, int64_t* oldConsumerId) {
SMqConsumerTopic* pCTopic = malloc(sizeof(SMqConsumerTopic));
if (pCTopic == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -646,6 +646,7 @@ static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqT
int32_t unassignedVgSz = taosArrayGetSize(pSub->unassignedVg);
if (unassignedVgSz > 0) {
SMqConsumerEp* pCEp = taosArrayPop(pSub->unassignedVg);
*oldConsumerId = pCEp->consumerId;
pCEp->consumerId = consumerId;
taosArrayPush(pCTopic->pVgInfo, &pCEp->vgId);
taosArrayPush(pSub->assigned, pCEp);
......
......@@ -48,7 +48,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pSub);
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pSub,
int64_t oldConsumerId);
int32_t mndInitSubscribe(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
......@@ -166,7 +167,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
if (pCEp->lastConsumerHbTs != -1 && currentTs - pCEp->lastConsumerHbTs > MND_SUBSCRIBE_REBALANCE_MS) {
// put consumer into lostConsumer
taosArrayPush(pSub->lostConsumer, pCEp);
// put vg into unassgined
// put vg into unassigned
taosArrayPush(pSub->unassignedVg, pCEp);
// remove from assigned
// TODO: swap with last one, reduce size and reset i
......@@ -202,6 +203,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
for (int32_t i = 0; i < sz; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx);
SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
int64_t oldConsumerId = pCEp->consumerId;
pCEp->consumerId = consumerId;
taosArrayPush(pSub->assigned, pCEp);
pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
......@@ -222,7 +224,9 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
req.logicalPlan = pTopic->logicalPlan;
req.physicalPlan = pTopic->physicalPlan;
req.qmsg = pCEp->qmsg;
req.oldConsumerId = oldConsumerId;
req.newConsumerId = consumerId;
req.vgId = pCEp->vgId;
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *buf = malloc(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
......@@ -237,6 +241,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
tEncodeSMqSetCVgReq(&abuf, &req);
// persist msg
// TODO: no need for txn
STransAction action = {0};
action.epSet = pCEp->epSet;
action.pCont = buf;
......@@ -303,13 +308,12 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
ASSERT(CEp.vgId == pVgroup->vgId);
CEp.qmsg = strdup(pTaskInfo->msg->msg);
taosArrayPush(unassignedVg, &CEp);
//TODO: free taskInfo
// TODO: free taskInfo
taosArrayDestroy(pArray);
/*SEpSet *pEpSet = &plan->execNode.epset;*/
/*pEpSet->inUse = 0;*/
/*addEpIntoEpSet(pEpSet, "localhost", 6030);*/
}
/*qDestroyQueryDag(pDag);*/
......@@ -317,14 +321,15 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
}
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pCEp) {
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pCEp,
int64_t oldConsumerId) {
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
for (int32_t i = 0; i < sz; i++) {
int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
SMqSetCVgReq req = {
.vgId = vgId,
.oldConsumerId = -1,
.oldConsumerId = oldConsumerId,
.newConsumerId = pConsumer->consumerId,
};
strcpy(req.cgroup, pConsumer->cgroup);
......@@ -459,10 +464,6 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc
return 0;
}
static void *mndBuildMqVGroupSetReq(SMnode *pMnode, char *topicName, int32_t vgId, int64_t consumerId, char *cgroup) {
return 0;
}
static char *mndMakeSubscribeKey(char *cgroup, char *topicName) {
char *key = malloc(TSDB_SHOW_SUBQUERY_LEN);
if (key == NULL) {
......@@ -642,7 +643,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
free(key);
// set unassigned vg
if (mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg) < 0) {
//TODO: free memory
// TODO: free memory
return -1;
}
// TODO: disable alter
......@@ -650,7 +651,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
taosArrayPush(pSub->availConsumer, &consumerId);
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
int64_t oldConsumerId;
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub, &oldConsumerId);
taosArrayPush(pConsumer->topics, pConsumerTopic);
if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) {
......@@ -658,7 +660,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo);
SMqConsumerEp *pCEp = taosArrayGetLast(pSub->assigned);
if (pCEp->vgId == vgId) {
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic, pCEp) < 0) {
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic, pCEp, oldConsumerId) < 0) {
// TODO
return -1;
}
......
......@@ -40,6 +40,7 @@ int32_t tqHandleCopyPut(STqMetaStore*, int64_t key, void* value, size_t vsize);
// delete committed kv pair
// notice that a delete action still needs to be committed
int32_t tqHandleDel(STqMetaStore*, int64_t key);
int32_t tqHandlePurge(STqMetaStore*, int64_t key);
int32_t tqHandleCommit(STqMetaStore*, int64_t key);
int32_t tqHandleAbort(STqMetaStore*, int64_t key);
......
......@@ -25,17 +25,6 @@
// handle management message
//
int tqGroupSSize(const STqGroup* pGroup);
int tqTopicSSize();
int tqItemSSize();
void* tqSerializeListHandle(STqList* listHandle, void* ptr);
void* tqSerializeTopic(STqTopic* pTopic, void* ptr);
void* tqSerializeItem(STqMsgItem* pItem, void* ptr);
const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic);
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem);
int tqInit() {
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
if (old == 1) return 0;
......@@ -88,177 +77,6 @@ void tqClose(STQ* pTq) {
// TODO
}
static int tqProtoCheck(STqMsgHead* pMsg) {
// TODO
return pMsg->protoVer == 0;
}
static int tqAckOneTopic(STqTopic* pTopic, STqOneAck* pAck, STqQueryMsg** ppQuery) {
// clean old item and move forward
int32_t consumeOffset = pAck->consumeOffset;
int idx = consumeOffset % TQ_BUFFER_SIZE;
ASSERT(pTopic->buffer[idx].content && pTopic->buffer[idx].executor);
tfree(pTopic->buffer[idx].content);
if (1 /* TODO: need to launch new query */) {
STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg));
if (pNewQuery == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
// TODO: lock executor
// TODO: read from wal and assign to src
/*pNewQuery->exec->executor = pTopic->buffer[idx].executor;*/
/*pNewQuery->exec->src = 0;*/
/*pNewQuery->exec->dest = &pTopic->buffer[idx];*/
/*pNewQuery->next = *ppQuery;*/
/**ppQuery = pNewQuery;*/
}
return 0;
}
static int tqAck(STqGroup* pGroup, STqAcks* pAcks) {
int32_t ackNum = pAcks->ackNum;
STqOneAck* acks = pAcks->acks;
// double ptr for acks and list
int i = 0;
STqList* node = pGroup->head;
int ackCnt = 0;
STqQueryMsg* pQuery = NULL;
while (i < ackNum && node->next) {
if (acks[i].topicId == node->next->topic.topicId) {
ackCnt++;
tqAckOneTopic(&node->next->topic, &acks[i], &pQuery);
} else if (acks[i].topicId < node->next->topic.topicId) {
i++;
} else {
node = node->next;
}
}
if (pQuery) {
// post message
}
return ackCnt;
}
static int tqCommitGroup(STqGroup* pGroup) {
// persist modification into disk
return 0;
}
int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup** ppGroup) {
// create in disk
STqGroup* pGroup = (STqGroup*)malloc(sizeof(STqGroup));
if (pGroup == NULL) {
// TODO
return -1;
}
*ppGroup = pGroup;
memset(pGroup, 0, sizeof(STqGroup));
pGroup->topicList = tdListNew(sizeof(STqTopic));
if (pGroup->topicList == NULL) {
free(pGroup);
return -1;
}
*ppGroup = pGroup;
return 0;
}
STqGroup* tqOpenGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
STqGroup* pGroup = tqHandleGet(pTq->tqMeta, cId);
if (pGroup == NULL) {
int code = tqCreateGroup(pTq, topicId, cgId, cId, &pGroup);
if (code < 0) {
// TODO
return NULL;
}
tqHandleMovePut(pTq->tqMeta, cId, pGroup);
}
ASSERT(pGroup);
return pGroup;
}
int tqCloseGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
// TODO
return 0;
}
int tqDropGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
// delete from disk
return 0;
}
static int tqFetch(STqGroup* pGroup, STqConsumeRsp** pRsp) {
STqList* pHead = pGroup->head;
STqList* pNode = pHead;
int totSize = 0;
int numOfMsgs = 0;
// TODO: make it a macro
int sizeLimit = 4 * 1024;
void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + sizeLimit);
if (ptr == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
*pRsp = ptr;
STqMsgContent* buffer = (*pRsp)->msgs;
// iterate the list to get msgs of all topics
// until all topic iterated or msgs over sizeLimit
while (pNode->next) {
pNode = pNode->next;
STqTopic* pTopic = &pNode->topic;
int idx = pTopic->nextConsumeOffset % TQ_BUFFER_SIZE;
if (pTopic->buffer[idx].content != NULL && pTopic->buffer[idx].offset == pTopic->nextConsumeOffset) {
totSize += pTopic->buffer[idx].size;
if (totSize > sizeLimit) {
void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + totSize);
if (ptr == NULL) {
totSize -= pTopic->buffer[idx].size;
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
// return msgs already copied
break;
}
*pRsp = ptr;
break;
}
*((int64_t*)buffer) = pTopic->topicId;
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
*((int64_t*)buffer) = pTopic->buffer[idx].size;
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size);
buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size);
numOfMsgs++;
if (totSize > sizeLimit) {
break;
}
}
}
(*pRsp)->bodySize = totSize;
return numOfMsgs;
}
STqGroup* tqGetGroup(STQ* pTq, int64_t clientId) { return tqHandleGet(pTq->tqMeta, clientId); }
int tqSendLaunchQuery(STqMsgItem* bufItem, int64_t offset) {
if (tqQueryExecuting(bufItem->status)) {
return 0;
}
bufItem->status = 1;
// load data from wal or buffer pool
// put into exec
// send exec into non blocking queue
// when query finished, put into buffer pool
return 0;
}
/*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/
/*return 0;*/
/*}*/
int tqPushMsg(STQ* pTq, void* p, int64_t version) {
// add reference
// judge and launch new query
......@@ -270,218 +88,6 @@ int tqCommit(STQ* pTq) {
return 0;
}
int tqBufferSetOffset(STqTopic* pTopic, int64_t offset) {
int code;
memset(pTopic->buffer, 0, sizeof(pTopic->buffer));
// launch query
for (int i = offset; i < offset + TQ_BUFFER_SIZE; i++) {
int pos = i % TQ_BUFFER_SIZE;
code = tqSendLaunchQuery(&pTopic->buffer[pos], offset);
if (code < 0) {
// TODO: error handling
}
}
// set offset
pTopic->nextConsumeOffset = offset;
pTopic->floatingCursor = offset;
return 0;
}
STqTopic* tqFindTopic(STqGroup* pGroup, int64_t topicId) {
// TODO
return NULL;
}
int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) {
int code;
int64_t clientId = pMsg->head.clientId;
int64_t topicId = pMsg->topicId;
int64_t offset = pMsg->offset;
STqGroup* gHandle = tqGetGroup(pTq, clientId);
if (gHandle == NULL) {
// client not connect
return -1;
}
STqTopic* topicHandle = tqFindTopic(gHandle, topicId);
if (topicHandle == NULL) {
return -1;
}
if (pMsg->offset == topicHandle->nextConsumeOffset) {
return 0;
}
// TODO: check log last version
code = tqBufferSetOffset(topicHandle, offset);
if (code < 0) {
// set error code
return -1;
}
return 0;
}
// temporary
int tqProcessCMsg(STQ* pTq, STqConsumeReq* pMsg, STqRspHandle* pRsp) {
int64_t clientId = pMsg->head.clientId;
STqGroup* pGroup = tqGetGroup(pTq, clientId);
if (pGroup == NULL) {
terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
return -1;
}
pGroup->rspHandle.handle = pRsp->handle;
pGroup->rspHandle.ahandle = pRsp->ahandle;
return 0;
}
int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
STqConsumeReq* pMsg = pReq->pCont;
int64_t clientId = pMsg->head.clientId;
STqGroup* pGroup = tqGetGroup(pTq, clientId);
if (pGroup == NULL) {
terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
return -1;
}
SList* topicList = pGroup->topicList;
int totSize = 0;
int numOfMsgs = 0;
int sizeLimit = 4096;
STqConsumeRsp* pCsmRsp = (*pRsp)->pCont;
void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit);
if (ptr == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
(*pRsp)->pCont = ptr;
SListIter iter;
tdListInitIter(topicList, &iter, TD_LIST_FORWARD);
STqMsgContent* buffer = NULL;
SArray* pArray = taosArrayInit(0, sizeof(void*));
SListNode* pn;
while ((pn = tdListNext(&iter)) != NULL) {
STqTopic* pTopic = *(STqTopic**)pn->data;
int idx = pTopic->floatingCursor % TQ_BUFFER_SIZE;
STqMsgItem* pItem = &pTopic->buffer[idx];
if (pItem->content != NULL && pItem->offset == pTopic->floatingCursor) {
if (pItem->status == TQ_ITEM_READY) {
// if has data
totSize += pTopic->buffer[idx].size;
if (totSize > sizeLimit) {
void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + totSize);
if (ptr == NULL) {
totSize -= pTopic->buffer[idx].size;
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
// return msgs already copied
break;
}
(*pRsp)->pCont = ptr;
break;
}
*((int64_t*)buffer) = htobe64(pTopic->topicId);
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
*((int64_t*)buffer) = htobe64(pTopic->buffer[idx].size);
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size);
buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size);
numOfMsgs++;
if (totSize > sizeLimit) {
break;
}
} else if (pItem->status == TQ_ITEM_PROCESS) {
// if not have data but in process
} else if (pItem->status == TQ_ITEM_EMPTY) {
// if not have data and not in process
int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_EMPTY, TQ_ITEM_PROCESS);
if (old != TQ_ITEM_EMPTY) {
continue;
}
pItem->offset = pTopic->floatingCursor;
taosArrayPush(pArray, &pItem);
} else {
ASSERT(0);
}
}
}
if (numOfMsgs > 0) {
// set code and other msg
rpcSendResponse(*pRsp);
} else {
// most recent data has been fetched
// enable timer for blocking wait
// once new data written when waiting, launch query and rsp
}
// fetched a num of msgs, rpc response
for (int i = 0; i < pArray->size; i++) {
STqMsgItem* pItem = taosArrayGet(pArray, i);
// read from wal
void* raw = NULL;
/*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
/*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/
/*if (code < 0) {*/
// TODO: error
/*}*/
// get msgType
// if submitblk
pItem->executor->assign(pItem->executor->runtimeEnv, raw);
SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv);
pItem->content = content;
// if other type, send just put into buffer
/*pItem->content = raw;*/
int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY);
ASSERT(old == TQ_ITEM_PROCESS);
}
taosArrayDestroy(pArray);
return 0;
}
#if 0
int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
if (!tqProtoCheck((STqMsgHead*)pMsg)) {
// proto version invalid
return -1;
}
int64_t clientId = pMsg->head.clientId;
STqGroup* pGroup = tqGetGroup(pTq, clientId);
if (pGroup == NULL) {
// client not connect
return -1;
}
if (pMsg->acks.ackNum != 0) {
if (tqAck(pGroup, &pMsg->acks) != 0) {
// ack not success
return -1;
}
}
STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg;
if (tqFetch(pGroup, (void**)&pRsp->msgs) <= 0) {
// fetch error
return -1;
}
// judge and launch new query
/*if (tqSendLaunchQuery(gHandle)) {*/
// launch query error
/*return -1;*/
/*}*/
return 0;
}
#endif
int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead** ppHead) {
int32_t num = taosArrayGetSize(pConsumer->topics);
int32_t sz = sizeof(STqSerializedHead) + sizeof(int64_t) * 2 + TSDB_TOPIC_FNAME_LEN +
......@@ -535,138 +141,6 @@ const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHan
return NULL;
}
#if 0
int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) {
// calculate size
int sz = tqGroupSSize(pGroup) + sizeof(STqSerializedHead);
if (sz > (*ppHead)->ssize) {
void* tmpPtr = realloc(*ppHead, sz);
if (tmpPtr == NULL) {
free(*ppHead);
// TODO: memory err
return -1;
}
*ppHead = tmpPtr;
(*ppHead)->ssize = sz;
}
void* ptr = (*ppHead)->content;
// do serialization
*(int64_t*)ptr = pGroup->clientId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = pGroup->cgId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int32_t*)ptr = pGroup->topicNum;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
if (pGroup->topicNum > 0) {
tqSerializeListHandle(pGroup->head, ptr);
}
return 0;
}
void* tqSerializeListHandle(STqList* listHandle, void* ptr) {
STqList* node = listHandle;
ASSERT(node != NULL);
while (node) {
ptr = tqSerializeTopic(&node->topic, ptr);
node = node->next;
}
return ptr;
}
void* tqSerializeTopic(STqTopic* pTopic, void* ptr) {
*(int64_t*)ptr = pTopic->nextConsumeOffset;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = pTopic->topicId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
/**(int32_t*)ptr = pTopic->head;*/
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
/**(int32_t*)ptr = pTopic->tail;*/
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
ptr = tqSerializeItem(&pTopic->buffer[i], ptr);
}
return ptr;
}
void* tqSerializeItem(STqMsgItem* bufItem, void* ptr) {
// TODO: do we need serialize this?
// mainly for executor
return ptr;
}
const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup) {
STqGroup* gHandle = *ppGroup;
const void* ptr = pHead->content;
gHandle->clientId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
gHandle->cgId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
gHandle->ahandle = NULL;
gHandle->topicNum = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
gHandle->head = NULL;
STqList* node = gHandle->head;
for (int i = 0; i < gHandle->topicNum; i++) {
if (gHandle->head == NULL) {
if ((node = malloc(sizeof(STqList))) == NULL) {
// TODO: error
return NULL;
}
node->next = NULL;
ptr = tqDeserializeTopic(ptr, &node->topic);
gHandle->head = node;
} else {
node->next = malloc(sizeof(STqList));
if (node->next == NULL) {
// TODO: error
return NULL;
}
node->next->next = NULL;
ptr = tqDeserializeTopic(ptr, &node->next->topic);
node = node->next;
}
}
return ptr;
}
const void* tqDeserializeTopic(const void* pBytes, STqTopic* topic) {
const void* ptr = pBytes;
topic->nextConsumeOffset = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
topic->topicId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
/*topic->head = *(int32_t*)ptr;*/
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
/*topic->tail = *(int32_t*)ptr;*/
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
ptr = tqDeserializeItem(ptr, &topic->buffer[i]);
}
return ptr;
}
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* bufItem) { return pBytes; }
// TODO: make this a macro
int tqGroupSSize(const STqGroup* gHandle) {
return sizeof(int64_t) * 2 // cId + cgId
+ sizeof(int32_t) // topicNum
+ gHandle->topicNum * tqTopicSSize();
}
// TODO: make this a macro
int tqTopicSSize() {
return sizeof(int64_t) * 2 // nextConsumeOffset + topicId
+ sizeof(int32_t) * 2 // head + tail
+ TQ_BUFFER_SIZE * tqItemSSize();
}
int tqItemSSize() {
// TODO: do this need serialization?
// mainly for executor
return 0;
}
#endif
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
SMqConsumeReq* pReq = pMsg->pCont;
......@@ -675,7 +149,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
int64_t fetchOffset = pReq->offset;
int64_t blockingTime = pReq->blockingTime;
int rspLen = 0;
SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL};
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
......@@ -694,11 +167,25 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
if (strcmp(pTopic->topicName, pReq->topic) != 0) {
continue;
}
if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) {
pTopic->committedOffset = pReq->offset;
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = 0;
rpcSendResponse(pMsg);
return 0;
}
if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) {
pTopic->committedOffset = pReq->offset-1;
}
rsp.committedOffset = pTopic->committedOffset;
rsp.reqOffset = pReq->offset;
rsp.skipLogNum = 0;
if (fetchOffset == -1) {
if (fetchOffset <= pTopic->committedOffset) {
fetchOffset = pTopic->committedOffset + 1;
}
int8_t pos;
......@@ -802,12 +289,23 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
SMqSetCVgReq req;
SMqSetCVgReq req = {0};
tDecodeSMqSetCVgReq(msg, &req);
STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
if (pConsumer == NULL) {
pConsumer = calloc(sizeof(STqConsumerHandle), 1);
if (pConsumer == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1;
}
} else {
tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.newConsumerId);
tqHandlePurge(pTq->tqMeta, req.oldConsumerId);
terrno = TSDB_CODE_SUCCESS;
return 0;
}
strcpy(pConsumer->cgroup, req.cgroup);
pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle));
pConsumer->consumerId = req.newConsumerId;
......@@ -819,9 +317,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
return -1;
}
strcpy(pTopic->topicName, req.topicName);
pTopic->sql = strdup(req.sql);
pTopic->logicalPlan = strdup(req.logicalPlan);
pTopic->physicalPlan = strdup(req.physicalPlan);
pTopic->sql = req.sql;
pTopic->logicalPlan = req.logicalPlan;
pTopic->physicalPlan = req.physicalPlan;
pTopic->committedOffset = -1;
pTopic->currentOffset = -1;
......
......@@ -584,6 +584,7 @@ int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_MASK;
STqMetaList* pNode = pMeta->bucket[bucketKey];
while (pNode) {
if (pNode->handle.key == key) {
if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
if (pNode->handle.valueInTxn) {
pMeta->pDeleter(pNode->handle.valueInTxn);
......@@ -592,6 +593,23 @@ int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
tqLinkUnpersist(pMeta, pNode);
return 0;
}
} else {
pNode = pNode->next;
}
}
terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
return -1;
}
int32_t tqHandlePurge(STqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_MASK;
STqMetaList* pNode = pMeta->bucket[bucketKey];
while (pNode) {
if (pNode->handle.key == key) {
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
tqLinkUnpersist(pMeta, pNode);
return 0;
} else {
pNode = pNode->next;
}
......
......@@ -16,6 +16,7 @@
#include "tq.h"
#include "vnd.h"
#if 0
int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) {
case TDMT_VND_MQ_SET_CUR:
......@@ -26,6 +27,7 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
}
return 0;
}
#endif
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SRpcMsg *pMsg;
......
......@@ -169,10 +169,17 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
return -1;
}
ASSERT(pRead->pHead->head.version == ver);
if (pRead->pHead->head.version != ver) {
/*wError("unexpected wal log version: %ld, read request version:%ld", pRead->pHead->head.version, ver);*/
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
code = walValidBodyCksum(pRead->pHead);
if (code != 0) {
/*wError("unexpected wal log version: checksum not passed");*/
pRead->curVersion = -1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册