grpc_service.go 9.1 KB
Newer Older
1 2 3 4
package proxy

import (
	"context"
Z
zhenshan.cao 已提交
5
	"errors"
D
dragondriver 已提交
6 7
	"log"
	"strconv"
8

C
cai.zhang 已提交
9 10
	"github.com/golang/protobuf/proto"
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
11 12 13 14 15 16 17 18
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
	"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
)

func (p *Proxy) Insert(ctx context.Context, in *servicepb.RowBatch) (*servicepb.IntegerRangeResponse, error) {
	it := &InsertTask{
19
		BaseInsertTask: BaseInsertTask{
20 21 22 23 24 25 26 27 28 29 30 31 32
			BaseMsg: msgstream.BaseMsg{
				HashValues: in.HashKeys,
			},
			InsertRequest: internalpb.InsertRequest{
				MsgType:        internalpb.MsgType_kInsert,
				CollectionName: in.CollectionName,
				PartitionTag:   in.PartitionTag,
				RowData:        in.RowData,
			},
		},
		done:                  make(chan error),
		manipulationMsgStream: p.manipulationMsgStream,
	}
33

34 35 36 37 38
	it.ctx, it.cancel = context.WithCancel(ctx)
	// TODO: req_id, segment_id, channel_id, proxy_id, timestamps, row_ids

	defer it.cancel()

39 40 41 42 43
	fn := func() error {
		select {
		case <-ctx.Done():
			return errors.New("insert timeout")
		default:
D
dragondriver 已提交
44
			return p.taskSch.DmQueue.Enqueue(it)
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
		}
	}
	err := fn()

	if err != nil {
		return &servicepb.IntegerRangeResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			},
		}, nil
	}

	err = it.WaitToFinish()
	if err != nil {
B
bigsheeper 已提交
60 61 62
		return &servicepb.IntegerRangeResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
63
				Reason:    err.Error(),
B
bigsheeper 已提交
64
			},
65
		}, nil
66
	}
67 68

	return it.result, nil
69 70 71
}

func (p *Proxy) CreateCollection(ctx context.Context, req *schemapb.CollectionSchema) (*commonpb.Status, error) {
72 73 74 75 76 77 78 79 80 81 82 83 84 85
	cct := &CreateCollectionTask{
		CreateCollectionRequest: internalpb.CreateCollectionRequest{
			MsgType: internalpb.MsgType_kCreateCollection,
			Schema:  &commonpb.Blob{},
			// TODO: req_id, timestamp, proxy_id
		},
		masterClient: p.masterClient,
		done:         make(chan error),
	}
	schemaBytes, _ := proto.Marshal(req)
	cct.CreateCollectionRequest.Schema.Value = schemaBytes
	cct.ctx, cct.cancel = context.WithCancel(ctx)
	defer cct.cancel()

86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
	fn := func() error {
		select {
		case <-ctx.Done():
			return errors.New("create collection timeout")
		default:
			return p.taskSch.DdQueue.Enqueue(cct)
		}
	}
	err := fn()
	if err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    err.Error(),
		}, err
	}

	err = cct.WaitToFinish()
	if err != nil {
B
bigsheeper 已提交
104 105
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
106 107
			Reason:    err.Error(),
		}, err
108
	}
109 110

	return cct.result, nil
111 112 113 114 115 116 117 118 119 120 121 122 123 124
}

