diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7ff52b83d56b21357ac1c418aa2d29ab8bfdf5a5..632d99878ce726ca042ff46c09b74a8e6606c156 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -133,47 +133,18 @@ typedef enum _mgmt_table { #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) -typedef struct SKlv { +typedef struct SKv { int32_t keyLen; int32_t valueLen; void* key; void* value; -} SKlv; +} SKv; -static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKlv->keyLen); - tlen += taosEncodeFixedI32(buf, pKlv->valueLen); - tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen); - tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { - buf = taosDecodeFixedI32(buf, &pKlv->keyLen); - buf = taosDecodeFixedI32(buf, &pKlv->valueLen); - buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen); - buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); - return buf; -} typedef struct SClientHbKey { int32_t connId; int32_t hbType; } SClientHbKey; -static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKey->connId); - tlen += taosEncodeFixedI32(buf, pKey->hbType); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { - buf = taosDecodeFixedI32(buf, &pKey->connId); - buf = taosDecodeFixedI32(buf, &pKey->hbType); - return buf; -} - typedef struct SClientHbReq { SClientHbKey connKey; SHashObj* info; // hash @@ -184,9 +155,6 @@ typedef struct SClientHbBatchReq { SArray* reqs; // SArray } SClientHbBatchReq; -int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); -void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq); - typedef struct SClientHbRsp { SClientHbKey connKey; int32_t status; @@ -200,6 +168,58 @@ typedef struct SClientHbBatchRsp { SArray* rsps; // SArray } SClientHbBatchRsp; +static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { + return taosIntHash_64(key, keyLen); +} + +int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); +void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq); + +static FORCE_INLINE void tFreeClientHbReq(void *pReq) { + SClientHbReq* req = (SClientHbReq*)pReq; + taosHashCleanup(req->info); + free(pReq); +} + +int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); +void* tDeserializeClientHbBatchReq(void* buf, SClientHbBatchReq* pReq); + +static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) { + SClientHbBatchReq *req = (SClientHbBatchReq*)pReq; + taosArrayDestroyEx(req->reqs, tFreeClientHbReq); + free(pReq); +} + +static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKv->keyLen); + tlen += taosEncodeFixedI32(buf, pKv->valueLen); + tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen); + tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) { + buf = taosDecodeFixedI32(buf, &pKv->keyLen); + buf = taosDecodeFixedI32(buf, &pKv->valueLen); + buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen); + buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen); + return buf; +} + +static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKey->connId); + tlen += taosEncodeFixedI32(buf, pKey->hbType); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { + buf = taosDecodeFixedI32(buf, &pKey->connId); + buf = taosDecodeFixedI32(buf, &pKey->hbType); + return buf; +} + typedef struct SBuildTableMetaInput { int32_t vgId; char* dbName; diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 9a7ffa8c187a8233f8cc76c7738896024ea793b0..c5da68f0a63eba0314a887767a917b02b4a3f390 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -23,6 +23,7 @@ extern "C" { #include "query.h" #include "tmsg.h" #include "tarray.h" +#include "trpc.h" #define QUERY_TYPE_MERGE 1 #define QUERY_TYPE_PARTIAL 2 diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index da88366f11cb9ce4598605a940182da9796a19ab..a3ee59e2e008ccee7e1ab26dda8a8e51eca4e45d 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -103,55 +103,6 @@ typedef struct STableMetaOutput { STableMeta *tbMeta; } STableMetaOutput; -typedef struct SDataBuf { - void *pData; - uint32_t len; -} SDataBuf; - -typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); -typedef int32_t (*__async_exec_fn_t)(void* param); - -typedef struct SMsgSendInfo { - __async_send_cb_fn_t fp; //async callback function - void *param; - uint64_t requestId; - uint64_t requestObjRefId; - int32_t msgType; - SDataBuf msgInfo; -} SMsgSendInfo; - -typedef struct SQueryNodeAddr{ - int32_t nodeId; //vgId or qnodeId - int8_t inUse; - int8_t numOfEps; - SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; -} SQueryNodeAddr; - -bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); - -int32_t initTaskQueue(); -int32_t cleanupTaskQueue(); - -/** - * - * @param execFn The asynchronously execution function - * @param execParam The parameters of the execFn - * @param code The response code during execution the execFn - * @return - */ -int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); - -/** - * Asynchronously send message to server, after the response received, the callback will be incured. - * - * @param pTransporter - * @param epSet - * @param pTransporterId - * @param pInfo - * @return - */ -int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); - const SSchema* tGetTbnameColumnSchema(); void initQueryModuleMsgHandle(); diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index b88d45fbd661fd4c1fce9c71abcabacb9b007404..e28098dfbf08124ff58f81ee6c30843e739bca69 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -84,6 +84,55 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); +typedef struct SDataBuf { + void *pData; + uint32_t len; +} SDataBuf; + +typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); +typedef int32_t (*__async_exec_fn_t)(void* param); + +typedef struct SMsgSendInfo { + __async_send_cb_fn_t fp; //async callback function + void *param; + uint64_t requestId; + uint64_t requestObjRefId; + int32_t msgType; + SDataBuf msgInfo; +} SMsgSendInfo; + +typedef struct SQueryNodeAddr{ + int32_t nodeId; //vgId or qnodeId + int8_t inUse; + int8_t numOfEps; + SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; +} SQueryNodeAddr; + +bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); + +int32_t initTaskQueue(); +int32_t cleanupTaskQueue(); + +/** + * + * @param execFn The asynchronously execution function + * @param execParam The parameters of the execFn + * @param code The response code during execution the execFn + * @return + */ +int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); + +/** + * Asynchronously send message to server, after the response received, the callback will be incured. + * + * @param pTransporter + * @param epSet + * @param pTransporterId + * @param pInfo + * @return + */ +int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); + #ifdef __cplusplus } #endif diff --git a/include/util/thash.h b/include/util/thash.h index 06a4ae1664d01e17eac3b69436a68bae67bc0d4c..3a614a73a6283a5e170163c03b7bad4dca6f2bea 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -214,6 +214,24 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p); int32_t taosHashGetKey(void *data, void** key, size_t* keyLen); +/** + * Get the corresponding key information for a given data in hash table, using memcpy + * @param data + * @param dst + * @return + */ +static FORCE_INLINE int32_t taosHashCopyKey(void *data, void* dst) { + if (NULL == data || NULL == dst) { + return -1; + } + + SHashNode * node = GET_HASH_PNODE(data); + void* key = GET_HASH_NODE_KEY(node); + memcpy(dst, key, node->keyLen); + + return 0; +} + /** * Get the corresponding data length for a given data in hash table * @param data diff --git a/source/client/inc/clientHb.h b/source/client/inc/clientHb.h index 73adb41308cfa71ae109bc9f399500e1ca395d6f..7bc4311b29dc5c89203b3b32054ec7186dfee073 100644 --- a/source/client/inc/clientHb.h +++ b/source/client/inc/clientHb.h @@ -18,36 +18,61 @@ #include "thash.h" #include "tmsg.h" +#define HEARTBEAT_INTERVAL 1500 // ms + typedef enum { - mq = 0, - // type can be added here + HEARTBEAT_TYPE_MQ = 0, + // types can be added here // HEARTBEAT_TYPE_MAX } EHbType; typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); -typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param); + +typedef struct SAppHbMgr { + // statistics + int32_t reportCnt; + int32_t connKeyCnt; + int64_t reportBytes; // not implemented + int64_t startTime; + // ctl + SRWLatch lock; // lock is used in serialization + // connection + void* transporter; + SEpSet epSet; + // info + SHashObj* activeInfo; // hash + SHashObj* getInfoFuncs; // hash +} SAppHbMgr; typedef struct SClientHbMgr { - int8_t inited; - int32_t reportInterval; // unit ms - int32_t stats; - SRWLatch lock; - SHashObj* activeInfo; // hash - SHashObj* getInfoFuncs; // hash - FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; - // input queue + int8_t inited; + // ctl + int8_t threadStop; + pthread_t thread; + pthread_mutex_t lock; // used when app init and cleanup + SArray* appHbMgrs; // SArray one for each cluster + FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; } SClientHbMgr; -static SClientHbMgr clientHbMgr = {0}; +// TODO: embed param into function +// return type: SArray +typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param); +// global, called by mgmt int hbMgrInit(); void hbMgrCleanUp(); -int hbHandleRsp(void* hbMsg); - +int hbHandleRsp(SClientHbBatchRsp* hbRsp); -int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func); +// cluster level +SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet); +void appHbMgrCleanup(SAppHbMgr* pAppHbMgr); +// conn level +int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func); +void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); -int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); +int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); +// mq +void hbMgrInitMqHbRspHandle(); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 7daa1204d0022b216aac7636c18a2be39b9b2dfd..9bbd62c1d9bfcdf095c0fd15140e2f644cb756b8 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -14,64 +14,214 @@ */ #include "clientHb.h" +#include "trpc.h" -static int32_t mqHbRspHandle(SClientHbRsp* pReq) { +static SClientHbMgr clientHbMgr = {0}; + +static int32_t hbCreateThread(); +static void hbStopThread(); + +static int32_t hbMqHbRspHandle(SClientHbRsp* pReq) { return 0; } -uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { +void hbMgrInitMqHbRspHandle() { + clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle; +} + +static FORCE_INLINE void hbMgrInitHandle() { + // init all handle + hbMgrInitMqHbRspHandle(); +} + +SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { + SClientHbBatchReq* pReq = malloc(sizeof(SClientHbBatchReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return NULL; + } + int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); + pReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); + + void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); + while (pIter != NULL) { + taosArrayPush(pReq->reqs, pIter); + SClientHbReq* pOneReq = pIter; + taosHashClear(pOneReq->info); + + pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); + } + + pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL); + while (pIter != NULL) { + FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter; + SClientHbKey connKey; + taosHashCopyKey(pIter, &connKey); + getConnInfoFp(connKey, NULL); + + pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); + } + + return pReq; +} + +static void* hbThreadFunc(void* param) { + setThreadName("hb"); + while (1) { + int8_t threadStop = atomic_load_8(&clientHbMgr.threadStop); + if(threadStop) { + break; + } + + int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); + for(int i = 0; i < sz; i++) { + SAppHbMgr* pAppHbMgr = taosArrayGet(clientHbMgr.appHbMgrs, i); + SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr); + void* reqStr = NULL; + int tlen = tSerializeSClientHbBatchReq(&reqStr, pReq); + SMsgSendInfo info; + /*info.fp = hbHandleRsp;*/ + + int64_t transporterId = 0; + asyncSendMsgToServer(pAppHbMgr->transporter, &pAppHbMgr->epSet, &transporterId, &info); + tFreeClientHbBatchReq(pReq); + + atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); + taosMsleep(HEARTBEAT_INTERVAL); + } + } + return NULL; +} + +static int32_t hbCreateThread() { + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + pthread_attr_destroy(&thAttr); return 0; } -static void hbMgrInitMqHbFunc() { - clientHbMgr.handle[mq] = mqHbRspHandle; +static void hbStopThread() { + atomic_store_8(&clientHbMgr.threadStop, 1); +} + +SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet) { + SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr)); + if (pAppHbMgr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + // init stat + pAppHbMgr->startTime = taosGetTimestampMs(); + + // init connection info + pAppHbMgr->transporter = transporter; + pAppHbMgr->epSet = epSet; + + // init hash info + pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq; + // init getInfoFunc + pAppHbMgr->getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + + taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr); + return pAppHbMgr; +} + +void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) { + pthread_mutex_lock(&clientHbMgr.lock); + + int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); + for (int i = 0; i < sz; i++) { + SAppHbMgr* pTarget = taosArrayGet(clientHbMgr.appHbMgrs, i); + if (pAppHbMgr == pTarget) { + taosHashCleanup(pTarget->activeInfo); + taosHashCleanup(pTarget->getInfoFuncs); + } + } + + pthread_mutex_unlock(&clientHbMgr.lock); } int hbMgrInit() { - //init once + // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; - //init config - clientHbMgr.reportInterval = 1500; + clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void*)); + pthread_mutex_init(&clientHbMgr.lock, NULL); - //init stat - clientHbMgr.stats = 0; - - //init lock - taosInitRWLatch(&clientHbMgr.lock); + // init handle funcs + hbMgrInitHandle(); - //init handle funcs - hbMgrInitMqHbFunc(); + // init backgroud thread + hbCreateThread(); - //init hash info - clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - //init getInfoFunc - clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); return 0; } void hbMgrCleanUp() { + // destroy all appHbMgr int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); if (old == 0) return; - taosHashCleanup(clientHbMgr.activeInfo); - taosHashCleanup(clientHbMgr.getInfoFuncs); + taosArrayDestroy(clientHbMgr.appHbMgrs); + } -int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) { - +int hbHandleRsp(SClientHbBatchRsp* hbRsp) { + int64_t reqId = hbRsp->reqId; + int64_t rspId = hbRsp->rspId; + + SArray* rsps = hbRsp->rsps; + int32_t sz = taosArrayGetSize(rsps); + for (int i = 0; i < sz; i++) { + SClientHbRsp* pRsp = taosArrayGet(rsps, i); + if (pRsp->connKey.hbType < HEARTBEAT_TYPE_MAX) { + clientHbMgr.handle[pRsp->connKey.hbType](pRsp); + } else { + // discard rsp + } + } return 0; } -int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { - //lock +int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func) { + // init hash in activeinfo + void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + if (data != NULL) { + return 0; + } + SClientHbReq hbReq; + hbReq.connKey = connKey; + hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); + // init hash + if (func != NULL) { + taosHashPut(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo)); + } + + atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1); + return 0; +} + +void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { + taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + taosHashRemove(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey)); + atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); +} + +int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { + // find req by connection id + SClientHbReq* pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + ASSERT(pReq != NULL); - //find req by connection id - SClientHbReq* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); - ASSERT(data != NULL); - taosHashPut(data->info, key, keyLen, value, valueLen); + taosHashPut(pReq->info, key, keyLen, value, valueLen); - //unlock return 0; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c99ca151fcc4c0670fd84a6be6e93f6ee5548113..53f59c7d57f4db395bdef1d2ac1ca5e0ef698b25 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -89,17 +89,17 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { int tlen = 0; tlen += taosEncodeSClientHbKey(buf, &pReq->connKey); - void *pIter = NULL; - void *data; - SKlv klv; - data = taosHashIterate(pReq->info, pIter); - while (data != NULL) { - taosHashGetKey(data, &klv.key, (size_t *)&klv.keyLen); - klv.valueLen = taosHashGetDataLen(data); - klv.value = data; - taosEncodeSKlv(buf, &klv); - - data = taosHashIterate(pReq->info, pIter); + int kvNum = taosHashGetSize(pReq->info); + tlen += taosEncodeFixedI32(buf, kvNum); + SKv kv; + void* pIter = taosHashIterate(pReq->info, pIter); + while (pIter != NULL) { + taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen); + kv.valueLen = taosHashGetDataLen(pIter); + kv.value = pIter; + tlen += taosEncodeSKv(buf, &kv); + + pIter = taosHashIterate(pReq->info, pIter); } return tlen; } @@ -109,16 +109,27 @@ void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) { buf = taosDecodeSClientHbKey(buf, &pReq->connKey); // TODO: error handling - if (pReq->info == NULL) { - pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + int kvNum; + taosDecodeFixedI32(buf, &kvNum); + pReq->info = taosHashInit(kvNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + for(int i = 0; i < kvNum; i++) { + SKv kv; + buf = taosDecodeSKv(buf, &kv); + taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen); } - SKlv klv; - buf = taosDecodeSKlv(buf, &klv); - taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen); return buf; } +int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq) { + int tlen = 0; + return tlen; +} + +void* tDeserializeClientHbBatchReq(void* buf, SClientHbBatchReq* pReq) { + return buf; +} + int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tlen = 0; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index f030398144b25ee7306f978db0e36e045ca9f4b0..591367c519e16f2e889bc269c70de0c77a4720d2 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -91,14 +91,15 @@ static int32_t mndRestoreWal(SMnode *pMnode) { if (sdbWriteFile(pSdb) != 0) { goto WAL_RESTORE_OVER; } - } - if (walBeginSnapshot(pWal, sdbVer) < 0) { - goto WAL_RESTORE_OVER; - } + if (walBeginSnapshot(pWal, sdbVer) < 0) { + goto WAL_RESTORE_OVER; + } + + if (walEndSnapshot(pWal) < 0) { + goto WAL_RESTORE_OVER; + } - if (walEndSnapshot(pWal) < 0) { - goto WAL_RESTORE_OVER; } code = 0; @@ -181,4 +182,4 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { bool mndIsMaster(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; return pMgmt->state == TAOS_SYNC_STATE_LEADER; -} \ No newline at end of file +} diff --git a/source/libs/planner/CMakeLists.txt b/source/libs/planner/CMakeLists.txt index 6234dbe0acd9a960ed390a719ab9c6e93716441b..8d8c148fdece005cbc25f3e97986f04f1663a5ce 100644 --- a/source/libs/planner/CMakeLists.txt +++ b/source/libs/planner/CMakeLists.txt @@ -8,7 +8,8 @@ target_include_directories( target_link_libraries( planner - PRIVATE os util catalog cjson parser transport function qcom + PRIVATE os util catalog cjson parser function qcom + PUBLIC transport ) if(${BUILD_TEST}) diff --git a/source/libs/qcom/test/CMakeLists.txt b/source/libs/qcom/test/CMakeLists.txt index 7adec3752aeadefb791b203b66d5c40bb3fcae5b..e3a0e11a3231b4e3cad24c07c58294bd6069565a 100644 --- a/source/libs/qcom/test/CMakeLists.txt +++ b/source/libs/qcom/test/CMakeLists.txt @@ -15,5 +15,5 @@ TARGET_INCLUDE_DIRECTORIES( TARGET_LINK_LIBRARIES( queryUtilTest - PUBLIC os util gtest qcom common + PUBLIC os util gtest qcom common transport ) diff --git a/source/libs/qcom/test/queryTest.cpp b/source/libs/qcom/test/queryTest.cpp index ddf89c62727c5a0d8f2a4fce9d66d48d7fca7d0b..8fc6b7e5297057c6d8b41c434eb4a1b3f3f86cfc 100644 --- a/source/libs/qcom/test/queryTest.cpp +++ b/source/libs/qcom/test/queryTest.cpp @@ -17,6 +17,7 @@ #include #include "tmsg.h" #include "query.h" +#include "trpc.h" #pragma GCC diagnostic ignored "-Wwrite-strings" #pragma GCC diagnostic ignored "-Wunused-function" @@ -80,4 +81,4 @@ TEST(testCase, error_in_async_test) { taosAsyncExec(testPrintError, p, &code); usleep(1000); printf("Error code:%d after asynchronously exec function\n", code); -} \ No newline at end of file +} diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 871c95193fdef3cf3d7da1820e8a8b3633948efb..7631593dd85aff973e7bffb3bd2085bf40be3d70 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -131,6 +131,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes); // seek section int walChangeFile(SWal* pWal, int64_t ver); +int walChangeFileToLast(SWal* pWal); // seek section end int64_t walGetSeq(); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 6164ec2baff8df22bbaf7a09ea17bd513b4ef66b..270a26bf80f2ef34ed5761500de68939a9a72ac7 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -184,6 +184,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { } taosArraySetSize(pArray, sz); pWal->fileInfoSet = pArray; + pWal->writeCur = sz - 1; cJSON_Delete(pRoot); return 0; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 189881c86d6cbfa8ddae98d65e2a24bd78f1f83a..d12acb52c6aa9f9f56f89b5244d04cc8341067f9 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -232,7 +232,8 @@ static int32_t walCreateThread() { if (pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) { wError("failed to create wal thread since %s", strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } pthread_attr_destroy(&thAttr); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 39bf06ceddb6acdf641f001e6cb2442248fc084a..975f232e3d3fd3914a57a562ec53fa2d2f6f5f95 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -244,6 +244,7 @@ int walRoll(SWal *pWal) { pWal->writeIdxTfd = idxTfd; pWal->writeLogTfd = logTfd; pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; + ASSERT(pWal->writeCur >= 0); pWal->lastRollSeq = walGetSeq(); return 0; @@ -253,6 +254,7 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { SWalIdxEntry entry = {.ver = ver, .offset = offset}; int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(SWalIdxEntry)); if (size != sizeof(SWalIdxEntry)) { + terrno = TAOS_SYSTEM_ERROR(errno); // TODO truncate return -1; } @@ -286,7 +288,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i } /*if (!tfValid(pWal->writeLogTfd)) return -1;*/ + ASSERT(pWal->writeCur >= 0); + pthread_mutex_lock(&pWal->mutex); + if (pWal->writeIdxTfd == -1 || pWal->writeLogTfd == -1) { + walChangeFileToLast(pWal); + } + pWal->writeHead.head.version = index; int64_t offset = walGetCurFileOffset(pWal); @@ -308,6 +316,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); } + code = walWriteIndex(pWal, index, offset); if (code != 0) { // TODO diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 2f4e43b13f81aad28d54503802c826775841b823..2b013bfdd08bf12226db447477d6f3e69fce3f2f 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -812,7 +812,9 @@ FORCE_INLINE int32_t taosHashGetKey(void *data, void** key, size_t* keyLen) { SHashNode * node = GET_HASH_PNODE(data); *key = GET_HASH_NODE_KEY(node); - *keyLen = node->keyLen; + if (keyLen) { + *keyLen = node->keyLen; + } return 0; }