streamMeta.c 14.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executor.h"
17
#include "streamBackendRocksdb.h"
18
#include "streamInt.h"
dengyihao's avatar
dengyihao 已提交
19
#include "tref.h"
L
Liu Jicong 已提交
20
#include "ttimer.h"
L
Liu Jicong 已提交
21

dengyihao's avatar
dengyihao 已提交
22
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
23
int32_t             streamBackendId = 0;
Y
yihaoDeng 已提交
24
int32_t             streamBackendCfWrapperId = 0;
Y
yihaoDeng 已提交
25 26

static void streamMetaEnvInit() {
Y
yihaoDeng 已提交
27
  streamBackendId = taosOpenRef(64, streamBackendCleanup);
Y
yihaoDeng 已提交
28
  streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
Y
yihaoDeng 已提交
29
}
dengyihao's avatar
dengyihao 已提交
30 31

void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
Y
yihaoDeng 已提交
32 33
void streamMetaCleanup() {
  taosCloseRef(streamBackendId);
Y
yihaoDeng 已提交
34
  taosCloseRef(streamBackendCfWrapperId);
Y
yihaoDeng 已提交
35
}
dengyihao's avatar
dengyihao 已提交
36

37
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
dengyihao's avatar
dengyihao 已提交
38
  int32_t      code = -1;
L
Liu Jicong 已提交
39 40 41 42 43
  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
  if (pMeta == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
44

L
Liu Jicong 已提交
45 46
  int32_t len = strlen(path) + 20;
  char*   streamPath = taosMemoryCalloc(1, len);
47
  sprintf(streamPath, "%s/%s", path, "stream");
48
  pMeta->path = taosStrdup(streamPath);
49
  if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
L
Liu Jicong 已提交
50 51
    goto _err;
  }
dengyihao's avatar
dengyihao 已提交
52
  memset(streamPath, 0, len);
L
Liu Jicong 已提交
53

L
Liu Jicong 已提交
54
  sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
dengyihao's avatar
dengyihao 已提交
55
  code = taosMulModeMkDir(streamPath, 0755);
dengyihao's avatar
dengyihao 已提交
56 57 58 59
  if (code != 0) {
    terrno = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }
60

61
  if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
L
Liu Jicong 已提交
62 63 64
    goto _err;
  }

65
  if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
L
Liu Jicong 已提交
66 67 68
    goto _err;
  }

69
  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
H
Haojun Liao 已提交
70
  pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK);
L
Liu Jicong 已提交
71 72 73 74
  if (pMeta->pTasks == NULL) {
    goto _err;
  }