func (p *Proxy) Search(ctx context.Context, req *servicepb.Query) (*servicepb.QueryResult, error) {
	qt := &QueryTask{
		SearchRequest: internalpb.SearchRequest{
			MsgType: internalpb.MsgType_kSearch,
			Query:   &commonpb.Blob{},
			// TODO: req_id, proxy_id, timestamp, result_channel_id
		},
		queryMsgStream: p.queryMsgStream,
		done:           make(chan error),
		resultBuf:      make(chan []*internalpb.SearchResult),
	}
	qt.ctx, qt.cancel = context.WithCancel(ctx)
D
dragondriver 已提交
125 126 127
	// Hack with test, shit here but no other ways
	reqID, _ := strconv.Atoi(req.CollectionName[len(req.CollectionName)-1:])
	qt.ReqID = int64(reqID)
128 129
	queryBytes, _ := proto.Marshal(req)
	qt.SearchRequest.Query.Value = queryBytes
D
dragondriver 已提交
130
	log.Printf("grpc address of query task: %p", qt)
131 132
	defer qt.cancel()

133 134 135 136 137
	fn := func() error {
		select {
		case <-ctx.Done():
			return errors.New("create collection timeout")
		default:
D
dragondriver 已提交
138
			return p.taskSch.DqQueue.Enqueue(qt)
139 140 141 142 143 144 145 146 147
		}
	}
	err := fn()
	if err != nil {
		return &servicepb.QueryResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			},
D
dragondriver 已提交
148
		}, nil
149 150 151 152
	}

	err = qt.WaitToFinish()
	if err != nil {
B
bigsheeper 已提交
153 154 155
		return &servicepb.QueryResult{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
156
				Reason:    err.Error(),
B
bigsheeper 已提交
157
			},
D
dragondriver 已提交
158
		}, nil
159
	}
160 161

	return qt.result, nil
162 163 164
}

func (p *Proxy) DropCollection(ctx context.Context, req *servicepb.CollectionName) (*commonpb.Status, error) {
165 166 167 168 169 170 171 172 173 174 175 176
	dct := &DropCollectionTask{
		DropCollectionRequest: internalpb.DropCollectionRequest{
			MsgType: internalpb.MsgType_kDropCollection,
			// TODO: req_id, timestamp, proxy_id
			CollectionName: req,
		},
		masterClient: p.masterClient,
		done:         make(chan error),
	}
	dct.ctx, dct.cancel = context.WithCancel(ctx)
	defer dct.cancel()

177 178 179 180 181 182 183 184 185 186
	fn := func() error {
		select {
		case <-ctx.Done():
			return errors.New("create collection timeout")
		default:
			return p.taskSch.DdQueue.Enqueue(dct)
		}
	}
	err := fn()
	if err != nil {
B
bigsheeper 已提交
187 188
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
189 190
			Reason:    err.Error(),
		}, err
191
	}
192 193 194 195 196 197 198 199 200 201

	err = dct.WaitToFinish()
	if err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    err.Error(),
		}, err
	}

	return dct.result, nil
202 203 204
}

func (p *Proxy) HasCollection(ctx context.Context, req *servicepb.CollectionName) (*servicepb.BoolResponse, error) {
205 206 207 208 209
	hct := &HasCollectionTask{
		HasCollectionRequest: internalpb.HasCollectionRequest{
			MsgType: internalpb.MsgType_kHasCollection,
			// TODO: req_id, timestamp, proxy_id
			CollectionName: req,
210
		},
211 212 213 214 215 216
		masterClient: p.masterClient,
		done:         make(chan error),
	}
	hct.ctx, hct.cancel = context.WithCancel(ctx)
	defer hct.cancel()

217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
	fn := func() error {
		select {
		case <-ctx.Done():
			return errors.New("create collection timeout")
		default:
			return p.taskSch.DdQueue.Enqueue(hct)
		}
	}
	err := fn()
	if err != nil {
		return &servicepb.BoolResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			},
		}, err
	}

	err = hct.WaitToFinish()
	if err != nil {
B
bigsheeper 已提交
237 238 239
		return &servicepb.BoolResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
240
				Reason:    err.Error(),
B
bigsheeper 已提交
241
			},
242
		}, err
243
	}
244 245

	return hct.result, nil
246 247 248
}

