diff --git a/python/pipeline/dag.py b/python/pipeline/dag.py index 3fdb9b47f2df2dd12db371793090b838a8634aa0..83c30393e73fd4164b23866db04a8fe77887f2ef 100644 --- a/python/pipeline/dag.py +++ b/python/pipeline/dag.py @@ -53,7 +53,7 @@ class DAGExecutor(object): self._id_lock = threading.Lock() self._cv = threading.Condition() - self._globel_resp_dict = {} + self._fetch_buffer = {} self._id_counter = 0 self._reset_max_id = 1000000000000000000 self._is_run = False @@ -106,15 +106,15 @@ class DAGExecutor(object): format(type(channeldata)))) with self._cv: data_id = channeldata.id - self._globel_resp_dict[data_id] = channeldata + self._fetch_buffer[data_id] = channeldata self._cv.notify_all() def _get_channeldata_from_fetch_buffer(self, data_id): resp = None with self._cv: - while data_id not in self._globel_resp_dict: + while data_id not in self._fetch_buffer: self._cv.wait() - resp = self._globel_resp_dict.pop(data_id) + resp = self._fetch_buffer.pop(data_id) self._cv.notify_all() return resp