# client.py: ctypes bindings for the master server client.
import ctypes
import os

# Cached handle to the shared library; filled in lazily by get_c_lib().
__lib__ = None


def get_c_lib():
    """Return the ctypes handle for ``libpaddle_master.so``.

    The library is looked up in the directory that contains this module.
    It is loaded on the first call only; the handle is stored in the
    module-level ``__lib__`` and handed back on every later call.

    Returns:
        The object returned by ``ctypes.cdll.LoadLibrary``.

    Raises:
        OSError: propagated from ``LoadLibrary`` when the shared library
        cannot be loaded.
    """
    global __lib__
    if __lib__ is not None:
        return __lib__

    lib_dir = os.path.dirname(__file__)
    __lib__ = ctypes.cdll.LoadLibrary(
        os.path.join(lib_dir, "libpaddle_master.so"))
    return __lib__
# Python wrapper around the master client functions of the C library.


class client(object):
    """
    client is a client to the master server.

    Every method forwards to a function of the shared library returned
    by get_c_lib(). ``self.c`` holds the value returned by
    ``paddle_new_etcd_master_client`` and is reset to None by release().
    """

    @staticmethod
    def _to_bytes(value):
        """Return text as UTF-8 bytes; pass any other value through.

        ctypes hands text objects to C as ``wchar_t*``, while bytes are
        handed over as ``char*``, so text is encoded before the call.
        """
        if isinstance(value, bytes) or not hasattr(value, "encode"):
            return value
        return value.encode("utf-8")

    def __init__(self, etcd_endpoints, timeout_sec, buf_size=0):
        """create the underlying C master client

        :param etcd_endpoints: etcd endpoints, forwarded to
        paddle_new_etcd_master_client. Text is UTF-8 encoded first,
        bytes are forwarded as they are.
        :param timeout_sec: forwarded to the C library unchanged.
        :param buf_size: forwarded to the C library unchanged.
        """
        self.c = get_c_lib().paddle_new_etcd_master_client(
            self._to_bytes(etcd_endpoints), timeout_sec, buf_size)

    def request_save_model(self, trainer_id, block_ms):
        """request to save model

        Conventionally the 0-th trainer will save model. But in
        distributed training, any trainer could be killed. This
        function asks the master server if the trainer should proceed
        with saving model.

        :param trainer_id: trainer id.
        :param block_ms: number of milliseconds that other save model
        requests will be blocked if this save model request succeeded.

        Returns:
            int: 1 if the save model request is approved, 0 if the
            request is rejected because another trainer is saving the
            model, -1 if an error happened.

        """
        return get_c_lib().paddle_request_save_model(self.c, trainer_id,
                                                     block_ms)

    def release(self):
        """release the underlying C master client

        Calling release() again after the client has been released is a
        no-op, so the C library is never handed a None handle.
        """
        if self.c is None:
            return
        get_c_lib().paddle_release_master_client(self.c)
        self.c = None

    def set_dataset(self, paths):
        """pass the dataset paths to the C library

        :param paths: sized sequence of paths. Text entries are UTF-8
        encoded, bytes entries are used as they are.
        """
        holder_type = ctypes.c_char_p * len(paths)
        holder = holder_type()
        for idx, path in enumerate(paths):
            # c_char_p only accepts bytes (or None) on Python 3.
            holder[idx] = ctypes.c_char_p(self._to_bytes(path))
        get_c_lib().paddle_set_dataset(self.c, holder, len(paths))

    def next_record(self):
        """gets next record for training

        Returns:
            string: the record. None on error, "" for an empty record.
            int: error code, 0 if successful, < 0 otherwise.
        """
        p = ctypes.c_char_p()
        size = get_c_lib().paddle_next_record(self.c, ctypes.pointer(p))
        if size < 0:
            # Error
            return None, size

        if size == 0:
            # Empty record
            return "", 0

        # Copy exactly `size` bytes. Reading through c_char_p.value would
        # stop at the first NUL byte and truncate binary records.
        record = ctypes.string_at(p, size)
        # Memory created from C should be freed.
        get_c_lib().mem_free(p)
        return record, 0

    def paddle_start_get_records(self, pass_id):
        """forward pass_id to paddle_start_get_records of the C library

        :param pass_id: forwarded to the C library unchanged.
        """
        get_c_lib().paddle_start_get_records(self.c, pass_id)