提交 7c2048e7 编写于 作者: L Liu Jicong

implement tqOpen and tqPersist

上级 f5992851
...@@ -19,9 +19,6 @@ ...@@ -19,9 +19,6 @@
#include "os.h" #include "os.h"
#include "tutil.h" #include "tutil.h"
#define TQ_ACTION_INSERT 0x7f7f7f7fULL
#define TQ_ACTION_DELETE 0x80808080ULL
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
......
...@@ -37,6 +37,24 @@ inline static int TqEmptyTail() { //16 ...@@ -37,6 +37,24 @@ inline static int TqEmptyTail() { //16
return TQ_PAGE_SIZE - TqMaxEntryOnePage(); return TQ_PAGE_SIZE - TqMaxEntryOnePage();
} }
#define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1
#define TQ_ACTION_INUSE_CONT 2
#define TQ_ACTION_INTXN 3
#define TQ_SVER 0
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
typedef struct TqSerializedHead {
int16_t ver;
int16_t action;
int32_t checksum;
int64_t ssize;
char content[];
} TqSerializedHead;
typedef struct TqMetaHandle { typedef struct TqMetaHandle {
int64_t key; int64_t key;
int64_t offset; int64_t offset;
...@@ -61,14 +79,14 @@ typedef struct TqMetaStore { ...@@ -61,14 +79,14 @@ typedef struct TqMetaStore {
int fileFd; //TODO:temporaral use, to be replaced by unified tfile int fileFd; //TODO:temporaral use, to be replaced by unified tfile
int idxFd; //TODO:temporaral use, to be replaced by unified tfile int idxFd; //TODO:temporaral use, to be replaced by unified tfile
char* dirPath; char* dirPath;
int (*serializer)(const void* pObj, void** ppBytes); int (*serializer)(const void* pObj, TqSerializedHead** ppHead);
const void* (*deserializer)(const void* pBytes, void** ppObj); const void* (*deserializer)(const TqSerializedHead* pHead, void** ppObj);
void (*deleter)(void*); void (*deleter)(void*);
} TqMetaStore; } TqMetaStore;
TqMetaStore* tqStoreOpen(const char* path, TqMetaStore* tqStoreOpen(const char* path,
int serializer(const void* pObj, void** ppBytes), int serializer(const void* pObj, TqSerializedHead** ppHead),
const void* deserializer(const void* pBytes, void** ppObj), const void* deserializer(const TqSerializedHead* pHead, void** ppObj),
void deleter(void* pObj)); void deleter(void* pObj));
int32_t tqStoreClose(TqMetaStore*); int32_t tqStoreClose(TqMetaStore*);
//int32_t tqStoreDelete(TqMetaStore*); //int32_t tqStoreDelete(TqMetaStore*);
...@@ -82,7 +100,8 @@ int32_t tqHandleCopyPut(TqMetaStore*, int64_t key, void* value, size_t vsize); ...@@ -82,7 +100,8 @@ int32_t tqHandleCopyPut(TqMetaStore*, int64_t key, void* value, size_t vsize);
int32_t tqHandleCommit(TqMetaStore*, int64_t key); int32_t tqHandleCommit(TqMetaStore*, int64_t key);
//delete uncommitted //delete uncommitted
int32_t tqHandleAbort(TqMetaStore*, int64_t key); int32_t tqHandleAbort(TqMetaStore*, int64_t key);
//delete committed //delete committed kv pair
//notice that a delete action still needs to be committed
int32_t tqHandleDel(TqMetaStore*, int64_t key); int32_t tqHandleDel(TqMetaStore*, int64_t key);
//delete both committed and uncommitted //delete both committed and uncommitted
int32_t tqHandleClear(TqMetaStore*, int64_t key); int32_t tqHandleClear(TqMetaStore*, int64_t key);
......
...@@ -26,14 +26,24 @@ ...@@ -26,14 +26,24 @@
static int32_t tqHandlePutCommitted(TqMetaStore*, int64_t key, void* value); static int32_t tqHandlePutCommitted(TqMetaStore*, int64_t key, void* value);
static void* tqHandleGetUncommitted(TqMetaStore*, int64_t key); static void* tqHandleGetUncommitted(TqMetaStore*, int64_t key);
static inline void tqLinkUnpersist(TqMetaStore *pMeta, TqMetaList* pNode) {
if(pNode->unpersistNext == NULL) {
pNode->unpersistNext = pMeta->unpersistHead->unpersistNext;
pNode->unpersistPrev = pMeta->unpersistHead;
pMeta->unpersistHead->unpersistNext->unpersistPrev = pNode;
pMeta->unpersistHead->unpersistNext = pNode;
}
}
typedef struct TqMetaPageBuf { typedef struct TqMetaPageBuf {
int16_t offset; int16_t offset;
char buffer[TQ_PAGE_SIZE]; char buffer[TQ_PAGE_SIZE];
} TqMetaPageBuf; } TqMetaPageBuf;
TqMetaStore* tqStoreOpen(const char* path, TqMetaStore* tqStoreOpen(const char* path,
int serializer(const void* pObj, void** ppBytes), int serializer(const void* pObj, TqSerializedHead** ppHead),
const void* deserializer(const void* pBytes, void** ppObj), const void* deserializer(const TqSerializedHead* pHead, void** ppObj),
void deleter(void* pObj)) { void deleter(void* pObj)) {
TqMetaStore* pMeta = malloc(sizeof(TqMetaStore)); TqMetaStore* pMeta = malloc(sizeof(TqMetaStore));
if(pMeta == NULL) { if(pMeta == NULL) {
...@@ -94,12 +104,12 @@ TqMetaStore* tqStoreOpen(const char* path, ...@@ -94,12 +104,12 @@ TqMetaStore* tqStoreOpen(const char* path,
//read idx file and load into memory //read idx file and load into memory
char idxBuf[TQ_PAGE_SIZE]; char idxBuf[TQ_PAGE_SIZE];
char* dataBuf = malloc(TQ_PAGE_SIZE); TqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE);
if(dataBuf == NULL) { if(serializedObj == NULL) {
//TODO:memory insufficient //TODO:memory insufficient
} }
int dataBufSize = TQ_PAGE_SIZE; int idxRead;
int idxRead, dataReadSize; int allocated = TQ_PAGE_SIZE;
while((idxRead = read(idxFd, idxBuf, TQ_PAGE_SIZE))) { while((idxRead = read(idxFd, idxBuf, TQ_PAGE_SIZE))) {
if(idxRead == -1) { if(idxRead == -1) {
//TODO: handle error //TODO: handle error
...@@ -114,26 +124,82 @@ TqMetaStore* tqStoreOpen(const char* path, ...@@ -114,26 +124,82 @@ TqMetaStore* tqStoreOpen(const char* path,
memset(pNode, 0, sizeof(TqMetaList)); memset(pNode, 0, sizeof(TqMetaList));
memcpy(&pNode->handle, &idxBuf[i], TQ_IDX_ENTRY_SIZE); memcpy(&pNode->handle, &idxBuf[i], TQ_IDX_ENTRY_SIZE);
lseek(fileFd, pNode->handle.offset, SEEK_CUR); lseek(fileFd, pNode->handle.offset, SEEK_CUR);
if(dataBufSize < pNode->handle.serializedSize) { if(allocated < pNode->handle.serializedSize) {
void *ptr = realloc(dataBuf, pNode->handle.serializedSize); void *ptr = realloc(serializedObj, pNode->handle.serializedSize);
if(ptr == NULL) { if(ptr == NULL) {
//TODO: memory insufficient //TODO: memory insufficient
} }
dataBuf = ptr; serializedObj = ptr;
dataBufSize = pNode->handle.serializedSize; allocated = pNode->handle.serializedSize;
} }
if(read(fileFd, dataBuf, pNode->handle.serializedSize) != pNode->handle.serializedSize) { serializedObj->ssize = pNode->handle.serializedSize;
if(read(fileFd, serializedObj, pNode->handle.serializedSize) != pNode->handle.serializedSize) {
//TODO: read error //TODO: read error
} }
pMeta->deserializer(dataBuf, &pNode->handle.valueInUse); if(serializedObj->action == TQ_ACTION_INUSE) {
if(serializedObj->ssize != sizeof(TqSerializedHead)) {
pMeta->deserializer(serializedObj, &pNode->handle.valueInUse);
} else {
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
}
} else if(serializedObj->action == TQ_ACTION_INTXN) {
if(serializedObj->ssize != sizeof(TqSerializedHead)) {
pMeta->deserializer(serializedObj, &pNode->handle.valueInTxn);
} else {
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
}
} else if(serializedObj->action == TQ_ACTION_INUSE_CONT) {
if(serializedObj->ssize != sizeof(TqSerializedHead)) {
pMeta->deserializer(serializedObj, &pNode->handle.valueInUse);
} else {
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
}
serializedObj = POINTER_SHIFT(serializedObj, serializedObj->ssize);
if(serializedObj->ssize != sizeof(TqSerializedHead)) {
pMeta->deserializer(serializedObj, &pNode->handle.valueInTxn);
} else {
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
}
} else {
ASSERT(0);
}
//put into list //put into list
int bucketKey = pNode->handle.key & TQ_BUCKET_SIZE; int bucketKey = pNode->handle.key & TQ_BUCKET_SIZE;
pNode->next = pMeta->bucket[bucketKey]; TqMetaList* pBucketNode = pMeta->bucket[bucketKey];
pMeta->bucket[bucketKey] = pNode; if(pBucketNode == NULL) {
pMeta->bucket[bucketKey] = pNode;
} else if(pBucketNode->handle.key == pNode->handle.key) {
pNode->next = pBucketNode->next;
pMeta->bucket[bucketKey] = pNode;
} else {
while(pBucketNode->next &&
pBucketNode->next->handle.key == pNode->handle.key) {
pBucketNode = pBucketNode->next;
}
if(pBucketNode->next) {
ASSERT(pBucketNode->next->handle.key == pNode->handle.key);
TqMetaList *pNodeTmp = pBucketNode->next;
pBucketNode->next = pNodeTmp->next;
pBucketNode = pNodeTmp;
} else {
pBucketNode = NULL;
}
}
if(pBucketNode) {
if(pBucketNode->handle.valueInUse
&& pBucketNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->deleter(pBucketNode->handle.valueInUse);
}
if(pBucketNode->handle.valueInTxn
&& pBucketNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pBucketNode->handle.valueInTxn);
}
free(pBucketNode);
}
} }
} }
free(dataBuf); free(serializedObj);
return pMeta; return pMeta;
} }
...@@ -146,14 +212,15 @@ int32_t tqStoreClose(TqMetaStore* pMeta) { ...@@ -146,14 +212,15 @@ int32_t tqStoreClose(TqMetaStore* pMeta) {
//free memory //free memory
for(int i = 0; i < TQ_BUCKET_SIZE; i++) { for(int i = 0; i < TQ_BUCKET_SIZE; i++) {
TqMetaList* pNode = pMeta->bucket[i]; TqMetaList* pNode = pMeta->bucket[i];
pMeta->bucket[i] = NULL;
while(pNode) { while(pNode) {
ASSERT(pNode->unpersistNext == NULL); ASSERT(pNode->unpersistNext == NULL);
ASSERT(pNode->unpersistPrev == NULL); ASSERT(pNode->unpersistPrev == NULL);
if(pNode->handle.valueInTxn) { if(pNode->handle.valueInTxn
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn); pMeta->deleter(pNode->handle.valueInTxn);
} }
if(pNode->handle.valueInUse) { if(pNode->handle.valueInUse
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInUse); pMeta->deleter(pNode->handle.valueInUse);
} }
TqMetaList* next = pNode->next; TqMetaList* next = pNode->next;
...@@ -175,10 +242,12 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) { ...@@ -175,10 +242,12 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) {
TqMetaList* pNode = pMeta->bucket[i]; TqMetaList* pNode = pMeta->bucket[i];
pMeta->bucket[i] = NULL; pMeta->bucket[i] = NULL;
while(pNode) { while(pNode) {
if(pNode->handle.valueInTxn) { if(pNode->handle.valueInTxn
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn); pMeta->deleter(pNode->handle.valueInTxn);
} }
if(pNode->handle.valueInUse) { if(pNode->handle.valueInUse
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInUse); pMeta->deleter(pNode->handle.valueInUse);
} }
TqMetaList* next = pNode->next; TqMetaList* next = pNode->next;
...@@ -199,73 +268,89 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { ...@@ -199,73 +268,89 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
int64_t* bufPtr = (int64_t*)writeBuf; int64_t* bufPtr = (int64_t*)writeBuf;
TqMetaList *pHead = pMeta->unpersistHead; TqMetaList *pHead = pMeta->unpersistHead;
TqMetaList *pNode = pHead->unpersistNext; TqMetaList *pNode = pHead->unpersistNext;
TqSerializedHead *pSHead = malloc(sizeof(TqSerializedHead));
if(pSHead == NULL) {
//TODO: memory error
return -1;
}
pSHead->ver = TQ_SVER;
pSHead->checksum = 0;
pSHead->ssize = sizeof(TqSerializedHead);
int allocatedSize = sizeof(TqSerializedHead);
int offset = lseek(pMeta->fileFd, 0, SEEK_CUR);
while(pHead != pNode) { while(pHead != pNode) {
if(pNode->handle.valueInUse == NULL) { int nBytes = 0;
//put delete token in data file
uint32_t delete = TQ_ACTION_DELETE; if(pNode->handle.valueInUse) {
int nBytes = write(pMeta->fileFd, &delete, sizeof(uint32_t)); if(pNode->handle.valueInTxn) {
ASSERT(nBytes == sizeof(uint32_t)); pSHead->action = TQ_ACTION_INUSE_CONT;
} else {
pSHead->action = TQ_ACTION_INUSE;
}
if(pNode->handle.valueInUse == TQ_DELETE_TOKEN) {
pSHead->ssize = sizeof(TqSerializedHead);
} else {
pMeta->serializer(pNode->handle.valueInUse, &pSHead);
}
nBytes = write(pMeta->fileFd, pSHead, pSHead->ssize);
ASSERT(nBytes == pSHead->ssize);
}
if(pNode->handle.valueInTxn) {
pSHead->action = TQ_ACTION_INTXN;
if(pNode->handle.valueInTxn == TQ_DELETE_TOKEN) {
pSHead->ssize = sizeof(TqSerializedHead);
} else {
pMeta->serializer(pNode->handle.valueInTxn, &pSHead);
}
int nBytesTxn = write(pMeta->fileFd, pSHead, pSHead->ssize);
ASSERT(nBytesTxn == pSHead->ssize);
nBytes += nBytesTxn;
}
//remove from list //write idx file
//TODO: endian check and convert
*(bufPtr++) = pNode->handle.key;
*(bufPtr++) = pNode->handle.offset;
*(bufPtr++) = (int64_t)nBytes;
if((char*)(bufPtr + 3) > writeBuf + TQ_PAGE_SIZE) {
nBytes = write(pMeta->idxFd, writeBuf, sizeof(writeBuf));
//TODO: handle error with tfile
ASSERT(nBytes == sizeof(writeBuf));
memset(writeBuf, 0, TQ_PAGE_SIZE);
bufPtr = (int64_t*)writeBuf;
}
//remove from unpersist list
pHead->unpersistNext = pNode->unpersistNext;
pHead->unpersistNext->unpersistPrev = pHead;
pNode->unpersistPrev = pNode->unpersistNext = NULL;
pNode = pHead->unpersistNext;
//remove from bucket
if(pNode->handle.valueInUse == TQ_DELETE_TOKEN &&
pNode->handle.valueInTxn == NULL
) {
int bucketKey = pNode->handle.key & TQ_BUCKET_SIZE; int bucketKey = pNode->handle.key & TQ_BUCKET_SIZE;
TqMetaList* pBucketHead = pMeta->bucket[bucketKey]; TqMetaList* pBucketHead = pMeta->bucket[bucketKey];
if(pBucketHead == pNode) { if(pBucketHead == pNode) {
pMeta->bucket[bucketKey] = pBucketHead->next; pMeta->bucket[bucketKey] = pNode->next;
} else { } else {
TqMetaList* pBucketNode = pBucketHead; TqMetaList* pBucketNode = pBucketHead;
while(pBucketNode->next != NULL while(pBucketNode->next != NULL
&& pBucketNode->next != pNode) { && pBucketNode->next != pNode) {
pBucketNode = pBucketNode->next; pBucketNode = pBucketNode->next;
} }
if(pBucketNode->next != NULL) { //impossible for pBucket->next == NULL
ASSERT(pBucketNode->next == pNode); ASSERT(pBucketNode->next == pNode);
pBucketNode->next = pNode->next; pBucketNode->next = pNode->next;
if(pNode->handle.valueInUse) {
pMeta->deleter(pNode->handle.valueInUse);
pNode->handle.valueInUse = NULL;
}
free(pNode);
}
}
} else {
//TODO: do not allocate each time
//serialize
void* pBytes = NULL;
int sz = pMeta->serializer(pNode->handle.valueInUse, &pBytes);
ASSERT(pBytes != NULL);
//get current offset
//append data
int64_t offset = lseek(pMeta->fileFd, 0, SEEK_CUR);
int nBytes = write(pMeta->fileFd, pBytes, sz);
free(pBytes);
//TODO: handle error in tfile
ASSERT(nBytes == sz);
pNode->handle.offset = offset;
pNode->handle.serializedSize = sz;
//write idx
//TODO: endian check and convert
*(bufPtr++) = pNode->handle.key;
*(bufPtr++) = pNode->handle.offset;
*(bufPtr++) = (int64_t)sz;
if((char*)(bufPtr + 3) > writeBuf + TQ_PAGE_SIZE) {
nBytes = write(pMeta->idxFd, writeBuf, sizeof(writeBuf));
//TODO: handle error in tfile
ASSERT(nBytes == sizeof(writeBuf));
memset(writeBuf, 0, TQ_PAGE_SIZE);
bufPtr = (int64_t*)writeBuf;
} }
free(pNode);
} }
//remove from unpersist list
pHead->unpersistNext = pNode->unpersistNext;
pHead->unpersistNext->unpersistPrev = pHead;
pNode->unpersistPrev = pNode->unpersistNext = NULL;
pNode = pHead->unpersistNext;
} }
//write left bytes //write left bytes
free(pSHead);
if((char*)bufPtr != writeBuf) { if((char*)bufPtr != writeBuf) {
int used = (char*)bufPtr - writeBuf; int used = (char*)bufPtr - writeBuf;
int nBytes = write(pMeta->idxFd, writeBuf, used); int nBytes = write(pMeta->idxFd, writeBuf, used);
...@@ -284,7 +369,8 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value ...@@ -284,7 +369,8 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
//TODO: think about thread safety //TODO: think about thread safety
if(pNode->handle.valueInUse) { if(pNode->handle.valueInUse
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInUse); pMeta->deleter(pNode->handle.valueInUse);
} }
//change pointer ownership //change pointer ownership
...@@ -333,11 +419,13 @@ int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) { ...@@ -333,11 +419,13 @@ int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) {
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
//TODO: think about thread safety //TODO: think about thread safety
if(pNode->handle.valueInTxn) { if(pNode->handle.valueInTxn
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn); pMeta->deleter(pNode->handle.valueInTxn);
} }
//change pointer ownership //change pointer ownership
pNode->handle.valueInTxn = value; pNode->handle.valueInTxn = value;
tqLinkUnpersist(pMeta, pNode);
return 0; return 0;
} else { } else {
pNode = pNode->next; pNode = pNode->next;
...@@ -353,6 +441,7 @@ int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) { ...@@ -353,6 +441,7 @@ int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) {
pNewNode->handle.valueInTxn = value; pNewNode->handle.valueInTxn = value;
pNewNode->next = pMeta->bucket[bucketKey]; pNewNode->next = pMeta->bucket[bucketKey];
pMeta->bucket[bucketKey] = pNewNode; pMeta->bucket[bucketKey] = pNewNode;
tqLinkUnpersist(pMeta, pNewNode);
return 0; return 0;
} }
...@@ -368,11 +457,13 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi ...@@ -368,11 +457,13 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
//TODO: think about thread safety //TODO: think about thread safety
if(pNode->handle.valueInTxn) { if(pNode->handle.valueInTxn
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn); pMeta->deleter(pNode->handle.valueInTxn);
} }
//change pointer ownership //change pointer ownership
pNode->handle.valueInTxn = vmem; pNode->handle.valueInTxn = vmem;
tqLinkUnpersist(pMeta, pNode);
return 0; return 0;
} else { } else {
pNode = pNode->next; pNode = pNode->next;
...@@ -388,6 +479,7 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi ...@@ -388,6 +479,7 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi
pNewNode->handle.valueInTxn = vmem; pNewNode->handle.valueInTxn = vmem;
pNewNode->next = pMeta->bucket[bucketKey]; pNewNode->next = pMeta->bucket[bucketKey];
pMeta->bucket[bucketKey] = pNewNode; pMeta->bucket[bucketKey] = pNewNode;
tqLinkUnpersist(pMeta, pNewNode);
return 0; return 0;
} }
...@@ -396,7 +488,8 @@ static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) { ...@@ -396,7 +488,8 @@ static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) {
TqMetaList* pNode = pMeta->bucket[bucketKey]; TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
if(pNode->handle.valueInTxn != NULL) { if(pNode->handle.valueInTxn != NULL
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
return pNode->handle.valueInTxn; return pNode->handle.valueInTxn;
} else { } else {
return NULL; return NULL;
...@@ -413,17 +506,13 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) { ...@@ -413,17 +506,13 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) {
TqMetaList* pNode = pMeta->bucket[bucketKey]; TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
if(pNode->handle.valueInUse) { if(pNode->handle.valueInUse
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInUse); pMeta->deleter(pNode->handle.valueInUse);
} }
pNode->handle.valueInUse = pNode->handle.valueInTxn; pNode->handle.valueInUse = pNode->handle.valueInTxn;
pNode->handle.valueInTxn = NULL; pNode->handle.valueInTxn = NULL;
if(pNode->unpersistNext == NULL) { tqLinkUnpersist(pMeta, pNode);
pNode->unpersistNext = pMeta->unpersistHead->unpersistNext;
pNode->unpersistPrev = pMeta->unpersistHead;
pMeta->unpersistHead->unpersistNext->unpersistPrev = pNode;
pMeta->unpersistHead->unpersistNext = pNode;
}
return 0; return 0;
} else { } else {
pNode = pNode->next; pNode = pNode->next;
...@@ -437,9 +526,12 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) { ...@@ -437,9 +526,12 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) {
TqMetaList* pNode = pMeta->bucket[bucketKey]; TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
if(pNode->handle.valueInTxn != NULL) { if(pNode->handle.valueInTxn) {
pMeta->deleter(pNode->handle.valueInTxn); if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn);
}
pNode->handle.valueInTxn = NULL; pNode->handle.valueInTxn = NULL;
tqLinkUnpersist(pMeta, pNode);
return 0; return 0;
} }
return -1; return -1;
...@@ -454,9 +546,11 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) { ...@@ -454,9 +546,11 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_SIZE; int64_t bucketKey = key & TQ_BUCKET_SIZE;
TqMetaList* pNode = pMeta->bucket[bucketKey]; TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.valueInTxn
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn); pMeta->deleter(pNode->handle.valueInTxn);
pNode->handle.valueInTxn = NULL; pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
tqLinkUnpersist(pMeta, pNode);
return 0; return 0;
} else { } else {
pNode = pNode->next; pNode = pNode->next;
...@@ -474,21 +568,20 @@ int32_t tqHandleClear(TqMetaStore* pMeta, int64_t key) { ...@@ -474,21 +568,20 @@ int32_t tqHandleClear(TqMetaStore* pMeta, int64_t key) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
if(pNode->handle.valueInUse != NULL) { if(pNode->handle.valueInUse != NULL) {
exist = true; exist = true;
pMeta->deleter(pNode->handle.valueInUse); if(pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pNode->handle.valueInUse = NULL; pMeta->deleter(pNode->handle.valueInUse);
}
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
} }
if(pNode->handle.valueInTxn != NULL) { if(pNode->handle.valueInTxn != NULL) {
exist = true; exist = true;
pMeta->deleter(pNode->handle.valueInTxn); if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pNode->handle.valueInTxn = NULL; pMeta->deleter(pNode->handle.valueInTxn);
}
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
} }
if(exist) { if(exist) {
if(pNode->unpersistNext == NULL) { tqLinkUnpersist(pMeta, pNode);
pNode->unpersistNext = pMeta->unpersistHead->unpersistNext;
pNode->unpersistPrev = pMeta->unpersistHead;
pMeta->unpersistHead->unpersistNext->unpersistPrev = pNode;
pMeta->unpersistHead->unpersistNext = pNode;
}
return 0; return 0;
} }
return -1; return -1;
......
...@@ -9,19 +9,22 @@ struct Foo { ...@@ -9,19 +9,22 @@ struct Foo {
int32_t a; int32_t a;
}; };
int FooSerializer(const void* pObj, void** ppBytes) { int FooSerializer(const void* pObj, TqSerializedHead** ppHead) {
Foo* foo = (Foo*) pObj; Foo* foo = (Foo*) pObj;
*ppBytes = realloc(*ppBytes, sizeof(int32_t)); if((*ppHead) == NULL || (*ppHead)->ssize < sizeof(TqSerializedHead) + sizeof(int32_t)) {
**(int32_t**)ppBytes = foo->a; *ppHead = (TqSerializedHead*)realloc(*ppHead, sizeof(TqSerializedHead) + sizeof(int32_t));
return sizeof(int32_t); (*ppHead)->ssize = sizeof(TqSerializedHead) + sizeof(int32_t);
}
*(int32_t*)(*ppHead)->content = foo->a;
return (*ppHead)->ssize;
} }
const void* FooDeserializer(const void* pBytes, void** ppObj) { const void* FooDeserializer(const TqSerializedHead* pHead, void** ppObj) {
if(*ppObj == NULL) { if(*ppObj == NULL) {
*ppObj = realloc(*ppObj, sizeof(int32_t)); *ppObj = realloc(*ppObj, sizeof(int32_t));
} }
Foo* pFoo = *(Foo**)ppObj; Foo* pFoo = *(Foo**)ppObj;
pFoo->a = *(int32_t*)pBytes; pFoo->a = *(int32_t*)pHead->content;
return NULL; return NULL;
} }
...@@ -104,3 +107,27 @@ TEST_F(TqMetaTest, abortTest) { ...@@ -104,3 +107,27 @@ TEST_F(TqMetaTest, abortTest) {
pFoo = (Foo*) tqHandleGet(pMeta, 1); pFoo = (Foo*) tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo == NULL, true); EXPECT_EQ(pFoo == NULL, true);
} }
TEST_F(TqMetaTest, deleteTest) {
Foo* pFoo = (Foo*)malloc(sizeof(Foo));
pFoo->a = 3;
tqHandleMovePut(pMeta, 1, pFoo);
pFoo = (Foo*) tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo == NULL, true);
tqHandleCommit(pMeta, 1);
pFoo = (Foo*) tqHandleGet(pMeta, 1);
ASSERT_EQ(pFoo != NULL, true);
EXPECT_EQ(pFoo->a, 3);
tqHandleDel(pMeta, 1);
pFoo = (Foo*) tqHandleGet(pMeta, 1);
ASSERT_EQ(pFoo != NULL, true);
EXPECT_EQ(pFoo->a, 3);
tqHandleCommit(pMeta, 1);
pFoo = (Foo*) tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo == NULL, true);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册