// Package pserver implements the RPC service of the parameter server,
// which stores parameters and applies the gradients sent to it.
package pserver

import (
	"bytes"
	"encoding/gob"
	"errors"
	"fmt"
	"io"
	"os"
	"sync"
)

// ElementType is the type of elements of a Parameter.
type ElementType int

// Error messages reported by Service. Service wraps them with errors.New,
// so every call yields a distinct error value; compare err.Error()
// against these constants to detect them.
const (
	// AlreadyInitialized is reported by InitParam and FinishInitParams
	// once FinishInitParams has already been called.
	AlreadyInitialized = "pserver already initialized"

	// Uninitialized is reported by SendGrad when FinishInitParams has not
	// been called yet.
	Uninitialized = "pserver not fully initialized"
)

// Supported element types.
const (
	Int32 ElementType = iota
	UInt32
	Int64
	UInt64
	Float32
	Float64
)

// PsDesired is the etcd path that stores the desired pserver count.
const PsDesired = "/ps_desired"

30 31 32 33
// Parameter is a piece of data to sync with the parameter server.
type Parameter struct {
	Name        string
	ElementType ElementType
D
dzhwinter 已提交
34
	Content     []byte
35 36 37 38 39 40
}

// ParameterWithConfig contains the parameter and the configuration.
type ParameterWithConfig struct {
	Param  Parameter
	Config []byte // parameter configuration in Proto Buffer format
D
dongzhihong 已提交
41
	State  []byte // parameter training state
42 43 44 45 46
}

// Gradient is the gradient of the parameter.
type Gradient Parameter

H
Helin Wang 已提交
47
// Service is the RPC service for pserver.
48 49
type Service struct {
	initialized chan struct{}
50
	idx         int
51

52
	mu     sync.Mutex
D
dzhwinter 已提交
53
	optMap map[string]*optimizer
54 55
}

W
wuyi05 已提交
56 57
// NewService creates a new service, will bypass etcd registration if no
// endpoints specified.
58 59 60 61
func NewService(idx int) (*Service, error) {
	s := &Service{
		idx: idx,
	}
D
dongzhihong 已提交
62
	s.optMap = make(map[string]*optimizer)
63
	s.initialized = make(chan struct{})
W
wuyi05 已提交
64
	return s, nil
65 66
}

H
Helin Wang 已提交
67
// InitParam initializes a parameter.
68 69 70
func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, dummy *int) error {
	select {
	case <-s.initialized:
71
		return errors.New(AlreadyInitialized)
72 73 74 75 76 77 78 79 80 81 82
	default:
	}

	// TODO(helin): parse parameter config

	s.mu.Lock()
	defer s.mu.Unlock()

	// TODO(helin): check if paramWithConfigs.Param.Content is
	// properly memory aligned, if not, make copy to a memory
	// aligned region.
D
dzhwinter 已提交
83
	s.optMap[paramWithConfigs.Param.Name] = newOptimizer(paramWithConfigs)
84 85 86
	return nil
}

H
Helin Wang 已提交
87 88
// FinishInitParams tells the parameter server that the parameter
// initialization has finished.
89 90 91
func (s *Service) FinishInitParams(dummy0 int, dummy1 *int) error {
	select {
	case <-s.initialized:
92
		return errors.New(AlreadyInitialized)
93 94 95 96 97 98 99
	default:
	}

	close(s.initialized)
	return nil
}

100
// SendGrad sends gradient to parameter servers for parameter
H
Helin Wang 已提交
101
// optimization.
102
func (s *Service) SendGrad(g Gradient, dummy *int) error {
103 104 105
	select {
	case <-s.initialized:
	default:
106
		return errors.New(Uninitialized)
107
	}
108 109

	s.mu.Lock()
H
Helin Wang 已提交
110
	defer s.mu.Unlock()
111

D
dzhwinter 已提交
112 113
	o, ok := s.optMap[g.Name]
	if !ok {
D
dzhwinter 已提交
114
		return fmt.Errorf("parameter: %s does not exist", g.Name)
D
dzhwinter 已提交
115
	}
116

D
dongzhihong 已提交
117
	return o.UpdateParameter(g)
118 119
}

120 121
// GetParam gets parameters from the parameter server.
func (s *Service) GetParam(name string, parameter *Parameter) error {
122 123
	<-s.initialized
	s.mu.Lock()
H
Helin Wang 已提交
124
	defer s.mu.Unlock()
125

D
dongzhihong 已提交
126
	opt, ok := s.optMap[name]
127 128
	if !ok {
		return fmt.Errorf("parameter: %s does not exist", name)
129 130
	}

131 132 133 134 135 136 137
	// The parameter content (a byte slice) may change
	// during RPC serialization due to write from other
	// goroutine, we allow it since mini-batch based deep
	// learning optimization methods are stochastic in
	// nature. This race condition is allowed deliberately
	// to save the program from making a copy of the
	// paramter content.
D
dongzhihong 已提交
138
	parameter.Name = name
D
dongzhihong 已提交
139
	parameter.ElementType = opt.elementType
140 141
	parameter.Content = opt.GetWeights()
	return nil
142 143
}

H
Helin Wang 已提交
144 145
// Save tells the parameter server to save parameters.
func (s *Service) Save(path string, dummy *int) error {
146
	<-s.initialized
D
dongzhihong 已提交
147 148 149 150 151 152 153
	for opt, ok := range s.optMap {
		if ok != nil {
			return fmt.Errorf("parameter optimizerMap error: ", ok)
		}
		state := opt.GetStates()
		weights := opt.GetWeights()
	}
154 155
	return nil
}