未验证 提交 ca2191d6 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9222 from taosdata/feature/tq

Feature/tq
...@@ -50,9 +50,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) ...@@ -50,9 +50,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET, "mq-set" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET_CUR, "mq-set-cur" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RSP_READY, "rsp-ready" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RSP_READY, "rsp-ready" )
// message from client to mnode // message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" )
...@@ -225,6 +224,7 @@ typedef struct SBuildUseDBInput { ...@@ -225,6 +224,7 @@ typedef struct SBuildUseDBInput {
int32_t vgVersion; int32_t vgVersion;
} SBuildUseDBInput; } SBuildUseDBInput;
#pragma pack(push, 1) #pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
...@@ -343,7 +343,7 @@ typedef struct { ...@@ -343,7 +343,7 @@ typedef struct {
typedef struct { typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN];
char db[TSDB_FULL_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
int16_t type; /* operation type */ int16_t type; /* operation type */
int16_t numOfCols; /* number of schema */ int16_t numOfCols; /* number of schema */
int32_t tagValLen; int32_t tagValLen;
SSchema schema[]; SSchema schema[];
...@@ -545,8 +545,8 @@ typedef struct { ...@@ -545,8 +545,8 @@ typedef struct {
int32_t sqlstrLen; // sql query string int32_t sqlstrLen; // sql query string
int32_t prevResultLen; // previous result length int32_t prevResultLen; // previous result length
int32_t numOfOperator; int32_t numOfOperator;
int32_t tableScanOperator; // table scan operator. -1 means no scan operator int32_t tableScanOperator;// table scan operator. -1 means no scan operator
int32_t udfNum; // number of udf function int32_t udfNum; // number of udf function
int32_t udfContentOffset; int32_t udfContentOffset;
int32_t udfContentLen; int32_t udfContentLen;
SColumnInfo tableCols[]; SColumnInfo tableCols[];
...@@ -1005,27 +1005,35 @@ typedef struct { ...@@ -1005,27 +1005,35 @@ typedef struct {
// mq related // mq related
typedef struct { typedef struct {
} SMqConnectReq; } SMqConnectReq;
typedef struct { typedef struct {
} SMqConnectRsp; } SMqConnectRsp;
typedef struct { typedef struct {
} SMqDisconnectReq; } SMqDisconnectReq;
typedef struct { typedef struct {
} SMqDisconnectRsp; } SMqDisconnectRsp;
typedef struct { typedef struct {
} SMqAckReq; } SMqAckReq;
typedef struct { typedef struct {
} SMqAckRsp; } SMqAckRsp;
typedef struct { typedef struct {
} SMqResetReq; } SMqResetReq;
typedef struct { typedef struct {
} SMqResetRsp; } SMqResetRsp;
// mq related end // mq related end
......
...@@ -16,86 +16,76 @@ ...@@ -16,86 +16,76 @@
#ifndef _TD_TQ_H_ #ifndef _TD_TQ_H_
#define _TD_TQ_H_ #define _TD_TQ_H_
#include "common.h"
#include "mallocator.h" #include "mallocator.h"
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tlist.h"
#include "tutil.h" #include "tutil.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct TmqMsgHead { typedef struct STqMsgHead {
int32_t protoVer; int32_t protoVer;
int32_t msgType; int32_t msgType;
int64_t cgId; int64_t cgId;
int64_t clientId; int64_t clientId;
} TmqMsgHead; } STqMsgHead;
typedef struct TmqOneAck { typedef struct STqOneAck {
int64_t topicId; int64_t topicId;
int64_t consumeOffset; int64_t consumeOffset;
} TmqOneAck; } STqOneAck;
typedef struct TmqAcks { typedef struct STqAcks {
int32_t ackNum; int32_t ackNum;
// should be sorted // should be sorted
TmqOneAck acks[]; STqOneAck acks[];
} TmqAcks; } STqAcks;
// TODO: put msgs into common typedef struct STqSetCurReq {
typedef struct TmqConnectReq { STqMsgHead head;
TmqMsgHead head; int64_t topicId;
TmqAcks acks; int64_t offset;
} TmqConnectReq; } STqSetCurReq;
typedef struct TmqConnectRsp {
TmqMsgHead head;
int8_t status;
} TmqConnectRsp;
typedef struct TmqDisconnectReq {
TmqMsgHead head;
} TmqDiscconectReq;
typedef struct TmqDisconnectRsp {
TmqMsgHead head;
int8_t status;
} TmqDisconnectRsp;
typedef struct STqConsumeReq { typedef struct STqConsumeReq {
TmqMsgHead head; STqMsgHead head;
TmqAcks acks; STqAcks acks;
} STqConsumeReq; } STqConsumeReq;
typedef struct TmqMsgContent { typedef struct STqMsgContent {
int64_t topicId; int64_t topicId;
int64_t msgLen; int64_t msgLen;
char msg[]; char msg[];
} TmqMsgContent; } STqMsgContent;
typedef struct STqConsumeRsp { typedef struct STqConsumeRsp {
TmqMsgHead head; STqMsgHead head;
int64_t bodySize; int64_t bodySize;
TmqMsgContent msgs[]; STqMsgContent msgs[];
} STqConsumeRsp; } STqConsumeRsp;
typedef struct TmqSubscribeReq { typedef struct STqSubscribeReq {
TmqMsgHead head; STqMsgHead head;
int32_t topicNum; int32_t topicNum;
int64_t topic[]; int64_t topic[];
} TmqSubscribeReq; } STqSubscribeReq;
typedef struct tmqSubscribeRsp { typedef struct STqSubscribeRsp {
TmqMsgHead head; STqMsgHead head;
int64_t vgId; int64_t vgId;
char ep[TSDB_EP_LEN]; // TSDB_EP_LEN char ep[TSDB_EP_LEN]; // TSDB_EP_LEN
} TmqSubscribeRsp; } STqSubscribeRsp;
typedef struct TmqHeartbeatReq { typedef struct STqHeartbeatReq {
} TmqHeartbeatReq; } STqHeartbeatReq;
typedef struct TmqHeartbeatRsp { typedef struct STqHeartbeatRsp {
} TmqHeartbeatRsp; } STqHeartbeatRsp;
typedef struct STqTopicVhandle { typedef struct STqTopicVhandle {
int64_t topicId; int64_t topicId;
...@@ -108,48 +98,54 @@ typedef struct STqTopicVhandle { ...@@ -108,48 +98,54 @@ typedef struct STqTopicVhandle {
#define TQ_BUFFER_SIZE 8 #define TQ_BUFFER_SIZE 8
typedef struct STqExec {
void* runtimeEnv;
SSDataBlock* (*exec)(void* runtimeEnv);
void* (*assign)(void* runtimeEnv, SSubmitBlk* inputData);
void (*clear)(void* runtimeEnv);
char* (*serialize)(struct STqExec*);
struct STqExec* (*deserialize)(char*);
} STqExec;
typedef struct STqBufferItem { typedef struct STqBufferItem {
int64_t offset; int64_t offset;
// executors are identical but not concurrent // executors are identical but not concurrent
// so there must be a copy in each item // so there must be a copy in each item
void* executor; STqExec* executor;
int64_t size; int32_t status;
void* content; int64_t size;
} STqBufferItem; void* content;
} STqMsgItem;
typedef struct STqBufferHandle { typedef struct STqTopic {
// char* topic; //c style, end with '\0' // char* topic; //c style, end with '\0'
// int64_t cgId; // int64_t cgId;
// void* ahandle; // void* ahandle;
int64_t nextConsumeOffset; int64_t nextConsumeOffset;
int64_t floatingCursor; int64_t floatingCursor;
int64_t topicId; int64_t topicId;
int32_t head; int32_t head;
int32_t tail; int32_t tail;
STqBufferItem buffer[TQ_BUFFER_SIZE]; STqMsgItem buffer[TQ_BUFFER_SIZE];
} STqBufferHandle; } STqTopic;
typedef struct STqListHandle { typedef struct STqListHandle {
STqBufferHandle bufHandle; STqTopic topic;
struct STqListHandle* next; struct STqListHandle* next;
} STqListHandle; } STqList;
typedef struct STqGroupHandle { typedef struct STqGroup {
int64_t cId; int64_t clientId;
int64_t cgId; int64_t cgId;
void* ahandle; void* ahandle;
int32_t topicNum; int32_t topicNum;
STqListHandle* head; STqList* head;
} STqGroupHandle; SList* topicList; // SList<STqTopic>
void* returnMsg; // SVReadMsg
typedef struct STqQueryExec { } STqGroup;
void* src;
STqBufferItem* dest;
void* executor;
} STqQueryExec;
typedef struct STqQueryMsg { typedef struct STqQueryMsg {
STqQueryExec* exec; STqMsgItem* item;
struct STqQueryMsg* next; struct STqQueryMsg* next;
} STqQueryMsg; } STqQueryMsg;
...@@ -209,15 +205,15 @@ typedef void (*FTqDelete)(void*); ...@@ -209,15 +205,15 @@ typedef void (*FTqDelete)(void*);
#define TQ_DUP_INTXN_REWRITE 0 #define TQ_DUP_INTXN_REWRITE 0
#define TQ_DUP_INTXN_REJECT 2 #define TQ_DUP_INTXN_REJECT 2
static inline bool TqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; } static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; }
static inline bool TqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; } static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; }
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST; static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE #define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
typedef struct TqMetaHandle { typedef struct STqMetaHandle {
int64_t key; int64_t key;
int64_t offset; int64_t offset;
int64_t serializedSize; int64_t serializedSize;
...@@ -225,23 +221,25 @@ typedef struct TqMetaHandle { ...@@ -225,23 +221,25 @@ typedef struct TqMetaHandle {
void* valueInTxn; void* valueInTxn;
} STqMetaHandle; } STqMetaHandle;
typedef struct TqMetaList { typedef struct STqMetaList {
STqMetaHandle handle; STqMetaHandle handle;
struct TqMetaList* next; struct STqMetaList* next;
// struct TqMetaList* inTxnPrev; // struct STqMetaList* inTxnPrev;
// struct TqMetaList* inTxnNext; // struct STqMetaList* inTxnNext;
struct TqMetaList* unpersistPrev; struct STqMetaList* unpersistPrev;
struct TqMetaList* unpersistNext; struct STqMetaList* unpersistNext;
} STqMetaList; } STqMetaList;
typedef struct TqMetaStore { typedef struct STqMetaStore {
STqMetaList* bucket[TQ_BUCKET_SIZE]; STqMetaList* bucket[TQ_BUCKET_SIZE];
// a table head // a table head
STqMetaList* unpersistHead; STqMetaList* unpersistHead;
// TODO:temporaral use, to be replaced by unified tfile // TODO:temporaral use, to be replaced by unified tfile
int fileFd; int fileFd;
// TODO:temporaral use, to be replaced by unified tfile // TODO:temporaral use, to be replaced by unified tfile
int idxFd; int idxFd;
char* dirPath; char* dirPath;
int32_t tqConfigFlag; int32_t tqConfigFlag;
FTqSerialize pSerializer; FTqSerialize pSerializer;
...@@ -250,8 +248,8 @@ typedef struct TqMetaStore { ...@@ -250,8 +248,8 @@ typedef struct TqMetaStore {
} STqMetaStore; } STqMetaStore;
typedef struct STQ { typedef struct STQ {
// the collection of group handle // the collection of groups
// the handle of kvstore // the handle of meta kvstore
char* path; char* path;
STqCfg* tqConfig; STqCfg* tqConfig;
STqLogReader* tqLogReader; STqLogReader* tqLogReader;
...@@ -266,23 +264,25 @@ void tqClose(STQ*); ...@@ -266,23 +264,25 @@ void tqClose(STQ*);
// void* will be replace by a msg type // void* will be replace by a msg type
int tqPushMsg(STQ*, void* msg, int64_t version); int tqPushMsg(STQ*, void* msg, int64_t version);
int tqCommit(STQ*); int tqCommit(STQ*);
int tqSetCursor(STQ*, void* msg);
int tqConsume(STQ*, STqConsumeReq*); int tqConsume(STQ*, STqConsumeReq*);
STqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId); int tqSetCursor(STQ*, STqSetCurReq* pMsg);
int tqBufferSetOffset(STqTopic*, int64_t offset);
STqTopic* tqFindTopic(STqGroup*, int64_t topicId);
STqGroup* tqGetGroup(STQ*, int64_t clientId);
STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqRegisterContext(STqGroup*, void* ahandle);
int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
STqGroupHandle* tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqMoveOffsetToNext(STqGroupHandle*);
int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
int tqRegisterContext(STqGroupHandle*, void* ahandle);
int tqLaunchQuery(STqGroupHandle*);
int tqSendLaunchQuery(STqGroupHandle*);
int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** ppHead); const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**);
const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHandle** gHandle); static int tqQueryExecuting(int32_t status) { return status; }
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -339,6 +339,20 @@ int32_t* taosGetErrno(); ...@@ -339,6 +339,20 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909) //"Invalid msg length") #define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909) //"Invalid msg length")
#define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A) //"Invalid msg type") #define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A) //"Invalid msg type")
// tq
#define TSDB_CODE_TQ_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0A00) //"Invalid configuration")
#define TSDB_CODE_TQ_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0A01) //"Tq init failed")
#define TSDB_CODE_TQ_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0A02) //"No diskspace for tq")
#define TSDB_CODE_TQ_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0A03) //"No permission for disk files")
#define TSDB_CODE_TQ_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0A04) //"Data file(s) corrupted")
#define TSDB_CODE_TQ_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0A05) //"Out of memory")
#define TSDB_CODE_TQ_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0A06) //"File already exists")
#define TSDB_CODE_TQ_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0A07) //"Failed to create dir")
#define TSDB_CODE_TQ_META_NO_SUCH_KEY TAOS_DEF_ERROR_CODE(0, 0x0A08) //"Target key not found")
#define TSDB_CODE_TQ_META_KEY_NOT_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A09) //"Target key not in transaction")
#define TSDB_CODE_TQ_META_KEY_DUP_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A0A) //"Target key duplicated in transaction")
#define TSDB_CODE_TQ_GROUP_NOT_SET TAOS_DEF_ERROR_CODE(0, 0x0A0B) //"Group of corresponding client is not set by mnode")
// wal // wal
#define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) //"Unexpected generic error in wal") #define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) //"Unexpected generic error in wal")
#define TSDB_CODE_WAL_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x1001) //"WAL file is corrupted") #define TSDB_CODE_WAL_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x1001) //"WAL file is corrupted")
......
...@@ -42,11 +42,11 @@ extern int32_t qDebugFlag; ...@@ -42,11 +42,11 @@ extern int32_t qDebugFlag;
extern int32_t wDebugFlag; extern int32_t wDebugFlag;
extern int32_t sDebugFlag; extern int32_t sDebugFlag;
extern int32_t tsdbDebugFlag; extern int32_t tsdbDebugFlag;
extern int32_t tqDebugFlag;
extern int32_t cqDebugFlag; extern int32_t cqDebugFlag;
extern int32_t debugFlag; extern int32_t debugFlag;
extern int32_t ctgDebugFlag; extern int32_t ctgDebugFlag;
#define DEBUG_FATAL 1U #define DEBUG_FATAL 1U
#define DEBUG_ERROR DEBUG_FATAL #define DEBUG_ERROR DEBUG_FATAL
#define DEBUG_WARN 2U #define DEBUG_WARN 2U
......
...@@ -44,7 +44,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -44,7 +44,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_SET] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_SET_CUR] = dndProcessVnodeWriteMsg;
// msg from client to mnode // msg from client to mnode
pMgmt->msgFp[TSDB_MSG_TYPE_CONNECT] = dndProcessMnodeReadMsg; pMgmt->msgFp[TSDB_MSG_TYPE_CONNECT] = dndProcessMnodeReadMsg;
......
...@@ -16,17 +16,13 @@ ...@@ -16,17 +16,13 @@
#include "vnodeDef.h" #include "vnodeDef.h"
int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
SVnodeReq *pVnodeReq;
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TSDB_MSG_TYPE_MQ_SET: case TSDB_MSG_TYPE_MQ_SET_CUR:
if (tqSetCursor(pVnode->pTq, pMsg->pCont) < 0) { if (tqSetCursor(pVnode->pTq, pMsg->pCont) < 0) {
// TODO: handle error // TODO: handle error
} }
break; break;
} }
void *pBuf = pMsg->pCont;
return 0; return 0;
} }
......
...@@ -11,6 +11,7 @@ target_link_libraries( ...@@ -11,6 +11,7 @@ target_link_libraries(
PUBLIC wal PUBLIC wal
PUBLIC os PUBLIC os
PUBLIC util PUBLIC util
PUBLIC common
) )
if(${BUILD_TEST}) if(${BUILD_TEST})
......
...@@ -17,11 +17,20 @@ ...@@ -17,11 +17,20 @@
#define _TD_TQ_INT_H_ #define _TD_TQ_INT_H_
#include "tq.h" #include "tq.h"
#include "tlog.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
extern int32_t tqDebugFlag;
#define tqFatal(...) { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", 255, __VA_ARGS__); }}
#define tqError(...) { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", 255, __VA_ARGS__); }}
#define tqWarn(...) { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", 255, __VA_ARGS__); }}
#define tqInfo(...) { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", 255, __VA_ARGS__); }}
#define tqDebug(...) { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }}
#define tqTrace(...) { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }}
// create persistent storage for meta info such as consuming offset // create persistent storage for meta info such as consuming offset
// return value > 0: cgId // return value > 0: cgId
// return value <= 0: error code // return value <= 0: error code
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#define _TQ_META_STORE_H_ #define _TQ_META_STORE_H_
#include "os.h" #include "os.h"
#include "tq.h" #include "tqInt.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
......
...@@ -24,89 +24,80 @@ ...@@ -24,89 +24,80 @@
// handle management message // handle management message
// //
int tqGetgHandleSSize(const STqGroupHandle* gHandle); int tqGroupSSize(const STqGroup* pGroup);
int tqBufHandleSSize(); int tqTopicSSize();
int tqBufItemSSize(); int tqItemSSize();
STqGroupHandle* tqFindHandle(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { void* tqSerializeListHandle(STqList* listHandle, void* ptr);
STqGroupHandle* gHandle; void* tqSerializeTopic(STqTopic* pTopic, void* ptr);
return NULL; void* tqSerializeItem(STqMsgItem* pItem, void* ptr);
}
void* tqSerializeListHandle(STqListHandle* listHandle, void* ptr); const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic);
void* tqSerializeBufHandle(STqBufferHandle* bufHandle, void* ptr); const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem);
void* tqSerializeBufItem(STqBufferItem* bufItem, void* ptr);
const void* tqDeserializeBufHandle(const void* pBytes, STqBufferHandle* bufHandle);
const void* tqDeserializeBufItem(const void* pBytes, STqBufferItem* bufItem);
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac) { STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac) {
STQ* pTq = malloc(sizeof(STQ)); STQ* pTq = malloc(sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
// TODO: memory error terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->tqConfig = tqConfig; pTq->tqConfig = tqConfig;
pTq->tqLogReader = tqLogReader; pTq->tqLogReader = tqLogReader;
pTq->tqMemRef.pAlloctorFactory = allocFac; pTq->tqMemRef.pAlloctorFactory = allocFac;
// pTq->tqMemRef.pAllocator = allocFac->create(allocFac); pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
if (pTq->tqMemRef.pAllocator == NULL) { if (pTq->tqMemRef.pAllocator == NULL) {
// TODO // TODO: error code of buffer pool
} }
pTq->tqMeta = pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0);
tqStoreOpen(path, (FTqSerialize)tqSerializeGroupHandle, (FTqDeserialize)tqDeserializeGroupHandle, free, 0);
if (pTq->tqMeta == NULL) { if (pTq->tqMeta == NULL) {
// TODO: free STQ // TODO: free STQ
return NULL; return NULL;
} }
return pTq; return pTq;
} }
void tqClose(STQ* pTq) {
void tqClose(STQ*pTq) {
// TODO // TODO
} }
static int tqProtoCheck(TmqMsgHead *pMsg) { static int tqProtoCheck(STqMsgHead* pMsg) { return pMsg->protoVer == 0; }
return pMsg->protoVer == 0;
}
static int tqAckOneTopic(STqBufferHandle* bHandle, TmqOneAck* pAck, STqQueryMsg** ppQuery) { static int tqAckOneTopic(STqTopic* pTopic, STqOneAck* pAck, STqQueryMsg** ppQuery) {
// clean old item and move forward // clean old item and move forward
int32_t consumeOffset = pAck->consumeOffset; int32_t consumeOffset = pAck->consumeOffset;
int idx = consumeOffset % TQ_BUFFER_SIZE; int idx = consumeOffset % TQ_BUFFER_SIZE;
ASSERT(bHandle->buffer[idx].content && bHandle->buffer[idx].executor); ASSERT(pTopic->buffer[idx].content && pTopic->buffer[idx].executor);
tfree(bHandle->buffer[idx].content); tfree(pTopic->buffer[idx].content);
if (1 /* TODO: need to launch new query */) { if (1 /* TODO: need to launch new query */) {
STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg)); STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg));
if (pNewQuery == NULL) { if (pNewQuery == NULL) {
// TODO: memory insufficient terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1; return -1;
} }
// TODO: lock executor // TODO: lock executor
pNewQuery->exec->executor = bHandle->buffer[idx].executor;
// TODO: read from wal and assign to src // TODO: read from wal and assign to src
pNewQuery->exec->src = 0; /*pNewQuery->exec->executor = pTopic->buffer[idx].executor;*/
pNewQuery->exec->dest = &bHandle->buffer[idx]; /*pNewQuery->exec->src = 0;*/
pNewQuery->next = *ppQuery; /*pNewQuery->exec->dest = &pTopic->buffer[idx];*/
*ppQuery = pNewQuery; /*pNewQuery->next = *ppQuery;*/
/**ppQuery = pNewQuery;*/
} }
return 0; return 0;
} }
static int tqAck(STqGroupHandle* gHandle, TmqAcks* pAcks) { static int tqAck(STqGroup* pGroup, STqAcks* pAcks) {
int32_t ackNum = pAcks->ackNum; int32_t ackNum = pAcks->ackNum;
TmqOneAck* acks = pAcks->acks; STqOneAck* acks = pAcks->acks;
// double ptr for acks and list // double ptr for acks and list
int i = 0; int i = 0;
STqListHandle* node = gHandle->head; STqList* node = pGroup->head;
int ackCnt = 0; int ackCnt = 0;
STqQueryMsg* pQuery = NULL; STqQueryMsg* pQuery = NULL;
while (i < ackNum && node->next) { while (i < ackNum && node->next) {
if (acks[i].topicId == node->next->bufHandle.topicId) { if (acks[i].topicId == node->next->topic.topicId) {
ackCnt++; ackCnt++;
tqAckOneTopic(&node->next->bufHandle, &acks[i], &pQuery); tqAckOneTopic(&node->next->topic, &acks[i], &pQuery);
} else if (acks[i].topicId < node->next->bufHandle.topicId) { } else if (acks[i].topicId < node->next->topic.topicId) {
i++; i++;
} else { } else {
node = node->next; node = node->next;
...@@ -118,52 +109,56 @@ static int tqAck(STqGroupHandle* gHandle, TmqAcks* pAcks) { ...@@ -118,52 +109,56 @@ static int tqAck(STqGroupHandle* gHandle, TmqAcks* pAcks) {
return ackCnt; return ackCnt;
} }
static int tqCommitTCGroup(STqGroupHandle* handle) { static int tqCommitGroup(STqGroup* pGroup) {
// persist modification into disk // persist modification into disk
return 0; return 0;
} }
int tqCreateTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroupHandle** handle) { int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup** ppGroup) {
// create in disk // create in disk
STqGroupHandle* gHandle = (STqGroupHandle*)malloc(sizeof(STqGroupHandle)); STqGroup* pGroup = (STqGroup*)malloc(sizeof(STqGroup));
if (gHandle == NULL) { if (pGroup == NULL) {
// TODO // TODO
return -1; return -1;
} }
memset(gHandle, 0, sizeof(STqGroupHandle)); *ppGroup = pGroup;
memset(pGroup, 0, sizeof(STqGroup));
return 0; return 0;
} }
STqGroupHandle* tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { STqGroup* tqOpenGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
STqGroupHandle* gHandle = tqHandleGet(pTq->tqMeta, cId); STqGroup* pGroup = tqHandleGet(pTq->tqMeta, cId);
if (gHandle == NULL) { if (pGroup == NULL) {
int code = tqCreateTCGroup(pTq, topicId, cgId, cId, &gHandle); int code = tqCreateGroup(pTq, topicId, cgId, cId, &pGroup);
if (code != 0) { if (code < 0) {
// TODO // TODO
return NULL; return NULL;
} }
tqHandleMovePut(pTq->tqMeta, cId, pGroup);
} }
ASSERT(pGroup);
// create return pGroup;
// open
return gHandle;
} }
int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { return 0; } int tqCloseGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
// TODO
return 0;
}
int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { int tqDropGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
// delete from disk // delete from disk
return 0; return 0;
} }
static int tqFetch(STqGroupHandle* gHandle, void** msg) { static int tqFetch(STqGroup* pGroup, void** msg) {
STqListHandle* head = gHandle->head; STqList* head = pGroup->head;
STqListHandle* node = head; STqList* node = head;
int totSize = 0; int totSize = 0;
// TODO: make it a macro // TODO: make it a macro
int sizeLimit = 4 * 1024; int sizeLimit = 4 * 1024;
TmqMsgContent* buffer = malloc(sizeLimit); STqMsgContent* buffer = malloc(sizeLimit);
if (buffer == NULL) { if (buffer == NULL) {
// TODO:memory insufficient // TODO:memory insufficient
return -1; return -1;
...@@ -172,25 +167,25 @@ static int tqFetch(STqGroupHandle* gHandle, void** msg) { ...@@ -172,25 +167,25 @@ static int tqFetch(STqGroupHandle* gHandle, void** msg) {
// until all topic iterated or msgs over sizeLimit // until all topic iterated or msgs over sizeLimit
while (node->next) { while (node->next) {
node = node->next; node = node->next;
STqBufferHandle* bufHandle = &node->bufHandle; STqTopic* topicHandle = &node->topic;
int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE; int idx = topicHandle->nextConsumeOffset % TQ_BUFFER_SIZE;
if (bufHandle->buffer[idx].content != NULL && bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset) { if (topicHandle->buffer[idx].content != NULL && topicHandle->buffer[idx].offset == topicHandle->nextConsumeOffset) {
totSize += bufHandle->buffer[idx].size; totSize += topicHandle->buffer[idx].size;
if (totSize > sizeLimit) { if (totSize > sizeLimit) {
void* ptr = realloc(buffer, totSize); void* ptr = realloc(buffer, totSize);
if (ptr == NULL) { if (ptr == NULL) {
totSize -= bufHandle->buffer[idx].size; totSize -= topicHandle->buffer[idx].size;
// TODO:memory insufficient // TODO:memory insufficient
// return msgs already copied // return msgs already copied
break; break;
} }
} }
*((int64_t*)buffer) = bufHandle->topicId; *((int64_t*)buffer) = topicHandle->topicId;
buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
*((int64_t*)buffer) = bufHandle->buffer[idx].size; *((int64_t*)buffer) = topicHandle->buffer[idx].size;
buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
memcpy(buffer, bufHandle->buffer[idx].content, bufHandle->buffer[idx].size); memcpy(buffer, topicHandle->buffer[idx].content, topicHandle->buffer[idx].size);
buffer = POINTER_SHIFT(buffer, bufHandle->buffer[idx].size); buffer = POINTER_SHIFT(buffer, topicHandle->buffer[idx].size);
if (totSize > sizeLimit) { if (totSize > sizeLimit) {
break; break;
} }
...@@ -199,11 +194,19 @@ static int tqFetch(STqGroupHandle* gHandle, void** msg) { ...@@ -199,11 +194,19 @@ static int tqFetch(STqGroupHandle* gHandle, void** msg) {
return totSize; return totSize;
} }
STqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { return NULL; } STqGroup* tqGetGroup(STQ* pTq, int64_t clientId) { return tqHandleGet(pTq->tqMeta, clientId); }
int tqLaunchQuery(STqGroupHandle* gHandle) { return 0; }
int tqSendLaunchQuery(STqGroupHandle* gHandle) { return 0; } int tqSendLaunchQuery(STqMsgItem* bufItem, int64_t offset) {
if (tqQueryExecuting(bufItem->status)) {
return 0;
}
bufItem->status = 1;
// load data from wal or buffer pool
// put into exec
// send exec into non blocking queue
// when query finished, put into buffer pool
return 0;
}
/*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/ /*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/
/*return 0;*/ /*return 0;*/
...@@ -220,23 +223,96 @@ int tqCommit(STQ* pTq) { ...@@ -220,23 +223,96 @@ int tqCommit(STQ* pTq) {
return 0; return 0;
} }
int tqSetCursor(STQ* pTq, void* msg) { int tqBufferSetOffset(STqTopic* pTopic, int64_t offset) {
int code;
memset(pTopic->buffer, 0, sizeof(pTopic->buffer));
// launch query
for (int i = offset; i < offset + TQ_BUFFER_SIZE; i++) {
int pos = i % TQ_BUFFER_SIZE;
code = tqSendLaunchQuery(&pTopic->buffer[pos], offset);
if (code < 0) {
// TODO: error handling
}
}
// set offset
pTopic->nextConsumeOffset = offset;
pTopic->floatingCursor = offset;
return 0;
}
STqTopic* tqFindTopic(STqGroup* pGroup, int64_t topicId) {
// TODO
return NULL;
}
int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) {
int code;
int64_t clientId = pMsg->head.clientId;
int64_t topicId = pMsg->topicId;
int64_t offset = pMsg->offset;
STqGroup* gHandle = tqGetGroup(pTq, clientId);
if (gHandle == NULL) {
// client not connect
return -1;
}
STqTopic* topicHandle = tqFindTopic(gHandle, topicId);
if (topicHandle == NULL) {
return -1;
}
if (pMsg->offset == topicHandle->nextConsumeOffset) {
return 0;
}
// TODO: check log last version
code = tqBufferSetOffset(topicHandle, offset);
if (code < 0) {
// set error code
return -1;
}
return 0;
}
int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
int64_t clientId = pMsg->head.clientId;
STqGroup* pGroup = tqGetGroup(pTq, clientId);
if (pGroup == NULL) {
terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
return -1;
}
STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg;
int numOfMsgs = tqFetch(pGroup, (void**)&pRsp->msgs);
if (numOfMsgs < 0) {
return -1;
}
if (numOfMsgs == 0) {
// most recent data has been fetched
// enable timer for blocking wait
// once new data written during wait time
// launch query and response
}
// fetched a num of msgs, rpc response
return 0; return 0;
} }
#if 0
int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
if (!tqProtoCheck((TmqMsgHead*)pMsg)) { if (!tqProtoCheck((STqMsgHead*)pMsg)) {
// proto version invalid // proto version invalid
return -1; return -1;
} }
int64_t clientId = pMsg->head.clientId; int64_t clientId = pMsg->head.clientId;
STqGroupHandle* gHandle = tqGetGroupHandle(pTq, clientId); STqGroup* pGroup = tqGetGroup(pTq, clientId);
if (gHandle == NULL) { if (pGroup == NULL) {
// client not connect // client not connect
return -1; return -1;
} }
if (pMsg->acks.ackNum != 0) { if (pMsg->acks.ackNum != 0) {
if (tqAck(gHandle, &pMsg->acks) != 0) { if (tqAck(pGroup, &pMsg->acks) != 0) {
// ack not success // ack not success
return -1; return -1;
} }
...@@ -244,22 +320,23 @@ int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { ...@@ -244,22 +320,23 @@ int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg; STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg;
if (tqFetch(gHandle, (void**)&pRsp->msgs) <= 0) { if (tqFetch(pGroup, (void**)&pRsp->msgs) <= 0) {
// fetch error // fetch error
return -1; return -1;
} }
// judge and launch new query // judge and launch new query
if (tqLaunchQuery(gHandle)) { /*if (tqSendLaunchQuery(gHandle)) {*/
// launch query error // launch query error
return -1; /*return -1;*/
} /*}*/
return 0; return 0;
} }
#endif
int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** ppHead) { int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) {
// calculate size // calculate size
int sz = tqGetgHandleSSize(gHandle) + sizeof(STqSerializedHead); int sz = tqGroupSSize(pGroup) + sizeof(STqSerializedHead);
if (sz > (*ppHead)->ssize) { if (sz > (*ppHead)->ssize) {
void* tmpPtr = realloc(*ppHead, sz); void* tmpPtr = realloc(*ppHead, sz);
if (tmpPtr == NULL) { if (tmpPtr == NULL) {
...@@ -272,53 +349,53 @@ int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** pp ...@@ -272,53 +349,53 @@ int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** pp
} }
void* ptr = (*ppHead)->content; void* ptr = (*ppHead)->content;
// do serialization // do serialization
*(int64_t*)ptr = gHandle->cId; *(int64_t*)ptr = pGroup->clientId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = gHandle->cgId; *(int64_t*)ptr = pGroup->cgId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int32_t*)ptr = gHandle->topicNum; *(int32_t*)ptr = pGroup->topicNum;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
if (gHandle->topicNum > 0) { if (pGroup->topicNum > 0) {
tqSerializeListHandle(gHandle->head, ptr); tqSerializeListHandle(pGroup->head, ptr);
} }
return 0; return 0;
} }
void* tqSerializeListHandle(STqListHandle* listHandle, void* ptr) { void* tqSerializeListHandle(STqList* listHandle, void* ptr) {
STqListHandle* node = listHandle; STqList* node = listHandle;
ASSERT(node != NULL); ASSERT(node != NULL);
while (node) { while (node) {
ptr = tqSerializeBufHandle(&node->bufHandle, ptr); ptr = tqSerializeTopic(&node->topic, ptr);
node = node->next; node = node->next;
} }
return ptr; return ptr;
} }
void* tqSerializeBufHandle(STqBufferHandle* bufHandle, void* ptr) { void* tqSerializeTopic(STqTopic* pTopic, void* ptr) {
*(int64_t*)ptr = bufHandle->nextConsumeOffset; *(int64_t*)ptr = pTopic->nextConsumeOffset;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = bufHandle->topicId; *(int64_t*)ptr = pTopic->topicId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int32_t*)ptr = bufHandle->head; *(int32_t*)ptr = pTopic->head;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
*(int32_t*)ptr = bufHandle->tail; *(int32_t*)ptr = pTopic->tail;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
ptr = tqSerializeBufItem(&bufHandle->buffer[i], ptr); ptr = tqSerializeItem(&pTopic->buffer[i], ptr);
} }
return ptr; return ptr;
} }
void* tqSerializeBufItem(STqBufferItem* bufItem, void* ptr) { void* tqSerializeItem(STqMsgItem* bufItem, void* ptr) {
// TODO: do we need serialize this? // TODO: do we need serialize this?
// mainly for executor // mainly for executor
return ptr; return ptr;
} }
const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHandle** ppGHandle) { const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup) {
STqGroupHandle* gHandle = *ppGHandle; STqGroup* gHandle = *ppGroup;
const void* ptr = pHead->content; const void* ptr = pHead->content;
gHandle->cId = *(int64_t*)ptr; gHandle->clientId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
gHandle->cgId = *(int64_t*)ptr; gHandle->cgId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
...@@ -326,63 +403,63 @@ const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHan ...@@ -326,63 +403,63 @@ const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHan
gHandle->topicNum = *(int32_t*)ptr; gHandle->topicNum = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
gHandle->head = NULL; gHandle->head = NULL;
STqListHandle* node = gHandle->head; STqList* node = gHandle->head;
for (int i = 0; i < gHandle->topicNum; i++) { for (int i = 0; i < gHandle->topicNum; i++) {
if (gHandle->head == NULL) { if (gHandle->head == NULL) {
if ((node = malloc(sizeof(STqListHandle))) == NULL) { if ((node = malloc(sizeof(STqList))) == NULL) {
// TODO: error // TODO: error
return NULL; return NULL;
} }
node->next = NULL; node->next = NULL;
ptr = tqDeserializeBufHandle(ptr, &node->bufHandle); ptr = tqDeserializeTopic(ptr, &node->topic);
gHandle->head = node; gHandle->head = node;
} else { } else {
node->next = malloc(sizeof(STqListHandle)); node->next = malloc(sizeof(STqList));
if (node->next == NULL) { if (node->next == NULL) {
// TODO: error // TODO: error
return NULL; return NULL;
} }
node->next->next = NULL; node->next->next = NULL;
ptr = tqDeserializeBufHandle(ptr, &node->next->bufHandle); ptr = tqDeserializeTopic(ptr, &node->next->topic);
node = node->next; node = node->next;
} }
} }
return ptr; return ptr;
} }
const void* tqDeserializeBufHandle(const void* pBytes, STqBufferHandle* bufHandle) { const void* tqDeserializeTopic(const void* pBytes, STqTopic* topic) {
const void* ptr = pBytes; const void* ptr = pBytes;
bufHandle->nextConsumeOffset = *(int64_t*)ptr; topic->nextConsumeOffset = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
bufHandle->topicId = *(int64_t*)ptr; topic->topicId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
bufHandle->head = *(int32_t*)ptr; topic->head = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
bufHandle->tail = *(int32_t*)ptr; topic->tail = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
ptr = tqDeserializeBufItem(ptr, &bufHandle->buffer[i]); ptr = tqDeserializeItem(ptr, &topic->buffer[i]);
} }
return ptr; return ptr;
} }
const void* tqDeserializeBufItem(const void* pBytes, STqBufferItem* bufItem) { return pBytes; } const void* tqDeserializeItem(const void* pBytes, STqMsgItem* bufItem) { return pBytes; }
// TODO: make this a macro // TODO: make this a macro
int tqGetgHandleSSize(const STqGroupHandle* gHandle) { int tqGroupSSize(const STqGroup* gHandle) {
return sizeof(int64_t) * 2 // cId + cgId return sizeof(int64_t) * 2 // cId + cgId
+ sizeof(int32_t) // topicNum + sizeof(int32_t) // topicNum
+ gHandle->topicNum * tqBufHandleSSize(); + gHandle->topicNum * tqTopicSSize();
} }
// TODO: make this a macro // TODO: make this a macro
int tqBufHandleSSize() { int tqTopicSSize() {
return sizeof(int64_t) * 2 // nextConsumeOffset + topicId return sizeof(int64_t) * 2 // nextConsumeOffset + topicId
+ sizeof(int32_t) * 2 // head + tail + sizeof(int32_t) * 2 // head + tail
+ TQ_BUFFER_SIZE * tqBufItemSSize(); + TQ_BUFFER_SIZE * tqItemSSize();
} }
int tqBufItemSSize() { int tqItemSSize() {
// TODO: do this need serialization? // TODO: do this need serialization?
// mainly for executor // mainly for executor
return 0; return 0;
......
...@@ -56,6 +56,7 @@ static inline int tqReadLastPage(int fd, STqIdxPageBuf* pBuf) { ...@@ -56,6 +56,7 @@ static inline int tqReadLastPage(int fd, STqIdxPageBuf* pBuf) {
int offset = tqSeekLastPage(fd); int offset = tqSeekLastPage(fd);
int nBytes; int nBytes;
if ((nBytes = read(fd, pBuf, TQ_PAGE_SIZE)) == -1) { if ((nBytes = read(fd, pBuf, TQ_PAGE_SIZE)) == -1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (nBytes == 0) { if (nBytes == 0) {
...@@ -71,7 +72,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -71,7 +72,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
int32_t tqConfigFlag) { int32_t tqConfigFlag) {
STqMetaStore* pMeta = malloc(sizeof(STqMetaStore)); STqMetaStore* pMeta = malloc(sizeof(STqMetaStore));
if (pMeta == NULL) { if (pMeta == NULL) {
// close terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return NULL; return NULL;
} }
memset(pMeta, 0, sizeof(STqMetaStore)); memset(pMeta, 0, sizeof(STqMetaStore));
...@@ -79,8 +80,9 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -79,8 +80,9 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
// concat data file name and index file name // concat data file name and index file name
size_t pathLen = strlen(path); size_t pathLen = strlen(path);
pMeta->dirPath = malloc(pathLen + 1); pMeta->dirPath = malloc(pathLen + 1);
if (pMeta->dirPath != NULL) { if (pMeta->dirPath == NULL) {
// TODO: memory insufficient terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return NULL;
} }
strcpy(pMeta->dirPath, path); strcpy(pMeta->dirPath, path);
...@@ -88,13 +90,14 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -88,13 +90,14 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
strcpy(name, path); strcpy(name, path);
if (taosDirExist(name) != 0 && taosMkDir(name) != 0) { if (taosDirExist(name) != 0 && taosMkDir(name) != 0) {
ASSERT(false); terrno = TSDB_CODE_TQ_FAILED_TO_CREATE_DIR;
tqError("failed to create dir:%s since %s ", name, terrstr());
} }
strcat(name, "/" TQ_IDX_NAME); strcat(name, "/" TQ_IDX_NAME);
int idxFd = open(name, O_RDWR | O_CREAT, 0755); int idxFd = open(name, O_RDWR | O_CREAT, 0755);
if (idxFd < 0) { if (idxFd < 0) {
ASSERT(false); terrno = TAOS_SYSTEM_ERROR(errno);
// close file tqError("failed to open file:%s since %s ", name, terrstr());
// free memory // free memory
return NULL; return NULL;
} }
...@@ -102,9 +105,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -102,9 +105,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
pMeta->idxFd = idxFd; pMeta->idxFd = idxFd;
pMeta->unpersistHead = malloc(sizeof(STqMetaList)); pMeta->unpersistHead = malloc(sizeof(STqMetaList));
if (pMeta->unpersistHead == NULL) { if (pMeta->unpersistHead == NULL) {
ASSERT(false); terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
// close file
// free memory
return NULL; return NULL;
} }
memset(pMeta->unpersistHead, 0, sizeof(STqMetaList)); memset(pMeta->unpersistHead, 0, sizeof(STqMetaList));
...@@ -114,7 +115,8 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -114,7 +115,8 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
strcat(name, "/" TQ_META_NAME); strcat(name, "/" TQ_META_NAME);
int fileFd = open(name, O_RDWR | O_CREAT, 0755); int fileFd = open(name, O_RDWR | O_CREAT, 0755);
if (fileFd < 0) { if (fileFd < 0) {
ASSERT(false); terrno = TAOS_SYSTEM_ERROR(errno);
tqError("failed to open file:%s since %s", name, terrstr());
return NULL; return NULL;
} }
...@@ -129,7 +131,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -129,7 +131,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
STqIdxPageBuf idxBuf; STqIdxPageBuf idxBuf;
STqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE); STqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE);
if (serializedObj == NULL) { if (serializedObj == NULL) {
// TODO:memory insufficient terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
} }
int idxRead; int idxRead;
int allocated = TQ_PAGE_SIZE; int allocated = TQ_PAGE_SIZE;
...@@ -137,14 +139,16 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -137,14 +139,16 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
while ((idxRead = read(idxFd, &idxBuf, TQ_PAGE_SIZE))) { while ((idxRead = read(idxFd, &idxBuf, TQ_PAGE_SIZE))) {
if (idxRead == -1) { if (idxRead == -1) {
// TODO: handle error // TODO: handle error
ASSERT(false); terrno = TAOS_SYSTEM_ERROR(errno);
tqError("failed to read tq index file since %s", terrstr());
} }
ASSERT(idxBuf.head.writeOffset == idxRead); ASSERT(idxBuf.head.writeOffset == idxRead);
// loop read every entry // loop read every entry
for (int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) { for (int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) {
STqMetaList* pNode = malloc(sizeof(STqMetaList)); STqMetaList* pNode = malloc(sizeof(STqMetaList));
if (pNode == NULL) { if (pNode == NULL) {
// TODO: free memory and return error terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
// TODO: free memory
} }
memset(pNode, 0, sizeof(STqMetaList)); memset(pNode, 0, sizeof(STqMetaList));
memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE); memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE);
...@@ -153,7 +157,8 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -153,7 +157,8 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
if (allocated < pNode->handle.serializedSize) { if (allocated < pNode->handle.serializedSize) {
void* ptr = realloc(serializedObj, pNode->handle.serializedSize); void* ptr = realloc(serializedObj, pNode->handle.serializedSize);
if (ptr == NULL) { if (ptr == NULL) {
// TODO: memory insufficient terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
// TODO: free memory
} }
serializedObj = ptr; serializedObj = ptr;
allocated = pNode->handle.serializedSize; allocated = pNode->handle.serializedSize;
...@@ -292,7 +297,7 @@ int32_t tqStorePersist(STqMetaStore* pMeta) { ...@@ -292,7 +297,7 @@ int32_t tqStorePersist(STqMetaStore* pMeta) {
STqMetaList* pNode = pHead->unpersistNext; STqMetaList* pNode = pHead->unpersistNext;
STqSerializedHead* pSHead = malloc(sizeof(STqSerializedHead)); STqSerializedHead* pSHead = malloc(sizeof(STqSerializedHead));
if (pSHead == NULL) { if (pSHead == NULL) {
// TODO: memory error terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1; return -1;
} }
pSHead->ver = TQ_SVER; pSHead->ver = TQ_SVER;
...@@ -403,7 +408,6 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu ...@@ -403,7 +408,6 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu
STqMetaList* pNode = pMeta->bucket[bucketKey]; STqMetaList* pNode = pMeta->bucket[bucketKey];
while (pNode) { while (pNode) {
if (pNode->handle.key == key) { if (pNode->handle.key == key) {
// TODO: think about thread safety
if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInUse); pMeta->pDeleter(pNode->handle.valueInUse);
} }
...@@ -416,7 +420,7 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu ...@@ -416,7 +420,7 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu
} }
STqMetaList* pNewNode = malloc(sizeof(STqMetaList)); STqMetaList* pNewNode = malloc(sizeof(STqMetaList));
if (pNewNode == NULL) { if (pNewNode == NULL) {
// TODO: memory error terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1; return -1;
} }
memset(pNewNode, 0, sizeof(STqMetaList)); memset(pNewNode, 0, sizeof(STqMetaList));
...@@ -470,10 +474,10 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va ...@@ -470,10 +474,10 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va
STqMetaList* pNode = pMeta->bucket[bucketKey]; STqMetaList* pNode = pMeta->bucket[bucketKey];
while (pNode) { while (pNode) {
if (pNode->handle.key == key) { if (pNode->handle.key == key) {
// TODO: think about thread safety
if (pNode->handle.valueInTxn) { if (pNode->handle.valueInTxn) {
if (TqDupIntxnReject(pMeta->tqConfigFlag)) { if (tqDupIntxnReject(pMeta->tqConfigFlag)) {
return -2; terrno = TSDB_CODE_TQ_META_KEY_DUP_IN_TXN;
return -1;
} }
if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInTxn); pMeta->pDeleter(pNode->handle.valueInTxn);
...@@ -488,7 +492,7 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va ...@@ -488,7 +492,7 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va
} }
STqMetaList* pNewNode = malloc(sizeof(STqMetaList)); STqMetaList* pNewNode = malloc(sizeof(STqMetaList));
if (pNewNode == NULL) { if (pNewNode == NULL) {
// TODO: memory error terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1; return -1;
} }
memset(pNewNode, 0, sizeof(STqMetaList)); memset(pNewNode, 0, sizeof(STqMetaList));
...@@ -505,7 +509,7 @@ int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) { return ...@@ -505,7 +509,7 @@ int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) { return
int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vsize) { int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vsize) {
void* vmem = malloc(vsize); void* vmem = malloc(vsize);
if (vmem == NULL) { if (vmem == NULL) {
// TODO: memory error terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1; return -1;
} }
memcpy(vmem, value, vsize); memcpy(vmem, value, vsize);
...@@ -535,6 +539,7 @@ int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) { ...@@ -535,6 +539,7 @@ int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) {
while (pNode) { while (pNode) {
if (pNode->handle.key == key) { if (pNode->handle.key == key) {
if (pNode->handle.valueInTxn == NULL) { if (pNode->handle.valueInTxn == NULL) {
terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN;
return -1; return -1;
} }
if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
...@@ -548,7 +553,8 @@ int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) { ...@@ -548,7 +553,8 @@ int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) {
pNode = pNode->next; pNode = pNode->next;
} }
} }
return -2; terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
return -1;
} }
int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) { int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) {
...@@ -564,12 +570,14 @@ int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) { ...@@ -564,12 +570,14 @@ int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) {
tqLinkUnpersist(pMeta, pNode); tqLinkUnpersist(pMeta, pNode);
return 0; return 0;
} }
terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN;
return -1; return -1;
} else { } else {
pNode = pNode->next; pNode = pNode->next;
} }
} }
return -2; terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
return -1;
} }
int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) { int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
...@@ -588,7 +596,7 @@ int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) { ...@@ -588,7 +596,7 @@ int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
pNode = pNode->next; pNode = pNode->next;
} }
} }
// no such key terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
return -1; return -1;
} }
......
...@@ -10,8 +10,8 @@ struct Foo { ...@@ -10,8 +10,8 @@ struct Foo {
}; };
int FooSerializer(const void* pObj, STqSerializedHead** ppHead) { int FooSerializer(const void* pObj, STqSerializedHead** ppHead) {
Foo* foo = (Foo*) pObj; Foo* foo = (Foo*)pObj;
if((*ppHead) == NULL || (*ppHead)->ssize < sizeof(STqSerializedHead) + sizeof(int32_t)) { if ((*ppHead) == NULL || (*ppHead)->ssize < sizeof(STqSerializedHead) + sizeof(int32_t)) {
*ppHead = (STqSerializedHead*)realloc(*ppHead, sizeof(STqSerializedHead) + sizeof(int32_t)); *ppHead = (STqSerializedHead*)realloc(*ppHead, sizeof(STqSerializedHead) + sizeof(int32_t));
(*ppHead)->ssize = sizeof(STqSerializedHead) + sizeof(int32_t); (*ppHead)->ssize = sizeof(STqSerializedHead) + sizeof(int32_t);
} }
...@@ -20,36 +20,28 @@ int FooSerializer(const void* pObj, STqSerializedHead** ppHead) { ...@@ -20,36 +20,28 @@ int FooSerializer(const void* pObj, STqSerializedHead** ppHead) {
} }
const void* FooDeserializer(const STqSerializedHead* pHead, void** ppObj) { const void* FooDeserializer(const STqSerializedHead* pHead, void** ppObj) {
if(*ppObj == NULL) { if (*ppObj == NULL) {
*ppObj = realloc(*ppObj, sizeof(int32_t)); *ppObj = realloc(*ppObj, sizeof(int32_t));
} }
Foo* pFoo = *(Foo**)ppObj; Foo* pFoo = *(Foo**)ppObj;
pFoo->a = *(int32_t*)pHead->content; pFoo->a = *(int32_t*)pHead->content;
return NULL; return NULL;
} }
void FooDeleter(void* pObj) { void FooDeleter(void* pObj) { free(pObj); }
free(pObj);
}
class TqMetaUpdateAppendTest : public ::testing::Test { class TqMetaUpdateAppendTest : public ::testing::Test {
protected: protected:
void SetUp() override {
void SetUp() override { taosRemoveDir(pathName);
taosRemoveDir(pathName); pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND);
pMeta = tqStoreOpen(pathName, ASSERT(pMeta);
FooSerializer, FooDeserializer, FooDeleter, }
TQ_UPDATE_APPEND
); void TearDown() override { tqStoreClose(pMeta); }
ASSERT(pMeta);
} STqMetaStore* pMeta;
const char* pathName = "/tmp/tq_test";
void TearDown() override {
tqStoreClose(pMeta);
}
TqMetaStore* pMeta;
const char* pathName = "/tmp/tq_test";
}; };
TEST_F(TqMetaUpdateAppendTest, copyPutTest) { TEST_F(TqMetaUpdateAppendTest, copyPutTest) {
...@@ -57,11 +49,11 @@ TEST_F(TqMetaUpdateAppendTest, copyPutTest) { ...@@ -57,11 +49,11 @@ TEST_F(TqMetaUpdateAppendTest, copyPutTest) {
foo.a = 3; foo.a = 3;
tqHandleCopyPut(pMeta, 1, &foo, sizeof(Foo)); tqHandleCopyPut(pMeta, 1, &foo, sizeof(Foo));
Foo* pFoo = (Foo*) tqHandleGet(pMeta, 1); Foo* pFoo = (Foo*)tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo == NULL, true); EXPECT_EQ(pFoo == NULL, true);
tqHandleCommit(pMeta, 1); tqHandleCommit(pMeta, 1);
pFoo = (Foo*) tqHandleGet(pMeta, 1); pFoo = (Foo*)tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo->a, 3); EXPECT_EQ(pFoo->a, 3);
} }
...@@ -78,10 +70,7 @@ TEST_F(TqMetaUpdateAppendTest, persistTest) { ...@@ -78,10 +70,7 @@ TEST_F(TqMetaUpdateAppendTest, persistTest) {
EXPECT_EQ(pBar == NULL, true); EXPECT_EQ(pBar == NULL, true);
tqStoreClose(pMeta); tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName, pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND);
FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta); ASSERT(pMeta);
pBar = (Foo*)tqHandleGet(pMeta, 1); pBar = (Foo*)tqHandleGet(pMeta, 1);
...@@ -97,7 +86,7 @@ TEST_F(TqMetaUpdateAppendTest, uncommittedTest) { ...@@ -97,7 +86,7 @@ TEST_F(TqMetaUpdateAppendTest, uncommittedTest) {
pFoo->a = 3; pFoo->a = 3;
tqHandleMovePut(pMeta, 1, pFoo); tqHandleMovePut(pMeta, 1, pFoo);
pFoo = (Foo*) tqHandleGet(pMeta, 1); pFoo = (Foo*)tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo == NULL, true); EXPECT_EQ(pFoo == NULL, true);
} }
...@@ -106,11 +95,11 @@ TEST_F(TqMetaUpdateAppendTest, abortTest) { ...@@ -106,11 +95,11 @@ TEST_F(TqMetaUpdateAppendTest, abortTest) {
pFoo->a = 3; pFoo->a = 3;
tqHandleMovePut(pMeta, 1, pFoo); tqHandleMovePut(pMeta, 1, pFoo);
pFoo = (Foo*) tqHandleGet(pMeta, 1); pFoo = (Foo*)tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo == NULL, true); EXPECT_EQ(pFoo == NULL, true);
tqHandleAbort(pMeta, 1); tqHandleAbort(pMeta, 1);
pFoo = (Foo*) tqHandleGet(pMeta, 1); pFoo = (Foo*)tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo == NULL, true); EXPECT_EQ(pFoo == NULL, true);
} }
...@@ -119,32 +108,29 @@ TEST_F(TqMetaUpdateAppendTest, deleteTest) { ...@@ -119,32 +108,29 @@ TEST_F(TqMetaUpdateAppendTest, deleteTest) {
pFoo->a = 3; pFoo->a = 3;
tqHandleMovePut(pMeta, 1, pFoo); tqHandleMovePut(pMeta, 1, pFoo);
pFoo = (Foo*) tqHandleGet(pMeta, 1); pFoo = (Foo*)tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo == NULL, true); EXPECT_EQ(pFoo == NULL, true);
tqHandleCommit(pMeta, 1); tqHandleCommit(pMeta, 1);
pFoo = (Foo*) tqHandleGet(pMeta, 1); pFoo = (Foo*)tqHandleGet(pMeta, 1);
ASSERT_EQ(pFoo != NULL, true); ASSERT_EQ(pFoo != NULL, true);
EXPECT_EQ(pFoo->a, 3); EXPECT_EQ(pFoo->a, 3);
tqHandleDel(pMeta, 1); tqHandleDel(pMeta, 1);
pFoo = (Foo*) tqHandleGet(pMeta, 1); pFoo = (Foo*)tqHandleGet(pMeta, 1);
ASSERT_EQ(pFoo != NULL, true); ASSERT_EQ(pFoo != NULL, true);
EXPECT_EQ(pFoo->a, 3); EXPECT_EQ(pFoo->a, 3);
tqHandleCommit(pMeta, 1); tqHandleCommit(pMeta, 1);
pFoo = (Foo*) tqHandleGet(pMeta, 1); pFoo = (Foo*)tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo == NULL, true); EXPECT_EQ(pFoo == NULL, true);
tqStoreClose(pMeta); tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName, pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND);
FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta); ASSERT(pMeta);
pFoo = (Foo*) tqHandleGet(pMeta, 1); pFoo = (Foo*)tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo == NULL, true); EXPECT_EQ(pFoo == NULL, true);
} }
...@@ -162,10 +148,7 @@ TEST_F(TqMetaUpdateAppendTest, intxnPersist) { ...@@ -162,10 +148,7 @@ TEST_F(TqMetaUpdateAppendTest, intxnPersist) {
EXPECT_EQ(pFoo1->a, 3); EXPECT_EQ(pFoo1->a, 3);
tqStoreClose(pMeta); tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName, pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND);
FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta); ASSERT(pMeta);
pFoo1 = (Foo*)tqHandleGet(pMeta, 1); pFoo1 = (Foo*)tqHandleGet(pMeta, 1);
...@@ -177,10 +160,7 @@ TEST_F(TqMetaUpdateAppendTest, intxnPersist) { ...@@ -177,10 +160,7 @@ TEST_F(TqMetaUpdateAppendTest, intxnPersist) {
EXPECT_EQ(pFoo1->a, 4); EXPECT_EQ(pFoo1->a, 4);
tqStoreClose(pMeta); tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName, pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND);
FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta); ASSERT(pMeta);
pFoo1 = (Foo*)tqHandleGet(pMeta, 1); pFoo1 = (Foo*)tqHandleGet(pMeta, 1);
...@@ -190,13 +170,13 @@ TEST_F(TqMetaUpdateAppendTest, intxnPersist) { ...@@ -190,13 +170,13 @@ TEST_F(TqMetaUpdateAppendTest, intxnPersist) {
TEST_F(TqMetaUpdateAppendTest, multiplePage) { TEST_F(TqMetaUpdateAppendTest, multiplePage) {
srand(0); srand(0);
std::vector<int> v; std::vector<int> v;
for(int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
v.push_back(rand()); v.push_back(rand());
Foo foo; Foo foo;
foo.a = v[i]; foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
} }
for(int i = 0; i < 500; i++) { for (int i = 0; i < 500; i++) {
tqHandleCommit(pMeta, i); tqHandleCommit(pMeta, i);
Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); Foo* pFoo = (Foo*)tqHandleGet(pMeta, i);
ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n";
...@@ -204,38 +184,34 @@ TEST_F(TqMetaUpdateAppendTest, multiplePage) { ...@@ -204,38 +184,34 @@ TEST_F(TqMetaUpdateAppendTest, multiplePage) {
} }
tqStoreClose(pMeta); tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName, pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND);
FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta); ASSERT(pMeta);
for(int i = 500; i < 1000; i++) { for (int i = 500; i < 1000; i++) {
tqHandleCommit(pMeta, i); tqHandleCommit(pMeta, i);
Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); Foo* pFoo = (Foo*)tqHandleGet(pMeta, i);
ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n";
EXPECT_EQ(pFoo->a, v[i]); EXPECT_EQ(pFoo->a, v[i]);
} }
for(int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); Foo* pFoo = (Foo*)tqHandleGet(pMeta, i);
ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n";
EXPECT_EQ(pFoo->a, v[i]); EXPECT_EQ(pFoo->a, v[i]);
} }
} }
TEST_F(TqMetaUpdateAppendTest, multipleRewrite) { TEST_F(TqMetaUpdateAppendTest, multipleRewrite) {
srand(0); srand(0);
std::vector<int> v; std::vector<int> v;
for(int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
v.push_back(rand()); v.push_back(rand());
Foo foo; Foo foo;
foo.a = v[i]; foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
} }
for(int i = 0; i < 500; i++) { for (int i = 0; i < 500; i++) {
tqHandleCommit(pMeta, i); tqHandleCommit(pMeta, i);
v[i] = rand(); v[i] = rand();
Foo foo; Foo foo;
...@@ -243,25 +219,22 @@ TEST_F(TqMetaUpdateAppendTest, multipleRewrite) { ...@@ -243,25 +219,22 @@ TEST_F(TqMetaUpdateAppendTest, multipleRewrite) {
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
} }
for(int i = 500; i < 1000; i++) { for (int i = 500; i < 1000; i++) {
v[i] = rand(); v[i] = rand();
Foo foo; Foo foo;
foo.a = v[i]; foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
} }
for(int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
tqHandleCommit(pMeta, i); tqHandleCommit(pMeta, i);
} }
tqStoreClose(pMeta); tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName, pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter, TQ_UPDATE_APPEND);
FooSerializer, FooDeserializer, FooDeleter,
TQ_UPDATE_APPEND
);
ASSERT(pMeta); ASSERT(pMeta);
for(int i = 500; i < 1000; i++) { for (int i = 500; i < 1000; i++) {
v[i] = rand(); v[i] = rand();
Foo foo; Foo foo;
foo.a = v[i]; foo.a = v[i];
...@@ -269,40 +242,38 @@ TEST_F(TqMetaUpdateAppendTest, multipleRewrite) { ...@@ -269,40 +242,38 @@ TEST_F(TqMetaUpdateAppendTest, multipleRewrite) {
tqHandleCommit(pMeta, i); tqHandleCommit(pMeta, i);
} }
for(int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); Foo* pFoo = (Foo*)tqHandleGet(pMeta, i);
ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n";
EXPECT_EQ(pFoo->a, v[i]); EXPECT_EQ(pFoo->a, v[i]);
} }
} }
TEST_F(TqMetaUpdateAppendTest, dupCommit) { TEST_F(TqMetaUpdateAppendTest, dupCommit) {
srand(0); srand(0);
std::vector<int> v; std::vector<int> v;
for(int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
v.push_back(rand()); v.push_back(rand());
Foo foo; Foo foo;
foo.a = v[i]; foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
} }
for(int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
int ret = tqHandleCommit(pMeta, i); int ret = tqHandleCommit(pMeta, i);
EXPECT_EQ(ret, 0); EXPECT_EQ(ret, 0);
ret = tqHandleCommit(pMeta, i); ret = tqHandleCommit(pMeta, i);
EXPECT_EQ(ret, -1); EXPECT_EQ(ret, -1);
} }
for(int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
int ret = tqHandleCommit(pMeta, i); int ret = tqHandleCommit(pMeta, i);
EXPECT_EQ(ret, -1); EXPECT_EQ(ret, -1);
} }
for(int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); Foo* pFoo = (Foo*)tqHandleGet(pMeta, i);
ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n";
EXPECT_EQ(pFoo->a, v[i]); EXPECT_EQ(pFoo->a, v[i]);
} }
} }
...@@ -14,21 +14,21 @@ ...@@ -14,21 +14,21 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h"
#include "ulog.h"
#include "tlog.h" #include "tlog.h"
#include "os.h"
#include "tnote.h" #include "tnote.h"
#include "tutil.h" #include "tutil.h"
#include "ulog.h"
#include "zlib.h" #include "zlib.h"
#define MAX_LOGLINE_SIZE (1000) #define MAX_LOGLINE_SIZE (1000)
#define MAX_LOGLINE_BUFFER_SIZE (MAX_LOGLINE_SIZE + 10) #define MAX_LOGLINE_BUFFER_SIZE (MAX_LOGLINE_SIZE + 10)
#define MAX_LOGLINE_CONTENT_SIZE (MAX_LOGLINE_SIZE - 100) #define MAX_LOGLINE_CONTENT_SIZE (MAX_LOGLINE_SIZE - 100)
#define MAX_LOGLINE_DUMP_SIZE (65 * 1024) #define MAX_LOGLINE_DUMP_SIZE (65 * 1024)
#define MAX_LOGLINE_DUMP_BUFFER_SIZE (MAX_LOGLINE_DUMP_SIZE + 10) #define MAX_LOGLINE_DUMP_BUFFER_SIZE (MAX_LOGLINE_DUMP_SIZE + 10)
#define MAX_LOGLINE_DUMP_CONTENT_SIZE (MAX_LOGLINE_DUMP_SIZE - 100) #define MAX_LOGLINE_DUMP_CONTENT_SIZE (MAX_LOGLINE_DUMP_SIZE - 100)
#define LOG_FILE_NAME_LEN 300 #define LOG_FILE_NAME_LEN 300
#define TSDB_DEFAULT_LOG_BUF_SIZE (20 * 1024 * 1024) // 20MB #define TSDB_DEFAULT_LOG_BUF_SIZE (20 * 1024 * 1024) // 20MB
#define DEFAULT_LOG_INTERVAL 25 #define DEFAULT_LOG_INTERVAL 25
...@@ -38,13 +38,13 @@ ...@@ -38,13 +38,13 @@
#define LOG_MAX_WAIT_MSEC 1000 #define LOG_MAX_WAIT_MSEC 1000
#define LOG_BUF_BUFFER(x) ((x)->buffer) #define LOG_BUF_BUFFER(x) ((x)->buffer)
#define LOG_BUF_START(x) ((x)->buffStart) #define LOG_BUF_START(x) ((x)->buffStart)
#define LOG_BUF_END(x) ((x)->buffEnd) #define LOG_BUF_END(x) ((x)->buffEnd)
#define LOG_BUF_SIZE(x) ((x)->buffSize) #define LOG_BUF_SIZE(x) ((x)->buffSize)
#define LOG_BUF_MUTEX(x) ((x)->buffMutex) #define LOG_BUF_MUTEX(x) ((x)->buffMutex)
typedef struct { typedef struct {
char * buffer; char *buffer;
int32_t buffStart; int32_t buffStart;
int32_t buffEnd; int32_t buffEnd;
int32_t buffSize; int32_t buffSize;
...@@ -57,18 +57,18 @@ typedef struct { ...@@ -57,18 +57,18 @@ typedef struct {
} SLogBuff; } SLogBuff;
typedef struct { typedef struct {
int32_t fileNum; int32_t fileNum;
int32_t maxLines; int32_t maxLines;
int32_t lines; int32_t lines;
int32_t flag; int32_t flag;
int32_t openInProgress; int32_t openInProgress;
pid_t pid; pid_t pid;
char logName[LOG_FILE_NAME_LEN]; char logName[LOG_FILE_NAME_LEN];
SLogBuff * logHandle; SLogBuff *logHandle;
pthread_mutex_t logMutex; pthread_mutex_t logMutex;
} SLogObj; } SLogObj;
int8_t tscEmbeddedInUtil = 0; int8_t tscEmbeddedInUtil = 0;
int32_t tsLogKeepDays = 0; int32_t tsLogKeepDays = 0;
int8_t tsAsyncLog = 1; int8_t tsAsyncLog = 1;
...@@ -93,19 +93,19 @@ int32_t debugFlag = 0; ...@@ -93,19 +93,19 @@ int32_t debugFlag = 0;
int32_t sDebugFlag = 135; int32_t sDebugFlag = 135;
int32_t wDebugFlag = 135; int32_t wDebugFlag = 135;
int32_t tsdbDebugFlag = 131; int32_t tsdbDebugFlag = 131;
int32_t tqDebugFlag = 131;
int32_t cqDebugFlag = 131; int32_t cqDebugFlag = 131;
int32_t fsDebugFlag = 135; int32_t fsDebugFlag = 135;
int32_t ctgDebugFlag = 131; int32_t ctgDebugFlag = 131;
int64_t dbgEmptyW = 0; int64_t dbgEmptyW = 0;
int64_t dbgWN = 0; int64_t dbgWN = 0;
int64_t dbgSmallWN = 0; int64_t dbgSmallWN = 0;
int64_t dbgBigWN = 0; int64_t dbgBigWN = 0;
int64_t dbgWSize = 0; int64_t dbgWSize = 0;
static SLogObj tsLogObj = { .fileNum = 1 }; static SLogObj tsLogObj = {.fileNum = 1};
static void * taosAsyncOutputLog(void *param); static void *taosAsyncOutputLog(void *param);
static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen); static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen);
static SLogBuff *taosLogBuffNew(int32_t bufSize); static SLogBuff *taosLogBuffNew(int32_t bufSize);
static void taosCloseLogByFd(int32_t oldFd); static void taosCloseLogByFd(int32_t oldFd);
...@@ -139,8 +139,8 @@ static void taosStopLog() { ...@@ -139,8 +139,8 @@ static void taosStopLog() {
void taosCloseLog() { void taosCloseLog() {
taosStopLog(); taosStopLog();
//tsem_post(&(tsLogObj.logHandle->buffNotEmpty)); // tsem_post(&(tsLogObj.logHandle->buffNotEmpty));
taosMsleep(MAX_LOG_INTERVAL/1000); taosMsleep(MAX_LOG_INTERVAL / 1000);
if (taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) { if (taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) {
pthread_join(tsLogObj.logHandle->asyncThread, NULL); pthread_join(tsLogObj.logHandle->asyncThread, NULL);
} }
...@@ -217,7 +217,7 @@ static void *taosThreadToOpenNewFile(void *param) { ...@@ -217,7 +217,7 @@ static void *taosThreadToOpenNewFile(void *param) {
tsLogObj.lines = 0; tsLogObj.lines = 0;
tsLogObj.openInProgress = 0; tsLogObj.openInProgress = 0;
taosCloseLogByFd(oldFd); taosCloseLogByFd(oldFd);
uInfo(" new log file:%d is opened", tsLogObj.flag); uInfo(" new log file:%d is opened", tsLogObj.flag);
uInfo("=================================="); uInfo("==================================");
taosPrintCfg(); taosPrintCfg();
...@@ -308,9 +308,9 @@ static void taosGetLogFileName(char *fn) { ...@@ -308,9 +308,9 @@ static void taosGetLogFileName(char *fn) {
static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) { static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
#ifdef WINDOWS #ifdef WINDOWS
/* /*
* always set maxFileNum to 1 * always set maxFileNum to 1
* means client log filename is unique in windows * means client log filename is unique in windows
*/ */
maxFileNum = 1; maxFileNum = 1;
#endif #endif
...@@ -381,13 +381,14 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) { ...@@ -381,13 +381,14 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) { void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
if (tsTotalLogDirGB != 0 && tsAvailLogDirGB < tsMinimalLogDirGB) { if (tsTotalLogDirGB != 0 && tsAvailLogDirGB < tsMinimalLogDirGB) {
printf("server disk:%s space remain %.3f GB, total %.1f GB, stop print log.\n", tsLogDir, tsAvailLogDirGB, tsTotalLogDirGB); printf("server disk:%s space remain %.3f GB, total %.1f GB, stop print log.\n", tsLogDir, tsAvailLogDirGB,
tsTotalLogDirGB);
fflush(stdout); fflush(stdout);
return; return;
} }
va_list argpointer; va_list argpointer;
char buffer[MAX_LOGLINE_BUFFER_SIZE] = { 0 }; char buffer[MAX_LOGLINE_BUFFER_SIZE] = {0};
int32_t len; int32_t len;
struct tm Tm, *ptm; struct tm Tm, *ptm;
struct timeval timeSecs; struct timeval timeSecs;
...@@ -434,20 +435,20 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) { ...@@ -434,20 +435,20 @@ void taosPrintLog(const char *flags, int32_t dflag, const char *format, ...) {
} }
} }
if (dflag & DEBUG_SCREEN) if (dflag & DEBUG_SCREEN) taosWriteFile(1, buffer, (uint32_t)len);
taosWriteFile(1, buffer, (uint32_t)len);
if (dflag == 255) nInfo(buffer, len); if (dflag == 255) nInfo(buffer, len);
} }
void taosDumpData(unsigned char *msg, int32_t len) { void taosDumpData(unsigned char *msg, int32_t len) {
if (tsTotalLogDirGB != 0 && tsAvailLogDirGB < tsMinimalLogDirGB) { if (tsTotalLogDirGB != 0 && tsAvailLogDirGB < tsMinimalLogDirGB) {
printf("server disk:%s space remain %.3f GB, total %.1f GB, stop dump log.\n", tsLogDir, tsAvailLogDirGB, tsTotalLogDirGB); printf("server disk:%s space remain %.3f GB, total %.1f GB, stop dump log.\n", tsLogDir, tsAvailLogDirGB,
tsTotalLogDirGB);
fflush(stdout); fflush(stdout);
return; return;
} }
char temp[256]; char temp[256];
int32_t i, pos = 0, c = 0; int32_t i, pos = 0, c = 0;
for (i = 0; i < len; ++i) { for (i = 0; i < len; ++i) {
sprintf(temp + pos, "%02x ", msg[i]); sprintf(temp + pos, "%02x ", msg[i]);
...@@ -468,7 +469,8 @@ void taosDumpData(unsigned char *msg, int32_t len) { ...@@ -468,7 +469,8 @@ void taosDumpData(unsigned char *msg, int32_t len) {
void taosPrintLongString(const char *flags, int32_t dflag, const char *format, ...) { void taosPrintLongString(const char *flags, int32_t dflag, const char *format, ...) {
if (tsTotalLogDirGB != 0 && tsAvailLogDirGB < tsMinimalLogDirGB) { if (tsTotalLogDirGB != 0 && tsAvailLogDirGB < tsMinimalLogDirGB) {
printf("server disk:%s space remain %.3f GB, total %.1f GB, stop write log.\n", tsLogDir, tsAvailLogDirGB, tsTotalLogDirGB); printf("server disk:%s space remain %.3f GB, total %.1f GB, stop write log.\n", tsLogDir, tsAvailLogDirGB,
tsTotalLogDirGB);
fflush(stdout); fflush(stdout);
return; return;
} }
...@@ -503,7 +505,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, . ...@@ -503,7 +505,7 @@ void taosPrintLongString(const char *flags, int32_t dflag, const char *format, .
} else { } else {
taosWriteFile(tsLogObj.logHandle->fd, buffer, len); taosWriteFile(tsLogObj.logHandle->fd, buffer, len);
} }
if (tsLogObj.maxLines > 0) { if (tsLogObj.maxLines > 0) {
atomic_add_fetch_32(&tsLogObj.lines, 1); atomic_add_fetch_32(&tsLogObj.lines, 1);
...@@ -542,7 +544,7 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize) { ...@@ -542,7 +544,7 @@ static SLogBuff *taosLogBuffNew(int32_t bufSize) {
tLogBuff->stop = 0; tLogBuff->stop = 0;
if (pthread_mutex_init(&LOG_BUF_MUTEX(tLogBuff), NULL) < 0) goto _err; if (pthread_mutex_init(&LOG_BUF_MUTEX(tLogBuff), NULL) < 0) goto _err;
//tsem_init(&(tLogBuff->buffNotEmpty), 0, 0); // tsem_init(&(tLogBuff->buffNotEmpty), 0, 0);
return tLogBuff; return tLogBuff;
...@@ -576,12 +578,12 @@ static void taosCopyLogBuffer(SLogBuff *tLogBuff, int32_t start, int32_t end, ch ...@@ -576,12 +578,12 @@ static void taosCopyLogBuffer(SLogBuff *tLogBuff, int32_t start, int32_t end, ch
} }
static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) { static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) {
int32_t start = 0; int32_t start = 0;
int32_t end = 0; int32_t end = 0;
int32_t remainSize = 0; int32_t remainSize = 0;
static int64_t lostLine = 0; static int64_t lostLine = 0;
char tmpBuf[40] = {0}; char tmpBuf[40] = {0};
int32_t tmpBufLen = 0; int32_t tmpBufLen = 0;
if (tLogBuff == NULL || tLogBuff->stop) return -1; if (tLogBuff == NULL || tLogBuff->stop) return -1;
...@@ -592,7 +594,7 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) ...@@ -592,7 +594,7 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen)
remainSize = (start > end) ? (start - end - 1) : (start + LOG_BUF_SIZE(tLogBuff) - end - 1); remainSize = (start > end) ? (start - end - 1) : (start + LOG_BUF_SIZE(tLogBuff) - end - 1);
if (lostLine > 0) { if (lostLine > 0) {
sprintf(tmpBuf, "...Lost %"PRId64" lines here...\n", lostLine); sprintf(tmpBuf, "...Lost %" PRId64 " lines here...\n", lostLine);
tmpBufLen = (int32_t)strlen(tmpBuf); tmpBufLen = (int32_t)strlen(tmpBuf);
} }
...@@ -610,7 +612,7 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) ...@@ -610,7 +612,7 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen)
taosCopyLogBuffer(tLogBuff, LOG_BUF_START(tLogBuff), LOG_BUF_END(tLogBuff), msg, msgLen); taosCopyLogBuffer(tLogBuff, LOG_BUF_START(tLogBuff), LOG_BUF_END(tLogBuff), msg, msgLen);
//int32_t w = atomic_sub_fetch_32(&waitLock, 1); // int32_t w = atomic_sub_fetch_32(&waitLock, 1);
/* /*
if (w <= 0 || ((remainSize - msgLen - tmpBufLen) < (LOG_BUF_SIZE(tLogBuff) * 4 /5))) { if (w <= 0 || ((remainSize - msgLen - tmpBufLen) < (LOG_BUF_SIZE(tLogBuff) * 4 /5))) {
tsem_post(&(tLogBuff->buffNotEmpty)); tsem_post(&(tLogBuff->buffNotEmpty));
...@@ -622,7 +624,6 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen) ...@@ -622,7 +624,6 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen)
pthread_mutex_unlock(&LOG_BUF_MUTEX(tLogBuff)); pthread_mutex_unlock(&LOG_BUF_MUTEX(tLogBuff));
return 0; return 0;
} }
...@@ -634,9 +635,9 @@ static int32_t taosGetLogRemainSize(SLogBuff *tLogBuff, int32_t start, int32_t e ...@@ -634,9 +635,9 @@ static int32_t taosGetLogRemainSize(SLogBuff *tLogBuff, int32_t start, int32_t e
static void taosWriteLog(SLogBuff *tLogBuff) { static void taosWriteLog(SLogBuff *tLogBuff) {
static int32_t lastDuration = 0; static int32_t lastDuration = 0;
int32_t remainChecked = 0; int32_t remainChecked = 0;
int32_t start, end, pollSize; int32_t start, end, pollSize;
do { do {
if (remainChecked == 0) { if (remainChecked == 0) {
start = LOG_BUF_START(tLogBuff); start = LOG_BUF_START(tLogBuff);
...@@ -662,24 +663,24 @@ static void taosWriteLog(SLogBuff *tLogBuff) { ...@@ -662,24 +663,24 @@ static void taosWriteLog(SLogBuff *tLogBuff) {
if (start < end) { if (start < end) {
taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, pollSize); taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, pollSize);
} else { } else {
int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start; int32_t tsize = LOG_BUF_SIZE(tLogBuff) - start;
taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, tsize); taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff) + start, tsize);
taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff), end); taosWriteFile(tLogBuff->fd, LOG_BUF_BUFFER(tLogBuff), end);
} }
dbgWN++; dbgWN++;
dbgWSize+=pollSize; dbgWSize += pollSize;
if (pollSize < tLogBuff->minBuffSize) { if (pollSize < tLogBuff->minBuffSize) {
dbgSmallWN++; dbgSmallWN++;
if (writeInterval < MAX_LOG_INTERVAL) { if (writeInterval < MAX_LOG_INTERVAL) {
writeInterval += LOG_INTERVAL_STEP; writeInterval += LOG_INTERVAL_STEP;
} }
} else if (pollSize > LOG_BUF_SIZE(tLogBuff)/3) { } else if (pollSize > LOG_BUF_SIZE(tLogBuff) / 3) {
dbgBigWN++; dbgBigWN++;
writeInterval = MIN_LOG_INTERVAL; writeInterval = MIN_LOG_INTERVAL;
} else if (pollSize > LOG_BUF_SIZE(tLogBuff)/4) { } else if (pollSize > LOG_BUF_SIZE(tLogBuff) / 4) {
if (writeInterval > MIN_LOG_INTERVAL) { if (writeInterval > MIN_LOG_INTERVAL) {
writeInterval -= LOG_INTERVAL_STEP; writeInterval -= LOG_INTERVAL_STEP;
} }
...@@ -698,13 +699,13 @@ static void taosWriteLog(SLogBuff *tLogBuff) { ...@@ -698,13 +699,13 @@ static void taosWriteLog(SLogBuff *tLogBuff) {
writeInterval = MIN_LOG_INTERVAL; writeInterval = MIN_LOG_INTERVAL;
remainChecked = 1; remainChecked = 1;
}while (1); } while (1);
} }
static void *taosAsyncOutputLog(void *param) { static void *taosAsyncOutputLog(void *param) {
SLogBuff *tLogBuff = (SLogBuff *)param; SLogBuff *tLogBuff = (SLogBuff *)param;
setThreadName("log"); setThreadName("log");
while (1) { while (1) {
taosMsleep(writeInterval); taosMsleep(writeInterval);
...@@ -721,8 +722,8 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { ...@@ -721,8 +722,8 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
int32_t compressSize = 163840; int32_t compressSize = 163840;
int32_t ret = 0; int32_t ret = 0;
int32_t len = 0; int32_t len = 0;
char * data = malloc(compressSize); char *data = malloc(compressSize);
FILE * srcFp = NULL; FILE *srcFp = NULL;
gzFile dstFp = NULL; gzFile dstFp = NULL;
srcFp = fopen(srcFileName, "r"); srcFp = fopen(srcFileName, "r");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册