mempool.h 10.9 KB
Newer Older
W
wangguibao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
W
wangguibao 已提交
16 17
#include <execinfo.h>
#include <pthread.h>

#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <new>
#include <sstream>
#include <string>

#include <butil/atomicops.h>
#include <butil/logging.h>
W
wangguibao 已提交
24 25 26 27 28 29 30 31

namespace im {
namespace fugue {

namespace lockfree {

template <class T>
class PushOnlyStack {
W
wangguibao 已提交
32 33
 public:
  PushOnlyStack() { _head.store(NULL, butil::memory_order_relaxed); }
W
wangguibao 已提交
34

W
wangguibao 已提交
35 36 37 38 39 40
  void push(T* node) {
    T* head = _head.load(butil::memory_order_relaxed);
    node->next = head;
    while (
        !_head.compare_exchange_weak(head, node, butil::memory_order_relaxed)) {
      node->next = head;
W
wangguibao 已提交
41
    }
W
wangguibao 已提交
42
  }
W
wangguibao 已提交
43

W
wangguibao 已提交
44
  T* release() { return _head.exchange(NULL, butil::memory_order_relaxed); }
W
wangguibao 已提交
45

W
wangguibao 已提交
46 47
 private:
  butil::atomic<T*> _head;
W
wangguibao 已提交
48 49 50 51
};

// Storage cell managed by FreeList. The field order is part of the contract:
// FreeList::put() recovers the node from a payload pointer with container_of,
// so `data` must stay after the two bookkeeping words.
template <class T>
struct FreeListNode {
  uint64_t id;    // low 32 bits: slot index; high 32 bits: reuse version
  uint64_t next;  // id of the next free node, or FreeList::EMPTY
  T data;         // payload handed out by FreeList::get()
};

template <class T, int CAP>
class FreeList {
W
wangguibao 已提交
59 60 61
 public:
  typedef FreeListNode<T> Node;
  static const uint64_t EMPTY = 0xFFFFFFFFFFFFFFFF;
W
wangguibao 已提交
62

W
wangguibao 已提交
63 64 65 66
  T* get() {
    uint64_t head = _head.load(butil::memory_order_acquire);
    if (head == EMPTY) {
      return new_node();
W
wangguibao 已提交
67 68
    }

W
wangguibao 已提交
69 70 71 72 73 74 75 76 77 78
    Node* node = address(head);
    while (!_head.compare_exchange_weak(
        head, node->next, butil::memory_order_acquire)) {
      if (head == EMPTY) {
        return new_node();
      }
      node = address(head);
    }
    return &node->data;
  }
W
wangguibao 已提交
79

W
wangguibao 已提交
80 81
  void put(T* value) {
    Node* node = container_of(value, Node, data);
W
wangguibao 已提交
82

W
wangguibao 已提交
83 84 85 86
    uint64_t head = _head.load(butil::memory_order_acquire);
    // add version
    node->id += (1UL << 32);
    node->next = head;
W
wangguibao 已提交
87

W
wangguibao 已提交
88 89 90 91 92 93
    // NOTE: we MUST use a temp var *head* to call compare_exchange_weak
    // because Boost.Atomic will update the *expected* even success
    // std::atomic do not have this limitation
    while (!_head.compare_exchange_weak(
        head, node->id, butil::memory_order_release)) {
      node->next = head;
W
wangguibao 已提交
94
    }
W
wangguibao 已提交
95
  }
W
wangguibao 已提交
96

W
wangguibao 已提交
97 98 99 100 101
  template <class F>
  void unsafe_foreach() {
    uint32_t used_blk_cnt = _slot_index.load(butil::memory_order_relaxed);
    for (uint32_t i = 0; i < used_blk_cnt; ++i) {
      F()(&_node[i]->data);
W
wangguibao 已提交
102
    }
W
wangguibao 已提交
103
  }
W
wangguibao 已提交
104

W
wangguibao 已提交
105 106 107 108 109
  uint32_t real_used_size() const {
    uint32_t used_blk_cnt = _slot_index.load(butil::memory_order_relaxed);
    uint64_t used_bytes = 0;
    for (uint32_t i = 0; i < used_blk_cnt; ++i) {
      used_bytes += _node[i]->data.offset;
W
wangguibao 已提交
110
    }
W
wangguibao 已提交
111 112
    return used_bytes >> 10;
  }
W
wangguibao 已提交
113

W
wangguibao 已提交
114 115 116
  uint32_t allocate_blocks() const {
    return _slot_index.load(butil::memory_order_relaxed);
  }
W
wangguibao 已提交
117

W
wangguibao 已提交
118 119 120 121 122 123 124
  uint32_t free_blocks() const {
    uint64_t head = _head.load(butil::memory_order_relaxed);
    uint32_t size = 0;
    while (head != FreeList::EMPTY) {
      const Node* head_ptr = address(head);
      head = head_ptr->next;
      ++size;
W
wangguibao 已提交
125
    }
W
wangguibao 已提交
126 127
    return size;
  }
W
wangguibao 已提交
128

W
wangguibao 已提交
129 130 131 132
  void reset() {
    _head.store(FreeList::EMPTY, butil::memory_order_relaxed);
    _slot_index.store(0, butil::memory_order_relaxed);
  }
W
wangguibao 已提交
133

W
wangguibao 已提交
134 135 136
  FreeList() {
    for (int i = 0; i < CAP; ++i) {
      _node[i] = NULL;
W
wangguibao 已提交
137
    }
W
wangguibao 已提交
138 139
    reset();
  }
W
wangguibao 已提交
140

W
wangguibao 已提交
141 142
 private:
  uint32_t slot(uint64_t id) const { return static_cast<uint32_t>(id); }
W
wangguibao 已提交
143

W
wangguibao 已提交
144 145 146 147 148
  T* new_node() {
    uint32_t index = _slot_index.fetch_add(1, butil::memory_order_relaxed);
    if (index >= CAP) {
      return NULL;
    }
W
wangguibao 已提交
149

W
wangguibao 已提交
150 151 152
    if (_node[index] != NULL) {
      return &(_node[index]->data);
    }
W
wangguibao 已提交
153

W
wangguibao 已提交
154 155
    Node* node = reinterpret_cast<Node*>(malloc(sizeof(Node)));
    new (node) Node;
W
wangguibao 已提交
156

W
wangguibao 已提交
157 158
    node->id = index;
    _node[index] = node;
W
wangguibao 已提交
159

W
wangguibao 已提交
160 161
    return &node->data;
  }
W
wangguibao 已提交
162

W
wangguibao 已提交
163
  Node* address(uint64_t id) { return _node[slot(id)]; }
W
wangguibao 已提交
164

W
wangguibao 已提交
165
  const Node* address(uint64_t id) const { return _node[slot(id)]; }
W
wangguibao 已提交
166

W
wangguibao 已提交
167 168 169 170 171
  butil::atomic<uint64_t> _head;
  butil::atomic<uint32_t> _slot_index;
  Node* _node[CAP];
};
}  // namespace lockfree
W
wangguibao 已提交
172 173 174 175

namespace memory {

// Fixed-size raw memory block (2 MiB) recycled through GlobalBlockFreeList.
struct Block {
  // Capacity of `data` in bytes: 1 << 21 == 2 * 1024 * 1024.
  static const int BLOCK_SIZE = 1 << 21;
  // Uninitialized storage that Mempool carves allocations out of.
  char data[BLOCK_SIZE];
};

// Process-wide pool of Blocks shared by every Region.
class GlobalBlockFreeList {
 public:
  // Upper bound on the number of blocks the shared pool will ever allocate.
  static const int MAX_BLOCK_COUNT = 32 * 1024;
  typedef lockfree::FreeList<Block, MAX_BLOCK_COUNT> type;

