diff --git a/contrib/data_safety_training/image_classification/README.md b/contrib/data_safety_training/image_classification/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..539be53ac907e94f9e4f9f451cc971590e414fa4
--- /dev/null
+++ b/contrib/data_safety_training/image_classification/README.md
@@ -0,0 +1,30 @@
+## Training process
+
+- The user sends a request to the server, including the task type, image size, etc.
+
+- The server responds with a path from which the user can download the encoder.
+
+- The user encodes the images and uploads the serialized results to the DB.
+
+- The server fetches the uploaded data and starts training the model.
+
+## Start the service on the server side
+
+```
+python server/receive.py
+```
+
+## Start the request on the user side
+
+```
+python submitter.py
+```
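For reference, the handshake the README describes is just a msgpack dict exchanged over a ZMQ REQ/REP pair. The sketch below mirrors what `submitter.py` and `server/receive.py` (added further down) do; the host and port are the hardcoded defaults from those scripts:

```python
import msgpack
import zmq

# request: task description, serialized with msgpack (see submitter.py)
request = {"mission": "image classification", "image_size": [3, 32, 32]}

context = zmq.Context()
sock = context.socket(zmq.REQ)
sock.connect("tcp://127.0.0.1:60001")  # receive.py binds port 60001
sock.send(msgpack.dumps(request))

# reply: filename of the encoder script, served over HTTP on port 8080
encoder_file = sock.recv()  # "user.py"
```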
diff --git a/contrib/data_safety_training/image_classification/server/receive.py b/contrib/data_safety_training/image_classification/server/receive.py
new file mode 100644
index 0000000000000000000000000000000000000000..70d19b02574e889aa66f0a98e12f061a8bf05a0f
--- /dev/null
+++ b/contrib/data_safety_training/image_classification/server/receive.py
@@ -0,0 +1,28 @@
+import zmq
+import socket
+import msgpack
+import os
+
+port = 60001  # fixed port for the ZMQ REP endpoint
+current_ip = socket.gethostbyname(socket.gethostname())
+print(current_ip)
+# serve the working directory over HTTP so the client can download the encoder
+os.system("python -m SimpleHTTPServer 8080 &")
+
+# listen for the client
+context = zmq.Context()
+zmq_socket = context.socket(zmq.REP)
+zmq_socket.bind("tcp://{}:{}".format(current_ip, port))
+print("binding tcp://{}:{}".format(current_ip, port))
+
+# receive the mission and reply with the path of the encoder script
+message = msgpack.loads(zmq_socket.recv())
+print(message["mission"])
+zmq_socket.send("user.py")
+
+# wait for the client to finish encoding and uploading
+while True:
+    message = zmq_socket.recv()
+    if message == 'complete':
+        break
+
+# start training in the background
+os.system("python -u server.py > server.log &")
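Note that `receive.py` assumes Python 2: `SimpleHTTPServer` no longer exists in Python 3, and ZMQ frames arrive there as `bytes`. A rough, untested Python 3 equivalent for anyone on a newer interpreter (the `raw=False` flag assumes a reasonably recent msgpack release):

```python
import os
import socket

import msgpack
import zmq

port = 60001
current_ip = socket.gethostbyname(socket.gethostname())
os.system("python3 -m http.server 8080 &")  # py3 replacement for SimpleHTTPServer

context = zmq.Context()
zmq_socket = context.socket(zmq.REP)
zmq_socket.bind("tcp://{}:{}".format(current_ip, port))

message = msgpack.loads(zmq_socket.recv(), raw=False)  # decode keys to str
print(message["mission"])
zmq_socket.send(b"user.py")  # ZMQ frames are bytes in Python 3

while True:
    if zmq_socket.recv() == b'complete':
        break

os.system("python3 -u server.py > server.log &")
```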
diff --git a/contrib/data_safety_training/image_classification/server/server.py b/contrib/data_safety_training/image_classification/server/server.py
new file mode 100644
index 0000000000000000000000000000000000000000..39f4781d5e9b0c18a1548a0e09281f29bc187abd
--- /dev/null
+++ b/contrib/data_safety_training/image_classification/server/server.py
@@ -0,0 +1,239 @@
+from __future__ import print_function
+import os
+import sys
+import math
+import time
+import msgpack
+import numpy
+import redis
+import paddle
+import paddle.fluid as fluid
+from paddle.fluid import layers
+from paddle.fluid.param_attr import ParamAttr
+
+
+def data_generator(samples, r):
+    # build a reader that yields (feature, label) pairs fetched from Redis
+    def train_data():
+        for item in samples:
+            sample = msgpack.loads(r.get(str(item)))
+            conv = sample[0]
+            label = sample[1]
+            yield conv, label
+    return train_data
+
+
+class ResNet():
+    def __init__(self, layers=50):
+        self.layers = layers
+
+    def net(self, input, class_dim=10):
+        layers = self.layers
+        supported_layers = [18, 34, 50, 101, 152]
+        assert layers in supported_layers, \
+            "supported layers are {} but input layer is {}".format(
+                supported_layers, layers)
+
+        if layers == 18:
+            depth = [2, 2, 2, 2]
+        elif layers == 34 or layers == 50:
+            depth = [3, 4, 6, 3]
+        elif layers == 101:
+            depth = [3, 4, 23, 3]
+        elif layers == 152:
+            depth = [3, 8, 36, 3]
+        num_filters = [64, 128, 256, 512]
+        conv = input
+        if layers >= 50:
+            for block in range(len(depth)):
+                for i in range(depth[block]):
+                    if layers in [101, 152] and block == 2:
+                        if i == 0:
+                            conv_name = "res" + str(block + 2) + "a"
+                        else:
+                            conv_name = "res" + str(block + 2) + "b" + str(i)
+                    else:
+                        conv_name = "res" + str(block + 2) + chr(97 + i)
+                    conv = self.bottleneck_block(
+                        input=conv,
+                        num_filters=num_filters[block],
+                        stride=2 if i == 0 and block != 0 else 1,
+                        name=conv_name)
+
+            pool = fluid.layers.pool2d(
+                input=conv, pool_type='avg', global_pooling=True)
+            stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
+            out = fluid.layers.fc(
+                input=pool,
+                size=class_dim,
+                param_attr=fluid.param_attr.ParamAttr(
+                    initializer=fluid.initializer.Uniform(-stdv, stdv)),
+                act="softmax")
+        else:
+            for block in range(len(depth)):
+                for i in range(depth[block]):
+                    conv_name = "res" + str(block + 2) + chr(97 + i)
+                    conv = self.basic_block(
+                        input=conv,
+                        num_filters=num_filters[block],
+                        stride=2 if i == 0 and block != 0 else 1,
+                        is_first=block == i == 0,
+                        name=conv_name)
+
+            pool = fluid.layers.pool2d(
+                input=conv, pool_type='avg', global_pooling=True)
+            stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
+            out = fluid.layers.fc(
+                input=pool,
+                size=class_dim,
+                param_attr=fluid.param_attr.ParamAttr(
+                    initializer=fluid.initializer.Uniform(-stdv, stdv)),
+                act="softmax")
+        return out
+
+    def conv_bn_layer(self,
+                      input,
+                      num_filters,
+                      filter_size,
+                      stride=1,
+                      groups=1,
+                      act=None,
+                      name=None):
+        conv = fluid.layers.conv2d(
+            input=input,
+            num_filters=num_filters,
+            filter_size=filter_size,
+            stride=stride,
+            padding=(filter_size - 1) // 2,
+            groups=groups,
+            act=None,
+            param_attr=ParamAttr(name=name + "_weights"),
+            bias_attr=False,
+            name=name + '.conv2d.output.1')
+
+        if name == "conv1":
+            bn_name = "bn_" + name
+        else:
+            bn_name = "bn" + name[3:]
+        return fluid.layers.batch_norm(
+            input=conv,
+            act=act,
+            name=bn_name + '.output.1',
+            param_attr=ParamAttr(name=bn_name + '_scale'),
+            bias_attr=ParamAttr(bn_name + '_offset'),
+            moving_mean_name=bn_name + '_mean',
+            moving_variance_name=bn_name + '_variance', )
+
+    def shortcut(self, input, ch_out, stride, is_first, name):
+        ch_in = input.shape[1]
+        if ch_in != ch_out or stride != 1 or is_first:
+            return self.conv_bn_layer(input, ch_out, 1, stride, name=name)
+        else:
+            return input
+
+    def bottleneck_block(self, input, num_filters, stride, name):
+        conv0 = self.conv_bn_layer(
+            input=input,
+            num_filters=num_filters,
+            filter_size=1,
+            act='relu',
+            name=name + "_branch2a")
+        conv1 = self.conv_bn_layer(
+            input=conv0,
+            num_filters=num_filters,
+            filter_size=3,
+            stride=stride,
+            act='relu',
+            name=name + "_branch2b")
+        conv2 = self.conv_bn_layer(
+            input=conv1,
+            num_filters=num_filters * 4,
+            filter_size=1,
+            act=None,
+            name=name + "_branch2c")
+
+        short = self.shortcut(
+            input,
+            num_filters * 4,
+            stride,
+            is_first=False,
+            name=name + "_branch1")
+
+        return fluid.layers.elementwise_add(
+            x=short, y=conv2, act='relu', name=name + ".add.output.5")
+
+    def basic_block(self, input, num_filters, stride, is_first, name):
+        conv0 = self.conv_bn_layer(
+            input=input,
+            num_filters=num_filters,
+            filter_size=3,
+            act='relu',
+            stride=stride,
+            name=name + "_branch2a")
+        conv1 = self.conv_bn_layer(
+            input=conv0,
+            num_filters=num_filters,
+            filter_size=3,
+            act=None,
+            name=name + "_branch2b")
+        short = self.shortcut(
+            input, num_filters, stride, is_first, name=name + "_branch1")
+        return fluid.layers.elementwise_add(x=short, y=conv1, act='relu')
+
+
+# local Redis configuration
+redis_host = "127.0.0.1"
+redis_port = 6379
+redis_password = ""
+r = redis.StrictRedis(host=redis_host, port=redis_port, password=redis_password)
+
+# reader: the uploaded features are 64x8x8 maps, the labels scalar ints
+reader = fluid.layers.py_reader(
+    capacity=64,
+    shapes=[(-1, 64, 8, 8), (-1, 1)],
+    dtypes=['float32', 'int64'])
+
+samples = r.keys()
+train_data = data_generator(samples, r)
+
+reader.decorate_paddle_reader(
+    paddle.batch(
+        paddle.reader.shuffle(train_data, buf_size=5000),
+        batch_size=64))
+
+conv1, label = fluid.layers.read_file(reader)
+
+# training program
+place = fluid.CUDAPlace(0)
+model = ResNet(layers=50)
+predicts = model.net(conv1, 10)
+cost = fluid.layers.cross_entropy(input=predicts, label=label)
+accuracy = fluid.layers.accuracy(input=predicts, label=label)
+loss = fluid.layers.mean(cost)
+
+optimizer = fluid.optimizer.Adam(learning_rate=0.001)
+optimizer.minimize(loss)
+
+exe = fluid.Executor(place)
+exe.run(fluid.default_startup_program())
+
+total_time = 0
+EPOCH_NUM = 1
+step = 0
+train_start = time.time()
+# start training
+for pass_id in range(EPOCH_NUM):
+    reader.start()
+    try:
+        while True:
+            start_time = time.time()
+            loss_value, acc_value = exe.run(
+                fetch_list=[loss.name, accuracy.name])
+            step += 1
+            if step % 10 == 0:
+                print("epoch: %d step: %d loss: %s acc: %s" %
+                      (pass_id, step, loss_value, acc_value))
+            end_time = time.time()
+            total_time += (end_time - start_time)
+    except fluid.core.EOFException:
+        reader.reset()
+train_end = time.time()
+print("total time: %d" % (train_end - train_start))
+print("computation time: %d" % total_time)
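Before launching training, it is worth confirming that the uploaded samples actually match the shapes the `py_reader` above declares. A minimal sanity check along these lines (a sketch, assuming the same local Redis instance and that `user.py` below has finished uploading):

```python
import msgpack
import numpy
import redis

r = redis.StrictRedis(host="127.0.0.1", port=6379, password="")
keys = r.keys()
print("samples in DB: %d" % len(keys))

# each value is a msgpack-serialized [feature, label] pair (see user.py);
# grab an arbitrary one and check it against the reader's declared shapes
feature, label = msgpack.loads(r.get(keys[0]))
assert numpy.array(feature).shape == (64, 8, 8)  # what py_reader expects
assert isinstance(label, int)                    # class id, 0..9 for cifar10
```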
diff --git a/contrib/data_safety_training/image_classification/server/user.py b/contrib/data_safety_training/image_classification/server/user.py
new file mode 100644
index 0000000000000000000000000000000000000000..89668f35d4b69a4b7d561ac4b562fe4531e7b223
--- /dev/null
+++ b/contrib/data_safety_training/image_classification/server/user.py
@@ -0,0 +1,104 @@
+from __future__ import print_function
+import os
+import sys
+import time
+import msgpack
+import numpy
+import redis
+import paddle
+import paddle.fluid as fluid
+from paddle.fluid import layers
+from paddle.fluid.param_attr import ParamAttr
+
+
+def conv_bn_layer(input,
+                  num_filters,
+                  filter_size,
+                  stride=1,
+                  groups=1,
+                  act=None,
+                  name=None):
+    conv = fluid.layers.conv2d(
+        input=input,
+        num_filters=num_filters,
+        filter_size=filter_size,
+        stride=stride,
+        padding=(filter_size - 1) // 2,
+        groups=groups,
+        act=None,
+        param_attr=ParamAttr(name=name + "_weights"),
+        bias_attr=False,
+        name=name + '.conv2d.output.1')
+
+    if name == "conv1":
+        bn_name = "bn_" + name
+    else:
+        bn_name = "bn" + name[3:]
+    return fluid.layers.batch_norm(
+        input=conv,
+        act=act,
+        name=bn_name + '.output.1',
+        param_attr=ParamAttr(name=bn_name + '_scale'),
+        bias_attr=ParamAttr(bn_name + '_offset'),
+        moving_mean_name=bn_name + '_mean',
+        moving_variance_name=bn_name + '_variance', )
+
+
+def load_conf(conf_file, local_dict):
+    with open(conf_file) as fin:
+        for line in fin:
+            group = line.strip().split("=")
+            if len(group) != 2:
+                continue
+            local_dict[group[0]] = group[1]
+    return local_dict
+
+
+# Redis DB configuration
+redis_host = "127.0.0.1"
+redis_port = 6379
+redis_password = ""
+
+start_time = time.time()
+# start a Redis client and empty the DB
+r = redis.StrictRedis(host=redis_host, port=redis_port, password=redis_password)
+r.flushall()
+
+# encoding program: the first conv + pool stage of ResNet50 is the encoder
+images = fluid.layers.data(name='images', shape=[3, 32, 32], dtype='float32')
+label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+place = fluid.CPUPlace()
+conv1 = conv_bn_layer(
+    input=images, num_filters=64, filter_size=7, stride=2, act='relu',
+    name="conv1")
+pool = fluid.layers.pool2d(
+    input=conv1, pool_size=3, pool_stride=2, pool_padding=1, pool_type='max')
+feeder = fluid.DataFeeder(place=place, feed_list=[images, label])
+pretrained_model = 'ResNet50_pretrained'
+exe = fluid.Executor(place)
+exe.run(fluid.default_startup_program())
+
+
+# load the pretrained encoder parameters and prepare the data
+def if_exist(var):
+    return os.path.exists(os.path.join(pretrained_model, var.name))
+
+
+fluid.io.load_vars(exe, pretrained_model,
+                   main_program=fluid.default_main_program(),
+                   predicate=if_exist)
+
+train_data = paddle.dataset.cifar.train10()
+step = 0
+
+# encode every sample and upload it to Redis, keyed by step
+for data in train_data():
+    pre_data = [data]
+    res = exe.run(program=fluid.default_main_program(),
+                  feed=feeder.feed(pre_data),
+                  fetch_list=[pool.name])
+    sample = [res[0][0].tolist(), data[1]]
+    step += 1
+    packed = msgpack.dumps(sample)
+    r.set(step, packed)
+    if step % 100 == 0:
+        print(numpy.array(sample[0]).shape)
+        print("uploaded %d samples" % step)
+
+files = r.keys()
+print("uploaded file number: %d" % len(files))
+end_time = time.time()
+total_time = end_time - start_time
+print("total time: %d" % total_time)
diff --git a/contrib/data_safety_training/image_classification/submitter.py b/contrib/data_safety_training/image_classification/submitter.py
new file mode 100644
index 0000000000000000000000000000000000000000..69bd5d8495bda836babbea6ca8dc80deb3677b3f
--- /dev/null
+++ b/contrib/data_safety_training/image_classification/submitter.py
@@ -0,0 +1,18 @@
+import zmq
+import socket
+import msgpack
+import os
+
+mission_dict = {"mission": "image classification", "image_size": [3, 32, 32]}
+
+# send the task request to the server
+context = zmq.Context()
+zmq_socket = context.socket(zmq.REQ)
+zmq_socket.connect("tcp://127.0.0.1:60001")
+zmq_socket.send(msgpack.dumps(mission_dict))
+
+# receive the encoder path and download the script over HTTP
+file = zmq_socket.recv()
+os.system("wget 127.0.0.1:8080/{}".format(file))
+
+# encode the data locally, then notify the server
+os.system("python -u user.py > user.log")
+zmq_socket.send("complete")
diff --git a/images/split_flow.png b/images/split_flow.png
new file mode 100644
index 0000000000000000000000000000000000000000..7c884879b965c9952b04b16df7d0f7bfdb1cf00c
Binary files /dev/null and b/images/split_flow.png differ
diff --git a/paddle_fl/examples/submitter_demo/scheduler_client.py b/paddle_fl/examples/submitter_demo/scheduler_client.py
index dccd1ec6c220a36f4ded3fdb3ac5c2cc1065d01b..59068a8d87b200ac6e4d375a40757c70afd5c0c7 100644
--- a/paddle_fl/examples/submitter_demo/scheduler_client.py
+++ b/paddle_fl/examples/submitter_demo/scheduler_client.py
@@ -122,7 +122,7 @@ ip_role = {}
 for i in range(len(ip_list)):
     if i < int(default_dict["server_nodes"]):
         ip_role[ip_list[i]] = 'server%d' % i
-    else: 
+    else:
         ip_role[ip_list[i]] = 'trainer%d' % (i-int(default_dict["server_nodes"]))
 print(ip_role)
@@ -190,4 +190,3 @@ while not all_job_sent:
     scheduler.init_env()
     print("init env done.")
     scheduler.start_fl_training()
-
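As context for the `scheduler_client.py` touch-up above: the loop containing the fixed `else:` assigns federated roles by position in `ip_list`, with the first `server_nodes` entries becoming servers and the rest trainers. A standalone illustration with made-up IPs and `server_nodes = 1`:

```python
ip_list = ["10.0.0.1", "10.0.0.2", "10.0.0.3"]  # hypothetical endpoints
server_nodes = 1

ip_role = {}
for i in range(len(ip_list)):
    if i < server_nodes:
        ip_role[ip_list[i]] = 'server%d' % i
    else:
        ip_role[ip_list[i]] = 'trainer%d' % (i - server_nodes)

print(ip_role)
# {'10.0.0.1': 'server0', '10.0.0.2': 'trainer0', '10.0.0.3': 'trainer1'}
```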