TsFileProcessor.java 60.3 KB
Newer Older
B
Boris 已提交
1
/*
J
Jialin Qiao 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
package org.apache.iotdb.db.storageengine.dataregion.memtable;
J
Jialin Qiao 已提交
20

21
import org.apache.iotdb.common.rpc.thrift.TSStatus;
22
import org.apache.iotdb.commons.conf.CommonDescriptor;
23
import org.apache.iotdb.commons.exception.MetadataException;
24
import org.apache.iotdb.commons.path.AlignedPath;
25
import org.apache.iotdb.commons.path.PartialPath;
26
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
27
import org.apache.iotdb.commons.utils.TestOnly;
28
import org.apache.iotdb.db.conf.IoTDBConfig;
J
Jialin Qiao 已提交
29 30
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.TsFileProcessorException;
J
Jialin Qiao 已提交
31
import org.apache.iotdb.db.exception.WriteProcessException;
32
import org.apache.iotdb.db.exception.WriteProcessRejectException;
J
JackieTien97 已提交
33
import org.apache.iotdb.db.exception.query.QueryProcessException;
34
import org.apache.iotdb.db.pipe.agent.PipeAgent;
35
import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
36 37 38 39 40 41 42
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
43
import org.apache.iotdb.db.service.metrics.WritingMetrics;
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo;
import org.apache.iotdb.db.storageengine.dataregion.flush.CloseFileListener;
import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushListener;
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushManager;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.flush.NotifyFlushMemTable;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.db.storageengine.rescon.memory.MemTableManager;
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
62
import org.apache.iotdb.db.utils.MemUtils;
63
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
64
import org.apache.iotdb.db.utils.datastructure.TVList;
J
Jialin Qiao 已提交
65
import org.apache.iotdb.rpc.RpcUtils;
66
import org.apache.iotdb.rpc.TSStatusCode;
67
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
68
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
J
Jialin Qiao 已提交
69
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
70
import org.apache.iotdb.tsfile.utils.Binary;
71
import org.apache.iotdb.tsfile.utils.Pair;
J
Jialin Qiao 已提交
72
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
73

J
Jialin Qiao 已提交
74 75 76
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

77 78 79 80
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
81
import java.util.HashMap;
82 83
import java.util.Iterator;
import java.util.List;
84
import java.util.Map;
85 86
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
87
import java.util.concurrent.CopyOnWriteArrayList;
88 89 90
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

91 92 93
import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.GET_QUERY_RESOURCE_FROM_MEM;
import static org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.FLUSHING_MEMTABLE;
import static org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet.WORKING_MEMTABLE;
L
liuminghui233 已提交
94

95
@SuppressWarnings("java:S1135") // ignore todos
J
Jialin Qiao 已提交
96 97
public class TsFileProcessor {

98
  /** logger fot this class. */
J
Jialin Qiao 已提交
99
  private static final Logger logger = LoggerFactory.getLogger(TsFileProcessor.class);
100

101
  /** storgae group name of this tsfile. */
102
  private final String storageGroupName;
103

104
  /** IoTDB config. */
105 106
  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

107
  /** whether it's enable mem control. */
108
  private final boolean enableMemControl = config.isEnableMemControl();
S
SilverNarcissus 已提交
109

110
  /** database info for mem control. */
111
  private final DataRegionInfo dataRegionInfo;
112
  /** tsfile processor info for mem control. */
113
  private TsFileProcessorInfo tsFileProcessorInfo;
114

115
  /** sync this object in read() and asyncTryToFlush(). */
116
  private final ConcurrentLinkedDeque<IMemTable> flushingMemTables = new ConcurrentLinkedDeque<>();
117

118
  /** modification to memtable mapping. */
119
  private final List<Pair<Modification, IMemTable>> modsToMemtable = new ArrayList<>();
S
SilverNarcissus 已提交
120

121
  /** writer for restore tsfile and flushing. */
J
Jialin Qiao 已提交
122
  private RestorableTsFileIOWriter writer;
S
SilverNarcissus 已提交
123

124
  /** tsfile resource for index this tsfile. */
J
JackieTien97 已提交
125
  private final TsFileResource tsFileResource;
S
SilverNarcissus 已提交
126 127

  /** time range index to indicate this processor belongs to which time range */
128
  private long timeRangeId;
J
Jialin Qiao 已提交
129 130 131 132
  /**
   * Whether the processor is in the queue of the FlushManager or being flushed by a flush thread.
   */
  private volatile boolean managedByFlushManager;
133

134
  /** a lock to mutual exclude read and read */
J
JackieTien97 已提交
135
  private final ReadWriteLock flushQueryLock = new ReentrantReadWriteLock();
J
Jialin Qiao 已提交
136
  /**
137 138
   * It is set by the StorageGroupProcessor and checked by flush threads. (If shouldClose == true
   * and its flushingMemTables are all flushed, then the flush thread will close this file.)
J
Jialin Qiao 已提交
139 140
   */
  private volatile boolean shouldClose;
141

142
  /** working memtable. */
J
Jialin Qiao 已提交
143
  private IMemTable workMemTable;
T
Tianan Li 已提交
144

145
  /** last flush time to flush the working memtable. */
146 147
  private long lastWorkMemtableFlushTime;

148
  /** this callback is called before the workMemtable is added into the flushingMemTables. */
149
  private final DataRegion.UpdateEndTimeCallBack updateLatestFlushTimeCallback;
150

151
  /** wal node. */
A
Alan Choo 已提交
152
  private final IWALNode walNode;
S
SilverNarcissus 已提交
153

154
  /** whether it's a sequence file or not. */
J
JackieTien97 已提交
155
  private final boolean sequence;
S
SilverNarcissus 已提交
156

157
  /** total memtable size for mem control. */
158 159
  private long totalMemTableSize;

160
  private static final String FLUSH_QUERY_WRITE_LOCKED = "{}: {} get flushQueryLock write lock";
161 162
  private static final String FLUSH_QUERY_WRITE_RELEASE =
      "{}: {} get flushQueryLock write lock released";
163

164
  /** close file listener. */
165
  private final List<CloseFileListener> closeFileListeners = new CopyOnWriteArrayList<>();
S
SilverNarcissus 已提交
166

167 168
  /** flush file listener. */
  private final List<FlushListener> flushListeners = new ArrayList<>();
169

170 171 172 173
  private final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
      QueryExecutionMetricSet.getInstance();
  private final QueryResourceMetricSet QUERY_RESOURCE_METRICS =
      QueryResourceMetricSet.getInstance();
L
liuminghui233 已提交
174

175 176 177
  private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
      PerformanceOverviewMetrics.getInstance();

178
  @SuppressWarnings("squid:S107")
179
  public TsFileProcessor(
180 181
      String storageGroupName,
      File tsfile,
182
      DataRegionInfo dataRegionInfo,
183
      CloseFileListener closeTsFileCallback,
184
      DataRegion.UpdateEndTimeCallBack updateLatestFlushTimeCallback,
185
      boolean sequence)
J
Jialin Qiao 已提交
186 187
      throws IOException {
    this.storageGroupName = storageGroupName;
188 189 190
    // this.sequence should be assigned at first because `this` will be passed as parameter to other
    // val later
    this.sequence = sequence;
191
    this.tsFileResource = new TsFileResource(tsfile, this);
192
    this.dataRegionInfo = dataRegionInfo;
193
    this.writer = new RestorableTsFileIOWriter(tsfile);
J
Jialin Qiao 已提交
194
    this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
195 196 197
    this.walNode =
        WALManager.getInstance()
            .applyForWALNode(WALManager.getApplicantUniqueId(storageGroupName, sequence));
A
Alan Choo 已提交
198 199
    flushListeners.add(FlushListener.DefaultMemTableFLushListener.INSTANCE);
    flushListeners.add(this.walNode);
200
    closeFileListeners.add(closeTsFileCallback);
A
Alan Choo 已提交
201
    logger.info("create a new tsfile processor {}", tsfile.getAbsolutePath());
J
Jialin Qiao 已提交
202 203
  }

204
  @SuppressWarnings("java:S107") // ignore number of arguments
205 206
  public TsFileProcessor(
      String storageGroupName,
207
      DataRegionInfo dataRegionInfo,
208 209
      TsFileResource tsFileResource,
      CloseFileListener closeUnsealedTsFileProcessor,
210
      DataRegion.UpdateEndTimeCallBack updateLatestFlushTimeCallback,
211
      boolean sequence,
212
      RestorableTsFileIOWriter writer) {
213
    this.storageGroupName = storageGroupName;
214
    this.tsFileResource = tsFileResource;
215
    this.dataRegionInfo = dataRegionInfo;
216 217 218
    this.writer = writer;
    this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
    this.sequence = sequence;
219 220 221
    this.walNode =
        WALManager.getInstance()
            .applyForWALNode(WALManager.getApplicantUniqueId(storageGroupName, sequence));
A
Alan Choo 已提交
222 223
    flushListeners.add(FlushListener.DefaultMemTableFLushListener.INSTANCE);
    flushListeners.add(this.walNode);
224
    closeFileListeners.add(closeUnsealedTsFileProcessor);
A
Alan Choo 已提交
225
    logger.info("reopen a tsfile processor {}", tsFileResource.getTsFile());
226 227
  }