75 76 77 78 79 80 81
  // task list
  pMeta->pTaskList = taosArrayInit(4, sizeof(int32_t));
  if (pMeta->pTaskList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

L
Liu Jicong 已提交
82 83 84 85
  if (streamMetaBegin(pMeta) < 0) {
    goto _err;
  }

H
Haojun Liao 已提交
86
  pMeta->walScanCounter = 0;
87
  pMeta->vgId = vgId;
L
Liu Jicong 已提交
88 89
  pMeta->ahandle = ahandle;
  pMeta->expandFunc = expandFunc;
dengyihao's avatar
dengyihao 已提交
90

dengyihao's avatar
dengyihao 已提交
91 92 93
  memset(streamPath, 0, len);
  sprintf(streamPath, "%s/%s", pMeta->path, "state");
  code = taosMulModeMkDir(streamPath, 0755);
dengyihao's avatar
dengyihao 已提交
94 95 96 97 98
  if (code != 0) {
    terrno = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

dengyihao's avatar
dengyihao 已提交
99
  pMeta->streamBackend = streamBackendInit(streamPath);
dengyihao's avatar
dengyihao 已提交
100 101 102
  if (pMeta->streamBackend == NULL) {
    goto _err;
  }
dengyihao's avatar
dengyihao 已提交
103
  pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
Y
yihaoDeng 已提交
104 105
  pMeta->pTaskBackendUnique =
      taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
dengyihao's avatar
dengyihao 已提交
106

dengyihao's avatar
dengyihao 已提交
107
  taosMemoryFree(streamPath);
dengyihao's avatar
dengyihao 已提交
108

109
  taosInitRWLatch(&pMeta->lock);
Y
yihaoDeng 已提交
110 111
  taosThreadMutexInit(&pMeta->backendMutex, NULL);

112
  return pMeta;
L
Liu Jicong 已提交
113

L
Liu Jicong 已提交
114
_err:
dengyihao's avatar
dengyihao 已提交
115
  taosMemoryFree(streamPath);
L
Liu Jicong 已提交
116
  taosMemoryFree(pMeta->path);
L
Liu Jicong 已提交
117
  if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
118
  if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
L
Liu Jicong 已提交
119
  if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
120
  if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
L
Liu Jicong 已提交
121
  if (pMeta->db) tdbClose(pMeta->db);
dengyihao's avatar
dengyihao 已提交
122
  // if (pMeta->streamBackend) streamBackendCleanup(pMeta->streamBackend);
L
Liu Jicong 已提交
123
  taosMemoryFree(pMeta);
dengyihao's avatar
dengyihao 已提交
124
  qError("failed to open stream meta");
L
Liu Jicong 已提交
125 126 127 128
  return NULL;
}

void streamMetaClose(SStreamMeta* pMeta) {
129
  tdbAbort(pMeta->db, pMeta->txn);
130
  tdbTbClose(pMeta->pTaskDb);
131
  tdbTbClose(pMeta->pCheckpointDb);
132
  tdbClose(pMeta->db);
L
Liu Jicong 已提交
133 134 135 136

  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pMeta->pTasks, pIter);
137 138 139 140
    if (pIter == NULL) {
      break;
    }

L
Liu Jicong 已提交
141
    SStreamTask* pTask = *(SStreamTask**)pIter;
142 143 144 145 146 147 148 149
    if (pTask->schedTimer) {
      taosTmrStop(pTask->schedTimer);
      pTask->schedTimer = NULL;
    }

    if (pTask->launchTaskTimer) {
      taosTmrStop(pTask->launchTaskTimer);
      pTask->launchTaskTimer = NULL;
L
Liu Jicong 已提交
150
    }
151

152
    tFreeStreamTask(pTask);
L
Liu Jicong 已提交
153
  }
154

L
Liu Jicong 已提交
155
  taosHashCleanup(pMeta->pTasks);
dengyihao's avatar
dengyihao 已提交
156
  taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
157
  pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList);
L
Liu Jicong 已提交
158
  taosMemoryFree(pMeta->path);
Y
yihaoDeng 已提交
159 160
  taosThreadMutexDestroy(&pMeta->backendMutex);
  taosHashCleanup(pMeta->pTaskBackendUnique);
L
Liu Jicong 已提交
161 162 163
  taosMemoryFree(pMeta);
}

164 165
#if 0
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
166 167 168 169 170 171
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
172
  if (tDecodeStreamTask(&decoder, pTask) < 0) {
L
Liu Jicong 已提交
173
    tDecoderClear(&decoder);
L
Liu Jicong 已提交
174 175 176 177
    goto FAIL;
  }
  tDecoderClear(&decoder);

178
  if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
L
Liu Jicong 已提交
179 180 181 182
    ASSERT(0);
    goto FAIL;
  }

183
  if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
L
Liu Jicong 已提交
184 185
    goto FAIL;
  }
L
Liu Jicong 已提交
186

187 188
  if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) {
    taosHashRemove(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t));
L
Liu Jicong 已提交
189
    ASSERT(0);
L
Liu Jicong 已提交
190
    goto FAIL;
L
Liu Jicong 已提交
191
  }
L
Liu Jicong 已提交
192

