提交 fbedc9b5 编写于 作者: W WenJinyu 提交者: LINGuanRen

support delete mode tag(oss)

上级 e585fa1a
...@@ -28,6 +28,12 @@ enum StorageOpenMode { ...@@ -28,6 +28,12 @@ enum StorageOpenMode {
}; };
class ObIStorageUtil { class ObIStorageUtil {
public: public:
enum {
NONE = 0,
DELETE = 1,
TAGGING = 2,
MAX
};
virtual int is_exist(const common::ObString& uri, const common::ObString& storage_info, bool& exist) = 0; virtual int is_exist(const common::ObString& uri, const common::ObString& storage_info, bool& exist) = 0;
virtual int get_file_length( virtual int get_file_length(
const common::ObString& uri, const common::ObString& storage_info, int64_t& file_length) = 0; const common::ObString& uri, const common::ObString& storage_info, int64_t& file_length) = 0;
......
...@@ -274,7 +274,11 @@ void ObOssEnvIniter::global_destroy() ...@@ -274,7 +274,11 @@ void ObOssEnvIniter::global_destroy()
} }
} }
ObStorageOssBase::ObStorageOssBase() : aos_pool_(NULL), oss_option_(NULL), is_inited_(false) ObStorageOssBase::ObStorageOssBase() :
aos_pool_(NULL),
oss_option_(NULL),
is_inited_(false),
delete_mode_(ObIStorageUtil::DELETE)
{ {
memset(oss_domain_, 0, OB_MAX_URI_LENGTH); memset(oss_domain_, 0, OB_MAX_URI_LENGTH);
memset(oss_endpoint_, 0, MAX_OSS_ENDPOINT_LENGTH); memset(oss_endpoint_, 0, MAX_OSS_ENDPOINT_LENGTH);
...@@ -293,6 +297,7 @@ void ObStorageOssBase::reset() ...@@ -293,6 +297,7 @@ void ObStorageOssBase::reset()
memset(oss_endpoint_, 0, MAX_OSS_ENDPOINT_LENGTH); memset(oss_endpoint_, 0, MAX_OSS_ENDPOINT_LENGTH);
memset(oss_id_, 0, MAX_OSS_ID_LENGTH); memset(oss_id_, 0, MAX_OSS_ID_LENGTH);
memset(oss_key_, 0, MAX_OSS_KEY_LENGTH); memset(oss_key_, 0, MAX_OSS_KEY_LENGTH);
delete_mode_ = ObIStorageUtil::DELETE;
if (is_inited_) { if (is_inited_) {
if (NULL != aos_pool_) { if (NULL != aos_pool_) {
aos_pool_destroy(aos_pool_); aos_pool_destroy(aos_pool_);
...@@ -324,6 +329,23 @@ int ObStorageOssBase::init(const common::ObString& storage_info) ...@@ -324,6 +329,23 @@ int ObStorageOssBase::init(const common::ObString& storage_info)
return ret; return ret;
} }
int ObStorageOssBase::set_delete_mode(const char *parameter)
{
int ret = OB_SUCCESS;
if (NULL == parameter) {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "invalid args", K(ret), KP(parameter));
} else if (0 == strcmp(parameter, "delete")) {
delete_mode_ = ObIStorageUtil::DELETE;
} else if (0 == strcmp(parameter, "tagging")) {
delete_mode_ = ObIStorageUtil::TAGGING;
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "delete mode is invalid", K(ret), K(parameter));
}
return ret;
}
int ObStorageOssBase::parse_oss_arg(const common::ObString& storage_info) int ObStorageOssBase::parse_oss_arg(const common::ObString& storage_info)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
...@@ -342,6 +364,7 @@ int ObStorageOssBase::parse_oss_arg(const common::ObString& storage_info) ...@@ -342,6 +364,7 @@ int ObStorageOssBase::parse_oss_arg(const common::ObString& storage_info)
const char* HOST = "host="; const char* HOST = "host=";
const char* ACCESS_ID = "access_id="; const char* ACCESS_ID = "access_id=";
const char* ASSCESS_KEY = "access_key="; const char* ASSCESS_KEY = "access_key=";
const char *DELETE_MODE = "delete_mode=";
MEMCPY(tmp, storage_info.ptr(), storage_info.length()); MEMCPY(tmp, storage_info.ptr(), storage_info.length());
tmp[storage_info.length()] = '\0'; tmp[storage_info.length()] = '\0';
...@@ -362,6 +385,10 @@ int ObStorageOssBase::parse_oss_arg(const common::ObString& storage_info) ...@@ -362,6 +385,10 @@ int ObStorageOssBase::parse_oss_arg(const common::ObString& storage_info)
if (OB_FAIL(set_oss_field(token + strlen(ASSCESS_KEY), oss_key_, sizeof(oss_key_)))) { if (OB_FAIL(set_oss_field(token + strlen(ASSCESS_KEY), oss_key_, sizeof(oss_key_)))) {
OB_LOG(WARN, "failed to set oss_key_", K(ret), K(token)); OB_LOG(WARN, "failed to set oss_key_", K(ret), K(token));
} }
} else if (0 == strncmp(DELETE_MODE, token, strlen(DELETE_MODE))) {
if (OB_FAIL(set_delete_mode(token + strlen(DELETE_MODE)))) {
OB_LOG(WARN, "failed to set delete mode", K(ret), K(token));
}
} else { } else {
OB_LOG(DEBUG, "unkown oss info", K(*token), K(storage_info)); OB_LOG(DEBUG, "unkown oss info", K(*token), K(storage_info));
} }
...@@ -1224,6 +1251,66 @@ int ObStorageOssUtil::mkdir(const common::ObString& uri, const common::ObString& ...@@ -1224,6 +1251,66 @@ int ObStorageOssUtil::mkdir(const common::ObString& uri, const common::ObString&
return ret; return ret;
} }
int ObStorageOssUtil::delete_object(
const common::ObString &uri,
const common::ObString &bucket_str,
const common::ObString &object_str)
{
int ret = OB_SUCCESS;
aos_string_t bucket;
aos_string_t object;
aos_str_set(&bucket, bucket_str.ptr());
aos_str_set(&object, object_str.ptr());
aos_table_t *resp_headers = NULL;
aos_status_t *aos_ret = NULL;
if (OB_ISNULL(aos_ret = oss_delete_object(oss_option_, &bucket, &object, &resp_headers))
|| !aos_status_is_ok(aos_ret)) {
ret = OB_OSS_ERROR;
OB_LOG(WARN, "delete object fail", K(ret), K(uri));
print_oss_info(aos_ret);
} else {
OB_LOG(INFO, "delete object succ", K(uri));
}
return ret;
}
int ObStorageOssUtil::tagging_object(
const common::ObString &uri,
const common::ObString &bucket_str,
const common::ObString &object_str)
{
int ret = OB_SUCCESS;
/*set object tagging*/
aos_string_t bucket;
aos_string_t object;
aos_table_t *head_resp_headers = NULL;
oss_tag_content_t *tag_content = NULL;
aos_list_t tag_list;
aos_status_t *aos_ret = NULL;
aos_str_set(&bucket, bucket_str.ptr());
aos_str_set(&object, object_str.ptr());
aos_list_init(&tag_list);
if (OB_ISNULL(tag_content = oss_create_tag_content(aos_pool_))) {
ret = OB_OSS_ERROR;
OB_LOG(WARN, "tag content is null", K(ret), K(uri));
} else {
aos_str_set(&tag_content->key, "delete_mode");
aos_str_set(&tag_content->value, "tagging");
aos_list_add_tail(&tag_content->node, &tag_list);
if (OB_ISNULL(aos_ret = oss_put_object_tagging(oss_option_, &bucket, &object, &tag_list, &head_resp_headers))
|| !aos_status_is_ok(aos_ret)) {
ret = OB_OSS_ERROR;
OB_LOG(WARN, "set object tag fail", K(ret), K(uri));
print_oss_info(aos_ret);
} else {
OB_LOG(INFO, "set object tag succ", K(uri));
}
}
return ret;
}
int ObStorageOssUtil::del_file(const common::ObString& uri, const common::ObString& storage_info) int ObStorageOssUtil::del_file(const common::ObString& uri, const common::ObString& storage_info)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
...@@ -1238,22 +1325,17 @@ int ObStorageOssUtil::del_file(const common::ObString& uri, const common::ObStri ...@@ -1238,22 +1325,17 @@ int ObStorageOssUtil::del_file(const common::ObString& uri, const common::ObStri
OB_LOG(WARN, "failed to init storage_info", K(ret), K(storage_info)); OB_LOG(WARN, "failed to init storage_info", K(ret), K(storage_info));
} else if (OB_FAIL(get_bucket_object_name(uri, bucket_str, object_str, allocator))) { } else if (OB_FAIL(get_bucket_object_name(uri, bucket_str, object_str, allocator))) {
OB_LOG(WARN, "bucket or object name is empty", K(ret), K(uri), K(bucket_str), K(object_str)); OB_LOG(WARN, "bucket or object name is empty", K(ret), K(uri), K(bucket_str), K(object_str));
} else { } else if (ObIStorageUtil::DELETE == get_delete_mode()) {
aos_string_t bucket; if (OB_FAIL(delete_object(uri, bucket_str, object_str))) {
aos_string_t object; OB_LOG(WARN, "failed to delete object", K(ret), K(uri));
aos_str_set(&bucket, bucket_str.ptr()); }
aos_str_set(&object, object_str.ptr()); } else if (ObIStorageUtil::TAGGING == get_delete_mode()) {
aos_table_t* resp_headers = NULL; if (OB_FAIL(tagging_object(uri, bucket_str, object_str))) {
aos_status_t* aos_ret = NULL; OB_LOG(WARN, "failed to tagging file", K(ret), K(uri));
if (OB_ISNULL(aos_ret = oss_delete_object(oss_option_, &bucket, &object, &resp_headers)) ||
!aos_status_is_ok(aos_ret)) {
ret = OB_OSS_ERROR;
OB_LOG(WARN, "delete object fail", K(ret), K(uri));
print_oss_info(aos_ret);
} else {
OB_LOG(INFO, "delete object succ", K(uri));
} }
} else {
ret = OB_INVALID_ARGUMENT;
OB_LOG(WARN, "delete mode invalid", K(ret), K(uri));
} }
// Finally reset to ensure that the interface can be called repeatedly // Finally reset to ensure that the interface can be called repeatedly
reset(); reset();
......
...@@ -96,6 +96,7 @@ public: ...@@ -96,6 +96,7 @@ public:
char*& remote_md5, int64_t& file_length); char*& remote_md5, int64_t& file_length);
int get_oss_file_length(const common::ObString& name, int64_t& file_length); int get_oss_file_length(const common::ObString& name, int64_t& file_length);
void print_oss_info(aos_status_s* aos_ret); void print_oss_info(aos_status_s* aos_ret);
int64_t get_delete_mode() {return delete_mode_;}
protected: protected:
aos_pool_t* aos_pool_; aos_pool_t* aos_pool_;
...@@ -104,6 +105,7 @@ protected: ...@@ -104,6 +105,7 @@ protected:
private: private:
int parse_oss_arg(const common::ObString& uri); int parse_oss_arg(const common::ObString& uri);
static int set_oss_field(const char* info, char* field, const int64_t length); static int set_oss_field(const char* info, char* field, const int64_t length);
int set_delete_mode(const char *parameter);
private: private:
bool is_inited_; bool is_inited_;
...@@ -111,6 +113,7 @@ private: ...@@ -111,6 +113,7 @@ private:
char oss_endpoint_[MAX_OSS_ENDPOINT_LENGTH]; char oss_endpoint_[MAX_OSS_ENDPOINT_LENGTH];
char oss_id_[MAX_OSS_ID_LENGTH]; char oss_id_[MAX_OSS_ID_LENGTH];
char oss_key_[MAX_OSS_KEY_LENGTH]; char oss_key_[MAX_OSS_KEY_LENGTH];
int64_t delete_mode_;
int init_oss_endpoint(); int init_oss_endpoint();
DISALLOW_COPY_AND_ASSIGN(ObStorageOssBase); DISALLOW_COPY_AND_ASSIGN(ObStorageOssBase);
}; };
...@@ -209,6 +212,14 @@ public: ...@@ -209,6 +212,14 @@ public:
private: private:
int strtotime(const char* date_time, int64_t& time); int strtotime(const char* date_time, int64_t& time);
int tagging_object(
const common::ObString &uri,
const common::ObString &bucket_str,
const common::ObString &object_str);
int delete_object(
const common::ObString &uri,
const common::ObString &bucket_str,
const common::ObString &object_str);
}; };
class ObStorageOssAppendWriter : public ObStorageOssBase, public ObIStorageWriter { class ObStorageOssAppendWriter : public ObStorageOssBase, public ObIStorageWriter {
......
...@@ -1514,6 +1514,14 @@ int check_backup_dest(const ObString& backup_dest) ...@@ -1514,6 +1514,14 @@ int check_backup_dest(const ObString& backup_dest)
LOG_WARN("failed to print backup dest buf", K(ret), K(backup_dest)); LOG_WARN("failed to print backup dest buf", K(ret), K(backup_dest));
} else if (OB_FAIL(dest.set(backup_dest_buf))) { } else if (OB_FAIL(dest.set(backup_dest_buf))) {
LOG_WARN("failed to set dest", K(ret), K(backup_dest_buf)); LOG_WARN("failed to set dest", K(ret), K(backup_dest_buf));
} else if (dest.is_nfs_storage() && 0 != strlen(dest.storage_info_)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("backup device is nfs, storage_info should be empty", K(ret), K_(dest.storage_info));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "backup device is nfs, additional parameters are");
} else if (dest.is_nfs_storage() && strlen(dest.root_path_) != strlen(backup_dest_buf)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("backup device is nfs, backup dest should not set '?'", K(ret), K_(dest.storage_info));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "backup device is nfs, setting '?' is");
} else if (OB_FAIL(cluster_dest.set(dest, OB_START_INCARNATION))) { } else if (OB_FAIL(cluster_dest.set(dest, OB_START_INCARNATION))) {
LOG_WARN("Failed to set cluster dest", K(ret), K(dest)); LOG_WARN("Failed to set cluster dest", K(ret), K(dest));
} else if (OB_FAIL(backup_info_mgr.get_last_extern_log_archive_backup_info( } else if (OB_FAIL(backup_info_mgr.get_last_extern_log_archive_backup_info(
...@@ -1698,6 +1706,14 @@ int check_backup_backup_dest(const ObString& backup_backup_dest) ...@@ -1698,6 +1706,14 @@ int check_backup_backup_dest(const ObString& backup_backup_dest)
LOG_WARN("failed to set dest", K(ret), K(backup_dest_buf)); LOG_WARN("failed to set dest", K(ret), K(backup_dest_buf));
} else if (OB_FAIL(dst_dest.set(backup_backup_dest_buf))) { } else if (OB_FAIL(dst_dest.set(backup_backup_dest_buf))) {
LOG_WARN("failed to set dest", K(ret), K(backup_backup_dest_buf)); LOG_WARN("failed to set dest", K(ret), K(backup_backup_dest_buf));
} else if (dst_dest.is_nfs_storage() && 0 != strlen(dst_dest.storage_info_)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("backup backup device is nfs, storage_info should be empty", K(ret), K_(dst_dest.storage_info));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "backup backup device is nfs, additional parameters are");
} else if (dst_dest.is_nfs_storage() && strlen(dst_dest.root_path_) != strlen(backup_backup_dest_buf)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("backup backup device is nfs, backup backup dest should not set '?'", K(ret), K_(dst_dest.storage_info));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "backup backup device is nfs, setting '?' is");
} else if (dst_dest.is_cos_storage()) { } else if (dst_dest.is_cos_storage()) {
ret = OB_NOT_SUPPORTED; ret = OB_NOT_SUPPORTED;
LOG_WARN("backup backup do not support cos storage", KR(ret)); LOG_WARN("backup backup do not support cos storage", KR(ret));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册