main.go 1.1 KB
Newer Older
1 2 3 4 5
package main

import (
	"context"
	"fmt"
Z
zhenshan.cao 已提交
6
	"github.com/czs007/suvlim/conf"
B
bigsheeper 已提交
7
	storage "github.com/czs007/suvlim/storage/pkg"
8 9
	"github.com/czs007/suvlim/writer/message_client"
	"github.com/czs007/suvlim/writer/write_node"
10 11 12
	"log"
	"strconv"
	"sync"
13 14 15
)

func main() {
B
bigsheeper 已提交
16 17 18 19
	pulsarAddr := "pulsar://"
	pulsarAddr += conf.Config.Pulsar.Address
	pulsarAddr += ":"
	pulsarAddr += strconv.FormatInt(int64(conf.Config.Pulsar.Port), 10)
20
	mc := message_client.MessageClient{}
21

B
bigsheeper 已提交
22
	mc.InitClient(pulsarAddr)
23 24
	//TODO::close client / consumer/ producer

25
	mc.ReceiveMessage()
26
	wg := sync.WaitGroup{}
X
xige-16 已提交
27
	ctx := context.Background()
Z
zhenshan.cao 已提交
28
	kv, err := storage.NewStore(ctx, conf.Config.Storage.Driver)
29
	// TODO:: if err != nil, should retry link
30 31 32 33 34
	if err != nil {
		log.Fatal(err)
	}

	wn := write_node.WriteNode{
35
		KvStore:       &kv,
36 37 38 39
		MessageClient: &mc,
		TimeSync:      100,
	}

40
	//TODO:: start a gorouter for searchById
41
	for {
42 43
		if ctx.Err() != nil {
			break
44
		}
45 46 47 48
		msgLength := wn.MessageClient.PrepareBatchMsg()
		if msgLength > 0 {
			wn.DoWriteNode(ctx, &wg)
			fmt.Println("write node do a batch message, storage len: ", msgLength)
49 50
		}
	}
51
	wn.Close()
52
}