228 229 230 231 232 233 234 235
  /**
   * insert data in an InsertRowNode into the workingMemtable.
   *
   * @param insertRowNode physical plan of insertion
   */
  public void insert(InsertRowNode insertRowNode) throws WriteProcessException {

    if (workMemTable == null) {
236
      long startTime = System.nanoTime();
A
Alan Choo 已提交
237
      createNewWorkingMemTable();
238
      PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(System.nanoTime() - startTime);
239 240 241 242
    }

    long[] memIncrements = null;
    if (enableMemControl) {
243
      long startTime = System.nanoTime();
244
      if (insertRowNode.isAligned()) {
245 246 247 248
        memIncrements =
            checkAlignedMemCostAndAddToTspInfo(
                insertRowNode.getDevicePath().getFullPath(), insertRowNode.getMeasurements(),
                insertRowNode.getDataTypes(), insertRowNode.getValues());
249
      } else {
250 251 252 253
        memIncrements =
            checkMemCostAndAddToTspInfo(
                insertRowNode.getDevicePath().getFullPath(), insertRowNode.getMeasurements(),
                insertRowNode.getDataTypes(), insertRowNode.getValues());
254
      }
255
      PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(System.nanoTime() - startTime);
256 257
    }

258
    long startTime = System.nanoTime();
259
    WALFlushListener walFlushListener;
A
Alan Choo 已提交
260
    try {
261
      walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowNode);
A
Alan Choo 已提交
262 263 264 265
      if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
        throw walFlushListener.getCause();
      }
    } catch (Exception e) {
266
      if (enableMemControl) {
A
Alan Choo 已提交
267
        rollbackMemoryInfo(memIncrements);
268
      }
A
Alan Choo 已提交
269 270 271 272 273
      throw new WriteProcessException(
          String.format(
              "%s: %s write WAL failed",
              storageGroupName, tsFileResource.getTsFile().getAbsolutePath()),
          e);
274
    } finally {
275
      PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(System.nanoTime() - startTime);
276 277
    }

278
    startTime = System.nanoTime();
279 280 281 282 283 284 285 286

    PipeInsertionDataNodeListener.getInstance()
        .listenToInsertNode(
            dataRegionInfo.getDataRegion().getDataRegionId(),
            walFlushListener.getWalEntryHandler(),
            insertRowNode,
            tsFileResource);

287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
    if (insertRowNode.isAligned()) {
      workMemTable.insertAlignedRow(insertRowNode);
    } else {
      workMemTable.insert(insertRowNode);
    }

    // update start time of this memtable
    tsFileResource.updateStartTime(
        insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime());
    // for sequence tsfile, we update the endTime only when the file is prepared to be closed.
    // for unsequence tsfile, we have to update the endTime for each insertion.
    if (!sequence) {
      tsFileResource.updateEndTime(
          insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime());
    }
302 303 304

    tsFileResource.updateProgressIndex(insertRowNode.getProgressIndex());

305
    PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(System.nanoTime() - startTime);
306 307
  }

A
Alan Choo 已提交
308 309 310 311 312
  /**
   * Obtains a memtable from the MemTableManager, makes it the working memtable and tells the WAL
   * node about it together with the path of this tsfile.
   *
   * @throws WriteProcessException declared by the original signature; presumably raised when no
   *     memtable is available (not visible here)
   */
  private void createNewWorkingMemTable() throws WriteProcessException {
    final MemTableManager memTableManager = MemTableManager.getInstance();
    workMemTable = memTableManager.getAvailableMemTable(storageGroupName);
    walNode.onMemTableCreated(workMemTable, tsFileResource.getTsFilePath());
  }

313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
  /**
   * insert batch data of insertTabletPlan into the workingMemtable. The rows to be inserted are in
   * the range [start, end). Null value in each column values will be replaced by the subsequent
   * non-null value, e.g., {1, null, 3, null, 5} will be {1, 3, 5, null, 5}
   *
   * @param insertTabletNode insert a tablet of a device
   * @param start start index of rows to be inserted in insertTabletPlan
   * @param end end index of rows to be inserted in insertTabletPlan
   * @param results result array
   */
  public void insertTablet(
      InsertTabletNode insertTabletNode, int start, int end, TSStatus[] results)
      throws WriteProcessException {

    if (workMemTable == null) {
328
      long startTime = System.nanoTime();
A
Alan Choo 已提交
329
      createNewWorkingMemTable();
330
      PERFORMANCE_OVERVIEW_METRICS.recordCreateMemtableBlockCost(System.nanoTime() - startTime);
331 332 333 334 335
    }

    long[] memIncrements = null;
    try {
      if (enableMemControl) {
336
        long startTime = System.nanoTime();
337
        if (insertTabletNode.isAligned()) {
338 339 340 341 342 343 344 345
          memIncrements =
              checkAlignedMemCostAndAddToTsp(
                  insertTabletNode.getDevicePath().getFullPath(),
                  insertTabletNode.getMeasurements(),
                  insertTabletNode.getDataTypes(),
                  insertTabletNode.getColumns(),
                  start,
                  end);
346
        } else {
347 348 349 350 351 352 353 354
          memIncrements =
              checkMemCostAndAddToTspInfo(
                  insertTabletNode.getDevicePath().getFullPath(),
                  insertTabletNode.getMeasurements(),
                  insertTabletNode.getDataTypes(),
                  insertTabletNode.getColumns(),
                  start,
                  end);
355
        }
356
        PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemoryBlockCost(System.nanoTime() - startTime);
357 358 359 360 361 362 363 364
      }
    } catch (WriteProcessException e) {
      for (int i = start; i < end; i++) {
        results[i] = RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, e.getMessage());
      }
      throw new WriteProcessException(e);
    }

365
    long startTime = System.nanoTime();
366
    WALFlushListener walFlushListener;
367
    try {
368
      walFlushListener = walNode.log(workMemTable.getMemTableId(), insertTabletNode, start, end);
A
Alan Choo 已提交
369 370
      if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
        throw walFlushListener.getCause();
371 372 373 374 375
      }
    } catch (Exception e) {
      for (int i = start; i < end; i++) {
        results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
      }
376
      if (enableMemControl) {
377 378 379
        rollbackMemoryInfo(memIncrements);
      }
      throw new WriteProcessException(e);
380
    } finally {
381
      PERFORMANCE_OVERVIEW_METRICS.recordScheduleWalCost(System.nanoTime() - startTime);
382 383
    }

384
    startTime = System.nanoTime();
385 386 387 388 389 390 391 392

    PipeInsertionDataNodeListener.getInstance()
        .listenToInsertNode(
            dataRegionInfo.getDataRegion().getDataRegionId(),
            walFlushListener.getWalEntryHandler(),
            insertTabletNode,
            tsFileResource);

393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
    try {
      if (insertTabletNode.isAligned()) {
        workMemTable.insertAlignedTablet(insertTabletNode, start, end);
      } else {
        workMemTable.insertTablet(insertTabletNode, start, end);
      }
    } catch (WriteProcessException e) {
      for (int i = start; i < end; i++) {
        results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
      }
      throw new WriteProcessException(e);
    }
    for (int i = start; i < end; i++) {
      results[i] = RpcUtils.SUCCESS_STATUS;
    }
408

409 410 411 412 413 414 415 416
    tsFileResource.updateStartTime(
        insertTabletNode.getDeviceID().toStringID(), insertTabletNode.getTimes()[start]);
    // for sequence tsfile, we update the endTime only when the file is prepared to be closed.
    // for unsequence tsfile, we have to update the endTime for each insertion.
    if (!sequence) {
      tsFileResource.updateEndTime(
          insertTabletNode.getDeviceID().toStringID(), insertTabletNode.getTimes()[end - 1]);
    }
417 418 419

    tsFileResource.updateProgressIndex(insertTabletNode.getProgressIndex());

420
    PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(System.nanoTime() - startTime);
421 422
  }

423
  @SuppressWarnings("squid:S3776") // high Cognitive Complexity
424 425
  private long[] checkMemCostAndAddToTspInfo(
      String deviceId, String[] measurements, TSDataType[] dataTypes, Object[] values)
