From 2c5edb4f58a4102fdeebbb29a2e4fc6a64b67da6 Mon Sep 17 00:00:00 2001 From: Yulong Ao Date: Tue, 15 Mar 2022 16:25:36 +0800 Subject: [PATCH] [Auto Parallel] Add the recorder and trial class for the tuner (#40555) Add the recorder --- .../auto_parallel/tuner/recorder.py | 214 ++++++++++++++++++ .../distributed/auto_parallel/tuner/trial.py | 114 ++++++++++ .../unittests/auto_parallel/CMakeLists.txt | 5 + .../unittests/auto_parallel/test_recorder.py | 152 +++++++++++++ .../unittests/auto_parallel/test_trial.py | 53 +++++ 5 files changed, 538 insertions(+) create mode 100644 python/paddle/distributed/auto_parallel/tuner/recorder.py create mode 100644 python/paddle/distributed/auto_parallel/tuner/trial.py create mode 100644 python/paddle/fluid/tests/unittests/auto_parallel/test_recorder.py create mode 100644 python/paddle/fluid/tests/unittests/auto_parallel/test_trial.py diff --git a/python/paddle/distributed/auto_parallel/tuner/recorder.py b/python/paddle/distributed/auto_parallel/tuner/recorder.py new file mode 100644 index 00000000000..140336566a1 --- /dev/null +++ b/python/paddle/distributed/auto_parallel/tuner/recorder.py @@ -0,0 +1,214 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np + + +class MetricRecord(object): + """ + One record for a single metric at a given execution step. + """ + + def __init__(self, value, step): + self._value = value + self._step = step + + @property + def value(self): + return self._value + + @value.setter + def value(self, value): + self._value = value + + @property + def step(self): + return self._step + + @step.setter + def step(self, step): + self._step = step + + def mean(self): + return np.mean(self.value) + + def get_state(self): + return {"value": self.value, "step": self.step} + + @classmethod + def from_state(cls, state): + return cls(**state) + + def __eq__(self, other): + if not isinstance(other, MetricRecord): + return False + return other.value == self.value and other.step == self.step + + def __repr__(self): + return "MetricRecord(value={}, step={})".format(self.value, self.step) + + +class MetricRecords(object): + """ + Records of a single metric across different executions. + """ + + def __init__(self, direction="min"): + if direction not in {"min", "max"}: + raise ValueError( + "direction should be one of {min, max}, but got: {}.".format( + direction)) + self._direction = direction + self._records = {} + + @property + def records(self): + return sorted(self._records.values(), key=lambda r: r.step) + + @records.setter + def records(self, records): + for r in records: + self.update(r.value, step=r.step) + + @property + def direction(self): + return self._direction + + @direction.setter + def direction(self, direction): + self._direction = direction + + def update(self, value, step=0): + if step in self._records: + self._records[step].set_value(value) + else: + self._records[step] = MetricRecord(value, step=step) + + def get_best_value(self): + values = list(r.mean() for r in self._records.values()) + if not values: + return None + if self._direction == "min": + return np.nanmin(values) + return np.nanmax(values) + + def get_best_step(self): + best_value = self.get_best_value() + if best_value is None: + return None + for r in self._records.values(): + if r.mean() == best_value: + return r.step + + def get_statistics(self): + records = self.records + records_values = [r.mean() for r in records] + if not len(records_values): + return {} + return { + "min": float(np.nanmin(records_values)), + "max": float(np.nanmax(records_values)), + "mean": float(np.nanmean(records_values)), + "median": float(np.nanmedian(records_values)), + "var": float(np.nanvar(records_values)), + "std": float(np.nanstd(records_values)), + } + + def get_state(self): + state = {} + state["direction"] = self._direction + state["records"] = [r.get_state() for r in self.records] + return state + + @classmethod + def from_state(cls, state): + records = cls(state["direction"]) + records.records = [MetricRecord.from_state(r) for r in state["records"]] + print("here 1", records.records) + return records + + +class MetricsRecorder(object): + """ + Record the values for all metrics. + """ + + def __init__(self, metrics=None): + self._records = {} + self.register_metrics(metrics) + + @property + def records(self): + return self._records + + def exists(self, name): + return name in self._records + + def register_metrics(self, metrics=None): + metrics = metrics or [] + for metric in metrics: + self.register(metric.name) + + def register(self, name, direction=None): + if self.exists(name): + raise ValueError("Metric {} have been registered.".format(name)) + if direction is None: + direction = "min" + self._records[name] = MetricRecords(direction) + + def update(self, name, value, step=0): + value = float(value) + if not self.exists(name): + self.register(name) + + prev_best = self._records[name].get_best_value() + self._records[name].update(value, step=step) + new_best = self._records[name].get_best_value() + + improved = new_best != prev_best + return improved + + def get_records(self, name): + return self._records[name].records + + def set_records(self, name, records): + if not self.exists(name): + self.register(name) + self._records[name].records = records + + def get_best_value(self, name): + return self._records[name].get_best_value() + + def get_best_step(self, name): + return self._records[name].get_best_step() + + def get_statistics(self, name): + return self._records[name].get_statistics() + + def get_state(self): + return { + "metrics": { + name: metric_records.get_state() + for name, metric_records in self._records.items() + } + } + + @classmethod + def from_state(cls, state): + recorder = cls() + recorder._records = { + name: MetricRecords.from_state(metric_records) + for name, metric_records in state["metrics"].items() + } + return recorder diff --git a/python/paddle/distributed/auto_parallel/tuner/trial.py b/python/paddle/distributed/auto_parallel/tuner/trial.py new file mode 100644 index 00000000000..22a6638c5ca --- /dev/null +++ b/python/paddle/distributed/auto_parallel/tuner/trial.py @@ -0,0 +1,114 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import hashlib +import random +import time +from enum import Enum + +from .storable import Storable +from .recorder import MetricsRecorder +from .tunable_space import TunableSpace + + +class TrialStatus: + RUNNING = "RUNNING" + COMPLETED = "COMPLETED" + STOPPED = "STOPPED" + INVALID = "INVALID" + + +class Trial(Storable): + def __init__(self, tunable_space, trial_id=None, + status=TrialStatus.RUNNING): + self._id = _generate_trial_id() if trial_id is None else trial_id + self._space = tunable_space + self._recorder = MetricsRecorder() + self._score = None + self._best_step = None + self._status = status + + @property + def id(self): + return self._id + + @property + def space(self): + return self._space + + @property + def recorder(self): + return self._recorder + + @property + def score(self): + return self._score + + @score.setter + def score(self, score): + self._score = score + + @property + def best_step(self): + return self._best_step + + @best_step.setter + def best_step(self, best_step): + self._best_step = best_step + + @property + def status(self): + return self._status + + @status.setter + def status(self, status): + self._status = status + + def summary(self): + print("Tunable space:") + if self.space.values: + for tv, value in self.space.values.items(): + print(tv + ":", value) + + if self.score is not None: + print("Score: {}".format(self.score)) + + def get_state(self): + return { + "id": self.id, + "space": self.space.get_state(), + "recorder": self.recorder.get_state(), + "score": self.score, + "best_step": self.best_step, + "status": self.status, + } + + def set_state(self, state): + self._id = state["id"] + self._space = TunableSpace.from_state(state["space"]) + self._recorder = MetricsRecorder.from_state(state["recorder"]) + self._score = state["score"] + self._best_step = state["best_step"] + self._status = state["status"] + + @classmethod + def from_state(cls, state): + trial = cls(tunable_space=None) + trial.set_state(state) + return trial + + +def _generate_trial_id(): + s = str(time.time()) + str(random.randint(1, int(1e7))) + return hashlib.sha256(s.encode("utf-8")).hexdigest()[:32] diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt index 1f7ae53acdf..4a2fba70de4 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/auto_parallel/CMakeLists.txt @@ -11,4 +11,9 @@ if(WITH_DISTRIBUTE AND WITH_GPU) set_tests_properties(test_engine_api PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 80) py_test_modules(test_converter MODULES test_converter ENVS ${dist_ENVS}) set_tests_properties(test_converter PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 50) + + py_test_modules(test_tunable_variable MODULES test_tunable_variable ENVS ${dist_ENVS}) + py_test_modules(test_tunable_space MODULES test_tunable_space ENVS ${dist_ENVS}) + py_test_modules(test_recorder MODULES test_recorder ENVS ${dist_ENVS}) + py_test_modules(test_trial MODULES test_trial ENVS ${dist_ENVS}) endif() diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_recorder.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_recorder.py new file mode 100644 index 00000000000..ab704a6a257 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_recorder.py @@ -0,0 +1,152 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import numpy as np + +from paddle.distributed.auto_parallel.tuner import recorder as rd + + +class TestRecorder(unittest.TestCase): + def test_register(self): + recorder = rd.MetricsRecorder() + recorder.register("metric") + self.assertEqual(set(recorder.records.keys()), {"metric"}) + self.assertEqual(recorder.records["metric"].direction, "min") + + def test_exists(self): + recorder = rd.MetricsRecorder() + recorder.register("metric", direction="max") + self.assertTrue(recorder.exists("metric")) + + def test_update(self): + recorder = rd.MetricsRecorder() + recorder.update("metric", 4, 1000) + self.assertEqual(recorder.records["metric"].direction, "min") + self.assertEqual( + recorder.get_records("metric"), [rd.MetricRecord(4, 1000)]) + + def test_get_records(self): + recorder = rd.MetricsRecorder() + recorder.update("metric", 1, step=0) + recorder.update("metric", 2, step=1) + recorder.update("metric", 3, step=2) + recorder.update("metric", 4, step=3) + self.assertEqual( + recorder.get_records("metric"), [ + rd.MetricRecord(1, 0), + rd.MetricRecord(2, 1), + rd.MetricRecord(3, 2), + rd.MetricRecord(4, 3), + ]) + + def test_set_records(self): + recorder = rd.MetricsRecorder() + recorder.set_records( + "metric", + [ + rd.MetricRecord(1, 0), + rd.MetricRecord(2, 1), + rd.MetricRecord(3, 2), + rd.MetricRecord(4, 3), + ], ) + self.assertEqual( + recorder.get_records("metric"), [ + rd.MetricRecord(1, 0), + rd.MetricRecord(2, 1), + rd.MetricRecord(3, 2), + rd.MetricRecord(4, 3), + ]) + + def test_get_best_value(self): + recorder = rd.MetricsRecorder() + recorder.register("metric_min", "min") + recorder.register("metric_max", "max") + + recorder.set_records( + "metric_min", + [ + rd.MetricRecord(1, 0), + rd.MetricRecord(2, 1), + rd.MetricRecord(3, 2), + rd.MetricRecord(4, 3), + ], ) + self.assertEqual(recorder.get_best_value("metric_min"), 1) + + recorder.set_records( + "metric_max", + [ + rd.MetricRecord(1, 0), + rd.MetricRecord(2, 1), + rd.MetricRecord(3, 2), + rd.MetricRecord(4, 3), + ], ) + self.assertEqual(recorder.get_best_value("metric_max"), 4) + + def test_get_best_step(self): + recorder = rd.MetricsRecorder() + + recorder.register("metric_min", "min") + recorder.set_records( + "metric_min", + [ + rd.MetricRecord(1, 0), + rd.MetricRecord(2, 1), + rd.MetricRecord(3, 2), + rd.MetricRecord(4, 3), + ], ) + self.assertEqual(recorder.get_best_step("metric_min"), 0) + + recorder.register("metric_max", "max") + recorder.set_records( + "metric_max", + [ + rd.MetricRecord(1, 0), + rd.MetricRecord(2, 1), + rd.MetricRecord(3, 2), + rd.MetricRecord(4, 3), + ], ) + self.assertEqual(recorder.get_best_step("metric_max"), 3) + + def test_get_statistics(self): + recorder = rd.MetricsRecorder() + records = [rd.MetricRecord(np.random.random(), i) for i in range(14)] + recorder.set_records("metric", records) + stats = recorder.get_statistics("metric") + records = [r.value for r in records] + self.assertEqual(stats["min"], np.min(records)) + self.assertEqual(stats["max"], np.max(records)) + self.assertEqual(stats["mean"], np.mean(records)) + self.assertEqual(stats["median"], np.median(records)) + self.assertEqual(stats["var"], np.var(records)) + self.assertEqual(stats["std"], np.std(records)) + + def test_serialization(self): + recorder = rd.MetricsRecorder() + recorder.register("metric") + recorder.set_records( + "metric", + [ + rd.MetricRecord(1, 0), + rd.MetricRecord(2, 1), + rd.MetricRecord(3, 2), + rd.MetricRecord(4, 3), + ], ) + print(recorder.get_state()) + new_recorder = rd.MetricsRecorder.from_state(recorder.get_state()) + self.assertEqual(new_recorder.records.keys(), recorder.records.keys()) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_trial.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_trial.py new file mode 100644 index 00000000000..fc52d1c394e --- /dev/null +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_trial.py @@ -0,0 +1,53 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from paddle.distributed.auto_parallel.tuner import tunable_space as ts +from paddle.distributed.auto_parallel.tuner import trial as tr + + +class TestTiral(unittest.TestCase): + def test_trial(self): + space = ts.TunableSpace() + space.choice("choice", [0, 1, 2, 3], default=2) + trial = tr.Trial(space, trial_id="trial-1") + trial.recorder.register("latency", direction="min") + trial.recorder.update("latency", 0.1, step=0) + trial.recorder.update("latency", 0.2, step=1) + trial.best_step = 0 + + self.assertEqual(trial.id, "trial-1") + self.assertEqual(trial.space.get_value("choice"), 2) + self.assertEqual(trial.best_step, 0) + self.assertEqual(trial.status, "RUNNING") + + def test_serialization(self): + space = ts.TunableSpace() + space.int_range("int_range", start=1, stop=4, default=2) + trial = tr.Trial(space, trial_id="trial-2", status="COMPLETED") + trial.recorder.register("latency", direction="min") + trial.recorder.update("latency", 0.1, step=0) + trial.recorder.update("latency", 0.2, step=1) + trial.best_step = 0 + + new_trial = tr.Trial.from_state(trial.get_state()) + self.assertEqual(new_trial.id, "trial-2") + self.assertEqual(new_trial.space.get_value("int_range"), 2) + self.assertEqual(new_trial.best_step, 0) + self.assertEqual(new_trial.status, "COMPLETED") + + +if __name__ == "__main__": + unittest.main() -- GitLab