提交 d7a397a9 编写于 作者: J JLY2015 提交者: ob-robot

[vector index] handle hybrid vector index errors when calling the LLM

上级 a68ddb2d
......@@ -20,7 +20,8 @@ namespace common {
#define DECL_EVAL_MACRO(macro, args...) macro(args)
#define DECL_ATTR_LIST(M) \
DECL_EVAL_MACRO(M, Section, ROOT_SERVICE, LOAD_BALANCE, DAILY_MERGE, LOCATION_CACHE, \
SSTABLE, LOGSERVICE, CACHE, TRANS, TENANT, RPC, OBPROXY, OBSERVER, RESOURCE_LIMIT); \
SSTABLE, LOGSERVICE, CACHE, TRANS, TENANT, RPC, OBPROXY, OBSERVER, \
RESOURCE_LIMIT, AI); \
DECL_EVAL_MACRO(M, Scope, CLUSTER, TENANT); \
DECL_EVAL_MACRO(M, Source, DEFAULT, FILE, OBADMIN, CMDLINE, CLUSTER, TENANT); \
DECL_EVAL_MACRO(M, Session, NO, YES); \
......
......@@ -2291,6 +2291,16 @@ DEF_BOOL(_enable_persistent_compiled_routine, OB_CLUSTER_PARAMETER, "true",
"The default value is TRUE. Value: TRUE: turned on FALSE: turned off",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
// AI / LLM
// model_request_timeout: tenant-scoped (OB_TENANT_PARAMETER) time parameter in Section::AI.
// Default is "60s"; the range "[1s,)" gives a lower bound of 1s and no upper bound.
// Per its description string it controls the HTTP timeout used when accessing the model.
// Registered with EditLevel::DYNAMIC_EFFECTIVE.
DEF_TIME(model_request_timeout, OB_TENANT_PARAMETER, "60s", "[1s,)",
"Used to control the HTTP timeout for accessing the model. Especially, the default value is 60s.",
ObParameterAttr(Section::AI, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
// model_max_retries: tenant-scoped (OB_TENANT_PARAMETER) integer parameter in Section::AI.
// Default is "2"; the range "[1,)" gives a lower bound of 1 and no upper bound.
// Per its description string it controls the retry times after a failed model interaction.
// NOTE(review): whether this counts total attempts or retries after the first attempt
// depends on how the consumer compares its retry counter against this value — confirm
// the intended semantics against the embedding task's retry checks.
DEF_INT(model_max_retries, OB_TENANT_PARAMETER, "2", "[1,)",
"Used to control the retry times after a failed model interaction. Especially, the default value is 2",
ObParameterAttr(Section::AI, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_STR_WITH_CHECKER(sql_protocol_min_tls_version, OB_CLUSTER_PARAMETER, "none",
common::ObConfigSQLTlsVersionChecker,
"SQL SSL control options, used to specify the minimum SSL/TLS version number. "
......
......@@ -507,7 +507,19 @@ int ObHybridVectorRefreshTask::prepare_for_embedding(ObPluginVectorIndexAdaptor
ObCollationType col_type = CS_TYPE_INVALID;
int64_t dim = 0;
int64_t loop_cnt = 0;
uint64_t timeout_us = ObTimeUtility::current_time() + ObInsertLobColumnHelper::LOB_TX_TIMEOUT;
int64_t http_timeout_us = 0;
int64_t http_max_retries = 0;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
http_timeout_us = tenant_config->model_request_timeout;
http_max_retries = tenant_config->model_max_retries;
} else {
SHARE_LOG_RET(WARN, OB_INVALID_CONFIG, "init model request timeout and max retries config with default value");
http_timeout_us = 60 * 1000 * 1000; // 60 seconds
http_max_retries = 2;
}
if (OB_ISNULL(task_ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error", K(ret), KPC(task_ctx));
......@@ -647,7 +659,7 @@ int ObHybridVectorRefreshTask::prepare_for_embedding(ObPluginVectorIndexAdaptor
} else if (OB_FAIL(ob_write_string(task_ctx->allocator_, endpoint->get_url(), url, true))) {
LOG_WARN("fail to write string", K(ret));
} else if (OB_FAIL(task_ctx->embedding_task_->init(url, endpoint->get_request_model_name(),
endpoint->get_provider(), access_key, chunk_array, col_type, dim, timeout_us))) {
endpoint->get_provider(), access_key, chunk_array, col_type, dim, http_timeout_us, http_max_retries))) {
LOG_WARN("failed to init embedding task", K(ret), KPC(endpoint));
} else {
ObEmbeddingTaskHandler *embedding_handler = nullptr;
......
......@@ -67,14 +67,14 @@ const ObString ObEmbeddingTask::USER_KEY_NAME = "user_key";
const ObString ObEmbeddingTask::INPUT_NAME = "input";
const ObString ObEmbeddingTask::DIMENSIONS_NAME = "dimensions";
const int64_t ObEmbeddingTask::HTTP_REQUEST_TIMEOUT = 20 * 1000 * 1000; // default http request timeout. 20 seconds
const int64_t ObEmbeddingTask::HTTP_REQUEST_TIMEOUT = 60 * 1000 * 1000; // default http request timeout. 60 seconds
// Reschedule related constants
const int64_t ObEmbeddingTask::MAX_RESCHEDULE_RETRY_CNT = 3;
const int64_t ObEmbeddingTask::RESCHEDULE_RETRY_INTERVAL_US = 100 * 1000; // 100ms
// HTTP retry related constants
const int64_t ObEmbeddingTask::MAX_HTTP_RETRY_CNT = 3;
const int64_t ObEmbeddingTask::MAX_HTTP_RETRY_CNT = 2;
const int64_t ObEmbeddingTask::HTTP_RETRY_BASE_INTERVAL_US = 1 * 1000 * 1000; // 1 second
const int64_t ObEmbeddingTask::HTTP_RETRY_MAX_INTERVAL_US = 10 * 1000 * 1000; // 10 seconds
const int64_t ObEmbeddingTask::HTTP_RETRY_MULTIPLIER = 2;
......@@ -171,7 +171,7 @@ ObEmbeddingTask::ObEmbeddingTask() : local_allocator_("EmbeddingTask", OB_MALLOC
internal_error_message_(),
task_lock_(),
batch_size_(10),
current_batch_idx_(),
current_batch_idx_(0),
http_send_time_us_(0),
http_response_data_size_(0),
http_response_data_(nullptr),
......@@ -184,6 +184,8 @@ ObEmbeddingTask::ObEmbeddingTask() : local_allocator_("EmbeddingTask", OB_MALLOC
http_total_retry_count_(0),
http_retry_start_time_us_(0),
http_last_retry_time_us_(0),
http_max_retry_count_(0),
wait_for_completion_timeout_us_(0),
need_retry_flag_(false),
original_batch_size_(batch_size_),
batch_size_adjusted_(false),
......@@ -219,7 +221,7 @@ ObEmbeddingTask::ObEmbeddingTask(ObArenaAllocator &allocator) : local_allocator_
internal_error_message_(),
task_lock_(),
batch_size_(10),
current_batch_idx_(),
current_batch_idx_(0),
http_send_time_us_(0),
http_response_data_size_(0),
http_response_data_(nullptr),
......@@ -232,6 +234,8 @@ ObEmbeddingTask::ObEmbeddingTask(ObArenaAllocator &allocator) : local_allocator_
http_total_retry_count_(0),
http_retry_start_time_us_(0),
http_last_retry_time_us_(0),
http_max_retry_count_(0),
wait_for_completion_timeout_us_(0),
need_retry_flag_(false),
original_batch_size_(batch_size_),
batch_size_adjusted_(false),
......@@ -253,15 +257,19 @@ int ObEmbeddingTask::init(const ObString &model_url,
const ObCollationType col_type,
int64_t dimension,
int64_t http_timeout_us,
int64_t http_max_retries,
storage::ObEmbeddingIOCallbackHandle *cb_handle)
{
int ret = OB_SUCCESS;
if (is_inited_) {
ret = OB_INIT_TWICE;
LOG_WARN("ObEmbeddingTask already inited", K(ret), K(model_url), K(model_name), K(user_key), K(input_chunks));
} else if (http_timeout_us <= 0 || http_max_retries <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid http timeout", K(ret), K(http_timeout_us), K(http_max_retries));
} else if (OB_FAIL(input_chunks_.assign(input_chunks))) {
LOG_WARN("failed to assign input chunks", K(ret), K(input_chunks));
} else if (OB_FAIL(init_curl_handler(model_url, user_key))) {
} else if (OB_FAIL(init_curl_handler(model_url, user_key, http_timeout_us))) {
LOG_WARN("failed to init curl handler", K(ret), K(model_url), K(user_key));
} else if (OB_FAIL(task_cond_.init(ObWaitEventIds::DEFAULT_COND_WAIT))) {
LOG_WARN("failed to init completion cond", K(ret));
......@@ -297,13 +305,15 @@ int ObEmbeddingTask::init(const ObString &model_url,
}
output_vectors_.prepare_allocate_and_keep_count(total_chunks_);
if (http_timeout_us > 0) {
http_timeout_us_ = http_timeout_us;
} else {
http_timeout_us_ = HTTP_REQUEST_TIMEOUT;
}
// Initialize retry related variables
http_timeout_us_ = http_timeout_us;
http_max_retry_count_ = http_max_retries;
// task total timeout for all retries
wait_for_completion_timeout_us_ = (http_max_retry_count_ + 1) * http_timeout_us_ * input_chunks.count() / batch_size_ +
(http_max_retry_count_ + 1) * HTTP_RETRY_MAX_INTERVAL_US;
http_retry_start_time_us_ = 0;
http_last_retry_time_us_ = 0;
http_total_retry_count_ = 0;
......@@ -312,7 +322,7 @@ int ObEmbeddingTask::init(const ObString &model_url,
current_batch_size_ = batch_size_;
successful_requests_count_ = 0;
col_type_ = col_type;
LOG_DEBUG("task initialized successfully", K(user_key_), K(task_id_), K(dimension_));
LOG_DEBUG("task initialized successfully", K(user_key_), K(task_id_), K(dimension_), K(http_max_retry_count_), K(http_timeout_us));
}
return ret;
......@@ -588,6 +598,7 @@ int ObEmbeddingTask::check_async_progress()
LOG_WARN("task not started yet", K(ret));
} else if (current_phase == OB_EMBEDDING_TASK_DONE) {
ret = OB_SUCCESS;
LOG_DEBUG("check_async_progress", K(current_phase), K(curl_request_in_progress_));
} else if (current_phase == OB_EMBEDDING_TASK_HTTP_SENT) {
if (OB_FAIL(check_http_progress())) {
if (ret == OB_NEED_RETRY) {
......@@ -608,6 +619,10 @@ int ObEmbeddingTask::check_async_progress()
LOG_WARN("failed to handle retry failure", K(ret));
}
}
// reset flag
if (OB_SUCC(ret)) {
need_retry_flag_ = false;
}
} else {
// not time to retry yet, continue waiting
ret = OB_SUCCESS;
......@@ -625,14 +640,9 @@ int ObEmbeddingTask::check_async_progress()
if (OB_FAIL(set_phase(OB_EMBEDDING_TASK_HTTP_COMPLETED))) {
LOG_WARN("failed to set phase to HTTP_COMPLETED", K(ret));
}
} else {
int64_t current_time = ObTimeUtility::current_time();
int64_t elapsed_time = current_time - http_send_time_us_;
if (elapsed_time > http_timeout_us_) {
if (OB_FAIL(complete_task(OB_EMBEDDING_TASK_DONE, OB_TIMEOUT, true))) {
LOG_WARN("failed to handle task failure", K(ret));
}
LOG_WARN("HTTP request timeout", K(elapsed_time), K(http_timeout_us_), K(*this));
} else { // http not response
if (REACH_TIME_INTERVAL(10L * 1000L * 1000L)) { //10s
LOG_INFO("wait http response", K(ret), K(*this));
}
}
} else if (current_phase == OB_EMBEDDING_TASK_HTTP_COMPLETED) {
......@@ -821,7 +831,18 @@ int ObEmbeddingTask::check_http_progress()
}
http_last_retry_time_us_ = ObTimeUtility::current_time();
}
LOG_WARN("curl request error, need retry", K(ret), K(need_retry_flag_), K(http_retry_count_), K(http_max_retry_count_));
}
} else if (res == CURLE_OPERATION_TIMEDOUT) { // curl timeout
if (++http_retry_count_ < http_max_retry_count_) {
need_retry_flag_ = true;
http_total_retry_count_++;
http_last_retry_time_us_ = ObTimeUtility::current_time();
ret = OB_NEED_RETRY;
} else {
ret = OB_TIMEOUT;
}
LOG_WARN("curl request timeot, need retry", K(ret), K(need_retry_flag_), K(http_retry_count_), K(http_max_retry_count_));
} else {
ret = OB_CURL_ERROR;
LOG_WARN("curl request failed", K(ret), K(res), K(*this));
......@@ -1020,7 +1041,6 @@ int ObEmbeddingTask::do_work(ThreadPoolType *thread_pool)
}
}
}
return ret;
}
......@@ -1460,7 +1480,7 @@ bool ObEmbeddingTask::should_retry_http_request(int64_t http_error_code) const
case 502:
case 503:
case 504:
return http_retry_count_ < MAX_HTTP_RETRY_CNT;
return http_retry_count_ < http_max_retry_count_;
default:
return false;
}
......@@ -1614,13 +1634,16 @@ int ObEmbeddingTask::maybe_callback()
return ret;
}
int ObEmbeddingTask::init_curl_handler(const ObString &model_url, const ObString &user_key)
int ObEmbeddingTask::init_curl_handler(const ObString &model_url, const ObString &user_key, const int64_t http_timeout_us)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(curl_multi_handle_ || curl_easy_handle_)) {
ret = OB_INIT_TWICE;
LOG_WARN("curl handles already initialized", K(ret), KPC(this));
}else if (OB_ISNULL(curl_multi_handle_ = curl_multi_init())) {
} else if (http_timeout_us <=0 ) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid http_timeout_us", K(ret), K(http_timeout_us));
} else if (OB_ISNULL(curl_multi_handle_ = curl_multi_init())) {
ret = OB_CURL_ERROR;
LOG_WARN("failed to init curl multi handle", K(ret));
} else if (OB_ISNULL(curl_easy_handle_ = curl_easy_init())) {
......@@ -1652,8 +1675,8 @@ int ObEmbeddingTask::init_curl_handler(const ObString &model_url, const ObString
} else {
curl_easy_setopt(curl_easy_handle_, CURLOPT_WRITEDATA, (void *)curl_response_data_);
curl_easy_setopt(curl_easy_handle_, CURLOPT_TIMEOUT, HTTP_REQUEST_TIMEOUT / 1000);
curl_easy_setopt(curl_easy_handle_, CURLOPT_CONNECTTIMEOUT, HTTP_REQUEST_TIMEOUT / 1000);
curl_easy_setopt(curl_easy_handle_, CURLOPT_TIMEOUT_MS, http_timeout_us / 1000);
curl_easy_setopt(curl_easy_handle_, CURLOPT_CONNECTTIMEOUT_MS, http_timeout_us / 1000);
CURLMcode multi_res = curl_multi_add_handle(curl_multi_handle_, curl_easy_handle_);
if (multi_res != CURLM_OK) {
......@@ -1663,11 +1686,12 @@ int ObEmbeddingTask::init_curl_handler(const ObString &model_url, const ObString
}
}
}
return ret;
}
int ObEmbeddingTask::wait_for_completion(const int64_t timeout_ms)
int ObEmbeddingTask::wait_for_completion()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
......@@ -1678,7 +1702,7 @@ int ObEmbeddingTask::wait_for_completion(const int64_t timeout_ms)
if (callback_done_) {
// do nothing
} else {
if (OB_FAIL(task_cond_.wait(timeout_ms))) {
if (OB_FAIL(task_cond_.wait(wait_for_completion_timeout_us_ / 1000))) {
LOG_WARN("failed to wait for completion", K(ret));
}
}
......
......@@ -168,6 +168,7 @@ class ObEmbeddingTask
const ObCollationType col_type,
int64_t dimension,
int64_t http_timeout_us,
int64_t http_max_retries,
storage::ObEmbeddingIOCallbackHandle *cb_handle = nullptr);
template <typename ThreadPoolType>
int do_work(ThreadPoolType *thread_pool);
......@@ -195,7 +196,7 @@ class ObEmbeddingTask
// Public method that lets external callers mark the task as failed
int mark_task_failed(int error_code);
int maybe_callback();
int wait_for_completion(const int64_t timeout_ms = 0);
int wait_for_completion();
int wake_up();
void disable_callback();
void set_callback_done();
......@@ -260,7 +261,7 @@ private:
void reset_retry_state();
int map_http_error_to_internal_error(int64_t http_error_code) const;
void try_increase_batch_size();
int init_curl_handler(const ObString &model_url, const ObString &user_key);
int init_curl_handler(const ObString &model_url, const ObString &user_key, const int64_t http_timeout_us);
struct HttpResponseData {
HttpResponseData(ObIAllocator &allocator) : data(nullptr), size(0), allocator(allocator) {}
......@@ -358,6 +359,9 @@ private:
int64_t http_total_retry_count_;
int64_t http_retry_start_time_us_;
int64_t http_last_retry_time_us_;
int64_t http_max_retry_count_;
int64_t wait_for_completion_timeout_us_; // For controlling the maximum timeout of waiting for completion
bool need_retry_flag_;
// Batch size adjustment for retry
......
......@@ -1804,13 +1804,13 @@ int ObHNSWEmbeddingOperator::init(const ObTabletID &tablet_id)
}
if (OB_SUCC(ret)) {
if (OB_FAIL(embedmgr_->init(model_id_, http_timeout_us_, col_type))) {
if (OB_FAIL(embedmgr_->init(model_id_, col_type))) {
embedmgr_->~ObEmbeddingTaskMgr();
op_allocator_.free(embedmgr_);
embedmgr_ = nullptr;
LOG_WARN("failed to init embedding task manager", K(ret));
} else {
batch_size_ = 64; // TODO(fanfangyao.ffy):待调参
batch_size_ = 64; // TODO(fanfangyao.ffy): To be tuned
void *batch_buf = ob_malloc(sizeof(ObTaskBatchInfo), ObMemAttr(MTL_ID(), "TaskBatch"));
if (OB_ISNULL(batch_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
......@@ -1845,7 +1845,6 @@ int ObHNSWEmbeddingOperator::execute(const ObChunk &input_chunk,
int ret = OB_SUCCESS;
output_chunk.reset();
result_state = ObPipelineOperator::NEED_MORE_INPUT;
int64_t wait_timeout_us = http_timeout_us_;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
......@@ -1876,7 +1875,7 @@ int ObHNSWEmbeddingOperator::execute(const ObChunk &input_chunk,
//wait for task completion
if (OB_SUCC(ret)) {
if (OB_FAIL(embedmgr_->wait_for_completion(wait_timeout_us))) {
if (OB_FAIL(embedmgr_->wait_for_completion())) {
LOG_WARN("wait for completion failed", K(ret));
} else if (OB_FAIL(get_ready_results(output_chunk, result_state))) {
LOG_WARN("get ready results failed", K(ret));
......
......@@ -752,7 +752,7 @@ public:
explicit ObHNSWEmbeddingOperator(ObPipeline *pipeline)
: ObVectorIndexBaseOperator(pipeline), embedmgr_(nullptr), vec_dim_(-1), rowkey_cnt_(-1),
text_col_idx_(-1), is_inited_(false), error_ret_code_(OB_SUCCESS),
batch_size_(0), current_batch_(nullptr), http_timeout_us_(20 * 1000 * 1000) /* 20s */
batch_size_(0), current_batch_(nullptr)
{}
~ObHNSWEmbeddingOperator();
int init(const ObTabletID &tablet_id);
......@@ -798,7 +798,6 @@ private:
blocksstable::ObBatchDatumRows *cur_datum_rows_;
int64_t cur_row_in_batch_;
bool chunk_exhausted_;
int64_t http_timeout_us_;
DISALLOW_COPY_AND_ASSIGN(ObHNSWEmbeddingOperator);
};
......
......@@ -299,14 +299,14 @@ int ObTaskSlotRing::init(const int64_t capacity)
int ret = OB_SUCCESS;
ObSpinLockGuard guard(lock_);
if (capacity <= 1) {
if (capacity <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid capacity", K(ret), K(capacity));
} else if (slots_.count() > 0) {
ret = OB_INIT_TWICE;
LOG_WARN("slot ring already initialized", K(ret));
} else {
capacity_ = capacity;
capacity_ = capacity + 1; // +1 for the extra slot to differentiate between full and empty queue
if (OB_FAIL(slots_.prepare_allocate(capacity_))) {
LOG_WARN("prepare allocate slots failed", K(ret), K(capacity_));
} else {
......@@ -433,14 +433,13 @@ void ObTaskSlotRing::clean_all_slots()
}
}
int ObTaskSlotRing::wait_all_tasks_finished(const int64_t timeout_us)
int ObTaskSlotRing::wait_all_tasks_finished()
{
int ret = OB_SUCCESS;
const int64_t timeout_ms = timeout_us / 1000;
for (int64_t i = 0; OB_SUCC(ret) && i < slots_.count(); ++i) {
Slot &slot = slots_.at(i);
if (OB_NOT_NULL(slot.task_)) {
if (OB_FAIL(slot.task_->wait_for_completion(timeout_ms))) {
if (OB_FAIL(slot.task_->wait_for_completion())) {
LOG_WARN("wait for task completion failed", K(ret), K(i));
}
}
......@@ -448,7 +447,7 @@ int ObTaskSlotRing::wait_all_tasks_finished(const int64_t timeout_us)
return ret;
}
int ObTaskSlotRing::wait_for_head_completion(const int64_t timeout_us)
int ObTaskSlotRing::wait_for_head_completion()
{
int ret = OB_SUCCESS;
share::ObEmbeddingTask *task_to_wait = nullptr;
......@@ -464,8 +463,8 @@ int ObTaskSlotRing::wait_for_head_completion(const int64_t timeout_us)
}
if (OB_NOT_NULL(task_to_wait)) {
if (OB_FAIL(task_to_wait->wait_for_completion(timeout_us / 1000))) {
LOG_WARN("wait for head embedding task completion failed", K(ret), K(timeout_us));
if (OB_FAIL(task_to_wait->wait_for_completion())) {
LOG_WARN("wait for head embedding task completion failed", K(ret));
}
}
return ret;
......@@ -547,8 +546,7 @@ ObEmbeddingTaskMgr::~ObEmbeddingTaskMgr()
int ret = OB_SUCCESS;
if (is_inited_) {
slot_ring_.disable_all_callbacks();
const int64_t timeout_us = http_timeout_us_;
if (OB_FAIL(slot_ring_.wait_all_tasks_finished(timeout_us))) {
if (OB_FAIL(slot_ring_.wait_all_tasks_finished())) {
LOG_WARN("failed to wait for all tasks to finish", K(ret));
}
slot_ring_.clean_all_slots();
......@@ -556,7 +554,7 @@ ObEmbeddingTaskMgr::~ObEmbeddingTaskMgr()
}
}
int ObEmbeddingTaskMgr::init(const ObString &model_id, const int64_t http_timeout_us, const ObCollationType col_type)
int ObEmbeddingTaskMgr::init(const ObString &model_id, const ObCollationType col_type)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
......@@ -575,10 +573,20 @@ int ObEmbeddingTaskMgr::init(const ObString &model_id, const int64_t http_timeou
}
if (OB_SUCC(ret)) {
// TODO(fanfangyao.ffy): 待调参
http_timeout_us_ = http_timeout_us;
cs_type_ = col_type;
const int64_t reserve_slots = ring_capacity_ > 0 ? ring_capacity_ : 5;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
model_request_timeout_us_ = tenant_config->model_request_timeout;
model_max_retries_ = tenant_config->model_max_retries;
} else {
SHARE_LOG_RET(WARN, OB_INVALID_CONFIG, "init model request timeout and max retries config with default value");
model_request_timeout_us_ = 60 * 1000 * 1000; // 60 seconds
model_max_retries_ = 2;
}
}
if (OB_SUCC(ret)) {
const int64_t reserve_slots = ring_capacity_;
if (OB_FAIL(slot_ring_.init(reserve_slots))) {
LOG_WARN("init slot ring failed", K(ret), K(reserve_slots));
} else {
......@@ -649,7 +657,7 @@ int ObEmbeddingTaskMgr::submit_batch_info(ObTaskBatchInfo *&batch_info)
task = new (task_mem) share::ObEmbeddingTask();
const int64_t vec_dim = results.at(0)->get_vector_dim();
if (OB_FAIL(task->init(cfg_.model_url_, cfg_.model_name_, cfg_.provider_,
cfg_.user_key_, texts, cs_type_, vec_dim, http_timeout_us_, cb_handle))) {
cfg_.user_key_, texts, cs_type_, vec_dim, model_request_timeout_us_, model_max_retries_, cb_handle))) {
LOG_WARN("failed to initialize EmbeddingTask", K(ret));
}
}
......@@ -715,7 +723,7 @@ int ObEmbeddingTaskMgr::get_ready_batch_info(ObTaskBatchInfo *&batch_info, int &
return ret;
}
//TODO(fanfangyao.ffy): 在vectorindexctx处获取有已知bug,修复后将该流程放到vectorindexctx处
//TODO(fanfangyao.ffy): Move this process to vectorindexctx
int ObEmbeddingTaskMgr::get_ai_config(const common::ObString &model_id)
{
int ret = OB_SUCCESS;
......@@ -769,13 +777,13 @@ int ObEmbeddingTaskMgr::mark_task_ready(const int64_t slot_idx, const int ret_co
return ret;
}
int ObEmbeddingTaskMgr::wait_for_completion(const int64_t timeout_us)
int ObEmbeddingTaskMgr::wait_for_completion()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("embedding task mgr not inited", K(ret));
} else if (OB_FAIL(slot_ring_.wait_for_head_completion(timeout_us))) {
} else if (OB_FAIL(slot_ring_.wait_for_head_completion())) {
LOG_WARN("wait for head completion failed", K(ret));
}
return ret;
......
......@@ -158,7 +158,7 @@ public:
class ObTaskSlotRing
{
public:
ObTaskSlotRing() : lock_(), capacity_(1), slots_(), next_idx_(0), head_idx_(0) {}
ObTaskSlotRing() : lock_(), capacity_(0), slots_(), next_idx_(0), head_idx_(0) {}
~ObTaskSlotRing();
int init(const int64_t capacity);
......@@ -168,14 +168,14 @@ public:
int mark_ready(const int64_t slot_idx, const int ret_code);
// Pop ready batch_info
int pop_ready_in_order(ObTaskBatchInfo *&batch_info, int &ret_code);
int wait_for_head_completion(const int64_t timeout_us);
int wait_for_head_completion();
void set_task(const int64_t slot_idx, share::ObEmbeddingTask *task);
void set_batch_info(const int64_t slot_idx, ObTaskBatchInfo *batch_info);
// Cleanup operations
void disable_all_callbacks();
void clean_all_slots();
int wait_all_tasks_finished(const int64_t timeout_us);
int wait_all_tasks_finished();
TO_STRING_KV(K_(capacity), K_(next_idx), K_(head_idx));
......@@ -238,14 +238,14 @@ class ObEmbeddingTaskMgr
{
public:
ObEmbeddingTaskMgr() : allocator_("EmbedTaskMgr", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
embedding_handler_(nullptr), slot_ring_(), ring_capacity_(9),
cfg_(), is_inited_(false), is_failed_(false), http_timeout_us_(0), cs_type_(CS_TYPE_INVALID) {}
embedding_handler_(nullptr), slot_ring_(), ring_capacity_(8),
cfg_(), is_inited_(false), is_failed_(false), cs_type_(CS_TYPE_INVALID) {}
~ObEmbeddingTaskMgr();
int init(const common::ObString &model_id, const int64_t http_timeout_us, const ObCollationType cs_type);
int init(const common::ObString &model_id, const ObCollationType cs_type);
int submit_batch_info(ObTaskBatchInfo *&batch_info);
int get_ready_batch_info(ObTaskBatchInfo *&batch_info, int &error_ret_code);
int mark_task_ready(const int64_t slot_idx, const int ret_code);
int wait_for_completion(const int64_t timeout_ms = 0);
int wait_for_completion();
bool get_failed() const { return is_failed_; }
TO_STRING_KV(K_(ring_capacity), K_(slot_ring), K_(cfg), K_(is_inited));
......@@ -258,12 +258,13 @@ private:
ObArenaAllocator allocator_;
share::ObEmbeddingTaskHandler *embedding_handler_;
ObTaskSlotRing slot_ring_; // Ring buffer for task slots
int64_t ring_capacity_; // TODO(fanfangyao.ffy): 待调参
int64_t ring_capacity_; // TODO(fanfangyao.ffy): To be tuned
ObEmbeddingConfig cfg_;
bool is_inited_;
bool is_failed_;
int64_t http_timeout_us_;
ObCollationType cs_type_;
int64_t model_request_timeout_us_; //For controlling the maximum timeout of calling model http service
int64_t model_max_retries_; //For controlling the maximum retries of calling model http service
DISALLOW_COPY_AND_ASSIGN(ObEmbeddingTaskMgr);
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册