未验证 提交 3cce6bd8 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9705 from taosdata/feature/tq

fix wal crash
...@@ -133,47 +133,18 @@ typedef enum _mgmt_table { ...@@ -133,47 +133,18 @@ typedef enum _mgmt_table {
#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
typedef struct SKlv { typedef struct SKv {
int32_t keyLen; int32_t keyLen;
int32_t valueLen; int32_t valueLen;
void* key; void* key;
void* value; void* value;
} SKlv; } SKv;
static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKlv->keyLen);
tlen += taosEncodeFixedI32(buf, pKlv->valueLen);
tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen);
tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen);
return tlen;
}
static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) {
buf = taosDecodeFixedI32(buf, &pKlv->keyLen);
buf = taosDecodeFixedI32(buf, &pKlv->valueLen);
buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen);
buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen);
return buf;
}
typedef struct SClientHbKey { typedef struct SClientHbKey {
int32_t connId; int32_t connId;
int32_t hbType; int32_t hbType;
} SClientHbKey; } SClientHbKey;
static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKey->connId);
tlen += taosEncodeFixedI32(buf, pKey->hbType);
return tlen;
}
static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) {
buf = taosDecodeFixedI32(buf, &pKey->connId);
buf = taosDecodeFixedI32(buf, &pKey->hbType);
return buf;
}
typedef struct SClientHbReq { typedef struct SClientHbReq {
SClientHbKey connKey; SClientHbKey connKey;
SHashObj* info; // hash<Slv.key, Sklv> SHashObj* info; // hash<Slv.key, Sklv>
...@@ -184,9 +155,6 @@ typedef struct SClientHbBatchReq { ...@@ -184,9 +155,6 @@ typedef struct SClientHbBatchReq {
SArray* reqs; // SArray<SClientHbReq> SArray* reqs; // SArray<SClientHbReq>
} SClientHbBatchReq; } SClientHbBatchReq;
int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq);
void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq);
typedef struct SClientHbRsp { typedef struct SClientHbRsp {
SClientHbKey connKey; SClientHbKey connKey;
int32_t status; int32_t status;
...@@ -200,6 +168,58 @@ typedef struct SClientHbBatchRsp { ...@@ -200,6 +168,58 @@ typedef struct SClientHbBatchRsp {
SArray* rsps; // SArray<SClientHbRsp> SArray* rsps; // SArray<SClientHbRsp>
} SClientHbBatchRsp; } SClientHbBatchRsp;
static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) {
return taosIntHash_64(key, keyLen);
}
int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq);
void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq);
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
SClientHbReq* req = (SClientHbReq*)pReq;
taosHashCleanup(req->info);
free(pReq);
}
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq);
void* tDeserializeClientHbBatchReq(void* buf, SClientHbBatchReq* pReq);
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) {
SClientHbBatchReq *req = (SClientHbBatchReq*)pReq;
taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
free(pReq);
}
static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKv->keyLen);
tlen += taosEncodeFixedI32(buf, pKv->valueLen);
tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen);
tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen);
return tlen;
}
static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) {
buf = taosDecodeFixedI32(buf, &pKv->keyLen);
buf = taosDecodeFixedI32(buf, &pKv->valueLen);
buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen);
buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen);
return buf;
}
static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pKey->connId);
tlen += taosEncodeFixedI32(buf, pKey->hbType);
return tlen;
}
static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) {
buf = taosDecodeFixedI32(buf, &pKey->connId);
buf = taosDecodeFixedI32(buf, &pKey->hbType);
return buf;
}
typedef struct SBuildTableMetaInput { typedef struct SBuildTableMetaInput {
int32_t vgId; int32_t vgId;
char* dbName; char* dbName;
......
...@@ -23,6 +23,7 @@ extern "C" { ...@@ -23,6 +23,7 @@ extern "C" {
#include "query.h" #include "query.h"
#include "tmsg.h" #include "tmsg.h"
#include "tarray.h" #include "tarray.h"
#include "trpc.h"
#define QUERY_TYPE_MERGE 1 #define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2 #define QUERY_TYPE_PARTIAL 2
......
...@@ -103,55 +103,6 @@ typedef struct STableMetaOutput { ...@@ -103,55 +103,6 @@ typedef struct STableMetaOutput {
STableMeta *tbMeta; STableMeta *tbMeta;
} STableMetaOutput; } STableMetaOutput;
typedef struct SDataBuf {
void *pData;
uint32_t len;
} SDataBuf;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; //async callback function
void *param;
uint64_t requestId;
uint64_t requestObjRefId;
int32_t msgType;
SDataBuf msgInfo;
} SMsgSendInfo;
typedef struct SQueryNodeAddr{
int32_t nodeId; //vgId or qnodeId
int8_t inUse;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SQueryNodeAddr;
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
/**
*
* @param execFn The asynchronously execution function
* @param execParam The parameters of the execFn
* @param code The response code during execution the execFn
* @return
*/
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
/**
* Asynchronously send message to server, after the response received, the callback will be incured.
*
* @param pTransporter
* @param epSet
* @param pTransporterId
* @param pInfo
* @return
*/
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
const SSchema* tGetTbnameColumnSchema(); const SSchema* tGetTbnameColumnSchema();
void initQueryModuleMsgHandle(); void initQueryModuleMsgHandle();
......
...@@ -84,6 +84,55 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) ...@@ -84,6 +84,55 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp)
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid); void rpcCancelRequest(int64_t rid);
typedef struct SDataBuf {
void *pData;
uint32_t len;
} SDataBuf;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; //async callback function
void *param;
uint64_t requestId;
uint64_t requestObjRefId;
int32_t msgType;
SDataBuf msgInfo;
} SMsgSendInfo;
typedef struct SQueryNodeAddr{
int32_t nodeId; //vgId or qnodeId
int8_t inUse;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SQueryNodeAddr;
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
/**
*
* @param execFn The asynchronously execution function
* @param execParam The parameters of the execFn
* @param code The response code during execution the execFn
* @return
*/
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
/**
* Asynchronously send message to server, after the response received, the callback will be incured.
*
* @param pTransporter
* @param epSet
* @param pTransporterId
* @param pInfo
* @return
*/
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -214,6 +214,24 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p); ...@@ -214,6 +214,24 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p);
int32_t taosHashGetKey(void *data, void** key, size_t* keyLen); int32_t taosHashGetKey(void *data, void** key, size_t* keyLen);
/**
* Get the corresponding key information for a given data in hash table, using memcpy
* @param data
* @param dst
* @return
*/
static FORCE_INLINE int32_t taosHashCopyKey(void *data, void* dst) {
if (NULL == data || NULL == dst) {
return -1;
}
SHashNode * node = GET_HASH_PNODE(data);
void* key = GET_HASH_NODE_KEY(node);
memcpy(dst, key, node->keyLen);
return 0;
}
/** /**
* Get the corresponding data length for a given data in hash table * Get the corresponding data length for a given data in hash table
* @param data * @param data
......
...@@ -18,36 +18,61 @@ ...@@ -18,36 +18,61 @@
#include "thash.h" #include "thash.h"
#include "tmsg.h" #include "tmsg.h"
#define HEARTBEAT_INTERVAL 1500 // ms
typedef enum { typedef enum {
mq = 0, HEARTBEAT_TYPE_MQ = 0,
// type can be added here // types can be added here
// //
HEARTBEAT_TYPE_MAX HEARTBEAT_TYPE_MAX
} EHbType; } EHbType;
typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq);
typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param);
typedef struct SAppHbMgr {
// statistics
int32_t reportCnt;
int32_t connKeyCnt;
int64_t reportBytes; // not implemented
int64_t startTime;
// ctl
SRWLatch lock; // lock is used in serialization
// connection
void* transporter;
SEpSet epSet;
// info
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo>
} SAppHbMgr;
typedef struct SClientHbMgr { typedef struct SClientHbMgr {
int8_t inited; int8_t inited;
int32_t reportInterval; // unit ms // ctl
int32_t stats; int8_t threadStop;
SRWLatch lock; pthread_t thread;
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq> pthread_mutex_t lock; // used when app init and cleanup
SHashObj* getInfoFuncs; // hash<SClientHbKey, FGetConnInfo> SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; FHbRspHandle handle[HEARTBEAT_TYPE_MAX];
// input queue
} SClientHbMgr; } SClientHbMgr;
static SClientHbMgr clientHbMgr = {0}; // TODO: embed param into function
// return type: SArray<Skv>
typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param);
// global, called by mgmt
int hbMgrInit(); int hbMgrInit();
void hbMgrCleanUp(); void hbMgrCleanUp();
int hbHandleRsp(void* hbMsg); int hbHandleRsp(SClientHbBatchRsp* hbRsp);
int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func); // cluster level
SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet);
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
// mq
void hbMgrInitMqHbRspHandle();
...@@ -14,64 +14,214 @@ ...@@ -14,64 +14,214 @@
*/ */
#include "clientHb.h" #include "clientHb.h"
#include "trpc.h"
static int32_t mqHbRspHandle(SClientHbRsp* pReq) { static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread();
static void hbStopThread();
static int32_t hbMqHbRspHandle(SClientHbRsp* pReq) {
return 0; return 0;
} }
uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { void hbMgrInitMqHbRspHandle() {
clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle;
}
static FORCE_INLINE void hbMgrInitHandle() {
// init all handle
hbMgrInitMqHbRspHandle();
}
SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
SClientHbBatchReq* pReq = malloc(sizeof(SClientHbBatchReq));
if (pReq == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
pReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
while (pIter != NULL) {
taosArrayPush(pReq->reqs, pIter);
SClientHbReq* pOneReq = pIter;
taosHashClear(pOneReq->info);
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
}
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL);
while (pIter != NULL) {
FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter;
SClientHbKey connKey;
taosHashCopyKey(pIter, &connKey);
getConnInfoFp(connKey, NULL);
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
}
return pReq;
}
static void* hbThreadFunc(void* param) {
setThreadName("hb");
while (1) {
int8_t threadStop = atomic_load_8(&clientHbMgr.threadStop);
if(threadStop) {
break;
}
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for(int i = 0; i < sz; i++) {
SAppHbMgr* pAppHbMgr = taosArrayGet(clientHbMgr.appHbMgrs, i);
SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr);
void* reqStr = NULL;
int tlen = tSerializeSClientHbBatchReq(&reqStr, pReq);
SMsgSendInfo info;
/*info.fp = hbHandleRsp;*/
int64_t transporterId = 0;
asyncSendMsgToServer(pAppHbMgr->transporter, &pAppHbMgr->epSet, &transporterId, &info);
tFreeClientHbBatchReq(pReq);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
taosMsleep(HEARTBEAT_INTERVAL);
}
}
return NULL;
}
static int32_t hbCreateThread() {
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pthread_attr_destroy(&thAttr);
return 0; return 0;
} }
static void hbMgrInitMqHbFunc() { static void hbStopThread() {
clientHbMgr.handle[mq] = mqHbRspHandle; atomic_store_8(&clientHbMgr.threadStop, 1);
}
SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet) {
SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr));
if (pAppHbMgr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
// init stat
pAppHbMgr->startTime = taosGetTimestampMs();
// init connection info
pAppHbMgr->transporter = transporter;
pAppHbMgr->epSet = epSet;
// init hash info
pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq;
// init getInfoFunc
pAppHbMgr->getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
return pAppHbMgr;
}
void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) {
pthread_mutex_lock(&clientHbMgr.lock);
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for (int i = 0; i < sz; i++) {
SAppHbMgr* pTarget = taosArrayGet(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == pTarget) {
taosHashCleanup(pTarget->activeInfo);
taosHashCleanup(pTarget->getInfoFuncs);
}
}
pthread_mutex_unlock(&clientHbMgr.lock);
} }
int hbMgrInit() { int hbMgrInit() {
//init once // init once
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
if (old == 1) return 0; if (old == 1) return 0;
//init config clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void*));
clientHbMgr.reportInterval = 1500; pthread_mutex_init(&clientHbMgr.lock, NULL);
//init stat // init handle funcs
clientHbMgr.stats = 0; hbMgrInitHandle();
//init lock
taosInitRWLatch(&clientHbMgr.lock);
//init handle funcs // init backgroud thread
hbMgrInitMqHbFunc(); hbCreateThread();
//init hash info
clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
//init getInfoFunc
clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
return 0; return 0;
} }
void hbMgrCleanUp() { void hbMgrCleanUp() {
// destroy all appHbMgr
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0);
if (old == 0) return; if (old == 0) return;
taosHashCleanup(clientHbMgr.activeInfo); taosArrayDestroy(clientHbMgr.appHbMgrs);
taosHashCleanup(clientHbMgr.getInfoFuncs);
} }
int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) { int hbHandleRsp(SClientHbBatchRsp* hbRsp) {
int64_t reqId = hbRsp->reqId;
int64_t rspId = hbRsp->rspId;
SArray* rsps = hbRsp->rsps;
int32_t sz = taosArrayGetSize(rsps);
for (int i = 0; i < sz; i++) {
SClientHbRsp* pRsp = taosArrayGet(rsps, i);
if (pRsp->connKey.hbType < HEARTBEAT_TYPE_MAX) {
clientHbMgr.handle[pRsp->connKey.hbType](pRsp);
} else {
// discard rsp
}
}
return 0; return 0;
} }
int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func) {
//lock // init hash in activeinfo
void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
if (data != NULL) {
return 0;
}
SClientHbReq hbReq;
hbReq.connKey = connKey;
hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
// init hash
if (func != NULL) {
taosHashPut(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo));
}
atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
return 0;
}
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) {
taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
taosHashRemove(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey));
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
}
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) {
// find req by connection id
SClientHbReq* pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
ASSERT(pReq != NULL);
//find req by connection id taosHashPut(pReq->info, key, keyLen, value, valueLen);
SClientHbReq* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey));
ASSERT(data != NULL);
taosHashPut(data->info, key, keyLen, value, valueLen);
//unlock
return 0; return 0;
} }
...@@ -89,17 +89,17 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { ...@@ -89,17 +89,17 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeSClientHbKey(buf, &pReq->connKey); tlen += taosEncodeSClientHbKey(buf, &pReq->connKey);
void *pIter = NULL; int kvNum = taosHashGetSize(pReq->info);
void *data; tlen += taosEncodeFixedI32(buf, kvNum);
SKlv klv; SKv kv;
data = taosHashIterate(pReq->info, pIter); void* pIter = taosHashIterate(pReq->info, pIter);
while (data != NULL) { while (pIter != NULL) {
taosHashGetKey(data, &klv.key, (size_t *)&klv.keyLen); taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen);
klv.valueLen = taosHashGetDataLen(data); kv.valueLen = taosHashGetDataLen(pIter);
klv.value = data; kv.value = pIter;
taosEncodeSKlv(buf, &klv); tlen += taosEncodeSKv(buf, &kv);
data = taosHashIterate(pReq->info, pIter); pIter = taosHashIterate(pReq->info, pIter);
} }
return tlen; return tlen;
} }
...@@ -109,16 +109,27 @@ void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) { ...@@ -109,16 +109,27 @@ void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) {
buf = taosDecodeSClientHbKey(buf, &pReq->connKey); buf = taosDecodeSClientHbKey(buf, &pReq->connKey);
// TODO: error handling // TODO: error handling
if (pReq->info == NULL) { int kvNum;
pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); taosDecodeFixedI32(buf, &kvNum);
pReq->info = taosHashInit(kvNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
for(int i = 0; i < kvNum; i++) {
SKv kv;
buf = taosDecodeSKv(buf, &kv);
taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen);
} }
SKlv klv;
buf = taosDecodeSKlv(buf, &klv);
taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen);
return buf; return buf;
} }
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq) {
int tlen = 0;
return tlen;
}
void* tDeserializeClientHbBatchReq(void* buf, SClientHbBatchReq* pReq) {
return buf;
}
int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
int tlen = 0; int tlen = 0;
......
...@@ -91,14 +91,15 @@ static int32_t mndRestoreWal(SMnode *pMnode) { ...@@ -91,14 +91,15 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
if (sdbWriteFile(pSdb) != 0) { if (sdbWriteFile(pSdb) != 0) {
goto WAL_RESTORE_OVER; goto WAL_RESTORE_OVER;
} }
}
if (walBeginSnapshot(pWal, sdbVer) < 0) { if (walBeginSnapshot(pWal, sdbVer) < 0) {
goto WAL_RESTORE_OVER; goto WAL_RESTORE_OVER;
} }
if (walEndSnapshot(pWal) < 0) {
goto WAL_RESTORE_OVER;
}
if (walEndSnapshot(pWal) < 0) {
goto WAL_RESTORE_OVER;
} }
code = 0; code = 0;
...@@ -181,4 +182,4 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { ...@@ -181,4 +182,4 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
bool mndIsMaster(SMnode *pMnode) { bool mndIsMaster(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
return pMgmt->state == TAOS_SYNC_STATE_LEADER; return pMgmt->state == TAOS_SYNC_STATE_LEADER;
} }
\ No newline at end of file
...@@ -8,7 +8,8 @@ target_include_directories( ...@@ -8,7 +8,8 @@ target_include_directories(
target_link_libraries( target_link_libraries(
planner planner
PRIVATE os util catalog cjson parser transport function qcom PRIVATE os util catalog cjson parser function qcom
PUBLIC transport
) )
if(${BUILD_TEST}) if(${BUILD_TEST})
......
...@@ -15,5 +15,5 @@ TARGET_INCLUDE_DIRECTORIES( ...@@ -15,5 +15,5 @@ TARGET_INCLUDE_DIRECTORIES(
TARGET_LINK_LIBRARIES( TARGET_LINK_LIBRARIES(
queryUtilTest queryUtilTest
PUBLIC os util gtest qcom common PUBLIC os util gtest qcom common transport
) )
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <iostream> #include <iostream>
#include "tmsg.h" #include "tmsg.h"
#include "query.h" #include "query.h"
#include "trpc.h"
#pragma GCC diagnostic ignored "-Wwrite-strings" #pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function" #pragma GCC diagnostic ignored "-Wunused-function"
...@@ -80,4 +81,4 @@ TEST(testCase, error_in_async_test) { ...@@ -80,4 +81,4 @@ TEST(testCase, error_in_async_test) {
taosAsyncExec(testPrintError, p, &code); taosAsyncExec(testPrintError, p, &code);
usleep(1000); usleep(1000);
printf("Error code:%d after asynchronously exec function\n", code); printf("Error code:%d after asynchronously exec function\n", code);
} }
\ No newline at end of file
...@@ -131,6 +131,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes); ...@@ -131,6 +131,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes);
// seek section // seek section
int walChangeFile(SWal* pWal, int64_t ver); int walChangeFile(SWal* pWal, int64_t ver);
int walChangeFileToLast(SWal* pWal);
// seek section end // seek section end
int64_t walGetSeq(); int64_t walGetSeq();
......
...@@ -184,6 +184,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { ...@@ -184,6 +184,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
} }
taosArraySetSize(pArray, sz); taosArraySetSize(pArray, sz);
pWal->fileInfoSet = pArray; pWal->fileInfoSet = pArray;
pWal->writeCur = sz - 1;
cJSON_Delete(pRoot); cJSON_Delete(pRoot);
return 0; return 0;
} }
......
...@@ -232,7 +232,8 @@ static int32_t walCreateThread() { ...@@ -232,7 +232,8 @@ static int32_t walCreateThread() {
if (pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) { if (pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) {
wError("failed to create wal thread since %s", strerror(errno)); wError("failed to create wal thread since %s", strerror(errno));
return TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
} }
pthread_attr_destroy(&thAttr); pthread_attr_destroy(&thAttr);
......
...@@ -244,6 +244,7 @@ int walRoll(SWal *pWal) { ...@@ -244,6 +244,7 @@ int walRoll(SWal *pWal) {
pWal->writeIdxTfd = idxTfd; pWal->writeIdxTfd = idxTfd;
pWal->writeLogTfd = logTfd; pWal->writeLogTfd = logTfd;
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
ASSERT(pWal->writeCur >= 0);
pWal->lastRollSeq = walGetSeq(); pWal->lastRollSeq = walGetSeq();
return 0; return 0;
...@@ -253,6 +254,7 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { ...@@ -253,6 +254,7 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
SWalIdxEntry entry = {.ver = ver, .offset = offset}; SWalIdxEntry entry = {.ver = ver, .offset = offset};
int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(SWalIdxEntry)); int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(SWalIdxEntry));
if (size != sizeof(SWalIdxEntry)) { if (size != sizeof(SWalIdxEntry)) {
terrno = TAOS_SYSTEM_ERROR(errno);
// TODO truncate // TODO truncate
return -1; return -1;
} }
...@@ -286,7 +288,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i ...@@ -286,7 +288,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
} }
/*if (!tfValid(pWal->writeLogTfd)) return -1;*/ /*if (!tfValid(pWal->writeLogTfd)) return -1;*/
ASSERT(pWal->writeCur >= 0);
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
if (pWal->writeIdxTfd == -1 || pWal->writeLogTfd == -1) {
walChangeFileToLast(pWal);
}
pWal->writeHead.head.version = index; pWal->writeHead.head.version = index;
int64_t offset = walGetCurFileOffset(pWal); int64_t offset = walGetCurFileOffset(pWal);
...@@ -308,6 +316,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i ...@@ -308,6 +316,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno)); strerror(errno));
} }
code = walWriteIndex(pWal, index, offset); code = walWriteIndex(pWal, index, offset);
if (code != 0) { if (code != 0) {
// TODO // TODO
......
...@@ -812,7 +812,9 @@ FORCE_INLINE int32_t taosHashGetKey(void *data, void** key, size_t* keyLen) { ...@@ -812,7 +812,9 @@ FORCE_INLINE int32_t taosHashGetKey(void *data, void** key, size_t* keyLen) {
SHashNode * node = GET_HASH_PNODE(data); SHashNode * node = GET_HASH_PNODE(data);
*key = GET_HASH_NODE_KEY(node); *key = GET_HASH_NODE_KEY(node);
*keyLen = node->keyLen; if (keyLen) {
*keyLen = node->keyLen;
}
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册