streamMeta.c 10.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executor.h"
17
#include "streamBackendRocksdb.h"
18
#include "streamInc.h"
dengyihao's avatar
dengyihao 已提交
19
#include "tref.h"
L
Liu Jicong 已提交
20
#include "ttimer.h"
L
Liu Jicong 已提交
21

22
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
dengyihao's avatar
dengyihao 已提交
23
  int32_t      code = -1;
L
Liu Jicong 已提交
24 25 26 27 28
  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
  if (pMeta == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
29

L
Liu Jicong 已提交
30 31
  int32_t len = strlen(path) + 20;
  char*   streamPath = taosMemoryCalloc(1, len);
32
  sprintf(streamPath, "%s/%s", path, "stream");
33
  pMeta->path = taosStrdup(streamPath);
34
  if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
L
Liu Jicong 已提交
35
    taosMemoryFree(streamPath);
L
Liu Jicong 已提交
36 37 38
    goto _err;
  }

L
Liu Jicong 已提交
39
  sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
dengyihao's avatar
dengyihao 已提交
40
  code = taosMulModeMkDir(streamPath, 0755);
dengyihao's avatar
dengyihao 已提交
41 42 43 44 45
  if (code != 0) {
    terrno = TAOS_SYSTEM_ERROR(code);
    taosMemoryFree(streamPath);
    goto _err;
  }
L
Liu Jicong 已提交
46
  taosMemoryFree(streamPath);
47

48
  if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
L
Liu Jicong 已提交
49 50 51
    goto _err;
  }

52
  if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
L
Liu Jicong 已提交
53 54 55
    goto _err;
  }

56
  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
H
Haojun Liao 已提交
57
  pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK);
L
Liu Jicong 已提交
58 59 60 61
  if (pMeta->pTasks == NULL) {
    goto _err;
  }

62 63 64 65 66 67 68
  // task list
  pMeta->pTaskList = taosArrayInit(4, sizeof(int32_t));
  if (pMeta->pTaskList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

L
Liu Jicong 已提交
69 70 71 72
  if (streamMetaBegin(pMeta) < 0) {
    goto _err;
  }

H
Haojun Liao 已提交
73
  pMeta->walScanCounter = 0;
74
  pMeta->vgId = vgId;
L
Liu Jicong 已提交
75 76
  pMeta->ahandle = ahandle;
  pMeta->expandFunc = expandFunc;
dengyihao's avatar
dengyihao 已提交
77 78 79 80 81 82 83 84 85 86 87

  char* statePath = taosMemoryCalloc(1, len);
  sprintf(statePath, "%s/%s", pMeta->path, "state");
  code = taosMulModeMkDir(statePath, 0755);
  if (code != 0) {
    terrno = TAOS_SYSTEM_ERROR(code);
    taosMemoryFree(streamPath);
    goto _err;
  }

  pMeta->streamBackend = streamBackendInit(statePath);
dengyihao's avatar
dengyihao 已提交
88 89 90
  pMeta->streamBackendId = taosOpenRef(20, streamBackendCleanup);
  pMeta->streamBackendRid = taosAddRef(pMeta->streamBackendId, pMeta->streamBackend);

dengyihao's avatar
dengyihao 已提交
91 92
  taosMemoryFree(statePath);

93
  taosInitRWLatch(&pMeta->lock);
94
  return pMeta;
L
Liu Jicong 已提交
95

L
Liu Jicong 已提交
96
_err:
L
Liu Jicong 已提交
97
  taosMemoryFree(pMeta->path);
L
Liu Jicong 已提交
98
  if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
99
  if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
L
Liu Jicong 已提交
100
  if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
101
  if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
L
Liu Jicong 已提交
102
  if (pMeta->db) tdbClose(pMeta->db);
dengyihao's avatar
dengyihao 已提交
103
  // if (pMeta->streamBackend) streamBackendCleanup(pMeta->streamBackend);
L
Liu Jicong 已提交
104
  taosMemoryFree(pMeta);
dengyihao's avatar
dengyihao 已提交
105
  qError("failed to open stream meta");
L
Liu Jicong 已提交
106 107 108 109
  return NULL;
}

void streamMetaClose(SStreamMeta* pMeta) {
110
  tdbAbort(pMeta->db, pMeta->txn);
111
  tdbTbClose(pMeta->pTaskDb);
112
  tdbTbClose(pMeta->pCheckpointDb);
113
  tdbClose(pMeta->db);
L
Liu Jicong 已提交
114 115 116 117

  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pMeta->pTasks, pIter);