  // Returns the shared free list, constructed on first use.
  static type* instance() {
    static type shared_list;
    return &shared_list;
  }
};

struct BlockReference {
W
wangguibao 已提交
191 192 193
  BlockReference() : offset(0), block(NULL) {
    // do nothing
  }
W
wangguibao 已提交
194

W
wangguibao 已提交
195 196 197 198
  void reset() {
    offset = 0;
    block = NULL;
  }
W
wangguibao 已提交
199

W
wangguibao 已提交
200 201
  uint32_t offset;
  Block* block;
W
wangguibao 已提交
202 203 204
};

class Region {
W
wangguibao 已提交
205 206 207 208 209 210 211 212 213
 public:
  struct GlobalPut {
    void operator()(BlockReference* block_ref) {
      if (block_ref->block != NULL) {
        GlobalBlockFreeList::instance()->put(block_ref->block);
      }
      block_ref->reset();
    }
  };
W
wangguibao 已提交
214

W
wangguibao 已提交
215 216 217 218
  struct BigNode {
    BigNode* next;
    char data[0];
  };
W
wangguibao 已提交
219

W
wangguibao 已提交
220 221 222 223 224
  ~Region() {
    reset();
    delete[] _big_mem_start;
    _big_mem_start = NULL;
  }
W
wangguibao 已提交
225

W
wangguibao 已提交
226 227 228 229 230 231 232 233
  char const* debug_str() const {
    uint32_t alloc_blocks = _free_blocks.allocate_blocks();
    uint32_t free_blocks = _free_blocks.free_blocks();
    uint32_t used_mem_mb = _free_blocks.real_used_size();
    uint32_t big_buf_size = _big_mem_size.load(butil::memory_order_relaxed);
    uint32_t big_buf_count = _big_mem_count.load(butil::memory_order_relaxed);
    uint32_t mlc_mem_size = _mlc_mem_size.load(butil::memory_order_relaxed);
    uint32_t mlc_mem_count = _mlc_mem_count.load(butil::memory_order_relaxed);
W
wangguibao 已提交
234

W
wangguibao 已提交
235 236 237 238 239 240 241
    std::ostringstream oss;
    oss << "[alloc_blks:" << alloc_blocks << ",free_blks:" << free_blocks
        << ",used_mem_kb:" << used_mem_mb
        << ",big_mem_kb:" << (big_buf_size >> 10)
        << ",big_buf_cnt:" << big_buf_count
        << ",mlc_mem_kb:" << (mlc_mem_size >> 10)
        << ",mlc_cnt:" << mlc_mem_count << "]";
W
wangguibao 已提交
242

W
wangguibao 已提交
243 244
    return oss.str().c_str();
  }
W
wangguibao 已提交
245

W
wangguibao 已提交
246
  Region();
W
wangguibao 已提交
247

W
wangguibao 已提交
248
  void init();
W
wangguibao 已提交
249

W
wangguibao 已提交
250
  void reset();
W
wangguibao 已提交
251

W
wangguibao 已提交
252
  BlockReference* get();
W
wangguibao 已提交
253

W
wangguibao 已提交
254
  void* malloc(size_t size);
W
wangguibao 已提交
255

W
wangguibao 已提交
256
  void put(BlockReference* block);
W
wangguibao 已提交
257

W
wangguibao 已提交
258 259 260 261
  static const int MAX_BLOCK_COUNT = 1024;
  static const int BIG_MEM_THRESHOLD = 256 * 1024;
  static const int MLC_MEM_THRESHOLD = 4 * 1024 * 1024;
  static const int COUNTER_SIZE = MLC_MEM_THRESHOLD / BIG_MEM_THRESHOLD + 1;
W
wangguibao 已提交
262

W
wangguibao 已提交
263 264 265
 private:
  lockfree::FreeList<BlockReference, MAX_BLOCK_COUNT> _free_blocks;
  lockfree::PushOnlyStack<BigNode> _big_nodes;
W
wangguibao 已提交
266

W
wangguibao 已提交
267 268
  butil::atomic<uint32_t> _big_mem_size;
  butil::atomic<uint32_t> _big_mem_count;
W
wangguibao 已提交
269

W
wangguibao 已提交
270 271
  char* _big_mem_start;
  uint32_t _big_mem_capacity;
W
wangguibao 已提交
272

W
wangguibao 已提交
273 274 275 276 277
  butil::atomic<uint32_t> _mlc_mem_size;
  butil::atomic<uint32_t> _mlc_mem_count;
};
}  // namespace memory
}  // namespace fugue
W
wangguibao 已提交
278 279

class Mempool {
W
wangguibao 已提交
280 281 282 283 284 285 286 287
 public:
  void* malloc(size_t size) {
    size = _align(size);
    if (size <= _free_size) {
      void* p = _free_cursor;
      _free_size -= size;
      _free_cursor += size;
      return p;
W
wangguibao 已提交
288 289
    }

W
wangguibao 已提交
290 291
    return malloc_from_region(size);
  }
W
wangguibao 已提交
292

W
wangguibao 已提交
293 294 295
  void free(void* p, size_t size) {
    if (size >= fugue::memory::Region::BIG_MEM_THRESHOLD) {
      return;
W
wangguibao 已提交
296 297
    }

W
wangguibao 已提交
298 299 300 301 302 303
    if (_free_cursor - size == static_cast<char*>(p)) {
      size_t down_aligned = _down_align(size);
      _free_cursor -= down_aligned;
      _free_size += down_aligned;
    }
  }
W
wangguibao 已提交
304

W
wangguibao 已提交
305 306 307 308
  void* realloc(void* old_data, size_t old_size, size_t new_size) {
    if (old_size >= new_size) {
      return old_data;
    }
W
wangguibao 已提交
309

W
wangguibao 已提交
310 311 312 313 314 315 316 317 318 319
    size_t required = new_size - old_size;
    if (_free_cursor == static_cast<char*>(old_data) + old_size) {
      if (_free_size >= required) {
        _free_cursor += required;
        _free_size -= required;
        return old_data;
      } else {
        _free_cursor = static_cast<char*>(old_data);
        _free_size += old_size;
      }
W
wangguibao 已提交
320 321
    }

W
wangguibao 已提交
322 323 324 325
    void* p = this->malloc_from_region(new_size);
    if (p != NULL) {
      memcpy(p, old_data, old_size);
      return p;
W
wangguibao 已提交
326 327
    }

W
wangguibao 已提交
328 329 330 331 332 333 334 335 336 337 338 339 340 341
    return NULL;
  }

