提交 2c7d8f56 编写于 作者: B barriery

add id-generator into util module

上级 3c500fa3
......@@ -29,7 +29,7 @@ from .operator import Op, RequestOp, ResponseOp, VirtualOp
from .channel import (ThreadChannel, ProcessChannel, ChannelData,
ChannelDataEcode, ChannelDataType, ChannelStopError)
from .profiler import TimeProfiler, PerformanceTracer
from .util import NameGenerator
from .util import NameGenerator, ThreadIdGenerator
from .proto import pipeline_service_pb2
_LOGGER = logging.getLogger(__name__)
......@@ -74,9 +74,9 @@ class DAGExecutor(object):
if self._tracer is not None:
self._tracer.start()
self._id_lock = threading.Lock()
self._id_counter = 0
self._reset_max_id = 1000000000000000000
self._id_generator = ThreadIdGenerator(
max_id=1000000000000000000, base_counter=0, step=1)
self._cv_pool = {}
self._cv_for_cv_pool = threading.Condition()
self._fetch_buffer = {}
......@@ -98,13 +98,7 @@ class DAGExecutor(object):
_LOGGER.info("[DAG Executor] Stop")
def _get_next_data_id(self):
data_id = None
with self._id_lock:
if self._id_counter >= self._reset_max_id:
_LOGGER.info("[DAG Executor] Reset request id")
self._id_counter -= self._reset_max_id
data_id = self._id_counter
self._id_counter += 1
data_id = self._id_generator.next()
cond_v = threading.Condition()
with self._cv_for_cv_pool:
self._cv_pool[data_id] = cond_v
......
......@@ -13,13 +13,77 @@
# limitations under the License.
import sys
import logging
import threading
import multiprocessing
_LOGGER = logging.getLogger(__name__)
class NameGenerator(object):
# use unsafe-id-generator
def __init__(self, prefix):
self._idx = -1
self._prefix = prefix
self._id_generator = UnsafeIdGenerator(1000000000000000000)
def next(self):
next_id = self._id_generator.next()
return "{}{}".format(self._prefix, next_id)
class UnsafeIdGenerator(object):
def __init__(self, max_id, base_counter=0, step=1):
self._base_counter = base_counter
self._counter = self._base_counter
self._step = step
self._max_id = max_id # for reset
def next(self):
if self._counter >= self._max_id:
self._counter = self._base_counter
_LOGGER.info("Reset Id: {}".format(self._counter))
next_id = self._counter
self._counter += self._step
return next_id
class ThreadIdGenerator(UnsafeIdGenerator):
def __init__(self, max_id, base_counter=0, step=1, lock=None):
# if you want to use your lock, you may need to use Reentrant-Lock
self._lock = lock
if self._lock is None:
self._lock = threading.Lock()
super(ThreadIdGenerator, self).__init__(max_id, base_counter, step)
def next(self):
next_id = None
with self._lock:
if self._counter >= self._max_id:
self._counter = self._base_counter
_LOGGER.info("Reset Id: {}".format(self._counter))
next_id = self._counter
self._counter += self._step
return next_id
class ProcessIdGenerator(UnsafeIdGenerator):
def __init__(self, max_id, base_counter=0, step=1, lock=None):
# if you want to use your lock, you may need to use Reentrant-Lock
self._lock = lock
if self._lock is None:
self._lock = multiprocessing.Lock()
self._base_counter = base_counter
self._counter = multiprocessing.Manager().Value('i', 0)
self._step = step
self._max_id = max_id
def next(self):
self._idx += 1
return "{}{}".format(self._prefix, self._idx)
next_id = None
with self._lock:
if self._counter.value >= self._max_id:
self._counter.value = self._base_counter
_LOGGER.info("Reset Id: {}".format(self._counter.value))
next_id = self._counter.value
self._counter.value += self._step
return next_id
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册