streamMeta.c 10.8 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executor.h"
17
#include "streamBackendRocksdb.h"
18
#include "streamInc.h"
dengyihao's avatar
dengyihao 已提交
19
#include "tref.h"
L
Liu Jicong 已提交
20
#include "ttimer.h"
L
Liu Jicong 已提交
21

dengyihao's avatar
dengyihao 已提交
22
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
dengyihao's avatar
dengyihao 已提交
23
int32_t             streamBackendId = 0;
dengyihao's avatar
dengyihao 已提交
24 25 26 27 28
static void         streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); }

void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() { taosCloseRef(streamBackendId); }

29
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
dengyihao's avatar
dengyihao 已提交
30
  int32_t      code = -1;
L
Liu Jicong 已提交
31 32 33 34 35
  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
  if (pMeta == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
36

L
Liu Jicong 已提交
37 38
  int32_t len = strlen(path) + 20;
  char*   streamPath = taosMemoryCalloc(1, len);
39
  sprintf(streamPath, "%s/%s", path, "stream");
40
  pMeta->path = taosStrdup(streamPath);
41
  if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
L
Liu Jicong 已提交
42 43
    goto _err;
  }
dengyihao's avatar
dengyihao 已提交
44
  memset(streamPath, 0, len);
L
Liu Jicong 已提交
45

L
Liu Jicong 已提交
46
  sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
dengyihao's avatar
dengyihao 已提交
47
  code = taosMulModeMkDir(streamPath, 0755);
dengyihao's avatar
dengyihao 已提交
48 49 50 51
  if (code != 0) {
    terrno = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }
52

53
  if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
L
Liu Jicong 已提交
54 55 56
    goto _err;
  }

57
  if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
L
Liu Jicong 已提交
58 59 60
    goto _err;
  }

61
  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
H
Haojun Liao 已提交
62
  pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK);
L
Liu Jicong 已提交
63 64 65 66
  if (pMeta->pTasks == NULL) {
    goto _err;
  }

67 68 69 70 71 72 73
  // task list
  pMeta->pTaskList = taosArrayInit(4, sizeof(int32_t));
  if (pMeta->pTaskList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

L
Liu Jicong 已提交
74 75 76 77
  if (streamMetaBegin(pMeta) < 0) {
    goto _err;
  }

H
Haojun Liao 已提交
78
  pMeta->walScanCounter = 0;
79
  pMeta->vgId = vgId;
L
Liu Jicong 已提交
80 81
  pMeta->ahandle = ahandle;
  pMeta->expandFunc = expandFunc;
dengyihao's avatar
dengyihao 已提交
82

dengyihao's avatar
dengyihao 已提交
83 84 85
  memset(streamPath, 0, len);
  sprintf(streamPath, "%s/%s", pMeta->path, "state");
  code = taosMulModeMkDir(streamPath, 0755);
dengyihao's avatar
dengyihao 已提交
86 87 88 89 90
  if (code != 0) {
    terrno = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

dengyihao's avatar
dengyihao 已提交
91
  pMeta->streamBackend = streamBackendInit(streamPath);
dengyihao's avatar
dengyihao 已提交
92
  pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
dengyihao's avatar
dengyihao 已提交
93

dengyihao's avatar
dengyihao 已提交
94
  taosMemoryFree(streamPath);
dengyihao's avatar
dengyihao 已提交
95

96
  taosInitRWLatch(&pMeta->lock);
97
  return pMeta;
L
Liu Jicong 已提交
98

L
Liu Jicong 已提交
99
_err:
dengyihao's avatar
dengyihao 已提交
100
  taosMemoryFree(streamPath);
L
Liu Jicong 已提交
101
  taosMemoryFree(pMeta->path);
L
Liu Jicong 已提交
102
  if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
103
  if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
L
Liu Jicong 已提交
104
  if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
105
  if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
L
Liu Jicong 已提交
106
  if (pMeta->db) tdbClose(pMeta->db);
dengyihao's avatar
dengyihao 已提交
107
  // if (pMeta->streamBackend) streamBackendCleanup(pMeta->streamBackend);
L
Liu Jicong 已提交
108
  taosMemoryFree(pMeta);
dengyihao's avatar
dengyihao 已提交
109
  qError("failed to open stream meta");
L
Liu Jicong 已提交
110 111 112 113
  return NULL;
}

void streamMetaClose(SStreamMeta* pMeta) {
114
  tdbAbort(pMeta->db, pMeta->txn);
115
  tdbTbClose(pMeta->pTaskDb);
116
  tdbTbClose(pMeta->pCheckpointDb);
117
  tdbClose(pMeta->db);
L
Liu Jicong 已提交
118 119 120 121

  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pMeta->pTasks, pIter);
122 123 124 125
    if (pIter == NULL) {
      break;
    }

L
Liu Jicong 已提交
126
    SStreamTask* pTask = *(SStreamTask**)pIter;
L
Liu Jicong 已提交
127 128 129 130
    if (pTask->timer) {
      taosTmrStop(pTask->timer);
      pTask->timer = NULL;
    }
131

132
    tFreeStreamTask(pTask);
L
Liu Jicong 已提交
133
  }
