server.go 2.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"os"
)

type Input struct {
	Keys []uint64 `json:"keys"`
}

type SingleValue struct {
	Status uint32 `json:"status"`
	Value  string `json:"value"`
}
type Output struct {
	Values []SingleValue `json:"values"`
}

type ServerNode struct {
	Ip   string `json:"ip"`
	Port uint64 `json:"port"`
}

type CubeServer struct {
	Name  string       `json:"dict_name"`
	Shard uint64       `json:"shard"`
	Nodes []ServerNode `json:"nodes"`
}

func (server *CubeServer) SplitKeys(keys []uint64) (splited_keys map[uint64]Input, offset map[uint64][]uint64) {
	splited_keys = make(map[uint64]Input)

	offset = make(map[uint64][]uint64)
	for i, key := range keys {
		shard_id := key % server.Shard
		temp_split, _ := splited_keys[shard_id]
		temp_split.Keys = append(temp_split.Keys, key)
		splited_keys[shard_id] = temp_split

		temp_offset, _ := offset[shard_id]
		temp_offset = append(temp_offset, uint64(i))
		offset[shard_id] = temp_offset
	}

	return splited_keys, offset
}

func (server *CubeServer) Seek(input string, output_path string) (err error) {
	file, err := os.Open(input)
	if err != nil {
		return err
	}
	defer file.Close()

	buf := bufio.NewReader(file)

	for {
		line, err := buf.ReadBytes('\n')
		//line = strings.TrimSpace(line)
		if err != nil || io.EOF == err {
			break
		}
		var temp_input Input
		json.Unmarshal(line, &temp_input)
		key_nums := len(temp_input.Keys)
		var output Output
		output.Values = make([]SingleValue, key_nums+1)
		splited_keys, offset := server.SplitKeys(temp_input.Keys)
		for shard_id, keys := range splited_keys {
			cur_output, _ := server.Post(shard_id, keys)
			for index, single_value := range cur_output.Values {
				output.Values[offset[shard_id][index]] = single_value
			}
		}
		json_str, _ := json.Marshal(output)
		fp, err := os.OpenFile(output_path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
		if err != nil {
			log.Fatal(err)
		}
		defer fp.Close()
		_, err = fp.Write(json_str)
	}
	return err
}

func (server *CubeServer) Post(shard_id uint64, input Input) (output Output, err error) {
	if shard_id >= uint64(len(server.Nodes)) {
		err = fmt.Errorf("have no shard:%v", shard_id)
		return output, err
	}
	json_str, _ := json.Marshal(input)
	URL := fmt.Sprintf("http://%s:%v/DictService/seek", server.Nodes[shard_id].Ip, server.Nodes[shard_id].Port)
	req, err := http.NewRequest("POST", URL, bytes.NewBuffer(json_str))
	if err != nil {
		return output, err
	}
	req.Header.Set("Content-Type", "application/json")
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return output, err
	}
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return output, err
	}
	err = json.Unmarshal(body, &output)
	return output, err
}