提交 072fdf7c 编写于 作者: O obdev 提交者: ob-robot

[CP]:sync json pl manage from 42x to master

上级 dd7737c7
......@@ -579,7 +579,8 @@ ObJsonNode *ObJsonObject::clone(ObIAllocator* allocator, bool is_deep_copy) cons
for (uint64_t i = 0; i < len && OB_SUCC(ret); i++) {
if (is_deep_copy) {
char* str_buf = NULL;
if (OB_ISNULL(str_buf = static_cast<char*>(allocator->alloc(object_array_[i].get_key().length())))) {
bool is_key_empty = object_array_[i].get_key().length() == 0;
if (!is_key_empty && OB_ISNULL(str_buf = static_cast<char*>(allocator->alloc(object_array_[i].get_key().length())))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret), K(object_array_[i].get_key().length()));
} else {
......
......@@ -34,6 +34,7 @@ ob_set_subtarget(ob_pl common
ob_pl_stmt.cpp
ob_pl_type.cpp
ob_pl_user_type.cpp
ob_pl_json_type.cpp
ob_pl_persistent.cpp
)
......
......@@ -23,6 +23,7 @@
#include "pl/ob_pl_compile.h"
#include "pl/ob_pl_code_generator.h"
#include "pl/ob_pl_user_type.h"
#include "pl/ob_pl_json_type.h"
#include "pl/ob_pl_stmt.h"
#include "pl/ob_pl_interface_pragma.h"
#include "observer/ob_server_struct.h"
......@@ -3027,6 +3028,13 @@ int ObPLExecState::final(int ret)
exec_ctx_bak_.restore(*ctx_.exec_ctx_);
}
if (OB_NOT_NULL(ctx_.exec_ctx_->get_my_session())) {
#ifdef OB_BUILD_ORACLE_PL
ObSQLSessionInfo *session = ctx_.exec_ctx_->get_my_session();
ObPlJsonTypeManager::release_useless_resource(session->get_json_pl_mngr());
#endif
}
return OB_SUCCESS;
}
......
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifdef OB_BUILD_ORACLE_PL
#define USING_LOG_PREFIX PL
#include "pl/ob_pl_user_type.h"
#include "pl/ob_pl_json_type.h"
namespace oceanbase
{
using namespace common;
using namespace share::schema;
using namespace jit;
using namespace obmysql;
using namespace sql;
namespace pl
{
int ObPLJsonBaseType::deep_copy(ObPLOpaque *dst)
{
int ret = OB_SUCCESS;
ObPlJsonNode* pl_node = data_;
ObJsonNode* ref_node = nullptr;
ObJsonNode* data_node = nullptr;
ObPlJsonTypeManager* pl_manager = nullptr;
if (OB_NOT_NULL(data_)) {
ref_node = pl_node->get_ref_node();
data_node = pl_node->get_data_node();
pl_manager = pl_node->get_manager();
}
ObPLJsonBaseType *copy = NULL;
OZ (ObPLOpaque::deep_copy(dst));
CK (OB_NOT_NULL(copy = new(dst)ObPLJsonBaseType()));
OX (copy->set_err_behavior(static_cast<int32_t>(behavior_)));
if (OB_SUCC(ret) && OB_NOT_NULL(data_)) {
ObPlJsonNode* dup = nullptr;
if (OB_FAIL(pl_manager->create_empty_node(dup))) {
LOG_WARN("fail to create empty node", K(ret), K(pl_manager->get_map_count()),
K(pl_manager->get_list_count()), K(pl_manager->get_alloc_count()),
K(pl_manager->get_free_count()), K(pl_manager->get_holding_count()));
} else if (OB_FAIL(dup->assign(pl_node))) {
LOG_WARN("fail to assign node", K(ret));
} else {
copy->set_data(dup);
if (OB_FAIL(pl_manager->check_candidate_list())) {
LOG_WARN("fail to checkin candidate list node.", K(ret));
}
}
}
return ret;
}
ObPlJsonNode::ObPlJsonNode(ObPlJsonTypeManager *pl_handle, ObJsonNode* json_node)
: ObPlJsonNode(pl_handle)
{
set_data_node(json_node);
increase_ref();
}
int ObPlJsonNode::assign(ObPlJsonNode* from)
{
int ret = OB_SUCCESS;
if (from->get_ref_node()) {
ObJsonNode* origin = from->get_ref_node();
ObPlJsonNode* from_origin = nullptr;
if (OB_FAIL(pl_manager_->get_node(origin, from_origin))) {
LOG_WARN("fail to get node from manger", K(ret));
} else {
from_origin->increase_ref();
set_ref_node(from_origin->get_data_node());
set_data_node(from->get_data_node());
}
} else {
from->increase_ref();
set_ref_node(from->get_data_node());
set_data_node(from->get_data_node());
}
return ret;
}
int ObPlJsonNode::clone(ObPlJsonNode* other, bool is_deep_copy)
{
return clone(other->get_data_node(), is_deep_copy);
}
int ObPlJsonNode::clone(ObJsonNode* other, bool is_deep_copy)
{
int ret = OB_SUCCESS;
ObJsonNode *json_dst = other->clone(&allocator_, true);
if (OB_ISNULL(json_dst)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc memory for clone json node failed", K(ret));
} else {
set_data_node(json_dst);
increase_ref();
}
return ret;
}
int ObPlJsonNode::parse_tree(const ObString& text, ObJsonInType in_type)
{
int ret = OB_SUCCESS;
ObJsonNode* tree = nullptr;
if (OB_FAIL(ObJsonBaseFactory::get_json_tree(&allocator_, text, in_type, tree, ObJsonParser::JSN_RELAXED_FLAG))) {
LOG_WARN("fail to get json base", K(ret), K(in_type));
} else {
set_data_node(tree);
increase_ref();
}
return ret;
}
int ObPlJsonNode::unref()
{
int ret = OB_SUCCESS;
if (OB_NOT_NULL(get_ref_node())) {
ObPlJsonNode* dom_node = nullptr;
if (OB_FAIL(pl_manager_->get_node(get_ref_node(), dom_node))) {
LOG_WARN("fail to get node from manger", K(ret));
} else if (OB_ISNULL(dom_node)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to unassign get dom node is null", K(ret));
} else if (dom_node->decrease_ref() == 0) {
if (OB_FAIL(pl_manager_->remove_node(dom_node, false))) {
LOG_WARN("fail to remove node from manger", K(ret));
}
}
} else if (decrease_ref() == 0) {
if (OB_FAIL(pl_manager_->remove_node(this, false))) {
LOG_WARN("fail to remove node from manger", K(ret));
}
}
return ret;
}
void ObPlJsonNode::reuse()
{
data_ = nullptr;
origin_ = nullptr;
ref_count_ = 0;
ref_type_ = 0;
allocator_.reset();
}
void ObPlJsonNode::free()
{
reuse();
}
int ObPlJsonTypeManager::destroy_node(ObPlJsonNode* node)
{
int ret = OB_SUCCESS;
if (OB_FAIL(node->unref())) {
LOG_WARN("failed to unref current node", K(ret));
} else if (OB_FAIL(check_candidate_list())) {
LOG_WARN("failed to eliminate node", K(ret));
} else if (node->get_ref_node()) {
free_empty_node(node);
}
return ret;
}
ObPlJsonTypeManager::ObPlJsonTypeManager(uint64_t tenant_id)
: dom_node_allocator_(sizeof(ObPlJsonNode),
common::OB_MALLOC_NORMAL_BLOCK_SIZE - 32,
ObMalloc(lib::ObMemAttr(tenant_id, "JsonPlDom"))),
candidates_(dom_node_allocator_),
json_dom_map_(),
tenant_id_(tenant_id),
is_init_(false)
{
}
void ObPlJsonTypeManager::free_empty_node(ObPlJsonNode* node)
{
dom_node_allocator_.free(node);
++free_count_;
}
int ObPlJsonTypeManager::create_empty_node(ObPlJsonNode*& res)
{
int ret = OB_SUCCESS;
ObPlJsonNode* tmp = nullptr;
if (OB_ISNULL(tmp = static_cast<ObPlJsonNode*>(dom_node_allocator_.alloc(PL_JSON_DOM_LEN)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc empty dom node", K(ret), K(*this));
} else {
res = new (tmp) ObPlJsonNode(this);
++alloc_count_;
}
LOG_DEBUG("json pl manager statistic:", K(get_map_count()), K(get_list_count()),
K(get_alloc_count()), K(get_free_count()), K(get_holding_count()));
return ret;
}
int ObPlJsonTypeManager::create_new_node(ObJsonNode* data, ObPlJsonNode*& res)
{
int ret = OB_SUCCESS;
ObPlJsonNode* tmp = nullptr;
ObJsonNode* clone = nullptr;
if (OB_FAIL(init())) {
LOG_WARN("failed to init", K(ret));
} else if (OB_FAIL(create_empty_node(tmp))) {
LOG_WARN("failed to create empty node", K(ret), K(get_map_count()), K(get_list_count()),
K(get_alloc_count()), K(get_free_count()), K(get_holding_count()));
} else if (OB_ISNULL(clone = data->clone(&tmp->get_allocator(), true))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to clone node", K(ret));
} else {
tmp->set_data_node(clone);
tmp->increase_ref();
if (OB_FAIL(add_node(tmp))) {
LOG_WARN("failed to add tree", K(ret), K(get_map_count()), K(get_list_count()),
K(get_alloc_count()), K(get_free_count()), K(get_holding_count()));
} else {
res = tmp;
}
}
return ret;
}
int ObPlJsonTypeManager::create_ref_node(ObJsonNode* data, ObJsonNode* ref, ObPlJsonNode*& res)
{
int ret = OB_SUCCESS;
ObPlJsonNode* tmp = nullptr;
ObPlJsonNode* origin = nullptr;
if (OB_FAIL(init())) {
LOG_WARN("failed to init", K(ret));
} else if (OB_FAIL(get_node(ref, origin))) {
LOG_WARN("failed to get node", K(ret));
} else if (OB_FAIL(create_empty_node(tmp))) {
LOG_WARN("failed to create empty node", K(ret), K(get_map_count()), K(get_list_count()),
K(get_alloc_count()), K(get_free_count()), K(get_holding_count()));
} else {
tmp->set_data_node(data);
tmp->set_ref_node(origin->get_data_node());
origin->increase_ref();
res = tmp;
}
return ret;
}
int ObPlJsonTypeManager::create_new_node(const ObString& text, ObJsonInType in_type, ObPlJsonNode*& res)
{
int ret = OB_SUCCESS;
ObPlJsonNode* tmp = nullptr;
if (OB_FAIL(init())) {
LOG_WARN("failed to init", K(ret));
} else if (OB_FAIL(create_empty_node(tmp))) {
LOG_WARN("failed to create empty node", K(ret), K(get_map_count()), K(get_list_count()),
K(get_alloc_count()), K(get_free_count()), K(get_holding_count()));
} else if (OB_FAIL(tmp->parse_tree(text, in_type))) {
LOG_WARN("failed to parse tree", K(ret));
} else if (OB_FAIL(add_node(tmp))) {
LOG_WARN("failed to add tree", K(ret), K(get_map_count()), K(get_list_count()),
K(get_alloc_count()), K(get_free_count()), K(get_holding_count()));
} else {
res = tmp;
}
return ret;
}
int ObPlJsonTypeManager::init()
{
int ret = OB_SUCCESS;
if (!is_init_) {
ObMemAttr bucket_attr(tenant_id_, "jsonPlBucket");
ObMemAttr node_attr(tenant_id_, "jsonPlBuckNode");
if (OB_FAIL(json_dom_map_.create(JSON_PL_BUCKET_NUM, bucket_attr, node_attr))) {
LOG_WARN("failed to create json bucket num", K(ret));
} else {
is_init_ = true;
}
}
return ret;
}
int ObPlJsonTypeManager::add_node(ObPlJsonNode* node)
{
int ret = OB_SUCCESS;
ObPlJsonNode* value = nullptr;
if (OB_FAIL(init())) {
LOG_WARN("failed to init", K(ret));
} else if (OB_NOT_NULL(node->data_)
&& OB_FAIL(json_dom_map_.get_refactored(reinterpret_cast<uint64_t>(node->data_), value))) {
if (ret == OB_HASH_NOT_EXIST) {
if (OB_FAIL(json_dom_map_.set_refactored(reinterpret_cast<uint64_t>(node->data_), node))) {
LOG_WARN("failed to set json pl object into bucket.", K(ret));
}
}
}
return ret;
}
int ObPlJsonTypeManager::remove_node(ObPlJsonNode* node, bool do_force)
{
int ret = OB_SUCCESS;
ObPlJsonNode* dom_value = nullptr;
if (OB_FAIL(init())) {
LOG_WARN("failed to init", K(ret));
} else if (OB_FAIL(json_dom_map_.get_refactored(reinterpret_cast<uint64_t>(node->data_), dom_value))) {
if (ret != OB_HASH_NOT_EXIST) {
LOG_WARN("failed to set json pl object into bucket.", K(ret));
} else {
ret = OB_SUCCESS;
}
} else if (!do_force && OB_FAIL(candidates_.push_front(node))) {
LOG_WARN("failed to add into candidates list.", K(ret));
} else if (OB_FAIL(json_dom_map_.erase_refactored(reinterpret_cast<uint64_t>(node->data_)))) {
LOG_WARN("failed to remove from candidates list.", K(ret));
}
return ret;
}
int ObPlJsonTypeManager::get_node(ObJsonNode* node, ObPlJsonNode*& value)
{
int ret = OB_SUCCESS;
value = nullptr;
ObPlJsonNode* dom_value = nullptr;
if (OB_FAIL(init())) {
LOG_WARN("failed to init", K(ret));
} else if (OB_FAIL(json_dom_map_.get_refactored(reinterpret_cast<uint64_t>(node), dom_value))) {
if (ret != OB_HASH_NOT_EXIST) {
LOG_WARN("failed to set json pl object into bucket.", K(ret));
} else {
ObList<ObPlJsonNode*, ObIAllocator>::iterator it = candidates_.begin();
for (; it != candidates_.end(); ++it) {
ObPlJsonNode* temp = *it;
if (temp->data_ == node) {
value = temp;
break;
}
}
if (OB_NOT_NULL(value)) {
ret = OB_SUCCESS;
}
}
} else if (OB_ISNULL(dom_value)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get json node from hash.", K(ret));
} else {
value = dom_value;
}
return ret;
}
int ObPlJsonTypeManager::check_candidate_list()
{
int ret = OB_SUCCESS;
if (OB_FAIL(init())) {
LOG_WARN("failed to init", K(ret));
} else {
ObList<ObPlJsonNode*, ObIAllocator>::iterator it = candidates_.begin();
for (; it != candidates_.end(); ) {
ObPlJsonNode* temp = *it;
if (temp->ref_count()) {
if (OB_FAIL(json_dom_map_.set_refactored(reinterpret_cast<uint64_t>(temp->data_), temp))) {
LOG_WARN("failed to init", K(ret));
} else {
++it;
}
} else if (!temp->ref_count()) {
++it;
free(temp);
}
}
}
return ret;
}
void ObPlJsonTypeManager::free(ObPlJsonNode* node)
{
node->free();
candidates_.erase(node);
dom_node_allocator_.free(node);
}
void ObPlJsonTypeManager::destroy()
{
ObJsonDomMap::iterator iter = json_dom_map_.begin();
for (; iter != json_dom_map_.end(); ) {
ObPlJsonNode* node = iter->second;
if (OB_NOT_NULL(node)) {
iter++;
node->free();
} else {
++iter;
}
}
candidates_.clear();
json_dom_map_.destroy();
dom_node_allocator_.reset();
is_init_ = false;
}
uint64_t ObPlJsonTypeManager::get_map_count()
{
return json_dom_map_.size();
}
uint64_t ObPlJsonTypeManager::get_list_count()
{
return candidates_.size();
}
uint64_t ObPlJsonTypeManager::get_alloc_count()
{
return alloc_count_;
}
uint64_t ObPlJsonTypeManager::get_free_count()
{
return free_count_;
}
uint64_t ObPlJsonTypeManager::get_holding_count()
{
return alloc_count_ - free_count_;
}
void ObPlJsonTypeManager::release(intptr_t handle)
{
ObPlJsonTypeManager* manager = reinterpret_cast<ObPlJsonTypeManager*>(handle);
if (manager) {
manager->destroy();
}
}
void ObPlJsonTypeManager::release_useless_resource(intptr_t handle)
{
ObPlJsonTypeManager* manager = reinterpret_cast<ObPlJsonTypeManager*>(handle);
if (manager) {
manager->check_candidate_list();
}
}
} // namespace pl
} // namespace oceanbase
#endif
\ No newline at end of file
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifdef OB_BUILD_ORACLE_PL
#ifndef DEV_SRC_PL_OB_PL_JSON_TYPE_H_
#define DEV_SRC_PL_OB_PL_JSON_TYPE_H_
#include "pl/ob_pl_type.h"
#include "lib/hash/ob_hashmap.h"
#include "lib/json_type/ob_json_tree.h"
#include "lib/json_type/ob_json_parse.h"
#include "pl/ob_pl_user_type.h"
namespace oceanbase
{
namespace pl
{
struct ObPlJsonNode;
class ObPlJsonTypeManager {
public:
const uint32_t JSON_PL_BUCKET_NUM = 1000;
typedef common::hash::ObHashMap<uint64_t, ObPlJsonNode*, common::hash::NoPthreadDefendMode> ObJsonDomMap;
ObPlJsonTypeManager(uint64_t tenant_id);
int create_new_node(const ObString& text, ObJsonInType in_type, ObPlJsonNode*& res);
int create_ref_node(ObJsonNode* data, ObJsonNode* ref, ObPlJsonNode*& res);
int create_new_node(ObJsonNode* data, ObPlJsonNode*& res);
int create_empty_node(ObPlJsonNode*& res);
void free_empty_node(ObPlJsonNode* node);
int destroy_node(ObPlJsonNode* node);
int add_node(ObPlJsonNode*);
int remove_node(ObPlJsonNode*, bool force = true);
int get_node(ObJsonNode*, ObPlJsonNode*&);
int init();
int check_candidate_list();
void destroy();
void free(ObPlJsonNode* node);
common::ObIAllocator* get_dom_node_allocator() { return &dom_node_allocator_; }
static void release(intptr_t handle);
static void release_useless_resource(intptr_t handle);
uint64_t get_map_count();
uint64_t get_list_count();
uint64_t get_alloc_count();
uint64_t get_free_count();
uint64_t get_holding_count();
common::ObSmallBlockAllocator<> dom_node_allocator_;
ObList<ObPlJsonNode*, common::ObIAllocator> candidates_;
ObJsonDomMap json_dom_map_;
uint64_t tenant_id_;
bool is_init_;
uint64_t alloc_count_;
uint64_t free_count_;
TO_STRING_KV("alloc total size", dom_node_allocator_.get_total_mem_size(),
"node map count", json_dom_map_.size(),
"list count", candidates_.size(),
K_(tenant_id),
K_(alloc_count),
K_(free_count));
};
struct ObPlJsonNode {
ObPlJsonNode(ObPlJsonTypeManager *pl_handle)
: data_(nullptr),
origin_(nullptr),
ref_count_(0),
ref_type_(0),
allocator_(ObMemAttr(pl_handle->tenant_id_, "JsonPlManager"), OB_MALLOC_NORMAL_BLOCK_SIZE),
pl_manager_(pl_handle) {}
ObPlJsonNode(ObPlJsonTypeManager *pl_handle,
ObJsonNode* json_node);
int parse_tree(const ObString& text, ObJsonInType in_type);
common::ObIAllocator& get_allocator() { return allocator_; }
int clone(ObPlJsonNode* other, bool is_deep_copy = true);
int clone(ObJsonNode* other, bool is_deep_copy = true);
int32_t increase_ref() { return ++ref_count_; }
int32_t decrease_ref() { return --ref_count_; }
void set_data_node(ObJsonNode* node) { data_ = node; }
void set_ref_node(ObJsonNode* node) { origin_ = node; }
ObJsonNode* get_ref_node() { return origin_; }
ObJsonNode* get_data_node() { return data_; }
int32_t ref_count() { return ref_count_; }
ObPlJsonTypeManager* get_manager() { return pl_manager_; }
int unref();
int assign(ObPlJsonNode* from);
void reuse();
void free();
ObJsonNode* data_; // for save current using obj
ObJsonNode* origin_; // for save reference original obj
int32_t ref_count_;
int32_t ref_type_;
common::ObArenaAllocator allocator_;
ObPlJsonTypeManager *pl_manager_;
TO_STRING_KV(KPC(data_), KPC(origin_), K_(ref_count), KPC(pl_manager_));
};
static uint32_t PL_JSON_DOM_LEN = sizeof(ObPlJsonNode);
class ObPLJsonBaseType : public ObPLOpaque
{
public:
enum JSN_ERR_BEHAVIOR {
JSN_PL_NULL_ON_ERR,
JSN_PL_ERR_ON_ERR,
JSN_PL_ERR_ON_EMP,
JSN_PL_ERR_ON_MISMATCH,
JSN_PL_ERR_ON_INVALID = 7
};
ObPLJsonBaseType()
: ObPLOpaque(ObPLOpaqueType::PL_JSON_TYPE),
data_(NULL),
behavior_(0)
{}
void destroy()
{
if (OB_NOT_NULL(data_)) {
ObPlJsonTypeManager* manager = data_->get_manager();
manager->destroy_node(data_);
}
data_ = NULL;
behavior_ = 0;
}
virtual ~ObPLJsonBaseType()
{
destroy();
}
public:
virtual int deep_copy(ObPLOpaque *dst);
void set_data(ObPlJsonNode *data) { data_ = data; }
void set_err_behavior(int behavior) { behavior_ = behavior; }
int get_err_behavior() { return behavior_ ; }
ObPlJsonNode* get_data() { return data_; }
TO_STRING_KV(KPC(data_), K_(behavior));
private:
ObPlJsonNode *data_;
int behavior_;
};
} // namespace pl
} // namespace oceanbase
#endif /* DEV_SRC_PL_OB_PL_JSON_TYPE_H_ */
#endif
\ No newline at end of file
......@@ -26,6 +26,7 @@
#include "pl/ob_pl_allocator.h"
#include "share/ob_lob_access_utils.h"
#include "observer/mysql/ob_query_driver.h"
#include "lib/json_type/ob_json_parse.h"
namespace oceanbase
{
......@@ -382,6 +383,14 @@ int ObUserDefinedType::destruct_obj(ObObj &src, ObSQLSessionInfo *session)
case PL_OPAQUE_TYPE: {
ObPLOpaque *opaque = reinterpret_cast<ObPLOpaque*>(src.get_ext());
CK (OB_NOT_NULL(opaque));
// json pl object manage
if (OB_NOT_NULL(opaque) && opaque->is_json_type()) {
ObPLJsonBaseType* pl_jsontype = static_cast<ObPLJsonBaseType*>(opaque);
ObPlJsonNode* pl_json_node = pl_jsontype->get_data();
if (OB_NOT_NULL(pl_json_node) && OB_NOT_NULL(pl_json_node->get_data_node())) {
pl_jsontype->destroy();
}
}
OX (opaque->~ObPLOpaque());
}
break;
......@@ -5606,32 +5615,6 @@ int ObPLXmlType::deep_copy(ObPLOpaque *dst)
return ret;
}
int ObPLJsonBaseType::deep_copy(ObPLOpaque *dst)
{
int ret = OB_SUCCESS;
ObPLJsonBaseType *copy = NULL;
OZ (ObPLOpaque::deep_copy(dst));
CK (OB_NOT_NULL(copy = new(dst)ObPLJsonBaseType()));
OX (copy->set_err_behavior(static_cast<int32_t>(behavior_)));
if (OB_SUCC(ret) && OB_NOT_NULL(data_)) {
if (need_shallow_copy()) {
copy->set_data(data_);
copy->set_shallow_copy(1);
} else {
ObJsonNode *json_dst = data_->clone(&copy->get_allocator(), true);
if (OB_ISNULL(json_dst)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc memory for clone json node failed", K(ret));
} else {
copy->set_data(json_dst);
}
}
}
return ret;
}
//---------- for ObPLVarray ----------
int ObPLVArray::deep_copy(ObPLCollection *src, ObIAllocator *allocator, bool ignore_del_element)
......
......@@ -14,6 +14,7 @@
#define DEV_SRC_PL_OB_PL_USER_TYPE_H_
#include "pl/ob_pl_type.h"
#include "rpc/obmysql/ob_mysql_util.h"
#include "lib/hash/ob_hashmap.h"
#include "lib/hash/ob_array_index_hash_set.h"
#include "lib/container/ob_array_wrap.h"
#include "lib/json_type/ob_json_tree.h"
......@@ -1438,47 +1439,6 @@ private:
ObObj *data_;
};
class ObPLJsonBaseType : public ObPLOpaque
{
public:
enum JSN_ERR_BEHAVIOR {
JSN_PL_NULL_ON_ERR,
JSN_PL_ERR_ON_ERR,
JSN_PL_ERR_ON_EMP,
JSN_PL_ERR_ON_MISMATCH,
JSN_PL_ERR_ON_INVALID = 7
};
ObPLJsonBaseType()
: ObPLOpaque(ObPLOpaqueType::PL_JSON_TYPE),
data_(NULL),
behavior_(0),
is_shallow_copy_(0)
{}
virtual ~ObPLJsonBaseType()
{
data_ = NULL;
behavior_ = 0;
}
public:
virtual int deep_copy(ObPLOpaque *dst);
void set_data(ObJsonNode *data) { data_ = data; }
void set_err_behavior(int behavior) { behavior_ = behavior; }
int get_err_behavior() { return behavior_ ; }
ObJsonNode* get_data() { return data_; }
bool need_shallow_copy() { return is_shallow_copy_ > 0; }
void set_shallow_copy(int value) { is_shallow_copy_ = value; }
TO_STRING_KV(KPC(data_), K_(behavior), K_(is_shallow_copy));
private:
ObJsonNode *data_;
int behavior_;
int is_shallow_copy_;
};
#endif
} // namespace pl
......
......@@ -34,6 +34,7 @@ private:
static int get_array_value(sql::ObExecContext &ctx,
sql::ParamStore &params,
ObJsonNode *&json_val,
ObPlJsonNode *&pl_json_node,
int& error_behavior,
int expect_param_nums = 2);
};
......
......@@ -65,6 +65,7 @@ private:
static int get_object_value(sql::ObExecContext &ctx,
sql::ParamStore &params,
ObJsonNode *&json_val,
ObPlJsonNode *&pl_json_node,
int& error_behavior,
int expect_param_nums = 2);
static int get_lob_proc(sql::ObExecContext &ctx, sql::ParamStore &params, common::ObObj &result, bool is_clob);
......
......@@ -17,7 +17,7 @@
#include "lib/json_type/ob_json_tree.h"
#include "sql/engine/ob_exec_context.h"
#include "sql/session/ob_sql_session_info.h"
#include "pl/ob_pl_user_type.h"
#include "pl/ob_pl_json_type.h"
namespace oceanbase
{
......@@ -41,7 +41,11 @@ public:
static int parse(sql::ObExecContext &ctx, sql::ParamStore &params,
common::ObObj &result, ObJsonNodeType expect_type = ObJsonNodeType::J_ERROR);
static int get_jsontree(sql::ObExecContext &ctx, ObObj &obj, ObJsonNode *&json_doc, int32_t &err_behavior);
static int get_jsontree(sql::ObExecContext &ctx,
ObObj &obj,
ObJsonNode*& json_doc,
ObPlJsonNode*& pl_json_node,
int32_t &err_behavior);
static int get_jsontype(sql::ObExecContext &ctx,
sql::ParamStore &params,
ObJsonNodeType &json_type,
......@@ -51,9 +55,18 @@ public:
static int make_jsontype(sql::ObExecContext &ctx, const ObString &str,
ObJsonInType in_type, ObJsonNodeType expect_type,
ObPLJsonBaseType *&jsontype);
static int make_jsontype(sql::ObExecContext &ctx,
ObJsonNode* data,
int behavior,
ObPLJsonBaseType *&jsontype);
static int transform_JsonBase_2_PLJsonType(sql::ObExecContext &ctx,
ObJsonNode* json_val,
ObPLJsonBaseType *&jsontype);
static int transform_JsonBase_2_PLJsonType(sql::ObExecContext &ctx,
ObJsonNode* json_ref_val,
ObJsonNode* json_val,
ObPLJsonBaseType *&jsontype);
static int print_decimal(number::ObNumber &num, ObScale scale, ObJsonBuffer &j_buf);
static int get_json_object(sql::ObExecContext &ctx, ObJsonNode*& json_val);
static int get_json_array(sql::ObExecContext &ctx, ObJsonNode*& json_val);
......
......@@ -98,22 +98,20 @@ int ObExprTreat::cg_expr(ObExprCGCtx &expr_cg_ctx,
static int treat_as_json_udt(const ObExpr &expr, ObEvalCtx &ctx, common::ObIAllocator &temp_allocator,
pl::ObPLOpaque *opaque, ObDatum &res) {
INIT_SUCC(ret);
ObJsonNode *json_doc = nullptr;
pl::ObPLJsonBaseType *jsontype = nullptr;
pl::ObPlJsonNode *pl_json_node = nullptr;
pl::ObPLJsonBaseType *new_jsontype = nullptr;
ObObj res_obj;
if(OB_ISNULL(jsontype = static_cast<pl::ObPLJsonBaseType*>(opaque))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("cast to json type is null", K(ret), K(opaque));
} else if(OB_ISNULL(json_doc = jsontype->get_data())) {
} else if(OB_ISNULL(pl_json_node = jsontype->get_data())) {
res.set_null();
} else {
ObJsonNode * json_node_copy = nullptr;
if (OB_ISNULL(json_node_copy = json_doc->clone(&ctx.exec_ctx_.get_allocator()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to clone json node", K(ret));
} else if (OB_FAIL(pl::ObPlJsonUtil::transform_JsonBase_2_PLJsonType(ctx.exec_ctx_, json_node_copy, new_jsontype))) {
if (OB_FAIL(pl::ObPlJsonUtil::transform_JsonBase_2_PLJsonType(ctx.exec_ctx_,
pl_json_node->get_ref_node() ? pl_json_node->get_ref_node() : pl_json_node->get_data_node(),
new_jsontype))) {
LOG_WARN("failed to transfrom ObJsonNode to ObPLJsonBaseType", K(ret));
} else if(OB_ISNULL(new_jsontype)) {
ret = OB_ERR_UNEXPECTED;
......
......@@ -36,6 +36,7 @@
#include "share/rc/ob_tenant_base.h"
#include "pl/sys_package/ob_dbms_sql.h"
#include "pl/ob_pl_package_state.h"
#include "pl/ob_pl_json_type.h"
#include "rpc/obmysql/ob_sql_sock_session.h"
#include "sql/engine/expr/ob_expr_regexp_context.h"
......@@ -94,6 +95,7 @@ ObBasicSessionInfo::ObBasicSessionInfo(const uint64_t tenant_id)
package_info_allocator_(sizeof(pl::ObPLPackageState), common::OB_MALLOC_NORMAL_BLOCK_SIZE - 32,
ObMalloc(lib::ObMemAttr(orig_tenant_id_, "SessPackageInfo"))),
name_pool_(lib::ObMemAttr(orig_tenant_id_, ObModIds::OB_SQL_SESSION), OB_MALLOC_NORMAL_BLOCK_SIZE),
json_pl_mngr_(0),
trans_flags_(),
sql_scope_flags_(),
need_reset_package_(false),
......@@ -286,6 +288,9 @@ void ObBasicSessionInfo::destroy()
}
total_stmt_tables_.reset();
cur_stmt_tables_.reset();
#ifdef OB_BUILD_ORACLE_PL
pl::ObPlJsonTypeManager::release(get_json_pl_mngr());
#endif
}
void ObBasicSessionInfo::clean_status()
......@@ -6657,5 +6662,48 @@ observer::ObSMConnection *ObBasicSessionInfo::get_sm_connection()
}
return conn;
}
void ObBasicSessionInfo::destory_json_pl_mngr()
{
#ifdef OB_BUILD_ORACLE_PL
if (json_pl_mngr_) {
pl::ObPlJsonTypeManager* handle = reinterpret_cast<pl::ObPlJsonTypeManager*>(json_pl_mngr_);
handle->destroy();
common::ObIAllocator& allocator = get_session_allocator();
allocator.free(handle);
json_pl_mngr_ = 0;
}
#endif
}
intptr_t ObBasicSessionInfo::get_json_pl_mngr()
{
int ret = OB_SUCCESS;
#ifdef OB_BUILD_ORACLE_PL
if (!json_pl_mngr_) {
common::ObIAllocator& allocator = get_session_allocator();
pl::ObPlJsonTypeManager* handle = static_cast<pl::ObPlJsonTypeManager*>(
allocator.alloc(sizeof(pl::ObPlJsonTypeManager)));
if (OB_ISNULL(handle)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate handle", K(ret));
} else {
handle = new (handle) pl::ObPlJsonTypeManager(orig_tenant_id_);
if (OB_FAIL(handle->init())) {
allocator.free(handle);
LOG_WARN("failed to init json pl type manager", K(ret));
} else {
json_pl_mngr_ = reinterpret_cast<intptr_t>(handle);
}
}
}
#else
ret = OB_NOT_SUPPORTED;
LOG_WARN("failed to create json pl type manager", K(ret));
#endif
return json_pl_mngr_;
}
}//end of namespace sql
}//end of namespace oceanbase
......@@ -592,8 +592,8 @@ public:
const common::ObLogIdLevelMap *get_log_id_level_map() const;
const common::ObString &get_client_version() const { return client_version_; }
const common::ObString &get_driver_version() const { return driver_version_; }
void destory_json_pl_mngr();
intptr_t get_json_pl_mngr();
int get_tx_timeout(int64_t &tx_timeout) const
{
tx_timeout = sys_vars_cache_.get_ob_trx_timeout();
......@@ -2181,6 +2181,7 @@ protected:
common::ObSmallBlockAllocator<> cursor_info_allocator_; // for alloc memory of PS CURSOR/SERVER REF CURSOR
common::ObSmallBlockAllocator<> package_info_allocator_; // for alloc memory of session package state
common::ObStringBuf name_pool_; // for variables names and statement names
intptr_t json_pl_mngr_; // for pl json manage
TransFlags trans_flags_;
SqlScopeFlags sql_scope_flags_;
bool need_reset_package_; // for dbms_session.reset_package
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册