func (p *Proxy) DescribeCollection(ctx context.Context, req *servicepb.CollectionName) (*servicepb.CollectionDescription, error) {
249 250 251 252 253
	dct := &DescribeCollectionTask{
		DescribeCollectionRequest: internalpb.DescribeCollectionRequest{
			MsgType: internalpb.MsgType_kDescribeCollection,
			// TODO: req_id, timestamp, proxy_id
			CollectionName: req,
254
		},
255 256 257 258 259 260
		masterClient: p.masterClient,
		done:         make(chan error),
	}
	dct.ctx, dct.cancel = context.WithCancel(ctx)
	defer dct.cancel()

261 262 263 264 265 266 267 268 269 270
	fn := func() error {
		select {
		case <-ctx.Done():
			return errors.New("create collection timeout")
		default:
			return p.taskSch.DdQueue.Enqueue(dct)
		}
	}
	err := fn()
	if err != nil {
B
bigsheeper 已提交
271 272 273
		return &servicepb.CollectionDescription{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
274
				Reason:    err.Error(),
B
bigsheeper 已提交
275
			},
276
		}, err
277
	}
278 279 280 281 282 283 284 285 286 287 288 289

	err = dct.WaitToFinish()
	if err != nil {
		return &servicepb.CollectionDescription{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			},
		}, err
	}

	return dct.result, nil
290 291 292
}

func (p *Proxy) ShowCollections(ctx context.Context, req *commonpb.Empty) (*servicepb.StringListResponse, error) {
293 294 295 296
	sct := &ShowCollectionsTask{
		ShowCollectionRequest: internalpb.ShowCollectionRequest{
			MsgType: internalpb.MsgType_kDescribeCollection,
			// TODO: req_id, timestamp, proxy_id
297
		},
298 299 300 301 302 303
		masterClient: p.masterClient,
		done:         make(chan error),
	}
	sct.ctx, sct.cancel = context.WithCancel(ctx)
	defer sct.cancel()

304 305 306 307 308 309 310 311 312 313
	fn := func() error {
		select {
		case <-ctx.Done():
			return errors.New("create collection timeout")
		default:
			return p.taskSch.DdQueue.Enqueue(sct)
		}
	}
	err := fn()
	if err != nil {
B
bigsheeper 已提交
314 315 316
		return &servicepb.StringListResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
317
				Reason:    err.Error(),
B
bigsheeper 已提交
318
			},
319
		}, err
320
	}
321 322 323 324 325 326 327 328 329 330 331 332

	err = sct.WaitToFinish()
	if err != nil {
		return &servicepb.StringListResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			},
		}, err
	}

	return sct.result, nil
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375
}

func (p *Proxy) CreatePartition(ctx context.Context, in *servicepb.PartitionName) (*commonpb.Status, error) {
	return &commonpb.Status{
		ErrorCode: 0,
		Reason:    "",
	}, nil
}

func (p *Proxy) DropPartition(ctx context.Context, in *servicepb.PartitionName) (*commonpb.Status, error) {
	return &commonpb.Status{
		ErrorCode: 0,
		Reason:    "",
	}, nil
}

func (p *Proxy) HasPartition(ctx context.Context, in *servicepb.PartitionName) (*servicepb.BoolResponse, error) {
	return &servicepb.BoolResponse{
		Status: &commonpb.Status{
			ErrorCode: 0,
			Reason:    "",
		},
		Value: true,
	}, nil
}

func (p *Proxy) DescribePartition(ctx context.Context, in *servicepb.PartitionName) (*servicepb.PartitionDescription, error) {
	return &servicepb.PartitionDescription{
		Status: &commonpb.Status{
			ErrorCode: 0,
			Reason:    "",
		},
	}, nil
}

func (p *Proxy) ShowPartitions(ctx context.Context, req *servicepb.CollectionName) (*servicepb.StringListResponse, error) {
	return &servicepb.StringListResponse{
		Status: &commonpb.Status{
			ErrorCode: 0,
			Reason:    "",
		},
	}, nil
}