提交 5ce82b9c 编写于 作者: C Cleber Rosa

avocado.utils.datadrainer: introduce file descriptor drainer

This drainer (partial) implementation reads data from a file
descriptor.  Users should build on top of this class to come up with a
full implementation (as exemplified in the accompanying tests).
Signed-off-by: NCleber Rosa <crosa@redhat.com>
上级 7be2c078
......@@ -23,6 +23,9 @@ of the user.
"""
import abc
import io
import os
import select
import threading
......@@ -105,3 +108,38 @@ class BaseDrainer(metaclass=abc.ABCMeta):
Waits on the thread completion
"""
self._thread.join()
class FDDrainer(BaseDrainer):
"""
Drainer whose source is a file descriptor
This drainer uses select to efficiently wait for data to be available on
a file descriptor. If the file descriptor is closed, the drainer responds
by shuting itself down.
This drainer doesn't provide a write() implementation, and is
consequently not a complete implementation users can pick and use.
"""
name = 'avocado.utils.datadrainer.FDDrainer'
def data_available(self):
try:
return select.select([self._source], [], [], 1)[0]
except OSError as exc:
if exc.errno == 9:
return False
def read(self):
data = b''
try:
data = os.read(self._source, io.DEFAULT_BUFFER_SIZE)
except OSError as exc:
if exc.errno == 9:
self._internal_quit = True
return data
def write(self, data):
# necessary to avoid pylint W0223
raise NotImplementedError
import io
import socket
import unittest
from avocado.utils import datadrainer
......@@ -34,3 +36,38 @@ class Custom(unittest.TestCase):
magic.start()
magic.wait()
self.assertEqual(Magic.magic, magic.destination)
class Socket(datadrainer.FDDrainer):
name = 'test_utils_datadrainer.Socket'
def __init__(self, source):
super(Socket, self).__init__(source)
self.data_buffer = io.BytesIO()
self._write_count = 0
def write(self, data):
self.data_buffer.write(data)
self._write_count += len(data)
if self._write_count > 2:
self._internal_quit = True
class CustomSocket(unittest.TestCase):
def setUp(self):
self.socket1, self.socket2 = socket.socketpair(socket.AF_UNIX)
def test(self):
socket_drainer = Socket(self.socket2.fileno())
socket_drainer.start()
self.socket1.send(b'1')
self.socket1.send(b'2')
self.socket1.send(b'3')
socket_drainer.wait()
self.assertEqual(socket_drainer.data_buffer.getvalue(), b'123')
def tearDown(self):
self.socket1.close()
self.socket2.close()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册