未验证 提交 2c5edb4f 编写于 作者: Y Yulong Ao 提交者: GitHub

[Auto Parallel] Add the recorder and trial class for the tuner (#40555)

Add the recorder
上级 0c703fe7
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
class MetricRecord(object):
    """
    A single observation of one metric, tagged with the step it belongs to.
    """

    def __init__(self, value, step):
        """Store the observed ``value`` together with its ``step``."""
        self._step = step
        self._value = value

    @property
    def value(self):
        """The recorded value."""
        return self._value

    @value.setter
    def value(self, value):
        self._value = value

    @property
    def step(self):
        """The step at which the value was recorded."""
        return self._step

    @step.setter
    def step(self, step):
        self._step = step

    def mean(self):
        """Return ``np.mean`` of the recorded value."""
        current = self.value
        return np.mean(current)

    def get_state(self):
        """Return a dict with the keys ``"value"`` and ``"step"``."""
        state = {}
        state["value"] = self.value
        state["step"] = self.step
        return state

    @classmethod
    def from_state(cls, state):
        """Build a record by passing the entries of ``state`` as keyword
        arguments to the constructor."""
        return cls(**state)

    def __eq__(self, other):
        """Two records are equal when both value and step compare equal;
        anything that is not a ``MetricRecord`` compares unequal."""
        if isinstance(other, MetricRecord):
            return other.value == self.value and other.step == self.step
        return False

    def __repr__(self):
        shown_value, shown_step = self.value, self.step
        return "MetricRecord(value={}, step={})".format(shown_value,
                                                        shown_step)
class MetricRecords(object):
    """
    Records of a single metric across different executions.

    Records are kept in a dict keyed by step, so each step holds at most one
    record; updating a step that already exists overwrites its value.
    """

    def __init__(self, direction="min"):
        """
        Args:
            direction: ``"min"`` when the best value is the smallest one,
                ``"max"`` when it is the largest one.

        Raises:
            ValueError: if ``direction`` is neither ``"min"`` nor ``"max"``.
        """
        if direction not in {"min", "max"}:
            # The literal braces are doubled on purpose: a bare "{min, max}"
            # would be parsed by str.format as a replacement field and raise
            # KeyError instead of the intended ValueError.
            raise ValueError(
                "direction should be one of {{min, max}}, but got: {}.".format(
                    direction))
        self._direction = direction
        self._records = {}

    @property
    def records(self):
        """All records as a list sorted by ascending step."""
        return sorted(self._records.values(), key=lambda r: r.step)

    @records.setter
    def records(self, records):
        # Existing records are not cleared; each incoming record is merged
        # in through update(), overwriting any record with the same step.
        for r in records:
            self.update(r.value, step=r.step)

    @property
    def direction(self):
        """The optimization direction, ``"min"`` or ``"max"``."""
        return self._direction

    @direction.setter
    def direction(self, direction):
        self._direction = direction

    def update(self, value, step=0):
        """Set the value recorded at ``step``, creating the record if needed."""
        if step in self._records:
            # MetricRecord exposes ``value`` as a property with a setter; it
            # has no ``set_value`` method.
            self._records[step].value = value
        else:
            self._records[step] = MetricRecord(value, step=step)

    def get_best_value(self):
        """
        Return the best per-record mean according to the direction, ignoring
        NaN entries, or ``None`` when nothing has been recorded.
        """
        values = [r.mean() for r in self._records.values()]
        if not values:
            return None
        if self._direction == "min":
            return np.nanmin(values)
        return np.nanmax(values)

    def get_best_step(self):
        """
        Return the step of the first stored record whose mean equals the
        best value, or ``None`` when there is no such record.
        """
        best_value = self.get_best_value()
        if best_value is None:
            return None
        for r in self._records.values():
            if r.mean() == best_value:
                return r.step
        # Reached when no mean compares equal to the best value (for example
        # when the best value is NaN, which never compares equal).
        return None

    def get_statistics(self):
        """
        Return NaN-ignoring summary statistics of the per-record means as a
        dict of plain floats, or an empty dict when there are no records.
        """
        records_values = [r.mean() for r in self.records]
        if not records_values:
            return {}
        return {
            "min": float(np.nanmin(records_values)),
            "max": float(np.nanmax(records_values)),
            "mean": float(np.nanmean(records_values)),
            "median": float(np.nanmedian(records_values)),
            "var": float(np.nanvar(records_values)),
            "std": float(np.nanstd(records_values)),
        }

    def get_state(self):
        """Return a dict holding the direction and each record's state."""
        state = {}
        state["direction"] = self._direction
        state["records"] = [r.get_state() for r in self.records]
        return state

    @classmethod
    def from_state(cls, state):
        """Rebuild a ``MetricRecords`` from a dict produced by ``get_state``."""
        records = cls(state["direction"])
        records.records = [MetricRecord.from_state(r) for r in state["records"]]
        return records
class MetricsRecorder(object):
    """
    Keeps one ``MetricRecords`` collection per metric name.
    """

    def __init__(self, metrics=None):
        """Start empty, then register every entry of ``metrics`` (if any)."""
        self._records = {}
        self.register_metrics(metrics)

    @property
    def records(self):
        """The underlying dict mapping metric name to its records."""
        return self._records

    def exists(self, name):
        """Tell whether a metric called ``name`` has been registered."""
        return name in self._records

    def register_metrics(self, metrics=None):
        """Register each given metric under its ``name`` attribute."""
        for metric in (metrics or []):
            self.register(metric.name)

    def register(self, name, direction=None):
        """
        Create the records collection for ``name``.

        A ``direction`` of ``None`` falls back to ``"min"``. Registering a
        name twice raises ``ValueError``.
        """
        if self.exists(name):
            raise ValueError("Metric {} have been registered.".format(name))
        chosen = "min" if direction is None else direction
        self._records[name] = MetricRecords(chosen)

    def update(self, name, value, step=0):
        """
        Record ``value`` (converted to float) for ``name`` at ``step``,
        registering the metric first if it is unknown.

        Returns whether the metric's best value differs from what it was
        before the update.
        """
        value = float(value)
        if not self.exists(name):
            self.register(name)
        metric_records = self._records[name]
        best_before = metric_records.get_best_value()
        metric_records.update(value, step=step)
        best_after = metric_records.get_best_value()
        return best_after != best_before

    def get_records(self, name):
        """Return the records stored for ``name``."""
        return self._records[name].records

    def set_records(self, name, records):
        """Assign ``records`` to ``name``, registering the metric if needed."""
        if not self.exists(name):
            self.register(name)
        self._records[name].records = records

    def get_best_value(self, name):
        """Delegate to the best-value query of the records for ``name``."""
        return self._records[name].get_best_value()

    def get_best_step(self, name):
        """Delegate to the best-step query of the records for ``name``."""
        return self._records[name].get_best_step()

    def get_statistics(self, name):
        """Delegate to the statistics query of the records for ``name``."""
        return self._records[name].get_statistics()

    def get_state(self):
        """Return ``{"metrics": {name: state, ...}}`` for every metric."""
        per_metric = {}
        for name, metric_records in self._records.items():
            per_metric[name] = metric_records.get_state()
        return {"metrics": per_metric}

    @classmethod
    def from_state(cls, state):
        """Rebuild a recorder from a dict produced by ``get_state``."""
        recorder = cls()
        restored = {}
        for name, metric_records in state["metrics"].items():
            restored[name] = MetricRecords.from_state(metric_records)
        recorder._records = restored
        return recorder
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import random
import time
from enum import Enum
from .storable import Storable
from .recorder import MetricsRecorder
from .tunable_space import TunableSpace
class TrialStatus:
    """
    String constants naming the states a trial can be in.

    The values are plain strings, so they compare equal to their literal
    spelling (for example ``"RUNNING"``).
    """

    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    STOPPED = "STOPPED"
    INVALID = "INVALID"
class Trial(Storable):
    """
    One tuning trial: a tunable space plus the metrics recorded for it, a
    score, a best step and a status.
    """

    def __init__(self, tunable_space, trial_id=None,
                 status=TrialStatus.RUNNING):
        """
        Args:
            tunable_space: the space object attached to this trial.
            trial_id: identifier to use; a fresh one is generated when None.
            status: initial status, ``TrialStatus.RUNNING`` by default.
        """
        if trial_id is None:
            self._id = _generate_trial_id()
        else:
            self._id = trial_id
        self._space = tunable_space
        self._recorder = MetricsRecorder()
        self._score = None
        self._best_step = None
        self._status = status

    @property
    def id(self):
        """The trial identifier."""
        return self._id

    @property
    def space(self):
        """The tunable space attached to this trial."""
        return self._space

    @property
    def recorder(self):
        """The ``MetricsRecorder`` owned by this trial."""
        return self._recorder

    @property
    def score(self):
        """The trial score, ``None`` until it is assigned."""
        return self._score

    @score.setter
    def score(self, score):
        self._score = score

    @property
    def best_step(self):
        """The best step, ``None`` until it is assigned."""
        return self._best_step

    @best_step.setter
    def best_step(self, best_step):
        self._best_step = best_step

    @property
    def status(self):
        """The current status of the trial."""
        return self._status

    @status.setter
    def status(self, status):
        self._status = status

    def summary(self):
        """Print the values of the tunable space and, when set, the score."""
        print("Tunable space:")
        current_values = self.space.values
        if current_values:
            for name, value in current_values.items():
                print(name + ":", value)
        if self.score is not None:
            print("Score: {}".format(self.score))

    def get_state(self):
        """Return a dict describing the trial, including the nested states
        of its space and recorder."""
        state = {}
        state["id"] = self.id
        state["space"] = self.space.get_state()
        state["recorder"] = self.recorder.get_state()
        state["score"] = self.score
        state["best_step"] = self.best_step
        state["status"] = self.status
        return state

    def set_state(self, state):
        """Overwrite every field of this trial from a ``get_state`` dict."""
        self._id = state["id"]
        self._space = TunableSpace.from_state(state["space"])
        self._recorder = MetricsRecorder.from_state(state["recorder"])
        self._score = state["score"]
        self._best_step = state["best_step"]
        self._status = state["status"]

    @classmethod
    def from_state(cls, state):
        """Create a trial and populate it from a ``get_state`` dict."""
        # The placeholder space is replaced immediately by set_state().
        restored = cls(tunable_space=None)
        restored.set_state(state)
        return restored
def _generate_trial_id():
s = str(time.time()) + str(random.randint(1, int(1e7)))
return hashlib.sha256(s.encode("utf-8")).hexdigest()[:32]
@@ -11,4 +11,9 @@ if(WITH_DISTRIBUTE AND WITH_GPU)
    set_tests_properties(test_engine_api PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 80)
    py_test_modules(test_converter MODULES test_converter ENVS ${dist_ENVS})
    set_tests_properties(test_converter PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 50)
+   py_test_modules(test_tunable_variable MODULES test_tunable_variable ENVS ${dist_ENVS})
+   py_test_modules(test_tunable_space MODULES test_tunable_space ENVS ${dist_ENVS})
+   py_test_modules(test_recorder MODULES test_recorder ENVS ${dist_ENVS})
+   py_test_modules(test_trial MODULES test_trial ENVS ${dist_ENVS})
endif()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
from paddle.distributed.auto_parallel.tuner import recorder as rd
class TestRecorder(unittest.TestCase):
    """Unit tests for ``MetricsRecorder`` and ``MetricRecord``."""

    @staticmethod
    def _sample_records():
        """Return four fresh records with values 1..4 at steps 0..3."""
        return [rd.MetricRecord(step + 1, step) for step in range(4)]

    def test_register(self):
        recorder = rd.MetricsRecorder()
        recorder.register("metric")
        self.assertEqual(set(recorder.records.keys()), {"metric"})
        # Without an explicit direction the metric is minimized.
        self.assertEqual(recorder.records["metric"].direction, "min")

    def test_exists(self):
        recorder = rd.MetricsRecorder()
        recorder.register("metric", direction="max")
        self.assertTrue(recorder.exists("metric"))

    def test_update(self):
        recorder = rd.MetricsRecorder()
        # Updating an unknown metric registers it implicitly.
        recorder.update("metric", 4, 1000)
        self.assertEqual(recorder.records["metric"].direction, "min")
        self.assertEqual(
            recorder.get_records("metric"), [rd.MetricRecord(4, 1000)])

    def test_get_records(self):
        recorder = rd.MetricsRecorder()
        recorder.update("metric", 1, step=0)
        recorder.update("metric", 2, step=1)
        recorder.update("metric", 3, step=2)
        recorder.update("metric", 4, step=3)
        self.assertEqual(
            recorder.get_records("metric"), self._sample_records())

    def test_set_records(self):
        recorder = rd.MetricsRecorder()
        recorder.set_records("metric", self._sample_records())
        self.assertEqual(
            recorder.get_records("metric"), self._sample_records())

    def test_get_best_value(self):
        recorder = rd.MetricsRecorder()
        recorder.register("metric_min", "min")
        recorder.register("metric_max", "max")

        recorder.set_records("metric_min", self._sample_records())
        self.assertEqual(recorder.get_best_value("metric_min"), 1)

        recorder.set_records("metric_max", self._sample_records())
        self.assertEqual(recorder.get_best_value("metric_max"), 4)

    def test_get_best_step(self):
        recorder = rd.MetricsRecorder()

        recorder.register("metric_min", "min")
        recorder.set_records("metric_min", self._sample_records())
        self.assertEqual(recorder.get_best_step("metric_min"), 0)

        recorder.register("metric_max", "max")
        recorder.set_records("metric_max", self._sample_records())
        self.assertEqual(recorder.get_best_step("metric_max"), 3)

    def test_get_statistics(self):
        recorder = rd.MetricsRecorder()
        records = [rd.MetricRecord(np.random.random(), i) for i in range(14)]
        recorder.set_records("metric", records)
        stats = recorder.get_statistics("metric")
        values = [r.value for r in records]
        self.assertEqual(stats["min"], np.min(values))
        self.assertEqual(stats["max"], np.max(values))
        self.assertEqual(stats["mean"], np.mean(values))
        self.assertEqual(stats["median"], np.median(values))
        self.assertEqual(stats["var"], np.var(values))
        self.assertEqual(stats["std"], np.std(values))

    def test_serialization(self):
        recorder = rd.MetricsRecorder()
        recorder.register("metric")
        recorder.set_records("metric", self._sample_records())

        new_recorder = rd.MetricsRecorder.from_state(recorder.get_state())

        # The round trip must preserve the metric names, their direction
        # and every individual record, not just the set of keys.
        self.assertEqual(new_recorder.records.keys(), recorder.records.keys())
        self.assertEqual(new_recorder.records["metric"].direction,
                         recorder.records["metric"].direction)
        self.assertEqual(
            new_recorder.get_records("metric"),
            recorder.get_records("metric"))
# Run the recorder tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from paddle.distributed.auto_parallel.tuner import tunable_space as ts
from paddle.distributed.auto_parallel.tuner import trial as tr
class TestTiral(unittest.TestCase):
    """Unit tests for ``Trial``: basic accessors and state round trip."""

    def test_trial(self):
        # Build a trial over a space with one choice variable.
        tunable_space = ts.TunableSpace()
        tunable_space.choice("choice", [0, 1, 2, 3], default=2)
        trial = tr.Trial(tunable_space, trial_id="trial-1")

        latency_recorder = trial.recorder
        latency_recorder.register("latency", direction="min")
        latency_recorder.update("latency", 0.1, step=0)
        latency_recorder.update("latency", 0.2, step=1)
        trial.best_step = 0

        self.assertEqual(trial.id, "trial-1")
        self.assertEqual(trial.space.get_value("choice"), 2)
        self.assertEqual(trial.best_step, 0)
        # No status was passed, so the default applies.
        self.assertEqual(trial.status, "RUNNING")

    def test_serialization(self):
        # Build a completed trial over a space with one integer range.
        tunable_space = ts.TunableSpace()
        tunable_space.int_range("int_range", start=1, stop=4, default=2)
        trial = tr.Trial(
            tunable_space, trial_id="trial-2", status="COMPLETED")

        latency_recorder = trial.recorder
        latency_recorder.register("latency", direction="min")
        latency_recorder.update("latency", 0.1, step=0)
        latency_recorder.update("latency", 0.2, step=1)
        trial.best_step = 0

        # Serialize and rebuild, then check the fields survived.
        saved_state = trial.get_state()
        new_trial = tr.Trial.from_state(saved_state)

        self.assertEqual(new_trial.id, "trial-2")
        self.assertEqual(new_trial.space.get_value("int_range"), 2)
        self.assertEqual(new_trial.best_step, 0)
        self.assertEqual(new_trial.status, "COMPLETED")
# Run the trial tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册