From 8160997f02e65ae006cb1a86713e00bb7c2a42e5 Mon Sep 17 00:00:00 2001 From: ly0 Date: Fri, 23 Jul 2021 17:33:21 +0800 Subject: [PATCH] patch some bug fix:part 2 --- deps/oblib/src/lib/lock/ob_scond.h | 52 ++++++++-------- deps/oblib/src/lib/queue/ob_priority_queue.h | 54 ++++++++++------- .../lib/queue/test_priority_queue.cpp | 59 +++++++++++++------ src/observer/omt/ob_tenant.cpp | 6 +- src/observer/omt/ob_tenant.h | 2 +- 5 files changed, 109 insertions(+), 64 deletions(-) diff --git a/deps/oblib/src/lib/lock/ob_scond.h b/deps/oblib/src/lib/lock/ob_scond.h index 2660f8f15b..ba8839fd81 100644 --- a/deps/oblib/src/lib/lock/ob_scond.h +++ b/deps/oblib/src/lib/lock/ob_scond.h @@ -137,22 +137,21 @@ public: } }; -struct SCond { +template +struct SCondTemp { public: typedef SimpleCond CondPerCpu; typedef SCondReadyFlag Lock; typedef SCondCounter Counter; typedef SCondSimpleIdGen IdGen; enum { CPU_COUNT = OB_MAX_CPU_NUM, COND_COUNT = CPU_COUNT, LOOP_LIMIT = 8 }; - SCond() - {} - ~SCond() - {} - void signal(uint32_t x = 1) + void signal(uint32_t x = 1, int prio=0) { - uint32_t v = conds_[id_gen_.get() % COND_COUNT].signal(x); - if (v < x) { - n2wakeup_.add(x - v); + for (int p = PRIO-1; p >= prio && x > 0; p--) { + x -= conds_[id_gen_.get() % COND_COUNT][p].signal(x); + } + if (x > 0) { + n2wakeup_.add(x); lock_.set_ready(); int64_t loop_cnt = 0; while (loop_cnt++ < LOOP_LIMIT && lock_.lock()) { @@ -164,11 +163,12 @@ public: } } } - void prepare() + void prepare(int prio=0) { uint32_t id = 0; - uint32_t key = get_key(id); - get_wait_key() = ((uint64_t)id << 32) + key; + uint32_t key = get_key(prio, id); + id += (prio << 16); + get_wait_key() = ((uint64_t)id<<32) + key; } void wait(int64_t timeout) { @@ -177,15 +177,14 @@ public: } protected: - uint32_t get_key(uint32_t& id) - { - return conds_[id = (id_gen_.next() % COND_COUNT)].get_key(); + uint32_t get_key(int prio, uint32_t& id) + { + return conds_[id = (id_gen_.next() % COND_COUNT)][prio].get_key(); } void wait(uint32_t id, uint32_t key, int64_t timeout) { - conds_[id % COND_COUNT].wait(key, timeout); + conds_[((uint16_t)id) % COND_COUNT][id >> 16].wait(key, timeout); } - private: static uint64_t& get_wait_key() { @@ -194,20 +193,27 @@ private: } void do_wakeup() { - uint32_t n2wakeup = n2wakeup_.fetch(); - for (int i = 0; n2wakeup > 0 && i < COND_COUNT; i++) { - n2wakeup -= conds_[i].signal(n2wakeup); + uint32_t n2wakeup = 0; + //for (int p = PRIO - 1; p >= 0; p--) { + n2wakeup = n2wakeup_.fetch(); + // } + for (int p = PRIO - 1; n2wakeup > 0 && p >= 0; p--) { + for(int i = 0; n2wakeup > 0 && i < COND_COUNT; i++) { + n2wakeup -= conds_[i][p].signal(n2wakeup); + } } } private: Lock lock_ CACHE_ALIGNED; - CondPerCpu conds_[COND_COUNT]; + CondPerCpu conds_[COND_COUNT][PRIO]; Counter n2wakeup_; IdGen id_gen_; }; -}; // end namespace common -}; // end namespace oceanbase +using SCond = SCondTemp<1>; + +}; // end namespace common +}; // end namespace oceanbase #endif /* OCEANBASE_LOCK_OB_SCOND_H_ */ diff --git a/deps/oblib/src/lib/queue/ob_priority_queue.h b/deps/oblib/src/lib/queue/ob_priority_queue.h index e7ada57514..23d92394ea 100644 --- a/deps/oblib/src/lib/queue/ob_priority_queue.h +++ b/deps/oblib/src/lib/queue/ob_priority_queue.h @@ -107,10 +107,10 @@ private: DISALLOW_COPY_AND_ASSIGN(ObPriorityQueue); }; -template +template class ObPriorityQueue2 { public: - enum { PRIO_CNT = HIGH_PRIOS + LOW_PRIOS }; + enum { PRIO_CNT = HIGH_HIGH_PRIOS + HIGH_PRIOS + LOW_PRIOS }; ObPriorityQueue2() : queue_(), size_(0), limit_(INT64_MAX) {} @@ -150,10 +150,13 @@ public: } else if (OB_FAIL(queue_[priority].push(data))) { // do nothing } else { - cond_.signal(); - // if (priority < HIGH_PRIOS) { - // high_cond_.signal(); - // } + if (priority < HIGH_HIGH_PRIOS) { + cond_.signal(1, 0); + } else if (priority < HIGH_PRIOS + HIGH_HIGH_PRIOS) { + cond_.signal(1, 1); + } else { + cond_.signal(1, 2); + } } if (OB_FAIL(ret)) { @@ -162,6 +165,22 @@ public: return ret; } + int pop(ObLink*& data, int64_t timeout_us) + { + return do_pop(data, PRIO_CNT, timeout_us); + } + + int pop_high(ObLink*& data, int64_t timeout_us) + { + return do_pop(data, HIGH_PRIOS, timeout_us); + } + + int pop_high_high(ObLink*& data, int64_t timeout_us) + { + return do_pop(data, HIGH_HIGH_PRIOS, timeout_us); + } + +private: inline int do_pop(ObLink*& data, int64_t plimit, int64_t timeout_us) { int ret = OB_ENTRY_NOT_EXIST; @@ -169,8 +188,14 @@ public: ret = OB_INVALID_ARGUMENT; COMMON_LOG(ERROR, "timeout is invalid", K(ret), K(timeout_us)); } else { - cond_.prepare(); - for (int i = 0; OB_ENTRY_NOT_EXIST == ret && i < plimit; i++) { + if (plimit <= HIGH_HIGH_PRIOS) { + cond_.prepare(0); + } else if (plimit <= HIGH_PRIOS + HIGH_HIGH_PRIOS) { + cond_.prepare(1); + } else { + cond_.prepare(2); + } + for (int i = 0; OB_ENTRY_NOT_EXIST == ret && i < plimit; i++) { if (OB_SUCCESS == queue_[i].pop(data)) { ret = OB_SUCCESS; } @@ -185,18 +210,7 @@ public: return ret; } - int pop(ObLink*& data, int64_t timeout_us) - { - return do_pop(data, PRIO_CNT, timeout_us); - } - - int pop_high(ObLink*& data, int64_t timeout_us) - { - return do_pop(data, HIGH_PRIOS, timeout_us); - } - -private: - SCond cond_; + SCondTemp<3> cond_; ObLinkQueue queue_[PRIO_CNT]; int64_t size_ CACHE_ALIGNED; int64_t limit_ CACHE_ALIGNED; diff --git a/deps/oblib/unittest/lib/queue/test_priority_queue.cpp b/deps/oblib/unittest/lib/queue/test_priority_queue.cpp index ac24dc8133..441c767236 100644 --- a/deps/oblib/unittest/lib/queue/test_priority_queue.cpp +++ b/deps/oblib/unittest/lib/queue/test_priority_queue.cpp @@ -15,9 +15,11 @@ #include "lib/queue/ob_priority_queue.h" #include "lib/coro/co.h" #include "lib/thread/thread_pool.h" +#include using namespace oceanbase::lib; using namespace oceanbase::common; +using namespace std; class TestQueue : public ThreadPool { public: @@ -31,16 +33,18 @@ public: {} int64_t val_; }; - typedef ObPriorityQueue<3> Queue; - TestQueue() : seq_(0) + typedef ObPriorityQueue2<1, 2> Queue; + TestQueue(): push_seq_(0), pop_seq_(0) { - limit_ = atoll(getenv("limit") ?: "1000000"); + limit_ = atoll(getenv("limit")?: "1000000"); } virtual ~TestQueue() {} void do_stress() { set_thread_count(atoi(getenv("n_thread") ?: "8")); + n_pusher_ = atoi(getenv("n_pusher")?: "4"); + queue_.set_limit(65536); int ret = OB_SUCCESS; if (OB_FAIL(start())) { LIB_LOG(ERROR, "start fail", K(ret), K(errno)); @@ -51,59 +55,78 @@ public: } void print() { - int64_t last_seq = ATOMIC_LOAD(&seq_); - while (ATOMIC_LOAD(&seq_) < limit_) { + int64_t last_seq = ATOMIC_LOAD(&pop_seq_); + while (ATOMIC_LOAD(&pop_seq_) < limit_) { sleep(1); - int64_t cur_seq = ATOMIC_LOAD(&seq_); + int64_t cur_seq = ATOMIC_LOAD(&pop_seq_); LIB_LOG(INFO, "queue", "tps", BATCH * (cur_seq - last_seq)); last_seq = cur_seq; } } - int64_t get_seq() + int64_t get_seq(int64_t &seq) { - return ATOMIC_FAA(&seq_, 1); + return ATOMIC_FAA(&seq, 1); } int insert(int64_t seq) { int err = 0; QData* data = new QData(seq); - err = queue_.push(data, (int)data->val_ % 3); + err = queue_.push(data, data->val_ % 3); return err; } - int del(int64_t seq) + int del(uint64_t idx) { - UNUSED(seq); int err; QData* data = NULL; - err = queue_.pop((ObLink*&)data, 500); + if (idx == 0) { + err = queue_.pop_high_high((ObLink*&)data, 10000); + } else { + err = queue_.pop((ObLink*&)data, 10000); + } + // auto now = ObTimeUtility::current_time(); + // if (data) { + // if (now - data->val_ > 100) { + // cout << now - data->val_ << endl; + // } + // usleep(500); + // } delete data; return err; } + void run1() override { int ret = OB_SUCCESS; int64_t seq = 0; const uint64_t idx = get_thread_idx(); - while ((seq = get_seq()) < limit_) { - if (0 == (idx % 2)) { + cout << "idx: " << idx << endl; + if (idx >= get_thread_count() - n_pusher_) { + while ((seq = get_seq(push_seq_)) < limit_) { for (int i = 0; i < BATCH; i++) { + //::usleep(10000); + auto now = ObTimeUtility::current_time(); do { - ret = insert(i); + ret = insert(now); } while (OB_FAIL(ret)); } - } else { + } + } else { + while ((seq = get_seq(pop_seq_)) < limit_) { for (int i = 0; i < BATCH; i++) { do { - ret = del(i); + ret = del(idx); } while (OB_FAIL(ret)); } } } + std::cout << idx << " finished" << std::endl; } private: - int64_t seq_ CACHE_ALIGNED; + int64_t push_seq_ CACHE_ALIGNED; + int64_t pop_seq_ CACHE_ALIGNED; int64_t limit_; + int64_t n_pusher_; Queue queue_; }; // end of class Consumer diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index 164ea4d641..dbd999f801 100644 --- a/src/observer/omt/ob_tenant.cpp +++ b/src/observer/omt/ob_tenant.cpp @@ -647,7 +647,7 @@ int ObTenant::get_new_request(ObThWorker& w, int64_t timeout, rpc::ObRequest*& r if (OB_UNLIKELY(only_high_high_prio)) { // We must ensure at least one worker can process the highest // priority task. - ret = req_queue_.do_pop(task, QQ_HIGH + 1, timeout); + ret = req_queue_.pop_high_high(task, timeout); } else if (OB_UNLIKELY(only_high_prio)) { // We must ensure at least number of tokens of workers which don't // process low priority task. @@ -656,7 +656,9 @@ int ObTenant::get_new_request(ObThWorker& w, int64_t timeout, rpc::ObRequest*& r // If large requests exist and this worker doesn't have LQT but // can acquire, do it. ATOMIC_INC(&pop_normal_cnt_); - if (large_req_queue_.size() > 0 && !w.has_lq_token() && acquire_lq_token()) { + if (large_req_queue_.size() > 0 && + !w.has_lq_token() && + acquire_lq_token()) { w.set_lq_token(); } if (OB_LIKELY(!w.has_lq_token())) { diff --git a/src/observer/omt/ob_tenant.h b/src/observer/omt/ob_tenant.h index 074c287771..ed2ff386c9 100644 --- a/src/observer/omt/ob_tenant.h +++ b/src/observer/omt/ob_tenant.h @@ -558,7 +558,7 @@ protected: /// tenant task queue, // 'hp' for high priority and 'np' for normal priority - common::ObPriorityQueue2 req_queue_; + common::ObPriorityQueue2<1, QQ_MAX_PRIO - 1, RQ_MAX_PRIO - QQ_MAX_PRIO> req_queue_; common::ObLinkQueue large_req_queue_; // Create a request queue for each level of nested requests -- GitLab