118 119 120 121
    if (pIter == NULL) {
      break;
    }

L
Liu Jicong 已提交
122
    SStreamTask* pTask = *(SStreamTask**)pIter;
L
Liu Jicong 已提交
123 124 125 126
    if (pTask->timer) {
      taosTmrStop(pTask->timer);
      pTask->timer = NULL;
    }
127

128
    tFreeStreamTask(pTask);
L
Liu Jicong 已提交
129
  }
130

L
Liu Jicong 已提交
131
  taosHashCleanup(pMeta->pTasks);
dengyihao's avatar
dengyihao 已提交
132 133 134
  taosRemoveRef(pMeta->streamBackendId, pMeta->streamBackendRid);
  // streamBackendCleanup(pMeta->streamBackend);
  taosCloseRef(pMeta->streamBackendId);
135
  pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList);
L
Liu Jicong 已提交
136 137 138 139
  taosMemoryFree(pMeta->path);
  taosMemoryFree(pMeta);
}

140 141
#if 0
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
142 143 144 145 146 147
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
148
  if (tDecodeStreamTask(&decoder, pTask) < 0) {
L
Liu Jicong 已提交
149
    tDecoderClear(&decoder);
L
Liu Jicong 已提交
150 151 152 153
    goto FAIL;
  }
  tDecoderClear(&decoder);

154
  if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
L
Liu Jicong 已提交
155 156 157 158
    ASSERT(0);
    goto FAIL;
  }

159
  if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
L
Liu Jicong 已提交
160 161
    goto FAIL;
  }
L
Liu Jicong 已提交
162

163 164
  if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) {
    taosHashRemove(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t));
L
Liu Jicong 已提交
165
    ASSERT(0);
L
Liu Jicong 已提交
166
    goto FAIL;
L
Liu Jicong 已提交
167
  }
L
Liu Jicong 已提交
168

L
Liu Jicong 已提交
169 170 171
  return 0;

FAIL:
172
  if (pTask) tFreeStreamTask(pTask);
L
Liu Jicong 已提交
173
  return -1;
L
Liu Jicong 已提交
174
}
175
#endif
L
Liu Jicong 已提交
176

L
Liu Jicong 已提交
177 178
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
  void*   buf = NULL;
L
Liu Jicong 已提交
179 180
  int32_t len;
  int32_t code;
181
  tEncodeSize(tEncodeStreamTask, pTask, len, code);
L
Liu Jicong 已提交
182 183 184
  if (code < 0) {
    return -1;
  }
L
Liu Jicong 已提交
185
  buf = taosMemoryCalloc(1, len);
L
Liu Jicong 已提交
186 187 188 189
  if (buf == NULL) {
    return -1;
  }

190
  SEncoder encoder = {0};
L
Liu Jicong 已提交
191
  tEncoderInit(&encoder, buf, len);
192
  tEncodeStreamTask(&encoder, pTask);
193
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
194

195
  if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) {
L
Liu Jicong 已提交
196 197 198
    return -1;
  }

L
Liu Jicong 已提交
199
  taosMemoryFree(buf);
L
Liu Jicong 已提交
200 201 202
  return 0;
}

203 204
// add to the ready tasks hash map, not the restored tasks hash map
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
L
Liu Jicong 已提交
205
  if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
L
liuyao 已提交
206
    tFreeStreamTask(pTask);
L
Liu Jicong 已提交
207 208 209 210
    return -1;
  }

  if (streamMetaSaveTask(pMeta, pTask) < 0) {
L
liuyao 已提交
211
    tFreeStreamTask(pTask);
L
Liu Jicong 已提交
212 213 214
    return -1;
  }

215 216 217
  if (streamMetaCommit(pMeta) < 0) {
    tFreeStreamTask(pTask);
    return -1;
L
Liu Jicong 已提交
218 219
  }

220 221 222 223 224 225
  void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
  if (p == NULL) {
    taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
  }

  taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
L
Liu Jicong 已提交
226 227
  return 0;
}
228 229

int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) {
230 231 232
  size_t size = taosHashGetSize(pMeta->pTasks);
  ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks));

233
  return (int32_t)size;
234
}
L
Liu Jicong 已提交
235

L
Liu Jicong 已提交
236 237 238 239
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
  taosRLockLatch(&pMeta->lock);

  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
240
  if (ppTask != NULL) {
241
    if (!streamTaskShouldStop(&(*ppTask)->status)) {
242 243 244 245
      atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
      taosRUnLockLatch(&pMeta->lock);
      return *ppTask;
    }
L
Liu Jicong 已提交
246
  }
