From f166ce822c2fb341f0cb737667176eb6070ce5d9 Mon Sep 17 00:00:00 2001 From: Cleber Rosa Date: Wed, 5 Jun 2019 21:31:02 -0400 Subject: [PATCH] avocado.utils.datadrainer: introduce buffered file descriptor drainer Signed-off-by: Cleber Rosa --- avocado/utils/datadrainer.py | 22 +++++++++++++++++++ selftests/unit/test_utils_datadrainer.py | 28 ++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/avocado/utils/datadrainer.py b/avocado/utils/datadrainer.py index a583bc97..ea9b00dd 100644 --- a/avocado/utils/datadrainer.py +++ b/avocado/utils/datadrainer.py @@ -143,3 +143,25 @@ class FDDrainer(BaseDrainer): def write(self, data): # necessary to avoid pylint W0223 raise NotImplementedError + + +class BufferFDDrainer(FDDrainer): + """ + Drains data from a file descriptor and stores it in an internal buffer + """ + + name = 'avocado.utils.datadrainer.BufferFDDrainer' + + def __init__(self, source, stop_check=None, name=None): + super(BufferFDDrainer, self).__init__(source, stop_check, name) + self._data = io.BytesIO() + + def write(self, data): + self._data.write(data) + + @property + def data(self): + """ + Returns the buffer data, as bytes + """ + return self._data.getvalue() diff --git a/selftests/unit/test_utils_datadrainer.py b/selftests/unit/test_utils_datadrainer.py index 9db44841..a63279fb 100644 --- a/selftests/unit/test_utils_datadrainer.py +++ b/selftests/unit/test_utils_datadrainer.py @@ -71,3 +71,31 @@ class CustomSocket(unittest.TestCase): def tearDown(self): self.socket1.close() self.socket2.close() + + +class SocketBuffer(datadrainer.BufferFDDrainer): + + name = 'test_utils_datadrainer.SocketBuffer' + + def __init__(self, source): + super(SocketBuffer, self).__init__(source) + self._stop_check = lambda: len(self.data) > 2 + + +class CustomSocketBuffer(unittest.TestCase): + + def setUp(self): + self.socket1, self.socket2 = socket.socketpair(socket.AF_UNIX) + + def test(self): + socket_drainer = SocketBuffer(self.socket2.fileno()) + socket_drainer.start() + self.socket1.send(b'1') + self.socket1.send(b'2') + self.socket1.send(b'3') + socket_drainer.wait() + self.assertEqual(socket_drainer.data, b'123') + + def tearDown(self): + self.socket1.close() + self.socket2.close() -- GitLab