L
Liu Jicong 已提交
193 194 195
  return 0;

FAIL:
196
  if (pTask) tFreeStreamTask(pTask);
L
Liu Jicong 已提交
197
  return -1;
L
Liu Jicong 已提交
198
}
199
#endif
L
Liu Jicong 已提交
200

L
Liu Jicong 已提交
201 202
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
  void*   buf = NULL;
L
Liu Jicong 已提交
203 204
  int32_t len;
  int32_t code;
Y
yihaoDeng 已提交
205
  pTask->ver = SSTREAM_TASK_VER;
206
  tEncodeSize(tEncodeStreamTask, pTask, len, code);
L
Liu Jicong 已提交
207 208 209
  if (code < 0) {
    return -1;
  }
L
Liu Jicong 已提交
210
  buf = taosMemoryCalloc(1, len);
L
Liu Jicong 已提交
211 212 213 214
  if (buf == NULL) {
    return -1;
  }

215
  SEncoder encoder = {0};
L
Liu Jicong 已提交
216
  tEncoderInit(&encoder, buf, len);
217
  tEncodeStreamTask(&encoder, pTask);
218
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
219

220
  if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) {
H
Haojun Liao 已提交
221
    qError("s-task:%s save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno));
L
Liu Jicong 已提交
222 223 224
    return -1;
  }

L
Liu Jicong 已提交
225
  taosMemoryFree(buf);
L
Liu Jicong 已提交
226 227 228
  return 0;
}

229 230 231 232 233 234 235 236 237 238 239
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
  int32_t code = tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(taskId), pMeta->txn);
  if (code != 0) {
    qError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, taskId, tstrerror(terrno));
  } else {
    qDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, taskId);
  }

  return code;
}

240
// add to the ready tasks hash map, not the restored tasks hash map
241 242 243
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
  *pAdded = false;

244 245
  void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
  if (p == NULL) {
246 247 248 249 250
    if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
      tFreeStreamTask(pTask);
      return -1;
    }

H
Haojun Liao 已提交
251 252
    taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);

253 254 255 256 257 258 259 260 261 262 263
    if (streamMetaSaveTask(pMeta, pTask) < 0) {
      tFreeStreamTask(pTask);
      return -1;
    }

    if (streamMetaCommit(pMeta) < 0) {
      tFreeStreamTask(pTask);
      return -1;
    }
  } else {
    return 0;
264 265 266
  }

  taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
267
  *pAdded = true;
L
Liu Jicong 已提交
268 269
  return 0;
}
270

271
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
272 273
  size_t size = taosHashGetSize(pMeta->pTasks);
  ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks));
274
  return (int32_t)size;
275
}
L
Liu Jicong 已提交
276

L
Liu Jicong 已提交
277 278 279 280
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
  taosRLockLatch(&pMeta->lock);

  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
281
  if (ppTask != NULL) {
282
    if (!streamTaskShouldStop(&(*ppTask)->status)) {
H
Haojun Liao 已提交
283
      int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
284
      taosRUnLockLatch(&pMeta->lock);
285
      qTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
286 287
      return *ppTask;
    }
L
Liu Jicong 已提交
288
  }
289

L
Liu Jicong 已提交
290 291 292 293 294
  taosRUnLockLatch(&pMeta->lock);
  return NULL;
}

void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
H
Haojun Liao 已提交
295 296
  int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
  if (ref > 0) {
297
    qTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
H
Haojun Liao 已提交
298
  } else if (ref == 0) {
299
    ASSERT(streamTaskShouldStop(&pTask->status));
H
Haojun Liao 已提交
300
    qTrace("s-task:%s all refs are gone, free it", pTask->id.idStr);
301
    tFreeStreamTask(pTask);
H
Haojun Liao 已提交
302 303 304 305 306 307 308 309 310 311 312 313
  } else if (ref < 0) {
    qError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr);
  }
}

