提交 04bd4c94 编写于 作者: S shenyuhan

add process lock

上级 5cc5d15c
...@@ -39,24 +39,26 @@ CACHE_TIME = 60 * 10 ...@@ -39,24 +39,26 @@ CACHE_TIME = 60 * 10
class HubServer(object): class HubServer(object):
def __init__(self, config_file_path=None): def __init__(self, config_file_path=None):
lock.write_acquire()
print("开始初始化HubServer")
if not config_file_path: if not config_file_path:
config_file_path = os.path.join(hub.CONF_HOME, 'config.json') config_file_path = os.path.join(hub.CONF_HOME, 'config.json')
if not os.path.exists(hub.CONF_HOME): if not os.path.exists(hub.CONF_HOME):
utils.mkdir(hub.CONF_HOME) utils.mkdir(hub.CONF_HOME)
if not os.path.exists(config_file_path): if not os.path.exists(config_file_path):
with open(config_file_path, 'w+') as fp: with open(config_file_path, 'w+') as fp:
lock.flock(fp, lock.LOCK_EX)
fp.write(json.dumps(default_server_config)) fp.write(json.dumps(default_server_config))
lock.flock(fp, lock.LOCK_UN)
with open(config_file_path) as fp: with open(config_file_path) as fp:
self.config = json.load(fp) self.config = json.load(fp)
fp_lock = open(config_file_path)
lock.flock(fp_lock, lock.LOCK_EX)
utils.check_url(self.config['server_url']) utils.check_url(self.config['server_url'])
self.server_url = self.config['server_url'] self.server_url = self.config['server_url']
self.request() self.request()
self._load_resource_list_file_if_valid() self._load_resource_list_file_if_valid()
lock.write_release() lock.flock(fp_lock, lock.LOCK_UN)
def get_server_url(self): def get_server_url(self):
random.seed(int(time.time())) random.seed(int(time.time()))
......
# coding:utf8 import fcntl
import threading import os
import time
from collections import deque
from itertools import islice
from sys import version_info
if version_info.major == 2:
import thread as _thread
elif version_info.major == 3:
import _thread
class Condition: class WinLock(object):
def __init__(self, lock=None): def flock(self, *args):
if lock is None: pass
lock = _thread.RLock()
self._lock = lock
self.acquire = lock.acquire
self.release = lock.release
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
self._waiters = deque()
def __enter__(self):
return self._lock.__enter__()
def __exit__(self, *args):
return self._lock.__exit__(*args)
def __repr__(self):
return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))
def _release_save(self):
self._lock.release()
def _acquire_restore(self, x):
self._lock.acquire()
def _is_owned(self):
if self._lock.acquire(0):
self._lock.release()
return False
else:
return True
def wait(self, timeout=None):
if not self._is_owned():
print("cannot wait on un-acquired lock")
return False
waiter = _thread.allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try:
if timeout is None:
waiter.acquire()
gotit = True
else:
if timeout > 0:
gotit = waiter.acquire(True, timeout)
else:
gotit = waiter.acquire(False)
return gotit
finally:
self._acquire_restore(saved_state)
if not gotit:
try:
self._waiters.remove(waiter)
except ValueError:
pass
def wait_for(self, predicate, timeout=None):
endtime = None
waittime = timeout
result = predicate()
while not result:
if waittime is not None:
if endtime is None:
endtime = time.monotonic() + waittime
else:
waittime = endtime - time.monotonic()
if waittime <= 0:
break
self.wait(waittime)
result = predicate()
return result
def notify(self, n=1):
if not self._is_owned():
print("cannot notify on un-acquired lock")
return False
all_waiters = self._waiters
waiters_to_notify = deque(islice(all_waiters, n))
if not waiters_to_notify:
return
for waiter in waiters_to_notify:
waiter.release()
try:
all_waiters.remove(waiter)
except ValueError:
pass
def notify_all(self):
self.notify(len(self._waiters))
class RWLock(object):
def __init__(self): def __init__(self):
self.lock = _thread.allocate_lock() self.LOCK_EX = ""
self.read_cond = Condition(self.lock) self.LOCK_UN = ""
self.write_cond = Condition(self.lock)
self.read_waiter = 0
self.write_waiter = 0
self.state = 0
self.owners = []
def write_acquire(self, blocking=True):
me = _thread.get_ident()
with self.lock:
while not self._write_acquire(me):
if not blocking:
return False
self.write_waiter += 1
self.write_cond.wait()
self.write_waiter -= 1
return True
def _write_acquire(self, me): class Lock(object):
# 获取写锁只有当锁没人占用,或者当前线程已经占用 def __init__(self):
if self.state == 0 or (self.state < 0 and me in self.owners): if os.name == "posix":
self.state -= 1 self.lock = fcntl
self.owners.append(me)
return True
if self.state > 0 and me in self.owners:
print('cannot recursively wrlock a rdlocked lock')
return False
return False
def read_acquire(self, blocking=True):
me = _thread.get_ident()
with self.lock:
while not self._read_acquire(me):
if not blocking:
return False
self.read_waiter += 1
self.read_cond.wait()
self.read_waiter -= 1
return True
def _read_acquire(self, me):
if self.state < 0:
# 如果锁被写锁占用
return False
if not self.write_waiter:
ok = True
else: else:
ok = me in self.owners self.lock = WinLock()
if ok:
self.state += 1
self.owners.append(me)
return True
return False
def unlock(self): def get_lock(self):
me = _thread.get_ident() return self.lock
with self.lock:
try:
self.owners.remove(me)
except ValueError:
print('cannot release un-acquired lock')
return False
if self.state > 0:
self.state -= 1
else:
self.state += 1
if not self.state:
if self.write_waiter: # 如果有写操作在等待(默认写优先)
self.write_cond.notify()
elif self.read_waiter:
self.read_cond.notify_all()
elif self.write_waiter:
self.write_cond.notify()
read_release = unlock
write_release = unlock
lock = Lock().get_lock()
lock = RWLock()
...@@ -32,6 +32,7 @@ from paddlehub.common.logger import logger ...@@ -32,6 +32,7 @@ from paddlehub.common.logger import logger
from paddlehub.common.lock import lock from paddlehub.common.lock import lock
from paddlehub.common.downloader import default_downloader from paddlehub.common.downloader import default_downloader
from paddlehub.module import module_desc_pb2 from paddlehub.module import module_desc_pb2
from paddlehub.common.dir import CONF_HOME
from paddlehub.module import check_info_pb2 from paddlehub.module import check_info_pb2
from paddlehub.module.signature import Signature, create_signature from paddlehub.module.signature import Signature, create_signature
from paddlehub.module.checker import ModuleChecker from paddlehub.module.checker import ModuleChecker
...@@ -118,8 +119,11 @@ class Module(object): ...@@ -118,8 +119,11 @@ class Module(object):
self.cache_fetch_dict = None self.cache_fetch_dict = None
self.cache_program = None self.cache_program = None
fp_lock = open(os.path.join(CONF_HOME, 'config.json'))
if name: if name:
lock.flock(fp_lock, lock.LOCK_EX)
self._init_with_name(name=name, version=version) self._init_with_name(name=name, version=version)
lock.flock(fp_lock, lock.LOCK_UN)
elif module_dir: elif module_dir:
self._init_with_module_file(module_dir=module_dir[0]) self._init_with_module_file(module_dir=module_dir[0])
elif signatures: elif signatures:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册