mempool.h 11.0 KB
Newer Older
W
wangguibao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
W
wangguibao 已提交
16 17 18 19 20

#ifdef BCLOUD
#include <base/atomicops.h>
#include <base/logging.h>
#else
W
wangguibao 已提交
21 22
#include <butil/atomicops.h>
#include <butil/logging.h>
W
wangguibao 已提交
23 24
#endif

W
wangguibao 已提交
25 26
#include <execinfo.h>
#include <pthread.h>

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <new>
#include <sstream>
#include <string>
W
wangguibao 已提交
31 32 33 34

namespace im {
namespace fugue {

W
wangguibao 已提交
35 36 37 38
#ifdef BCLOUD
namespace butil = base;
#endif

W
wangguibao 已提交
39 40 41 42
namespace lockfree {

template <class T>
class PushOnlyStack {
W
wangguibao 已提交
43 44
 public:
  PushOnlyStack() { _head.store(NULL, butil::memory_order_relaxed); }
W
wangguibao 已提交
45

W
wangguibao 已提交
46 47 48 49 50 51
  void push(T* node) {
    T* head = _head.load(butil::memory_order_relaxed);
    node->next = head;
    while (
        !_head.compare_exchange_weak(head, node, butil::memory_order_relaxed)) {
      node->next = head;
W
wangguibao 已提交
52
    }
W
wangguibao 已提交
53
  }
W
wangguibao 已提交
54

W
wangguibao 已提交
55
  T* release() { return _head.exchange(NULL, butil::memory_order_relaxed); }
W
wangguibao 已提交
56

W
wangguibao 已提交
57 58
 private:
  butil::atomic<T*> _head;
W
wangguibao 已提交
59 60 61 62
};

// Storage cell managed by FreeList: the user payload plus bookkeeping.
//   id   - lower 32 bits: slot index in the owning FreeList; upper 32 bits:
//          a version counter that FreeList::put() bumps on every release.
//   next - id of the next free node while this node sits on the free list.
template <class T>
struct FreeListNode {
  uint64_t id;
  uint64_t next;
  T data;  // payload handed to callers of FreeList::get()
};

template <class T, int CAP>
class FreeList {
W
wangguibao 已提交
70 71 72
 public:
  typedef FreeListNode<T> Node;
  static const uint64_t EMPTY = 0xFFFFFFFFFFFFFFFF;
W
wangguibao 已提交
73

W
wangguibao 已提交
74 75 76 77
  T* get() {
    uint64_t head = _head.load(butil::memory_order_acquire);
    if (head == EMPTY) {
      return new_node();
W
wangguibao 已提交
78 79
    }

W
wangguibao 已提交
80 81 82 83 84 85 86 87 88 89
    Node* node = address(head);
    while (!_head.compare_exchange_weak(
        head, node->next, butil::memory_order_acquire)) {
      if (head == EMPTY) {
        return new_node();
      }
      node = address(head);
    }
    return &node->data;
  }
W
wangguibao 已提交
90

W
wangguibao 已提交
91 92
  void put(T* value) {
    Node* node = container_of(value, Node, data);
W
wangguibao 已提交
93

W
wangguibao 已提交
94 95 96 97
    uint64_t head = _head.load(butil::memory_order_acquire);
    // add version
    node->id += (1UL << 32);
    node->next = head;
W
wangguibao 已提交
98

W
wangguibao 已提交
99 100 101 102 103 104
    // NOTE: we MUST use a temp var *head* to call compare_exchange_weak
    // because Boost.Atomic will update the *expected* even success
    // std::atomic do not have this limitation
    while (!_head.compare_exchange_weak(
        head, node->id, butil::memory_order_release)) {
      node->next = head;
W
wangguibao 已提交
105
    }
W
wangguibao 已提交
106
  }
W
wangguibao 已提交
107

W
wangguibao 已提交
108 109 110 111 112
  template <class F>
  void unsafe_foreach() {
    uint32_t used_blk_cnt = _slot_index.load(butil::memory_order_relaxed);
    for (uint32_t i = 0; i < used_blk_cnt; ++i) {
      F()(&_node[i]->data);
W
wangguibao 已提交
113
    }
W
wangguibao 已提交
114
  }
W
wangguibao 已提交
115

W
wangguibao 已提交
116 117 118 119 120
  uint32_t real_used_size() const {
    uint32_t used_blk_cnt = _slot_index.load(butil::memory_order_relaxed);
    uint64_t used_bytes = 0;
    for (uint32_t i = 0; i < used_blk_cnt; ++i) {
      used_bytes += _node[i]->data.offset;
W
wangguibao 已提交
121
    }
W
wangguibao 已提交
122 123
    return used_bytes >> 10;
  }
W
wangguibao 已提交
124

W
wangguibao 已提交
125 126 127
  uint32_t allocate_blocks() const {
    return _slot_index.load(butil::memory_order_relaxed);
  }
W
wangguibao 已提交
128

W
wangguibao 已提交
129 130 131 132 133 134 135
  uint32_t free_blocks() const {
    uint64_t head = _head.load(butil::memory_order_relaxed);
    uint32_t size = 0;
    while (head != FreeList::EMPTY) {
      const Node* head_ptr = address(head);
      head = head_ptr->next;
      ++size;
W
wangguibao 已提交
136
    }
W
wangguibao 已提交
137 138
    return size;
  }
W
wangguibao 已提交
139

W
wangguibao 已提交
140 141 142 143
  void reset() {
    _head.store(FreeList::EMPTY, butil::memory_order_relaxed);
    _slot_index.store(0, butil::memory_order_relaxed);
  }
W
wangguibao 已提交
144

W
wangguibao 已提交
145 146 147
  FreeList() {
    for (int i = 0; i < CAP; ++i) {
      _node[i] = NULL;
W
wangguibao 已提交
148
    }
W
wangguibao 已提交
149 150
    reset();
  }
W
wangguibao 已提交
151

W
wangguibao 已提交
152 153
 private:
  uint32_t slot(uint64_t id) const { return static_cast<uint32_t>(id); }
W
wangguibao 已提交
154

W
wangguibao 已提交
155 156 157 158 159
  T* new_node() {
    uint32_t index = _slot_index.fetch_add(1, butil::memory_order_relaxed);
    if (index >= CAP) {
      return NULL;
    }
W
wangguibao 已提交
160

W
wangguibao 已提交
161 162 163
    if (_node[index] != NULL) {
      return &(_node[index]->data);
    }
W
wangguibao 已提交
164

W
wangguibao 已提交
165 166
    Node* node = reinterpret_cast<Node*>(malloc(sizeof(Node)));
    new (node) Node;
W
wangguibao 已提交
167

W
wangguibao 已提交
168 169
    node->id = index;
    _node[index] = node;
W
wangguibao 已提交
170

W
wangguibao 已提交
171 172
    return &node->data;
  }
W
wangguibao 已提交
173

W
wangguibao 已提交
174
  Node* address(uint64_t id) { return _node[slot(id)]; }
W
wangguibao 已提交
175

W
wangguibao 已提交
176
  const Node* address(uint64_t id) const { return _node[slot(id)]; }
W
wangguibao 已提交
177

W
wangguibao 已提交
178 179 180 181 182
  butil::atomic<uint64_t> _head;
  butil::atomic<uint32_t> _slot_index;
  Node* _node[CAP];
};
}  // namespace lockfree
W
wangguibao 已提交
183 184 185 186

namespace memory {

// A fixed-size chunk of raw storage; GlobalBlockFreeList pools these.
struct Block {
  static const int BLOCK_SIZE = 2 << 20;  // 2 MiB
  char data[BLOCK_SIZE];
};

// Process-wide pool of Blocks, capped at MAX_BLOCK_COUNT nodes.
class GlobalBlockFreeList {
 public:
  static const int MAX_BLOCK_COUNT = 32 * 1024;
  typedef lockfree::FreeList<Block, MAX_BLOCK_COUNT> type;