static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, int32_t taskId) {
  for (int32_t i = 0; i < num; ++i) {
    int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
    if (*pTaskId == taskId) {
      taosArrayRemove(pMeta->pTaskList, i);
      break;
    }
L
Liu Jicong 已提交
314 315 316
  }
}

317
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
H
Haojun Liao 已提交
318
  SStreamTask* pTask = NULL;
H
Haojun Liao 已提交
319

H
Haojun Liao 已提交
320 321
  // pre-delete operation
  taosWLockLatch(&pMeta->lock);
L
Liu Jicong 已提交
322 323
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
  if (ppTask) {
H
Haojun Liao 已提交
324 325 326 327 328
    pTask = *ppTask;
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
  } else {
    qDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
    taosWUnLockLatch(&pMeta->lock);
329
    return 0;
H
Haojun Liao 已提交
330 331 332 333 334
  }
  taosWUnLockLatch(&pMeta->lock);

  qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING));

Y
yihaoDeng 已提交
335
  while (1) {
H
Haojun Liao 已提交
336 337 338 339 340 341 342 343
    taosRLockLatch(&pMeta->lock);
    ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));

    if (ppTask) {
      if ((*ppTask)->status.timerActive == 0) {
        taosRUnLockLatch(&pMeta->lock);
        break;
      }
344

H
Haojun Liao 已提交
345 346 347 348 349 350 351 352 353 354 355 356 357
      taosMsleep(10);
      qDebug("s-task:%s wait for quit from timer", (*ppTask)->id.idStr);
      taosRUnLockLatch(&pMeta->lock);
    } else {
      taosRUnLockLatch(&pMeta->lock);
      break;
    }
  }

  // let's do delete of stream task
  taosWLockLatch(&pMeta->lock);
  ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
  if (ppTask) {
L
Liu Jicong 已提交
358
    taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
dengyihao's avatar
dengyihao 已提交
359
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
360

H
Haojun Liao 已提交
361
    ASSERT(pTask->status.timerActive == 0);
362

H
Haojun Liao 已提交
363
    int32_t num = taosArrayGetSize(pMeta->pTaskList);
H
Haojun Liao 已提交
364 365 366 367 368
    doRemoveIdFromList(pMeta, num, pTask->id.taskId);

    // remove the ref by timer
    if (pTask->triggerParam != 0) {
      taosTmrStop(pTask->schedTimer);
369
    }
H
Haojun Liao 已提交
370 371 372

    streamMetaRemoveTask(pMeta, taskId);
    streamMetaReleaseTask(pMeta, pTask);
373
  } else {
H
Haojun Liao 已提交
374
    qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
L
Liu Jicong 已提交
375
  }
H
Haojun Liao 已提交
376 377

  taosWUnLockLatch(&pMeta->lock);
378
  return 0;
L
Liu Jicong 已提交
379 380
}

L
Liu Jicong 已提交
381
int32_t streamMetaBegin(SStreamMeta* pMeta) {
382 383
  if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
L
Liu Jicong 已提交
384 385 386 387 388
    return -1;
  }
  return 0;
}

389
// todo add error log
L
Liu Jicong 已提交
390
int32_t streamMetaCommit(SStreamMeta* pMeta) {
391
  if (tdbCommit(pMeta->db, pMeta->txn) < 0) {
392
    qError("failed to commit stream meta");
L
Liu Jicong 已提交
393 394
    return -1;
  }
395

396
  if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) {
397
    qError("failed to commit stream meta");
398 399
    return -1;
  }
400 401 402

  if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
403 404
    return -1;
  }
405

L
Liu Jicong 已提交
406 407 408
  return 0;
}

409
int32_t streamMetaAbort(SStreamMeta* pMeta) {
410
  if (tdbAbort(pMeta->db, pMeta->txn) < 0) {
411 412
    return -1;
  }
413 414 415

  if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
416 417
    return -1;
  }