H
Haonan 已提交
426
      throws WriteProcessException {
427 428 429 430
    // memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
    long memTableIncrement = 0L;
    long textDataIncrement = 0L;
    long chunkMetadataIncrement = 0L;
431
    // get device id
432
    IDeviceID deviceID = getDeviceID(deviceId);
433

434
    for (int i = 0; i < dataTypes.length; i++) {
435
      // skip failed Measurements
436
      if (dataTypes[i] == null || measurements[i] == null) {
437 438
        continue;
      }
439
      if (workMemTable.checkIfChunkDoesNotExist(deviceID, measurements[i])) {
440
        // ChunkMetadataIncrement
441 442
        chunkMetadataIncrement += ChunkMetadata.calculateRamSize(measurements[i], dataTypes[i]);
        memTableIncrement += TVList.tvListArrayMemCost(dataTypes[i]);
443
      } else {
444
        // here currentChunkPointNum >= 1
445
        long currentChunkPointNum = workMemTable.getCurrentTVListSize(deviceID, measurements[i]);
446 447
        memTableIncrement +=
            (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE) == 0
448
                ? TVList.tvListArrayMemCost(dataTypes[i])
449
                : 0;
450 451
      }
      // TEXT data mem size
452
      if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
453
        textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
454 455 456 457 458 459 460
      }
    }
    updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
    return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement};
  }

  @SuppressWarnings("squid:S3776") // high Cognitive Complexity
461 462
  private long[] checkAlignedMemCostAndAddToTspInfo(
      String deviceId, String[] measurements, TSDataType[] dataTypes, Object[] values)
463 464 465 466 467
      throws WriteProcessException {
    // memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8
    long memTableIncrement = 0L;
    long textDataIncrement = 0L;
    long chunkMetadataIncrement = 0L;
468
    // get device id
469
    IDeviceID deviceID = getDeviceID(deviceId);
470
    if (workMemTable.checkIfChunkDoesNotExist(deviceID, AlignedPath.VECTOR_PLACEHOLDER)) {
471
      // for new device of this mem table
472 473 474
      // ChunkMetadataIncrement
      chunkMetadataIncrement +=
          ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR)
475 476
              * dataTypes.length;
      memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes);
477 478 479 480 481 482 483 484 485 486
      for (int i = 0; i < dataTypes.length; i++) {
        // skip failed Measurements
        if (dataTypes[i] == null || measurements[i] == null) {
          continue;
        }
        // TEXT data mem size
        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
          textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
        }
      }
487
    } else {
488 489
      // for existed device of this mem table
      AlignedWritableMemChunk alignedMemChunk =
490
          ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceID))
491
              .getAlignedMemChunk();
492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509
      List<TSDataType> dataTypesInTVList = new ArrayList<>();
      for (int i = 0; i < dataTypes.length; i++) {
        // skip failed Measurements
        if (dataTypes[i] == null || measurements[i] == null) {
          continue;
        }

        // extending the column of aligned mem chunk
        if (!alignedMemChunk.containsMeasurement(measurements[i])) {
          memTableIncrement +=
              (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
                  * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
          dataTypesInTVList.add(dataTypes[i]);
        }
        // TEXT data mem size
        if (dataTypes[i] == TSDataType.TEXT && values[i] != null) {
          textDataIncrement += MemUtils.getBinarySize((Binary) values[i]);
        }
510
      }
511 512 513 514
      // here currentChunkPointNum >= 1
      if ((alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE) == 0) {
        dataTypesInTVList.addAll(((AlignedTVList) alignedMemChunk.getTVList()).getTsDataTypes());
        memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
515 516
      }
    }
517
    updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
518
    return new long[] {memTableIncrement, textDataIncrement, chunkMetadataIncrement};
519 520
  }

521 522 523 524 525 526 527
  private long[] checkMemCostAndAddToTspInfo(
      String deviceId,
      String[] measurements,
      TSDataType[] dataTypes,
      Object[] columns,
      int start,
      int end)
528 529 530 531 532 533 534
      throws WriteProcessException {
    if (start >= end) {
      return new long[] {0, 0, 0};
    }
    long[] memIncrements = new long[3]; // memTable, text, chunk metadata

    // get device id
535
    IDeviceID deviceID = getDeviceID(deviceId);
536

537
    for (int i = 0; i < dataTypes.length; i++) {
538
      // skip failed Measurements
539
      if (dataTypes[i] == null || columns[i] == null || measurements[i] == null) {
540 541
        continue;
      }
542
      updateMemCost(dataTypes[i], measurements[i], deviceID, start, end, memIncrements, columns[i]);
543 544 545 546 547 548 549 550
    }
    long memTableIncrement = memIncrements[0];
    long textDataIncrement = memIncrements[1];
    long chunkMetadataIncrement = memIncrements[2];
    updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
    return memIncrements;
  }

551
  private long[] checkAlignedMemCostAndAddToTsp(
552 553 554 555 556 557 558
      String deviceId,
      String[] measurements,
      TSDataType[] dataTypes,
      Object[] columns,
      int start,
      int end)
      throws WriteProcessException {
559 560 561 562 563 564
    if (start >= end) {
      return new long[] {0, 0, 0};
    }
    long[] memIncrements = new long[3]; // memTable, text, chunk metadata

    // get device id
565
    IDeviceID deviceID = getDeviceID(deviceId);
566

567
    updateAlignedMemCost(dataTypes, deviceID, measurements, start, end, memIncrements, columns);
568 569 570 571 572 573 574
    long memTableIncrement = memIncrements[0];
    long textDataIncrement = memIncrements[1];
    long chunkMetadataIncrement = memIncrements[2];
    updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
    return memIncrements;
  }

575 576 577
  private void updateMemCost(
      TSDataType dataType,
      String measurement,
578
      IDeviceID deviceId,
579 580 581 582
      int start,
      int end,
      long[] memIncrements,
      Object column) {
583 584 585 586 587
    // memIncrements = [memTable, text, chunk metadata] respectively

    if (workMemTable.checkIfChunkDoesNotExist(deviceId, measurement)) {
      // ChunkMetadataIncrement
      memIncrements[2] += ChunkMetadata.calculateRamSize(measurement, dataType);
588 589
      memIncrements[0] +=
          ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
590
              * TVList.tvListArrayMemCost(dataType);
591
    } else {
592
      long currentChunkPointNum = workMemTable.getCurrentTVListSize(deviceId, measurement);
593
      if (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE == 0) {
594 595
        memIncrements[0] +=
            ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
596
                * TVList.tvListArrayMemCost(dataType);
597
      } else {
598
        long acquireArray =
599 600
            (end - start - 1 + (currentChunkPointNum % PrimitiveArrayManager.ARRAY_SIZE))
                / PrimitiveArrayManager.ARRAY_SIZE;
601 602 603
        if (acquireArray != 0) {
          memIncrements[0] += acquireArray * TVList.tvListArrayMemCost(dataType);
        }
604 605
      }
    }
606 607 608 609 610
    // TEXT data size
    if (dataType == TSDataType.TEXT) {
      Binary[] binColumn = (Binary[]) column;
      memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, end);
    }
611 612
  }

613 614
  private void updateAlignedMemCost(
      TSDataType[] dataTypes,
615
      IDeviceID deviceId,
616
      String[] measurementIds,
617 618 619 620 621
      int start,
      int end,
      long[] memIncrements,
      Object[] columns) {
    // memIncrements = [memTable, text, chunk metadata] respectively
622
    if (workMemTable.checkIfChunkDoesNotExist(deviceId, AlignedPath.VECTOR_PLACEHOLDER)) {
623 624
      // ChunkMetadataIncrement
      memIncrements[2] +=
625 626
          dataTypes.length
              * ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR);
627 628
      memIncrements[0] +=
          ((end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1)
629
              * AlignedTVList.alignedTvListArrayMemCost(dataTypes);
630 631 632 633 634 635 636 637 638 639 640 641
      for (int i = 0; i < dataTypes.length; i++) {
        TSDataType dataType = dataTypes[i];
        String measurement = measurementIds[i];
        Object column = columns[i];
        if (dataType == null || column == null || measurement == null) {
          continue;
        }
        // TEXT data size
        if (dataType == TSDataType.TEXT) {
          Binary[] binColumn = (Binary[]) columns[i];
          memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, end);
        }
642
      }
643 644 645

    } else {
      AlignedWritableMemChunk alignedMemChunk =
646 647
          ((AlignedWritableMemChunkGroup) workMemTable.getMemTableMap().get(deviceId))
              .getAlignedMemChunk();
648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667
      List<TSDataType> dataTypesInTVList = new ArrayList<>();
      for (int i = 0; i < dataTypes.length; i++) {
        TSDataType dataType = dataTypes[i];
        String measurement = measurementIds[i];
        Object column = columns[i];
        if (dataType == null || column == null || measurement == null) {
          continue;
        }
        // extending the column of aligned mem chunk
        if (!alignedMemChunk.containsMeasurement(measurementIds[i])) {
          memIncrements[0] +=
              (alignedMemChunk.alignedListSize() / PrimitiveArrayManager.ARRAY_SIZE + 1)
                  * AlignedTVList.valueListArrayMemCost(dataType);
          dataTypesInTVList.add(dataType);
        }
        // TEXT data size
        if (dataType == TSDataType.TEXT) {
          Binary[] binColumn = (Binary[]) columns[i];
          memIncrements[1] += MemUtils.getBinaryColumnSize(binColumn, start, end);
        }
668
      }
669 670 671 672 673 674 675 676 677 678
      long acquireArray;
      if (alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE == 0) {
        acquireArray = (end - start) / PrimitiveArrayManager.ARRAY_SIZE + 1L;
      } else {
        acquireArray =
            (end
                    - start
                    - 1
                    + (alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE))
                / PrimitiveArrayManager.ARRAY_SIZE;
679
      }
680 681 682 683
      if (acquireArray != 0) {
        dataTypesInTVList.addAll(((AlignedTVList) alignedMemChunk.getTVList()).getTsDataTypes());
        memIncrements[0] +=
            acquireArray * AlignedTVList.alignedTvListArrayMemCost(dataTypesInTVList);
684 685 686 687
      }
    }
  }

