diff --git a/internal/util/flowgraph/node.go b/internal/util/flowgraph/node.go index 65a997952e205e01ac56aa68609efed20240cf09..a8e78c740b0628fc6fd1456e365a772ff6a49cac 100644 --- a/internal/util/flowgraph/node.go +++ b/internal/util/flowgraph/node.go @@ -147,29 +147,33 @@ func (nodeCtx *nodeCtx) collectInputMessages() context.Context { for i := 1; i < len(nodeCtx.inputMessages); i++ { if t < nodeCtx.inputMessages[i].TimeTick() { latestTime = nodeCtx.inputMessages[i].TimeTick() - //err := errors.New("Fatal, misaligned time tick," + - // "t1=" + strconv.FormatUint(time, 10) + - // ", t2=" + strconv.FormatUint((*nodeCtx.inputMessages[i]).TimeTick(), 10) + - // ", please restart pulsar") - //panic(err) } } + // wait for time tick - for i := 0; i < len(nodeCtx.inputMessages); i++ { - for nodeCtx.inputMessages[i].TimeTick() != latestTime { - channel := nodeCtx.inputChannels[i] - select { - case <-time.After(10 * time.Second): - panic("cannot find time tick in flow graph") - case msg, ok := <-channel: + sign := make(chan struct{}) + go func() { + for i := 0; i < len(nodeCtx.inputMessages); i++ { + for nodeCtx.inputMessages[i].TimeTick() != latestTime { + fmt.Println("try to align timestamp, t1 =", latestTime, ", t2 =", nodeCtx.inputMessages[i].TimeTick()) + channel := nodeCtx.inputChannels[i] + msg, ok := <-channel if !ok { log.Println("input channel closed") - return nil + return } nodeCtx.inputMessages[i] = msg.msg } } + sign <- struct{}{} + }() + + select { + case <-time.After(10 * time.Second): + panic("Fatal, misaligned time tick, please restart pulsar") + case <-sign: } + } return ctx }