  explicit Mempool(fugue::memory::Region* blocks)
      : _free_size(0), _free_cursor(NULL), _blocks(blocks) {
    _block = NULL;
  }

  ~Mempool() { release_block(); }

  void release_block() {
    if (_block) {
      _block->offset = fugue::memory::Block::BLOCK_SIZE - _free_size;
      _blocks->put(_block);
W
wangguibao 已提交
342 343
    }

W
wangguibao 已提交
344 345 346 347
    _free_size = 0;
    _free_cursor = NULL;
    _block = NULL;
  }
W
wangguibao 已提交
348

W
wangguibao 已提交
349 350 351 352
 private:
  void* malloc_from_region(size_t size) {
    if (size >= fugue::memory::Region::BIG_MEM_THRESHOLD) {
      return _blocks->malloc(size);
W
wangguibao 已提交
353 354
    }

W
wangguibao 已提交
355 356 357 358 359
    while (true) {
      fugue::memory::BlockReference* block = _blocks->get();
      if (block == NULL) {
        return NULL;
      }
W
wangguibao 已提交
360

W
wangguibao 已提交
361 362 363 364
      uint32_t free_size = fugue::memory::Block::BLOCK_SIZE - block->offset;
      if (size <= free_size) {
        if (_block) {
          _block->offset = fugue::memory::Block::BLOCK_SIZE - _free_size;
W
wangguibao 已提交
365
        }
W
wangguibao 已提交
366 367 368 369 370 371 372

        char* p = block->block->data + block->offset;
        _free_size = free_size - size;
        _free_cursor = p + size;
        _block = block;
        return p;
      }
W
wangguibao 已提交
373
    }
W
wangguibao 已提交
374 375
    return _blocks->malloc(size);
  }
W
wangguibao 已提交
376

W
wangguibao 已提交
377
  static const int ALIGN_SIZE = sizeof(void*);
W
wangguibao 已提交
378

W
wangguibao 已提交
379 380 381
  inline size_t _align(size_t size) const {
    return (size + (ALIGN_SIZE - 1)) & ~(ALIGN_SIZE - 1);
  }
W
wangguibao 已提交
382

W
wangguibao 已提交
383 384 385
  inline size_t _down_align(size_t size) const {
    return size & ~(ALIGN_SIZE - 1);
  }
W
wangguibao 已提交
386

W
wangguibao 已提交
387 388
  size_t _free_size;
  char* _free_cursor;
W
wangguibao 已提交
389

W
wangguibao 已提交
390 391
  fugue::memory::Region* _blocks;
  fugue::memory::BlockReference* _block;
W
wangguibao 已提交
392 393 394
};

// Per-thread (`__thread`) pointer to the Mempool that GlobalMempool forwards
// every call to. MempoolGuard::acquire()/release() and GlobalMempool::reset()
// install and restore it. Only declared here; the definition is elsewhere.
// NOTE(review): GlobalMempool dereferences it without a NULL check, so an
// allocation outside any MempoolGuard presumably crashes. Confirm the
// definition's initial value.
extern __thread Mempool* g_mempool;
W
wangguibao 已提交
395
// Abstract allocator interface. free() receives the allocation size so that
// implementations need not record it themselves.
class mempool {
 public:
  // Returns `size` bytes of storage, or NULL on failure.
  virtual void* malloc(size_t size) = 0;
  // Returns `p`, previously obtained from malloc(size), to the allocator.
  virtual void free(void* p, size_t size) = 0;
  // Virtual so implementations can be destroyed through a base pointer.
  virtual ~mempool() {}
};
W
wangguibao 已提交
401

W
wangguibao 已提交
402
class GlobalMempool : public mempool {
W
wangguibao 已提交
403 404 405 406
 public:
  GlobalMempool() {
    // do nothing;
  }
W
wangguibao 已提交
407

W
wangguibao 已提交
408 409 410
  virtual ~GlobalMempool() {
    // do nothing;
  }
W
wangguibao 已提交
411

W
wangguibao 已提交
412 413 414 415
  static GlobalMempool* instance() {
    static GlobalMempool singleton;
    return &singleton;
  }
W
wangguibao 已提交
416

W
wangguibao 已提交
417
  void reset(Mempool* mempool) { g_mempool = mempool; }
W
wangguibao 已提交
418

W
wangguibao 已提交
419
  void* malloc(size_t size) { return g_mempool->malloc(size); }
W
wangguibao 已提交
420

W
wangguibao 已提交
421 422 423
  void* realloc(void* old_data, size_t old_size, size_t new_size) {
    return g_mempool->realloc(old_data, old_size, new_size);
  }
W
wangguibao 已提交
424

W
wangguibao 已提交
425
  void free(void* p, size_t s) { g_mempool->free(p, s); }
W
wangguibao 已提交
426

W
wangguibao 已提交
427
  void clear() { g_mempool->release_block(); }
W
wangguibao 已提交
428

W
wangguibao 已提交
429
  Mempool* get() { return g_mempool; }
W
wangguibao 已提交
430 431 432
};

class MempoolGuard {
W
wangguibao 已提交
433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452
 public:
  explicit MempoolGuard(fugue::memory::Region* region) : _mempool(region) {
    acquire();
  }