L
Liu Jicong 已提交
418 419
  return 0;
}
420

L
Liu Jicong 已提交
421
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
L
Liu Jicong 已提交
422
  TBC* pCur = NULL;
423

L
Liu Jicong 已提交
424
  if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
425
    qError("vgId:%d failed to open stream meta, code:%s", pMeta->vgId, tstrerror(terrno));
L
Liu Jicong 已提交
426 427 428 429 430 431 432 433
    return -1;
  }

  void*    pKey = NULL;
  int32_t  kLen = 0;
  void*    pVal = NULL;
  int32_t  vLen = 0;
  SDecoder decoder;
434
  SArray*  pRecycleList = taosArrayInit(4, sizeof(int32_t));
L
Liu Jicong 已提交
435 436 437 438 439 440

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
    if (pTask == NULL) {
L
Liu Jicong 已提交
441 442
      tdbFree(pKey);
      tdbFree(pVal);
5
54liuyao 已提交
443
      tdbTbcClose(pCur);
444
      taosArrayDestroy(pRecycleList);
L
Liu Jicong 已提交
445 446 447
      return -1;
    }
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
Y
yihaoDeng 已提交
448 449 450 451 452 453 454
    if (tDecodeStreamTask(&decoder, pTask) < 0) {
      tDecoderClear(&decoder);
      tdbFree(pKey);
      tdbFree(pVal);
      tdbTbcClose(pCur);
      taosArrayDestroy(pRecycleList);
      tFreeStreamTask(pTask);
Y
yihaoDeng 已提交
455 456 457 458
      qError(
          "stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
          "manually",
          tsDataDir);
Y
yihaoDeng 已提交
459 460
      return -1;
    }
461

462 463 464 465 466 467 468 469 470 471 472 473
    if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
      int32_t taskId = pTask->id.taskId;
      tFreeStreamTask(pTask);

      taosArrayPush(pRecycleList, &taskId);

      int32_t total = taosArrayGetSize(pRecycleList);
      qDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
      continue;
    }

    // do duplicate task check.
474 475
    void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
    if (p == NULL) {
476 477 478 479
      if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) {
        tdbFree(pKey);
        tdbFree(pVal);
        tdbTbcClose(pCur);
480 481
        tFreeStreamTask(pTask);
        taosArrayDestroy(pRecycleList);
482 483
        return -1;
      }
484

485
      taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
486
    } else {
487 488 489 490
      tdbFree(pKey);
      tdbFree(pVal);
      tdbTbcClose(pCur);
      taosMemoryFree(pTask);
491
      continue;
492
    }
493

494
    if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) {
L
Liu Jicong 已提交
495 496
      tdbFree(pKey);
      tdbFree(pVal);
5
54liuyao 已提交
497
      tdbTbcClose(pCur);
498 499
      tFreeStreamTask(pTask);
      taosArrayDestroy(pRecycleList);
500 501
      return -1;
    }
502

503
    ASSERT(pTask->status.downstreamReady == 0);
504 505
  }

506 507
  tdbFree(pKey);
  tdbFree(pVal);
508
  if (tdbTbcClose(pCur) < 0) {
509
    taosArrayDestroy(pRecycleList);
510
    return -1;
L
Liu Jicong 已提交
511 512
  }

513
  if (taosArrayGetSize(pRecycleList) > 0) {
Y
yihaoDeng 已提交
514 515
    for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
      int32_t taskId = *(int32_t*)taosArrayGet(pRecycleList, i);
516 517 518 519
      streamMetaRemoveTask(pMeta, taskId);
    }
  }

Y
yihaoDeng 已提交
520
  qDebug("vgId:%d load %d task from disk", pMeta->vgId, (int32_t)taosArrayGetSize(pMeta->pTaskList));
521
  taosArrayDestroy(pRecycleList);
L
Liu Jicong 已提交
522 523
  return 0;
}