tqRestore.c 4.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tq.h"

18
static int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle);
19 20 21 22 23
static int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList);

// this function should be executed by stream threads.
// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure
// will not stop eventually.
24 25 26
int tqStreamTasksScanWal(STQ* pTq) {
  int32_t vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;
27
  int64_t st = taosGetTimestampMs();
28

29
  while (1) {
30
    tqInfo("vgId:%d continue check if data in wal are available", vgId);
31 32

    // check all restore tasks
33 34
    bool allFull = true;
    streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &allFull);
35

36
    int32_t times = 0;
37

38 39 40 41
    if (allFull) {
      taosWLockLatch(&pMeta->lock);
      pMeta->walScan -= 1;
      times = pMeta->walScan;
42

43 44 45 46
      if (pMeta->walScan <= 0) {
        taosWUnLockLatch(&pMeta->lock);
        break;
      }
47

48 49 50
      taosWUnLockLatch(&pMeta->lock);
      tqInfo("vgId:%d scan wal for stream tasks for %d times", vgId, times);
    }
51 52
  }

53 54 55
  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqInfo("vgId:%d scan wal for stream tasks completed, elapsed time:%.2f sec", vgId, el);
  return 0;
56 57
}

58 59 60 61 62 63 64 65
/*
 * Run one pass over all stream tasks and try to feed each eligible source task a single
 * submit block read from the WAL.
 *
 * A task is eligible when it is a source-level task, its status is neither
 * TASK_STATUS__RECOVER_PREPARE nor TASK_STATUS__WAIT_DOWNSTREAM, and its input queue is
 * not full. For an eligible task the stored offset is looked up, the task's WAL reader is
 * positioned at that version, one submit message is extracted and handed to
 * tqAddInputBlockNLaunchTask(). Only when that call succeeds is the stored offset advanced
 * to the reader's current version.
 *
 * pStreamMeta  - stream meta whose task hash (pTasks) is iterated
 * pOffsetStore - store the per-task WAL offsets are read from
 * pScanIdle    - out: true when no block was extracted from the WAL for any task during
 *                this pass, false otherwise
 *
 * Always returns 0; a task that cannot make progress is simply skipped.
 */
static int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) {
  void*   pIter = NULL;
  int32_t vgId = pStreamMeta->vgId;

  // stays true until at least one block has been extracted from the WAL in this pass
  bool allWalChecked = true;
  *pScanIdle = true;

  tqDebug("vgId:%d start to check wal to extract new submit block", vgId);

  while ((pIter = taosHashIterate(pStreamMeta->pTasks, pIter)) != NULL) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
      continue;
    }

    int8_t status = pTask->status.taskStatus;
    if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) {
      tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status);
      continue;
    }

    if (tInputQueueIsFull(pTask)) {
      tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr);
      continue;
    }

    // build the offset-store key only for tasks that can actually accept data
    char key[128] = {0};
    createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);

    // check if offset value exists
    STqOffset* pOffset = tqOffsetRead(pOffsetStore, key);
    ASSERT(pOffset != NULL);
    if (pOffset == NULL) {
      // defensive: covers builds in which ASSERT is a no-op, so a missing offset is never dereferenced
      tqError("vgId:%d s-task:%s no offset found in the offset store, skip this task", vgId, pTask->id.idStr);
      continue;
    }

    // seek the stored version and extract data from WAL
    int32_t code = walReadSeekVer(pTask->exec.pWalReader, pOffset->val.version);
    if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
      continue;
    }

    // append the data for the stream
    tqDebug("vgId:%d wal reader seek to ver:%" PRId64 " %s", vgId, pOffset->val.version, pTask->id.idStr);

    SPackedData packData = {0};
    code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
    if (code != TSDB_CODE_SUCCESS) {  // failed, continue
      continue;
    }

    SStreamDataSubmit2* p = streamDataSubmitNew(packData, STREAM_INPUT__DATA_SUBMIT);
    if (p == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr);
      continue;
    }

    // a block was read from the WAL, so this pass was not idle even if the enqueue below fails
    allWalChecked = false;

    tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr);
    code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver);
    if (code == TSDB_CODE_SUCCESS) {
      pOffset->val.version = walReaderGetCurrentVer(pTask->exec.pWalReader);
      tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
              pOffset->val.version);
    } else {
      // the stored offset is left untouched, so the same version is read again on the next pass
      tqError("s-task:%s failed to add the block extracted from WAL to the input queue, code:%d", pTask->id.idStr,
              code);
    }

    // NOTE(review): the local item is released on both paths; presumably the input queue keeps
    // its own copy/reference of the submit data -- confirm against tqAddInputBlockNLaunchTask.
    streamDataSubmitDestroy(p);
    taosFreeQitem(p);
  }

  // the pass is idle exactly when nothing was extracted from the WAL
  *pScanIdle = allWalChecked;
  return 0;
}