diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 97be4389209d32441e3f0f5dc47f5b22bb24bb33..8cddc24bc7ed29a5343db4479edb16882c0ecbfa 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -26,6 +26,7 @@ extern "C" { #include "tarray.h" #include "tcoding.h" #include "tdataformat.h" +#include "thash.h" #include "tlist.h" /* ------------------------ MESSAGE DEFINITIONS ------------------------ */ @@ -132,6 +133,73 @@ typedef enum _mgmt_table { #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) +typedef struct SKlv { + int32_t keyLen; + int32_t valueLen; + void* key; + void* value; +} SKlv; + +static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKlv->keyLen); + tlen += taosEncodeFixedI32(buf, pKlv->valueLen); + tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen); + tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { + buf = taosDecodeFixedI32(buf, &pKlv->keyLen); + buf = taosDecodeFixedI32(buf, &pKlv->valueLen); + buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen); + buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); + return buf; +} +typedef struct SClientHbKey { + int32_t connId; + int32_t hbType; +} SClientHbKey; + +static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKey->connId); + tlen += taosEncodeFixedI32(buf, pKey->hbType); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { + buf = taosDecodeFixedI32(buf, &pKey->connId); + buf = taosDecodeFixedI32(buf, &pKey->hbType); + return buf; +} + +typedef struct SClientHbReq { + SClientHbKey connKey; + SHashObj* info; // hash +} SClientHbReq; + +typedef struct SClientHbBatchReq { + int64_t reqId; + SArray* reqs; // SArray +} SClientHbBatchReq; + +int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); +void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq); + +typedef struct SClientHbRsp { + SClientHbKey connKey; + int32_t status; + int32_t bodyLen; + void* body; +} SClientHbRsp; + +typedef struct SClientHbBatchRsp { + int64_t reqId; + int64_t rspId; + SArray* rsps; // SArray +} SClientHbBatchRsp; + typedef struct SBuildTableMetaInput { int32_t vgId; char* dbName; @@ -1123,7 +1191,7 @@ typedef struct { int32_t topicNum; int64_t consumerId; char* consumerGroup; - SArray* topicNames; // SArray + SArray* topicNames; // SArray } SCMSubscribeReq; static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { @@ -1132,7 +1200,7 @@ static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribe tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeString(buf, pReq->consumerGroup); - for(int i = 0; i < pReq->topicNum; i++) { + for (int i = 0; i < pReq->topicNum; i++) { tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i)); } return tlen; @@ -1143,7 +1211,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeString(buf, &pReq->consumerGroup); pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*)); - for(int i = 0; i < pReq->topicNum; i++) { + for (int i = 0; i < pReq->topicNum; i++) { char* name = NULL; buf = taosDecodeString(buf, &name); taosArrayPush(pReq->topicNames, &name); @@ -1158,14 +1226,14 @@ typedef struct SMqSubTopic { } SMqSubTopic; typedef struct { - int32_t topicNum; + int32_t topicNum; SMqSubTopic topics[]; } SCMSubscribeRsp; static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { int tlen = 0; tlen += taosEncodeFixedI32(buf, pRsp->topicNum); - for(int i = 0; i < pRsp->topicNum; i++) { + for (int i = 0; i < pRsp->topicNum; i++) { tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId); tlen += taosEncodeFixedI64(buf, pRsp->topics[i].topicId); tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet); @@ -1175,7 +1243,7 @@ static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribe static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) { buf = taosDecodeFixedI32(buf, &pRsp->topicNum); - for(int i = 0; i < pRsp->topicNum; i++) { + for (int i = 0; i < pRsp->topicNum; i++) { buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId); buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId); buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet); diff --git a/source/client/inc/clientHb.h b/source/client/inc/clientHb.h index 676b5c4c40276d009fbf3356a9261fddce16424a..73adb41308cfa71ae109bc9f399500e1ca395d6f 100644 --- a/source/client/inc/clientHb.h +++ b/source/client/inc/clientHb.h @@ -20,118 +20,21 @@ typedef enum { mq = 0, + // type can be added here + // HEARTBEAT_TYPE_MAX } EHbType; -typedef struct SKlv { - int32_t keyLen; - int32_t valueLen; - void* key; - void* value; -} SKlv; - -static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKlv->keyLen); - tlen += taosEncodeFixedI32(buf, pKlv->valueLen); - tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen); - tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { - buf = taosDecodeFixedI32(buf, &pKlv->keyLen); - buf = taosDecodeFixedI32(buf, &pKlv->valueLen); - buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen); - buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); - return buf; -} - -typedef struct SClientHbKey { - int32_t connId; - int32_t hbType; -} SClientHbKey; - -static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKey->connId); - tlen += taosEncodeFixedI32(buf, pKey->hbType); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { - buf = taosDecodeFixedI32(buf, &pKey->connId); - buf = taosDecodeFixedI32(buf, &pKey->hbType); - return buf; -} - -typedef struct SClientHbReq { - SClientHbKey hbKey; - SHashObj* info; // hash -} SClientHbReq; - -static FORCE_INLINE int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq) { - int tlen = 0; - tlen += taosEncodeSClientHbKey(buf, &pReq->hbKey); - - void* pIter = NULL; - void* data; - SKlv klv; - data = taosHashIterate(pReq->info, pIter); - while (data != NULL) { - taosHashGetKey(data, &klv.key, (size_t*)&klv.keyLen); - klv.valueLen = taosHashGetDataLen(data); - klv.value = data; - taosEncodeSKlv(buf, &klv); - - data = taosHashIterate(pReq->info, pIter); - } - return tlen; -} - -static FORCE_INLINE void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq) { - ASSERT(pReq->info != NULL); - buf = taosDecodeSClientHbKey(buf, &pReq->hbKey); - - //TODO: error handling - if(pReq->info == NULL) { - pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - } - SKlv klv; - buf = taosDecodeSKlv(buf, &klv); - taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen); - - return buf; -} - -typedef struct SClientHbBatchReq { - int64_t reqId; - SArray* reqs; // SArray -} SClientHbBatchReq; - -typedef struct SClientHbHandleResult { -} SClientHbHandleResult; - -typedef struct SClientHbRsp { - int32_t connId; - int32_t hbType; -} SClientHbRsp; - -typedef struct SClientHbBatchRsp { - int64_t reqId; - int64_t rspId; - SArray* rsps; // SArray -} SClientHbBatchRsp; - -typedef int32_t (*FHbRspHandle)(SClientHbReq* pReq); -typedef int32_t (*FGetConnInfo)(int32_t conn, void* self); +typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); +typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param); typedef struct SClientHbMgr { int8_t inited; int32_t reportInterval; // unit ms int32_t stats; SRWLatch lock; - SHashObj* info; //hash + SHashObj* activeInfo; // hash + SHashObj* getInfoFuncs; // hash FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; // input queue } SClientHbMgr; @@ -140,9 +43,11 @@ static SClientHbMgr clientHbMgr = {0}; int hbMgrInit(); void hbMgrCleanUp(); +int hbHandleRsp(void* hbMsg); + + +int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func); -int registerConn(int32_t connId, FGetConnInfo func, FHbRspHandle rspHandle); -int registerHbRspHandle(int32_t connId, int32_t hbType, FHbRspHandle rspHandle); +int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); -int HbAddConnInfo(int32_t connId, void* key, void* value, int32_t keyLen, int32_t valueLen); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index f1f7409f05bb3912359258107595b4f1c4a5350f..7daa1204d0022b216aac7636c18a2be39b9b2dfd 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -15,45 +15,62 @@ #include "clientHb.h" -static int32_t mqHbRspHandle(SClientHbReq* pReq) { +static int32_t mqHbRspHandle(SClientHbRsp* pReq) { return 0; } +uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { + return 0; +} + +static void hbMgrInitMqHbFunc() { + clientHbMgr.handle[mq] = mqHbRspHandle; +} + int hbMgrInit() { //init once - // - //init lock - // - //init handle funcs - clientHbMgr.handle[mq] = mqHbRspHandle; + int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); + if (old == 1) return 0; + + //init config + clientHbMgr.reportInterval = 1500; //init stat clientHbMgr.stats = 0; - //init config - clientHbMgr.reportInterval = 1500; + //init lock + taosInitRWLatch(&clientHbMgr.lock); + + //init handle funcs + hbMgrInitMqHbFunc(); //init hash info - // + clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + //init getInfoFunc + clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); return 0; } void hbMgrCleanUp() { + int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); + if (old == 0) return; + taosHashCleanup(clientHbMgr.activeInfo); + taosHashCleanup(clientHbMgr.getInfoFuncs); } -int registerConn(int32_t connId, FGetConnInfo func, FHbRspHandle rspHandle) { - return 0; -} - -int registerHbRspHandle(int32_t connId, int32_t hbType, FHbRspHandle rspHandle) { +int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) { + return 0; } -int HbAddConnInfo(int32_t connId, void* key, void* value, int32_t keyLen, int32_t valueLen) { +int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { //lock //find req by connection id + SClientHbReq* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); + ASSERT(data != NULL); + taosHashPut(data->info, key, keyLen, value, valueLen); //unlock return 0; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c1048f8482ae10cc1eb9cfc77327e3e143c6f683..5cbb42c1e6f42134c773ec146656e0f292350e5f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -27,6 +27,40 @@ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" +int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { + int tlen = 0; + tlen += taosEncodeSClientHbKey(buf, &pReq->connKey); + + void *pIter = NULL; + void *data; + SKlv klv; + data = taosHashIterate(pReq->info, pIter); + while (data != NULL) { + taosHashGetKey(data, &klv.key, (size_t *)&klv.keyLen); + klv.valueLen = taosHashGetDataLen(data); + klv.value = data; + taosEncodeSKlv(buf, &klv); + + data = taosHashIterate(pReq->info, pIter); + } + return tlen; +} + +void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) { + ASSERT(pReq->info != NULL); + buf = taosDecodeSClientHbKey(buf, &pReq->connKey); + + // TODO: error handling + if (pReq->info == NULL) { + pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + } + SKlv klv; + buf = taosDecodeSKlv(buf, &klv); + taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen); + + return buf; +} + int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tlen = 0; @@ -148,4 +182,4 @@ void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) { } return buf; -} \ No newline at end of file +}