vnodeBufferPool.c 5.1 KB
Newer Older
H
more  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "vnodeDef.h"

/* ------------------------ STRUCTURES ------------------------ */
H
more  
Hongze Cheng 已提交
19 20
// Number of SVMemAllocator instances vnodeOpenBufPool() creates for a pool;
// config.wsize is divided evenly among them.
#define VNODE_BUF_POOL_SHARDS 3

H
more  
Hongze Cheng 已提交
21
struct SVBufPool {
H
more  
Hongze Cheng 已提交
22 23
  pthread_mutex_t mutex;
  pthread_cond_t  hasFree;
H
more  
Hongze Cheng 已提交
24 25
  TD_DLIST(SVMemAllocator) free;
  TD_DLIST(SVMemAllocator) incycle;
H
more  
Hongze Cheng 已提交
26
  SVMemAllocator *inuse;
H
more  
Hongze Cheng 已提交
27 28
  // MAF for submodules to use
  SMemAllocatorFactory *pMAF;
H
more  
Hongze Cheng 已提交
29 30
};

H
Hongze Cheng 已提交
31 32 33
// Factory callbacks installed into the pool's SMemAllocatorFactory by
// vnodeOpenBufPool(); both are defined in the static-methods section below.
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pMAF);
static void           vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA);

H
more  
Hongze Cheng 已提交
34 35 36 37 38 39 40 41
/*
 * Create the buffer pool of a vnode.
 *
 * Allocates the pool, creates VNODE_BUF_POOL_SHARDS memory allocators (each
 * sized config.wsize / VNODE_BUF_POOL_SHARDS) on the free list, and installs
 * the allocator factory that submodules obtain through vBufPoolGetMAF().
 *
 * @param pVnode vnode that will own the pool; pVnode->pBufPool is overwritten.
 * @return 0 on success. -1 on allocation failure, in which case everything
 *         created here has been released and pVnode->pBufPool is NULL.
 */
int vnodeOpenBufPool(SVnode *pVnode) {
  SVBufPool      *pPool;
  SVMemAllocator *pVMA;
  uint64_t        capacity;

  pPool = (SVBufPool *)calloc(1, sizeof(*pPool));
  pVnode->pBufPool = pPool;
  if (pPool == NULL) {
    return -1;
  }

  TD_DLIST_INIT(&(pPool->free));
  TD_DLIST_INIT(&(pPool->incycle));
  pPool->inuse = NULL;

  // TODO: integer division drops any remainder of wsize
  capacity = pVnode->config.wsize / VNODE_BUF_POOL_SHARDS;

  for (int i = 0; i < VNODE_BUF_POOL_SHARDS; i++) {
    pVMA = vmaCreate(capacity, pVnode->config.ssize, pVnode->config.lsize);
    if (pVMA == NULL) {
      goto _err;
    }

    TD_DLIST_APPEND(&(pPool->free), pVMA);
  }

  pPool->pMAF = (SMemAllocatorFactory *)malloc(sizeof(*(pPool->pMAF)));
  if (pPool->pMAF == NULL) {
    goto _err;
  }
  pPool->pMAF->impl = pVnode;
  pPool->pMAF->create = vBufPoolCreateMA;
  pPool->pMAF->destroy = vBufPoolDestroyMA;

  return 0;

_err:
  // Undo the partial construction so nothing leaks and the vnode is not left
  // pointing at a half-built pool. Only the free list can hold allocators here.
  while ((pVMA = TD_DLIST_HEAD(&(pPool->free))) != NULL) {
    TD_DLIST_POP(&(pPool->free), pVMA);
    vmaDestroy(pVMA);
  }
  free(pPool);
  pVnode->pBufPool = NULL;
  return -1;
}

/*
 * Destroy the buffer pool of a vnode and everything it owns: the in-use
 * allocator, every allocator on the incycle and free lists, the allocator
 * factory, and the pool object itself.
 *
 * Does nothing when the vnode has no pool. On return pVnode->pBufPool is NULL.
 *
 * @param pVnode vnode whose pool is closed.
 */
