提交 cf9c6f11 编写于 作者: L Liu Jicong

add errno and log for tq

上级 894ff930
...@@ -16,8 +16,11 @@ ...@@ -16,8 +16,11 @@
#ifndef _TD_TQ_H_ #ifndef _TD_TQ_H_
#define _TD_TQ_H_ #define _TD_TQ_H_
#include "common.h"
#include "mallocator.h" #include "mallocator.h"
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tlist.h" #include "tlist.h"
#include "tutil.h" #include "tutil.h"
...@@ -97,10 +100,9 @@ typedef struct STqTopicVhandle { ...@@ -97,10 +100,9 @@ typedef struct STqTopicVhandle {
typedef struct STqExec { typedef struct STqExec {
void* runtimeEnv; void* runtimeEnv;
// return type will be SSDataBlock SSDataBlock* (*exec)(void* runtimeEnv);
void* (*exec)(void* runtimeEnv); void* (*assign)(void* runtimeEnv, SSubmitBlk* inputData);
// inputData type will be submitblk void (*clear)(void* runtimeEnv);
void* (*assign)(void* runtimeEnv, void* inputData);
char* (*serialize)(struct STqExec*); char* (*serialize)(struct STqExec*);
struct STqExec* (*deserialize)(char*); struct STqExec* (*deserialize)(char*);
} STqExec; } STqExec;
...@@ -133,22 +135,17 @@ typedef struct STqListHandle { ...@@ -133,22 +135,17 @@ typedef struct STqListHandle {
} STqList; } STqList;
typedef struct STqGroup { typedef struct STqGroup {
int64_t cId; int64_t clientId;
int64_t cgId; int64_t cgId;
void* ahandle; void* ahandle;
int32_t topicNum; int32_t topicNum;
STqList* head; STqList* head;
SList* topicList; // SList<STqTopic> SList* topicList; // SList<STqTopic>
void* returnMsg; // SVReadMsg
} STqGroup; } STqGroup;
typedef struct STqQueryExec {
void* src;
STqMsgItem* dest;
void* executor;
} STqQueryExec;
typedef struct STqQueryMsg { typedef struct STqQueryMsg {
STqQueryExec* exec; STqMsgItem* item;
struct STqQueryMsg* next; struct STqQueryMsg* next;
} STqQueryMsg; } STqQueryMsg;
...@@ -258,12 +255,10 @@ typedef struct STQ { ...@@ -258,12 +255,10 @@ typedef struct STQ {
STqLogReader* tqLogReader; STqLogReader* tqLogReader;
STqMemRef tqMemRef; STqMemRef tqMemRef;
STqMetaStore* tqMeta; STqMetaStore* tqMeta;
STqExec* tqExec;
} STQ; } STQ;
// open in each vnode // open in each vnode
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac, STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac);
STqExec* tqExec);
void tqClose(STQ*); void tqClose(STQ*);
// void* will be replace by a msg type // void* will be replace by a msg type
......
...@@ -353,6 +353,20 @@ int32_t* taosGetErrno(); ...@@ -353,6 +353,20 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909) //"Invalid msg length") #define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909) //"Invalid msg length")
#define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A) //"Invalid msg type") #define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A) //"Invalid msg type")
// tq
#define TSDB_CODE_TQ_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0A00) //"Invalid configuration")
#define TSDB_CODE_TQ_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0A01) //"Tq init failed")
#define TSDB_CODE_TQ_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0A02) //"No diskspace for tq")
#define TSDB_CODE_TQ_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0A03) //"No permission for disk files")
#define TSDB_CODE_TQ_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0A04) //"Data file(s) corrupted")
#define TSDB_CODE_TQ_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0A05) //"Out of memory")
#define TSDB_CODE_TQ_FILE_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0A06) //"File already exists")
#define TSDB_CODE_TQ_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0A07) //"Failed to create dir")
#define TSDB_CODE_TQ_META_NO_SUCH_KEY TAOS_DEF_ERROR_CODE(0, 0x0A08) //"Target key not found")
#define TSDB_CODE_TQ_META_KEY_NOT_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A09) //"Target key not in transaction")
#define TSDB_CODE_TQ_META_KEY_DUP_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A0A) //"Target key duplicated in transaction")
#define TSDB_CODE_TQ_GROUP_NOT_SET TAOS_DEF_ERROR_CODE(0, 0x0A0B) //"Group of corresponding client is not set by mnode")
// wal // wal
#define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) //"Unexpected generic error in wal") #define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) //"Unexpected generic error in wal")
#define TSDB_CODE_WAL_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x1001) //"WAL file is corrupted") #define TSDB_CODE_WAL_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x1001) //"WAL file is corrupted")
......
...@@ -42,11 +42,11 @@ extern int32_t qDebugFlag; ...@@ -42,11 +42,11 @@ extern int32_t qDebugFlag;
extern int32_t wDebugFlag; extern int32_t wDebugFlag;
extern int32_t sDebugFlag; extern int32_t sDebugFlag;
extern int32_t tsdbDebugFlag; extern int32_t tsdbDebugFlag;
extern int32_t tqDebugFlag;
extern int32_t cqDebugFlag; extern int32_t cqDebugFlag;
extern int32_t debugFlag; extern int32_t debugFlag;
extern int32_t ctgDebugFlag; extern int32_t ctgDebugFlag;
#define DEBUG_FATAL 1U #define DEBUG_FATAL 1U
#define DEBUG_ERROR DEBUG_FATAL #define DEBUG_ERROR DEBUG_FATAL
#define DEBUG_WARN 2U #define DEBUG_WARN 2U
......
...@@ -11,6 +11,7 @@ target_link_libraries( ...@@ -11,6 +11,7 @@ target_link_libraries(
PUBLIC wal PUBLIC wal
PUBLIC os PUBLIC os
PUBLIC util PUBLIC util
PUBLIC common
) )
if(${BUILD_TEST}) if(${BUILD_TEST})
......
...@@ -17,11 +17,20 @@ ...@@ -17,11 +17,20 @@
#define _TD_TQ_INT_H_ #define _TD_TQ_INT_H_
#include "tq.h" #include "tq.h"
#include "tlog.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
extern int32_t tqDebugFlag;
#define tqFatal(...) { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", 255, __VA_ARGS__); }}
#define tqError(...) { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", 255, __VA_ARGS__); }}
#define tqWarn(...) { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", 255, __VA_ARGS__); }}
#define tqInfo(...) { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", 255, __VA_ARGS__); }}
#define tqDebug(...) { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }}
#define tqTrace(...) { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); }}
// create persistent storage for meta info such as consuming offset // create persistent storage for meta info such as consuming offset
// return value > 0: cgId // return value > 0: cgId
// return value <= 0: error code // return value <= 0: error code
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#define _TQ_META_STORE_H_ #define _TQ_META_STORE_H_
#include "os.h" #include "os.h"
#include "tq.h" #include "tqInt.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
......
...@@ -35,11 +35,10 @@ void* tqSerializeItem(STqMsgItem* pItem, void* ptr); ...@@ -35,11 +35,10 @@ void* tqSerializeItem(STqMsgItem* pItem, void* ptr);
const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic); const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic);
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem); const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem);
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac, STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac) {
STqExec* tqExec) {
STQ* pTq = malloc(sizeof(STQ)); STQ* pTq = malloc(sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
// TODO: memory error terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pTq->path = strdup(path); pTq->path = strdup(path);
...@@ -48,14 +47,13 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA ...@@ -48,14 +47,13 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA
pTq->tqMemRef.pAlloctorFactory = allocFac; pTq->tqMemRef.pAlloctorFactory = allocFac;
pTq->tqMemRef.pAllocator = allocFac->create(allocFac); pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
if (pTq->tqMemRef.pAllocator == NULL) { if (pTq->tqMemRef.pAllocator == NULL) {
// TODO // TODO: error code of buffer pool
} }
pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0); pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0);
if (pTq->tqMeta == NULL) { if (pTq->tqMeta == NULL) {
// TODO: free STQ // TODO: free STQ
return NULL; return NULL;
} }
pTq->tqExec = tqExec;
return pTq; return pTq;
} }
...@@ -70,16 +68,16 @@ static int tqAckOneTopic(STqTopic* pTopic, STqOneAck* pAck, STqQueryMsg** ppQuer ...@@ -70,16 +68,16 @@ static int tqAckOneTopic(STqTopic* pTopic, STqOneAck* pAck, STqQueryMsg** ppQuer
if (1 /* TODO: need to launch new query */) { if (1 /* TODO: need to launch new query */) {
STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg)); STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg));
if (pNewQuery == NULL) { if (pNewQuery == NULL) {
// TODO: memory insufficient terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1; return -1;
} }
// TODO: lock executor // TODO: lock executor
pNewQuery->exec->executor = pTopic->buffer[idx].executor;
// TODO: read from wal and assign to src // TODO: read from wal and assign to src
pNewQuery->exec->src = 0; /*pNewQuery->exec->executor = pTopic->buffer[idx].executor;*/
pNewQuery->exec->dest = &pTopic->buffer[idx]; /*pNewQuery->exec->src = 0;*/
pNewQuery->next = *ppQuery; /*pNewQuery->exec->dest = &pTopic->buffer[idx];*/
*ppQuery = pNewQuery; /*pNewQuery->next = *ppQuery;*/
/**ppQuery = pNewQuery;*/
} }
return 0; return 0;
} }
...@@ -134,10 +132,10 @@ STqGroup* tqOpenGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { ...@@ -134,10 +132,10 @@ STqGroup* tqOpenGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
// TODO // TODO
return NULL; return NULL;
} }
tqHandleMovePut(pTq->tqMeta, cId, pGroup);
} }
ASSERT(pGroup);
// create
// open
return pGroup; return pGroup;
} }
...@@ -272,6 +270,33 @@ int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) { ...@@ -272,6 +270,33 @@ int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) {
return 0; return 0;
} }
int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
int64_t clientId = pMsg->head.clientId;
STqGroup* pGroup = tqGetGroup(pTq, clientId);
if (pGroup == NULL) {
terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
return -1;
}
STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg;
int numOfMsgs = tqFetch(pGroup, (void**)&pRsp->msgs);
if (numOfMsgs < 0) {
return -1;
}
if (numOfMsgs == 0) {
// most recent data has been fetched
// enable timer for blocking wait
// once new data written during wait time
// launch query and response
}
// fetched a num of msgs, rpc response
return 0;
}
#if 0
int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
if (!tqProtoCheck((STqMsgHead*)pMsg)) { if (!tqProtoCheck((STqMsgHead*)pMsg)) {
// proto version invalid // proto version invalid
...@@ -304,6 +329,7 @@ int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { ...@@ -304,6 +329,7 @@ int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
/*}*/ /*}*/
return 0; return 0;
} }
#endif
int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) { int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) {
// calculate size // calculate size
...@@ -320,7 +346,7 @@ int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) { ...@@ -320,7 +346,7 @@ int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) {
} }
void* ptr = (*ppHead)->content; void* ptr = (*ppHead)->content;
// do serialization // do serialization
*(int64_t*)ptr = pGroup->cId; *(int64_t*)ptr = pGroup->clientId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = pGroup->cgId; *(int64_t*)ptr = pGroup->cgId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
...@@ -366,7 +392,7 @@ void* tqSerializeItem(STqMsgItem* bufItem, void* ptr) { ...@@ -366,7 +392,7 @@ void* tqSerializeItem(STqMsgItem* bufItem, void* ptr) {
const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup) { const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup) {
STqGroup* gHandle = *ppGroup; STqGroup* gHandle = *ppGroup;
const void* ptr = pHead->content; const void* ptr = pHead->content;
gHandle->cId = *(int64_t*)ptr; gHandle->clientId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
gHandle->cgId = *(int64_t*)ptr; gHandle->cgId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
......
...@@ -56,6 +56,7 @@ static inline int tqReadLastPage(int fd, STqIdxPageBuf* pBuf) { ...@@ -56,6 +56,7 @@ static inline int tqReadLastPage(int fd, STqIdxPageBuf* pBuf) {
int offset = tqSeekLastPage(fd); int offset = tqSeekLastPage(fd);
int nBytes; int nBytes;
if ((nBytes = read(fd, pBuf, TQ_PAGE_SIZE)) == -1) { if ((nBytes = read(fd, pBuf, TQ_PAGE_SIZE)) == -1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (nBytes == 0) { if (nBytes == 0) {
...@@ -71,7 +72,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -71,7 +72,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
int32_t tqConfigFlag) { int32_t tqConfigFlag) {
STqMetaStore* pMeta = malloc(sizeof(STqMetaStore)); STqMetaStore* pMeta = malloc(sizeof(STqMetaStore));
if (pMeta == NULL) { if (pMeta == NULL) {
// close terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return NULL; return NULL;
} }
memset(pMeta, 0, sizeof(STqMetaStore)); memset(pMeta, 0, sizeof(STqMetaStore));
...@@ -79,8 +80,9 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -79,8 +80,9 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
// concat data file name and index file name // concat data file name and index file name
size_t pathLen = strlen(path); size_t pathLen = strlen(path);
pMeta->dirPath = malloc(pathLen + 1); pMeta->dirPath = malloc(pathLen + 1);
if (pMeta->dirPath != NULL) { if (pMeta->dirPath == NULL) {
// TODO: memory insufficient terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return NULL;
} }
strcpy(pMeta->dirPath, path); strcpy(pMeta->dirPath, path);
...@@ -88,13 +90,14 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -88,13 +90,14 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
strcpy(name, path); strcpy(name, path);
if (taosDirExist(name) != 0 && taosMkDir(name) != 0) { if (taosDirExist(name) != 0 && taosMkDir(name) != 0) {
ASSERT(false); terrno = TSDB_CODE_TQ_FAILED_TO_CREATE_DIR;
tqError("failed to create dir:%s since %s ", name, terrstr());
} }
strcat(name, "/" TQ_IDX_NAME); strcat(name, "/" TQ_IDX_NAME);
int idxFd = open(name, O_RDWR | O_CREAT, 0755); int idxFd = open(name, O_RDWR | O_CREAT, 0755);
if (idxFd < 0) { if (idxFd < 0) {
ASSERT(false); terrno = TAOS_SYSTEM_ERROR(errno);
// close file tqError("failed to open file:%s since %s ", name, terrstr());
// free memory // free memory
return NULL; return NULL;
} }
...@@ -102,9 +105,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -102,9 +105,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
pMeta->idxFd = idxFd; pMeta->idxFd = idxFd;
pMeta->unpersistHead = malloc(sizeof(STqMetaList)); pMeta->unpersistHead = malloc(sizeof(STqMetaList));
if (pMeta->unpersistHead == NULL) { if (pMeta->unpersistHead == NULL) {
ASSERT(false); terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
// close file
// free memory
return NULL; return NULL;
} }
memset(pMeta->unpersistHead, 0, sizeof(STqMetaList)); memset(pMeta->unpersistHead, 0, sizeof(STqMetaList));
...@@ -114,7 +115,8 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -114,7 +115,8 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
strcat(name, "/" TQ_META_NAME); strcat(name, "/" TQ_META_NAME);
int fileFd = open(name, O_RDWR | O_CREAT, 0755); int fileFd = open(name, O_RDWR | O_CREAT, 0755);
if (fileFd < 0) { if (fileFd < 0) {
ASSERT(false); terrno = TAOS_SYSTEM_ERROR(errno);
tqError("failed to open file:%s since %s", name, terrstr());
return NULL; return NULL;
} }
...@@ -129,7 +131,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -129,7 +131,7 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
STqIdxPageBuf idxBuf; STqIdxPageBuf idxBuf;
STqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE); STqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE);
if (serializedObj == NULL) { if (serializedObj == NULL) {
// TODO:memory insufficient terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
} }
int idxRead; int idxRead;
int allocated = TQ_PAGE_SIZE; int allocated = TQ_PAGE_SIZE;
...@@ -137,14 +139,16 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -137,14 +139,16 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
while ((idxRead = read(idxFd, &idxBuf, TQ_PAGE_SIZE))) { while ((idxRead = read(idxFd, &idxBuf, TQ_PAGE_SIZE))) {
if (idxRead == -1) { if (idxRead == -1) {
// TODO: handle error // TODO: handle error
ASSERT(false); terrno = TAOS_SYSTEM_ERROR(errno);
tqError("failed to read tq index file since %s", terrstr());
} }
ASSERT(idxBuf.head.writeOffset == idxRead); ASSERT(idxBuf.head.writeOffset == idxRead);
// loop read every entry // loop read every entry
for (int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) { for (int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) {
STqMetaList* pNode = malloc(sizeof(STqMetaList)); STqMetaList* pNode = malloc(sizeof(STqMetaList));
if (pNode == NULL) { if (pNode == NULL) {
// TODO: free memory and return error terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
// TODO: free memory
} }
memset(pNode, 0, sizeof(STqMetaList)); memset(pNode, 0, sizeof(STqMetaList));
memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE); memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE);
...@@ -153,7 +157,8 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial ...@@ -153,7 +157,8 @@ STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserial
if (allocated < pNode->handle.serializedSize) { if (allocated < pNode->handle.serializedSize) {
void* ptr = realloc(serializedObj, pNode->handle.serializedSize); void* ptr = realloc(serializedObj, pNode->handle.serializedSize);
if (ptr == NULL) { if (ptr == NULL) {
// TODO: memory insufficient terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
// TODO: free memory
} }
serializedObj = ptr; serializedObj = ptr;
allocated = pNode->handle.serializedSize; allocated = pNode->handle.serializedSize;
...@@ -292,7 +297,7 @@ int32_t tqStorePersist(STqMetaStore* pMeta) { ...@@ -292,7 +297,7 @@ int32_t tqStorePersist(STqMetaStore* pMeta) {
STqMetaList* pNode = pHead->unpersistNext; STqMetaList* pNode = pHead->unpersistNext;
STqSerializedHead* pSHead = malloc(sizeof(STqSerializedHead)); STqSerializedHead* pSHead = malloc(sizeof(STqSerializedHead));
if (pSHead == NULL) { if (pSHead == NULL) {
// TODO: memory error terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1; return -1;
} }
pSHead->ver = TQ_SVER; pSHead->ver = TQ_SVER;
...@@ -403,7 +408,6 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu ...@@ -403,7 +408,6 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu
STqMetaList* pNode = pMeta->bucket[bucketKey]; STqMetaList* pNode = pMeta->bucket[bucketKey];
while (pNode) { while (pNode) {
if (pNode->handle.key == key) { if (pNode->handle.key == key) {
// TODO: think about thread safety
if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInUse); pMeta->pDeleter(pNode->handle.valueInUse);
} }
...@@ -416,7 +420,7 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu ...@@ -416,7 +420,7 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu
} }
STqMetaList* pNewNode = malloc(sizeof(STqMetaList)); STqMetaList* pNewNode = malloc(sizeof(STqMetaList));
if (pNewNode == NULL) { if (pNewNode == NULL) {
// TODO: memory error terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1; return -1;
} }
memset(pNewNode, 0, sizeof(STqMetaList)); memset(pNewNode, 0, sizeof(STqMetaList));
...@@ -470,10 +474,10 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va ...@@ -470,10 +474,10 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va
STqMetaList* pNode = pMeta->bucket[bucketKey]; STqMetaList* pNode = pMeta->bucket[bucketKey];
while (pNode) { while (pNode) {
if (pNode->handle.key == key) { if (pNode->handle.key == key) {
// TODO: think about thread safety
if (pNode->handle.valueInTxn) { if (pNode->handle.valueInTxn) {
if (tqDupIntxnReject(pMeta->tqConfigFlag)) { if (tqDupIntxnReject(pMeta->tqConfigFlag)) {
return -2; terrno = TSDB_CODE_TQ_META_KEY_DUP_IN_TXN;
return -1;
} }
if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInTxn); pMeta->pDeleter(pNode->handle.valueInTxn);
...@@ -488,7 +492,7 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va ...@@ -488,7 +492,7 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va
} }
STqMetaList* pNewNode = malloc(sizeof(STqMetaList)); STqMetaList* pNewNode = malloc(sizeof(STqMetaList));
if (pNewNode == NULL) { if (pNewNode == NULL) {
// TODO: memory error terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1; return -1;
} }
memset(pNewNode, 0, sizeof(STqMetaList)); memset(pNewNode, 0, sizeof(STqMetaList));
...@@ -505,7 +509,7 @@ int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) { return ...@@ -505,7 +509,7 @@ int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) { return
int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vsize) { int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vsize) {
void* vmem = malloc(vsize); void* vmem = malloc(vsize);
if (vmem == NULL) { if (vmem == NULL) {
// TODO: memory error terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
return -1; return -1;
} }
memcpy(vmem, value, vsize); memcpy(vmem, value, vsize);
...@@ -535,6 +539,7 @@ int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) { ...@@ -535,6 +539,7 @@ int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) {
while (pNode) { while (pNode) {
if (pNode->handle.key == key) { if (pNode->handle.key == key) {
if (pNode->handle.valueInTxn == NULL) { if (pNode->handle.valueInTxn == NULL) {
terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN;
return -1; return -1;
} }
if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
...@@ -548,7 +553,8 @@ int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) { ...@@ -548,7 +553,8 @@ int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) {
pNode = pNode->next; pNode = pNode->next;
} }
} }
return -2; terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
return -1;
} }
int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) { int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) {
...@@ -564,12 +570,14 @@ int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) { ...@@ -564,12 +570,14 @@ int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) {
tqLinkUnpersist(pMeta, pNode); tqLinkUnpersist(pMeta, pNode);
return 0; return 0;
} }
terrno = TSDB_CODE_TQ_META_KEY_NOT_IN_TXN;
return -1; return -1;
} else { } else {
pNode = pNode->next; pNode = pNode->next;
} }
} }
return -2; terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
return -1;
} }
int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) { int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
...@@ -588,7 +596,7 @@ int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) { ...@@ -588,7 +596,7 @@ int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
pNode = pNode->next; pNode = pNode->next;
} }
} }
// no such key terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY;
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册