query_node.go
package reader

import "C"
import (
	"errors"
	"fmt"
	"suvlim/pulsar"
	"suvlim/pulsar/schema"
	"sync"
	"time"
)

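// QueryNodeDataBuffer holds insert, delete and search messages whose
// timestamps are not yet covered by the current time sync, plus one validity
// flag per buffered message.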
type QueryNodeDataBuffer struct {
	InsertBuffer      []*schema.InsertMsg
	DeleteBuffer      []*schema.DeleteMsg
	SearchBuffer      []*schema.SearchMsg
	validInsertBuffer []bool
	validDeleteBuffer []bool
	validSearchBuffer []bool
}

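// QueryNodeTimeSync holds the current time sync values used to decide which
// delete, insert and search messages are ready to be processed.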
type QueryNodeTimeSync struct {
	deleteTimeSync uint64
	insertTimeSync uint64
	searchTimeSync uint64
}

// QueryNode is the reader's top-level object: it owns the collections served
// by this node, the Pulsar message client, the time sync state and the buffer
// of messages that are not yet covered by the time sync.
type QueryNode struct {
	Collections       []*Collection
	messageClient     pulsar.MessageClient
	queryNodeTimeSync *QueryNodeTimeSync
	buffer            QueryNodeDataBuffer
}

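// NewQueryNode creates a QueryNode whose insert, delete and search time sync
// values all start from the given timestamp.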
func NewQueryNode(timeSync uint64) *QueryNode {
	mc := pulsar.MessageClient{}

	queryNodeTimeSync := &QueryNodeTimeSync{
		deleteTimeSync: timeSync,
		insertTimeSync: timeSync,
		searchTimeSync: timeSync,
	}

	return &QueryNode{
		Collections:       nil,
		messageClient:     mc,
		queryNodeTimeSync: queryNodeTimeSync,
	}
}

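// doQueryNode processes one prepared batch: insert, delete and search messages
// are handled in three concurrent goroutines and the call blocks until all of
// them have finished.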
func (node *QueryNode) doQueryNode(wg *sync.WaitGroup) {
	wg.Add(3)
	go node.Insert(node.messageClient.InsertMsg, wg)
	go node.Delete(node.messageClient.DeleteMsg, wg)
	go node.Search(node.messageClient.SearchMsg, wg)
	wg.Wait()
}

func (node *QueryNode) PrepareBatchMsg() {
	node.messageClient.PrepareBatchMsg(pulsar.JobType(0))
}

func (node *QueryNode) StartMessageClient() {
	topics := []string{"insert", "delete"}
	node.messageClient.InitClient("pulsar://localhost:6650", topics)

	go node.messageClient.ReceiveMessage()
}
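
// A minimal sketch of how these pieces could be wired together by a caller
// (assumed usage, not taken from this package):
//
//	node := NewQueryNode(0)
//	node.InitQueryNodeCollection()
//	node.StartMessageClient()
//	go node.SegmentService()
//	var wg sync.WaitGroup
//	for {
//		node.PrepareBatchMsg()
//		node.doQueryNode(&wg)
//	}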

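// AddNewCollection creates a collection with the given name and schema and
// registers it with this node.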
func (node *QueryNode) AddNewCollection(collectionName string, schema CollectionSchema) error {
	var collection, err = NewCollection(collectionName, schema)
	if err != nil {
		return err
	}
	node.Collections = append(node.Collections, collection)
	return nil
}

func (node *QueryNode) GetSegmentByEntityId(entityId int64) *Segment {
	// TODO: get id2segment info from pulsar
	return nil
}

// GetTargetSegment returns the first segment with status 0 (open) in the
// partition identified by collectionName and partitionTag.
func (node *QueryNode) GetTargetSegment(collectionName *string, partitionTag *string) (*Segment, error) {
	var targetPartition *Partition

	for _, collection := range node.Collections {
		if *collectionName == collection.CollectionName {
			for _, partition := range collection.Partitions {
				if *partitionTag == partition.PartitionName {
					targetPartition = partition
					break
				}
			}
		}
	}

	if targetPartition == nil {
		return nil, errors.New("cannot found target partition")
	}

	for _, segment := range targetPartition.Segments {
		var segmentStatus = segment.GetStatus()
		if segmentStatus == 0 {
			return segment, nil
		}
	}

	return nil, errors.New("cannot found target segment")
}

func (node *QueryNode) GetTimeSync() uint64 {
	// TODO: Add time sync
	return 0
}

////////////////////////////////////////////////////////////////////////////////////////////////////

// InitQueryNodeCollection sets up a hard-coded collection, partition and
// segment so the node has something to serve; see the TODOs below.
func (node *QueryNode) InitQueryNodeCollection() {
	// TODO: remove hard code, add collection creation request
	var collection, _ = NewCollection("collection1", "fakeSchema")
	node.Collections = append(node.Collections, collection)
	var partition, _ = collection.NewPartition("partition1")
	collection.Partitions = append(collection.Partitions, partition)
	// TODO: add segment id
	var segment, _ = partition.NewSegment(0)
	partition.Segments = append(partition.Segments, segment)
}

// SegmentsManagement walks every segment of every partition and rolls over
// the segments whose close time has been reached by the current time sync.
func (node *QueryNode) SegmentsManagement() {
	var timeSync = node.GetTimeSync()
	for _, collection := range node.Collections {
		for _, partition := range collection.Partitions {
			for _, segment := range partition.Segments {
				// TODO: check segment status
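				// Roll over: once the time sync reaches a segment's close time,
				// close it and open a new segment with a fresh lifetime window.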
				if timeSync >= segment.SegmentCloseTime {
					segment.Close()
					// TODO: add atomic segment id
					var newSegment, _ = partition.NewSegment(0)
					newSegment.SegmentCloseTime = timeSync + SegmentLifetime
					partition.Segments = append(partition.Segments, newSegment)
				}
			}
		}
	}
}

// SegmentService triggers SegmentsManagement every 200 milliseconds.
func (node *QueryNode) SegmentService() {
	for {
		time.Sleep(200 * time.Millisecond)
		node.SegmentsManagement()
		fmt.Println("do segments management in 200ms")
	}
}

///////////////////////////////////////////////////////////////////////////////////////////////////
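// Insert drains buffered and newly arrived insert messages whose timestamps
// are covered by the current time sync, writes them into the target segment
// and publishes the result back to the client; later messages stay in the
// buffer for the next round.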
func (node *QueryNode) Insert(insertMessages []*schema.InsertMsg, wg *sync.WaitGroup) schema.Status {
	defer wg.Done()

	var timeSync = node.GetTimeSync()
	var collectionName = insertMessages[0].CollectionName
	var partitionTag = insertMessages[0].PartitionTag
	var clientId = insertMessages[0].ClientId

	// TODO: prevent Memory copy
	var entityIds []int64
	var timestamps []uint64
	var vectorRecords [][]*schema.FieldValue

	for i, msg := range node.buffer.InsertBuffer {
		if msg.Timestamp <= timeSync {
			entityIds = append(entityIds, msg.EntityId)
			timestamps = append(timestamps, msg.Timestamp)
			vectorRecords = append(vectorRecords, msg.Fields)
			node.buffer.validInsertBuffer[i] = false
		}
	}

	// Compact the buffer in place: keep only the entries still marked valid and
	// keep the validity flags aligned with the remaining messages.
	remainingInserts := node.buffer.InsertBuffer[:0]
	remainingInsertFlags := node.buffer.validInsertBuffer[:0]
	for i, isValid := range node.buffer.validInsertBuffer {
		if isValid {
			remainingInserts = append(remainingInserts, node.buffer.InsertBuffer[i])
			remainingInsertFlags = append(remainingInsertFlags, true)
		}
	}
	node.buffer.InsertBuffer = remainingInserts
	node.buffer.validInsertBuffer = remainingInsertFlags

	for _, msg := range insertMessages {
		if msg.Timestamp <= timeSync {
			entityIds = append(entityIds, msg.EntityId)
			timestamps = append(timestamps, msg.Timestamp)
			vectorRecords = append(vectorRecords, msg.Fields)
		} else {
			node.buffer.InsertBuffer = append(node.buffer.InsertBuffer, msg)
			node.buffer.validInsertBuffer = append(node.buffer.validInsertBuffer, true)
		}
	}

	var targetSegment, err = node.GetTargetSegment(&collectionName, &partitionTag)
	if err != nil {
		// TODO: throw runtime error
		fmt.Println(err.Error())
		return schema.Status{}
	}

	var result = SegmentInsert(targetSegment, collectionName, partitionTag, &entityIds, &timestamps, vectorRecords)

	return publishResult(&result, clientId)
}

// Delete drains buffered and newly arrived delete messages whose timestamps
// are covered by the current time sync and applies them to the segment that
// owns the entities; later messages stay in the buffer for the next round.
func (node *QueryNode) Delete(deleteMessages []*schema.DeleteMsg, wg *sync.WaitGroup) schema.Status {
	defer wg.Done()

	var timeSync = node.GetTimeSync()
	var collectionName = deleteMessages[0].CollectionName
	var clientId = deleteMessages[0].ClientId

	// TODO: prevent Memory copy
	var entityIds []int64
	var timestamps []uint64

	for i, msg := range node.buffer.DeleteBuffer {
		if msg.Timestamp <= timeSync {
			entityIds = append(entityIds, msg.EntityId)
			timestamps = append(timestamps, msg.Timestamp)
			node.buffer.validDeleteBuffer[i] = false
		}
	}

	// Compact the buffer in place: keep only the entries still marked valid and
	// keep the validity flags aligned with the remaining messages.
	remainingDeletes := node.buffer.DeleteBuffer[:0]
	remainingDeleteFlags := node.buffer.validDeleteBuffer[:0]
	for i, isValid := range node.buffer.validDeleteBuffer {
		if isValid {
			remainingDeletes = append(remainingDeletes, node.buffer.DeleteBuffer[i])
			remainingDeleteFlags = append(remainingDeleteFlags, true)
		}
	}
	node.buffer.DeleteBuffer = remainingDeletes
	node.buffer.validDeleteBuffer = remainingDeleteFlags

	for _, msg := range deleteMessages {
		if msg.Timestamp <= timeSync {
			entityIds = append(entityIds, msg.EntityId)
			timestamps = append(timestamps, msg.Timestamp)
		} else {
			node.buffer.DeleteBuffer = append(node.buffer.DeleteBuffer, msg)
			node.buffer.validDeleteBuffer = append(node.buffer.validDeleteBuffer, true)
		}
	}

	if entityIds == nil {
		// TODO: throw runtime error
		fmt.Println("no entities found")
		return schema.Status{}
	}
	// TODO: are all entities from a common batch in the same segment?
	var targetSegment = node.GetSegmentByEntityId(entityIds[0])

	var result = SegmentDelete(targetSegment, collectionName, &entityIds, &timestamps)

	return publishResult(&result, clientId)
}

// Search drains buffered and newly arrived search messages whose timestamps
// are covered by the current time sync and runs them against the target
// segment; later messages stay in the buffer for the next round.
func (node *QueryNode) Search(searchMessages []*schema.SearchMsg, wg *sync.WaitGroup) schema.Status {
	defer wg.Done()

	var timeSync = node.GetTimeSync()
	var collectionName = searchMessages[0].CollectionName
	var partitionTag = searchMessages[0].PartitionTag
	var clientId = searchMessages[0].ClientId
	var queryString = searchMessages[0].VectorParam.Json

	// TODO: prevent Memory copy
	var records []schema.VectorRecord
	var timestamps []uint64

	for i, msg := range node.buffer.SearchBuffer {
		if msg.Timestamp <= timeSync {
			records = append(records, *msg.VectorParam.RowRecord)
			timestamps = append(timestamps, msg.Timestamp)
			node.buffer.validSearchBuffer[i] = false
		}
	}

	// Compact the buffer in place: keep only the entries still marked valid and
	// keep the validity flags aligned with the remaining messages.
	remainingSearches := node.buffer.SearchBuffer[:0]
	remainingSearchFlags := node.buffer.validSearchBuffer[:0]
	for i, isValid := range node.buffer.validSearchBuffer {
		if isValid {
			remainingSearches = append(remainingSearches, node.buffer.SearchBuffer[i])
			remainingSearchFlags = append(remainingSearchFlags, true)
		}
	}
	node.buffer.SearchBuffer = remainingSearches
	node.buffer.validSearchBuffer = remainingSearchFlags

	for _, msg := range searchMessages {
		if msg.Timestamp <= timeSync {
			records = append(records, *msg.VectorParam.RowRecord)
			timestamps = append(timestamps, msg.Timestamp)
		} else {
			node.buffer.SearchBuffer = append(node.buffer.SearchBuffer, msg)
			node.buffer.validSearchBuffer = append(node.buffer.validSearchBuffer, true)
		}
	}

	var targetSegment, err = node.GetTargetSegment(&collectionName, &partitionTag)
	if err != nil {
		// TODO: throw runtime error
		fmt.Println(err.Error())
		return schema.Status{}
	}

	var result = SegmentSearch(targetSegment, collectionName, queryString, &timestamps, &records)

	return publishResult(&result, clientId)
}