/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tq.h"

18
static int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle);
19 20 21 22 23
static int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList);

// this function should be executed by stream threads.
// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure
// will not stop eventually.
24 25 26
int tqStreamTasksScanWal(STQ* pTq) {
  int32_t vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;
27
  int64_t st = taosGetTimestampMs();
28

29
  while (1) {
30
    tqInfo("vgId:%d continue check if data in wal are available", vgId);
31 32

    // check all restore tasks
33 34
    bool allFull = true;
    streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &allFull);
35

36
    int32_t times = 0;
37

38 39 40 41
    if (allFull) {
      taosWLockLatch(&pMeta->lock);
      pMeta->walScan -= 1;
      times = pMeta->walScan;
42

43 44 45 46
      if (pMeta->walScan <= 0) {
        taosWUnLockLatch(&pMeta->lock);
        break;
      }
47

48 49 50
      taosWUnLockLatch(&pMeta->lock);
      tqInfo("vgId:%d scan wal for stream tasks for %d times", vgId, times);
    }
51 52
  }

53 54 55 56 57 58
  double el = (taosGetTimestampMs() - st) / 1000.0;
  tqInfo("vgId:%d scan wal for stream tasks completed, elapsed time:%.2f sec", vgId, el);

  // restore wal scan flag
//  atomic_store_8(&pTq->pStreamMeta->walScan, 0);
  return 0;
59 60
}

//int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) {
//  int32_t numOfTask = taosArrayGetSize(pTaskList);
//  if (numOfTask <= 0)  {
//    return TSDB_CODE_SUCCESS;
//  }
//
//  // todo: add lock
//  for (int32_t i = 0; i < numOfTask; ++i) {
//    SStreamTask* pTask = taosArrayGetP(pTaskList, i);
//    tqDebug("vgId:%d transfer s-task:%s state restore -> ready, checkpoint:%" PRId64 " checkpoint id:%" PRId64,
//            pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->chkInfo.id);
//    taosHashRemove(pStreamMeta->pWalReadTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
//
//    // NOTE: do not change the following order
//    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
//    taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
//  }
//
//  return TSDB_CODE_SUCCESS;
//}

// Performs one pass over all stream tasks and, for every source task that can accept input,
// extracts the next submit block from the WAL and hands it to the task.
//
// pStreamMeta:  stream meta whose pTasks hash is iterated.
// pOffsetStore: offset store holding, per task, the WAL version to resume reading from.
// pScanIdle:    output; set to true when no submit block was extracted for any task during
//               this pass, false otherwise.
//
// Always returns 0; per-task failures are logged (where a message exists) and the task is skipped.
static int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) {
  int32_t vgId = pStreamMeta->vgId;
  bool    extracted = false;  // becomes true once any task got a submit block from the WAL
  void*   pIter = NULL;

  tqDebug("vgId:%d start to check wal to extract new submit block", vgId);

  while ((pIter = taosHashIterate(pStreamMeta->pTasks, pIter)) != NULL) {
    SStreamTask* pTask = *(SStreamTask**)pIter;

    // only source tasks read from the WAL
    if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
      continue;
    }

    if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE ||
        pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
      tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
              pTask->status.taskStatus);
      continue;
    }

    if (tInputQueueIsFull(pTask)) {
      tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr);
      continue;
    }

    // check if offset value exists; the key is only needed once the task can accept input
    char key[128] = {0};
    createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);

    STqOffset* pOffset = tqOffsetRead(pOffsetStore, key);
    if (pOffset == NULL) {
      // do not rely on ASSERT alone: pOffset is dereferenced right below
      tqError("vgId:%d s-task:%s no offset found for key:%s, skip wal scan for this task", vgId, pTask->id.idStr,
              key);
      continue;
    }

    // seek the stored version and extract data from WAL
    int32_t code = walReadSeekVer(pTask->exec.pWalReader, pOffset->val.version);
    if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
      continue;
    }

    // append the data for the stream
    tqDebug("vgId:%d wal reader seek to ver:%" PRId64 " %s", vgId, pOffset->val.version, pTask->id.idStr);

    SPackedData packData = {0};
    code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
    if (code != TSDB_CODE_SUCCESS) {  // failed, continue
      continue;
    }

    SStreamDataSubmit2* p = streamDataSubmitNew(packData, STREAM_INPUT__DATA_SUBMIT);
    if (p == NULL) {
      // NOTE(review): packData may own a buffer filled by extractSubmitMsgFromWal() that is
      // not released on this path -- confirm ownership and free it here if so.
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr);
      continue;
    }

    extracted = true;

    tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr);
    code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver);
    if (code == TSDB_CODE_SUCCESS) {
      // advance the stored offset only after the block has been accepted by the task
      pOffset->val.version = walReaderGetCurrentVer(pTask->exec.pWalReader);
      tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
              pOffset->val.version);
    }

    // release the local handle on the submit item regardless of the enqueue result
    streamDataSubmitDestroy(p);
    taosFreeQitem(p);
  }

  // the pass is idle exactly when no task extracted a submit block
  *pScanIdle = !extracted;
  return 0;
}