/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "vnodeDef.h"

/* ------------------------ STRUCTURES ------------------------ */
// Number of SVMemAllocator instances vnodeOpenBufPool creates; the configured
// wsize is divided evenly among them.
#define VNODE_BUF_POOL_SHARDS 3

struct SVBufPool {
H
more  
Hongze Cheng 已提交
22 23
  pthread_mutex_t mutex;
  pthread_cond_t  hasFree;
H
more  
Hongze Cheng 已提交
24 25
  TD_DLIST(SVMemAllocator) free;
  TD_DLIST(SVMemAllocator) incycle;
H
more  
Hongze Cheng 已提交
26
  SVMemAllocator *inuse;
H
more  
Hongze Cheng 已提交
27 28
  // MAF for submodules to use
  SMemAllocatorFactory *pMAF;
H
more  
Hongze Cheng 已提交
29 30
};

// Forward declarations of the allocator-factory callbacks defined later in this
// file; vnodeOpenBufPool installs them as pMAF->create and pMAF->destroy.
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pMAF);
static void           vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA);

int vnodeOpenBufPool(SVnode *pVnode) {
  uint64_t capacity;

  if ((pVnode->pBufPool = (SVBufPool *)calloc(1, sizeof(SVBufPool))) == NULL) {
    /* TODO */
    return -1;
  }

H
Hongze Cheng 已提交
42 43
  TD_DLIST_INIT(&(pVnode->pBufPool->free));
  TD_DLIST_INIT(&(pVnode->pBufPool->incycle));
H
more  
Hongze Cheng 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56

  pVnode->pBufPool->inuse = NULL;

  // TODO
  capacity = pVnode->config.wsize / VNODE_BUF_POOL_SHARDS;

  for (int i = 0; i < VNODE_BUF_POOL_SHARDS; i++) {
    SVMemAllocator *pVMA = vmaCreate(capacity, pVnode->config.ssize, pVnode->config.lsize);
    if (pVMA == NULL) {
      // TODO: handle error
      return -1;
    }

H
Hongze Cheng 已提交
57
    TD_DLIST_APPEND(&(pVnode->pBufPool->free), pVMA);
H
more  
Hongze Cheng 已提交
58 59
  }

H
Hongze Cheng 已提交
60 61 62 63 64 65 66 67 68
  pVnode->pBufPool->pMAF = (SMemAllocatorFactory *)malloc(sizeof(SMemAllocatorFactory));
  if (pVnode->pBufPool->pMAF == NULL) {
    // TODO: handle error
    return -1;
  }
  pVnode->pBufPool->pMAF->impl = pVnode;
  pVnode->pBufPool->pMAF->create = vBufPoolCreateMA;
  pVnode->pBufPool->pMAF->destroy = vBufPoolDestroyMA;

H
more  
Hongze Cheng 已提交
69 70 71 72 73 74 75 76
  return 0;
}

void vnodeCloseBufPool(SVnode *pVnode) {
  if (pVnode->pBufPool) {
    vmaDestroy(pVnode->pBufPool->inuse);

    while (true) {
H
more  
Hongze Cheng 已提交
77
      SVMemAllocator *pVMA = TD_DLIST_HEAD(&(pVnode->pBufPool->incycle));
H
more  
Hongze Cheng 已提交
78
      if (pVMA == NULL) break;
H
Hongze Cheng 已提交
79
      TD_DLIST_POP(&(pVnode->pBufPool->incycle), pVMA);
H
more  
Hongze Cheng 已提交
80 81 82 83
      vmaDestroy(pVMA);
    }

    while (true) {
H
more  
Hongze Cheng 已提交
84
      SVMemAllocator *pVMA = TD_DLIST_HEAD(&(pVnode->pBufPool->free));
H
more  
Hongze Cheng 已提交
85
      if (pVMA == NULL) break;
H
Hongze Cheng 已提交
86
      TD_DLIST_POP(&(pVnode->pBufPool->free), pVMA);
H
more  
Hongze Cheng 已提交
87 88 89 90 91 92 93 94
      vmaDestroy(pVMA);
    }

    free(pVnode->pBufPool);
    pVnode->pBufPool = NULL;
  }
}

/* Retire the allocator currently in use: clear the `inuse` slot and append the
 * allocator to the incycle list. Does nothing beyond clearing the slot when no
 * allocator is in use. Always returns 0. */
int vnodeBufPoolSwitch(SVnode *pVnode) {
  SVMemAllocator *pvma = pVnode->pBufPool->inuse;

  pVnode->pBufPool->inuse = NULL;

  // `inuse` is NULL until vnodeMalloc picks an allocator; a NULL node must not
  // be linked into the list.
  if (pvma != NULL) {
    TD_DLIST_APPEND(&(pVnode->pBufPool->incycle), pvma);
  }

  return 0;
}

/* Take the allocator at the head of the incycle list, reset it and append it
 * to the free list. The incycle list must not be empty (asserted).
 * Always returns 0. */
