未验证 提交 5a856358 编写于 作者: 走神的阿圆's avatar 走神的阿圆 提交者: GitHub

add BosFS (#679)

上级 4e7a384e
......@@ -8,4 +8,3 @@ Flask-Babel >= 1.0.0
six >= 1.14.0
protobuf >= 3.11.0
opencv-python
hdfs
......@@ -15,7 +15,21 @@
import os
import tempfile
import hdfs
import hashlib
import base64
try:
import hdfs
from hdfs.util import HdfsError
HDFS_ENABLED = True
except ImportError:
HDFS_ENABLED = False
try:
from baidubce.services.bos.bos_client import BosClient
from baidubce import exception
BOS_ENABLED = True
except ImportError:
BOS_ENABLED = False
# Note: Some of the code here is adapted from TensorBoardX.
# A good default block size depends on the system in question.
......@@ -23,6 +37,12 @@ import hdfs
_DEFAULT_BLOCK_SIZE = 16 * 1024 * 1024
def content_md5(buffer):
    """Return the Base64-encoded MD5 digest of ``buffer``.

    Args:
        buffer: Bytes-like payload to hash.

    Returns:
        bytes: Base64 encoding (standard alphabet) of the raw 16-byte
        MD5 digest.
    """
    raw_digest = hashlib.md5(buffer).digest()
    return base64.b64encode(raw_digest)
class FileFactory(object):
def __init__(self):
self._register_factories = {}
......@@ -33,11 +53,20 @@ class FileFactory(object):
def get_filesystem(self, path):
if path.startswith(
'hdfs://') and "hdfs" not in self._register_factories:
if not HDFS_ENABLED:
raise RuntimeError('Please install module named "hdfs".')
try:
default_file_factory.register_filesystem("hdfs", HDFileSystem())
except hdfs.util.HdfsError:
raise RuntimeError(
"Please initialize `~/.hdfscli.cfg` for HDFS.")
elif path.startswith(
'bos://') and "bos" not in self._register_factories:
if not BOS_ENABLED:
raise RuntimeError(
'Please install module named "bce-python-sdk".')
default_file_factory.register_filesystem("bos", BosFileSystem())
prefix = ""
index = path.find("://")
if index >= 0:
......@@ -113,7 +142,9 @@ class HDFileSystem(object):
@staticmethod
def join(path, *paths):
    """Join path components and normalise separators to ``/``.

    Args:
        path: Leading path component (for example ``hdfs://...``).
        *paths: Further components to append.

    Returns:
        str: The joined path with every backslash replaced by a forward
        slash.
    """
    # str.replace returns a new string; the result must be returned,
    # otherwise the backslash normalisation is silently lost.
    return os.path.join(path, *paths).replace('\\', '/')
def read(self, filename, binary_mode=False, size=0, continue_from=None):
offset = 0
......@@ -121,10 +152,13 @@ class HDFileSystem(object):
offset = continue_from.get("last_offset", 0)
encoding = None if binary_mode else "utf-8"
with self.cli.read(hdfs_path=filename[7:], offset=offset, encoding=encoding) as reader:
data = reader.read()
continue_from_token = {"last_offset": offset + len(data)}
return data, continue_from_token
try:
with self.cli.read(hdfs_path=filename[7:], offset=offset, encoding=encoding) as reader:
data = reader.read()
continue_from_token = {"last_offset": offset + len(data)}
return data, continue_from_token
except HdfsError:
raise EOFError('No more events to read on HDFS.')
def append(self, filename, file_content, binary_mode=False):
self.cli.write(hdfs_path=filename[7:], data=file_content, append=True)
......@@ -137,6 +171,145 @@ class HDFileSystem(object):
return (['hdfs://'+root, dirs, files] for root, dirs, files in walks)
class BosFileSystem(object):
    """File-system adapter for ``bos://<bucket>/<object key>`` paths.

    All operations are delegated to a ``BosClient`` built from
    ``visualdl.io.bos_conf.config``.
    """

    def __init__(self):
        # Deferred import: bos_conf is only resolved when a BosFileSystem
        # is actually instantiated.
        from visualdl.io import bos_conf
        self.bos_client = BosClient(bos_conf.config)
        self.file_length_map = {}

    @staticmethod
    def _get_object_info(path):
        """Split a ``bos://`` path into ``(bucket_name, object_key)``.

        Raises:
            ValueError: If there is no ``/`` after the bucket name.
        """
        # Drop the 6-character 'bos://' scheme prefix.
        path = path[6:]
        index = path.index('/')
        bucket_name = path[0:index]
        object_key = path[index + 1:]
        return bucket_name, object_key

    def exists(self, path):
        """Return True if object metadata can be fetched for ``path``.

        Any ``BceError`` raised by the client is treated as "does not
        exist".
        """
        bucket_name, object_key = BosFileSystem._get_object_info(path)
        try:
            self.bos_client.get_object_meta_data(bucket_name, object_key)
            return True
        except exception.BceError:
            return False

    def get_meta(self, bucket_name, object_key):
        """Return the client's metadata response for the given object."""
        return self.bos_client.get_object_meta_data(bucket_name, object_key)

    def makedirs(self, path):
        """Create an empty object whose key ends with ``/`` for ``path``.

        Does nothing when such an object already exists.
        """
        if not path.endswith('/'):
            path += '/'
        if self.exists(path):
            return
        bucket_name, object_key = BosFileSystem._get_object_info(path)
        if not object_key.endswith('/'):
            object_key += '/'
        init_data = b''
        self.bos_client.append_object(bucket_name=bucket_name,
                                      key=object_key,
                                      data=init_data,
                                      content_md5=content_md5(init_data),
                                      content_length=len(init_data))

    @staticmethod
    def join(path, *paths):
        """Join path components and normalise separators to ``/``.

        Returns:
            str: The joined path with every backslash replaced by a
            forward slash.
        """
        # str.replace returns a new string; return it so the
        # normalisation actually takes effect.
        return os.path.join(path, *paths).replace('\\', '/')

    def read(self, filename, binary_mode=False, size=0, continue_from=None):
        """Read the object at ``filename`` starting from a saved offset.

        Args:
            filename: ``bos://`` path of the object.
            binary_mode: Accepted for interface parity; not used here.
            size: Accepted for interface parity; not used here.
            continue_from: Optional token returned by a previous call;
                its ``"last_offset"`` entry is the position to resume at.

        Returns:
            tuple: ``(data, token)`` where ``data`` is the content from the
            resume offset to the end of the object and ``token`` is
            ``{"last_offset": <absolute end position>}``.
        """
        bucket_name, object_key = BosFileSystem._get_object_info(filename)
        offset = 0
        if continue_from is not None:
            offset = continue_from.get("last_offset", 0)
        # The whole object is fetched and the already-consumed prefix is
        # dropped locally.
        data = self.bos_client.get_object_as_string(bucket_name=bucket_name,
                                                    key=object_key)
        data = data[offset:]
        # The token must be an absolute position: the previous offset plus
        # what was just returned, not merely the length of the remainder.
        continue_from_token = {"last_offset": offset + len(data)}
        return data, continue_from_token

    def append(self, filename, file_content, binary_mode=False):
        """Append ``file_content`` to the object at ``filename``.

        If the object does not exist yet, an empty appendable object is
        created first. The append offset is taken from the object's
        current ``content_length`` metadata.
        """
        bucket_name, object_key = BosFileSystem._get_object_info(filename)
        if not self.exists(filename):
            init_data = b''
            self.bos_client.append_object(bucket_name=bucket_name,
                                          key=object_key,
                                          data=init_data,
                                          content_md5=content_md5(init_data),
                                          content_length=len(init_data),
                                          offset=0)
        content_length = len(file_content)
        offset = self.get_meta(bucket_name, object_key).metadata.content_length
        self.bos_client.append_object(bucket_name=bucket_name,
                                      key=object_key,
                                      data=file_content,
                                      content_md5=content_md5(file_content),
                                      content_length=content_length,
                                      offset=offset)

    def write(self, filename, file_content, binary_mode=False):
        """Send ``file_content`` to ``filename`` via ``append_object``.

        No explicit offset is passed to the client.
        """
        bucket_name, object_key = BosFileSystem._get_object_info(filename)
        self.bos_client.append_object(bucket_name=bucket_name,
                                      key=object_key,
                                      data=file_content,
                                      content_md5=content_md5(file_content),
                                      content_length=len(file_content))

    def walk(self, dir):
        """Iterate over ``[root, dirs, files]`` entries under ``dir``.

        Object keys listed under the prefix are grouped by everything
        before their last ``/``. ``dirs`` is always an empty list; keys
        without any ``/`` are grouped under ``'.'``.

        Args:
            dir: ``bos://`` path of the directory to list.

        Returns:
            An iterator yielding ``[root, [], files]`` lists.
        """

        class WalkGenerator():
            """Iterator over the grouped listing of one bucket prefix."""

            def __init__(self, bucket_name, contents):
                self.contents = None
                self.length = 0
                self.bucket = bucket_name
                self.handle_contents(contents)
                self.count = 0

            def handle_contents(self, contents):
                """Group object keys by parent and store the walk entries."""
                contents_map = {}
                for item in contents:
                    parent, sep, name = item.rpartition('/')
                    if not sep:
                        # No '/' in the key: file it under '.'.
                        parent = '.'
                    contents_map.setdefault(parent, []).append(name)
                temp_walk = [
                    [BosFileSystem.join('bos://' + self.bucket, parent),
                     [], names]
                    for parent, names in contents_map.items()
                ]
                self.length = len(temp_walk)
                self.contents = temp_walk

            def __iter__(self):
                return self

            def __next__(self):
                if self.count < self.length:
                    self.count += 1
                    return self.contents[self.count - 1]
                else:
                    raise StopIteration

        bucket_name, object_key = BosFileSystem._get_object_info(dir)
        if object_key in ['.', './']:
            prefix = None
        else:
            prefix = object_key if object_key.endswith(
                '/') else object_key + '/'
        response = self.bos_client.list_objects(bucket_name,
                                                prefix=prefix)
        contents = [content.key for content in response.contents]
        return WalkGenerator(bucket_name, contents)
class BFile(object):
def __init__(self, filename, mode):
if mode not in ('r', 'rb', 'br', 'w', 'wb', 'bw'):
......
# Copyright (c) 2020 VisualDL Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
from baidubce.bce_client_configuration import BceClientConfiguration
from baidubce.auth.bce_credentials import BceCredentials
import os
# Connection settings for BosClient, read from the environment:
#   BOS_HOST - service endpoint
#   BOS_AK   - access key id
#   BOS_SK   - secret access key
# Each value is None when its variable is not set.
bos_host = os.environ.get("BOS_HOST")
access_key_id = os.environ.get("BOS_AK")
secret_access_key = os.environ.get("BOS_SK")

# Client configuration assembled from the values above.
config = BceClientConfiguration(
    endpoint=bos_host,
    credentials=BceCredentials(access_key_id, secret_access_key),
)
......@@ -14,7 +14,6 @@
# =======================================================================
from visualdl.io import bfile
import struct
from hdfs.util import HdfsError
class _RecordReader(object):
......@@ -31,10 +30,7 @@ class _RecordReader(object):
def get_next(self):
# Read the header
self._curr_event = None
try:
header_str = self.file_handle.read(8)
except HdfsError:
raise EOFError('No more events to read on HDFS.')
header_str = self.file_handle.read(8)
if len(header_str) != 8:
# Hit EOF so raise and exit
raise EOFError('No more events to read on LFS.')
......
......@@ -157,13 +157,6 @@ def parse_args():
default=False,
help="serve api only"
)
parser.add_argument(
"-B",
"--open_browser",
action="store_true",
default=False,
help="open browser automatically"
)
args = parser.parse_args()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册