service.go 3.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
package pserver

import (
	"errors"
	"fmt"
	"sync"
)

// ElementType is the type of elements of a Parameter.
type ElementType int

H
Helin Wang 已提交
12
var ErrAlreadyInitialized = errors.New("pserver already initialized")
13
var ErrUninitialized = errors.New("pserver not fully initialized")
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31

// Supported element types
const (
	Int32 ElementType = iota
	UInt32
	Int64
	UInt64
	Float32
	Float64
)

// Parameter is a piece of data to sync with the parameter server.
type Parameter struct {
	Name        string
	ElementType ElementType
	Content     []byte
}

32 33 34 35
func (p *Parameter) toString() {
	fmt.Println(p.Name, p.ElementType, p.Content)
}

36 37 38 39 40 41 42 43 44
// ParameterWithConfig contains the parameter and the configuration.
type ParameterWithConfig struct {
	Param  Parameter
	Config []byte // parameter configuration in Proto Buffer format
}

// Gradient is the gradient of the parameter.
type Gradient Parameter

H
Helin Wang 已提交
45
// Service is the RPC service for pserver.
46 47 48 49 50 51 52 53
type Service struct {
	initialized chan struct{}

	mu       sync.Mutex
	opt      *optimizer
	paramMap map[string]Parameter
}

H
Helin Wang 已提交
54
// NewService creates a new service.
55
func NewService() *Service {
56
	s := &Service{opt: newOptimizer(sgd, 0.01)}
57 58 59 60 61
	s.paramMap = make(map[string]Parameter)
	s.initialized = make(chan struct{})
	return s
}

H
Helin Wang 已提交
62
// InitParam initializes a parameter.
63 64 65
func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, dummy *int) error {
	select {
	case <-s.initialized:
H
Helin Wang 已提交
66
		return ErrAlreadyInitialized
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
	default:
	}

	// TODO(helin): parse parameter config

	s.mu.Lock()
	defer s.mu.Unlock()

	// TODO(helin): check if paramWithConfigs.Param.Content is
	// properly memory aligned, if not, make copy to a memory
	// aligned region.
	s.paramMap[paramWithConfigs.Param.Name] = paramWithConfigs.Param
	return nil
}

H
Helin Wang 已提交
82 83
// FinishInitParams tells the parameter server that the parameter
// initialization has finished.
84 85 86
func (s *Service) FinishInitParams(dummy0 int, dummy1 *int) error {
	select {
	case <-s.initialized:
H
Helin Wang 已提交
87
		return ErrAlreadyInitialized
88 89 90 91 92 93 94
	default:
	}

	close(s.initialized)
	return nil
}

95
// SendGrad sends gradient to parameter servers for parameter
H
Helin Wang 已提交
96
// optimization.
97
func (s *Service) SendGrad(g Gradient, dummy *int) error {
98 99 100 101 102
	select {
	case <-s.initialized:
	default:
		return ErrUninitialized
	}
103 104

	s.mu.Lock()
H
Helin Wang 已提交
105
	defer s.mu.Unlock()
106

107 108 109
	p, ok := s.paramMap[g.Name]
	if !ok {
		return fmt.Errorf("parameter: %s does not exist", g.Name)
110 111
	}

112
	return s.opt.UpdateParameter(p, g)
113 114
}

115 116
// GetParam gets parameters from the parameter server.
func (s *Service) GetParam(name string, parameter *Parameter) error {
117 118
	<-s.initialized
	s.mu.Lock()
H
Helin Wang 已提交
119
	defer s.mu.Unlock()
120

121 122 123
	p, ok := s.paramMap[name]
	if !ok {
		return fmt.Errorf("parameter: %s does not exist", name)
124 125
	}

126 127 128 129 130 131 132 133
	// The parameter content (a byte slice) may change
	// during RPC serialization due to write from other
	// goroutine, we allow it since mini-batch based deep
	// learning optimization methods are stochastic in
	// nature. This race condition is allowed deliberately
	// to save the program from making a copy of the
	// paramter content.
	*parameter = p
134 135 136
	return nil
}

H
Helin Wang 已提交
137 138
// Save tells the parameter server to save parameters.
func (s *Service) Save(path string, dummy *int) error {
139 140 141 142 143
	<-s.initialized

	// TODO
	return nil
}