  // Lazily constructed on first call; the same object is returned thereafter.
  static type* instance() {
    static type global_list;
    return &global_list;
  }
};

struct BlockReference {
W
wangguibao 已提交
202 203 204
  BlockReference() : offset(0), block(NULL) {
    // do nothing
  }
W
wangguibao 已提交
205

W
wangguibao 已提交
206 207 208 209
  void reset() {
    offset = 0;
    block = NULL;
  }
W
wangguibao 已提交
210

W
wangguibao 已提交
211 212
  uint32_t offset;
  Block* block;
W
wangguibao 已提交
213 214 215
};

class Region {
W
wangguibao 已提交
216 217 218 219 220 221 222 223 224
 public:
  struct GlobalPut {
    void operator()(BlockReference* block_ref) {
      if (block_ref->block != NULL) {
        GlobalBlockFreeList::instance()->put(block_ref->block);
      }
      block_ref->reset();
    }
  };
W
wangguibao 已提交
225

W
wangguibao 已提交
226 227 228 229
  struct BigNode {
    BigNode* next;
    char data[0];
  };
W
wangguibao 已提交
230

W
wangguibao 已提交
231 232 233 234 235
  ~Region() {
    reset();
    delete[] _big_mem_start;
    _big_mem_start = NULL;
  }
W
wangguibao 已提交
236

W
wangguibao 已提交
237 238 239 240 241 242 243 244
  char const* debug_str() const {
    uint32_t alloc_blocks = _free_blocks.allocate_blocks();
    uint32_t free_blocks = _free_blocks.free_blocks();
    uint32_t used_mem_mb = _free_blocks.real_used_size();
    uint32_t big_buf_size = _big_mem_size.load(butil::memory_order_relaxed);
    uint32_t big_buf_count = _big_mem_count.load(butil::memory_order_relaxed);
    uint32_t mlc_mem_size = _mlc_mem_size.load(butil::memory_order_relaxed);
    uint32_t mlc_mem_count = _mlc_mem_count.load(butil::memory_order_relaxed);
W
wangguibao 已提交
245

W
wangguibao 已提交
246 247 248 249 250 251 252
    std::ostringstream oss;
    oss << "[alloc_blks:" << alloc_blocks << ",free_blks:" << free_blocks
        << ",used_mem_kb:" << used_mem_mb
        << ",big_mem_kb:" << (big_buf_size >> 10)
        << ",big_buf_cnt:" << big_buf_count
        << ",mlc_mem_kb:" << (mlc_mem_size >> 10)
        << ",mlc_cnt:" << mlc_mem_count << "]";
W
wangguibao 已提交
253

W
wangguibao 已提交
254 255
    return oss.str().c_str();
  }
W
wangguibao 已提交
256

W
wangguibao 已提交
257
  Region();
W
wangguibao 已提交
258

W
wangguibao 已提交
259
  void init();
W
wangguibao 已提交
260

W
wangguibao 已提交
261
  void reset();
W
wangguibao 已提交
262

W
wangguibao 已提交
263
  BlockReference* get();
W
wangguibao 已提交
264

W
wangguibao 已提交
265
  void* malloc(size_t size);
W
wangguibao 已提交
266

W
wangguibao 已提交
267
  void put(BlockReference* block);
W
wangguibao 已提交
268

W
wangguibao 已提交
269 270 271 272
  static const int MAX_BLOCK_COUNT = 1024;
  static const int BIG_MEM_THRESHOLD = 256 * 1024;
  static const int MLC_MEM_THRESHOLD = 4 * 1024 * 1024;
  static const int COUNTER_SIZE = MLC_MEM_THRESHOLD / BIG_MEM_THRESHOLD + 1;
W
wangguibao 已提交
273

W
wangguibao 已提交
274 275 276
 private:
  lockfree::FreeList<BlockReference, MAX_BLOCK_COUNT> _free_blocks;
  lockfree::PushOnlyStack<BigNode> _big_nodes;
W
wangguibao 已提交
277

W
wangguibao 已提交
278 279
  butil::atomic<uint32_t> _big_mem_size;
  butil::atomic<uint32_t> _big_mem_count;
W
wangguibao 已提交
280

W
wangguibao 已提交
281 282
  char* _big_mem_start;
  uint32_t _big_mem_capacity;
W
wangguibao 已提交
283

W
wangguibao 已提交
284 285 286 287 288
  butil::atomic<uint32_t> _mlc_mem_size;
  butil::atomic<uint32_t> _mlc_mem_count;
};
}  // namespace memory
}  // namespace fugue
W
wangguibao 已提交
289 290

class Mempool {
W
wangguibao 已提交
291 292 293 294 295 296 297 298
 public:
  void* malloc(size_t size) {
    size = _align(size);
    if (size <= _free_size) {
      void* p = _free_cursor;
      _free_size -= size;
      _free_cursor += size;
      return p;
W
wangguibao 已提交
299 300
    }

W
wangguibao 已提交
301 302
    return malloc_from_region(size);
  }
W
wangguibao 已提交
303

W
wangguibao 已提交
304 305 306
  void free(void* p, size_t size) {
    if (size >= fugue::memory::Region::BIG_MEM_THRESHOLD) {
      return;
W
wangguibao 已提交
307 308
    }

W
wangguibao 已提交
309 310 311 312 313 314
    if (_free_cursor - size == static_cast<char*>(p)) {
      size_t down_aligned = _down_align(size);
      _free_cursor -= down_aligned;
      _free_size += down_aligned;
    }
  }
W
wangguibao 已提交
315

W
wangguibao 已提交
316 317 318 319
  void* realloc(void* old_data, size_t old_size, size_t new_size) {
    if (old_size >= new_size) {
      return old_data;
    }
W
wangguibao 已提交
320

W
wangguibao 已提交
321 322 323 324 325 326 327 328 329 330
    size_t required = new_size - old_size;
    if (_free_cursor == static_cast<char*>(old_data) + old_size) {
      if (_free_size >= required) {
        _free_cursor += required;
        _free_size -= required;
        return old_data;
      } else {
        _free_cursor = static_cast<char*>(old_data);
        _free_size += old_size;
      }
W
wangguibao 已提交
331 332
    }

W
wangguibao 已提交
333 334 335 336
    void* p = this->malloc_from_region(new_size);
    if (p != NULL) {
      memcpy(p, old_data, old_size);
      return p;
W
wangguibao 已提交
337 338
    }

W
wangguibao 已提交
339 340 341 342 343 344 345 346 347 348 349 350 351 352
    return NULL;
  }