247

L
Liu Jicong 已提交
248 249 250 251 252 253
  taosRUnLockLatch(&pMeta->lock);
  return NULL;
}

void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
  int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1);
254 255 256
  if (left < 0) {
    qError("task ref is invalid, ref:%d, %s", left, pTask->id.idStr);
  } else if (left == 0) {
257
    ASSERT(streamTaskShouldStop(&pTask->status));
258
    tFreeStreamTask(pTask);
L
Liu Jicong 已提交
259 260 261
  }
}

262
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
H
Haojun Liao 已提交
263 264
  taosWLockLatch(&pMeta->lock);

L
Liu Jicong 已提交
265 266 267
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
  if (ppTask) {
    SStreamTask* pTask = *ppTask;
268

dengyihao's avatar
dengyihao 已提交
269
    // taosWLockLatch(&pMeta->lock);
270

L
Liu Jicong 已提交
271
    taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
L
Liu Jicong 已提交
272
    tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
273

dengyihao's avatar
dengyihao 已提交
274 275
    //
    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
L
Liu Jicong 已提交
276

277
    int32_t num = taosArrayGetSize(pMeta->pTaskList);
278
    for (int32_t i = 0; i < num; ++i) {
279 280 281 282 283 284 285
      int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
      if (*pTaskId == taskId) {
        taosArrayRemove(pMeta->pTaskList, i);
        break;
      }
    }

L
Liu Jicong 已提交
286 287
    streamMetaReleaseTask(pMeta, pTask);
  }
H
Haojun Liao 已提交
288 289

  taosWUnLockLatch(&pMeta->lock);
L
Liu Jicong 已提交
290 291
}

L
Liu Jicong 已提交
292
int32_t streamMetaBegin(SStreamMeta* pMeta) {
293 294
  if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
L
Liu Jicong 已提交
295 296 297 298 299 300
    return -1;
  }
  return 0;
}

int32_t streamMetaCommit(SStreamMeta* pMeta) {
301
  if (tdbCommit(pMeta->db, pMeta->txn) < 0) {
302
    qError("failed to commit stream meta");
L
Liu Jicong 已提交
303 304
    return -1;
  }
305

306
  if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) {
307
    qError("failed to commit stream meta");
308 309
    return -1;
  }
310 311 312

  if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
313 314
    return -1;
  }
L
Liu Jicong 已提交
315 316 317
  return 0;
}

318
int32_t streamMetaAbort(SStreamMeta* pMeta) {
319
  if (tdbAbort(pMeta->db, pMeta->txn) < 0) {
320 321
    return -1;
  }
322 323 324

  if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
325 326
    return -1;
  }
L
Liu Jicong 已提交
327 328
  return 0;
}
329

L
Liu Jicong 已提交
330
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
L
Liu Jicong 已提交
331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
  TBC* pCur = NULL;
  if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
    return -1;
  }

  void*    pKey = NULL;
  int32_t  kLen = 0;
  void*    pVal = NULL;
  int32_t  vLen = 0;
  SDecoder decoder;

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
    if (pTask == NULL) {
L
Liu Jicong 已提交
347 348
      tdbFree(pKey);
      tdbFree(pVal);
5
54liuyao 已提交
349
      tdbTbcClose(pCur);
L
Liu Jicong 已提交
350 351
      return -1;
    }
352

L
Liu Jicong 已提交
353
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
354
    tDecodeStreamTask(&decoder, pTask);
L
Liu Jicong 已提交
355
    tDecoderClear(&decoder);
356

357
    if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) {
L
Liu Jicong 已提交
358 359
      tdbFree(pKey);
      tdbFree(pVal);
5
54liuyao 已提交
360
      tdbTbcClose(pCur);
361 362 363
      return -1;
    }

364 365 366 367 368 369
    void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
    if (p == NULL) {
      taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
    }

    if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) {
L
Liu Jicong 已提交
370 371
      tdbFree(pKey);
      tdbFree(pVal);
5
54liuyao 已提交
372
      tdbTbcClose(pCur);
373 374
      return -1;
    }
375

L
Liu Jicong 已提交
376
    if (pTask->fillHistory) {
377
      pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
L
Liu Jicong 已提交
378 379
      streamTaskCheckDownstream(pTask, ver);
    }
380 381
  }

382 383
  tdbFree(pKey);
  tdbFree(pVal);
384 385
  if (tdbTbcClose(pCur) < 0) {
    return -1;
L
Liu Jicong 已提交
386 387 388 389
  }

  return 0;
}