688
  private void updateMemoryInfo(
689
      long memTableIncrement, long chunkMetadataIncrement, long textDataIncrement)
690
      throws WriteProcessException {
691
    memTableIncrement += textDataIncrement;
692
    dataRegionInfo.addStorageGroupMemCost(memTableIncrement);
693
    tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement);
694
    if (dataRegionInfo.needToReportToSystem()) {
695
      try {
696
        if (!SystemInfo.getInstance().reportStorageGroupStatus(dataRegionInfo, this)) {
697
          StorageEngine.blockInsertionIfReject(this);
698
        }
H
Haonan 已提交
699
      } catch (WriteProcessRejectException e) {
700
        dataRegionInfo.releaseStorageGroupMemCost(memTableIncrement);
701
        tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
702
        SystemInfo.getInstance().resetStorageGroupStatus(dataRegionInfo);
703 704 705 706 707 708 709
        throw e;
      }
    }
    workMemTable.addTVListRamCost(memTableIncrement);
    workMemTable.addTextDataSize(textDataIncrement);
  }

710 711 712 713 714 715
  private void rollbackMemoryInfo(long[] memIncrements) {
    long memTableIncrement = memIncrements[0];
    long textDataIncrement = memIncrements[1];
    long chunkMetadataIncrement = memIncrements[2];

    memTableIncrement += textDataIncrement;
716
    dataRegionInfo.releaseStorageGroupMemCost(memTableIncrement);
717
    tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
718
    SystemInfo.getInstance().resetStorageGroupStatus(dataRegionInfo);
719 720 721 722
    workMemTable.releaseTVListRamCost(memTableIncrement);
    workMemTable.releaseTextDataSize(textDataIncrement);
  }

J
Jialin Qiao 已提交
723 724
  /**
   * Delete data which belongs to the timeseries `deviceId.measurementId` and the timestamp of which
725 726 727
   * <= 'timestamp' in the deletion. <br>
   *
   * <p>Delete data in both working MemTable and flushing MemTables.
J
Jialin Qiao 已提交
728
   */