134

L
Liu Jicong 已提交
135
  taosHashCleanup(pMeta->pTasks);
dengyihao's avatar
dengyihao 已提交
136
  taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
137
  pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList);
L
Liu Jicong 已提交
138 139 140 141
  taosMemoryFree(pMeta->path);
  taosMemoryFree(pMeta);
}

142 143
#if 0
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
144 145 146 147 148 149
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
150
  if (tDecodeStreamTask(&decoder, pTask) < 0) {
L
Liu Jicong 已提交
151
    tDecoderClear(&decoder);
L
Liu Jicong 已提交
152 153 154 155
    goto FAIL;
  }
  tDecoderClear(&decoder);

156
  if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
L
Liu Jicong 已提交
157 158 159 160
    ASSERT(0);
    goto FAIL;
  }

161
  if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
L
Liu Jicong 已提交
162 163
    goto FAIL;
  }
L
Liu Jicong 已提交
164

165 166
  if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) {
    taosHashRemove(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t));
L
Liu Jicong 已提交
167
    ASSERT(0);
L
Liu Jicong 已提交
168
    goto FAIL;
L
Liu Jicong 已提交
169
  }
L
Liu Jicong 已提交
170

L
Liu Jicong 已提交
171 172 173
  return 0;

FAIL:
174
  if (pTask) tFreeStreamTask(pTask);
L
Liu Jicong 已提交
175
  return -1;
L
Liu Jicong 已提交
176
}
177
#endif
L
Liu Jicong 已提交
178

L
Liu Jicong 已提交
179 180
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
  void*   buf = NULL;
L
Liu Jicong 已提交
181 182
  int32_t len;
  int32_t code;
183
  tEncodeSize(tEncodeStreamTask, pTask, len, code);
L
Liu Jicong 已提交
184 185 186
  if (code < 0) {
    return -1;
  }
L
Liu Jicong 已提交
187
  buf = taosMemoryCalloc(1, len);
L
Liu Jicong 已提交
188 189 190 191
  if (buf == NULL) {
    return -1;
  }

192
  SEncoder encoder = {0};
L
Liu Jicong 已提交
193
  tEncoderInit(&encoder, buf, len);
194
  tEncodeStreamTask(&encoder, pTask);
195
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
196

197
  if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) {
L
Liu Jicong 已提交
198 199 200
    return -1;
  }

L
Liu Jicong 已提交
201
  taosMemoryFree(buf);
L
Liu Jicong 已提交
202 203 204
  return 0;
}

205 206
// add to the ready tasks hash map, not the restored tasks hash map
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
207 208
  void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
  if (p == NULL) {
209 210 211 212 213 214 215 216 217 218 219 220 221 222
    if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
      tFreeStreamTask(pTask);
      return -1;
    }

    if (streamMetaSaveTask(pMeta, pTask) < 0) {
      tFreeStreamTask(pTask);
      return -1;
    }

    if (streamMetaCommit(pMeta) < 0) {
      tFreeStreamTask(pTask);
      return -1;
    }
223
    taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
224 225
  } else {
    return 0;
226 227 228
  }

  taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
L
Liu Jicong 已提交
229 230
  return 0;
}
231 232

int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) {
233 234 235
  size_t size = taosHashGetSize(pMeta->pTasks);
  ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks));

236
  return (int32_t)size;
237
}
L
Liu Jicong 已提交
238

L
Liu Jicong 已提交
239 240 241 242
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
  taosRLockLatch(&pMeta->lock);

  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
243
  if (ppTask != NULL) {
244
    if (!streamTaskShouldStop(&(*ppTask)->status)) {
245 246 247 248
      atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
      taosRUnLockLatch(&pMeta->lock);
      return *ppTask;
    }
L
Liu Jicong 已提交
249
  }
250

L
Liu Jicong 已提交
251 252 253 254 255 256
  taosRUnLockLatch(&pMeta->lock);
  return NULL;
}

void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
  int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1);
