提交 8b2d5a6f 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-11463-3.0

...@@ -212,6 +212,11 @@ if(${BUILD_WITH_UV}) ...@@ -212,6 +212,11 @@ if(${BUILD_WITH_UV})
MESSAGE("Windows need set no-sign-compare") MESSAGE("Windows need set no-sign-compare")
add_compile_options(-Wno-sign-compare) add_compile_options(-Wno-sign-compare)
endif () endif ()
if (${CMAKE_SYSTEM_NAME} MATCHES "Windows")
file(READ "libuv/include/uv.h" CONTENTS)
string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}")
file(WRITE "libuv/include/uv.h" "${CONTENTS_NEW}")
endif ()
add_subdirectory(libuv) add_subdirectory(libuv)
endif(${BUILD_WITH_UV}) endif(${BUILD_WITH_UV})
...@@ -243,6 +248,10 @@ if(${BUILD_WITH_SQLITE}) ...@@ -243,6 +248,10 @@ if(${BUILD_WITH_SQLITE})
endif(${BUILD_WITH_SQLITE}) endif(${BUILD_WITH_SQLITE})
# pthread # pthread
if(${BUILD_PTHREAD})
ADD_DEFINITIONS("-DPTW32_STATIC_LIB")
add_subdirectory(pthread-win32)
endif(${BUILD_PTHREAD})
# ================================================================================================ # ================================================================================================
......
...@@ -31,27 +31,27 @@ typedef void TAOS_SUB; ...@@ -31,27 +31,27 @@ typedef void TAOS_SUB;
typedef void **TAOS_ROW; typedef void **TAOS_ROW;
// Data type definition // Data type definition
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes #define TSDB_DATA_TYPE_NULL 0 // 1 bytes
#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes #define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte #define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes #define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
#define TSDB_DATA_TYPE_INT 4 // 4 bytes #define TSDB_DATA_TYPE_INT 4 // 4 bytes
#define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes #define TSDB_DATA_TYPE_BIGINT 5 // 8 bytes
#define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes #define TSDB_DATA_TYPE_FLOAT 6 // 4 bytes
#define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes #define TSDB_DATA_TYPE_DOUBLE 7 // 8 bytes
#define TSDB_DATA_TYPE_VARCHAR 8 // string, alias for varchar #define TSDB_DATA_TYPE_VARCHAR 8 // string, alias for varchar
#define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes #define TSDB_DATA_TYPE_TIMESTAMP 9 // 8 bytes
#define TSDB_DATA_TYPE_NCHAR 10 // unicode string #define TSDB_DATA_TYPE_NCHAR 10 // unicode string
#define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte #define TSDB_DATA_TYPE_UTINYINT 11 // 1 byte
#define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes #define TSDB_DATA_TYPE_USMALLINT 12 // 2 bytes
#define TSDB_DATA_TYPE_UINT 13 // 4 bytes #define TSDB_DATA_TYPE_UINT 13 // 4 bytes
#define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes #define TSDB_DATA_TYPE_UBIGINT 14 // 8 bytes
#define TSDB_DATA_TYPE_JSON 15 // json string #define TSDB_DATA_TYPE_JSON 15 // json string
#define TSDB_DATA_TYPE_VARBINARY 16 // binary #define TSDB_DATA_TYPE_VARBINARY 16 // binary
#define TSDB_DATA_TYPE_DECIMAL 17 // decimal #define TSDB_DATA_TYPE_DECIMAL 17 // decimal
#define TSDB_DATA_TYPE_BLOB 18 // binary #define TSDB_DATA_TYPE_BLOB 18 // binary
#define TSDB_DATA_TYPE_MEDIUMBLOB 19 #define TSDB_DATA_TYPE_MEDIUMBLOB 19
#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string #define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string
typedef enum { typedef enum {
TSDB_OPTION_LOCALE, TSDB_OPTION_LOCALE,
...@@ -257,9 +257,15 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co ...@@ -257,9 +257,15 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co
void tmqShowMsg(tmq_message_t *tmq_message); void tmqShowMsg(tmq_message_t *tmq_message);
int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message); int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code); /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT* stmt); DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message);
DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message);
/* ---------------------- OTHER ---------------------------- */
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -62,18 +62,17 @@ typedef struct SDataBlockInfo { ...@@ -62,18 +62,17 @@ typedef struct SDataBlockInfo {
union {int64_t uid; int64_t blockId;}; union {int64_t uid; int64_t blockId;};
} SDataBlockInfo; } SDataBlockInfo;
typedef struct SConstantItem { //typedef struct SConstantItem {
SColumnInfo info; // SColumnInfo info;
int32_t startRow; // run-length-encoding to save the space for multiple rows // int32_t startRow; // run-length-encoding to save the space for multiple rows
int32_t endRow; // int32_t endRow;
SVariant value; // SVariant value;
} SConstantItem; //} SConstantItem;
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); // info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
typedef struct SSDataBlock { typedef struct SSDataBlock {
SColumnDataAgg *pBlockAgg; SColumnDataAgg *pBlockAgg;
SArray *pDataBlock; // SArray<SColumnInfoData> SArray *pDataBlock; // SArray<SColumnInfoData>
SArray *pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
SDataBlockInfo info; SDataBlockInfo info;
} SSDataBlock; } SSDataBlock;
...@@ -95,66 +94,15 @@ typedef struct SColumnInfoData { ...@@ -95,66 +94,15 @@ typedef struct SColumnInfoData {
}; };
} SColumnInfoData; } SColumnInfoData;
static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { void* blockDataDestroy(SSDataBlock* pBlock);
int64_t tbUid = pBlock->info.uid; int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
int16_t numOfCols = pBlock->info.numOfCols; void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
int16_t hasVarCol = pBlock->info.hasVarCol;
int32_t rows = pBlock->info.rows;
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, tbUid);
tlen += taosEncodeFixedI16(buf, numOfCols);
tlen += taosEncodeFixedI16(buf, hasVarCol);
tlen += taosEncodeFixedI32(buf, rows);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
tlen += taosEncodeFixedI16(buf, pColData->info.colId);
tlen += taosEncodeFixedI16(buf, pColData->info.type);
tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
int32_t colSz = rows * pColData->info.bytes;
tlen += taosEncodeBinary(buf, pColData->pData, colSz);
}
return tlen;
}
static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
int32_t sz;
buf = taosDecodeFixedI64(buf, &pBlock->info.uid);
buf = taosDecodeFixedI16(buf, &pBlock->info.numOfCols);
buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
buf = taosDecodeFixedI32(buf, &sz);
pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
for (int32_t i = 0; i < sz; i++) {
SColumnInfoData data = {0};
buf = taosDecodeFixedI16(buf, &data.info.colId);
buf = taosDecodeFixedI16(buf, &data.info.type);
buf = taosDecodeFixedI32(buf, &data.info.bytes);
int32_t colSz = pBlock->info.rows * data.info.bytes;
buf = taosDecodeBinary(buf, (void**)&data.pData, colSz);
taosArrayPush(pBlock->pDataBlock, &data);
}
return (void*)buf;
}
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
if (pBlock == NULL) { if (pBlock == NULL) {
return; return;
} }
blockDataDestroy(pBlock);
// int32_t numOfOutput = pBlock->info.numOfCols;
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < sz; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
tfree(pColInfoData->pData);
}
taosArrayDestroy(pBlock->pDataBlock);
tfree(pBlock->pBlockAgg);
// tfree(pBlock);
} }
static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) { static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) {
......
...@@ -52,6 +52,21 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); ...@@ -52,6 +52,21 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet);
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \ BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
} while (0) } while (0)
static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) {
if (!pColumnInfoData->hasNull) {
return false;
}
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
return pColumnInfoData->varmeta.offset[row] == -1;
} else {
if (pColumnInfoData->nullbitmap == NULL) {
return false;
}
return colDataIsNull_f(pColumnInfoData->nullbitmap, row);
}
}
static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row, static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row,
SColumnDataAgg* pColAgg) { SColumnDataAgg* pColAgg) {
if (!pColumnInfoData->hasNull) { if (!pColumnInfoData->hasNull) {
...@@ -79,10 +94,10 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u ...@@ -79,10 +94,10 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
} }
} }
#define BitmapLen(_n) (((_n) + ((1<<NBIT)-1)) >> NBIT) #define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT)
#define colDataGetData(p1_, r_) \ // SColumnInfoData, rowNumber
#define colDataGetData(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \ ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \
: ((p1_)->pData + ((r_) * (p1_)->info.bytes))) : ((p1_)->pData + ((r_) * (p1_)->info.bytes)))
...@@ -126,4 +141,4 @@ void* blockDataDestroy(SSDataBlock* pBlock); ...@@ -126,4 +141,4 @@ void* blockDataDestroy(SSDataBlock* pBlock);
} }
#endif #endif
#endif /*_TD_COMMON_EP_H_*/ #endif /*_TD_COMMON_EP_H_*/
...@@ -419,7 +419,7 @@ typedef struct { ...@@ -419,7 +419,7 @@ typedef struct {
}; };
} SColumnFilterList; } SColumnFilterList;
/* /*
* for client side struct, we only need the column id, type, bytes are not necessary * for client side struct, only column id, type, bytes are necessary
* But for data in vnode side, we need all the following information. * But for data in vnode side, we need all the following information.
*/ */
typedef struct { typedef struct {
...@@ -2158,13 +2158,36 @@ typedef struct { ...@@ -2158,13 +2158,36 @@ typedef struct {
SArray* topics; // SArray<SMqSubTopicEp> SArray* topics; // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp; } SMqCMGetSubEpRsp;
struct tmq_message_t { typedef struct {
SMqRspHead head; SMqRspHead head;
union { union {
SMqPollRsp consumeRsp; SMqPollRsp consumeRsp;
SMqCMGetSubEpRsp getEpRsp; SMqCMGetSubEpRsp getEpRsp;
}; };
void* extra; void* extra;
} SMqMsgWrapper;
typedef struct {
int32_t curBlock;
int32_t curRow;
void** uData;
} SMqRowIter;
struct tmq_message_t_v1 {
SMqPollRsp rsp;
SMqRowIter iter;
};
struct tmq_message_t {
SMqRspHead head;
union {
SMqPollRsp consumeRsp;
SMqCMGetSubEpRsp getEpRsp;
};
void* extra;
int32_t curBlock;
int32_t curRow;
void** uData;
}; };
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); }
......
...@@ -189,6 +189,7 @@ enum { ...@@ -189,6 +189,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
......
...@@ -69,7 +69,7 @@ typedef struct SProjectLogicNode { ...@@ -69,7 +69,7 @@ typedef struct SProjectLogicNode {
} SProjectLogicNode; } SProjectLogicNode;
typedef struct SVnodeModifLogicNode { typedef struct SVnodeModifLogicNode {
SLogicNode node;; SLogicNode node;
int32_t msgType; int32_t msgType;
SArray* pDataBlocks; SArray* pDataBlocks;
SVgDataBlocks* pVgDataBlocks; SVgDataBlocks* pVgDataBlocks;
...@@ -124,7 +124,7 @@ typedef struct SSubLogicPlan { ...@@ -124,7 +124,7 @@ typedef struct SSubLogicPlan {
} SSubLogicPlan; } SSubLogicPlan;
typedef struct SQueryLogicPlan { typedef struct SQueryLogicPlan {
ENodeType type;; ENodeType type;
int32_t totalLevel; int32_t totalLevel;
SNodeList* pTopSubplans; SNodeList* pTopSubplans;
} SQueryLogicPlan; } SQueryLogicPlan;
...@@ -252,7 +252,7 @@ typedef struct SSubplan { ...@@ -252,7 +252,7 @@ typedef struct SSubplan {
} SSubplan; } SSubplan;
typedef struct SQueryPlan { typedef struct SQueryPlan {
ENodeType type;; ENodeType type;
uint64_t queryId; uint64_t queryId;
int32_t numOfSubplans; int32_t numOfSubplans;
SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0. SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0.
......
...@@ -25,10 +25,11 @@ extern "C" { ...@@ -25,10 +25,11 @@ extern "C" {
#include <pthread.h> #include <pthread.h>
#include <semaphore.h> #include <semaphore.h>
#include <regex.h>
#if !defined(WINDOWS) #if !defined(WINDOWS)
#include <unistd.h> #include <unistd.h>
#include <dirent.h> #include <dirent.h>
#include <regex.h>
#include <sched.h> #include <sched.h>
#include <wordexp.h> #include <wordexp.h>
#include <libgen.h> #include <libgen.h>
...@@ -36,6 +37,12 @@ extern "C" { ...@@ -36,6 +37,12 @@ extern "C" {
#include <sys/utsname.h> #include <sys/utsname.h>
#include <sys/param.h> #include <sys/param.h>
#include <sys/mman.h> #include <sys/mman.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <termios.h>
#include <sys/statvfs.h>
#if defined(DARWIN) #if defined(DARWIN)
#else #else
...@@ -61,12 +68,7 @@ extern "C" { ...@@ -61,12 +68,7 @@ extern "C" {
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <wchar.h> #include <wchar.h>
#include <termios.h>
#include <wctype.h> #include <wctype.h>
......
...@@ -56,6 +56,7 @@ extern "C" { ...@@ -56,6 +56,7 @@ extern "C" {
// specific // specific
typedef int (*__compar_fn_t)(const void *, const void *); typedef int (*__compar_fn_t)(const void *, const void *);
#define ssize_t int #define ssize_t int
#define _SSIZE_T_
#define bzero(ptr, size) memset((ptr), 0, (size)) #define bzero(ptr, size) memset((ptr), 0, (size))
#define strcasecmp _stricmp #define strcasecmp _stricmp
#define strncasecmp _strnicmp #define strncasecmp _strnicmp
......
...@@ -24,7 +24,7 @@ extern "C" { ...@@ -24,7 +24,7 @@ extern "C" {
#if defined(WINDOWS) #if defined(WINDOWS)
typedef int32_t FileFd; typedef int32_t FileFd;
typedef SOCKET SocketFd; typedef int32_t SocketFd;
#else #else
typedef int32_t FileFd; typedef int32_t FileFd;
typedef int32_t SocketFd; typedef int32_t SocketFd;
......
...@@ -25,6 +25,8 @@ ...@@ -25,6 +25,8 @@
#define epoll_create EPOLL_CREATE_FUNC_TAOS_FORBID #define epoll_create EPOLL_CREATE_FUNC_TAOS_FORBID
#define epoll_ctl EPOLL_CTL_FUNC_TAOS_FORBID #define epoll_ctl EPOLL_CTL_FUNC_TAOS_FORBID
#define epoll_wait EPOLL_WAIT_FUNC_TAOS_FORBID #define epoll_wait EPOLL_WAIT_FUNC_TAOS_FORBID
#define inet_addr INET_ADDR_FUNC_TAOS_FORBID
#define inet_ntoa INET_NTOA_FUNC_TAOS_FORBID
#endif #endif
#if defined(WINDOWS) #if defined(WINDOWS)
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
#ifndef _TD_OS_SYSINFO_H_ #ifndef _TD_OS_SYSINFO_H_
#define _TD_OS_SYSINFO_H_ #define _TD_OS_SYSINFO_H_
#include <sys/statvfs.h>
#include "os.h" #include "os.h"
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -75,7 +75,7 @@ typedef struct { ...@@ -75,7 +75,7 @@ typedef struct {
#define TD_CODER_CURRENT(CODER) ((CODER)->data + (CODER)->pos) #define TD_CODER_CURRENT(CODER) ((CODER)->data + (CODER)->pos)
#define TD_CODER_MOVE_POS(CODER, MOVE) ((CODER)->pos += (MOVE)) #define TD_CODER_MOVE_POS(CODER, MOVE) ((CODER)->pos += (MOVE))
#define TD_CODER_CHECK_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE)) #define TD_CODER_CHECK_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE))
#define TCODER_MALLOC(SIZE, CODER) TFL_MALLOC(SIZE, &((CODER)->fl)) #define TCODER_MALLOC(PTR, TYPE, SIZE, CODER) TFL_MALLOC(PTR, TYPE, SIZE, &((CODER)->fl))
void tCoderInit(SCoder* pCoder, td_endian_t endian, uint8_t* data, int32_t size, td_coder_t type); void tCoderInit(SCoder* pCoder, td_endian_t endian, uint8_t* data, int32_t size, td_coder_t type);
void tCoderClear(SCoder* pCoder); void tCoderClear(SCoder* pCoder);
......
...@@ -29,15 +29,17 @@ struct SFreeListNode { ...@@ -29,15 +29,17 @@ struct SFreeListNode {
typedef TD_SLIST(SFreeListNode) SFreeList; typedef TD_SLIST(SFreeListNode) SFreeList;
#define TFL_MALLOC(SIZE, LIST) \ #define TFL_MALLOC(PTR, TYPE, SIZE, LIST) \
({ \ do { \
void *ptr = malloc((SIZE) + sizeof(struct SFreeListNode)); \ void *ptr = malloc((SIZE) + sizeof(struct SFreeListNode)); \
if (ptr) { \ if (ptr) { \
TD_SLIST_PUSH((LIST), (struct SFreeListNode *)ptr); \ TD_SLIST_PUSH((LIST), (struct SFreeListNode *)ptr); \
ptr = ((struct SFreeListNode *)ptr)->payload; \ ptr = ((struct SFreeListNode *)ptr)->payload; \
(PTR) = (TYPE)(ptr); \
}else{ \
(PTR) = NULL; \
} \ } \
ptr; \ }while(0);
})
#define tFreeListInit(pFL) TD_SLIST_INIT(pFL) #define tFreeListInit(pFL) TD_SLIST_INIT(pFL)
......
...@@ -82,7 +82,7 @@ typedef uint16_t VarDataLenT; // maxVarDataLen: 32767 ...@@ -82,7 +82,7 @@ typedef uint16_t VarDataLenT; // maxVarDataLen: 32767
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT) #define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
#define varDataLen(v) ((VarDataLenT *)(v))[0] #define varDataLen(v) ((VarDataLenT *)(v))[0]
#define varDataVal(v) ((void *)((char *)v + VARSTR_HEADER_SIZE)) #define varDataVal(v) ((char *)(v) + VARSTR_HEADER_SIZE)
typedef int32_t VarDataOffsetT; typedef int32_t VarDataOffsetT;
......
...@@ -700,6 +700,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -700,6 +700,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
} }
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp);
pRsp->curBlock = 0;
pRsp->curRow = 0;
// TODO: alloc mem
/*pRsp->*/
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
if (pRsp->consumeRsp.numOfTopics == 0) { if (pRsp->consumeRsp.numOfTopics == 0) {
/*printf("no data\n");*/ /*printf("no data\n");*/
...@@ -758,9 +762,9 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -758,9 +762,9 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
goto END; goto END;
} }
// tmq's epoch is monotomically increase, // tmq's epoch is monotonically increase,
// so it's safe to discard any old epoch msg. // so it's safe to discard any old epoch msg.
// epoch will only increase when received newer epoch ep msg // Epoch will only increase when received newer epoch ep msg
SMqRspHead* head = pMsg->pData; SMqRspHead* head = pMsg->pData;
int32_t epoch = atomic_load_32(&tmq->epoch); int32_t epoch = atomic_load_32(&tmq->epoch);
if (head->epoch <= epoch) { if (head->epoch <= epoch) {
...@@ -1282,6 +1286,34 @@ const char* tmq_err2str(tmq_resp_err_t err) { ...@@ -1282,6 +1286,34 @@ const char* tmq_err2str(tmq_resp_err_t err) {
return "fail"; return "fail";
} }
TAOS_ROW tmq_get_row(tmq_message_t* message) {
SMqPollRsp* rsp = &message->consumeRsp;
while (1) {
if (message->curBlock < taosArrayGetSize(rsp->pBlockData)) {
SSDataBlock* pBlock = taosArrayGet(rsp->pBlockData, message->curBlock);
if (message->curRow < pBlock->info.rows) {
for (int i = 0; i < pBlock->info.numOfCols; i++) {
SColumnInfoData* pData = taosArrayGet(pBlock->pDataBlock, i);
if (colDataIsNull_s(pData, message->curRow))
message->uData[i] = NULL;
else {
message->uData[i] = colDataGetData(pData, message->curRow);
}
}
message->curRow++;
return message->uData;
} else {
message->curBlock++;
message->curRow = 0;
continue;
}
}
return NULL;
}
}
char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; }
#if 0 #if 0
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
tmq_t* pTmq = malloc(sizeof(tmq_t)); tmq_t* pTmq = malloc(sizeof(tmq_t));
......
...@@ -240,10 +240,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co ...@@ -240,10 +240,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co
} }
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
ASSERT(pBlock); ASSERT(pBlock && pBlock->info.numOfCols == taosArrayGetSize(pBlock->pDataBlock));
size_t constantCols = (pBlock->pConstantList != NULL)? taosArrayGetSize(pBlock->pConstantList):0;
ASSERT( pBlock->info.numOfCols == taosArrayGetSize(pBlock->pDataBlock) + constantCols);
return pBlock->info.numOfCols; return pBlock->info.numOfCols;
} }
...@@ -1166,3 +1163,67 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { ...@@ -1166,3 +1163,67 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
return pageSize / (blockDataGetSerialRowSize(pBlock) + blockDataGetSerialMetaSize(pBlock)); return pageSize / (blockDataGetSerialRowSize(pBlock) + blockDataGetSerialMetaSize(pBlock));
} }
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
int64_t tbUid = pBlock->info.uid;
int16_t numOfCols = pBlock->info.numOfCols;
int16_t hasVarCol = pBlock->info.hasVarCol;
int32_t rows = pBlock->info.rows;
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, tbUid);
tlen += taosEncodeFixedI16(buf, numOfCols);
tlen += taosEncodeFixedI16(buf, hasVarCol);
tlen += taosEncodeFixedI32(buf, rows);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
tlen += taosEncodeFixedI16(buf, pColData->info.colId);
tlen += taosEncodeFixedI16(buf, pColData->info.type);
tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
if (IS_VAR_DATA_TYPE(pColData->info.type)) {
tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows);
} else {
tlen += taosEncodeBinary(buf, pColData->nullbitmap, BitmapLen(rows));
}
int32_t len = colDataGetLength(pColData, rows);
taosEncodeFixedI32(buf, len);
tlen += taosEncodeBinary(buf, pColData->pData, len);
}
return tlen;
}
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
int32_t sz;
buf = taosDecodeFixedI64(buf, &pBlock->info.uid);
buf = taosDecodeFixedI16(buf, &pBlock->info.numOfCols);
buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
buf = taosDecodeFixedI32(buf, &sz);
pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
for (int32_t i = 0; i < sz; i++) {
SColumnInfoData data = {0};
buf = taosDecodeFixedI16(buf, &data.info.colId);
buf = taosDecodeFixedI16(buf, &data.info.type);
buf = taosDecodeFixedI32(buf, &data.info.bytes);
if (IS_VAR_DATA_TYPE(data.info.type)) {
buf = taosDecodeBinary(buf, (void**)&data.varmeta.offset, pBlock->info.rows * sizeof(int32_t));
data.varmeta.length = pBlock->info.rows * sizeof(int32_t);
data.varmeta.allocLen = data.varmeta.length;
} else {
buf = taosDecodeBinary(buf, (void**)&data.nullbitmap, BitmapLen(pBlock->info.rows));
}
int32_t len = 0;
buf = taosDecodeFixedI32(buf, &len);
buf = taosDecodeBinary(buf, (void**)&data.pData, len);
taosArrayPush(pBlock->pDataBlock, &data);
}
return (void*)buf;
}
\ No newline at end of file
...@@ -2467,7 +2467,7 @@ int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq ...@@ -2467,7 +2467,7 @@ int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq
int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) { int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq) {
if (tStartDecode(decoder) < 0) return -1; if (tStartDecode(decoder) < 0) return -1;
if (tDecodeI32(decoder, &pReq->num) < 0) return -1; if (tDecodeI32(decoder, &pReq->num) < 0) return -1;
pReq->offsets = TCODER_MALLOC(pReq->num * sizeof(SMqOffset), decoder); TCODER_MALLOC(pReq->offsets, SMqOffset*, pReq->num * sizeof(SMqOffset), decoder);
if (pReq->offsets == NULL) return -1; if (pReq->offsets == NULL) return -1;
for (int32_t i = 0; i < pReq->num; i++) { for (int32_t i = 0; i < pReq->num; i++) {
tDecodeSMqOffset(decoder, &pReq->offsets[i]); tDecodeSMqOffset(decoder, &pReq->offsets[i]);
......
...@@ -217,7 +217,7 @@ int32_t tNameSetDbName(SName* dst, int32_t acct, const char* dbName, size_t name ...@@ -217,7 +217,7 @@ int32_t tNameSetDbName(SName* dst, int32_t acct, const char* dbName, size_t name
} }
int32_t tNameSetAcctId(SName* dst, int32_t acctId) { int32_t tNameSetAcctId(SName* dst, int32_t acctId) {
assert(dst != NULL && acct != NULL); assert(dst != NULL);
dst->acctId = acctId; dst->acctId = acctId;
return 0; return 0;
} }
......
...@@ -27,6 +27,22 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { ...@@ -27,6 +27,22 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
// TODO encode tasks
if (pObj->tasks) {
int32_t sz = taosArrayGetSize(pObj->tasks);
tEncodeI32(pEncoder, sz);
for (int32_t i = 0; i < sz; i++) {
SArray *pArray = taosArrayGet(pObj->tasks, i);
int32_t innerSz = taosArrayGetSize(pArray);
tEncodeI32(pEncoder, innerSz);
for (int32_t j = 0; j < innerSz; j++) {
SStreamTask *pTask = taosArrayGet(pArray, j);
tEncodeSStreamTask(pEncoder, pTask);
}
}
} else {
tEncodeI32(pEncoder, 0);
}
return pEncoder->pos; return pEncoder->pos;
} }
...@@ -42,5 +58,23 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { ...@@ -42,5 +58,23 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
if (sz != 0) {
pObj->tasks = taosArrayInit(sz, sizeof(SArray));
for (int32_t i = 0; i < sz; i++) {
int32_t innerSz;
if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
SArray *pArray = taosArrayInit(innerSz, sizeof(SStreamTask));
for (int32_t j = 0; j < innerSz; j++) {
SStreamTask task;
if (tDecodeSStreamTask(pDecoder, &task) < 0) return -1;
taosArrayPush(pArray, &task);
}
taosArrayPush(pObj->tasks, pArray);
}
} else {
pObj->tasks = NULL;
}
return 0; return 0;
} }
...@@ -77,6 +77,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -77,6 +77,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
return -1; return -1;
} }
taosArrayPush(taskOneLevel, pTask); taosArrayPush(taskOneLevel, pTask);
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
int32_t tlen = sizeof(SMsgHead) + encoder.pos;
tCoderClear(&encoder);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder);
STransAction action = {0};
action.epSet = plan->execNode.epSet;
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_VND_TASK_DEPLOY;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
rpcFreeCont(buf);
return -1;
}
} }
} else if (plan->subplanType == SUBPLAN_TYPE_SCAN) { } else if (plan->subplanType == SUBPLAN_TYPE_SCAN) {
// duplicatable // duplicatable
...@@ -101,6 +127,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -101,6 +127,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
} }
taosArrayPush(taskOneLevel, pTask); taosArrayPush(taskOneLevel, pTask);
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
int32_t tlen = sizeof(SMsgHead) + encoder.pos;
tCoderClear(&encoder);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder);
STransAction action = {0};
action.epSet = plan->execNode.epSet;
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_SND_TASK_DEPLOY;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
rpcFreeCont(buf);
return -1;
}
} }
} else { } else {
// not duplicatable // not duplicatable
...@@ -117,6 +169,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -117,6 +169,32 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
return -1; return -1;
} }
taosArrayPush(taskOneLevel, pTask); taosArrayPush(taskOneLevel, pTask);
SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
int32_t tlen = sizeof(SMsgHead) + encoder.pos;
tCoderClear(&encoder);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->streamTaskId = pTask->taskId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder);
STransAction action = {0};
action.epSet = plan->execNode.epSet;
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_SND_TASK_DEPLOY;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
rpcFreeCont(buf);
return -1;
}
} }
taosArrayPush(pStream->tasks, taskOneLevel); taosArrayPush(pStream->tasks, taskOneLevel);
} }
......
...@@ -230,19 +230,19 @@ static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamR ...@@ -230,19 +230,19 @@ static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamR
} }
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
SSdbRaw *pRedoRaw = mndStreamActionEncode(&streamObj); if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) {
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr());
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) { SSdbRaw *pRedoRaw = mndStreamActionEncode(&streamObj);
mError("stream:%ld, schedule stream since %s", streamObj.uid, terrstr()); if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
......
...@@ -55,6 +55,7 @@ int tqCommit(STQ*); ...@@ -55,6 +55,7 @@ int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -161,6 +161,7 @@ struct STQ { ...@@ -161,6 +161,7 @@ struct STQ {
STqMemRef tqMemRef; STqMemRef tqMemRef;
STqMetaStore* tqMeta; STqMetaStore* tqMeta;
STqPushMgr* tqPushMgr; STqPushMgr* tqPushMgr;
SHashObj* pStreamTasks;
SWal* pWal; SWal* pWal;
SMeta* pVnodeMeta; SMeta* pVnodeMeta;
}; };
......
...@@ -55,6 +55,8 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, S ...@@ -55,6 +55,8 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, S
return NULL; return NULL;
} }
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
return pTq; return pTq;
} }
...@@ -416,3 +418,18 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { ...@@ -416,3 +418,18 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
return 0; return 0;
} }
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
SStreamTask* pTask = malloc(sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER);
tDecodeSStreamTask(&decoder, pTask);
tCoderClear(&decoder);
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
return 0;
}
...@@ -41,7 +41,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { ...@@ -41,7 +41,7 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
return 0; return 0;
} }
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
void *ptr = NULL; void *ptr = NULL;
if (pVnode->config.streamMode == 0) { if (pVnode->config.streamMode == 0) {
...@@ -63,7 +63,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -63,7 +63,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_CREATE_STB: { case TDMT_VND_CREATE_STB: {
SVCreateTbReq vCreateTbReq = {0}; SVCreateTbReq vCreateTbReq = {0};
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
// TODO: handle error // TODO: handle error
...@@ -100,7 +100,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -100,7 +100,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
break; break;
} }
case TDMT_VND_ALTER_STB: { case TDMT_VND_ALTER_STB: {
SVCreateTbReq vAlterTbReq = {0}; SVCreateTbReq vAlterTbReq = {0};
vTrace("vgId:%d, process alter stb req", pVnode->vgId); vTrace("vgId:%d, process alter stb req", pVnode->vgId);
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq); tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vAlterTbReq);
free(vAlterTbReq.stbCfg.pSchema); free(vAlterTbReq.stbCfg.pSchema);
...@@ -132,6 +132,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -132,6 +132,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
} }
} break; } break;
case TDMT_VND_TASK_DEPLOY: {
if (tqProcessTaskDeploy(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
}
} break;
case TDMT_VND_CREATE_SMA: { // timeRangeSMA case TDMT_VND_CREATE_SMA: { // timeRangeSMA
SSmaCfg vCreateSmaReq = {0}; SSmaCfg vCreateSmaReq = {0};
if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) { if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) {
......
...@@ -89,7 +89,7 @@ static SSubLogicPlan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* ...@@ -89,7 +89,7 @@ static SSubLogicPlan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode*
pSubplan->id.groupId = pCxt->groupId; pSubplan->id.groupId = pCxt->groupId;
pSubplan->subplanType = SUBPLAN_TYPE_SCAN; pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
pSubplan->pNode = (SLogicNode*)nodesCloneNode(pScan); pSubplan->pNode = (SLogicNode*)nodesCloneNode(pScan);
TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo); TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo*);
SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS); SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS);
return pSubplan; return pSubplan;
} }
......
...@@ -163,15 +163,15 @@ typedef struct SyncClientRequest { ...@@ -163,15 +163,15 @@ typedef struct SyncClientRequest {
} SyncClientRequest; } SyncClientRequest;
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen); SyncClientRequest* syncClientRequestBuild(uint32_t dataLen);
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak); SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak); // step 1
void syncClientRequestDestroy(SyncClientRequest* pMsg); void syncClientRequestDestroy(SyncClientRequest* pMsg);
void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen); void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen);
void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg); void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg);
char* syncClientRequestSerialize2(const SyncClientRequest* pMsg, uint32_t* len); char* syncClientRequestSerialize2(const SyncClientRequest* pMsg, uint32_t* len);
SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len); SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len);
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); // step 2
void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg); void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg);
SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg); SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg); // step 3
cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg); cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg);
char* syncClientRequest2Str(const SyncClientRequest* pMsg); char* syncClientRequest2Str(const SyncClientRequest* pMsg);
......
...@@ -40,12 +40,13 @@ typedef struct SSyncRaftEntry { ...@@ -40,12 +40,13 @@ typedef struct SSyncRaftEntry {
} SSyncRaftEntry; } SSyncRaftEntry;
SSyncRaftEntry* syncEntryBuild(uint32_t dataLen); SSyncRaftEntry* syncEntryBuild(uint32_t dataLen);
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4
void syncEntryDestory(SSyncRaftEntry* pEntry); void syncEntryDestory(SSyncRaftEntry* pEntry);
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); // step 6
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry); cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry);
char* syncEntry2Str(const SSyncRaftEntry* pEntry); char* syncEntry2Str(const SSyncRaftEntry* pEntry);
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg); // step 7
// for debug ---------------------- // for debug ----------------------
void syncEntryPrint(const SSyncRaftEntry* pObj); void syncEntryPrint(const SSyncRaftEntry* pObj);
......
...@@ -101,7 +101,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -101,7 +101,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
assert(pMsg->dataLen >= 0); assert(pMsg->dataLen >= 0);
SyncTerm localPreLogTerm = 0; SyncTerm localPreLogTerm = 0;
if (pMsg->prevLogTerm >= SYNC_INDEX_BEGIN && pMsg->prevLogTerm <= ths->pLogStore->getLastIndex(ths->pLogStore)) { if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex); SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
assert(pEntry != NULL); assert(pEntry != NULL);
localPreLogTerm = pEntry->term; localPreLogTerm = pEntry->term;
...@@ -174,7 +174,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -174,7 +174,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
pReply->destId = pMsg->srcId; pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm; pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true; pReply->success = true;
pReply->matchIndex = pMsg->prevLogIndex + 1;
if (pMsg->dataLen > 0) {
pReply->matchIndex = pMsg->prevLogIndex + 1;
} else {
pReply->matchIndex = pMsg->prevLogIndex;
}
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
......
...@@ -39,7 +39,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ...@@ -39,7 +39,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg); syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg);
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, ths->pRaftStore->currentTerm); sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
ths->pRaftStore->currentTerm);
return ret; return ret;
} }
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "syncIndexMgr.h" #include "syncIndexMgr.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftLog.h"
// \* Leader i advances its commitIndex. // \* Leader i advances its commitIndex.
// \* This is done as a separate step from handling AppendEntries responses, // \* This is done as a separate step from handling AppendEntries responses,
...@@ -42,4 +43,25 @@ ...@@ -42,4 +43,25 @@
void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex); syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex);
syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex); syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex);
// update commit index
if (pSyncNode->pFsm != NULL) {
SyncIndex beginIndex = SYNC_INDEX_INVALID;
SyncIndex endIndex = SYNC_INDEX_INVALID;
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i);
assert(pEntry != NULL);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (pSyncNode->pFsm->FpCommitCb != NULL) {
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
}
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
}
}
} }
\ No newline at end of file
...@@ -54,20 +54,22 @@ static void syncEnvTick(void *param, void *tmrId) { ...@@ -54,20 +54,22 @@ static void syncEnvTick(void *param, void *tmrId) {
SSyncEnv *pSyncEnv = (SSyncEnv *)param; SSyncEnv *pSyncEnv = (SSyncEnv *)param;
if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) { if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) {
++(pSyncEnv->envTickTimerCounter); ++(pSyncEnv->envTickTimerCounter);
sTrace( sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
"syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 ", envTickTimerCounter:%" PRIu64 ", " ", envTickTimerCounter:%" PRIu64
"envTickTimerMS:%d, tmrId:%p", ", "
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, "envTickTimerMS:%d, tmrId:%p",
pSyncEnv->envTickTimerMS, tmrId); pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
pSyncEnv->envTickTimerMS, tmrId);
// do something, tick ... // do something, tick ...
taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer); taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer);
} else { } else {
sTrace( sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
"syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64 ", envTickTimerCounter:%" PRIu64 ", " ", envTickTimerCounter:%" PRIu64
"envTickTimerMS:%d, tmrId:%p", ", "
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, "envTickTimerMS:%d, tmrId:%p",
pSyncEnv->envTickTimerMS, tmrId); pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
pSyncEnv->envTickTimerMS, tmrId);
} }
} }
......
...@@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io); ...@@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io);
static int32_t syncIOStartInternal(SSyncIO *io); static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io); static int32_t syncIOStopInternal(SSyncIO *io);
static void *syncIOConsumerFunc(void *param); static void * syncIOConsumerFunc(void *param);
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
...@@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) { ...@@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
} }
static void *syncIOConsumerFunc(void *param) { static void *syncIOConsumerFunc(void *param) {
SSyncIO *io = param; SSyncIO * io = param;
STaosQall *qall; STaosQall *qall;
SRpcMsg *pRpcMsg, rpcMsg; SRpcMsg * pRpcMsg, rpcMsg;
qall = taosAllocateQall(); qall = taosAllocateQall();
while (1) { while (1) {
......
...@@ -731,14 +731,44 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg ...@@ -731,14 +731,44 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
int32_t ret = 0; int32_t ret = 0;
syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);
SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
SyncTerm term = ths->pRaftStore->currentTerm;
SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
assert(pEntry != NULL);
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
ths->pLogStore->appendEntry(ths->pLogStore, pEntry); ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
// only myself, maybe commit
syncNodeMaybeAdvanceCommitIndex(ths);
// start replicate right now!
syncNodeReplicate(ths); syncNodeReplicate(ths);
syncEntryDestory(pEntry);
// pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
}
}
rpcFreeCont(rpcMsg.pCont);
} else { } else {
// ths->pFsm->FpCommitCb(-1) // pre commit
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -1);
}
}
rpcFreeCont(rpcMsg.pCont);
} }
syncEntryDestory(pEntry);
return ret; return ret;
} }
...@@ -76,7 +76,7 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { ...@@ -76,7 +76,7 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
free(s); free(s);
} else { } else {
pRoot = syncRpcUnknownMsg2Json(); pRoot = cJSON_CreateObject();
char* s; char* s;
s = syncUtilprintBin((char*)(pRpcMsg->pCont), pRpcMsg->contLen); s = syncUtilprintBin((char*)(pRpcMsg->pCont), pRpcMsg->contLen);
cJSON_AddStringToObject(pRoot, "pCont", s); cJSON_AddStringToObject(pRoot, "pCont", s);
...@@ -608,6 +608,7 @@ SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { ...@@ -608,6 +608,7 @@ SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) {
return pMsg; return pMsg;
} }
// step 1. original SRpcMsg => SyncClientRequest, add seqNum, isWeak
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak) { SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak) {
SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen); SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen);
pMsg->originalRpcType = pOriginalRpcMsg->msgType; pMsg->originalRpcType = pOriginalRpcMsg->msgType;
...@@ -652,6 +653,7 @@ SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len) ...@@ -652,6 +653,7 @@ SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len)
return pMsg; return pMsg;
} }
// step 2. SyncClientRequest => RpcMsg, to queue
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) { void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg)); memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType; pRpcMsg->msgType = pMsg->msgType;
...@@ -664,6 +666,7 @@ void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg ...@@ -664,6 +666,7 @@ void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg
syncClientRequestDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); syncClientRequestDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
} }
// step 3. RpcMsg => SyncClientRequest, from queue
SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncClientRequest* pMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); SyncClientRequest* pMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
return pMsg; return pMsg;
......
...@@ -26,6 +26,7 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) { ...@@ -26,6 +26,7 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) {
return pEntry; return pEntry;
} }
// step 4. SyncClientRequest => SSyncRaftEntry, add term, index
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) { SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen); SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen);
assert(pEntry != NULL); assert(pEntry != NULL);
...@@ -48,6 +49,7 @@ void syncEntryDestory(SSyncRaftEntry* pEntry) { ...@@ -48,6 +49,7 @@ void syncEntryDestory(SSyncRaftEntry* pEntry) {
} }
} }
// step 5. SSyncRaftEntry => bin, to raft log
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) { char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) {
char* buf = malloc(pEntry->bytes); char* buf = malloc(pEntry->bytes);
assert(buf != NULL); assert(buf != NULL);
...@@ -58,6 +60,7 @@ char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) { ...@@ -58,6 +60,7 @@ char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) {
return buf; return buf;
} }
// step 6. bin => SSyncRaftEntry, from raft log
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) { SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf); uint32_t bytes = *((uint32_t*)buf);
SSyncRaftEntry* pEntry = malloc(bytes); SSyncRaftEntry* pEntry = malloc(bytes);
...@@ -106,6 +109,15 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) { ...@@ -106,6 +109,15 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) {
return serialized; return serialized;
} }
// step 7. SSyncRaftEntry => original SRpcMsg, commit to user, delete seqNum, isWeak, term, index
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pEntry->originalRpcType;
pRpcMsg->contLen = pEntry->dataLen;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
}
// for debug ---------------------- // for debug ----------------------
void syncEntryPrint(const SSyncRaftEntry* pObj) { void syncEntryPrint(const SSyncRaftEntry* pObj) {
char* serialized = syncEntry2Str(pObj); char* serialized = syncEntry2Str(pObj);
......
...@@ -73,7 +73,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { ...@@ -73,7 +73,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
SyncAppendEntries* pMsg = NULL; SyncAppendEntries* pMsg = NULL;
SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex); SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex);
if (pEntry != NULL) { if (pEntry != NULL) {
SyncAppendEntries* pMsg = syncAppendEntriesBuild(pEntry->bytes); pMsg = syncAppendEntriesBuild(pEntry->bytes);
assert(pMsg != NULL);
// add pEntry into msg // add pEntry into msg
uint32_t len; uint32_t len;
...@@ -86,9 +87,11 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { ...@@ -86,9 +87,11 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
} else { } else {
// maybe overflow, send empty record // maybe overflow, send empty record
SyncAppendEntries* pMsg = syncAppendEntriesBuild(0); pMsg = syncAppendEntriesBuild(0);
assert(pMsg != NULL);
} }
assert(pMsg != NULL);
pMsg->srcId = pSyncNode->myRaftId; pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = *pDestId; pMsg->destId = *pDestId;
pMsg->term = pSyncNode->pRaftStore->currentTerm; pMsg->term = pSyncNode->pRaftStore->currentTerm;
......
...@@ -41,7 +41,8 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) ...@@ -41,7 +41,8 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
syncRequestVoteReplyLog2("==syncNodeOnRequestVoteReplyCb==", pMsg); syncRequestVoteReplyLog2("==syncNodeOnRequestVoteReplyCb==", pMsg);
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term, ths->pRaftStore->currentTerm); sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
ths->pRaftStore->currentTerm);
return ret; return ret;
} }
......
...@@ -14,15 +14,12 @@ ...@@ -14,15 +14,12 @@
*/ */
#include "syncUtil.h" #include "syncUtil.h"
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include "syncEnv.h" #include "syncEnv.h"
// ---- encode / decode // ---- encode / decode
uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { uint64_t syncUtilAddr2U64(const char* host, uint16_t port) {
uint64_t u64; uint64_t u64;
uint32_t hostU32 = (uint32_t)inet_addr(host); uint32_t hostU32 = (uint32_t)taosInetAddr(host);
// assert(hostU32 != (uint32_t)-1); // assert(hostU32 != (uint32_t)-1);
u64 = (((uint64_t)hostU32) << 32) | (((uint32_t)port) << 16); u64 = (((uint64_t)hostU32) << 32) | (((uint32_t)port) << 16);
return u64; return u64;
...@@ -33,7 +30,7 @@ void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port) { ...@@ -33,7 +30,7 @@ void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port) {
struct in_addr addr; struct in_addr addr;
addr.s_addr = hostU32; addr.s_addr = hostU32;
snprintf(host, len, "%s", inet_ntoa(addr)); snprintf(host, len, "%s", taosInetNtoa(addr));
*port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16); *port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16);
} }
......
...@@ -28,6 +28,7 @@ add_executable(syncRpcMsgTest "") ...@@ -28,6 +28,7 @@ add_executable(syncRpcMsgTest "")
add_executable(syncPingTimerTest2 "") add_executable(syncPingTimerTest2 "")
add_executable(syncPingSelfTest "") add_executable(syncPingSelfTest "")
add_executable(syncElectTest "") add_executable(syncElectTest "")
add_executable(syncEncodeTest "")
target_sources(syncTest target_sources(syncTest
...@@ -150,6 +151,10 @@ target_sources(syncElectTest ...@@ -150,6 +151,10 @@ target_sources(syncElectTest
PRIVATE PRIVATE
"syncElectTest.cpp" "syncElectTest.cpp"
) )
target_sources(syncEncodeTest
PRIVATE
"syncEncodeTest.cpp"
)
target_include_directories(syncTest target_include_directories(syncTest
...@@ -307,6 +312,11 @@ target_include_directories(syncElectTest ...@@ -307,6 +312,11 @@ target_include_directories(syncElectTest
"${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories(syncEncodeTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest target_link_libraries(syncTest
...@@ -429,6 +439,10 @@ target_link_libraries(syncElectTest ...@@ -429,6 +439,10 @@ target_link_libraries(syncElectTest
sync sync
gtest_main gtest_main
) )
target_link_libraries(syncEncodeTest
sync
gtest_main
)
enable_testing() enable_testing()
......
...@@ -116,7 +116,9 @@ int main(int argc, char** argv) { ...@@ -116,7 +116,9 @@ int main(int argc, char** argv) {
//--------------------------- //---------------------------
while (1) { while (1) {
sTrace("while 1 sleep, state: %d, %s", gSyncNode->state, syncUtilState2String(gSyncNode->state)); sTrace("while 1 sleep, state: %d, %s, electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d",
gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->electTimerLogicClock,
gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS);
taosMsleep(1000); taosMsleep(1000);
} }
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncRaftLog.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 1;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM* pFsm;
SWal* pWal;
SSyncNode* pSyncNode;
SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
int code = walInit();
assert(code == 0);
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = syncInfo.vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
pWal = walOpen("./wal_test", &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->pSyncNode = pSyncNode;
return pSyncNode;
}
SSyncNode* syncInitTest() { return syncNodeInit(); }
void logStoreTest() {
logStorePrint2((char*)"logStoreTest", pSyncNode->pLogStore);
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_INVALID);
for (int i = 0; i < 5; ++i) {
int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
assert(pEntry != NULL);
pEntry->msgType = 1;
pEntry->originalRpcType = 2;
pEntry->seqNum = 3;
pEntry->isWeak = true;
pEntry->term = 100;
pEntry->index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
snprintf(pEntry->data, dataLen, "value%d", i);
// syncEntryPrint2((char*)"write entry:", pEntry);
pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry);
syncEntryDestory(pEntry);
if (i == 0) {
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_BEGIN);
}
}
logStorePrint2((char*)"after appendEntry", pSyncNode->pLogStore);
pSyncNode->pLogStore->truncate(pSyncNode->pLogStore, 3);
logStorePrint2((char*)"after truncate 3", pSyncNode->pLogStore);
}
void initRaftId(SSyncNode* pSyncNode) {
for (int i = 0; i < replicaNum; ++i) {
ids[i] = pSyncNode->replicasId[i];
char* s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
}
SRpcMsg *step0() {
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
memset(pMsg, 0, sizeof(SRpcMsg));
pMsg->msgType = 9999;
pMsg->contLen = 32;
pMsg->pCont = malloc(pMsg->contLen);
snprintf((char *)(pMsg->pCont), pMsg->contLen, "hello, world");
return pMsg;
}
SyncClientRequest *step1(const SRpcMsg *pMsg) {
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true);
return pRetMsg;
}
SRpcMsg *step2(const SyncClientRequest *pMsg) {
SRpcMsg *pRetMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
syncClientRequest2RpcMsg(pMsg, pRetMsg);
return pRetMsg;
}
SyncClientRequest *step3(const SRpcMsg *pMsg) {
SyncClientRequest *pRetMsg = syncClientRequestFromRpcMsg2(pMsg);
return pRetMsg;
}
SSyncRaftEntry *step4(const SyncClientRequest *pMsg) {
SSyncRaftEntry *pRetMsg = syncEntryBuild2((SyncClientRequest *)pMsg, 100, 0);
return pRetMsg;
}
char *step5(const SSyncRaftEntry *pMsg, uint32_t *len) {
char *pRetMsg = syncEntrySerialize(pMsg, len);
return pRetMsg;
}
SSyncRaftEntry *step6(const char *pMsg, uint32_t len) {
SSyncRaftEntry *pRetMsg = syncEntryDeserialize(pMsg, len);
return pRetMsg;
}
SRpcMsg *step7(const SSyncRaftEntry *pMsg) {
SRpcMsg *pRetMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
syncEntry2OriginalRpc(pMsg, pRetMsg);
return pRetMsg;
}
int main(int argc, char **argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
void logTest();
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
taosRemoveDir("./wal_test");
// step0
SRpcMsg *pMsg0 = step0();
syncRpcMsgPrint2((char *)"==step0==", pMsg0);
// step1
SyncClientRequest *pMsg1 = step1(pMsg0);
syncClientRequestPrint2((char *)"==step1==", pMsg1);
// step2
SRpcMsg *pMsg2 = step2(pMsg1);
syncRpcMsgPrint2((char *)"==step2==", pMsg2);
// step3
SyncClientRequest *pMsg3 = step3(pMsg2);
syncClientRequestPrint2((char *)"==step3==", pMsg3);
// step4
SSyncRaftEntry *pMsg4 = step4(pMsg3);
syncEntryPrint2((char *)"==step4==", pMsg4);
// log, relog
SSyncNode* pSyncNode = syncNodeInit();
assert(pSyncNode != NULL);
SSyncRaftEntry *pEntry = pMsg4;
pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry);
SSyncRaftEntry *pEntry2 = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, pEntry->index);
syncEntryPrint2((char *)"==pEntry2==", pEntry2);
// step5
uint32_t len;
char * pMsg5 = step5(pMsg4, &len);
char * s = syncUtilprintBin(pMsg5, len);
printf("==step5== [%s] \n", s);
free(s);
// step6
SSyncRaftEntry *pMsg6 = step6(pMsg5, len);
syncEntryPrint2((char *)"==step6==", pMsg6);
// step7
SRpcMsg *pMsg7 = step7(pMsg6);
syncRpcMsgPrint2((char *)"==step7==", pMsg7);
return 0;
}
...@@ -27,7 +27,8 @@ typedef struct { ...@@ -27,7 +27,8 @@ typedef struct {
} SBtIdx; } SBtIdx;
// Btree page header definition // Btree page header definition
typedef struct __attribute__((__packed__)) { #pragma pack (push,1)
typedef struct {
uint8_t flag; // page flag uint8_t flag; // page flag
int32_t vlen; // value length of current page, TDB_VARIANT_LEN for variant length int32_t vlen; // value length of current page, TDB_VARIANT_LEN for variant length
uint16_t nPayloads; // number of total payloads uint16_t nPayloads; // number of total payloads
...@@ -36,6 +37,7 @@ typedef struct __attribute__((__packed__)) { ...@@ -36,6 +37,7 @@ typedef struct __attribute__((__packed__)) {
pgoff_t offPayload; // payload offset pgoff_t offPayload; // payload offset
pgno_t rChildPgno; // right most child page number pgno_t rChildPgno; // right most child page number
} SBtPgHdr; } SBtPgHdr;
#pragma pack(pop)
typedef int (*BtreeCmprFn)(const void *, const void *); typedef int (*BtreeCmprFn)(const void *, const void *);
......
...@@ -30,11 +30,7 @@ struct STDbEnv { ...@@ -30,11 +30,7 @@ struct STDbEnv {
} pgfht; // page file hash table; } pgfht; // page file hash table;
}; };
#define TDB_ENV_PGF_HASH(fileid) \ #define TDB_ENV_PGF_HASH(fileid) (((uint8_t *)(fileid))[0] + ((uint8_t *)(fileid))[1] + ((uint8_t *)(fileid))[2])
({ \
uint8_t *tmp = (uint8_t *)(fileid); \
tmp[0] + tmp[1] + tmp[2]; \
})
static int tdbEnvDestroy(TENV *pEnv); static int tdbEnvDestroy(TENV *pEnv);
......
...@@ -106,11 +106,7 @@ int pgCacheClose(SPgCache *pPgCache) { ...@@ -106,11 +106,7 @@ int pgCacheClose(SPgCache *pPgCache) {
return 0; return 0;
} }
#define PG_CACHE_HASH(fileid, pgno) \ #define PG_CACHE_HASH(fileid, pgno) (((uint64_t *)(fileid))[0] + ((uint64_t *)(fileid))[1] + ((uint64_t *)(fileid))[2] + (pgno))
({ \
uint64_t *tmp = (uint64_t *)(fileid); \
(tmp[0] + tmp[1] + tmp[2] + (pgno)); \
})
SPage *pgCacheFetch(SPgCache *pPgCache, pgid_t pgid) { SPage *pgCacheFetch(SPgCache *pPgCache, pgid_t pgid) {
SPage * pPage; SPage * pPage;
......
...@@ -20,13 +20,15 @@ ...@@ -20,13 +20,15 @@
extern "C" { extern "C" {
#endif #endif
typedef struct __attribute__((__packed__)) { #pragma pack (push,1)
typedef struct {
char hdrInfo[16]; // info string char hdrInfo[16]; // info string
pgsz_t szPage; // page size of current file pgsz_t szPage; // page size of current file
int32_t cno; // commit number counter int32_t cno; // commit number counter
pgno_t freePgno; // freelist page number pgno_t freePgno; // freelist page number
uint8_t resv[100]; // reserved space uint8_t resv[100]; // reserved space
} SPgFileHdr; } SPgFileHdr;
#pragma pack(pop)
#define TDB_PG_FILE_HDR_SIZE 128 #define TDB_PG_FILE_HDR_SIZE 128
......
...@@ -207,8 +207,8 @@ void cliHandleResp(SCliConn* conn) { ...@@ -207,8 +207,8 @@ void cliHandleResp(SCliConn* conn) {
} }
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
inet_ntoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
conn->secured = pHead->secured; conn->secured = pHead->secured;
...@@ -519,8 +519,8 @@ void cliSend(SCliConn* pConn) { ...@@ -519,8 +519,8 @@ void cliSend(SCliConn* pConn) {
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
pConn->writeReq.data = pConn; pConn->writeReq.data = pConn;
uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
......
...@@ -202,7 +202,7 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -202,7 +202,7 @@ static void uvHandleReq(SSrvConn* pConn) {
STransMsgHead* pHead = (STransMsgHead*)p->msg; STransMsgHead* pHead = (STransMsgHead*)p->msg;
if (pHead->secured == 1) { if (pHead->secured == 1) {
STransUserMsg* uMsg = (p->msg + p->msgLen - sizeof(STransUserMsg)); STransUserMsg* uMsg = (STransUserMsg*)((char*)p->msg + p->msgLen - sizeof(STransUserMsg));
memcpy(pConn->user, uMsg->user, tListLen(uMsg->user)); memcpy(pConn->user, uMsg->user, tListLen(uMsg->user));
memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret)); memcpy(pConn->secret, uMsg->secret, tListLen(uMsg->secret));
} }
...@@ -235,12 +235,12 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -235,12 +235,12 @@ static void uvHandleReq(SSrvConn* pConn) {
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
transMsg.handle = pConn; transMsg.handle = pConn;
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
ntohs(pConn->locaddr.sin_port), transMsg.contLen); ntohs(pConn->locaddr.sin_port), transMsg.contLen);
} else { } else {
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, no resp ", pConn, tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, no resp ", pConn,
TMSG_INFO(transMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen); taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen);
} }
STrans* pTransInst = (STrans*)p->shandle; STrans* pTransInst = (STrans*)p->shandle;
...@@ -347,7 +347,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { ...@@ -347,7 +347,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
// impl later // impl later
} }
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
ntohs(pConn->locaddr.sin_port)); ntohs(pConn->locaddr.sin_port));
pHead->msgLen = htonl(len); pHead->msgLen = htonl(len);
...@@ -374,8 +374,8 @@ static void uvStartSendResp(SSrvMsg* smsg) { ...@@ -374,8 +374,8 @@ static void uvStartSendResp(SSrvMsg* smsg) {
transUnrefSrvHandle(pConn); transUnrefSrvHandle(pConn);
if (taosArrayGetSize(pConn->srvMsgs) > 0) { if (taosArrayGetSize(pConn->srvMsgs) > 0) {
tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), tDebug("server conn %p push data to client %s:%d, local info: %s:%d", pConn, taosInetNtoa(pConn->addr.sin_addr),
ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
taosArrayPush(pConn->srvMsgs, &smsg); taosArrayPush(pConn->srvMsgs, &smsg);
return; return;
} }
......
...@@ -19,9 +19,6 @@ ...@@ -19,9 +19,6 @@
#include "tref.h" #include "tref.h"
#include "walInt.h" #include "walInt.h"
#include <libgen.h>
#include <regex.h>
int64_t inline walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; } int64_t inline walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
int64_t inline walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotVer; } int64_t inline walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotVer; }
......
...@@ -3,9 +3,10 @@ add_library(os STATIC ${OS_SRC}) ...@@ -3,9 +3,10 @@ add_library(os STATIC ${OS_SRC})
target_include_directories( target_include_directories(
os os
PUBLIC "${CMAKE_SOURCE_DIR}/include/os" PUBLIC "${CMAKE_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_SOURCE_DIR}/include" PUBLIC "${CMAKE_SOURCE_DIR}/include"
PRIVATE "${CMAKE_SOURCE_DIR}/include/util" PUBLIC "${CMAKE_SOURCE_DIR}/include/util"
PRIVATE "${CMAKE_SOURCE_DIR}/contrib/pthread-win32" PUBLIC "${CMAKE_SOURCE_DIR}/contrib/pthread-win32"
PUBLIC "${CMAKE_SOURCE_DIR}/contrib/gnuregex"
) )
target_link_libraries( target_link_libraries(
os pthread dl rt m os pthread dl rt m
......
...@@ -427,7 +427,7 @@ int32_t compareWStrPatternMatch(const void *pLeft, const void *pRight) { ...@@ -427,7 +427,7 @@ int32_t compareWStrPatternMatch(const void *pLeft, const void *pRight) {
wchar_t *pattern = calloc(varDataLen(pRight) + 1, sizeof(wchar_t)); wchar_t *pattern = calloc(varDataLen(pRight) + 1, sizeof(wchar_t));
memcpy(pattern, varDataVal(pRight), varDataLen(pRight)); memcpy(pattern, varDataVal(pRight), varDataLen(pRight));
int32_t ret = WCSPatternMatch(pattern, varDataVal(pLeft), varDataLen(pLeft) / TSDB_NCHAR_SIZE, &pInfo); int32_t ret = WCSPatternMatch(pattern, (const wchar_t *)varDataVal(pLeft), varDataLen(pLeft) / TSDB_NCHAR_SIZE, &pInfo);
free(pattern); free(pattern);
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1; return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
......
...@@ -230,7 +230,7 @@ static int32_t tSStructA_v1_decode(SCoder *pCoder, SStructA_v1 *pSAV1) { ...@@ -230,7 +230,7 @@ static int32_t tSStructA_v1_decode(SCoder *pCoder, SStructA_v1 *pSAV1) {
const char *tstr; const char *tstr;
uint64_t len; uint64_t len;
if (tDecodeCStrAndLen(pCoder, &tstr, &len) < 0) return -1; if (tDecodeCStrAndLen(pCoder, &tstr, &len) < 0) return -1;
pSAV1->A_c = (char *)TCODER_MALLOC(len + 1, pCoder); TCODER_MALLOC(pSAV1->A_c, char*, len + 1, pCoder);
memcpy(pSAV1->A_c, tstr, len + 1); memcpy(pSAV1->A_c, tstr, len + 1);
tEndDecode(pCoder); tEndDecode(pCoder);
...@@ -269,7 +269,7 @@ static int32_t tSStructA_v2_decode(SCoder *pCoder, SStructA_v2 *pSAV2) { ...@@ -269,7 +269,7 @@ static int32_t tSStructA_v2_decode(SCoder *pCoder, SStructA_v2 *pSAV2) {
const char *tstr; const char *tstr;
uint64_t len; uint64_t len;
if (tDecodeCStrAndLen(pCoder, &tstr, &len) < 0) return -1; if (tDecodeCStrAndLen(pCoder, &tstr, &len) < 0) return -1;
pSAV2->A_c = (char *)TCODER_MALLOC(len + 1, pCoder); TCODER_MALLOC(pSAV2->A_c, char*, len + 1, pCoder);
memcpy(pSAV2->A_c, tstr, len + 1); memcpy(pSAV2->A_c, tstr, len + 1);
// ------------------------NEW FIELDS DECODE------------------------------- // ------------------------NEW FIELDS DECODE-------------------------------
...@@ -305,7 +305,7 @@ static int32_t tSFinalReq_v1_encode(SCoder *pCoder, const SFinalReq_v1 *ps1) { ...@@ -305,7 +305,7 @@ static int32_t tSFinalReq_v1_encode(SCoder *pCoder, const SFinalReq_v1 *ps1) {
static int32_t tSFinalReq_v1_decode(SCoder *pCoder, SFinalReq_v1 *ps1) { static int32_t tSFinalReq_v1_decode(SCoder *pCoder, SFinalReq_v1 *ps1) {
if (tStartDecode(pCoder) < 0) return -1; if (tStartDecode(pCoder) < 0) return -1;
ps1->pA = (SStructA_v1 *)TCODER_MALLOC(sizeof(*(ps1->pA)), pCoder); TCODER_MALLOC(ps1->pA, SStructA_v1*, sizeof(*(ps1->pA)), pCoder);
if (tSStructA_v1_decode(pCoder, ps1->pA) < 0) return -1; if (tSStructA_v1_decode(pCoder, ps1->pA) < 0) return -1;
if (tDecodeI32(pCoder, &ps1->v_a) < 0) return -1; if (tDecodeI32(pCoder, &ps1->v_a) < 0) return -1;
if (tDecodeI8(pCoder, &ps1->v_b) < 0) return -1; if (tDecodeI8(pCoder, &ps1->v_b) < 0) return -1;
...@@ -339,7 +339,7 @@ static int32_t tSFinalReq_v2_encode(SCoder *pCoder, const SFinalReq_v2 *ps2) { ...@@ -339,7 +339,7 @@ static int32_t tSFinalReq_v2_encode(SCoder *pCoder, const SFinalReq_v2 *ps2) {
static int32_t tSFinalReq_v2_decode(SCoder *pCoder, SFinalReq_v2 *ps2) { static int32_t tSFinalReq_v2_decode(SCoder *pCoder, SFinalReq_v2 *ps2) {
if (tStartDecode(pCoder) < 0) return -1; if (tStartDecode(pCoder) < 0) return -1;
ps2->pA = (SStructA_v2 *)TCODER_MALLOC(sizeof(*(ps2->pA)), pCoder); TCODER_MALLOC(ps2->pA, SStructA_v2*, sizeof(*(ps2->pA)), pCoder);
if (tSStructA_v2_decode(pCoder, ps2->pA) < 0) return -1; if (tSStructA_v2_decode(pCoder, ps2->pA) < 0) return -1;
if (tDecodeI32(pCoder, &ps2->v_a) < 0) return -1; if (tDecodeI32(pCoder, &ps2->v_a) < 0) return -1;
if (tDecodeI8(pCoder, &ps2->v_b) < 0) return -1; if (tDecodeI8(pCoder, &ps2->v_b) < 0) return -1;
......
...@@ -8,7 +8,8 @@ TEST(TD_UTIL_FREELIST_TEST, simple_test) { ...@@ -8,7 +8,8 @@ TEST(TD_UTIL_FREELIST_TEST, simple_test) {
tFreeListInit(&fl); tFreeListInit(&fl);
for (size_t i = 0; i < 1000; i++) { for (size_t i = 0; i < 1000; i++) {
void *ptr = TFL_MALLOC(1024, &fl); void *ptr = NULL;
TFL_MALLOC(ptr, void*, 1024, &fl);
GTEST_ASSERT_NE(ptr, nullptr); GTEST_ASSERT_NE(ptr, nullptr);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册