未验证 提交 8b5e0b43 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #9104 from taosdata/feature/tq

refactor wal and tq
...@@ -226,21 +226,21 @@ typedef struct TqMetaHandle { ...@@ -226,21 +226,21 @@ typedef struct TqMetaHandle {
int64_t serializedSize; int64_t serializedSize;
void* valueInUse; void* valueInUse;
void* valueInTxn; void* valueInTxn;
} TqMetaHandle; } STqMetaHandle;
typedef struct TqMetaList { typedef struct TqMetaList {
TqMetaHandle handle; STqMetaHandle handle;
struct TqMetaList* next; struct TqMetaList* next;
//struct TqMetaList* inTxnPrev; //struct TqMetaList* inTxnPrev;
//struct TqMetaList* inTxnNext; //struct TqMetaList* inTxnNext;
struct TqMetaList* unpersistPrev; struct TqMetaList* unpersistPrev;
struct TqMetaList* unpersistNext; struct TqMetaList* unpersistNext;
} TqMetaList; } STqMetaList;
typedef struct TqMetaStore { typedef struct TqMetaStore {
TqMetaList* bucket[TQ_BUCKET_SIZE]; STqMetaList* bucket[TQ_BUCKET_SIZE];
//a table head //a table head
TqMetaList* unpersistHead; STqMetaList* unpersistHead;
//TODO:temporaral use, to be replaced by unified tfile //TODO:temporaral use, to be replaced by unified tfile
int fileFd; int fileFd;
//TODO:temporaral use, to be replaced by unified tfile //TODO:temporaral use, to be replaced by unified tfile
...@@ -250,7 +250,7 @@ typedef struct TqMetaStore { ...@@ -250,7 +250,7 @@ typedef struct TqMetaStore {
TqSerializeFun pSerializer; TqSerializeFun pSerializer;
TqDeserializeFun pDeserializer; TqDeserializeFun pDeserializer;
TqDeleteFun pDeleter; TqDeleteFun pDeleter;
} TqMetaStore; } STqMetaStore;
typedef struct STQ { typedef struct STQ {
// the collection of group handle // the collection of group handle
...@@ -259,7 +259,7 @@ typedef struct STQ { ...@@ -259,7 +259,7 @@ typedef struct STQ {
STqCfg* tqConfig; STqCfg* tqConfig;
TqLogReader* tqLogReader; TqLogReader* tqLogReader;
TqMemRef tqMemRef; TqMemRef tqMemRef;
TqMetaStore* tqMeta; STqMetaStore* tqMeta;
} STQ; } STQ;
// open in each vnode // open in each vnode
......
...@@ -88,17 +88,17 @@ typedef struct SWalVer { ...@@ -88,17 +88,17 @@ typedef struct SWalVer {
typedef struct SWal { typedef struct SWal {
// cfg // cfg
SWalCfg cfg; SWalCfg cfg;
int32_t fsyncSeq;
//meta
SWalVer vers; SWalVer vers;
//file set
int64_t writeLogTfd; int64_t writeLogTfd;
int64_t writeIdxTfd; int64_t writeIdxTfd;
int32_t writeCur; int32_t writeCur;
SArray* fileInfoSet; SArray* fileInfoSet;
//statistics //status
int64_t totSize; int64_t totSize;
int64_t lastRollSeq; int64_t lastRollSeq;
//ctl //ctl
int32_t fsyncSeq;
int64_t refId; int64_t refId;
pthread_mutex_t mutex; pthread_mutex_t mutex;
//path //path
......
...@@ -24,29 +24,29 @@ extern "C" { ...@@ -24,29 +24,29 @@ extern "C" {
#endif #endif
TqMetaStore* tqStoreOpen(const char* path, STqMetaStore* tqStoreOpen(const char* path,
TqSerializeFun pSerializer, TqSerializeFun pSerializer,
TqDeserializeFun pDeserializer, TqDeserializeFun pDeserializer,
TqDeleteFun pDeleter, TqDeleteFun pDeleter,
int32_t tqConfigFlag int32_t tqConfigFlag
); );
int32_t tqStoreClose(TqMetaStore*); int32_t tqStoreClose(STqMetaStore*);
//int32_t tqStoreDelete(TqMetaStore*); //int32_t tqStoreDelete(TqMetaStore*);
//int32_t tqStoreCommitAll(TqMetaStore*); //int32_t tqStoreCommitAll(TqMetaStore*);
int32_t tqStorePersist(TqMetaStore*); int32_t tqStorePersist(STqMetaStore*);
//clean deleted idx and data from persistent file //clean deleted idx and data from persistent file
int32_t tqStoreCompact(TqMetaStore*); int32_t tqStoreCompact(STqMetaStore*);
void* tqHandleGet(TqMetaStore*, int64_t key); void* tqHandleGet(STqMetaStore*, int64_t key);
//make it unpersist //make it unpersist
void* tqHandleTouchGet(TqMetaStore*, int64_t key); void* tqHandleTouchGet(STqMetaStore*, int64_t key);
int32_t tqHandleMovePut(TqMetaStore*, int64_t key, void* value); int32_t tqHandleMovePut(STqMetaStore*, int64_t key, void* value);
int32_t tqHandleCopyPut(TqMetaStore*, int64_t key, void* value, size_t vsize); int32_t tqHandleCopyPut(STqMetaStore*, int64_t key, void* value, size_t vsize);
//delete committed kv pair //delete committed kv pair
//notice that a delete action still needs to be committed //notice that a delete action still needs to be committed
int32_t tqHandleDel(TqMetaStore*, int64_t key); int32_t tqHandleDel(STqMetaStore*, int64_t key);
int32_t tqHandleCommit(TqMetaStore*, int64_t key); int32_t tqHandleCommit(STqMetaStore*, int64_t key);
int32_t tqHandleAbort(TqMetaStore*, int64_t key); int32_t tqHandleAbort(STqMetaStore*, int64_t key);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -49,8 +49,8 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAl ...@@ -49,8 +49,8 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAl
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->tqConfig = tqConfig; pTq->tqConfig = tqConfig;
pTq->tqLogReader = tqLogReader; pTq->tqLogReader = tqLogReader;
// pTq->tqMemRef.pAlloctorFactory = allocFac; pTq->tqMemRef.pAlloctorFactory = allocFac;
// pTq->tqMemRef.pAllocator = allocFac->create(allocFac); pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
if(pTq->tqMemRef.pAllocator == NULL) { if(pTq->tqMemRef.pAllocator == NULL) {
//TODO //TODO
} }
...@@ -202,7 +202,6 @@ static int tqFetch(TqGroupHandle* gHandle, void** msg) { ...@@ -202,7 +202,6 @@ static int tqFetch(TqGroupHandle* gHandle, void** msg) {
return totSize; return totSize;
} }
TqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { TqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) {
return NULL; return NULL;
} }
......
...@@ -22,11 +22,10 @@ ...@@ -22,11 +22,10 @@
#define TQ_META_NAME "tq.meta" #define TQ_META_NAME "tq.meta"
#define TQ_IDX_NAME "tq.idx" #define TQ_IDX_NAME "tq.idx"
static int32_t tqHandlePutCommitted(STqMetaStore*, int64_t key, void* value);
static void* tqHandleGetUncommitted(STqMetaStore*, int64_t key);
static int32_t tqHandlePutCommitted(TqMetaStore*, int64_t key, void* value); static inline void tqLinkUnpersist(STqMetaStore *pMeta, STqMetaList* pNode) {
static void* tqHandleGetUncommitted(TqMetaStore*, int64_t key);
static inline void tqLinkUnpersist(TqMetaStore *pMeta, TqMetaList* pNode) {
if(pNode->unpersistNext == NULL) { if(pNode->unpersistNext == NULL) {
pNode->unpersistNext = pMeta->unpersistHead->unpersistNext; pNode->unpersistNext = pMeta->unpersistHead->unpersistNext;
pNode->unpersistPrev = pMeta->unpersistHead; pNode->unpersistPrev = pMeta->unpersistHead;
...@@ -68,18 +67,18 @@ static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) { ...@@ -68,18 +67,18 @@ static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) {
return lseek(fd, offset, SEEK_SET); return lseek(fd, offset, SEEK_SET);
} }
TqMetaStore* tqStoreOpen(const char* path, STqMetaStore* tqStoreOpen(const char* path,
TqSerializeFun serializer, TqSerializeFun serializer,
TqDeserializeFun deserializer, TqDeserializeFun deserializer,
TqDeleteFun deleter, TqDeleteFun deleter,
int32_t tqConfigFlag int32_t tqConfigFlag
) { ) {
TqMetaStore* pMeta = malloc(sizeof(TqMetaStore)); STqMetaStore* pMeta = malloc(sizeof(STqMetaStore));
if(pMeta == NULL) { if(pMeta == NULL) {
//close //close
return NULL; return NULL;
} }
memset(pMeta, 0, sizeof(TqMetaStore)); memset(pMeta, 0, sizeof(STqMetaStore));
//concat data file name and index file name //concat data file name and index file name
size_t pathLen = strlen(path); size_t pathLen = strlen(path);
...@@ -105,14 +104,14 @@ TqMetaStore* tqStoreOpen(const char* path, ...@@ -105,14 +104,14 @@ TqMetaStore* tqStoreOpen(const char* path,
} }
pMeta->idxFd = idxFd; pMeta->idxFd = idxFd;
pMeta->unpersistHead = malloc(sizeof(TqMetaList)); pMeta->unpersistHead = malloc(sizeof(STqMetaList));
if(pMeta->unpersistHead == NULL) { if(pMeta->unpersistHead == NULL) {
ASSERT(false); ASSERT(false);
//close file //close file
//free memory //free memory
return NULL; return NULL;
} }
memset(pMeta->unpersistHead, 0, sizeof(TqMetaList)); memset(pMeta->unpersistHead, 0, sizeof(STqMetaList));
pMeta->unpersistHead->unpersistNext pMeta->unpersistHead->unpersistNext
= pMeta->unpersistHead->unpersistPrev = pMeta->unpersistHead->unpersistPrev
= pMeta->unpersistHead; = pMeta->unpersistHead;
...@@ -149,11 +148,11 @@ TqMetaStore* tqStoreOpen(const char* path, ...@@ -149,11 +148,11 @@ TqMetaStore* tqStoreOpen(const char* path,
ASSERT(idxBuf.head.writeOffset == idxRead); ASSERT(idxBuf.head.writeOffset == idxRead);
//loop read every entry //loop read every entry
for(int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) { for(int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) {
TqMetaList *pNode = malloc(sizeof(TqMetaList)); STqMetaList *pNode = malloc(sizeof(STqMetaList));
if(pNode == NULL) { if(pNode == NULL) {
//TODO: free memory and return error //TODO: free memory and return error
} }
memset(pNode, 0, sizeof(TqMetaList)); memset(pNode, 0, sizeof(STqMetaList));
memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE); memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE);
lseek(fileFd, pNode->handle.offset, SEEK_SET); lseek(fileFd, pNode->handle.offset, SEEK_SET);
...@@ -199,7 +198,7 @@ TqMetaStore* tqStoreOpen(const char* path, ...@@ -199,7 +198,7 @@ TqMetaStore* tqStoreOpen(const char* path,
//put into list //put into list
int bucketKey = pNode->handle.key & TQ_BUCKET_MASK; int bucketKey = pNode->handle.key & TQ_BUCKET_MASK;
TqMetaList* pBucketNode = pMeta->bucket[bucketKey]; STqMetaList* pBucketNode = pMeta->bucket[bucketKey];
if(pBucketNode == NULL) { if(pBucketNode == NULL) {
pMeta->bucket[bucketKey] = pNode; pMeta->bucket[bucketKey] = pNode;
} else if(pBucketNode->handle.key == pNode->handle.key) { } else if(pBucketNode->handle.key == pNode->handle.key) {
...@@ -212,7 +211,7 @@ TqMetaStore* tqStoreOpen(const char* path, ...@@ -212,7 +211,7 @@ TqMetaStore* tqStoreOpen(const char* path,
} }
if(pBucketNode->next) { if(pBucketNode->next) {
ASSERT(pBucketNode->next->handle.key == pNode->handle.key); ASSERT(pBucketNode->next->handle.key == pNode->handle.key);
TqMetaList *pNodeFound = pBucketNode->next; STqMetaList *pNodeFound = pBucketNode->next;
pNode->next = pNodeFound->next; pNode->next = pNodeFound->next;
pBucketNode->next = pNode; pBucketNode->next = pNode;
pBucketNode = pNodeFound; pBucketNode = pNodeFound;
...@@ -239,7 +238,7 @@ TqMetaStore* tqStoreOpen(const char* path, ...@@ -239,7 +238,7 @@ TqMetaStore* tqStoreOpen(const char* path,
return pMeta; return pMeta;
} }
int32_t tqStoreClose(TqMetaStore* pMeta) { int32_t tqStoreClose(STqMetaStore* pMeta) {
//commit data and idx //commit data and idx
tqStorePersist(pMeta); tqStorePersist(pMeta);
ASSERT(pMeta->unpersistHead && pMeta->unpersistHead->next==NULL); ASSERT(pMeta->unpersistHead && pMeta->unpersistHead->next==NULL);
...@@ -247,7 +246,7 @@ int32_t tqStoreClose(TqMetaStore* pMeta) { ...@@ -247,7 +246,7 @@ int32_t tqStoreClose(TqMetaStore* pMeta) {
close(pMeta->idxFd); close(pMeta->idxFd);
//free memory //free memory
for(int i = 0; i < TQ_BUCKET_SIZE; i++) { for(int i = 0; i < TQ_BUCKET_SIZE; i++) {
TqMetaList* pNode = pMeta->bucket[i]; STqMetaList* pNode = pMeta->bucket[i];
while(pNode) { while(pNode) {
ASSERT(pNode->unpersistNext == NULL); ASSERT(pNode->unpersistNext == NULL);
ASSERT(pNode->unpersistPrev == NULL); ASSERT(pNode->unpersistPrev == NULL);
...@@ -259,7 +258,7 @@ int32_t tqStoreClose(TqMetaStore* pMeta) { ...@@ -259,7 +258,7 @@ int32_t tqStoreClose(TqMetaStore* pMeta) {
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) { && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInUse); pMeta->pDeleter(pNode->handle.valueInUse);
} }
TqMetaList* next = pNode->next; STqMetaList* next = pNode->next;
free(pNode); free(pNode);
pNode = next; pNode = next;
} }
...@@ -270,12 +269,12 @@ int32_t tqStoreClose(TqMetaStore* pMeta) { ...@@ -270,12 +269,12 @@ int32_t tqStoreClose(TqMetaStore* pMeta) {
return 0; return 0;
} }
int32_t tqStoreDelete(TqMetaStore* pMeta) { int32_t tqStoreDelete(STqMetaStore* pMeta) {
close(pMeta->fileFd); close(pMeta->fileFd);
close(pMeta->idxFd); close(pMeta->idxFd);
//free memory //free memory
for(int i = 0; i < TQ_BUCKET_SIZE; i++) { for(int i = 0; i < TQ_BUCKET_SIZE; i++) {
TqMetaList* pNode = pMeta->bucket[i]; STqMetaList* pNode = pMeta->bucket[i];
pMeta->bucket[i] = NULL; pMeta->bucket[i] = NULL;
while(pNode) { while(pNode) {
if(pNode->handle.valueInTxn if(pNode->handle.valueInTxn
...@@ -286,7 +285,7 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) { ...@@ -286,7 +285,7 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) {
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) { && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInUse); pMeta->pDeleter(pNode->handle.valueInUse);
} }
TqMetaList* next = pNode->next; STqMetaList* next = pNode->next;
free(pNode); free(pNode);
pNode = next; pNode = next;
} }
...@@ -299,11 +298,11 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) { ...@@ -299,11 +298,11 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) {
} }
//TODO: wrap in tfile //TODO: wrap in tfile
int32_t tqStorePersist(TqMetaStore* pMeta) { int32_t tqStorePersist(STqMetaStore* pMeta) {
TqIdxPageBuf idxBuf; TqIdxPageBuf idxBuf;
int64_t* bufPtr = (int64_t*)idxBuf.buffer; int64_t* bufPtr = (int64_t*)idxBuf.buffer;
TqMetaList *pHead = pMeta->unpersistHead; STqMetaList *pHead = pMeta->unpersistHead;
TqMetaList *pNode = pHead->unpersistNext; STqMetaList *pNode = pHead->unpersistNext;
TqSerializedHead *pSHead = malloc(sizeof(TqSerializedHead)); TqSerializedHead *pSHead = malloc(sizeof(TqSerializedHead));
if(pSHead == NULL) { if(pSHead == NULL) {
//TODO: memory error //TODO: memory error
...@@ -384,11 +383,11 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { ...@@ -384,11 +383,11 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
pNode->handle.valueInTxn == NULL pNode->handle.valueInTxn == NULL
) { ) {
int bucketKey = pNode->handle.key & TQ_BUCKET_MASK; int bucketKey = pNode->handle.key & TQ_BUCKET_MASK;
TqMetaList* pBucketHead = pMeta->bucket[bucketKey]; STqMetaList* pBucketHead = pMeta->bucket[bucketKey];
if(pBucketHead == pNode) { if(pBucketHead == pNode) {
pMeta->bucket[bucketKey] = pNode->next; pMeta->bucket[bucketKey] = pNode->next;
} else { } else {
TqMetaList* pBucketNode = pBucketHead; STqMetaList* pBucketNode = pBucketHead;
while(pBucketNode->next != NULL while(pBucketNode->next != NULL
&& pBucketNode->next != pNode) { && pBucketNode->next != pNode) {
pBucketNode = pBucketNode->next; pBucketNode = pBucketNode->next;
...@@ -415,9 +414,9 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { ...@@ -415,9 +414,9 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
return 0; return 0;
} }
static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value) { static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* value) {
int64_t bucketKey = key & TQ_BUCKET_MASK; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; STqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
//TODO: think about thread safety //TODO: think about thread safety
...@@ -432,12 +431,12 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value ...@@ -432,12 +431,12 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
pNode = pNode->next; pNode = pNode->next;
} }
} }
TqMetaList *pNewNode = malloc(sizeof(TqMetaList)); STqMetaList *pNewNode = malloc(sizeof(STqMetaList));
if(pNewNode == NULL) { if(pNewNode == NULL) {
//TODO: memory error //TODO: memory error
return -1; return -1;
} }
memset(pNewNode, 0, sizeof(TqMetaList)); memset(pNewNode, 0, sizeof(STqMetaList));
pNewNode->handle.key = key; pNewNode->handle.key = key;
pNewNode->handle.valueInUse = value; pNewNode->handle.valueInUse = value;
//put into unpersist list //put into unpersist list
...@@ -448,9 +447,9 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value ...@@ -448,9 +447,9 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
return 0; return 0;
} }
void* tqHandleGet(TqMetaStore* pMeta, int64_t key) { void* tqHandleGet(STqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_MASK; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; STqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
if(pNode->handle.valueInUse != NULL if(pNode->handle.valueInUse != NULL
...@@ -466,9 +465,9 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) { ...@@ -466,9 +465,9 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) {
return NULL; return NULL;
} }
void* tqHandleTouchGet(TqMetaStore* pMeta, int64_t key) { void* tqHandleTouchGet(STqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_MASK; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; STqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
if(pNode->handle.valueInUse != NULL if(pNode->handle.valueInUse != NULL
...@@ -485,9 +484,9 @@ void* tqHandleTouchGet(TqMetaStore* pMeta, int64_t key) { ...@@ -485,9 +484,9 @@ void* tqHandleTouchGet(TqMetaStore* pMeta, int64_t key) {
return NULL; return NULL;
} }
static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* value) { static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* value) {
int64_t bucketKey = key & TQ_BUCKET_MASK; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; STqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
//TODO: think about thread safety //TODO: think about thread safety
...@@ -506,12 +505,12 @@ static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* val ...@@ -506,12 +505,12 @@ static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* val
pNode = pNode->next; pNode = pNode->next;
} }
} }
TqMetaList *pNewNode = malloc(sizeof(TqMetaList)); STqMetaList *pNewNode = malloc(sizeof(STqMetaList));
if(pNewNode == NULL) { if(pNewNode == NULL) {
//TODO: memory error //TODO: memory error
return -1; return -1;
} }
memset(pNewNode, 0, sizeof(TqMetaList)); memset(pNewNode, 0, sizeof(STqMetaList));
pNewNode->handle.key = key; pNewNode->handle.key = key;
pNewNode->handle.valueInTxn = value; pNewNode->handle.valueInTxn = value;
pNewNode->next = pMeta->bucket[bucketKey]; pNewNode->next = pMeta->bucket[bucketKey];
...@@ -520,11 +519,11 @@ static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* val ...@@ -520,11 +519,11 @@ static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* val
return 0; return 0;
} }
int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) { int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) {
return tqHandlePutImpl(pMeta, key, value); return tqHandlePutImpl(pMeta, key, value);
} }
int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsize) { int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vsize) {
void *vmem = malloc(vsize); void *vmem = malloc(vsize);
if(vmem == NULL) { if(vmem == NULL) {
//TODO: memory error //TODO: memory error
...@@ -534,9 +533,9 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi ...@@ -534,9 +533,9 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi
return tqHandlePutImpl(pMeta, key, vmem); return tqHandlePutImpl(pMeta, key, vmem);
} }
static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) { static void* tqHandleGetUncommitted(STqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_MASK; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; STqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
if(pNode->handle.valueInTxn != NULL if(pNode->handle.valueInTxn != NULL
...@@ -552,9 +551,9 @@ static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) { ...@@ -552,9 +551,9 @@ static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) {
return NULL; return NULL;
} }
int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) { int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_MASK; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; STqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
if(pNode->handle.valueInTxn == NULL) { if(pNode->handle.valueInTxn == NULL) {
...@@ -575,9 +574,9 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) { ...@@ -575,9 +574,9 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) {
return -2; return -2;
} }
int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) { int32_t tqHandleAbort(STqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_MASK; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; STqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
if(pNode->handle.valueInTxn) { if(pNode->handle.valueInTxn) {
...@@ -596,9 +595,9 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) { ...@@ -596,9 +595,9 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) {
return -2; return -2;
} }
int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) { int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_MASK; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; STqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
if(pNode->handle.valueInTxn) { if(pNode->handle.valueInTxn) {
...@@ -616,6 +615,6 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) { ...@@ -616,6 +615,6 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) {
} }
//TODO: clean deleted idx and data from persistent file //TODO: clean deleted idx and data from persistent file
int32_t tqStoreCompact(TqMetaStore *pMeta) { int32_t tqStoreCompact(STqMetaStore *pMeta) {
return 0; return 0;
} }
...@@ -105,6 +105,10 @@ static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) { ...@@ -105,6 +105,10 @@ static inline uint32_t walCalcBodyCksum(const void* body, uint32_t len) {
return taosCalcChecksum(0, (uint8_t*)body, len); return taosCalcChecksum(0, (uint8_t*)body, len);
} }
static inline int64_t walGetVerIdxOffset(SWal* pWal, int64_t ver) {
return (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry);
}
static inline void walResetVer(SWalVer* pVer) { static inline void walResetVer(SWalVer* pVer) {
pVer->firstVer = -1; pVer->firstVer = -1;
pVer->verInSnapshotting = -1; pVer->verInSnapshotting = -1;
...@@ -117,6 +121,10 @@ int walLoadMeta(SWal* pWal); ...@@ -117,6 +121,10 @@ int walLoadMeta(SWal* pWal);
int walSaveMeta(SWal* pWal); int walSaveMeta(SWal* pWal);
int walRollFileInfo(SWal* pWal); int walRollFileInfo(SWal* pWal);
int walCheckAndRepairMeta(SWal* pWal);
int walCheckAndRepairIdx(SWal* pWal);
char* walMetaSerialize(SWal* pWal); char* walMetaSerialize(SWal* pWal);
int walMetaDeserialize(SWal* pWal, const char* bytes); int walMetaDeserialize(SWal* pWal, const char* bytes);
//meta section end //meta section end
......
...@@ -40,6 +40,47 @@ static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { ...@@ -40,6 +40,47 @@ static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
} }
int walCheckAndRepairMeta(SWal* pWal) {
// load log files, get first/snapshot/last version info
const char* logPattern = "^[0-9]+.log$";
const char* idxPattern = "^[0-9]+.idx$";
regex_t logRegPattern;
regex_t idxRegPattern;
SArray* pLogArray = taosArrayInit(8, sizeof(int64_t));
regcomp(&logRegPattern, logPattern, REG_EXTENDED);
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
DIR *dir = opendir(pWal->path);
if(dir == NULL) {
wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
return -1;
}
struct dirent* ent;
while((ent = readdir(dir)) != NULL) {
char *name = basename(ent->d_name);
int code = regexec(&logRegPattern, name, 0, NULL, 0);
if(code == 0) {
int64_t firstVer;
sscanf(name, "%" PRId64 ".log", &firstVer);
taosArrayPush(pLogArray, &firstVer);
}
}
// load meta
// if not match, or meta missing
// rebuild meta
return 0;
}
int walCheckAndRepairIdx(SWal* pWal) {
// iterate all idx files
// check first and last entry of each idx file valid
return 0;
}
int walRollFileInfo(SWal* pWal) { int walRollFileInfo(SWal* pWal) {
int64_t ts = taosGetTimestampSec(); int64_t ts = taosGetTimestampSec();
......
...@@ -90,6 +90,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -90,6 +90,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
} }
//open meta //open meta
walResetVer(&pWal->vers);
pWal->writeLogTfd = -1; pWal->writeLogTfd = -1;
pWal->writeIdxTfd = -1; pWal->writeIdxTfd = -1;
pWal->writeCur = -1; pWal->writeCur = -1;
...@@ -101,7 +102,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -101,7 +102,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
} }
//init status //init status
walResetVer(&pWal->vers);
pWal->totSize = 0; pWal->totSize = 0;
pWal->lastRollSeq = -1; pWal->lastRollSeq = -1;
...@@ -123,7 +123,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -123,7 +123,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
return NULL; return NULL;
} }
if(walLoadMeta(pWal) < 0) { if(walLoadMeta(pWal) < 0 && walCheckAndRepairMeta(pWal) < 0) {
taosRemoveRef(tsWal.refSetId, pWal->refId); taosRemoveRef(tsWal.refSetId, pWal->refId);
pthread_mutex_destroy(&pWal->mutex); pthread_mutex_destroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet); taosArrayDestroy(pWal->fileInfoSet);
...@@ -131,6 +131,10 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -131,6 +131,10 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
return NULL; return NULL;
} }
if(walCheckAndRepairIdx(pWal) < 0) {
}
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod); wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod);
return pWal; return pWal;
......
...@@ -27,20 +27,20 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { ...@@ -27,20 +27,20 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
int64_t logTfd = pWal->writeLogTfd; int64_t logTfd = pWal->writeLogTfd;
//seek position //seek position
int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry); int64_t idxOff = walGetVerIdxOffset(pWal, ver);
code = tfLseek(idxTfd, offset, SEEK_SET); code = tfLseek(idxTfd, idxOff, SEEK_SET);
if(code != 0) { if(code != 0) {
return -1; return -1;
} }
int64_t readBuf[2]; WalIdxEntry entry;
code = tfRead(idxTfd, readBuf, sizeof(readBuf)); //TODO:deserialize
code = tfRead(idxTfd, &entry, sizeof(WalIdxEntry));
if(code != 0) { if(code != 0) {
return -1; return -1;
} }
//TODO:deserialize ASSERT(entry.ver == ver);
ASSERT(readBuf[0] == ver); code = tfLseek(logTfd, entry.offset, SEEK_CUR);
code = tfLseek(logTfd, readBuf[1], SEEK_CUR); if (code < 0) {
if (code != 0) {
return -1; return -1;
} }
return code; return code;
......
...@@ -68,13 +68,12 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -68,13 +68,12 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr); walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
int64_t idxTfd = tfOpenReadWrite(fnameStr); int64_t idxTfd = tfOpenReadWrite(fnameStr);
//change to deserialize function //TODO:change to deserialize function
if(idxTfd < 0) { if(idxTfd < 0) {
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
return -1; return -1;
} }
int idxOff = (ver - walGetCurFileFirstVer(pWal)) * sizeof(WalIdxEntry); int64_t idxOff = walGetVerIdxOffset(pWal, ver);
code = tfLseek(idxTfd, idxOff, SEEK_SET); code = tfLseek(idxTfd, idxOff, SEEK_SET);
if(code < 0) { if(code < 0) {
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
......
...@@ -5142,22 +5142,22 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* ...@@ -5142,22 +5142,22 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
assert(repeatTime > 0); assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = 0; pInfo->reverseTimes = 0;
pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
pInfo->current = 0; pInfo->current = 0;
// pInfo->prevGroupId = -1; // pInfo->prevGroupId = -1;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableScanOperator"; pOperator->name = "TableScanOperator";
pOperator->operatorType = OP_TableScan; pOperator->operatorType = OP_TableScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doTableScan; pOperator->exec = doTableScan;
return pOperator; return pOperator;
} }
...@@ -5174,14 +5174,14 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE ...@@ -5174,14 +5174,14 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
pRuntimeEnv->enableGroupData = true; pRuntimeEnv->enableGroupData = true;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableSeqScanOperator"; pOperator->name = "TableSeqScanOperator";
pOperator->operatorType = OP_TableSeqScan; pOperator->operatorType = OP_TableSeqScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doTableScanImpl; pOperator->exec = doTableScanImpl;
return pOperator; return pOperator;
} }
...@@ -5199,14 +5199,14 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu ...@@ -5199,14 +5199,14 @@ SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRu
taosArrayPush(pInfo->block.pDataBlock, &infoData); taosArrayPush(pInfo->block.pDataBlock, &infoData);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableBlockInfoScanOperator"; pOperator->name = "TableBlockInfoScanOperator";
pOperator->operatorType = OP_TableBlockInfoScan; pOperator->operatorType = OP_TableBlockInfoScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->exec = doBlockInfoScan; pOperator->exec = doBlockInfoScan;
return pOperator; return pOperator;
} }
...@@ -5271,11 +5271,11 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime ...@@ -5271,11 +5271,11 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
assert(repeatTime > 0); assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pQueryHandle = pTsdbQueryHandle; pInfo->pQueryHandle = pTsdbQueryHandle;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime; pInfo->reverseTimes = reverseTime;
pInfo->current = 0; pInfo->current = 0;
pInfo->order = pRuntimeEnv->pQueryAttr->order.order; pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "DataBlocksOptimizedScanOperator"; pOptr->name = "DataBlocksOptimizedScanOperator";
...@@ -5429,14 +5429,14 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -5429,14 +5429,14 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MERGE_STAGE); setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MERGE_STAGE);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "GlobalAggregate"; pOperator->name = "GlobalAggregate";
pOperator->operatorType = OP_GlobalAggregate; pOperator->operatorType = OP_GlobalAggregate;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doGlobalAggregate; pOperator->exec = doGlobalAggregate;
pOperator->cleanup = destroyGlobalAggOperatorInfo; pOperator->cleanup = destroyGlobalAggOperatorInfo;
...@@ -5473,16 +5473,16 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx ...@@ -5473,16 +5473,16 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
} }
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "MultiwaySortOperator"; pOperator->name = "MultiwaySortOperator";
pOperator->operatorType = OP_MultiwayMergeSort; pOperator->operatorType = OP_MultiwayMergeSort;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->exec = doMultiwayMergeSort; pOperator->exec = doMultiwayMergeSort;
pOperator->cleanup = destroyGlobalAggOperatorInfo; pOperator->cleanup = destroyGlobalAggOperatorInfo;
return pOperator; return pOperator;
} }
...@@ -6543,14 +6543,14 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera ...@@ -6543,14 +6543,14 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MASTER_SCAN); setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MASTER_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
pOperator->operatorType = OP_Aggregate; pOperator->operatorType = OP_Aggregate;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doAggregate; pOperator->exec = doAggregate;
pOperator->cleanup = destroyAggOperatorInfo; pOperator->cleanup = destroyAggOperatorInfo;
...@@ -6638,14 +6638,14 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO ...@@ -6638,14 +6638,14 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "MultiTableAggregate"; pOperator->name = "MultiTableAggregate";
pOperator->operatorType = OP_MultiTableAggregate; pOperator->operatorType = OP_MultiTableAggregate;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doSTableAggregate; pOperator->exec = doSTableAggregate;
pOperator->cleanup = destroyAggOperatorInfo; pOperator->cleanup = destroyAggOperatorInfo;
...@@ -6668,14 +6668,14 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -6668,14 +6668,14 @@ SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN); setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ProjectOperator"; pOperator->name = "ProjectOperator";
pOperator->operatorType = OP_Project; pOperator->operatorType = OP_Project;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doProjectOperation; pOperator->exec = doProjectOperation;
pOperator->cleanup = destroyProjectOperatorInfo; pOperator->cleanup = destroyProjectOperatorInfo;
...@@ -6920,16 +6920,16 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -6920,16 +6920,16 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "GroupbyAggOperator"; pOperator->name = "GroupbyAggOperator";
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->operatorType = OP_Groupby; pOperator->operatorType = OP_Groupby;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = hashGroupbyAggregate; pOperator->exec = hashGroupbyAggregate;
pOperator->cleanup = destroyGroupbyOperatorInfo; pOperator->cleanup = destroyGroupbyOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
...@@ -7163,16 +7163,16 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf ...@@ -7163,16 +7163,16 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
pInfo->curPos = 0; pInfo->curPos = 0;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SeqTableTagScan"; pOperator->name = "SeqTableTagScan";
pOperator->operatorType = OP_TagScan; pOperator->operatorType = OP_TagScan;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->exec = doTagScan; pOperator->exec = doTagScan;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->cleanup = destroyTagScanOperatorInfo; pOperator->cleanup = destroyTagScanOperatorInfo;
return pOperator; return pOperator;
} }
...@@ -7302,17 +7302,17 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat ...@@ -7302,17 +7302,17 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "DistinctOperator"; pOperator->name = "DistinctOperator";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->operatorType = OP_Distinct; pOperator->operatorType = OP_Distinct;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = hashDistinct; pOperator->exec = hashDistinct;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->cleanup = destroyDistinctOperatorInfo; pOperator->cleanup = destroyDistinctOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
...@@ -7587,20 +7587,20 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { ...@@ -7587,20 +7587,20 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
param->pExpr[i] = pExprMsg; param->pExpr[i] = pExprMsg;
pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex);
pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId);
pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag);
pExprMsg->colBytes = htons(pExprMsg->colBytes); pExprMsg->colBytes = htons(pExprMsg->colBytes);
pExprMsg->colType = htons(pExprMsg->colType); pExprMsg->colType = htons(pExprMsg->colType);
pExprMsg->resType = htons(pExprMsg->resType); pExprMsg->resType = htons(pExprMsg->resType);
pExprMsg->resBytes = htons(pExprMsg->resBytes); pExprMsg->resBytes = htons(pExprMsg->resBytes);
pExprMsg->interBytes = htonl(pExprMsg->interBytes); pExprMsg->interBytes = htonl(pExprMsg->interBytes);
pExprMsg->functionId = htons(pExprMsg->functionId); pExprMsg->functionId = htons(pExprMsg->functionId);
pExprMsg->numOfParams = htons(pExprMsg->numOfParams); pExprMsg->numOfParams = htons(pExprMsg->numOfParams);
pExprMsg->resColId = htons(pExprMsg->resColId); pExprMsg->resColId = htons(pExprMsg->resColId);
pExprMsg->flist.numOfFilters = htons(pExprMsg->flist.numOfFilters); pExprMsg->flist.numOfFilters = htons(pExprMsg->flist.numOfFilters);
pMsg += sizeof(SSqlExpr); pMsg += sizeof(SSqlExpr);
for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) {
...@@ -7639,15 +7639,15 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { ...@@ -7639,15 +7639,15 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
param->pSecExpr[i] = pExprMsg; param->pSecExpr[i] = pExprMsg;
pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex); pExprMsg->colInfo.colIndex = htons(pExprMsg->colInfo.colIndex);
pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId); pExprMsg->colInfo.colId = htons(pExprMsg->colInfo.colId);
pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag); pExprMsg->colInfo.flag = htons(pExprMsg->colInfo.flag);
pExprMsg->resType = htons(pExprMsg->resType); pExprMsg->resType = htons(pExprMsg->resType);
pExprMsg->resBytes = htons(pExprMsg->resBytes); pExprMsg->resBytes = htons(pExprMsg->resBytes);
pExprMsg->colBytes = htons(pExprMsg->colBytes); pExprMsg->colBytes = htons(pExprMsg->colBytes);
pExprMsg->colType = htons(pExprMsg->colType); pExprMsg->colType = htons(pExprMsg->colType);
pExprMsg->functionId = htons(pExprMsg->functionId); pExprMsg->functionId = htons(pExprMsg->functionId);
pExprMsg->numOfParams = htons(pExprMsg->numOfParams); pExprMsg->numOfParams = htons(pExprMsg->numOfParams);
pMsg += sizeof(SSqlExpr); pMsg += sizeof(SSqlExpr);
...@@ -8422,40 +8422,40 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S ...@@ -8422,40 +8422,40 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
SQueryAttr* pQueryAttr = &pQInfo->query; SQueryAttr* pQueryAttr = &pQInfo->query;
pQInfo->runtimeEnv.pQueryAttr = pQueryAttr; pQInfo->runtimeEnv.pQueryAttr = pQueryAttr;
pQueryAttr->tableGroupInfo = *pTableGroupInfo; pQueryAttr->tableGroupInfo = *pTableGroupInfo;
pQueryAttr->numOfCols = numOfCols; pQueryAttr->numOfCols = numOfCols;
pQueryAttr->numOfOutput = numOfOutput; pQueryAttr->numOfOutput = numOfOutput;
pQueryAttr->limit.limit = pQueryMsg->limit; pQueryAttr->limit.limit = pQueryMsg->limit;
pQueryAttr->limit.offset = pQueryMsg->offset; pQueryAttr->limit.offset = pQueryMsg->offset;
pQueryAttr->order.order = pQueryMsg->order; pQueryAttr->order.order = pQueryMsg->order;
pQueryAttr->order.orderColId = pQueryMsg->orderColId; pQueryAttr->order.orderColId = pQueryMsg->orderColId;
pQueryAttr->pExpr1 = pExprs; pQueryAttr->pExpr1 = pExprs;
pQueryAttr->pExpr2 = pSecExprs; pQueryAttr->pExpr2 = pSecExprs;
pQueryAttr->numOfExpr2 = pQueryMsg->secondStageOutput; pQueryAttr->numOfExpr2 = pQueryMsg->secondStageOutput;
pQueryAttr->pGroupbyExpr = pGroupbyExpr; pQueryAttr->pGroupbyExpr = pGroupbyExpr;
memcpy(&pQueryAttr->interval, &pQueryMsg->interval, sizeof(pQueryAttr->interval)); memcpy(&pQueryAttr->interval, &pQueryMsg->interval, sizeof(pQueryAttr->interval));
pQueryAttr->fillType = pQueryMsg->fillType; pQueryAttr->fillType = pQueryMsg->fillType;
pQueryAttr->numOfTags = pQueryMsg->numOfTags; pQueryAttr->numOfTags = pQueryMsg->numOfTags;
pQueryAttr->tagColList = pTagCols; pQueryAttr->tagColList = pTagCols;
pQueryAttr->prjInfo.vgroupLimit = pQueryMsg->vgroupLimit; pQueryAttr->prjInfo.vgroupLimit = pQueryMsg->vgroupLimit;
pQueryAttr->prjInfo.ts = (pQueryMsg->order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX; pQueryAttr->prjInfo.ts = (pQueryMsg->order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX;
pQueryAttr->sw = pQueryMsg->sw; pQueryAttr->sw = pQueryMsg->sw;
pQueryAttr->vgId = vgId; pQueryAttr->vgId = vgId;
pQueryAttr->stableQuery = pQueryMsg->stableQuery; pQueryAttr->stableQuery = pQueryMsg->stableQuery;
pQueryAttr->topBotQuery = pQueryMsg->topBotQuery; pQueryAttr->topBotQuery = pQueryMsg->topBotQuery;
pQueryAttr->groupbyColumn = pQueryMsg->groupbyColumn; pQueryAttr->groupbyColumn = pQueryMsg->groupbyColumn;
pQueryAttr->hasTagResults = pQueryMsg->hasTagResults; pQueryAttr->hasTagResults = pQueryMsg->hasTagResults;
pQueryAttr->timeWindowInterpo = pQueryMsg->timeWindowInterpo; pQueryAttr->timeWindowInterpo = pQueryMsg->timeWindowInterpo;
pQueryAttr->queryBlockDist = pQueryMsg->queryBlockDist; pQueryAttr->queryBlockDist = pQueryMsg->queryBlockDist;
pQueryAttr->stabledev = pQueryMsg->stabledev; pQueryAttr->stabledev = pQueryMsg->stabledev;
pQueryAttr->tsCompQuery = pQueryMsg->tsCompQuery; pQueryAttr->tsCompQuery = pQueryMsg->tsCompQuery;
pQueryAttr->simpleAgg = pQueryMsg->simpleAgg; pQueryAttr->simpleAgg = pQueryMsg->simpleAgg;
pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery; pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery;
pQueryAttr->needReverseScan = pQueryMsg->needReverseScan; pQueryAttr->needReverseScan = pQueryMsg->needReverseScan;
pQueryAttr->stateWindow = pQueryMsg->stateWindow; pQueryAttr->stateWindow = pQueryMsg->stateWindow;
pQueryAttr->vgId = vgId; pQueryAttr->vgId = vgId;
pQueryAttr->pFilters = pFilters; pQueryAttr->pFilters = pFilters;
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQueryAttr->tableCols == NULL) { if (pQueryAttr->tableCols == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册