// tqRestore.c
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tq.h"

18
static int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle);
19 20 21 22

// this function should be executed by stream threads.
// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure
// will not stop eventually.
23 24 25
int tqStreamTasksScanWal(STQ* pTq) {
  int32_t vgId = TD_VID(pTq->pVnode);
  SStreamMeta* pMeta = pTq->pStreamMeta;
26
  int64_t st = taosGetTimestampMs();
27

28
  while (1) {
29 30
    int32_t scan = pMeta->walScan;
    tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan);
31 32

    // check all restore tasks
33
    bool shouldIdle = true;
34
    doCreateReqsByScanWal(pTq->pStreamMeta, pTq->pOffsetStore, &shouldIdle);
35

36
    int32_t times = 0;
37

38
    if (shouldIdle) {
39 40 41
      taosWLockLatch(&pMeta->lock);
      pMeta->walScan -= 1;
      times = pMeta->walScan;
42

43 44
      ASSERT(pMeta->walScan >= 0);

45 46 47 48
      if (pMeta->walScan <= 0) {
        taosWUnLockLatch(&pMeta->lock);
        break;
      }
49

50
      taosWUnLockLatch(&pMeta->lock);
51
      tqDebug("vgId:%d scan wal for stream tasks for %d times", vgId, times);
52
    }
53 54
  }

55 56
  int64_t el = (taosGetTimestampMs() - st);
  tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%"PRId64" ms", vgId, el);
57
  return 0;
58 59
}

60 61
static SArray* extractTaskIdList(SStreamMeta* pStreamMeta, int32_t numOfTasks) {
  SArray* pTaskIdList = taosArrayInit(numOfTasks, sizeof(int32_t));
62
  void*   pIter = NULL;
63

64 65
  taosWLockLatch(&pStreamMeta->lock);
  while(1) {
66
    pIter = taosHashIterate(pStreamMeta->pTasks, pIter);
67 68 69 70 71
    if (pIter == NULL) {
      break;
    }

    SStreamTask* pTask = *(SStreamTask**)pIter;
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
    taosArrayPush(pTaskIdList, &pTask->id.taskId);
  }

  taosWUnLockLatch(&pStreamMeta->lock);
  return pTaskIdList;
}

int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) {
  *pScanIdle = true;
  bool    noNewDataInWal = true;
  int32_t vgId = pStreamMeta->vgId;

  int32_t numOfTasks = taosHashGetSize(pStreamMeta->pTasks);
  if (numOfTasks == 0) {
    return TSDB_CODE_SUCCESS;
  }

  tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
  SArray* pTaskIdList = extractTaskIdList(pStreamMeta, numOfTasks);

  for (int32_t i = 0; i < numOfTasks; ++i) {
    int32_t*     pTaskId = taosArrayGet(pTaskIdList, i);
    SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId);
    if (pTask == NULL) {
      continue;
    }

H
Haojun Liao 已提交
99 100
    int32_t status = pTask->status.taskStatus;
    if ((pTask->taskLevel != TASK_LEVEL__SOURCE) || (status == TASK_STATUS__DROPPING)) {
101
      streamMetaReleaseTask(pStreamMeta, pTask);
102 103 104
      continue;
    }

H
Haojun Liao 已提交
105 106
    if (status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) {
      tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status);
107
      streamMetaReleaseTask(pStreamMeta, pTask);
108 109 110 111 112 113 114 115
      continue;
    }

    // check if offset value exists
    char key[128] = {0};
    createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);

    if (tInputQueueIsFull(pTask)) {
116
      tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr);
117
      streamMetaReleaseTask(pStreamMeta, pTask);
118 119 120
      continue;
    }

121 122
    *pScanIdle = false;

123 124
    // check if offset value exists
    STqOffset* pOffset = tqOffsetRead(pOffsetStore, key);
125 126 127 128 129
    ASSERT(pOffset != NULL);

    // seek the stored version and extract data from WAL
    int32_t code = walReadSeekVer(pTask->exec.pWalReader, pOffset->val.version);
    if (code != TSDB_CODE_SUCCESS) {  // no data in wal, quit
130
      streamMetaReleaseTask(pStreamMeta, pTask);
131 132 133 134 135 136 137 138 139
      continue;
    }

    // append the data for the stream
    tqDebug("vgId:%d wal reader seek to ver:%" PRId64 " %s", vgId, pOffset->val.version, pTask->id.idStr);

    SPackedData packData = {0};
    code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
    if (code != TSDB_CODE_SUCCESS) {  // failed, continue
140
      streamMetaReleaseTask(pStreamMeta, pTask);
141 142 143 144 145 146 147
      continue;
    }

    SStreamDataSubmit2* p = streamDataSubmitNew(packData, STREAM_INPUT__DATA_SUBMIT);
    if (p == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr);
148
      streamMetaReleaseTask(pStreamMeta, pTask);
149 150 151
      continue;
    }

152
    noNewDataInWal = false;
153 154 155 156 157 158 159

    tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr);
    code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver);
    if (code == TSDB_CODE_SUCCESS) {
      pOffset->val.version = walReaderGetCurrentVer(pTask->exec.pWalReader);
      tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
              pOffset->val.version);
160
    } else {
161
      // do nothing
162
    }
163 164 165

    streamDataSubmitDestroy(p);
    taosFreeQitem(p);
166
    streamMetaReleaseTask(pStreamMeta, pTask);
167 168
  }

169 170
  // all wal are checked, and no new data available in wal.
  if (noNewDataInWal) {
171 172
    *pScanIdle = true;
  }
173 174

  taosArrayDestroy(pTaskIdList);
175 176 177
  return 0;
}