Unverified commit 1f1ac8f1, authored by groot, committed by GitHub

Fix bulkinsert row count error (#25862)

Signed-off-by: yhmo <yihua.mo@zilliz.com>
Parent 22d56849
@@ -1253,6 +1253,7 @@ func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil
 	colID := req.GetImportTask().GetCollectionId()
 	segmentIDReq := composeAssignSegmentIDRequest(1, shardID, chNames, colID, partID)
 	targetChName := segmentIDReq.GetSegmentIDRequests()[0].GetChannelName()
+	logFields = append(logFields, zap.Int64("collection ID", colID))
 	logFields = append(logFields, zap.String("target channel name", targetChName))
 	log.Info("assign segment for the import task", logFields...)
 	resp, err := node.dataCoord.AssignSegmentID(context.Background(), segmentIDReq)
@@ -1313,6 +1314,7 @@ func createBinLogsFunc(node *DataNode, req *datapb.ImportTaskRequest, schema *sc
 		log.Info("fields data is empty, no need to generate binlog", logFields...)
 		return nil, nil, nil
 	}
+	logFields = append(logFields, zap.Int("row count", rowNum))
 	colID := req.GetImportTask().GetCollectionId()
 	fieldInsert, fieldStats, err := createBinLogs(rowNum, schema, ts, fields, node, segmentID, colID, partID)
@@ -1406,12 +1408,6 @@ func composeAssignSegmentIDRequest(rowNum int, shardID int, chNames []string,
 	// use the first field's row count as segment row count
 	// all the fields row count are same, checked by ImportWrapper
 	// ask DataCoord to alloc a new segment
-	log.Info("import task flush segment",
-		zap.Int("rowCount", rowNum),
-		zap.Any("channel names", chNames),
-		zap.Int64("collectionID", collID),
-		zap.Int64("partitionID", partID),
-		zap.Int("shardID", shardID))
 	segReqs := []*datapb.SegmentIDRequest{
 		{
 			ChannelName: chNames[shardID],
@@ -454,33 +454,39 @@ func (p *ImportWrapper) flushFunc(fields BlockData, shardID int, partitionID int
 	}
 	// if fields data is empty, do nothing
-	var rowNum int
+	rowNum := 0
 	memSize := 0
 	for _, field := range fields {
 		rowNum = field.RowNum()
 		memSize += field.GetMemorySize()
 		break
 	}
 	if rowNum <= 0 {
 		log.Warn("import wrapper: fields data is empty", logFields...)
 		return nil
 	}
 	logFields = append(logFields, zap.Int("rowNum", rowNum), zap.Int("memSize", memSize))
 	log.Info("import wrapper: flush block data to binlog", logFields...)
 	// if there is no segment for this shard, create a new one
 	// if the segment exists and its size almost exceed segmentSize, close it and create a new one
 	var segment *WorkingSegment
 	if shard, ok := p.workingSegments[shardID]; ok {
-		if segment, exists := shard[partitionID]; exists {
-			// the segment already exists, check its size, if the size exceeds(or almost) segmentSize, close the segment
-			if int64(segment.memSize)+int64(memSize) >= p.segmentSize {
-				err := p.closeWorkingSegment(segment)
+		if segmentTemp, exists := shard[partitionID]; exists {
+			log.Info("import wrapper: compare working segment memSize with segmentSize",
+				zap.Int("memSize", segmentTemp.memSize), zap.Int64("segmentSize", p.segmentSize))
+			if int64(segmentTemp.memSize)+int64(memSize) >= p.segmentSize {
+				// the segment already exists, check its size, if the size exceeds(or almost) segmentSize, close the segment
+				err := p.closeWorkingSegment(segmentTemp)
 				if err != nil {
 					logFields = append(logFields, zap.Error(err))
 					log.Warn("import wrapper: failed to close working segment", logFields...)
 					return err
 				}
-				segment = nil
+				p.workingSegments[shardID][partitionID] = nil
+			} else {
+				// the exist segment size is small, no need to close
+				segment = segmentTemp
 			}
 		}
 	} else {
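Why the rename in the flushFunc hunk matters: in Go, a short variable declaration inside an if statement (if segment, exists := shard[partitionID]; exists) creates a new segment scoped to that block, shadowing the outer var segment *WorkingSegment, so the outer variable is never pointed at the existing working segment. The sketch below is a minimal, self-contained illustration of that scoping behavior, not code from this commit; the WorkingSegment struct and the map contents are simplified stand-ins.

package main

import "fmt"

// Simplified stand-in for the importutil WorkingSegment; only memSize is kept.
type WorkingSegment struct {
	memSize int
}

func main() {
	// One shard's working segments keyed by partition ID (hypothetical sample data).
	shard := map[int64]*WorkingSegment{100: {memSize: 42}}
	var partitionID int64 = 100

	var segment *WorkingSegment

	// Old pattern: := in the if statement declares a NEW segment scoped to
	// this block, shadowing the outer variable declared above.
	if segment, exists := shard[partitionID]; exists {
		fmt.Println("inner segment memSize:", segment.memSize) // 42
	}
	fmt.Println("outer segment after shadowed if:", segment) // <nil>

	// Fix-style pattern: bind to a differently named temporary and assign
	// the outer variable explicitly when the existing segment is reused.
	if segmentTemp, exists := shard[partitionID]; exists {
		segment = segmentTemp
	}
	fmt.Println("outer segment memSize after explicit assignment:", segment.memSize) // 42
}

The new hunk avoids the shadow by binding to segmentTemp and assigning segment = segmentTemp explicitly when the existing working segment stays open.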