257 258 259
  if (left < 0) {
    qError("task ref is invalid, ref:%d, %s", left, pTask->id.idStr);
  } else if (left == 0) {
260
    ASSERT(streamTaskShouldStop(&pTask->status));
261
    tFreeStreamTask(pTask);
L
Liu Jicong 已提交
262 263 264
  }
}

265
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
H
Haojun Liao 已提交
266 267
  taosWLockLatch(&pMeta->lock);

L
Liu Jicong 已提交
268 269 270
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
  if (ppTask) {
    SStreamTask* pTask = *ppTask;
271

L
Liu Jicong 已提交
272
    taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
L
Liu Jicong 已提交
273
    tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
274

dengyihao's avatar
dengyihao 已提交
275
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
276
    int32_t num = taosArrayGetSize(pMeta->pTaskList);
277 278

    qDebug("s-task:%s set the drop task flag, remain running s-task:%d", pTask->id.idStr, num - 1);
279
    for (int32_t i = 0; i < num; ++i) {
280 281 282 283 284 285 286
      int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
      if (*pTaskId == taskId) {
        taosArrayRemove(pMeta->pTaskList, i);
        break;
      }
    }

L
Liu Jicong 已提交
287
    streamMetaReleaseTask(pMeta, pTask);
288 289
  } else {
    qDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
L
Liu Jicong 已提交
290
  }
H
Haojun Liao 已提交
291 292

  taosWUnLockLatch(&pMeta->lock);
L
Liu Jicong 已提交
293 294
}

L
Liu Jicong 已提交
295
int32_t streamMetaBegin(SStreamMeta* pMeta) {
296 297
  if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
L
Liu Jicong 已提交
298 299 300 301 302
    return -1;
  }
  return 0;
}

303
// todo add error log
L
Liu Jicong 已提交
304
int32_t streamMetaCommit(SStreamMeta* pMeta) {
305
  if (tdbCommit(pMeta->db, pMeta->txn) < 0) {
306
    qError("failed to commit stream meta");
L
Liu Jicong 已提交
307 308
    return -1;
  }
309

310
  if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) {
311
    qError("failed to commit stream meta");
312 313
    return -1;
  }
314 315 316

  if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
317 318
    return -1;
  }
319

L
Liu Jicong 已提交
320 321 322
  return 0;
}

323
int32_t streamMetaAbort(SStreamMeta* pMeta) {
324
  if (tdbAbort(pMeta->db, pMeta->txn) < 0) {
325 326
    return -1;
  }
327 328 329

  if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
330 331
    return -1;
  }
L
Liu Jicong 已提交
332 333
  return 0;
}
334

L
Liu Jicong 已提交
335
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
L
Liu Jicong 已提交
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
  TBC* pCur = NULL;
  if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
    return -1;
  }

  void*    pKey = NULL;
  int32_t  kLen = 0;
  void*    pVal = NULL;
  int32_t  vLen = 0;
  SDecoder decoder;

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
    if (pTask == NULL) {
L
Liu Jicong 已提交
352 353
      tdbFree(pKey);
      tdbFree(pVal);
5
54liuyao 已提交
354
      tdbTbcClose(pCur);
L
Liu Jicong 已提交
355 356
      return -1;
    }
357

L
Liu Jicong 已提交
358
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
359
    tDecodeStreamTask(&decoder, pTask);
L
Liu Jicong 已提交
360
    tDecoderClear(&decoder);
361

362
    // remove duplicate
363 364
    void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
    if (p == NULL) {
365 366 367 368
      if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) {
        tdbFree(pKey);
        tdbFree(pVal);
        tdbTbcClose(pCur);
369
        taosMemoryFree(pTask);
370 371
        return -1;
      }
372
      taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
373
    } else {
374 375 376 377
      tdbFree(pKey);
      tdbFree(pVal);
      tdbTbcClose(pCur);
      taosMemoryFree(pTask);
378
      continue;
379 380
    }
    if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) {
L
Liu Jicong 已提交
381 382
      tdbFree(pKey);
      tdbFree(pVal);
5
54liuyao 已提交
383
      tdbTbcClose(pCur);
384
      taosMemoryFree(pTask);
385 386
      return -1;
    }
387

L
Liu Jicong 已提交
388
    if (pTask->fillHistory) {
389
      ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM);
L
Liu Jicong 已提交
390 391
      streamTaskCheckDownstream(pTask, ver);
    }
392 393
  }

394 395
  tdbFree(pKey);
  tdbFree(pVal);
396 397
  if (tdbTbcClose(pCur) < 0) {
    return -1;
L
Liu Jicong 已提交
398 399 400 401
  }

  return 0;
}