729
  public void deleteDataInMemory(Deletion deletion, Set<PartialPath> devicePaths) {
J
Jialin Qiao 已提交
730
    flushQueryLock.writeLock().lock();
731
    if (logger.isDebugEnabled()) {
732 733
      logger.debug(
          FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
734
    }
J
Jialin Qiao 已提交
735 736
    try {
      if (workMemTable != null) {
737 738 739 740 741
        logger.info(
            "[Deletion] Deletion with path: {}, time:{}-{} in workMemTable",
            deletion.getPath(),
            deletion.getStartTime(),
            deletion.getEndTime());
742
        for (PartialPath device : devicePaths) {
743 744
          workMemTable.delete(
              deletion.getPath(), device, deletion.getStartTime(), deletion.getEndTime());
745
        }
J
Jialin Qiao 已提交
746
      }
747
      // flushing memTables are immutable, only record this deletion in these memTables for read
748 749
      if (!flushingMemTables.isEmpty()) {
        modsToMemtable.add(new Pair<>(deletion, flushingMemTables.getLast()));
J
Jialin Qiao 已提交
750 751 752
      }
    } finally {
      flushQueryLock.writeLock().unlock();
753
      if (logger.isDebugEnabled()) {
754 755
        logger.debug(
            FLUSH_QUERY_WRITE_RELEASE, storageGroupName, tsFileResource.getTsFile().getName());
756
      }
J
Jialin Qiao 已提交
757 758 759
    }
  }

760
  public WALFlushListener logDeleteDataNodeInWAL(DeleteDataNode deleteDataNode) {
761 762 763
    return walNode.log(workMemTable.getMemTableId(), deleteDataNode);
  }

764
  public TsFileResource getTsFileResource() {
J
Jialin Qiao 已提交
765 766 767
    return tsFileResource;
  }

768
  public boolean shouldFlush() {
769 770 771
    if (workMemTable == null) {
      return false;
    }
772
    if (workMemTable.shouldFlush()) {
773 774 775 776
      logger.info(
          "The memtable size {} of tsfile {} reaches the mem control threshold",
          workMemTable.memSize(),
          tsFileResource.getTsFile().getAbsolutePath());
777 778 779
      return true;
    }
    if (!enableMemControl && workMemTable.memSize() >= getMemtableSizeThresholdBasedOnSeriesNum()) {
780 781 782 783
      logger.info(
          "The memtable size {} of tsfile {} reaches the threshold",
          workMemTable.memSize(),
          tsFileResource.getTsFile().getAbsolutePath());
784 785 786
      return true;
    }
    if (workMemTable.reachTotalPointNumThreshold()) {
787 788
      logger.info(
          "The avg series points num {} of tsfile {} reaches the threshold",
789
          workMemTable.getTotalPointsNum() / workMemTable.getSeriesNumber(),
J
JackieTien97 已提交
790
          tsFileResource.getTsFile().getAbsolutePath());
791 792 793
      return true;
    }
    return false;
794 795 796
  }

  private long getMemtableSizeThresholdBasedOnSeriesNum() {
797
    return config.getMemtableSizeThreshold();
J
Jialin Qiao 已提交
798 799
  }

800
  public boolean shouldClose() {
J
JackieTien97 已提交
801
    long fileSize = tsFileResource.getTsFileSize();
802 803
    long fileSizeThreshold = sequence ? config.getSeqTsFileSize() : config.getUnSeqTsFileSize();

804
    if (fileSize >= fileSizeThreshold) {
805 806 807 808 809
      logger.info(
          "{} fileSize {} >= fileSizeThreshold {}",
          tsFileResource.getTsFilePath(),
          fileSize,
          fileSizeThreshold);
810 811
    }
    return fileSize >= fileSizeThreshold;
J
Jialin Qiao 已提交
812 813
  }

814
  public void syncClose() {
815 816
    logger.info(
        "Sync close file: {}, will firstly async close it",
J
JackieTien97 已提交
817
        tsFileResource.getTsFile().getAbsolutePath());
J
Jialin Qiao 已提交
818 819 820 821 822 823
    if (shouldClose) {
      return;
    }
    synchronized (flushingMemTables) {
      try {
        asyncClose();
824
        logger.info("Start to wait until file {} is closed", tsFileResource);
825 826 827
        long startTime = System.currentTimeMillis();
        while (!flushingMemTables.isEmpty()) {
          flushingMemTables.wait(60_000);
828
          if (System.currentTimeMillis() - startTime > 60_000 && !flushingMemTables.isEmpty()) {
张凌哲 已提交
829 830
            logger.warn(
                "{} has spent {}s for waiting flushing one memtable; {} left (first: {}). FlushingManager info: {}",
J
JackieTien97 已提交
831
                this.tsFileResource.getTsFile().getAbsolutePath(),
张凌哲 已提交
832
                (System.currentTimeMillis() - startTime) / 1000,
833 834
                flushingMemTables.size(),
                flushingMemTables.getFirst(),
835
                FlushManager.getInstance());
836 837
          }
        }
J
Jialin Qiao 已提交
838
      } catch (InterruptedException e) {
839 840 841 842 843
        logger.error(
            "{}: {} wait close interrupted",
            storageGroupName,
            tsFileResource.getTsFile().getName(),
            e);
J
Jialin Qiao 已提交
844 845 846
        Thread.currentThread().interrupt();
      }
    }
J
JackieTien97 已提交
847
    logger.info("File {} is closed synchronously", tsFileResource.getTsFile().getAbsolutePath());
J
Jialin Qiao 已提交
848 849
  }

S
SilverNarcissus 已提交
850
  /** async close one tsfile, register and close it by another thread */
851
  public void asyncClose() {
J
Jialin Qiao 已提交
852
    flushQueryLock.writeLock().lock();
853
    if (logger.isDebugEnabled()) {
854 855
      logger.debug(
          FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
856
    }
J
Jialin Qiao 已提交
857
    try {
858 859 860
      if (logger.isInfoEnabled()) {
        if (workMemTable != null) {
          logger.info(
861
              "{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile "
862
                  + "size: {}, plan index: [{}, {}], progress index: {}",
863 864
              storageGroupName,
              tsFileResource.getTsFile().getAbsolutePath(),
J
JackieTien97 已提交
865
              workMemTable.memSize(),
866 867
              tsFileResource.getTsFileSize(),
              workMemTable.getMinPlanIndex(),
868 869
              workMemTable.getMaxPlanIndex(),
              tsFileResource.getMaxProgressIndex());
870
        } else {
871 872 873 874
          logger.info(
              "{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}",
              storageGroupName,
              tsFileResource.getTsFile().getAbsolutePath(),
J
JackieTien97 已提交
875
              tsFileResource.getTsFileSize());
876 877 878
        }
      }

J
Jialin Qiao 已提交
879 880 881 882 883
      if (shouldClose) {
        return;
      }
      // when a flush thread serves this TsFileProcessor (because the processor is submitted by
      // registerTsFileProcessor()), the thread will seal the corresponding TsFile and
884
      // execute other cleanup works if "shouldClose == true and flushingMemTables is empty".
J
Jialin Qiao 已提交
885 886 887 888

      // To ensure there must be a flush thread serving this processor after the field `shouldClose`
      // is set true, we need to generate a NotifyFlushMemTable as a signal task and submit it to
      // the FlushManager.
889

890
      // we have to add the memtable into flushingList first and then set the shouldClose tag.
891
      // see https://issues.apache.org/jira/browse/IOTDB-510
892
      IMemTable tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
893

J
Jialin Qiao 已提交
894
      try {
895
        PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
896
        PipeInsertionDataNodeListener.getInstance()
897 898
            .listenToTsFile(
                dataRegionInfo.getDataRegion().getDataRegionId(), tsFileResource, false);
899

900 901
        // When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke
        // flushing memTable in System module.
902 903
        addAMemtableIntoFlushingList(tmpMemTable);
        logger.info("Memtable {} has been added to flushing list", tmpMemTable);
904 905
        shouldClose = true;
      } catch (Exception e) {
906 907 908 909 910
        logger.error(
            "{}: {} async close failed, because",
            storageGroupName,
            tsFileResource.getTsFile().getName(),
            e);
J
Jialin Qiao 已提交
911 912 913
      }
    } finally {
      flushQueryLock.writeLock().unlock();
914
      if (logger.isDebugEnabled()) {
915 916
        logger.debug(
            FLUSH_QUERY_WRITE_RELEASE, storageGroupName, tsFileResource.getTsFile().getName());
917
      }
J
Jialin Qiao 已提交
918 919 920 921
    }
  }

  /**
张凌哲 已提交
922 923
   * TODO if the flushing thread is too fast, the tmpMemTable.wait() may never wakeup Tips: I am
   * trying to solve this issue by checking whether the table exist before wait()
J
Jialin Qiao 已提交
924 925 926 927
   */
  public void syncFlush() throws IOException {
    IMemTable tmpMemTable;
    flushQueryLock.writeLock().lock();
928
    if (logger.isDebugEnabled()) {
929 930
      logger.debug(
          FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
931
    }
J
Jialin Qiao 已提交
932 933
    try {
      tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
934
      if (logger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) {
935 936 937 938
        logger.debug(
            "{}: {} add a signal memtable into flushing memtable list when sync flush",
            storageGroupName,
            tsFileResource.getTsFile().getName());
J
Jialin Qiao 已提交
939 940 941 942
      }
      addAMemtableIntoFlushingList(tmpMemTable);
    } finally {
      flushQueryLock.writeLock().unlock();
943
      if (logger.isDebugEnabled()) {
944 945
        logger.debug(
            FLUSH_QUERY_WRITE_RELEASE, storageGroupName, tsFileResource.getTsFile().getName());
946
      }
J
Jialin Qiao 已提交
947 948
    }

949
    synchronized (flushingMemTables) {
J
Jialin Qiao 已提交
950 951
      try {
        long startWait = System.currentTimeMillis();
952
        while (flushingMemTables.contains(tmpMemTable)) {
953
          flushingMemTables.wait(1000);
J
Jialin Qiao 已提交
954 955

          if ((System.currentTimeMillis() - startWait) > 60_000) {
956 957
            logger.warn(
                "has waited for synced flushing a memtable in {} for 60 seconds.",
J
JackieTien97 已提交
958
                this.tsFileResource.getTsFile().getAbsolutePath());
J
Jialin Qiao 已提交
959 960 961 962
            startWait = System.currentTimeMillis();
          }
        }
      } catch (InterruptedException e) {
963 964 965 966 967
        logger.error(
            "{}: {} wait flush finished meets error",
            storageGroupName,
            tsFileResource.getTsFile().getName(),
            e);
J
Jialin Qiao 已提交
968 969 970 971 972
        Thread.currentThread().interrupt();
      }
    }
  }

973
  /** put the working memtable into flushing list and set the working memtable to null */
J
Jialin Qiao 已提交
974 975
  public void asyncFlush() {
    flushQueryLock.writeLock().lock();
976
    if (logger.isDebugEnabled()) {
977 978
      logger.debug(
          FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
979
    }
J
Jialin Qiao 已提交
980 981 982 983
    try {
      if (workMemTable == null) {
        return;
      }
984 985
      logger.info(
          "Async flush a memtable to tsfile: {}", tsFileResource.getTsFile().getAbsolutePath());
J
Jialin Qiao 已提交
986
      addAMemtableIntoFlushingList(workMemTable);
987
    } catch (Exception e) {
988 989 990 991 992
      logger.error(
          "{}: {} add a memtable into flushing list failed",
          storageGroupName,
          tsFileResource.getTsFile().getName(),
          e);
J
Jialin Qiao 已提交
993 994
    } finally {
      flushQueryLock.writeLock().unlock();
995
      if (logger.isDebugEnabled()) {
996 997
        logger.debug(
            FLUSH_QUERY_WRITE_RELEASE, storageGroupName, tsFileResource.getTsFile().getName());
998
      }
J
Jialin Qiao 已提交
999 1000 1001 1002 1003 1004 1005 1006 1007
    }
  }

  /**
   * this method calls updateLatestFlushTimeCallback and move the given memtable into the flushing
   * queue, set the current working memtable as null and then register the tsfileProcessor into the
   * flushManager again.
   */
  private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException {
1008 1009 1010
    Map<String, Long> lastTimeForEachDevice = new HashMap<>();
    if (sequence) {
      lastTimeForEachDevice = tobeFlushed.getMaxTime();
1011 1012 1013 1014 1015 1016 1017 1018 1019
      // If some devices have been removed in MemTable, the number of device in MemTable and
      // tsFileResource will not be the same. And the endTime of these devices in resource will be
      // Long.minValue.
      // In the case, we need to delete the removed devices in tsFileResource.
      if (lastTimeForEachDevice.size() != tsFileResource.getDevices().size()) {
        tsFileResource.deleteRemovedDeviceAndUpdateEndTime(lastTimeForEachDevice);
      } else {
        tsFileResource.updateEndTime(lastTimeForEachDevice);
      }
1020
    }
1021

1022
    for (FlushListener flushListener : flushListeners) {
A
Alan Choo 已提交
1023
      flushListener.onMemTableFlushStarted(tobeFlushed);
1024 1025
    }

1026 1027 1028
    lastWorkMemtableFlushTime = System.currentTimeMillis();
    updateLatestFlushTimeCallback.call(this, lastTimeForEachDevice, lastWorkMemtableFlushTime);

1029 1030 1031
    if (enableMemControl) {
      SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost());
    }
J
Jialin Qiao 已提交
1032
    flushingMemTables.addLast(tobeFlushed);
1033 1034 1035
    if (logger.isDebugEnabled()) {
      logger.debug(
          "{}: {} Memtable (signal = {}) is added into the flushing Memtable, queue size = {}",
1036 1037 1038 1039
          storageGroupName,
          tsFileResource.getTsFile().getName(),
          tobeFlushed.isSignalMemTable(),
          flushingMemTables.size());
1040
    }
1041

1042
    if (!(tobeFlushed.isSignalMemTable() || tobeFlushed.isEmpty())) {
1043 1044
      totalMemTableSize += tobeFlushed.memSize();
    }
J
Jialin Qiao 已提交
1045 1046 1047 1048
    workMemTable = null;
    FlushManager.getInstance().registerTsFileProcessor(this);
  }

1049
  /** put back the memtable to MemTablePool and make metadata in writer visible */
J
Jialin Qiao 已提交
1050 1051
  private void releaseFlushedMemTable(IMemTable memTable) {
    flushQueryLock.writeLock().lock();
1052
    if (logger.isDebugEnabled()) {
1053 1054
      logger.debug(
          FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
1055
    }
J
Jialin Qiao 已提交
1056 1057
    try {
      writer.makeMetadataVisible();
1058 1059
      if (!flushingMemTables.remove(memTable)) {
        logger.warn(
1060
            "{}: {} put the memtable (signal={}) out of flushingMemtables but it is not in the queue.",
1061 1062 1063
            storageGroupName,
            tsFileResource.getTsFile().getName(),
            memTable.isSignalMemTable());
张凌哲 已提交
1064
      } else if (logger.isDebugEnabled()) {
1065
        logger.debug(
1066 1067
            "{}: {} memtable (signal={}) is removed from the queue. {} left.",
            storageGroupName,
J
JackieTien97 已提交
1068
            tsFileResource.getTsFile().getName(),
1069 1070
            memTable.isSignalMemTable(),
            flushingMemTables.size());
1071
      }
J
Jialin Qiao 已提交
1072
      memTable.release();
1073
      MemTableManager.getInstance().decreaseMemtableNumber();
1074
      if (enableMemControl) {
H
Haonan 已提交
1075
        // reset the mem cost in StorageGroupProcessorInfo
1076
        dataRegionInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost());
H
Haonan 已提交
1077
        if (logger.isDebugEnabled()) {
1078 1079 1080 1081 1082 1083
          logger.debug(
              "[mem control] {}: {} flush finished, try to reset system memcost, "
                  + "flushing memtable list size: {}",
              storageGroupName,
              tsFileResource.getTsFile().getName(),
              flushingMemTables.size());
H
Haonan 已提交
1084
        }
1085
        // report to System
1086
        SystemInfo.getInstance().resetStorageGroupStatus(dataRegionInfo);
1087
        SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost());
1088
      }
1089
      if (logger.isDebugEnabled()) {
1090 1091 1092 1093 1094 1095
        logger.debug(
            "{}: {} flush finished, remove a memtable from flushing list, "
                + "flushing memtable list size: {}",
            storageGroupName,
            tsFileResource.getTsFile().getName(),
            flushingMemTables.size());
1096 1097
      }
    } catch (Exception e) {
J
JackieTien97 已提交
1098
      logger.error("{}: {}", storageGroupName, tsFileResource.getTsFile().getName(), e);
J
Jialin Qiao 已提交
1099 1100
    } finally {
      flushQueryLock.writeLock().unlock();
1101
      if (logger.isDebugEnabled()) {
1102 1103
        logger.debug(
            FLUSH_QUERY_WRITE_RELEASE, storageGroupName, tsFileResource.getTsFile().getName());
1104
      }
J
Jialin Qiao 已提交
1105 1106 1107
    }
  }

1108 1109
  /** This method will synchronize the memTable and release its flushing resources */
  private void syncReleaseFlushedMemTable(IMemTable memTable) {
1110
    synchronized (flushingMemTables) {
1111
      releaseFlushedMemTable(memTable);
1112
      flushingMemTables.notifyAll();
1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123
      if (logger.isDebugEnabled()) {
        logger.debug(
            "{}: {} released a memtable (signal={}), flushingMemtables size ={}",
            storageGroupName,
            tsFileResource.getTsFile().getName(),
            memTable.isSignalMemTable(),
            flushingMemTables.size());
      }
    }
  }

J
Jialin Qiao 已提交
1124 1125 1126 1127
  /**
   * Take the first MemTable from the flushingMemTables and flush it. Called by a flush thread of
   * the flush manager pool
   */
1128
  @SuppressWarnings({"squid:S3776", "squid:S2142"}) // Suppress high Cognitive Complexity warning
张凌哲 已提交
1129
  public void flushOneMemTable() {
1130
    IMemTable memTableToFlush = flushingMemTables.getFirst();
1131

J
Jialin Qiao 已提交
1132 1133
    // signal memtable only may appear when calling asyncClose()
    if (!memTableToFlush.isSignalMemTable()) {
1134 1135 1136 1137 1138 1139 1140 1141 1142
      if (memTableToFlush.isEmpty()) {
        logger.info(
            "This normal memtable is empty, skip flush. {}: {}",
            storageGroupName,
            tsFileResource.getTsFile().getName());
      } else {
        try {
          writer.mark();
          MemTableFlushTask flushTask =
1143 1144 1145 1146 1147
              new MemTableFlushTask(
                  memTableToFlush,
                  writer,
                  storageGroupName,
                  dataRegionInfo.getDataRegion().getDataRegionId());
1148 1149 1150 1151 1152
          flushTask.syncFlushMemTable();
        } catch (Throwable e) {
          if (writer == null) {
            logger.info(
                "{}: {} is closed during flush, abandon flush task",
1153
                storageGroupName,
1154
                tsFileResource.getTsFile().getAbsolutePath());
1155 1156 1157
            synchronized (flushingMemTables) {
              flushingMemTables.notifyAll();
            }
1158
          } else {
1159
            logger.error(
1160
                "{}: {} meet error when flushing a memtable, change system mode to error",
1161
                storageGroupName,
1162
                tsFileResource.getTsFile().getAbsolutePath(),
1163
                e);
1164
            CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181
            try {
              logger.error(
                  "{}: {} IOTask meets error, truncate the corrupted data",
                  storageGroupName,
                  tsFileResource.getTsFile().getAbsolutePath(),
                  e);
              writer.reset();
            } catch (IOException e1) {
              logger.error(
                  "{}: {} Truncate corrupted data meets error",
                  storageGroupName,
                  tsFileResource.getTsFile().getAbsolutePath(),
                  e1);
            }
            // release resource
            try {
              syncReleaseFlushedMemTable(memTableToFlush);
1182
              // make sure no read will search this file
1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201
              tsFileResource.setTimeIndex(config.getTimeIndexLevel().getTimeIndex());
              // this callback method will register this empty tsfile into TsFileManager
              for (CloseFileListener closeFileListener : closeFileListeners) {
                closeFileListener.onClosed(this);
              }
              // close writer
              writer.close();
              writer = null;
              synchronized (flushingMemTables) {
                flushingMemTables.notifyAll();
              }
            } catch (Exception e1) {
              logger.error(
                  "{}: {} Release resource meets error",
                  storageGroupName,
                  tsFileResource.getTsFile().getAbsolutePath(),
                  e1);
            }
            return;
1202
          }
J
Jialin Qiao 已提交
1203 1204
        }
      }
1205
    }
J
Jialin Qiao 已提交
1206

1207
    try {
1208
      flushQueryLock.writeLock().lock();
1209
      Iterator<Pair<Modification, IMemTable>> iterator = modsToMemtable.iterator();
1210
      while (iterator.hasNext()) {
1211 1212 1213 1214 1215 1216
        Pair<Modification, IMemTable> entry = iterator.next();
        if (entry.right.equals(memTableToFlush)) {
          entry.left.setFileOffset(tsFileResource.getTsFileSize());
          this.tsFileResource.getModFile().write(entry.left);
          tsFileResource.getModFile().close();
          iterator.remove();
1217 1218 1219 1220 1221
          logger.info(
              "[Deletion] Deletion with path: {}, time:{}-{} written when flush memtable",
              entry.left.getPath(),
              ((Deletion) (entry.left)).getStartTime(),
              ((Deletion) (entry.left)).getEndTime());
1222 1223 1224
        }
      }
    } catch (IOException e) {
1225 1226
      logger.error(
          "Meet error when writing into ModificationFile file of {} ",
1227
          tsFileResource.getTsFile().getAbsolutePath(),
1228
          e);
1229 1230
    } finally {
      flushQueryLock.writeLock().unlock();
1231 1232
    }

1233
    if (logger.isDebugEnabled()) {
1234 1235 1236
      logger.debug(
          "{}: {} try get lock to release a memtable (signal={})",
          storageGroupName,
1237
          tsFileResource.getTsFile().getAbsolutePath(),
1238
          memTableToFlush.isSignalMemTable());
1239
    }
1240

J
Jialin Qiao 已提交
1241
    // for sync flush
1242
    syncReleaseFlushedMemTable(memTableToFlush);
J
Jialin Qiao 已提交
1243

1244 1245 1246 1247
    // call flushed listener after memtable is released safely
    for (FlushListener flushListener : flushListeners) {
      flushListener.onMemTableFlushed(memTableToFlush);
    }
1248 1249 1250
    // retry to avoid unnecessary read-only mode
    int retryCnt = 0;
    while (shouldClose && flushingMemTables.isEmpty() && writer != null) {
J
Jialin Qiao 已提交
1251
      try {
1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263
        if (isEmpty()) {
          endEmptyFile();
        } else {
          writer.mark();
          updateCompressionRatio();
          if (logger.isDebugEnabled()) {
            logger.debug(
                "{}: {} flushingMemtables is empty and will close the file",
                storageGroupName,
                tsFileResource.getTsFile().getAbsolutePath());
          }
          endFile();
1264
        }
张凌哲 已提交
1265
        if (logger.isDebugEnabled()) {
1266
          logger.debug("{} flushingMemtables is clear", storageGroupName);
张凌哲 已提交
1267
        }
1268
      } catch (Exception e) {
1269
        logger.error(
1270
            "{}: {} marking or ending file meet error",
1271 1272 1273
            storageGroupName,
            tsFileResource.getTsFile().getAbsolutePath(),
            e);
1274
        // truncate broken metadata
J
Jialin Qiao 已提交
1275 1276 1277
        try {
          writer.reset();
        } catch (IOException e1) {
1278 1279 1280
          logger.error(
              "{}: {} truncate corrupted data meets error",
              storageGroupName,
1281
              tsFileResource.getTsFile().getAbsolutePath(),
1282
              e1);
J
Jialin Qiao 已提交
1283
        }
1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298
        // retry or set read-only
        if (retryCnt < 3) {
          logger.warn(
              "{} meet error when flush FileMetadata to {}, retry it again",
              storageGroupName,
              tsFileResource.getTsFile().getAbsolutePath(),
              e);
          retryCnt++;
          continue;
        } else {
          logger.error(
              "{} meet error when flush FileMetadata to {}, change system mode to error",
              storageGroupName,
              tsFileResource.getTsFile().getAbsolutePath(),
              e);
1299
          CommonDescriptor.getInstance().getConfig().handleUnrecoverableError();
1300 1301
          break;
        }
J
Jialin Qiao 已提交
1302 1303
      }
      // for sync close
1304
      if (logger.isDebugEnabled()) {
1305 1306 1307
        logger.debug(
            "{}: {} try to get flushingMemtables lock.",
            storageGroupName,
1308
            tsFileResource.getTsFile().getAbsolutePath());
1309
      }
J
Jialin Qiao 已提交
1310 1311 1312 1313 1314 1315
      synchronized (flushingMemTables) {
        flushingMemTables.notifyAll();
      }
    }
  }

1316
  private void updateCompressionRatio() {
1317 1318
    try {
      double compressionRatio = ((double) totalMemTableSize) / writer.getPos();
1319 1320 1321 1322 1323 1324
      logger.info(
          "The compression ratio of tsfile {} is {}, totalMemTableSize: {}, the file size: {}",
          writer.getFile().getAbsolutePath(),
          compressionRatio,
          totalMemTableSize,
          writer.getPos());
1325
      String dataRegionId = dataRegionInfo.getDataRegion().getDataRegionId();
1326
      WritingMetrics.getInstance()
1327
          .recordTsFileCompressionRatioOfFlushingMemTable(dataRegionId, compressionRatio);
1328 1329
      CompressionRatio.getInstance().updateRatio(compressionRatio);
    } catch (IOException e) {
1330 1331 1332 1333 1334
      logger.error(
          "{}: {} update compression ratio failed",
          storageGroupName,
          tsFileResource.getTsFile().getName(),
          e);
1335 1336 1337
    }
  }

S
SilverNarcissus 已提交
1338
  /** end file and write some meta */
J
Jialin Qiao 已提交
1339
  private void endFile() throws IOException, TsFileProcessorException {
1340
    logger.info("Start to end file {}", tsFileResource);
J
Jialin Qiao 已提交
1341
    long closeStartTime = System.currentTimeMillis();
H
HTHou 已提交
1342
    writer.endFile();
1343
    tsFileResource.serialize();
1344
    logger.info("Ended file {}", tsFileResource);
J
Jialin Qiao 已提交
1345 1346 1347

    // remove this processor from Closing list in StorageGroupProcessor,
    // mark the TsFileResource closed, no need writer anymore
1348 1349 1350
    for (CloseFileListener closeFileListener : closeFileListeners) {
      closeFileListener.onClosed(this);
    }
J
Jialin Qiao 已提交
1351

1352 1353
    if (enableMemControl) {
      tsFileProcessorInfo.clear();
1354
      dataRegionInfo.closeTsFileProcessorAndReportToSystem(this);
1355
    }
J
Jialin Qiao 已提交
1356 1357
    if (logger.isInfoEnabled()) {
      long closeEndTime = System.currentTimeMillis();
1358
      logger.info(
1359
          "Database {} close the file {}, TsFile size is {}, "
J
Jialin Qiao 已提交
1360
              + "time consumption of flushing metadata is {}ms",
1361 1362
          storageGroupName,
          tsFileResource.getTsFile().getAbsoluteFile(),
1363
          writer.getFile().length(),
J
Jialin Qiao 已提交
1364 1365
          closeEndTime - closeStartTime);
    }
1366 1367

    writer = null;
J
Jialin Qiao 已提交
1368 1369
  }

1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391
  /**
   * Closes the writer of an empty TsFile, notifies the close listeners and drops the writer
   * reference. With memory control enabled, the processor info is cleared and the closing is
   * reported through the data region info.
   *
   * @throws TsFileProcessorException declared by the original signature for the close handling
   * @throws IOException if closing the writer fails
   */
  private void endEmptyFile() throws TsFileProcessorException, IOException {
    logger.info("Start to end empty file {}", tsFileResource);

    // The writer is closed first; the listeners then remove this processor from the closing list
    // and mark the TsFileResource as closed.
    writer.close();
    for (CloseFileListener listener : closeFileListeners) {
      listener.onClosed(this);
    }

    if (enableMemControl) {
      tsFileProcessorInfo.clear();
      dataRegionInfo.closeTsFileProcessorAndReportToSystem(this);
    }

    logger.info(
        "Storage group {} close and remove empty file {}",
        storageGroupName,
        tsFileResource.getTsFile().getAbsoluteFile());

    writer = null;
  }

L
lta 已提交
1392
  public boolean isManagedByFlushManager() {
J
Jialin Qiao 已提交
1393 1394 1395
    return managedByFlushManager;
  }

1396 1397 1398 1399
  /**
   * Sets the {@code managedByFlushManager} flag.
   *
   * @param managedByFlushManager the new value of the flag
   */
  public void setManagedByFlushManager(boolean managedByFlushManager) {
    this.managedByFlushManager = managedByFlushManager;
  }

1400 1401 1402 1403 1404 1405 1406 1407 1408 1409
  /**
   * Checks, while holding the flush-query write lock, whether a working memtable exists.
   *
   * @return {@code true} if the working memtable is not {@code null}
   */
  public boolean isMemtableNotNull() {
    flushQueryLock.writeLock().lock();
    try {
      final boolean hasWorkMemTable = workMemTable != null;
      return hasWorkMemTable;
    } finally {
      flushQueryLock.writeLock().unlock();
    }
  }

S
SilverNarcissus 已提交
1410
  /** close this tsfile */
J
Jialin Qiao 已提交
1411 1412
  public void close() throws TsFileProcessorException {
    try {
1413
      // when closing resource file, its corresponding mod file is also closed.
J
Jialin Qiao 已提交
1414 1415 1416 1417 1418 1419
      tsFileResource.close();
    } catch (IOException e) {
      throw new TsFileProcessorException(e);
    }
  }

L
lta 已提交
1420
  public int getFlushingMemTableSize() {
J
Jialin Qiao 已提交
1421 1422 1423
    return flushingMemTables.size();
  }

1424 1425 1426 1427
  /**
   * Returns the writer of this processor.
   *
   * @return the current writer; {@code null} once the file has been ended, because the ending
   *     methods reset the field
   */
  RestorableTsFileIOWriter getWriter() {
    return writer;
  }

L
lta 已提交
1428
  public String getStorageGroupName() {
J
Jialin Qiao 已提交
1429 1430 1431 1432 1433 1434 1435
    return storageGroupName;
  }

  /**
   * get the chunk(s) in the memtable (one from work memtable and the other ones in flushing
   * memtables and then compact them into one TimeValuePairSorter). Then get the related
   * ChunkMetadata of data on disk.
1436 1437
   *
   * @param seriesPaths selected paths
J
Jialin Qiao 已提交
1438
   */
1439
  public void query(
1440 1441 1442 1443
      List<PartialPath> seriesPaths,
      QueryContext context,
      List<TsFileResource> tsfileResourcesForQuery)
      throws IOException {
L
liuminghui233 已提交
1444
    long startTime = System.nanoTime();
J
Jialin Qiao 已提交
1445
    try {
L
liuminghui233 已提交
1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462
      Map<PartialPath, List<IChunkMetadata>> pathToChunkMetadataListMap = new HashMap<>();
      Map<PartialPath, List<ReadOnlyMemChunk>> pathToReadOnlyMemChunkMap = new HashMap<>();

      flushQueryLock.readLock().lock();
      try {
        for (PartialPath seriesPath : seriesPaths) {
          List<ReadOnlyMemChunk> readOnlyMemChunks = new ArrayList<>();
          for (IMemTable flushingMemTable : flushingMemTables) {
            if (flushingMemTable.isSignalMemTable()) {
              continue;
            }
            ReadOnlyMemChunk memChunk =
                flushingMemTable.query(
                    seriesPath, context.getQueryTimeLowerBound(), modsToMemtable);
            if (memChunk != null) {
              readOnlyMemChunks.add(memChunk);
            }
1463
          }
L
liuminghui233 已提交
1464 1465 1466 1467 1468 1469
          if (workMemTable != null) {
            ReadOnlyMemChunk memChunk =
                workMemTable.query(seriesPath, context.getQueryTimeLowerBound(), null);
            if (memChunk != null) {
              readOnlyMemChunks.add(memChunk);
            }
1470
          }
J
Jialin Qiao 已提交
1471

L
liuminghui233 已提交
1472 1473 1474
          List<IChunkMetadata> chunkMetadataList =
              ResourceByPathUtils.getResourceInstance(seriesPath)
                  .getVisibleMetadataListFromWriter(writer, tsFileResource, context);
J
JackieTien97 已提交
1475

L
liuminghui233 已提交
1476 1477 1478 1479 1480
          // get in memory data
          if (!readOnlyMemChunks.isEmpty() || !chunkMetadataList.isEmpty()) {
            pathToReadOnlyMemChunkMap.put(seriesPath, readOnlyMemChunks);
            pathToChunkMetadataListMap.put(seriesPath, chunkMetadataList);
          }
1481
        }
L
liuminghui233 已提交
1482 1483 1484
      } catch (QueryProcessException | MetadataException e) {
        logger.error(
            "{}: {} get ReadOnlyMemChunk has error",
1485
            storageGroupName,
L
liuminghui233 已提交
1486 1487 1488
            tsFileResource.getTsFile().getName(),
            e);
      } finally {
1489 1490 1491
        QUERY_RESOURCE_METRICS.recordQueryResourceNum(FLUSHING_MEMTABLE, flushingMemTables.size());
        QUERY_RESOURCE_METRICS.recordQueryResourceNum(
            WORKING_MEMTABLE, workMemTable != null ? 1 : 0);
L
liuminghui233 已提交
1492 1493 1494 1495 1496 1497 1498 1499

        flushQueryLock.readLock().unlock();
        if (logger.isDebugEnabled()) {
          logger.debug(
              "{}: {} release flushQueryLock",
              storageGroupName,
              tsFileResource.getTsFile().getName());
        }
1500
      }
1501

L
liuminghui233 已提交
1502 1503 1504 1505 1506 1507
      if (!pathToReadOnlyMemChunkMap.isEmpty() || !pathToChunkMetadataListMap.isEmpty()) {
        tsfileResourcesForQuery.add(
            new TsFileResource(
                pathToReadOnlyMemChunkMap, pathToChunkMetadataListMap, tsFileResource));
      }
    } finally {
1508
      QUERY_EXECUTION_METRICS.recordExecutionCost(
1509
          GET_QUERY_RESOURCE_FROM_MEM, System.nanoTime() - startTime);
1510
    }
J
Jialin Qiao 已提交
1511 1512
  }

1513 1514 1515 1516 1517 1518 1519
  /**
   * Returns the value of {@code timeRangeId}.
   *
   * @return the time range id currently stored in this processor
   */
  public long getTimeRangeId() {
    return timeRangeId;
  }

  /**
   * Replaces the value of {@code timeRangeId}.
   *
   * @param timeRangeId the new time range id
   */
  public void setTimeRangeId(long timeRangeId) {
    this.timeRangeId = timeRangeId;
  }
1520

S
SilverNarcissus 已提交
1521
  /** release resource of a memtable */
1522 1523 1524
  public void putMemTableBackAndClose() throws TsFileProcessorException {
    if (workMemTable != null) {
      workMemTable.release();
1525
      workMemTable = null;
1526 1527 1528 1529 1530 1531 1532
    }
    try {
      writer.close();
    } catch (IOException e) {
      throw new TsFileProcessorException(e);
    }
  }
1533

1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545
  /**
   * Returns the {@code TsFileProcessorInfo} attached to this processor.
   *
   * @return the current value of {@code tsFileProcessorInfo}
   */
  public TsFileProcessorInfo getTsFileProcessorInfo() {
    return tsFileProcessorInfo;
  }

  /**
   * Replaces the {@code TsFileProcessorInfo} attached to this processor.
   *
   * @param tsFileProcessorInfo the new info object
   */
  public void setTsFileProcessorInfo(TsFileProcessorInfo tsFileProcessorInfo) {
    this.tsFileProcessorInfo = tsFileProcessorInfo;
  }

  /**
   * Returns the TVList RAM cost reported by the working memtable.
   *
   * @return {@code workMemTable.getTVListsRamCost()}, or 0 if there is no working memtable
   */
  public long getWorkMemTableRamCost() {
    if (workMemTable == null) {
      return 0;
    }
    return workMemTable.getTVListsRamCost();
  }

1546 1547 1548 1549 1550
  /**
   * Returns the creation time reported by the working memtable.
   *
   * @return {@code workMemTable.getCreatedTime()}, or {@link Long#MAX_VALUE} if there is no
   *     working memtable
   */
  public long getWorkMemTableCreatedTime() {
    if (workMemTable == null) {
      return Long.MAX_VALUE;
    }
    return workMemTable.getCreatedTime();
  }

1551 1552 1553 1554
  /**
   * Returns the value of {@code lastWorkMemtableFlushTime}.
   *
   * @return the stored last flush time of the working memtable
   */
  public long getLastWorkMemtableFlushTime() {
    return lastWorkMemtableFlushTime;
  }

1555 1556 1557 1558
  /**
   * Returns the {@code sequence} flag of this processor.
   *
   * @return the current value of {@code sequence}
   */
  public boolean isSequence() {
    return sequence;
  }

1559 1560
  public void setWorkMemTableShouldFlush() {
    workMemTable.setShouldFlush();
1561 1562
  }

1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577
  /**
   * Registers one flush listener by adding it to {@code flushListeners}.
   *
   * @param listener the listener to add
   */
  public void addFlushListener(FlushListener listener) {
    flushListeners.add(listener);
  }

  /**
   * Registers one close-file listener by adding it to {@code closeFileListeners}.
   *
   * @param listener the listener to add
   */
  public void addCloseFileListener(CloseFileListener listener) {
    closeFileListeners.add(listener);
  }

  /**
   * Registers several flush listeners at once by adding them all to {@code flushListeners}.
   *
   * @param listeners the listeners to add
   */
  public void addFlushListeners(Collection<FlushListener> listeners) {
    flushListeners.addAll(listeners);
  }

  /**
   * Registers several close-file listeners at once by adding them all to {@code
   * closeFileListeners}.
   *
   * @param listeners the listeners to add
   */
  public void addCloseFileListeners(Collection<CloseFileListener> listeners) {
    closeFileListeners.addAll(listeners);
  }
1578 1579

  public void submitAFlushTask() {
1580
    this.dataRegionInfo.getDataRegion().submitAFlushTaskWhenShouldFlush(this);
1581 1582 1583 1584 1585
  }

  /**
   * Returns the {@code shouldClose} flag of this processor.
   *
   * @return the current value of {@code shouldClose}
   */
  public boolean alreadyMarkedClosing() {
    return shouldClose;
  }
1586

1587 1588
  private IDeviceID getDeviceID(String deviceId) {
    return DeviceIDFactory.getInstance().getDeviceID(deviceId);
1589 1590
  }

1591 1592 1593 1594 1595
  /**
   * Tells whether this processor holds no data.
   *
   * @return {@code true} if {@code totalMemTableSize} is 0 and the working memtable is either
   *     absent or reports zero points
   */
  public boolean isEmpty() {
    if (totalMemTableSize != 0) {
      return false;
    }
    if (workMemTable == null) {
      return true;
    }
    return workMemTable.getTotalPointsNum() == 0;
  }

1596 1597 1598 1599
  /**
   * Exposes the working memtable for tests.
   *
   * @return the current {@code workMemTable}, which may be {@code null}
   */
  @TestOnly
  public IMemTable getWorkMemTable() {
    return workMemTable;
  }
1600 1601 1602 1603 1604

  /**
   * Exposes the flushing memtable collection for tests.
   *
   * @return the {@code flushingMemTables} deque itself, not a copy
   */
  @TestOnly
  public ConcurrentLinkedDeque<IMemTable> getFlushingMemTable() {
    return flushingMemTables;
  }
J
Jialin Qiao 已提交
1605
}