diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4a0e3593dacfd4388cd27c715f9f17cc9e62783c..7d232aa8521856f00f13fb56baec4f13449f8231 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1126,7 +1126,7 @@ typedef struct { int32_t topicNum; int64_t consumerId; char* consumerGroup; - char* topicName[]; + SArray* topicNames; // SArray } SCMSubscribeReq; static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { @@ -1134,8 +1134,9 @@ static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribe tlen += taosEncodeFixedI32(buf, pReq->topicNum); tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeString(buf, pReq->consumerGroup); + for(int i = 0; i < pReq->topicNum; i++) { - tlen += taosEncodeString(buf, pReq->topicName[i]); + tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i)); } return tlen; } @@ -1144,8 +1145,11 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq buf = taosDecodeFixedI32(buf, &pReq->topicNum); buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeString(buf, &pReq->consumerGroup); + pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*)); for(int i = 0; i < pReq->topicNum; i++) { - buf = taosDecodeString(buf, &pReq->topicName[i]); + char* name = NULL; + buf = taosDecodeString(buf, &name); + taosArrayPush(pReq->topicNames, &name); } return buf; } diff --git a/include/util/tcoding.h b/include/util/tcoding.h index e1edf0d7922eec321a282036811e3aa470a4ff59..226856901f2d20ae6ce50f81989aba032ca38d6c 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -370,10 +370,33 @@ static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) { return POINTER_SHIFT(buf, size); } +// ---- binary +static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int valueLen) { + int tlen = 0; + + if (buf != NULL) { + memcpy(*buf, value, valueLen); + *buf = POINTER_SHIFT(*buf, valueLen); + } + tlen += (int)valueLen; + + return tlen; +} + +static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int valueLen) { + uint64_t size = 0; + + *value = malloc((size_t)valueLen); + if (*value == NULL) return NULL; + memcpy(*value, buf, (size_t)size); + + return POINTER_SHIFT(buf, size); +} + #endif #ifdef __cplusplus } #endif -#endif /*_TD_UTIL_CODING_H*/ \ No newline at end of file +#endif /*_TD_UTIL_CODING_H*/ diff --git a/include/util/thash.h b/include/util/thash.h index a736fc26af37ad1989fd04a04113db039d69c70f..4558162ac5d9ff1bd4e8c8eb32cf6b440f9fb982 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -210,6 +210,14 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p); */ int32_t taosHashGetKey(void *data, void** key, size_t* keyLen); + +/** + * Get the corresponding data length for a given data in hash table + * @param data + * @return + */ +int32_t taosHashGetDataLen(void *data); + /** * return the payload data with the specified key(reference number added) * diff --git a/source/client/inc/clientHb.h b/source/client/inc/clientHb.h new file mode 100644 index 0000000000000000000000000000000000000000..676b5c4c40276d009fbf3356a9261fddce16424a --- /dev/null +++ b/source/client/inc/clientHb.h @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "tarray.h" +#include "thash.h" +#include "tmsg.h" + +typedef enum { + mq = 0, + HEARTBEAT_TYPE_MAX +} EHbType; + +typedef struct SKlv { + int32_t keyLen; + int32_t valueLen; + void* key; + void* value; +} SKlv; + +static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKlv->keyLen); + tlen += taosEncodeFixedI32(buf, pKlv->valueLen); + tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen); + tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { + buf = taosDecodeFixedI32(buf, &pKlv->keyLen); + buf = taosDecodeFixedI32(buf, &pKlv->valueLen); + buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen); + buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); + return buf; +} + +typedef struct SClientHbKey { + int32_t connId; + int32_t hbType; +} SClientHbKey; + +static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKey->connId); + tlen += taosEncodeFixedI32(buf, pKey->hbType); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { + buf = taosDecodeFixedI32(buf, &pKey->connId); + buf = taosDecodeFixedI32(buf, &pKey->hbType); + return buf; +} + +typedef struct SClientHbReq { + SClientHbKey hbKey; + SHashObj* info; // hash +} SClientHbReq; + +static FORCE_INLINE int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq) { + int tlen = 0; + tlen += taosEncodeSClientHbKey(buf, &pReq->hbKey); + + void* pIter = NULL; + void* data; + SKlv klv; + data = taosHashIterate(pReq->info, pIter); + while (data != NULL) { + taosHashGetKey(data, &klv.key, (size_t*)&klv.keyLen); + klv.valueLen = taosHashGetDataLen(data); + klv.value = data; + taosEncodeSKlv(buf, &klv); + + data = taosHashIterate(pReq->info, pIter); + } + return tlen; +} + +static FORCE_INLINE void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq) { + ASSERT(pReq->info != NULL); + buf = taosDecodeSClientHbKey(buf, &pReq->hbKey); + + //TODO: error handling + if(pReq->info == NULL) { + pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + } + SKlv klv; + buf = taosDecodeSKlv(buf, &klv); + taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen); + + return buf; +} + +typedef struct SClientHbBatchReq { + int64_t reqId; + SArray* reqs; // SArray +} SClientHbBatchReq; + +typedef struct SClientHbHandleResult { +} SClientHbHandleResult; + +typedef struct SClientHbRsp { + int32_t connId; + int32_t hbType; +} SClientHbRsp; + +typedef struct SClientHbBatchRsp { + int64_t reqId; + int64_t rspId; + SArray* rsps; // SArray +} SClientHbBatchRsp; + +typedef int32_t (*FHbRspHandle)(SClientHbReq* pReq); +typedef int32_t (*FGetConnInfo)(int32_t conn, void* self); + +typedef struct SClientHbMgr { + int8_t inited; + int32_t reportInterval; // unit ms + int32_t stats; + SRWLatch lock; + SHashObj* info; //hash + FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; + // input queue +} SClientHbMgr; + +static SClientHbMgr clientHbMgr = {0}; + +int hbMgrInit(); +void hbMgrCleanUp(); + +int registerConn(int32_t connId, FGetConnInfo func, FHbRspHandle rspHandle); + +int registerHbRspHandle(int32_t connId, int32_t hbType, FHbRspHandle rspHandle); + +int HbAddConnInfo(int32_t connId, void* key, void* value, int32_t keyLen, int32_t valueLen); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c new file mode 100644 index 0000000000000000000000000000000000000000..f1f7409f05bb3912359258107595b4f1c4a5350f --- /dev/null +++ b/source/client/src/clientHb.c @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "clientHb.h" + +static int32_t mqHbRspHandle(SClientHbReq* pReq) { + return 0; +} + +int hbMgrInit() { + //init once + // + //init lock + // + //init handle funcs + clientHbMgr.handle[mq] = mqHbRspHandle; + + //init stat + clientHbMgr.stats = 0; + + //init config + clientHbMgr.reportInterval = 1500; + + //init hash info + // + return 0; +} + +void hbMgrCleanUp() { + +} + +int registerConn(int32_t connId, FGetConnInfo func, FHbRspHandle rspHandle) { + return 0; +} + +int registerHbRspHandle(int32_t connId, int32_t hbType, FHbRspHandle rspHandle) { + return 0; +} + +int HbAddConnInfo(int32_t connId, void* key, void* value, int32_t keyLen, int32_t valueLen) { + //lock + + //find req by connection id + + //unlock + return 0; +} diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 7aa818c7bb4c95b14b9b5ac90bad44f0d57c2273..828eb4a5b714dcd14de3045d09e85c630da86cc6 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -127,8 +127,8 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { goto CONSUME_DECODE_OVER; } - int32_t size = sizeof(SMqConsumerObj); - SSdbRow *pRow = sdbAllocRow(size); + int32_t size = sizeof(SMqConsumerObj); + SSdbRow *pRow = sdbAllocRow(size); if (pRow == NULL) goto CONSUME_DECODE_OVER; SMqConsumerObj *pConsumer = sdbGetRowObj(pRow); @@ -155,7 +155,6 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER); } - CONSUME_DECODE_OVER: if (terrno != 0) { mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); @@ -209,6 +208,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { tDeserializeSCMSubscribeReq(msgStr, pSubscribe); int64_t consumerId = pSubscribe->consumerId; char *consumerGroup = pSubscribe->consumerGroup; + int32_t cgroupLen = strlen(consumerGroup); SArray *newSub = NULL; int newTopicNum = pSubscribe->topicNum; @@ -216,13 +216,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic)); } for (int i = 0; i < newTopicNum; i++) { - char *topic = pSubscribe->topicName[i]; + char *newTopicName = taosArrayGetP(newSub, i); SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic)); if (pConsumerTopic == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; // TODO: free return -1; } + strcpy(pConsumerTopic->name, newTopicName); pConsumerTopic->vgroups = tdListNew(sizeof(int64_t)); taosArrayPush(newSub, pConsumerTopic); free(pConsumerTopic); @@ -239,7 +240,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - strcpy(pConsumer->cgroup, pSubscribe->consumerGroup); + pConsumer->consumerId = consumerId; + strcpy(pConsumer->cgroup, consumerGroup); } else { oldSub = pConsumer->topics; @@ -260,6 +262,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { j++; } else if (j >= oldTopicNum) { pNewTopic = taosArrayGet(newSub, i); + i++; } else { pNewTopic = taosArrayGet(newSub, i); pOldTopic = taosArrayGet(oldSub, j); @@ -292,7 +295,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName); ASSERT(pTopic != NULL); - SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup)); + SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); while ((pn = tdListNext(&iter)) != NULL) { int32_t vgId = *(int64_t *)pn->data; SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); @@ -302,8 +305,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { continue; } // acquire and get epset - void *pMqVgSetReq = - mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, pSubscribe->consumerId, pSubscribe->consumerGroup); + void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup); // TODO:serialize if (pMsg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -321,7 +323,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { return -1; } } - taosHashRemove(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup)); + taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen); + mndReleaseTopic(pMnode, pTopic); } else if (pNewTopic != NULL) { ASSERT(pOldTopic == NULL); @@ -330,7 +333,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName); ASSERT(pTopic != NULL); - SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup)); + SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); if (pGroup == NULL) { // add new group pGroup = malloc(sizeof(SMqCGroup)); @@ -346,18 +349,20 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } pGroup->status = 0; // add into cgroups - taosHashPut(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup), pGroup, - sizeof(SMqCGroup)); + taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup)); } // put the consumer into list // rebalance will be triggered by timer - tdListAppend(pGroup->consumerIds, &pSubscribe->consumerId); + tdListAppend(pGroup->consumerIds, &consumerId); SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic); sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY); // TODO: error handling mndTransAppendRedolog(pTrans, pTopicRaw); + + mndReleaseTopic(pMnode, pTopic); + } else { ASSERT(0); } @@ -376,11 +381,13 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); + mndReleaseConsumer(pMnode, pConsumer); return -1; } // TODO: free memory mndTransDrop(pTrans); + mndReleaseConsumer(pMnode, pConsumer); return 0; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index c31b91c3fcc8d732b1f6b5bfdc6407bac2266059..57b7c16c39fd968f4e713926a2dfd4c032b41f10 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -58,25 +58,36 @@ int32_t mndInitTopic(SMnode *pMnode) { void mndCleanupTopic(SMnode *pMnode) {} SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int32_t size = sizeof(SMqTopicObj) + MND_TOPIC_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size); - if (pRaw == NULL) goto WTF; + if (pRaw == NULL) goto TOPIC_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, WTF); - SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, WTF); - SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, WTF); - SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, WTF); - SDB_SET_INT64(pRaw, dataPos, pTopic->uid, WTF); - SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, WTF); - SDB_SET_INT32(pRaw, dataPos, pTopic->version, WTF); - SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, WTF); - SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, WTF); - - SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, WTF); - SDB_SET_DATALEN(pRaw, dataPos, WTF); - -WTF: + SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER); + SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER); + SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER); + SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER); + SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER); + SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER); + SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); + + SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); + SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER); + + terrno = TSDB_CODE_SUCCESS; + +TOPIC_ENCODE_OVER: + if (terrno != TSDB_CODE_SUCCESS) { + mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic); return pRaw; } @@ -90,8 +101,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { goto TOPIC_DECODE_OVER; } - int32_t size = sizeof(SMqTopicObj); - SSdbRow *pRow = sdbAllocRow(size); + int32_t size = sizeof(SMqTopicObj); + SSdbRow *pRow = sdbAllocRow(size); if (pRow == NULL) goto TOPIC_DECODE_OVER; SMqTopicObj *pTopic = sdbGetRowObj(pRow); @@ -115,10 +126,10 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER) - terrno = 0; + terrno = TSDB_CODE_SUCCESS; TOPIC_DECODE_OVER: - if (terrno != 0) { + if (terrno != TSDB_CODE_SUCCESS) { mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr()); tfree(pRow); return NULL; diff --git a/source/util/src/thash.c b/source/util/src/thash.c index f90b157558adb6e851927f34945dc0f734a48c9a..181661d3048953df7aeb03b0347ed99601315e80 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -799,6 +799,11 @@ FORCE_INLINE int32_t taosHashGetKey(void *data, void** key, size_t* keyLen) { return 0; } +FORCE_INLINE int32_t taosHashGetDataLen(void *data) { + SHashNode * node = GET_HASH_PNODE(data); + return node->keyLen; +} + FORCE_INLINE uint32_t taosHashGetDataKeyLen(SHashObj *pHashObj, void *data) { SHashNode * node = GET_HASH_PNODE(data); return node->keyLen;