  explicit Mempool(fugue::memory::Region* blocks)
      : _free_size(0), _free_cursor(NULL), _blocks(blocks) {
    _block = NULL;
  }

  ~Mempool() { release_block(); }

  void release_block() {
    if (_block) {
      _block->offset = fugue::memory::Block::BLOCK_SIZE - _free_size;
      _blocks->put(_block);
W
wangguibao 已提交
353 354
    }

W
wangguibao 已提交
355 356 357 358
    _free_size = 0;
    _free_cursor = NULL;
    _block = NULL;
  }
W
wangguibao 已提交
359

W
wangguibao 已提交
360 361 362 363
 private:
  void* malloc_from_region(size_t size) {
    if (size >= fugue::memory::Region::BIG_MEM_THRESHOLD) {
      return _blocks->malloc(size);
W
wangguibao 已提交
364 365
    }

W
wangguibao 已提交
366 367 368 369 370
    while (true) {
      fugue::memory::BlockReference* block = _blocks->get();
      if (block == NULL) {
        return NULL;
      }
W
wangguibao 已提交
371

W
wangguibao 已提交
372 373 374 375
      uint32_t free_size = fugue::memory::Block::BLOCK_SIZE - block->offset;
      if (size <= free_size) {
        if (_block) {
          _block->offset = fugue::memory::Block::BLOCK_SIZE - _free_size;
W
wangguibao 已提交
376
        }
W
wangguibao 已提交
377 378 379 380 381 382 383

        char* p = block->block->data + block->offset;
        _free_size = free_size - size;
        _free_cursor = p + size;
        _block = block;
        return p;
      }
W
wangguibao 已提交
384
    }
W
wangguibao 已提交
385 386
    return _blocks->malloc(size);
  }
W
wangguibao 已提交
387

W
wangguibao 已提交
388
  static const int ALIGN_SIZE = sizeof(void*);
W
wangguibao 已提交
389

W
wangguibao 已提交
390 391 392
  inline size_t _align(size_t size) const {
    return (size + (ALIGN_SIZE - 1)) & ~(ALIGN_SIZE - 1);
  }
W
wangguibao 已提交
393

W
wangguibao 已提交
394 395 396
  inline size_t _down_align(size_t size) const {
    return size & ~(ALIGN_SIZE - 1);
  }
W
wangguibao 已提交
397

W
wangguibao 已提交
398 399
  size_t _free_size;
  char* _free_cursor;
W
wangguibao 已提交
400

W
wangguibao 已提交
401 402
  fugue::memory::Region* _blocks;
  fugue::memory::BlockReference* _block;
W
wangguibao 已提交
403 404 405
};

// The calling thread's current Mempool (GCC `__thread` thread-local storage).
// Defined elsewhere. It is installed by MempoolGuard::acquire() and
// GlobalMempool::reset(), and dereferenced without a NULL check by
// GlobalMempool's forwarding methods.
extern __thread Mempool* g_mempool;
W
wangguibao 已提交
406
// Abstract allocator interface: allocation and sized deallocation.
// Declaration order of the virtual members is kept as-is (vtable layout).
class mempool {
 public:
  virtual void* malloc(size_t bytes) = 0;
  virtual void free(void* ptr, size_t bytes) = 0;
  virtual ~mempool() {}
};
W
wangguibao 已提交
412

W
wangguibao 已提交
413
class GlobalMempool : public mempool {
W
wangguibao 已提交
414 415 416 417
 public:
  GlobalMempool() {
    // do nothing;
  }
W
wangguibao 已提交
418

W
wangguibao 已提交
419 420 421
  virtual ~GlobalMempool() {
    // do nothing;
  }
W
wangguibao 已提交
422

W
wangguibao 已提交
423 424 425 426
  static GlobalMempool* instance() {
    static GlobalMempool singleton;
    return &singleton;
  }
W
wangguibao 已提交
427

W
wangguibao 已提交
428
  void reset(Mempool* mempool) { g_mempool = mempool; }
W
wangguibao 已提交
429

W
wangguibao 已提交
430
  void* malloc(size_t size) { return g_mempool->malloc(size); }
W
wangguibao 已提交
431

W
wangguibao 已提交
432 433 434
  void* realloc(void* old_data, size_t old_size, size_t new_size) {
    return g_mempool->realloc(old_data, old_size, new_size);
  }
W
wangguibao 已提交
435

W
wangguibao 已提交
436
  void free(void* p, size_t s) { g_mempool->free(p, s); }
W
wangguibao 已提交
437

W
wangguibao 已提交
438
  void clear() { g_mempool->release_block(); }
W
wangguibao 已提交
439

W
wangguibao 已提交
440
  Mempool* get() { return g_mempool; }
W
wangguibao 已提交
441 442 443
};

class MempoolGuard {
W
wangguibao 已提交
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463
 public:
  explicit MempoolGuard(fugue::memory::Region* region) : _mempool(region) {
    acquire();
  }

  void acquire() {
    _saved_mempool = g_mempool;
    g_mempool = &_mempool;
  }

  void release() {
    _mempool.release_block();
    g_mempool = _saved_mempool;
  }

  ~MempoolGuard() { release(); }

 private:
  Mempool _mempool;
  Mempool* _saved_mempool;
W
wangguibao 已提交
464 465 466
};

// Returns the current call stack as one string: each symbolized frame from
// backtrace_symbols() followed by a tab. Returns an empty string if the
// frames cannot be symbolized (backtrace_symbols() returned NULL).
inline std::string print_trace() {
  static const int BT_BUF_SIZE = 400;

  void* buffer[BT_BUF_SIZE];
  const int nptrs = backtrace(buffer, BT_BUF_SIZE);

  // backtrace_symbols() returns a single malloc()ed array that the caller
  // must free(); it returns NULL when that allocation fails.
  char** strings = backtrace_symbols(buffer, nptrs);
  if (strings == NULL) {
    return std::string();
  }

  std::ostringstream debug_stream;
  for (int j = 0; j < nptrs; ++j) {
    debug_stream << strings[j] << "\t";
  }
  free(strings);

  return debug_stream.str();
}
W
wangguibao 已提交
480
}  // namespace im