# Copyright (c) 2020 VisualDL Authors. All Rights Reserve. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ======================================================================= from visualdl.io import bfile from visualdl.utils.crc32 import masked_crc32c from visualdl.proto import record_pb2 import struct import time import queue import threading import os QUEUE_TIMEOUT = os.getenv("VDL_QUEUE_TIMEOUT") if isinstance(QUEUE_TIMEOUT, str): QUEUE_TIMEOUT = int(QUEUE_TIMEOUT) class RecordWriter(object): """Package data with crc32 or not. """ def __init__(self, writer): self._writer = writer def write(self, data): """Package and write data to disk. Args: data (string or bytes): Data to write to disk. """ header = struct.pack(' 0: data = self._queue.get(True, queue_wait_duration) else: data = self._queue.get(False) if data == self._shutdown_signal: return self._record_writer.write(data) self._has_pending_data = True except queue.Empty: pass except Exception as e: # prevent the main thread from deadlock due to writing error. if not has_unresolved_bug: print('Warning: Writing data Error, Due to unresolved Exception {}'.format(e)) print('Warning: Writing data to FileSystem failed since {}.'.format( time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime()))) has_unresolved_bug = True pass finally: if data: self._queue.task_done() now = time.time() if now > self._next_flush_time: if self._has_pending_data: self._record_writer.flush() self._has_pending_data = False self._next_flush_time = now + self._flush_secs