提交 34c30fcf 编写于 作者: O openeuler-ci-bot 提交者: Gitee

!95 atune: split AI services and configuration services

Merge pull request !95 from hanxinke/master
......@@ -55,6 +55,7 @@ install:
install -m 750 pkg/atuned $(BINDIR)
install -m 640 misc/atuned.service $(SYSTEMDDIR)
install -m 640 misc/atuned.cnf $(DESTDIR)/etc/atuned/
install -m 640 misc/opt.service $(SYSTEMDDIR)
install -m 640 database/atuned.db $(DESTDIR)/var/lib/atuned/
install -m 640 misc/atune-adm $(DESTDIR)$(PREFIX)/share/bash-completion/completions/
\cp -rf scripts/* $(DESTDIR)$(PREFIX)/$(LIBEXEC)/atuned/scripts/
......
......@@ -34,14 +34,6 @@ LOG.setLevel(logging.ERROR)
APP = Flask(__name__)
API = Api(APP)
API.add_resource(configurator.Configurator, '/v1/setting', '/setting')
API.add_resource(monitor.Monitor, '/v1/monitor', '/monitor')
API.add_resource(optimizer.Optimizer, '/v1/optimizer', '/v1/optimizer/<string:task_id>')
API.add_resource(collector.Collector, '/v1/collector', '/v1/collector')
API.add_resource(classification.Classification, '/v1/classification', '/v1/classification')
API.add_resource(profile.Profile, '/v1/profile', '/v1/profile')
API.add_resource(train.Training, '/v1/training', '/v1/training')
def config_log(level):
"""app config log"""
......@@ -55,7 +47,7 @@ def config_log(level):
root_logger.addHandler(syslog_handler)
def main(filename):
def main(filename, port):
"""app main function"""
if not os.path.exists(filename):
return
......@@ -68,6 +60,16 @@ def main(filename):
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE='Lax')
if port == 'rest':
API.add_resource(configurator.Configurator, '/v1/setting', '/setting')
API.add_resource(monitor.Monitor, '/v1/monitor', '/monitor')
API.add_resource(collector.Collector, '/v1/collector', '/v1/collector')
API.add_resource(profile.Profile, '/v1/profile', '/v1/profile')
API.add_resource(train.Training, '/v1/training', '/v1/training')
else:
API.add_resource(optimizer.Optimizer, '/v1/optimizer', '/v1/optimizer/<string:task_id>')
API.add_resource(classification.Classification, '/v1/classification', '/v1/classification')
if config.has_option("server", "tls") and config.get("server", "tls") == "true":
cert_file = config.get("server", "tlshttpcertfile")
key_file = config.get("server", "tlshttpkeyfile")
......@@ -76,13 +78,20 @@ def main(filename):
context.load_cert_chain(certfile=cert_file, keyfile=key_file)
context.load_verify_locations(ca_file)
context.verify_mode = ssl.CERT_REQUIRED
APP.run(host="localhost", port=config.get("server", "rest_port"),
ssl_context=context)
if port == 'rest':
APP.run(host=config.get("server", "rest_host"), port=config.get("server", "rest_port"),
ssl_context=context)
else:
APP.run(host=config.get("server", "opt_host"), port=config.get("server", "opt_port"),
ssl_context=context)
else:
APP.run(host="localhost", port=config.get("server", "rest_port"))
if port == 'rest':
APP.run(host=config.get("server", "rest_host"), port=config.get("server", "rest_port"))
else:
APP.run(host=config.get("server", "opt_host"), port=config.get("server", "opt_port"))
if __name__ == '__main__':
if len(sys.argv) != 2:
if len(sys.argv) != 3:
sys.exit(-1)
main(sys.argv[1])
main(sys.argv[1], sys.argv[2])
......@@ -32,7 +32,7 @@ LOGGER = logging.getLogger(__name__)
class Classification(Resource):
"""restful api for classification, in order to provide the method of post"""
model_path = "modelpath"
data_path = "data"
csv_data = "data"
model = "model"
@marshal_with_field(CLASSIFICATION_POST_FIELD)
......@@ -42,11 +42,13 @@ class Classification(Resource):
current_app.logger.info(args)
model_path = args.get(self.model_path)
data_path = args.get(self.data_path)
model = args.get(self.model, None)
data = utils.read_from_csv(data_path)
data = []
for each in args.get(self.csv_data):
if each is not None:
data.append(each)
if not data:
abort("data path may be not exist")
abort("data may be not exist")
classification = WorkloadCharacterization(model_path)
resource_limit = ""
......
......@@ -74,7 +74,7 @@ COLLECTOR_POST_PARSER.add_argument('pipe', required=True,
CLASSIFICATION_POST_PARSER = reqparse.RequestParser()
CLASSIFICATION_POST_PARSER.add_argument('modelpath', required=True, help="The modelfile to be used")
CLASSIFICATION_POST_PARSER.add_argument('data', help="The data path to be used")
CLASSIFICATION_POST_PARSER.add_argument('data', type=list, location='json', help="The data to be used")
CLASSIFICATION_POST_PARSER.add_argument('model', help="The model self trained to be used")
PROFILE_GET_PARSER = reqparse.RequestParser()
......
......@@ -187,7 +187,7 @@ func runatuned(ctx *cli.Context) error {
return err
}
if err := utils.WaitForPyservice(config.LocalHost, config.RestPort); err != nil {
if err := utils.WaitForPyservice(); err != nil {
log.Errorf("waiting for pyservice failed: %v", err)
return err
}
......
......@@ -63,7 +63,6 @@ const (
// python service url
const (
Protocol string = "http"
LocalHost string = "localhost"
APIVersion string = "v1"
ConfiguratorURI string = "setting"
......@@ -100,7 +99,10 @@ var (
Address string
Connect string
Port string
LocalHost string
RestPort string
OptHost string
OptPort string
TLS bool
TLSServerCertFile string
TLSServerKeyFile string
......@@ -143,7 +145,12 @@ func (c *Cfg) Load() error {
Address = section.Key("address").MustString(DefaultTgtAddr)
Connect = section.Key("connect").MustString("")
Port = section.Key("port").MustString(DefaultTgtPort)
LocalHost = section.Key("rest_host").MustString("localhost")
RestPort = section.Key("rest_port").MustString("8383")
OptHost = section.Key("opt_host").MustString("localhost")
OptPort = section.Key("opt_port").MustString("3838")
utils.RestHost = LocalHost
utils.RestPort = RestPort
TLS = section.Key("tls").MustBool(false)
if TLS {
......@@ -184,6 +191,21 @@ func GetURL(uri string) string {
if TLS {
protocol = "https"
}
if IsOptPort(uri) {
return fmt.Sprintf("%s://%s:%s/%s/%s", protocol, OptHost, OptPort, APIVersion, uri)
}
url := fmt.Sprintf("%s://%s:%s/%s/%s", protocol, LocalHost, RestPort, APIVersion, uri)
return url
}
// IsOptPort return true if using opt port and host
func IsOptPort(uri string) bool {
if strings.EqualFold(uri, OptimizerURI) {
return true
}
if strings.EqualFold(uri, ClassificationURI) {
return true
}
return false
}
......@@ -47,7 +47,7 @@ func (m *Monitor) Set(cfg *config.Cfg) {
// Run method start the monitor service
func (m *Monitor) Run() error {
if err := utils.WaitForPyservice(config.LocalHost, config.RestPort); err != nil {
if err := utils.WaitForPyservice(); err != nil {
log.Errorf("waiting for pyservice failed: %v", err)
return err
}
......
......@@ -52,6 +52,7 @@ func (p *PyEngine) Run() error {
cmdSlice = append(cmdSlice, "python3")
cmdSlice = append(cmdSlice, path.Join(config.DefaultAnalysisPath, "app.py"))
cmdSlice = append(cmdSlice, path.Join(config.DefaultConfPath, "atuned.cnf"))
cmdSlice = append(cmdSlice, "rest")
cmdStr := strings.Join(cmdSlice, " ")
cmd := exec.Command("sh", "-c", cmdStr)
......
......@@ -53,7 +53,7 @@ func (t *Timer) Set(cfg *config.Cfg) {
//Run method start the ticker, auto-tuning the system
func (t *Timer) Run() error {
/* Static & Dynamic judge */
if err := utils.WaitForPyservice(config.LocalHost, config.RestPort); err != nil {
if err := utils.WaitForPyservice(); err != nil {
log.Errorf("waiting for pyservice failed: %v", err)
return err
}
......
......@@ -17,6 +17,7 @@ import (
PB "atune/api/profile"
"bufio"
"encoding/xml"
"encoding/csv"
"fmt"
"gopkg.in/yaml.v2"
"io"
......@@ -81,6 +82,9 @@ const (
MaxFileSize int64 = 100 * 1024 * 1024
)
var RestHost = "localhost"
var RestPort = "8383"
// CheckArgs method check command args num
func CheckArgs(context *cli.Context, expected, checkType int) error {
var err error
......@@ -249,10 +253,10 @@ func RemoveDuplicateElement(message []string) []string {
}
// WaitForPyservice method waiting for pyEngine service start success
func WaitForPyservice(address string, port string) error {
func WaitForPyservice() error {
ticker := time.NewTicker(time.Second * period)
timeout := time.After(time.Second * timeout)
addr := address + ":" + port
addr := RestHost + ":" + RestPort
for {
select {
case <-ticker.C:
......@@ -384,3 +388,26 @@ func ReadAllFile(path string) string {
}
return string(content)
}
// read data from test.csv
func ReadCSV(path string) ([][]string, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
lines, err := csv.NewReader(file).ReadAll()
if err != nil {
return nil, err
}
data := make([][]string, len(lines))
for _, line := range lines {
if len(line) > 0 {
data = append(data, line)
}
}
return data, nil
}
......@@ -67,6 +67,7 @@ cp -rf %{name} ../../A-Tune
%defattr(0640,root,root,-)
%attr(0640,root,root) /usr/lib/atuned/modules/daemon_profile_server.so
%attr(0640,root,root) %{_unitdir}/atuned.service
%attr(0640,root,root) %{_unitdir}/opt.service
%attr(0750,root,root) %{_bindir}/atuned
%attr(0750,root,root) /usr/libexec/atuned/scripts/*
%attr(0750,root,root) /usr/libexec/atuned/analysis/*
......
......@@ -31,8 +31,15 @@ address = /var/run/atuned/atuned.sock
# the rest service listening port, default is 8383
# the port can be set between 0 to 65535 which not be used
rest_host = localhost
rest_port = 8383
# the tuning optimizer host and port, start by opt.service
# if opt_host is same as rest_host, two ports cannot be same
# the port can be set between 0 to 65535 which not be used
opt_host = localhost
opt_port = 3838
# when run analysis command, the numbers of collected data.
# default is 20
sample_num = 20
......
[Unit]
Description=A-Tune AI service
After=network.target
Requires=polkit.service
[Service]
ExecStart=python3 /usr/libexec/atuned/analysis/app.py /etc/atuned/atuned.cnf opt
[Install]
WantedBy=multi-user.target
......@@ -70,9 +70,9 @@ type RespCollectorPost struct {
// ClassifyPostBody : the body send to classify service
type ClassifyPostBody struct {
Data string `json:"data"`
ModelPath string `json:"modelpath,omitempty"`
Model string `json:"model,omitempty"`
Data [][]string `json:"data"`
ModelPath string `json:"modelpath,omitempty"`
Model string `json:"model,omitempty"`
}
// RespClassify : the response of classify model
......@@ -388,9 +388,16 @@ func (s *ProfileServer) Analysis(message *PB.AnalysisMessage, stream PB.ProfileM
return err
}
data, err := utils.ReadCSV(respCollectPost.Path)
if err != nil {
log.Errorf("Failed to read data from CSV: %v", err)
_ = stream.Send(&PB.AckCheck{Name: err.Error()})
return err
}
//2. send the collected data to the model for completion type identification
body := new(ClassifyPostBody)
body.Data = respCollectPost.Path
body.Data = data
body.ModelPath = path.Join(config.DefaultAnalysisPath, "models")
if message.GetModel() != "" {
......@@ -953,9 +960,16 @@ func (s *ProfileServer) Charaterization(profileInfo *PB.ProfileInfo, stream PB.P
return err
}
data, err := utils.ReadCSV(respCollectPost.Path)
if err != nil {
log.Errorf("Failed to read data from CSV: %v", err)
_ = stream.Send(&PB.AckCheck{Name: err.Error()})
return err
}
//2. send the collected data to the model for completion type identification
body := new(ClassifyPostBody)
body.Data = respCollectPost.Path
body.Data = data
body.ModelPath = path.Join(config.DefaultAnalysisPath, "models")
respPostIns, err := body.Post()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册