未验证 提交 ec61a87b 编写于 作者: J JJW 提交者: GitHub

fix(core): fix One quic connection corresponds to one rxstream (#109)

Co-authored-by: Njjwygjj <wangyaoguang@cel.la>
上级 38571210
......@@ -65,7 +65,7 @@ func (s quicDevHandler) Listen() error {
}
func (s quicDevHandler) Read(st quic.Stream) error {
stream := dispatcher.Dispatcher(s.serverlessHandle, rx.FromReader(st))
stream := dispatcher.Dispatcher(s.serverlessHandle, rx.FromReaderWithY3(st))
go func() {
for customer := range stream.Observe() {
......
......@@ -74,7 +74,7 @@ func (s quicServerHandler) Listen() error {
}
func (s quicServerHandler) Read(st quic.Stream) error {
stream := dispatcher.Dispatcher(s.serverlessHandle, rx.FromReader(st))
stream := dispatcher.Dispatcher(s.serverlessHandle, rx.FromReaderWithY3(st))
y3codec := y3.NewCodec(0x10)
......
......@@ -63,9 +63,14 @@ type quicDevHandler struct {
func (s *quicDevHandler) Listen() error {
err := mocker.EmitMockDataFromCloud(s.serverAddr)
return err
}
func (s *quicDevHandler) Read(st quic.Stream) error {
flows, sinks := workflow.Build(s.serverlessConfig)
stream := dispatcher.DispatcherWithFunc(flows, s.mergeChan)
stream := dispatcher.DispatcherWithFunc(flows, st)
go func() {
for customer := range stream.Observe() {
......@@ -90,23 +95,6 @@ func (s *quicDevHandler) Listen() error {
}
}
}()
return err
}
func (s *quicDevHandler) Read(st quic.Stream) error {
go func() {
for {
buf := make([]byte, 3*1024)
n, err := st.Read(buf)
if err != nil {
break
} else {
value := buf[:n]
s.mergeChan <- value
}
}
}()
return nil
}
......@@ -59,9 +59,14 @@ type quicHandler struct {
}
func (s *quicHandler) Listen() error {
return nil
}
func (s *quicHandler) Read(st quic.Stream) error {
flows, sinks := workflow.Build(s.serverlessConfig)
stream := dispatcher.DispatcherWithFunc(flows, s.mergeChan)
stream := dispatcher.DispatcherWithFunc(flows, st)
go func() {
for customer := range stream.Observe() {
......@@ -85,23 +90,6 @@ func (s *quicHandler) Listen() error {
}
}
}()
return nil
}
func (s *quicHandler) Read(st quic.Stream) error {
go func() {
for {
buf := make([]byte, 3*1024)
n, err := st.Read(buf)
if err != nil {
break
} else {
value := buf[:n]
s.mergeChan <- value
}
}
}()
return nil
}
......@@ -32,8 +32,8 @@ func AutoDispatcher(appPath string, rxstream rx.RxStream) (rx.RxStream, error) {
return Dispatcher(handler, rxstream), nil
}
func DispatcherWithFunc(flows []func() (io.ReadWriter, func()), reader chan []byte) rx.RxStream {
stream := rx.FromChannel(reader)
func DispatcherWithFunc(flows []func() (io.ReadWriter, func()), reader io.Reader) rx.RxStream {
stream := rx.FromReader(reader)
for _, flow := range flows {
stream = stream.MergeReadWriterWithFunc(flow)
......
......@@ -34,6 +34,28 @@ func FromChannel(channel chan []byte) RxStream {
}
func FromReader(reader io.Reader) RxStream {
next := make(chan rxgo.Item)
go func() {
defer close(next)
for {
buf := make([]byte, 3*1024)
n, err := reader.Read(buf)
if err != nil {
break
} else {
value := buf[:n]
next <- Of(value)
}
}
}()
return ConvertObservable(rxgo.FromChannel(next))
}
func FromReaderWithY3(reader io.Reader) RxStream {
source := y3.FromStream(reader)
return ConvertObservableWithY3(source)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册