diff --git a/python_module/megengine/data/_queue.py b/python_module/megengine/data/_queue.py index 94898e99ccc0784ce8f93f457dfb7472f3b90e72..a9e328c65c56e4f4ba736b510176677b6c735c32 100644 --- a/python_module/megengine/data/_queue.py +++ b/python_module/megengine/data/_queue.py @@ -12,6 +12,7 @@ import queue import subprocess from multiprocessing import Queue +import pyarrow import pyarrow.plasma as plasma MGE_PLASMA_MEMORY = int(os.environ.get("MGE_PLASMA_MEMORY", 4000000000)) # 4GB @@ -22,7 +23,7 @@ MGE_PLASMA_STORE_MANAGER = None def _clear_plasma_store(): - # `_PlasmaStoreManager.__del__` will not ne called automaticly in subprocess, + # `_PlasmaStoreManager.__del__` will not be called automatically in subprocess, # so this function should be called explicitly global MGE_PLASMA_STORE_MANAGER if MGE_PLASMA_STORE_MANAGER is not None: @@ -31,19 +32,27 @@ def _clear_plasma_store(): class _PlasmaStoreManager: + __initialized = False + def __init__(self): self.socket_name = "/tmp/mge_plasma_{}".format( binascii.hexlify(os.urandom(8)).decode() ) debug_flag = bool(os.environ.get("MGE_DATALOADER_PLASMA_DEBUG", 0)) + # NOTE: this is a hack. Using `plasma_store` directly may make it hard + # for subprocess to handle exceptions raised in `plasma-store-server`, + # because `plasma_store` is just a wrapper around `plasma-store-server` + # that uses `os.execv` to call the executable `plasma-store-server`. 
+ cmd_path = os.path.join(pyarrow.__path__[0], "plasma-store-server") self.plasma_store = subprocess.Popen( - ["plasma_store", "-s", self.socket_name, "-m", str(MGE_PLASMA_MEMORY),], + [cmd_path, "-s", self.socket_name, "-m", str(MGE_PLASMA_MEMORY),], stdout=None if debug_flag else subprocess.DEVNULL, stderr=None if debug_flag else subprocess.DEVNULL, ) + self.__initialized = True def __del__(self): - if self.plasma_store and self.plasma_store.returncode is None: + if self.__initialized and self.plasma_store.returncode is None: self.plasma_store.kill() @@ -64,10 +73,15 @@ class PlasmaShmQueue: if MGE_PLASMA_STORE_MANAGER is None: try: MGE_PLASMA_STORE_MANAGER = _PlasmaStoreManager() - except FileNotFoundError as e: - raise FileNotFoundError( - "command 'plasma_store' not found in your $PATH!" - "Please make sure pyarrow installed and add into $PATH." + except Exception as e: + err_info = ( + "Please make sure pyarrow installed correctly!\n" + "You can try reinstall pyarrow and see if you can run " + "`plasma_store -s /tmp/mge_plasma_xxx -m 1000` normally." + ) + raise RuntimeError( + "Exception happened in starting plasma_store: {}\n" + "Tips: {}".format(str(e), err_info) ) self.socket_name = MGE_PLASMA_STORE_MANAGER.socket_name