//  Copyright (c) Meta Platforms, Inc. and affiliates.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "cache/secondary_cache_adapter.h"

#include "monitoring/perf_context_imp.h"
#include "util/cast_util.h"

namespace ROCKSDB_NAMESPACE {

namespace {
// A distinct pointer value for marking "dummy" cache entries
struct Dummy {
  char val[7] = "kDummy";
};
const Dummy kDummy{};
Cache::ObjectPtr const kDummyObj = const_cast<Dummy*>(&kDummy);
}  // namespace

// When CacheWithSecondaryAdapter is constructed with the distribute_cache_res
// parameter set to true, it manages the entire memory budget across the
// primary and secondary cache. The secondary cache is assumed to be in
// memory, such as the CompressedSecondaryCache. When a placeholder entry
// is inserted by a CacheReservationManager instance to reserve memory,
// the CacheWithSecondaryAdapter ensures that the reservation is distributed
// proportionally across the primary/secondary caches.
//
// The primary block cache is initially sized to the sum of the primary cache
// budget + the secondary cache budget, as follows -
//   |---------    Primary Cache Configured Capacity  -----------|
//   |---Secondary Cache Budget----|----Primary Cache Budget-----|
//
// A ConcurrentCacheReservationManager member in the CacheWithSecondaryAdapter,
// pri_cache_res_, is used to help with tracking the distribution of memory
// reservations.
// Initially, it accounts for the entire secondary cache budget as a
// reservation against the primary cache. This shrinks the usable capacity of
// the primary cache to the budget that the user originally desired.
//
//   |--Reservation for Sec Cache--|-Pri Cache Usable Capacity---|
//
// When a reservation placeholder is inserted into the adapter, it is inserted
// directly into the primary cache. This means the entire charge of the
// placeholder is counted against the primary cache. To compensate and count
// a portion of it against the secondary cache, the secondary cache Deflate()
// method is called to shrink it. Since the Deflate() causes the secondary
// actual usage to shrink, it is reflected here by releasing an equal amount
// from the pri_cache_res_ reservation. The Deflate() in the secondary cache
// can be, but is not required to be, implemented using its own cache
// reservation manager.
//
// For example, if the pri/sec ratio is 70/30, and the combined capacity is
// 100MB, the intermediate and final state after inserting a reservation
// placeholder for 10MB would be as follows -
//
//   |-Reservation for Sec Cache-|-Pri Cache Usable Capacity-|---R---|
// 1. After inserting the placeholder in primary
//   |-------  30MB -------------|------- 60MB -------------|-10MB--|
// 2. After deflating the secondary and adjusting the reservation for
//    secondary against the primary
//   |-------  27MB -------------|------- 63MB -------------|-10MB--|
//
// Likewise, when the user inserted placeholder is released, the secondary
// cache Inflate() method is called to grow it, and the pri_cache_res_
// reservation is increased by an equal amount.
//
// Another way of implementing this would have been to simply split the user
// reservation into primary and secondary components. However, this would
// require allocating a structure to track the associated secondary cache
// reservation, which adds some complexity and overhead.
//
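// For illustration only (not part of the build): a hedged sketch of how a
// caller might wire up this arrangement directly, assuming the primary cache
// has already been sized to the combined budget as described above. The
// NewTieredVolatileCache() factory at the bottom of this file packages these
// steps for callers.
//
//   LRUCacheOptions pri_opts;
//   pri_opts.capacity = 100 << 20;  // 70MB primary + 30MB secondary budget
//   CompressedSecondaryCacheOptions sec_opts;
//   sec_opts.capacity = 30 << 20;
//   auto tiered = std::make_shared<CacheWithSecondaryAdapter>(
//       pri_opts.MakeSharedCache(), NewCompressedSecondaryCache(sec_opts),
//       /*distribute_cache_res=*/true);
//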
CacheWithSecondaryAdapter::CacheWithSecondaryAdapter(
    std::shared_ptr<Cache> target,
    std::shared_ptr<SecondaryCache> secondary_cache, bool distribute_cache_res)
    : CacheWrapper(std::move(target)),
      secondary_cache_(std::move(secondary_cache)),
      distribute_cache_res_(distribute_cache_res) {
  target_->SetEvictionCallback([this](const Slice& key, Handle* handle) {
    return EvictionHandler(key, handle);
  });
  if (distribute_cache_res_) {
    size_t sec_capacity = 0;
    pri_cache_res_ = std::make_shared<ConcurrentCacheReservationManager>(
        std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>(
            target_));
    Status s = secondary_cache_->GetCapacity(sec_capacity);
    assert(s.ok());
    // Initially, the primary cache is sized to uncompressed cache budget plus
    // compressed secondary cache budget. The secondary cache budget is then
    // taken away from the primary cache through cache reservations. Later,
    // when a placeholder entry is inserted by the caller, it is inserted
    // into the primary cache and the portion that should be assigned to the
    // secondary cache is freed from the reservation.
    s = pri_cache_res_->UpdateCacheReservation(sec_capacity);
    assert(s.ok());
    sec_cache_res_ratio_ = (double)sec_capacity / target_->GetCapacity();
  }
}

CacheWithSecondaryAdapter::~CacheWithSecondaryAdapter() {
  // `*this` will be destroyed before `*target_`, so we have to prevent
  // use after free
  target_->SetEvictionCallback({});
#ifndef NDEBUG
  if (distribute_cache_res_) {
    size_t sec_capacity = 0;
    Status s = secondary_cache_->GetCapacity(sec_capacity);
    assert(s.ok());
    assert(pri_cache_res_->GetTotalReservedCacheSize() == sec_capacity);
  }
#endif  // NDEBUG
}

bool CacheWithSecondaryAdapter::EvictionHandler(const Slice& key,
                                                Handle* handle) {
  auto helper = GetCacheItemHelper(handle);
  if (helper->IsSecondaryCacheCompatible()) {
    auto obj = target_->Value(handle);
    // Ignore dummy entry
    if (obj != kDummyObj) {
      // Spill into secondary cache.
      secondary_cache_->Insert(key, obj, helper).PermitUncheckedError();
    }
  }
  // Never takes ownership of obj
  return false;
}

bool CacheWithSecondaryAdapter::ProcessDummyResult(Cache::Handle** handle,
                                                   bool erase) {
  if (*handle && target_->Value(*handle) == kDummyObj) {
    target_->Release(*handle, erase);
    *handle = nullptr;
    return true;
  } else {
    return false;
  }
}

void CacheWithSecondaryAdapter::CleanupCacheObject(
    ObjectPtr obj, const CacheItemHelper* helper) {
  if (helper->del_cb) {
    helper->del_cb(obj, memory_allocator());
  }
}

Cache::Handle* CacheWithSecondaryAdapter::Promote(
    std::unique_ptr<SecondaryCacheResultHandle>&& secondary_handle,
    const Slice& key, const CacheItemHelper* helper, Priority priority,
    Statistics* stats, bool found_dummy_entry, bool kept_in_sec_cache) {
  assert(secondary_handle->IsReady());

  ObjectPtr obj = secondary_handle->Value();
  if (!obj) {
    // Nothing found.
    return nullptr;
  }
  // Found something.
  switch (helper->role) {
    case CacheEntryRole::kFilterBlock:
      RecordTick(stats, SECONDARY_CACHE_FILTER_HITS);
      break;
    case CacheEntryRole::kIndexBlock:
      RecordTick(stats, SECONDARY_CACHE_INDEX_HITS);
      break;
    case CacheEntryRole::kDataBlock:
      RecordTick(stats, SECONDARY_CACHE_DATA_HITS);
      break;
    default:
      break;
  }
  PERF_COUNTER_ADD(secondary_cache_hit_count, 1);
  RecordTick(stats, SECONDARY_CACHE_HITS);

  // Note: SecondaryCache::Size() is really charge (from the CreateCallback)
  size_t charge = secondary_handle->Size();
  Handle* result = nullptr;
  // Insert into primary cache, possibly as standalone+dummy entries.
  if (secondary_cache_->SupportForceErase() && !found_dummy_entry) {
    // Create standalone and insert dummy
    // Allow standalone to be created even if cache is full, to avoid
    // reading the entry from storage.
    result =
        CreateStandalone(key, obj, helper, charge, /*allow_uncharged*/ true);
    assert(result);
    PERF_COUNTER_ADD(block_cache_standalone_handle_count, 1);

    // Insert dummy to record recent use
    // TODO: try to avoid case where inserting this dummy could overwrite a
    // regular entry
    Status s = Insert(key, kDummyObj, &kNoopCacheItemHelper, /*charge=*/0,
                      /*handle=*/nullptr, priority);
    s.PermitUncheckedError();
    // Nothing to do or clean up on dummy insertion failure
  } else {
    // Insert regular entry into primary cache.
    // Don't allow it to spill into secondary cache again if it was kept there.
    Status s = Insert(
        key, obj, kept_in_sec_cache ? helper->without_secondary_compat : helper,
        charge, &result, priority);
    if (s.ok()) {
      assert(result);
      PERF_COUNTER_ADD(block_cache_real_handle_count, 1);
    } else {
      // Create standalone result instead, even if cache is full, to avoid
      // reading the entry from storage.
      result =
          CreateStandalone(key, obj, helper, charge, /*allow_uncharged*/ true);
      assert(result);
      PERF_COUNTER_ADD(block_cache_standalone_handle_count, 1);
    }
  }
  return result;
}
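
// Illustration (summarizing the flow above, not additional behavior): with a
// secondary cache that supports force-erase, the first promotion of a key
// yields a standalone handle plus a dummy primary entry. A later Lookup()
// finds and erases that dummy (found_dummy_entry == true), so the next
// promotion takes the else branch and inserts a regular entry into the
// primary cache.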

Status CacheWithSecondaryAdapter::Insert(const Slice& key, ObjectPtr value,
                                         const CacheItemHelper* helper,
                                         size_t charge, Handle** handle,
                                         Priority priority) {
  Status s = target_->Insert(key, value, helper, charge, handle, priority);
  if (s.ok() && value == nullptr && distribute_cache_res_) {
    size_t sec_charge = static_cast<size_t>(charge * (sec_cache_res_ratio_));
    s = secondary_cache_->Deflate(sec_charge);
    assert(s.ok());
    s = pri_cache_res_->UpdateCacheReservation(sec_charge, /*increase=*/false);
    assert(s.ok());
  }

  return s;
}
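
// Worked example (illustration only), continuing the 70/30 split described
// at the top of this file: a caller inserts a 10MB reservation placeholder
// (value == nullptr), with sec_cache_res_ratio_ == 0.3. The full 10MB is
// first charged to the primary cache by target_->Insert(), then
// sec_charge = 10MB * 0.3 = 3MB is deflated from the secondary cache and the
// same 3MB is released from pri_cache_res_. The placeholder is thus borne
// 7MB/3MB by the primary/secondary caches. Release() with erase_if_last_ref
// reverses this via Inflate().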

Cache::Handle* CacheWithSecondaryAdapter::Lookup(const Slice& key,
                                                 const CacheItemHelper* helper,
                                                 CreateContext* create_context,
                                                 Priority priority,
                                                 Statistics* stats) {
  // NOTE: we could just StartAsyncLookup() and Wait(), but this should be a bit
  // more efficient
  Handle* result =
      target_->Lookup(key, helper, create_context, priority, stats);
  bool secondary_compatible = helper && helper->IsSecondaryCacheCompatible();
  bool found_dummy_entry =
      ProcessDummyResult(&result, /*erase=*/secondary_compatible);
  if (!result && secondary_compatible) {
    // Try our secondary cache
    bool kept_in_sec_cache = false;
    std::unique_ptr<SecondaryCacheResultHandle> secondary_handle =
        secondary_cache_->Lookup(key, helper, create_context, /*wait*/ true,
                                 found_dummy_entry, /*out*/ kept_in_sec_cache);
    if (secondary_handle) {
      result = Promote(std::move(secondary_handle), key, helper, priority,
                       stats, found_dummy_entry, kept_in_sec_cache);
    }
  }
  return result;
}

bool CacheWithSecondaryAdapter::Release(Handle* handle,
                                        bool erase_if_last_ref) {
  if (erase_if_last_ref) {
    ObjectPtr v = target_->Value(handle);
    if (v == nullptr && distribute_cache_res_) {
      size_t charge = target_->GetCharge(handle);
      size_t sec_charge = static_cast<size_t>(charge * (sec_cache_res_ratio_));
      Status s = secondary_cache_->Inflate(sec_charge);
      assert(s.ok());
      s = pri_cache_res_->UpdateCacheReservation(sec_charge, /*increase=*/true);
      assert(s.ok());
    }
  }
  return target_->Release(handle, erase_if_last_ref);
}

Cache::ObjectPtr CacheWithSecondaryAdapter::Value(Handle* handle) {
  ObjectPtr v = target_->Value(handle);
  // TODO with stacked secondaries: might fail in EvictionHandler
  assert(v != kDummyObj);
  return v;
}

void CacheWithSecondaryAdapter::StartAsyncLookupOnMySecondary(
    AsyncLookupHandle& async_handle) {
  assert(!async_handle.IsPending());
  assert(async_handle.result_handle == nullptr);

  std::unique_ptr<SecondaryCacheResultHandle> secondary_handle =
      secondary_cache_->Lookup(async_handle.key, async_handle.helper,
                               async_handle.create_context, /*wait*/ false,
                               async_handle.found_dummy_entry,
                               /*out*/ async_handle.kept_in_sec_cache);
  if (secondary_handle) {
    // TODO with stacked secondaries: Check & process if already ready?
    async_handle.pending_handle = secondary_handle.release();
    async_handle.pending_cache = secondary_cache_.get();
  }
}

void CacheWithSecondaryAdapter::StartAsyncLookup(
    AsyncLookupHandle& async_handle) {
  target_->StartAsyncLookup(async_handle);
  if (!async_handle.IsPending()) {
    bool secondary_compatible =
        async_handle.helper &&
        async_handle.helper->IsSecondaryCacheCompatible();
    async_handle.found_dummy_entry |= ProcessDummyResult(
        &async_handle.result_handle, /*erase=*/secondary_compatible);

    if (async_handle.Result() == nullptr && secondary_compatible) {
      // Not found and not pending on another secondary cache
      StartAsyncLookupOnMySecondary(async_handle);
    }
  }
}

void CacheWithSecondaryAdapter::WaitAll(AsyncLookupHandle* async_handles,
                                        size_t count) {
  if (count == 0) {
    // Nothing to do
    return;
  }
  // Requests that are pending on *my* secondary cache, at the start of this
  // function
  std::vector<AsyncLookupHandle*> my_pending;
  // Requests that are pending on an "inner" secondary cache (managed somewhere
  // under target_), as of the start of this function
  std::vector<AsyncLookupHandle*> inner_pending;

  // Initial accounting of pending handles, excluding those already handled
  // by "outer" secondary caches. (See cur->pending_cache = nullptr.)
  for (size_t i = 0; i < count; ++i) {
    AsyncLookupHandle* cur = async_handles + i;
    if (cur->pending_cache) {
      assert(cur->IsPending());
      assert(cur->helper);
      assert(cur->helper->IsSecondaryCacheCompatible());
      if (cur->pending_cache == secondary_cache_.get()) {
        my_pending.push_back(cur);
        // Mark as "to be handled by this caller"
        cur->pending_cache = nullptr;
      } else {
        // Remember as potentially needing a lookup in my secondary
        inner_pending.push_back(cur);
      }
    }
  }

  // Wait on inner-most cache lookups first
  // TODO with stacked secondaries: because we are not using proper
  // async/await constructs here yet, there is a false synchronization point
  // here where all the results at one level are needed before initiating
  // any lookups at the next level. Probably not a big deal, but worth noting.
  if (!inner_pending.empty()) {
    target_->WaitAll(async_handles, count);
  }

  // For those that failed to find something, convert to lookup in my
  // secondary cache.
  for (AsyncLookupHandle* cur : inner_pending) {
    if (cur->Result() == nullptr) {
      // Not found, try my secondary
      StartAsyncLookupOnMySecondary(*cur);
      if (cur->IsPending()) {
        assert(cur->pending_cache == secondary_cache_.get());
        my_pending.push_back(cur);
        // Mark as "to be handled by this caller"
        cur->pending_cache = nullptr;
      }
    }
  }

  // Wait on all lookups on my secondary cache
  {
    std::vector<SecondaryCacheResultHandle*> my_secondary_handles;
    for (AsyncLookupHandle* cur : my_pending) {
      my_secondary_handles.push_back(cur->pending_handle);
    }
    secondary_cache_->WaitAll(std::move(my_secondary_handles));
  }

  // Process results
  for (AsyncLookupHandle* cur : my_pending) {
    std::unique_ptr<SecondaryCacheResultHandle> secondary_handle(
        cur->pending_handle);
    cur->pending_handle = nullptr;
    cur->result_handle = Promote(
        std::move(secondary_handle), cur->key, cur->helper, cur->priority,
        cur->stats, cur->found_dummy_entry, cur->kept_in_sec_cache);
    assert(cur->pending_cache == nullptr);
  }
}

std::string CacheWithSecondaryAdapter::GetPrintableOptions() const {
  std::string str = target_->GetPrintableOptions();
  str.append("  secondary_cache:\n");
  str.append(secondary_cache_->GetPrintableOptions());
  return str;
}

const char* CacheWithSecondaryAdapter::Name() const {
  // To the user, at least for now, this is just the underlying cache
  // configured with a secondary cache, so we pretend to be that cache.
  return target_->Name();
}

std::shared_ptr<Cache> NewTieredVolatileCache(
    TieredVolatileCacheOptions& opts) {
  if (!opts.cache_opts) {
    return nullptr;
  }

  std::shared_ptr<Cache> cache;
  if (opts.cache_type == PrimaryCacheType::kCacheTypeLRU) {
    LRUCacheOptions cache_opts =
        *(static_cast_with_check<LRUCacheOptions, ShardedCacheOptions>(
            opts.cache_opts));
    cache_opts.capacity += opts.comp_cache_opts.capacity;
    cache = cache_opts.MakeSharedCache();
  } else if (opts.cache_type == PrimaryCacheType::kCacheTypeHCC) {
    HyperClockCacheOptions cache_opts =
        *(static_cast_with_check<HyperClockCacheOptions, ShardedCacheOptions>(
            opts.cache_opts));
    cache = cache_opts.MakeSharedCache();
  } else {
    return nullptr;
  }
  std::shared_ptr<SecondaryCache> sec_cache =
      NewCompressedSecondaryCache(opts.comp_cache_opts);

  return std::make_shared<CacheWithSecondaryAdapter>(cache, sec_cache, true);
}
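
// For illustration only: a hedged usage sketch of NewTieredVolatileCache(),
// assuming only the TieredVolatileCacheOptions fields referenced above
// (cache_opts, cache_type, comp_cache_opts):
//
//   LRUCacheOptions lru_opts;
//   lru_opts.capacity = 70 << 20;  // primary budget
//   TieredVolatileCacheOptions tiered_opts;
//   tiered_opts.cache_opts = &lru_opts;
//   tiered_opts.cache_type = PrimaryCacheType::kCacheTypeLRU;
//   tiered_opts.comp_cache_opts.capacity = 30 << 20;  // secondary budget
//   std::shared_ptr<Cache> cache = NewTieredVolatileCache(tiered_opts);
//
// For the LRU path, the factory itself adds the secondary budget to the
// primary capacity (the cache_opts.capacity += above), so the caller
// specifies the two budgets separately.
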
}  // namespace ROCKSDB_NAMESPACE