void vnodeCloseBufPool(SVnode *pVnode) {
  SVBufPool      *pPool = pVnode->pBufPool;
  SVMemAllocator *pVMA;

  if (pPool == NULL) {
    return;
  }

  // inuse is NULL until the first vnodeMalloc() and after every
  // vnodeBufPoolSwitch(), so only destroy it when one is actually selected.
  if (pPool->inuse != NULL) {
    vmaDestroy(pPool->inuse);
    pPool->inuse = NULL;
  }

  while ((pVMA = TD_DLIST_HEAD(&(pPool->incycle))) != NULL) {
    TD_DLIST_POP(&(pPool->incycle), pVMA);
    vmaDestroy(pVMA);
  }

  while ((pVMA = TD_DLIST_HEAD(&(pPool->free))) != NULL) {
    TD_DLIST_POP(&(pPool->free), pVMA);
    vmaDestroy(pVMA);
  }

  // The factory is malloc'ed by vnodeOpenBufPool(); free(NULL) is a no-op, so
  // this is safe even if the pool was only partially opened.
  free(pPool->pMAF);
  free(pPool);
  pVnode->pBufPool = NULL;
}

H
Hongze Cheng 已提交
95 96 97 98 99
/*
 * Retire the allocator currently in use: clear the pool's `inuse` slot and,
 * if an allocator was selected, append it to the incycle list.
 *
 * @param pVnode vnode whose pool is switched.
 * @return always 0.
 */
int vnodeBufPoolSwitch(SVnode *pVnode) {
  SVBufPool      *pPool = pVnode->pBufPool;
  SVMemAllocator *pRetired = pPool->inuse;

  pPool->inuse = NULL;

  if (pRetired != NULL) {
    TD_DLIST_APPEND(&(pPool->incycle), pRetired);
  }

  return 0;
}

/*
 * Move the allocator at the head of the incycle list back to the free list,
 * resetting it with vmaReset() first. Does nothing when incycle is empty.
 *
 * @param pVnode vnode whose pool is recycled.
 * @return always 0.
 */
int vnodeBufPoolRecycle(SVnode *pVnode) {
  SVBufPool      *pPool = pVnode->pBufPool;
  SVMemAllocator *pHead = TD_DLIST_HEAD(&(pPool->incycle));

  if (pHead != NULL) {
    TD_DLIST_POP(&(pPool->incycle), pHead);
    vmaReset(pHead);
    TD_DLIST_APPEND(&(pPool->free), pHead);
  }

  return 0;
}

H
more  
Hongze Cheng 已提交
119
/*
 * Allocate `size` bytes from the vnode's in-use allocator.
 *
 * When no allocator is in use, the head of the free list is taken and becomes
 * the in-use allocator.
 *
 * NOTE: while the free list is empty this loop polls it without blocking or
 * yielding, and never returns unless something else puts an allocator on it.
 * TODO: wait on a semaphore/condition instead (sem_wait / sem_post).
 *
 * @param pVnode vnode whose pool serves the request.
 * @param size   number of bytes requested.
 * @return whatever vmaMalloc() returns for the in-use allocator.
 */
void *vnodeMalloc(SVnode *pVnode, uint64_t size) {
  SVBufPool *pPool = pVnode->pBufPool;

  while (pPool->inuse == NULL) {
    pPool->inuse = TD_DLIST_HEAD(&(pPool->free));
    if (pPool->inuse != NULL) {
      TD_DLIST_POP(&(pPool->free), pPool->inuse);
    }
  }

  return vmaMalloc(pPool->inuse, size);
}

/*
 * Report whether the in-use allocator is full.
 *
 * @param pVnode vnode whose pool is queried.
 * @return false when no allocator is in use; otherwise the result of
 *         vmaIsFull() on the in-use allocator.
 */
bool vnodeBufPoolIsFull(SVnode *pVnode) {
  SVMemAllocator *pCurrent = pVnode->pBufPool->inuse;

  return (pCurrent == NULL) ? false : vmaIsFull(pCurrent);
}

H
more  
Hongze Cheng 已提交
143 144
/*
 * Return the memory-allocator factory of the vnode's buffer pool.
 * The pool must already be open; pVnode->pBufPool is dereferenced unchecked.
 */
SMemAllocatorFactory *vBufPoolGetMAF(SVnode *pVnode) {
  SVBufPool *pPool = pVnode->pBufPool;

  return pPool->pMAF;
}

