ob_direct_load_mem_loader.cpp 4.9 KB
Newer Older
O
obdev 已提交
1 2
// Copyright (c) 2022-present Oceanbase Inc. All Rights Reserved.
// Author:
3
//   suzhi.yt <>
O
obdev 已提交
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42

#define USING_LOG_PREFIX STORAGE

#include "storage/direct_load/ob_direct_load_mem_loader.h"
#include "storage/direct_load/ob_direct_load_external_block_reader.h"
#include "storage/direct_load/ob_direct_load_external_table.h"
#include "storage/direct_load/ob_direct_load_mem_sample.h"

namespace oceanbase
{
namespace storage
{
using namespace common;
using namespace blocksstable;

/**
 * ObDirectLoadMemLoader
 */

ObDirectLoadMemLoader::ObDirectLoadMemLoader(ObDirectLoadMemContext *mem_ctx)
  : mem_ctx_(mem_ctx)
{
}

ObDirectLoadMemLoader::~ObDirectLoadMemLoader()
{
}

int ObDirectLoadMemLoader::add_table(ObIDirectLoadPartitionTable *table)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(nullptr == table)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", KR(ret), KP(table));
  } else {
    ObDirectLoadExternalTable *external_table = nullptr;
    if (OB_ISNULL(external_table = dynamic_cast<ObDirectLoadExternalTable *>(table))) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("unexpected table", KR(ret), KPC(table));
L
leftgeek 已提交
43
    } else if (OB_UNLIKELY(external_table->get_fragments().count() <= 0)) {
O
obdev 已提交
44
      ret = OB_INVALID_ARGUMENT;
L
leftgeek 已提交
45 46
      LOG_WARN("files handle should have at least one handle",
          KR(ret), K(external_table->get_fragments().count()));
O
obdev 已提交
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
    } else if (OB_FAIL(fragments_.push_back(external_table->get_fragments()))) {
      LOG_WARN("fail to push back", KR(ret));
    }
  }
  return ret;
}

int ObDirectLoadMemLoader::work()
{
  typedef ObDirectLoadExternalBlockReader<ObDirectLoadExternalMultiPartitionRow> ExternalReader;
  int ret = OB_SUCCESS;
  const ObDirectLoadExternalMultiPartitionRow *external_row = nullptr;
  ChunkType *chunk = nullptr;
  RowType row;
  for (int64_t i = 0; OB_SUCC(ret) && i < fragments_.count(); i++) {
L
leftgeek 已提交
62
    ObDirectLoadExternalFragment &fragment = fragments_.at(i);
O
obdev 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
    ExternalReader external_reader;
    if (OB_FAIL(external_reader.init(mem_ctx_->table_data_desc_.external_data_block_size_,
                                     fragment.max_data_block_size_,
                                     mem_ctx_->table_data_desc_.compressor_type_))) {
      LOG_WARN("fail to init external reader", KR(ret));
    } else if (OB_FAIL(external_reader.open(fragment.file_handle_, 0, fragment.file_size_))) {
      LOG_WARN("fail to open file", KR(ret));
    }
    while (OB_SUCC(ret) && !(mem_ctx_->has_error_)) {
      if (external_row == nullptr) {
        if (OB_FAIL(external_reader.get_next_item(external_row))) {
          if (OB_UNLIKELY(OB_ITER_END != ret)) {
            LOG_WARN("fail to get next item", KR(ret));
          } else {
            ret = OB_SUCCESS;
            break;
          }
        }
      }
      if (OB_SUCC(ret) && chunk == nullptr) {
        //等待内存空出
        while (mem_ctx_->fly_mem_chunk_count_ >= mem_ctx_->table_data_desc_.max_mem_chunk_count_ &&
               !(mem_ctx_->has_error_)) {
          usleep(500000);
        }
        if (mem_ctx_->has_error_) {
          ret = OB_INVALID_ARGUMENT;
          LOG_WARN("some error ocurr", KR(ret));
        }
        if (OB_SUCC(ret)) {
93
          chunk = OB_NEW(ChunkType, ObMemAttr(MTL_ID(), "TLD_MemChunkVal"));
O
obdev 已提交
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
          if (chunk == nullptr) {
            ret = OB_ALLOCATE_MEMORY_FAILED;
            LOG_WARN("fail to allocate mem", KR(ret));
          } else {
            ATOMIC_AAF(&(mem_ctx_->fly_mem_chunk_count_), 1);
            if (OB_FAIL(chunk->init(MTL_ID(), mem_ctx_->table_data_desc_.mem_chunk_size_))) {
              LOG_WARN("fail to init external sort", KR(ret));
            }
          }
        }
      }

      if (OB_SUCC(ret)) {
        row = *external_row;
        ret = chunk->add_item(row);
        if (ret == OB_BUF_NOT_ENOUGH) {
          ret = OB_SUCCESS;
          if (OB_FAIL(close_chunk(chunk))) {
            LOG_WARN("fail to close chunk", KR(ret));
          }
        } else if (ret != OB_SUCCESS) {
          LOG_WARN("fail to add item", KR(ret));
        } else {
          external_row = nullptr;
        }
      }
    }
L
leftgeek 已提交
121 122 123
    if (OB_SUCC(ret)) {
      fragment.reset();
    }
O
obdev 已提交
124 125 126
  }

  if (OB_SUCC(ret)) {
127 128
    if (chunk != nullptr && OB_FAIL(close_chunk(chunk))) {
      LOG_WARN("fail to close chunk", KR(ret));
O
obdev 已提交
129 130 131
    }
  }

132 133 134 135 136 137
  if (chunk != nullptr) {
    chunk->~ChunkType();
    ob_free(chunk);
    chunk = nullptr;
  }

O
obdev 已提交
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
  return ret;
}

int ObDirectLoadMemLoader::close_chunk(ChunkType *&chunk)
{
  int ret = OB_SUCCESS;
  CompareType compare;
  if (OB_FAIL(compare.init(*(mem_ctx_->datum_utils_)))) {
    LOG_WARN("fail to init compare", KR(ret));
  } else if (OB_FAIL(chunk->sort(compare))) {
    LOG_WARN("fail to sort chunk", KR(ret));
  } else if (OB_FAIL(mem_ctx_->mem_chunk_queue_.push(chunk))) {
    LOG_WARN("fail to push", KR(ret));
  } else {
    chunk = nullptr;
  }

  if (chunk != nullptr) {
    chunk->~ChunkType();
    ob_free(chunk);
    chunk = nullptr;
  }
  return ret;
}

} // namespace storage
} // namespace oceanbase