提交 768ea68c 编写于 作者: H Hongze Cheng

more

上级 fed05bb6
...@@ -29,6 +29,7 @@ typedef struct STsdbMemAllocator STsdbMemAllocator; ...@@ -29,6 +29,7 @@ typedef struct STsdbMemAllocator STsdbMemAllocator;
STsdb *tsdbOpen(const char *path, const STsdbOptions *); STsdb *tsdbOpen(const char *path, const STsdbOptions *);
void tsdbClose(STsdb *); void tsdbClose(STsdb *);
void tsdbRemove(const char *path); void tsdbRemove(const char *path);
int tsdbInsertData(STsdb *pTsdb, void *);
// STsdbOptions // STsdbOptions
int tsdbOptionsInit(STsdbOptions *); int tsdbOptionsInit(STsdbOptions *);
......
...@@ -22,10 +22,10 @@ ...@@ -22,10 +22,10 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SMemAllocator SMemAllocator; typedef struct SMemAllocator SMemAllocator;
typedef struct SMemAllocatorFactory SMemAllocatorFactory;
struct SMemAllocator { struct SMemAllocator {
char name[16];
void *impl; void *impl;
void *(*malloc)(SMemAllocator *, uint64_t size); void *(*malloc)(SMemAllocator *, uint64_t size);
void *(*calloc)(SMemAllocator *, uint64_t nmemb, uint64_t size); void *(*calloc)(SMemAllocator *, uint64_t nmemb, uint64_t size);
...@@ -34,11 +34,11 @@ struct SMemAllocator { ...@@ -34,11 +34,11 @@ struct SMemAllocator {
uint64_t (*usage)(SMemAllocator *); uint64_t (*usage)(SMemAllocator *);
}; };
typedef struct { struct SMemAllocatorFactory {
void *impl; void *impl;
SMemAllocator *(*create)(); SMemAllocator *(*create)(SMemAllocatorFactory *);
void (*destroy)(SMemAllocator *); void (*destroy)(SMemAllocatorFactory *, SMemAllocator *);
} SMemAllocatorFactory; };
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -21,6 +21,9 @@ extern "C" { ...@@ -21,6 +21,9 @@ extern "C" {
#endif #endif
typedef struct { typedef struct {
uint64_t processed;
uint64_t committed;
uint64_t applied;
} SVState; } SVState;
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -19,9 +19,12 @@ ...@@ -19,9 +19,12 @@
#define VNODE_BUF_POOL_SHARDS 3 #define VNODE_BUF_POOL_SHARDS 3
struct SVBufPool { struct SVBufPool {
// buffer pool impl
SList free; SList free;
SList incycle; SList incycle;
SListNode *inuse; SListNode *inuse;
// MAF for submodules
SMemAllocatorFactory maf;
}; };
typedef enum { typedef enum {
...@@ -49,6 +52,11 @@ typedef struct { ...@@ -49,6 +52,11 @@ typedef struct {
SVArenaNode node; SVArenaNode node;
} SVArenaAllocator; } SVArenaAllocator;
typedef struct {
SVnode * pVnode;
SListNode *pNode;
} SVMAWrapper;
typedef struct { typedef struct {
T_REF_DECLARE() T_REF_DECLARE()
uint64_t capacity; uint64_t capacity;
...@@ -59,8 +67,10 @@ typedef struct { ...@@ -59,8 +67,10 @@ typedef struct {
}; };
} SVMemAllocator; } SVMemAllocator;
static SListNode *vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type); static SListNode * vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type);
static void vBufPoolFreeNode(SListNode *pNode); static void vBufPoolFreeNode(SListNode *pNode);
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf);
static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma);
int vnodeOpenBufPool(SVnode *pVnode) { int vnodeOpenBufPool(SVnode *pVnode) {
uint64_t capacity; uint64_t capacity;
...@@ -89,6 +99,10 @@ int vnodeOpenBufPool(SVnode *pVnode) { ...@@ -89,6 +99,10 @@ int vnodeOpenBufPool(SVnode *pVnode) {
tdListAppendNode(&(pVnode->pBufPool->free), pNode); tdListAppendNode(&(pVnode->pBufPool->free), pNode);
} }
pVnode->pBufPool->maf.impl = pVnode;
pVnode->pBufPool->maf.create = vBufPoolCreateMA;
pVnode->pBufPool->maf.destroy = vBufPoolDestroyMA;
return 0; return 0;
} }
...@@ -185,4 +199,82 @@ static void vBufPoolFreeNode(SListNode *pNode) { ...@@ -185,4 +199,82 @@ static void vBufPoolFreeNode(SListNode *pNode) {
} }
free(pNode); free(pNode);
}
static void *vBufPoolMalloc(SMemAllocator *pma, uint64_t size) {
SVMAWrapper * pvmaw = (SVMAWrapper *)(pma->impl);
SVMemAllocator *pvma = (SVMemAllocator *)(pvmaw->pNode->data);
void * ptr = NULL;
if (pvma->type == E_V_ARENA_ALLOCATOR) {
SVArenaAllocator *pvaa = &(pvma->vaa);
if (POINTER_DISTANCE(pvaa->inuse->ptr, pvaa->inuse->data) + size > pvaa->inuse->size) {
SVArenaNode *pNode = (SVArenaNode *)malloc(sizeof(*pNode) + MAX(size, pvaa->ssize));
if (pNode == NULL) {
// TODO: handle error
return NULL;
}
pNode->prev = pvaa->inuse;
pNode->size = MAX(size, pvaa->ssize);
pNode->ptr = pNode->data;
pvaa->inuse = pNode;
}
ptr = pvaa->inuse->ptr;
pvaa->inuse->ptr = POINTER_SHIFT(ptr, size);
} else if (pvma->type == E_V_HEAP_ALLOCATOR) {
/* TODO */
}
return NULL;
}
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf) {
SVnode * pVnode;
SMemAllocator * pma;
SVMemAllocator *pvma;
SVMAWrapper * pvmaw;
pVnode = (SVnode *)(pmaf->impl);
pma = (SMemAllocator *)calloc(1, sizeof(*pma) + sizeof(SVMAWrapper));
if (pma == NULL) {
// TODO: handle error
return NULL;
}
pvmaw = (SVMAWrapper *)POINTER_SHIFT(pma, sizeof(*pma));
// No allocator used currently
if (pVnode->pBufPool->inuse == NULL) {
while (listNEles(&(pVnode->pBufPool->free)) == 0) {
// TODO: wait until all released ro kill query
// tsem_wait();
ASSERT(0);
}
pVnode->pBufPool->inuse = tdListPopHead(&(pVnode->pBufPool->free));
pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data);
T_REF_INIT_VAL(pvma, 1);
} else {
pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data);
}
T_REF_INC(pvma);
pvmaw->pVnode = pVnode;
pvmaw->pNode = pVnode->pBufPool->inuse;
pma->impl = pvmaw;
pma->malloc = vBufPoolMalloc;
pma->calloc = NULL; /* TODO */
pma->realloc = NULL; /* TODO */
pma->free = NULL; /* TODO */
pma->usage = NULL; /* TODO */
return pma;
}
static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma) { /* TODO */
} }
\ No newline at end of file
...@@ -21,46 +21,32 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { ...@@ -21,46 +21,32 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
} }
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
#if 0 // TODO
int reqType; /* TODO */ int code = 0;
size_t reqSize; /* TODO */
uint64_t reqVersion = 0; /* TODO */
int code = 0;
// Copy the request to vnode buffer switch (pMsg->msgType) {
void *pReq = mMalloc(pVnode->inuse, reqSize);
if (pReq == NULL) {
// TODO: handle error
}
memcpy(pReq, pMsg, reqSize);
// Push the request to TQ so consumers can consume
tqPushMsg(pVnode->pTq, pReq, 0);
// Process the request
switch (reqType) {
case TSDB_MSG_TYPE_CREATE_TABLE: case TSDB_MSG_TYPE_CREATE_TABLE:
code = metaCreateTable(pVnode->pMeta, NULL /* TODO */); if (metaCreateTable(pVnode->pMeta, pMsg->pCont) < 0) {
/* TODO */
return -1;
}
break; break;
case TSDB_MSG_TYPE_DROP_TABLE: case TSDB_MSG_TYPE_DROP_TABLE:
code = metaDropTable(pVnode->pMeta, 0 /* TODO */); if (metaDropTable(pVnode->pMeta, pMsg->pCont) < 0) {
/* TODO */
return -1;
}
break; break;
case TSDB_MSG_TYPE_SUBMIT: case TSDB_MSG_TYPE_SUBMIT:
/* TODO */ if (tsdbInsertData(pVnode->pTsdb, pMsg->pCont) < 0) {
/* TODO */
return -1;
}
break; break;
default: default:
break; break;
} }
if (vnodeShouldCommit(pVnode)) {
if (vnodeAsyncCommit(pVnode) < 0) {
// TODO: handle error
}
}
return code;
#endif
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册