提交 768d7b0f 编写于 作者: L lishuangbin

update code

上级
from fastapi import APIRouter
from pydantic import BaseModel
from skl_kmeans import SklKMeans
import settings
import numpy as np
router = APIRouter()
class ClusterRequest(BaseModel):
points: list
@router.post('/api/skl-kmeans')
async def api_skl_kmeans(req: ClusterRequest):
try:
model = SklKMeans(settings.clusters)
response = model.fit(np.asarray(req.points))
response = {"results": response.tolist()}
except Exception as ex:
response = None
return response
import uvicorn
from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import yaml
import api_skl_kmeans
from nacos_utils import NacosHelper
import settings
service_name = 'skl-kmeans'
service_port = 9999
beat_interval = 30
nacos_endpoint = '192.168.1.238:8848'
nacos_namespace_id = '1bc91fa5-37e3-4269-8e41-3f4e4efb7633'
nacos_group_name = 'DEFAULT_GROUP'
nacos_username = 'nacos'
nacos_password = 'nacos'
nacos_data_id = 'skl-kmeans'
def load_config(content):
yaml_config = yaml.full_load(content)
clusters = yaml_config['clusters']
settings.clusters = clusters
def nacos_config_callback(args):
content = args['raw_content']
load_config(content)
def get_application() -> FastAPI:
fastapi_kwargs= {
"debug": False,
"docs_url": "/docs",
"openapi_prefix": "",
"openapi_url": "/openapi.json",
"redoc_url": "/redoc",
"title": "sklkmeans",
"version": "1.0.0",
}
application = FastAPI(**fastapi_kwargs)
application.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
application.include_router(api_skl_kmeans.router, tags=['skl-kmeans'])
nacos = NacosHelper(nacos_endpoint, nacos_namespace_id, nacos_username, nacos_password)
nacos.set_service(service_name, service_port, nacos_group_name)
nacos.register()
# 注册配置变更监控回调
nacos.add_conf_watcher(nacos_data_id, nacos_group_name, nacos_config_callback)
# 启动时,强制同步一次配置
data_stream = nacos.load_conf(nacos_data_id,nacos_group_name)
load_config(data_stream)
@application.on_event('startup')
def init_scheduler():
scheduler = AsyncIOScheduler()
scheduler.add_job(nacos.beat_callback, 'interval', seconds=beat_interval)
scheduler.start()
return application
if __name__ == '__main__':
uvicorn.run(app=get_application(), host='0.0.0.0', port=service_port)
import nacos
from net_utils import net_helper
class NacosHelper:
service_name = None
service_port = None
service_group = None
def __init__(self, server_endpoint, namespace_id, username=None, password=None):
self.client = nacos.NacosClient(server_endpoint,
namespace=namespace_id,
username=username,
password=password)
self.endpoint = server_endpoint
self.service_ip = net_helper.get_host_ip()
def register(self):
self.client.add_naming_instance(self.service_name,
self.service_ip,
self.service_port,
group_name=self.service_group)
def unregister(self):
self.client.remove_naming_instance(self.service_name,
self.service_ip,
self.service_port)
def set_service(self, service_name, service_port, service_group):
self.service_name = service_name
self.service_port = service_port
self.service_group = service_group
async def beat_callback(self):
self.client.send_heartbeat(self.service_name,
self.service_ip,
self.service_port)
def load_conf(self, data_id, group):
return self.client.get_config(data_id=data_id, group=group, no_snapshot=True)
def add_conf_watcher(self, data_id, group, callback):
self.client.add_config_watcher(data_id=data_id, group=group, cb=callback)
import socket
class NetHelper:
def __init__(self):
self.name = 'net-helper'
@classmethod
def get_host_ip(cls):
st = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
st.connect(('10.255.255.255', 1))
IP = st.getsockname()[0]
except Exception:
IP = '127.0.0.1'
finally:
st.close()
return IP
net_helper = NetHelper()
if __name__ == '__main__':
print(net_helper.get_host_ip())
clusters = 2
\ No newline at end of file
from sklearn.cluster import KMeans
class SklKMeans:
def __init__(self, clusters):
self.model = KMeans(n_clusters=clusters, random_state=9)
def fit(self, inputs):
y_pred = self.model.fit_predict(inputs)
return y_pred
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册