int vnodeBufPoolRecycle(SVnode *pVnode) {
  SVBufPool *     pBufPool = pVnode->pBufPool;
  SVMemAllocator *pvma = TD_DLIST_HEAD(&(pBufPool->incycle));
  ASSERT(pvma != NULL);

  TD_DLIST_POP(&(pBufPool->incycle), pvma);
  vmaReset(pvma);
  TD_DLIST_APPEND(&(pBufPool->free), pvma);

  return 0;
}

/* Allocate `size` bytes from the vnode's in-use allocator.
 *
 * When no allocator is in use, the head of the free list is popped and becomes
 * the in-use allocator first. Returns whatever vmaMalloc returns.
 *
 * NOTE(review): while the free list is empty this loop spins without waiting
 * or locking; the intended blocking wait is still a TODO. */
void *vnodeMalloc(SVnode *pVnode, uint64_t size) {
  SVBufPool *pBufPool = pVnode->pBufPool;

  if (pBufPool->inuse == NULL) {
    for (;;) {
      // TODO: add sem_wait and sem_post
      pBufPool->inuse = TD_DLIST_HEAD(&(pBufPool->free));
      if (pBufPool->inuse != NULL) {
        TD_DLIST_POP(&(pBufPool->free), pBufPool->inuse);
        break;
      }
      // TODO: block here (e.g. tsem_wait on pBufPool->hasFree) instead of spinning
    }
  }

  return vmaMalloc(pBufPool->inuse, size);
}

/* Report whether the in-use allocator is full according to vmaIsFull.
 * Returns false when no allocator is currently in use. */
bool vnodeBufPoolIsFull(SVnode *pVnode) {
  SVMemAllocator *pVMA = pVnode->pBufPool->inuse;

  if (pVMA == NULL) {
    return false;
  }

  return vmaIsFull(pVMA);
}

/* Return the allocator factory stored in this vnode's buffer pool. */
SMemAllocatorFactory *vBufPoolGetMAF(SVnode *pVnode) {
  SVBufPool *pBufPool = pVnode->pBufPool;

  return pBufPool->pMAF;
}

/* ------------------------ STATIC METHODS ------------------------ */
typedef struct {
H
Hongze Cheng 已提交
144 145
  SVnode *        pVnode;
  SVMemAllocator *pVMA;
H
more  
Hongze Cheng 已提交
146 147
} SVMAWrapper;

/* Malloc callback installed on allocators created by vBufPoolCreateMA:
 * forwards the request to the wrapped pool allocator via vmaMalloc. */
static FORCE_INLINE void *vmaMaloocCb(SMemAllocator *pMA, uint64_t size) {
  SVMAWrapper *pWrapper = (SVMAWrapper *)(pMA->impl);

  return vmaMalloc(pWrapper->pVMA, size);
}

/* Factory `create` callback: build an SMemAllocator that forwards to the
 * vnode's in-use pool allocator and take one reference on that allocator.
 *
 * The SMemAllocator and its SVMAWrapper live in a single allocation, with the
 * wrapper placed directly after the SMemAllocator.
 *
 * Returns NULL when no pool allocator is in use or when memory allocation
 * fails. */
// TODO: Add atomic operations here
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pMAF) {
  SMemAllocator * pMA;
  SVnode *        pVnode = (SVnode *)(pMAF->impl);
  SVMemAllocator *pVMA = pVnode->pBufPool->inuse;
  SVMAWrapper *   pWrapper;

  // Nothing to wrap; checked before allocating so no memory is leaked.
  if (pVMA == NULL) {
    return NULL;
  }

  pMA = (SMemAllocator *)calloc(1, sizeof(*pMA) + sizeof(SVMAWrapper));
  if (pMA == NULL) {
    return NULL;
  }

  pVMA->_ref.val++;
  pWrapper = POINTER_SHIFT(pMA, sizeof(*pMA));
  pWrapper->pVnode = pVnode;
  pWrapper->pVMA = pVMA;

  pMA->impl = pWrapper;
  TD_MA_MALLOC_FUNC(pMA) = vmaMaloocCb;

  return pMA;
}

/* Factory `destroy` callback: free an SMemAllocator created by
 * vBufPoolCreateMA and drop its reference on the wrapped pool allocator.
 * When the last reference goes away the pool allocator is moved from the
 * incycle list to the free list, reset first so it is handed out empty
 * (matching vnodeBufPoolRecycle).
 *
 * NOTE(review): this assumes the allocator is on the incycle list (i.e. it was
 * already retired by vnodeBufPoolSwitch) when its count reaches zero - confirm
 * against callers. */
static void vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA) {
  SVMAWrapper *   pWrapper = (SVMAWrapper *)(pMA->impl);
  SVnode *        pVnode = pWrapper->pVnode;
  SVMemAllocator *pVMA = pWrapper->pVMA;

  // The wrapper lives inside pMA's allocation; everything needed from it was
  // copied into locals above.
  free(pMA);
  if (--pVMA->_ref.val == 0) {
    TD_DLIST_POP(&(pVnode->pBufPool->incycle), pVMA);
    vmaReset(pVMA);
    TD_DLIST_APPEND(&(pVnode->pBufPool->free), pVMA);
  }
}