diff --git a/requirements.txt b/requirements.txt index 588778c2b4f4ee052822d751072f8c3437169e54..0156341d41a60dfa88502a94dd111c220d8c759e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,3 @@ Flask-Babel >= 1.0.0 six >= 1.14.0 protobuf >= 3.11.0 opencv-python -hdfs diff --git a/visualdl/io/bfile.py b/visualdl/io/bfile.py index 6545fec76141757bf9b141e1fc6328fd5d05a0ba..7e9833d0e877f4a8ffa42fb0a84a765440783be9 100644 --- a/visualdl/io/bfile.py +++ b/visualdl/io/bfile.py @@ -15,7 +15,21 @@ import os import tempfile -import hdfs +import hashlib +import base64 + +try: + import hdfs + from hdfs.util import HdfsError + HDFS_ENABLED = True +except ImportError: + HDFS_ENABLED = False +try: + from baidubce.services.bos.bos_client import BosClient + from baidubce import exception + BOS_ENABLED = True +except ImportError: + BOS_ENABLED = False # Note: Some codes here refer to TensorBoardX. # A good default block size depends on the system in question. @@ -23,6 +37,12 @@ import hdfs _DEFAULT_BLOCK_SIZE = 16 * 1024 * 1024 +def content_md5(buffer): + md5 = hashlib.md5() + md5.update(buffer) + return base64.standard_b64encode(md5.digest()) + + class FileFactory(object): def __init__(self): self._register_factories = {} @@ -33,11 +53,20 @@ class FileFactory(object): def get_filesystem(self, path): if path.startswith( 'hdfs://') and "hdfs" not in self._register_factories: + if not HDFS_ENABLED: + raise RuntimeError('Please install module named "hdfs".') try: default_file_factory.register_filesystem("hdfs", HDFileSystem()) except hdfs.util.HdfsError: raise RuntimeError( "Please initialize `~/.hdfscli.cfg` for HDFS.") + elif path.startswith( + 'bos://') and "bos" not in self._register_factories: + if not BOS_ENABLED: + raise RuntimeError( + 'Please install module named "bce-python-sdk".') + default_file_factory.register_filesystem("bos", BosFileSystem()) + prefix = "" index = path.find("://") if index >= 0: @@ -113,7 +142,9 @@ class HDFileSystem(object): @staticmethod def join(path, *paths): - return os.path.join(path, *paths) + result = os.path.join(path, *paths) + result.replace('\\', '/') + return result def read(self, filename, binary_mode=False, size=0, continue_from=None): offset = 0 @@ -121,10 +152,13 @@ class HDFileSystem(object): offset = continue_from.get("last_offset", 0) encoding = None if binary_mode else "utf-8" - with self.cli.read(hdfs_path=filename[7:], offset=offset, encoding=encoding) as reader: - data = reader.read() - continue_from_token = {"last_offset": offset + len(data)} - return data, continue_from_token + try: + with self.cli.read(hdfs_path=filename[7:], offset=offset, encoding=encoding) as reader: + data = reader.read() + continue_from_token = {"last_offset": offset + len(data)} + return data, continue_from_token + except HdfsError: + raise EOFError('No more events to read on HDFS.') def append(self, filename, file_content, binary_mode=False): self.cli.write(hdfs_path=filename[7:], data=file_content, append=True) @@ -137,6 +171,145 @@ class HDFileSystem(object): return (['hdfs://'+root, dirs, files] for root, dirs, files in walks) +class BosFileSystem(object): + def __init__(self): + from visualdl.io import bos_conf + self.bos_client = BosClient(bos_conf.config) + self.file_length_map = {} + + @staticmethod + def _get_object_info(path): + path = path[6:] + index = path.index('/') + bucket_name = path[0:index] + object_key = path[index+1:] + return bucket_name, object_key + + def exists(self, path): + bucket_name, object_key = BosFileSystem._get_object_info(path) + try: + self.bos_client.get_object_meta_data(bucket_name, object_key) + return True + except exception.BceError: + return False + + def get_meta(self, bucket_name, object_key): + return self.bos_client.get_object_meta_data(bucket_name, object_key) + + def makedirs(self, path): + if not path.endswith('/'): + path += '/' + if self.exists(path): + return + bucket_name, object_key = BosFileSystem._get_object_info(path) + if not object_key.endswith('/'): + object_key += '/' + init_data = b'' + self.bos_client.append_object(bucket_name=bucket_name, + key=object_key, + data=init_data, + content_md5=content_md5(init_data), + content_length=len(init_data)) + + @staticmethod + def join(path, *paths): + result = os.path.join(path, *paths) + result.replace('\\', '/') + return result + + def read(self, filename, binary_mode=False, size=0, continue_from=None): + bucket_name, object_key = BosFileSystem._get_object_info(filename) + offset = 0 + if continue_from is not None: + offset = continue_from.get("last_offset", 0) + data = self.bos_client.get_object_as_string(bucket_name=bucket_name, + key=object_key) + data = data[offset:] + continue_from_token = {"last_offset": len(data)} + return data, continue_from_token + + def append(self, filename, file_content, binary_mode=False): + bucket_name, object_key = BosFileSystem._get_object_info(filename) + if not self.exists(filename): + init_data = b'' + self.bos_client.append_object(bucket_name=bucket_name, + key=object_key, + data=init_data, + content_md5=content_md5(init_data), + content_length=len(init_data), + offset=0) + content_length = len(file_content) + + offset = self.get_meta(bucket_name, object_key).metadata.content_length + self.bos_client.append_object(bucket_name=bucket_name, + key=object_key, + data=file_content, + content_md5=content_md5(file_content), + content_length=content_length, + offset=offset) + + def write(self, filename, file_content, binary_mode=False): + bucket_name, object_key = BosFileSystem._get_object_info(filename) + + self.bos_client.append_object(bucket_name=bucket_name, + key=object_key, + data=file_content, + content_md5=content_md5(file_content), + content_length=len(file_content)) + + def walk(self, dir): + class WalkGenerator(): + def __init__(self, bucket_name, contents): + self.contents = None + self.length = 0 + self.bucket = bucket_name + self.handle_contents(contents) + self.count = 0 + + def handle_contents(self, contents): + contents_map = {} + for item in contents: + try: + rindex = item.rindex('/') + key = item[0:rindex] + value = item[rindex + 1:] + except ValueError: + key = '.' + value = item + if key in contents_map.keys(): + contents_map[key].append(value) + else: + contents_map[key] = [value] + temp_walk = [] + for key, value in contents_map.items(): + temp_walk.append([BosFileSystem.join('bos://' + self.bucket, key), [], value]) + # print("temp_walk=", temp_walk) + self.length = len(temp_walk) + self.contents = temp_walk + + def __iter__(self): + return self + + def __next__(self): + if self.count < self.length: + self.count += 1 + return self.contents[self.count - 1] + else: + raise StopIteration + + bucket_name, object_key = BosFileSystem._get_object_info(dir) + + if object_key in ['.', './']: + prefix = None + else: + prefix = object_key if object_key.endswith( + '/') else object_key + '/' + response = self.bos_client.list_objects(bucket_name, + prefix=prefix) + contents = [content.key for content in response.contents] + return WalkGenerator(bucket_name, contents) + + class BFile(object): def __init__(self, filename, mode): if mode not in ('r', 'rb', 'br', 'w', 'wb', 'bw'): diff --git a/visualdl/io/bos_conf.py b/visualdl/io/bos_conf.py new file mode 100644 index 0000000000000000000000000000000000000000..5be94f468aa5f7ad2d5a37c6f1bb3518dd5babc4 --- /dev/null +++ b/visualdl/io/bos_conf.py @@ -0,0 +1,29 @@ +# Copyright (c) 2020 VisualDL Authors. All Rights Reserve. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ======================================================================= + +from baidubce.bce_client_configuration import BceClientConfiguration +from baidubce.auth.bce_credentials import BceCredentials +import os +# Set Host, AccessKeyID and SecretAccessKey of BosClient + +bos_host = os.getenv("BOS_HOST") +access_key_id = os.getenv("BOS_AK") +secret_access_key = os.getenv("BOS_SK") + + +# Create BceClientConfiguration +config = BceClientConfiguration( + credentials=BceCredentials(access_key_id, secret_access_key), + endpoint=bos_host) diff --git a/visualdl/io/requirements.txt b/visualdl/io/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..6675d9f3d08591f31a857e9665b549508451c0bb --- /dev/null +++ b/visualdl/io/requirements.txt @@ -0,0 +1,2 @@ +hdfs +bce-python-sdk diff --git a/visualdl/reader/record_reader.py b/visualdl/reader/record_reader.py index 65f97484fde319cb862ae5013521e2b4bee1a4a5..557d051d166ea3de8db4eb0e5c67f2e30d26b984 100644 --- a/visualdl/reader/record_reader.py +++ b/visualdl/reader/record_reader.py @@ -14,7 +14,6 @@ # ======================================================================= from visualdl.io import bfile import struct -from hdfs.util import HdfsError class _RecordReader(object): @@ -31,10 +30,7 @@ class _RecordReader(object): def get_next(self): # Read the header self._curr_event = None - try: - header_str = self.file_handle.read(8) - except HdfsError: - raise EOFError('No more events to read on HDFS.') + header_str = self.file_handle.read(8) if len(header_str) != 8: # Hit EOF so raise and exit raise EOFError('No more events to read on LFS.') diff --git a/visualdl/server/args.py b/visualdl/server/args.py index 124dd20bb38ef710534f8cd3aa49159efc0993fb..59dac44fe54e8b34007f9971dc07ad97adad9e29 100644 --- a/visualdl/server/args.py +++ b/visualdl/server/args.py @@ -157,13 +157,6 @@ def parse_args(): default=False, help="serve api only" ) - parser.add_argument( - "-B", - "--open_browser", - action="store_true", - default=False, - help="open browser automatically" - ) args = parser.parse_args()