  void acquire() {
    _saved_mempool = g_mempool;
    g_mempool = &_mempool;
  }

  void release() {
    _mempool.release_block();
    g_mempool = _saved_mempool;
  }

  ~MempoolGuard() { release(); }

 private:
  Mempool _mempool;
  Mempool* _saved_mempool;
W
wangguibao 已提交
453 454 455
};

// Captures up to BT_BUF_SIZE frames of the calling thread's stack. Each frame
// is written followed by a tab: the symbolized string from
// backtrace_symbols(), or the raw return address if symbolization fails.
inline std::string print_trace() {
  static const int BT_BUF_SIZE = 400;
  std::stringstream debug_stream;

  void* buffer[BT_BUF_SIZE];
  const int nptrs = backtrace(buffer, BT_BUF_SIZE);
  // backtrace_symbols() returns one malloc()'d array (the strings live inside
  // it) that the caller must free(); it returns NULL on failure.
  char** strings = backtrace_symbols(buffer, nptrs);
  if (strings == NULL) {
    for (int j = 0; j < nptrs; j++) {
      debug_stream << buffer[j] << "\t";
    }
    return debug_stream.str();
  }

  for (int j = 0; j < nptrs; j++) {
    debug_stream << strings[j] << "\t";
  }
  std::free(strings);

  return debug_stream.str();
}
W
wangguibao 已提交
469
}  // namespace im