H
Hongze Cheng 已提交
145
/* ------------------------ STATIC METHODS ------------------------ */
H
more  
Hongze Cheng 已提交
146
typedef struct {
H
Hongze Cheng 已提交
147 148
  SVnode *        pVnode;
  SVMemAllocator *pVMA;
H
more  
Hongze Cheng 已提交
149 150
} SVMAWrapper;

H
Hongze Cheng 已提交
151 152
/*
 * Malloc callback installed on allocators created by vBufPoolCreateMA():
 * forwards the request to the SVMemAllocator recorded in the wrapper.
 *
 * @param pMA  allocator whose impl points to an SVMAWrapper.
 * @param size number of bytes requested.
 * @return whatever vmaMalloc() returns.
 */
static FORCE_INLINE void *vmaMaloocCb(SMemAllocator *pMA, uint64_t size) {
  SVMemAllocator *pVMA = ((SVMAWrapper *)(pMA->impl))->pVMA;

  return vmaMalloc(pVMA, size);
}

H
Hongze Cheng 已提交
157 158 159 160 161
// TODO: Add atomic operations here
/*
 * Factory `create` callback: build an SMemAllocator bound to the pool's
 * current in-use allocator and take a reference on that allocator.
 *
 * The SMemAllocator and its SVMAWrapper are allocated as one heap block, with
 * the wrapper placed right after the SMemAllocator.
 *
 * @param pMAF factory whose impl is the owning SVnode.
 * @return the new allocator, or NULL when the pool has no in-use allocator or
 *         the heap allocation fails.
 */
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pMAF) {
  SVnode         *pVnode = (SVnode *)(pMAF->impl);
  SVMemAllocator *pVMA = pVnode->pBufPool->inuse;
  SMemAllocator  *pMA;
  SVMAWrapper    *pWrapper;

  // inuse is NULL right after vnodeOpenBufPool() and after every
  // vnodeBufPoolSwitch(); there is nothing to bind to in that state.
  if (pVMA == NULL) {
    return NULL;
  }

  pMA = (SMemAllocator *)calloc(1, sizeof(*pMA) + sizeof(SVMAWrapper));
  if (pMA == NULL) {
    return NULL;
  }

  // Take the reference on the same allocator the wrapper records.
  // TODO: the counter update is not atomic.
  pVMA->_ref.val++;

  pWrapper = POINTER_SHIFT(pMA, sizeof(*pMA));
  pWrapper->pVnode = pVnode;
  pWrapper->pVMA = pVMA;

  pMA->impl = pWrapper;
  TD_MA_MALLOC_FUNC(pMA) = vmaMaloocCb;

  return pMA;
}

H
Hongze Cheng 已提交
179 180 181 182
/*
 * Factory `destroy` callback: free an allocator produced by
 * vBufPoolCreateMA() and drop its reference on the underlying pool allocator.
 *
 * When the last reference goes away and the pool allocator has already been
 * retired to the incycle list, it is reset and moved to the free list, the
 * same way vnodeBufPoolRecycle() does it.
 *
 * @param pMAF factory the allocator came from (unused).
 * @param pMA  allocator to destroy; must not be used afterwards.
 */
static void vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA) {
  // Read everything needed out of the wrapper first: it shares pMA's heap block.
  SVMAWrapper    *pWrapper = (SVMAWrapper *)(pMA->impl);
  SVnode         *pVnode = pWrapper->pVnode;
  SVMemAllocator *pVMA = pWrapper->pVMA;
  SVBufPool      *pPool = pVnode->pBufPool;

  free(pMA);

  // TODO: the counter update is not atomic.
  if (--pVMA->_ref.val != 0) {
    return;
  }

  // An allocator that is still in use is on neither list (vnodeMalloc() popped
  // it from free, vnodeBufPoolSwitch() has not appended it to incycle yet), so
  // popping it from incycle would corrupt the list. Leave it in place.
  if (pVMA == pPool->inuse) {
    return;
  }

  TD_DLIST_POP(&(pPool->incycle), pVMA);
  vmaReset(pVMA);  // hand it back empty, as vnodeBufPoolRecycle() does
  TD_DLIST_APPEND(&(pPool->free), pVMA);
}