未验证 提交 7178e69e 编写于 作者: 走神的阿圆's avatar 走神的阿圆 提交者: GitHub

Update bos (#697)

上级 4c2e1ccd
......@@ -17,6 +17,7 @@ import os
import tempfile
import hashlib
import base64
import time
try:
import hdfs
......@@ -27,6 +28,8 @@ except ImportError:
try:
from baidubce.services.bos.bos_client import BosClient
from baidubce import exception
from baidubce.bce_client_configuration import BceClientConfiguration
from baidubce.auth.bce_credentials import BceCredentials
BOS_ENABLED = True
except ImportError:
BOS_ENABLED = False
......@@ -153,7 +156,8 @@ class HDFileSystem(object):
encoding = None if binary_mode else "utf-8"
try:
with self.cli.read(hdfs_path=filename[7:], offset=offset, encoding=encoding) as reader:
with self.cli.read(hdfs_path=filename[7:], offset=offset,
encoding=encoding) as reader:
data = reader.read()
continue_from_token = {"last_offset": offset + len(data)}
return data, continue_from_token
......@@ -168,21 +172,43 @@ class HDFileSystem(object):
def walk(self, dir):
    """Walk an HDFS directory tree.

    Args:
        dir: Directory path; its first seven characters (the length of
            'hdfs://') are dropped before the path is handed to the
            HDFS client.

    Returns:
        A generator yielding ``['hdfs://' + root, dirs, files]`` lists, one
        per ``(root, dirs, files)`` tuple produced by ``self.cli.walk``.
    """
    walks = self.cli.walk(hdfs_path=dir[7:])
    # Put the scheme back in front of every root the client reports.
    return (['hdfs://' + root, dirs, files] for root, dirs, files in walks)
class BosFileSystem(object):
def __init__(self):
    """Create the BOS client and reset the append-buffer state.

    The client configuration is produced by ``get_bos_config()`` and stored
    on ``self.config`` before the client is constructed.
    """
    # Initial cache limits; get_bos_config() assigns both again.
    self.max_contents_count = 1
    self.max_contents_time = 1
    self.get_bos_config()
    self.bos_client = BosClient(self.config)
    self.file_length_map = {}
    # Buffered append state: pending bytes, number of buffered append
    # calls, and the time the current buffering window started.
    self._file_contents_to_add = b''
    self._file_contents_count = 0
    self._start_append_time = time.time()
def get_bos_config(self):
    """Load BOS connection and cache settings from environment variables.

    Required variables: ``BOS_HOST``, ``BOS_AK``, ``BOS_SK``.
    Optional variables: ``BOS_CACHE_COUNT`` and ``BOS_CACHE_TIME``, each
    defaulting to 1.

    Sets ``self.max_contents_count``, ``self.max_contents_time`` and
    ``self.config``.

    Raises:
        KeyError: If a required variable is unset or empty.
        ValueError: If a cache variable is set to a non-integer value.
    """
    required = {}
    # Checked in this order so the first missing variable is the one reported.
    for name in ("BOS_HOST", "BOS_AK", "BOS_SK"):
        value = os.getenv(name)
        if not value:
            raise KeyError('${%s} is not found.' % name)
        required[name] = value

    # `or 1` treats an empty value as unset, matching how the required
    # variables above treat empty strings.
    self.max_contents_count = int(os.getenv('BOS_CACHE_COUNT') or 1)
    self.max_contents_time = int(os.getenv('BOS_CACHE_TIME') or 1)

    self.config = BceClientConfiguration(
        credentials=BceCredentials(required["BOS_AK"], required["BOS_SK"]),
        endpoint=required["BOS_HOST"])
@staticmethod
def _get_object_info(path):
    """Split a BOS path into its bucket name and object key.

    Args:
        path: Path string; its first six characters (the length of
            'bos://') are dropped, and the remainder is split at the
            first '/'.

    Returns:
        A ``(bucket_name, object_key)`` tuple.

    Raises:
        ValueError: If there is no '/' after the dropped prefix.
    """
    path = path[6:]
    index = path.index('/')
    return path[:index], path[index + 1:]
def exists(self, path):
......@@ -222,13 +248,33 @@ class BosFileSystem(object):
offset = 0
if continue_from is not None:
offset = continue_from.get("last_offset", 0)
data = self.bos_client.get_object_as_string(bucket_name=bucket_name,
key=object_key)
data = data[offset:]
continue_from_token = {"last_offset": len(data)}
length = int(
self.get_meta(bucket_name, object_key).metadata.content_length)
if offset < length:
data = self.bos_client.get_object_as_string(bucket_name=bucket_name,
key=object_key,
range=[offset,
length - 1])
else:
data = b''
continue_from_token = {"last_offset": length}
return data, continue_from_token
def append(self, filename, file_content, binary_mode=False):
def ready_to_append(self):
    """Tell whether the buffered content should be written out now.

    Returns:
        True when the number of buffered append calls has reached
        ``self.max_contents_count``, or when more than
        ``self.max_contents_time`` seconds have passed since
        ``self._start_append_time``; False otherwise.
    """
    return (self._file_contents_count >= self.max_contents_count
            or time.time() - self._start_append_time > self.max_contents_time)
def append(self, filename, file_content, binary_mode=False, force=False):
self._file_contents_to_add += file_content
self._file_contents_count += 1
if not force and not self.ready_to_append():
return
file_content = self._file_contents_to_add
bucket_name, object_key = BosFileSystem._get_object_info(filename)
if not self.exists(filename):
init_data = b''
......@@ -247,6 +293,9 @@ class BosFileSystem(object):
content_md5=content_md5(file_content),
content_length=content_length,
offset=offset)
self._file_contents_to_add = b''
self._file_contents_count = 0
self._start_append_time = time.time()
def write(self, filename, file_content, binary_mode=False):
bucket_name, object_key = BosFileSystem._get_object_info(filename)
......@@ -282,8 +331,9 @@ class BosFileSystem(object):
contents_map[key] = [value]
temp_walk = []
for key, value in contents_map.items():
temp_walk.append([BosFileSystem.join('bos://' + self.bucket, key), [], value])
# print("temp_walk=", temp_walk)
temp_walk.append(
[BosFileSystem.join('bos://' + self.bucket, key), [],
value])
self.length = len(temp_walk)
self.contents = temp_walk
......@@ -471,6 +521,8 @@ class BFile(object):
self.write_temp.seek(len(chunk))
def close(self):
if isinstance(self.fs, BosFileSystem):
self.fs.append(self._filename, b'', self.binary_mode, force=True)
self.flush()
if self.write_temp is not None:
self.write_temp.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册