未验证 提交 3834aefa 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10014 from taosdata/feature/tq

fix query crash
...@@ -1241,6 +1241,12 @@ typedef struct { ...@@ -1241,6 +1241,12 @@ typedef struct {
char data[]; char data[];
} SVShowTablesFetchRsp; } SVShowTablesFetchRsp;
typedef struct SMqCMGetSubEpReq {
int64_t consumerId;
int32_t epoch;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqCMGetSubEpReq;
#pragma pack(pop) #pragma pack(pop)
static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) { static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
...@@ -1562,7 +1568,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* ...@@ -1562,7 +1568,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->logicalPlan);
tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += taosEncodeFixedU32(buf, pReq->qmsgLen); tlen += taosEncodeFixedU32(buf, pReq->qmsgLen);
tlen += taosEncodeBinary(buf, pReq->qmsg, pReq->qmsgLen); tlen += taosEncodeString(buf, (char*)pReq->qmsg);
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen; return tlen;
} }
...@@ -1577,7 +1583,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { ...@@ -1577,7 +1583,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->logicalPlan);
buf = taosDecodeString(buf, &pReq->physicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan);
buf = taosDecodeFixedU32(buf, &pReq->qmsgLen); buf = taosDecodeFixedU32(buf, &pReq->qmsgLen);
buf = taosDecodeBinary(buf, &pReq->qmsg, pReq->qmsgLen); buf = taosDecodeString(buf, (char**)&pReq->qmsg);
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg); //buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf; return buf;
} }
...@@ -1639,6 +1645,92 @@ typedef struct SMqConsumeReq { ...@@ -1639,6 +1645,92 @@ typedef struct SMqConsumeReq {
char topic[TSDB_TOPIC_FNAME_LEN]; char topic[TSDB_TOPIC_FNAME_LEN];
} SMqConsumeReq; } SMqConsumeReq;
typedef struct SMqSubVgEp {
int32_t vgId;
SEpSet epSet;
} SMqSubVgEp;
typedef struct SMqSubTopicEp {
char topic[TSDB_TOPIC_FNAME_LEN];
SArray* vgs; // SArray<SMqSubVgEp>
} SMqSubTopicEp;
typedef struct SMqCMGetSubEpRsp {
int64_t consumerId;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray* topics; // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp;
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI16(buf, pVgEp->vgId);
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen;
}
static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
return buf;
}
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopicEp->topic);
int32_t sz = taosArrayGetSize(pTopicEp->vgs);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i);
tlen += tEncodeSMqSubVgEp(buf, pVgEp);
}
return tlen;
}
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
buf = taosDecodeStringTo(buf, pTopicEp->topic);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp));
if (pTopicEp->vgs == NULL) {
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqSubVgEp vgEp;
buf = tDecodeSMqSubVgEp(buf, &vgEp);
taosArrayPush(pTopicEp->vgs, &vgEp);
}
return buf;
}
static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
tlen += taosEncodeString(buf, pRsp->cgroup);
int32_t sz = taosArrayGetSize(pRsp->topics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqSubTopicEp* pVgEp = (SMqSubTopicEp*)taosArrayGet(pRsp->topics, i);
tlen += tEncodeSMqSubTopicEp(buf, pVgEp);
}
return tlen;
}
static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
buf = taosDecodeStringTo(buf, pRsp->cgroup);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pRsp->topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
if (pRsp->topics == NULL) {
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqSubTopicEp topicEp;
buf = tDecodeSMqSubTopicEp(buf, &topicEp);
taosArrayPush(pRsp->topics, &topicEp);
}
return buf;
}
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -140,6 +140,7 @@ enum { ...@@ -140,6 +140,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-timer", SMqTmrMsg, SMqTmrMsg) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-timer", SMqTmrMsg, SMqTmrMsg)
// Requests handled by VNODE // Requests handled by VNODE
......
...@@ -263,7 +263,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) ...@@ -263,7 +263,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
typedef struct SMqClientVg { typedef struct SMqClientVg {
// statistics // statistics
int64_t consumeCnt; int64_t pollCnt;
// offset // offset
int64_t committedOffset; int64_t committedOffset;
int64_t currentOffset; int64_t currentOffset;
...@@ -345,6 +345,7 @@ tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t err ...@@ -345,6 +345,7 @@ tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t err
strcpy(pTmq->groupId, conf->groupId); strcpy(pTmq->groupId, conf->groupId);
pTmq->commit_cb = conf->commit_cb; pTmq->commit_cb = conf->commit_cb;
pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1); pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1);
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
return pTmq; return pTmq;
} }
...@@ -411,13 +412,12 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -411,13 +412,12 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
tSerializeSCMSubscribeReq(&abuf, &req); tSerializeSCMSubscribeReq(&abuf, &req);
/*printf("formatted: %s\n", dagStr);*/ /*printf("formatted: %s\n", dagStr);*/
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TSDB_SQL_SELECT); pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE);
if (pRequest == NULL) { if (pRequest == NULL) {
tscError("failed to malloc sqlObj"); tscError("failed to malloc sqlObj");
} }
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
pRequest->type = TDMT_MND_SUBSCRIBE;
SMsgSendInfo* body = buildMsgInfoImpl(pRequest); SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
...@@ -596,6 +596,37 @@ struct tmq_message_t { ...@@ -596,6 +596,37 @@ struct tmq_message_t {
SMqConsumeRsp rsp; SMqConsumeRsp rsp;
}; };
int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
return 0;
}
int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = (tmq_t*)param;
SMqCMGetSubEpRsp rsp;
tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp);
int32_t sz = taosArrayGetSize(rsp.topics);
// TODO: lock
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
for (int32_t i = 0; i < sz; i++) {
SMqClientTopic topic = {0};
SMqSubTopicEp* pTopicEp = taosArrayGet(rsp.topics, i);
topic.topicName = strdup(pTopicEp->topic);
int32_t vgSz = taosArrayGetSize(pTopicEp->vgs);
topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgSz; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
SMqClientVg clientVg = {
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet
};
taosArrayPush(topic.vgs, &clientVg);
}
taosArrayPush(tmq->clientTopics, &topic);
}
// unlock
return 0;
}
tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
if (tmq->clientTopics == NULL || taosArrayGetSize(tmq->clientTopics) == 0) { if (tmq->clientTopics == NULL || taosArrayGetSize(tmq->clientTopics) == 0) {
return NULL; return NULL;
...@@ -605,9 +636,38 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -605,9 +636,38 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
req.reqType = 1; req.reqType = 1;
req.blockingTime = blocking_time; req.blockingTime = blocking_time;
req.consumerId = tmq->consumerId; req.consumerId = tmq->consumerId;
tmq_message_t* tmq_message = NULL;
strcpy(req.cgroup, tmq->groupId); strcpy(req.cgroup, tmq->groupId);
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); if (taosArrayGetSize(tmq->clientTopics) == 0) {
int32_t tlen = sizeof(SMqCMGetSubEpReq);
SMqCMGetSubEpReq* buf = malloc(tlen);
if (buf == NULL) {
tscError("failed to malloc get subscribe ep buf");
}
buf->consumerId = htobe64(buf->consumerId);
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP);
if (pRequest == NULL) {
tscError("failed to malloc subscribe ep request");
}
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0;
sendInfo->param = tmq;
sendInfo->fp = tmq_ask_ep_cb;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem);
}
SMqClientTopic* pTopic = taosArrayGetP(tmq->clientTopics, tmq->nextTopicIdx);
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
strcpy(req.topic, pTopic->topicName); strcpy(req.topic, pTopic->topicName);
int32_t nextVgIdx = pTopic->nextVgIdx; int32_t nextVgIdx = pTopic->nextVgIdx;
...@@ -618,14 +678,17 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -618,14 +678,17 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
pRequest->body.requestMsg = (SDataBuf){ .pData = &req, .len = sizeof(SMqConsumeReq) }; pRequest->body.requestMsg = (SDataBuf){ .pData = &req, .len = sizeof(SMqConsumeReq) };
pRequest->type = TDMT_VND_CONSUME; pRequest->type = TDMT_VND_CONSUME;
SMsgSendInfo* body = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0;
sendInfo->param = &tmq_message;
sendInfo->fp = tmq_poll_cb_inner;
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, body); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
return (tmq_message_t*)pRequest->body.resInfo.pData; return tmq_message;
/*tsem_wait(&pRequest->body.rspSem);*/ /*tsem_wait(&pRequest->body.rspSem);*/
......
此差异已折叠。
...@@ -115,6 +115,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -115,6 +115,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg;
/*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/ /*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = dndProcessMnodeReadMsg;
// Requests handled by VNODE // Requests handled by VNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
......
...@@ -25,7 +25,7 @@ extern "C" { ...@@ -25,7 +25,7 @@ extern "C" {
int32_t mndInitConsumer(SMnode *pMnode); int32_t mndInitConsumer(SMnode *pMnode);
void mndCleanupConsumer(SMnode *pMnode); void mndCleanupConsumer(SMnode *pMnode);
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId); SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer); SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
......
...@@ -30,24 +30,23 @@ ...@@ -30,24 +30,23 @@
#define MND_CONSUMER_VER_NUMBER 1 #define MND_CONSUMER_VER_NUMBER 1
#define MND_CONSUMER_RESERVE_SIZE 64 #define MND_CONSUMER_RESERVE_SIZE 64
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer); static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg); static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg);
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
int32_t mndInitConsumer(SMnode *pMnode) { int32_t mndInitConsumer(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_CONSUMER, SSdbTable table = {.sdbType = SDB_CONSUMER,
.keyType = SDB_KEY_BINARY, .keyType = SDB_KEY_INT64,
.encodeFp = (SdbEncodeFp)mndConsumerActionEncode, .encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
.decodeFp = (SdbDecodeFp)mndConsumerActionDecode, .decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
.insertFp = (SdbInsertFp)mndConsumerActionInsert, .insertFp = (SdbInsertFp)mndConsumerActionInsert,
.updateFp = (SdbUpdateFp)mndConsumerActionUpdate, .updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete}; .deleteFp = (SdbDeleteFp)mndConsumerActionDelete};
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
...@@ -61,10 +60,10 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { ...@@ -61,10 +60,10 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
if (pRaw == NULL) goto CM_ENCODE_OVER; if (pRaw == NULL) goto CM_ENCODE_OVER;
void* buf = malloc(tlen); void *buf = malloc(tlen);
if (buf == NULL) goto CM_ENCODE_OVER; if (buf == NULL) goto CM_ENCODE_OVER;
void* abuf = buf; void *abuf = buf;
tEncodeSMqConsumerObj(&abuf, pConsumer); tEncodeSMqConsumerObj(&abuf, pConsumer);
int32_t dataPos = 0; int32_t dataPos = 0;
...@@ -106,7 +105,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { ...@@ -106,7 +105,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
int32_t dataPos = 0; int32_t dataPos = 0;
int32_t len; int32_t len;
SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
void* buf = malloc(len); void *buf = malloc(len);
if (buf == NULL) goto CM_DECODE_OVER; if (buf == NULL) goto CM_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
...@@ -147,7 +146,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -147,7 +146,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
return 0; return 0;
} }
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId) { SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId); SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
......
...@@ -30,6 +30,8 @@ ...@@ -30,6 +30,8 @@
#define MND_SUBSCRIBE_VER_NUMBER 1 #define MND_SUBSCRIBE_VER_NUMBER 1
#define MND_SUBSCRIBE_RESERVE_SIZE 64 #define MND_SUBSCRIBE_RESERVE_SIZE 64
static char *mndMakeSubscribeKey(char *cgroup, char *topicName);
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *); static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw); static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *); static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
...@@ -41,9 +43,10 @@ static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg); ...@@ -41,9 +43,10 @@ static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg); static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg); static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic); SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pSub);
int32_t mndInitSubscribe(SMnode *pMnode) { int32_t mndInitSubscribe(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SUBSCRIBE, SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
...@@ -57,9 +60,60 @@ int32_t mndInitSubscribe(SMnode *pMnode) { ...@@ -57,9 +60,60 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->pCont;
SMqCMGetSubEpRsp rsp;
int64_t consumerId = be64toh(pReq->consumerId);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId);
if (pConsumer == NULL) {
/*terrno = */
return -1;
}
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
strcpy(rsp.cgroup, pReq->cgroup);
rsp.consumerId = consumerId;
SArray *pTopics = pConsumer->topics;
int32_t sz = taosArrayGetSize(pTopics);
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
for (int32_t i = 0; i < sz; i++) {
SMqSubTopicEp topicEp;
SMqConsumerTopic *pConsumerTopic = taosArrayGet(pTopics, i);
strcpy(topicEp.topic, pConsumerTopic->name);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, pConsumerTopic->name);
int32_t assignedSz = taosArrayGetSize(pSub->assigned);
topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp));
for (int32_t j = 0; j < assignedSz; j++) {
SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i);
if (pCEp->consumerId == consumerId) {
taosArrayPush(pSub->assigned, pCEp);
}
}
if (taosArrayGetSize(topicEp.vgs) != 0) {
taosArrayPush(rsp.topics, &topicEp);
}
}
int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
void *buf = malloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
void *abuf = buf;
tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
//TODO: free rsp
pMsg->pCont = buf;
pMsg->contLen = tlen;
return 0;
}
static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) { static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
int i = 0; int i = 0;
while (key[i] != ':') { while (key[i] != ':') {
...@@ -97,7 +151,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -97,7 +151,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
// build msg // build msg
SMqSetCVgReq* pReq = malloc(sizeof(SMqSetCVgReq) + pCEp->qmsgLen); SMqSetCVgReq *pReq = malloc(sizeof(SMqSetCVgReq));
if (pReq == NULL) { if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -108,7 +162,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -108,7 +162,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
pReq->logicalPlan = strdup(pTopic->logicalPlan); pReq->logicalPlan = strdup(pTopic->logicalPlan);
pReq->physicalPlan = strdup(pTopic->physicalPlan); pReq->physicalPlan = strdup(pTopic->physicalPlan);
pReq->qmsgLen = pCEp->qmsgLen; pReq->qmsgLen = pCEp->qmsgLen;
memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen); /*memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen);*/
pReq->qmsg = strdup(pCEp->qmsg);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq); int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq);
void *reqStr = malloc(tlen); void *reqStr = malloc(tlen);
if (reqStr == NULL) { if (reqStr == NULL) {
...@@ -146,11 +201,11 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { ...@@ -146,11 +201,11 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
} }
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
//convert phyplan to dag // convert phyplan to dag
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
SArray *pArray; SArray *pArray;
SArray* inner = taosArrayGet(pDag->pSubplans, 0); SArray *inner = taosArrayGet(pDag->pSubplans, 0);
SSubplan *plan = taosArrayGetP(inner, 0); SSubplan *plan = taosArrayGetP(inner, 0);
plan->execNode.inUse = 0; plan->execNode.inUse = 0;
strcpy(plan->execNode.epAddr[0].fqdn, "localhost"); strcpy(plan->execNode.epAddr[0].fqdn, "localhost");
plan->execNode.epAddr[0].port = 6030; plan->execNode.epAddr[0].port = 6030;
...@@ -161,21 +216,24 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas ...@@ -161,21 +216,24 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
return -1; return -1;
} }
int32_t sz = taosArrayGetSize(pArray); int32_t sz = taosArrayGetSize(pArray);
//convert dag to msg // convert dag to msg
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp CEp; SMqConsumerEp CEp;
CEp.status = 0; CEp.status = 0;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
STaskInfo* pTaskInfo = taosArrayGet(pArray, i); STaskInfo *pTaskInfo = taosArrayGet(pArray, i);
tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr); tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr);
/*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/ /*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1],
* CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/
CEp.vgId = pTaskInfo->addr.nodeId; CEp.vgId = pTaskInfo->addr.nodeId;
CEp.qmsgLen = pTaskInfo->msg->contentLen; CEp.qmsg = strdup(pTaskInfo->msg->msg);
CEp.qmsg = malloc(CEp.qmsgLen); CEp.qmsgLen = strlen(CEp.qmsg) + 1;
if (CEp.qmsg == NULL) { printf("abc:\n%s\n", CEp.qmsg);
return -1; /*CEp.qmsg = malloc(CEp.qmsgLen);*/
} /*if (CEp.qmsg == NULL) {*/
memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen); /*return -1;*/
/*}*/
/*memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen);*/
taosArrayPush(unassignedVg, &CEp); taosArrayPush(unassignedVg, &CEp);
} }
...@@ -184,7 +242,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas ...@@ -184,7 +242,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
} }
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic) { SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pCEp) {
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i); int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i);
...@@ -199,6 +257,8 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume ...@@ -199,6 +257,8 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
req.sql = pTopic->sql; req.sql = pTopic->sql;
req.logicalPlan = pTopic->logicalPlan; req.logicalPlan = pTopic->logicalPlan;
req.physicalPlan = pTopic->physicalPlan; req.physicalPlan = pTopic->physicalPlan;
req.qmsg = strdup(pCEp->qmsg);
req.qmsgLen = strlen(req.qmsg);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *buf = malloc(sizeof(SMsgHead) + tlen); void *buf = malloc(sizeof(SMsgHead) + tlen);
if (buf == NULL) { if (buf == NULL) {
...@@ -206,18 +266,18 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume ...@@ -206,18 +266,18 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
return -1; return -1;
} }
SMsgHead* pMsgHead = (SMsgHead*)buf; SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
pMsgHead->vgId = htonl(vgId); pMsgHead->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncodeSMqSetCVgReq(&abuf, &req); tEncodeSMqSetCVgReq(&abuf, &req);
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
action.pCont = buf; action.pCont = buf;
action.contLen = tlen; action.contLen = sizeof(SMsgHead) + tlen;
action.msgType = TDMT_VND_MQ_SET_CONN; action.msgType = TDMT_VND_MQ_SET_CONN;
mndReleaseVgroup(pMnode, pVgObj); mndReleaseVgroup(pMnode, pVgObj);
...@@ -285,7 +345,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { ...@@ -285,7 +345,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
int32_t dataPos = 0; int32_t dataPos = 0;
int32_t tlen; int32_t tlen;
SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
void *buf = malloc(tlen + 1); void *buf = malloc(tlen + 1);
if (buf == NULL) goto SUB_DECODE_OVER; if (buf == NULL) goto SUB_DECODE_OVER;
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
...@@ -493,11 +553,11 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -493,11 +553,11 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
char* key = mndMakeSubscribeKey(consumerGroup, newTopicName); char *key = mndMakeSubscribeKey(consumerGroup, newTopicName);
strcpy(pSub->key, key); strcpy(pSub->key, key);
// set unassigned vg // set unassigned vg
mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg); mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg);
//TODO: disable alter // TODO: disable alter
} }
taosArrayPush(pSub->availConsumer, &consumerId); taosArrayPush(pSub->availConsumer, &consumerId);
...@@ -506,12 +566,15 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { ...@@ -506,12 +566,15 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) { if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) {
ASSERT(taosArrayGetSize(pConsumerTopic->pVgInfo) == 1); ASSERT(taosArrayGetSize(pConsumerTopic->pVgInfo) == 1);
int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo); int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo);
// send setmsg to vnode SMqConsumerEp *pCEp = taosArrayGetLast(pSub->assigned);
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic) < 0) { if (pCEp->vgId == vgId) {
// TODO if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic, pCEp) < 0) {
return -1; // TODO
return -1;
}
} }
// send setmsg to vnode
} }
SSdbRaw *pRaw = mndSubActionEncode(pSub); SSdbRaw *pRaw = mndSubActionEncode(pSub);
......
...@@ -811,7 +811,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { ...@@ -811,7 +811,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0; pTopic->buffer.output[i].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.qmsg, pReadHandle); pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, pReadHandle);
} }
taosArrayPush(pConsumer->topics, pTopic); taosArrayPush(pConsumer->topics, pTopic);
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
......
...@@ -83,8 +83,9 @@ int32_t qCreateExecTask(void* readHandle, int32_t vgId, SSubplan* pSubplan, qTas ...@@ -83,8 +83,9 @@ int32_t qCreateExecTask(void* readHandle, int32_t vgId, SSubplan* pSubplan, qTas
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
if (handle) {
code = dsCreateDataSinker(pSubplan->pDataSink, handle); code = dsCreateDataSinker(pSubplan->pDataSink, handle);
}
_error: _error:
// if failed to add ref for all tables in this query, abort current query // if failed to add ref for all tables in this query, abort current query
...@@ -461,4 +462,4 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo ...@@ -461,4 +462,4 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
return error; return error;
} }
#endif #endif
\ No newline at end of file
...@@ -1482,13 +1482,14 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { ...@@ -1482,13 +1482,14 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
} }
int32_t msgSize = sizeof(SSubQueryMsg) + msgLen; int32_t msgSize = sizeof(SSubQueryMsg) + msgLen;
msg = calloc(1, msgSize);
if (NULL == msg) { if (NULL == msg) {
qError("calloc %d failed", msgSize); qError("calloc %d failed", msgSize);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SSubQueryMsg *pMsg = (SSubQueryMsg*) msg; SSubQueryMsg* pMsg = calloc(1, msgSize);
/*SSubQueryMsg *pMsg = (SSubQueryMsg*) msg;*/
memcpy(pMsg->msg, msg, msgLen);
pMsg->header.vgId = tInfo.addr.nodeId; pMsg->header.vgId = tInfo.addr.nodeId;
...@@ -1497,7 +1498,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { ...@@ -1497,7 +1498,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
pMsg->taskId = schGenUUID(); pMsg->taskId = schGenUUID();
pMsg->taskType = TASK_TYPE_PERSISTENT; pMsg->taskType = TASK_TYPE_PERSISTENT;
pMsg->contentLen = msgLen; pMsg->contentLen = msgLen;
memcpy(pMsg->msg, msg, msgLen); /*memcpy(pMsg->msg, ((SSubQueryMsg*)msg)->msg, msgLen);*/
tInfo.msg = pMsg; tInfo.msg = pMsg;
......
...@@ -13,12 +13,12 @@ IF (HEADER_GTEST_INCLUDE_DIR AND (LIB_GTEST_STATIC_DIR OR LIB_GTEST_SHARED_DIR)) ...@@ -13,12 +13,12 @@ IF (HEADER_GTEST_INCLUDE_DIR AND (LIB_GTEST_STATIC_DIR OR LIB_GTEST_SHARED_DIR))
LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c) LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c)
ADD_EXECUTABLE(utilTest ${SOURCE_LIST}) ADD_EXECUTABLE(utilTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(utilTest util common os gtest pthread gcov) TARGET_LINK_LIBRARIES(utilTest util common os gtest pthread)
LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/cacheTest.cpp) LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/cacheTest.cpp)
LIST(APPEND SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/hashTest.cpp) LIST(APPEND SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/hashTest.cpp)
ADD_EXECUTABLE(hashTest ${SOURCE_LIST}) ADD_EXECUTABLE(hashTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(hashTest util common os gtest pthread gcov) TARGET_LINK_LIBRARIES(hashTest util common os gtest pthread)
LIST(APPEND BIN_SRC ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c) LIST(APPEND BIN_SRC ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c)
ADD_EXECUTABLE(trefTest ${BIN_SRC}) ADD_EXECUTABLE(trefTest ${BIN_SRC})
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册