/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executor.h"
#include "streamInc.h"
#include "ttimer.h"

/**
 * Open the stream meta store rooted at "<path>/stream".
 *
 * Steps, in order: open the tdb environment at "<path>/stream", create the
 * "<path>/stream/checkpoints" directory, open the "task.db" and "checkpoint.db" tables,
 * create the task hash and the task-id list, and begin the first transaction through
 * streamMetaBegin().
 *
 * @param path        parent directory; "/stream" is appended to it
 * @param ahandle     opaque handle stored in the meta (passed to expandFunc by other functions)
 * @param expandFunc  task expansion callback stored in the meta
 * @param vgId        value stored in pMeta->vgId
 * @return the initialized meta, or NULL on failure. On failure every resource acquired so far
 *         is released; terrno is set for the failures this function detects itself
 *         (out of memory, directory creation).
 */
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
  int32_t      code = -1;
  char*        streamPath = NULL;
  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
  if (pMeta == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // "/stream" (7) + "/checkpoints" (12) + terminating NUL (1) == 20 extra bytes
  int32_t len = strlen(path) + 20;
  streamPath = taosMemoryCalloc(1, len);
  if (streamPath == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  snprintf(streamPath, len, "%s/%s", path, "stream");
  pMeta->path = taosStrdup(streamPath);
  if (pMeta->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
    goto _err;
  }

  snprintf(streamPath, len, "%s/%s", pMeta->path, "checkpoints");
  code = taosMulModeMkDir(streamPath, 0755);
  if (code != 0) {
    terrno = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  // the scratch path is no longer needed; clear the pointer so _err does not free it twice
  taosMemoryFree(streamPath);
  streamPath = NULL;

  if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
    goto _err;
  }

  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
  pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK);
  if (pMeta->pTasks == NULL) {
    goto _err;
  }

  // task list
  pMeta->pTaskList = taosArrayInit(4, sizeof(int32_t));
  if (pMeta->pTaskList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  if (streamMetaBegin(pMeta) < 0) {
    goto _err;
  }

  pMeta->walScan = 0;
  pMeta->vgId = vgId;
  pMeta->ahandle = ahandle;
  pMeta->expandFunc = expandFunc;
  taosInitRWLatch(&pMeta->lock);
  return pMeta;

_err:
  // pMeta was zero-allocated, so any member not yet acquired is still NULL here
  if (streamPath) taosMemoryFree(streamPath);
  if (pMeta->path) taosMemoryFree(pMeta->path);
  if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
  if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
  if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
  if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
  if (pMeta->db) tdbClose(pMeta->db);
  taosMemoryFree(pMeta);
  return NULL;
}

/**
 * Tear down a stream meta: abort the current transaction, close both tables and the db,
 * stop the timer of and free every task held in the hash, then release the hash, the
 * task-id list, the path string and the meta itself.
 *
 * @param pMeta meta to destroy; must not be used after this call
 */
void streamMetaClose(SStreamMeta* pMeta) {
  tdbAbort(pMeta->db, pMeta->txn);
  tdbTbClose(pMeta->pTaskDb);
  tdbTbClose(pMeta->pCheckpointDb);
  tdbClose(pMeta->db);

  // each hash value is a SStreamTask* stored by value
  for (void* pEntry = taosHashIterate(pMeta->pTasks, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pMeta->pTasks, pEntry)) {
    SStreamTask* pTask = *(SStreamTask**)pEntry;

    if (pTask->timer) {
      taosTmrStop(pTask->timer);
      pTask->timer = NULL;
    }

    tFreeStreamTask(pTask);
  }

  taosHashCleanup(pMeta->pTasks);
  pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList);
  taosMemoryFree(pMeta->path);
  taosMemoryFree(pMeta);
}

117 118
#if 0
// Disabled code: decode a serialized task from msg, expand it, register it in the task hash
// and persist the raw message in the task table. Returns 0 on success, -1 on failure
// (the task object is freed on every failure path).
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, int32_t msgLen) {
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  const int32_t decodeRes = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);  // the decoder is cleared whether or not decoding succeeded
  if (decodeRes < 0) {
    goto FAIL;
  }

  if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
    ASSERT(0);
    goto FAIL;
  }

  if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
    goto FAIL;
  }

  if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) {
    // undo the hash registration before the task is freed below
    taosHashRemove(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t));
    ASSERT(0);
    goto FAIL;
  }

  return 0;

FAIL:
  // pTask is never NULL here: the allocation failure returns early above
  tFreeStreamTask(pTask);
  return -1;
}
#endif
L
Liu Jicong 已提交
153

L
Liu Jicong 已提交
154 155
/**
 * Serialize a task and upsert it into the task table, keyed by its task id, inside the
 * meta's current transaction.
 *
 * @param pMeta meta whose pTaskDb / txn receive the record
 * @param pTask task to encode and store
 * @return 0 on success, -1 if sizing the encoding, allocating the buffer or the upsert fails
 */
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
  void*   buf = NULL;
  int32_t len;
  int32_t code;

  // first pass: compute the encoded size (tEncodeSize sets len and code)
  tEncodeSize(tEncodeStreamTask, pTask, len, code);
  if (code < 0) {
    return -1;
  }

  buf = taosMemoryCalloc(1, len);
  if (buf == NULL) {
    return -1;
  }

  // second pass: encode into the buffer
  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, len);
  tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  code = 0;
  if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) {
    code = -1;
  }

  // the buffer is released on both outcomes; the original leaked it when the upsert failed
  taosMemoryFree(buf);
  return code;
}

180 181
/**
 * Add a newly deployed task to the ready-task hash map (not the restored-task map).
 *
 * The task is expanded through pMeta->expandFunc, persisted with streamMetaSaveTask(), then
 * inserted into the task hash and appended to the task-id list.
 *
 * @param pMeta meta that receives the task
 * @param ver   version passed to expandFunc
 * @param pTask task to add; it is freed here if expansion or saving fails
 * @return 0 on success, -1 if expansion or saving fails
 */
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
  // short-circuit evaluation: the save is attempted only when the expansion succeeded
  if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0 || streamMetaSaveTask(pMeta, pTask) < 0) {
    tFreeStreamTask(pTask);
    return -1;
  }

  taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
  taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
  return 0;
}
196 197

/**
 * Number of tasks registered in the meta's task hash.
 * Asserts that the task-id list holds the same number of entries as the hash.
 */
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) {
  const size_t numOfTasks = taosHashGetSize(pMeta->pTasks);

  ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks));

  return (int32_t)numOfTasks;
}
L
Liu Jicong 已提交
203

L
Liu Jicong 已提交
204 205 206 207
/**
 * Look up a task by id and take a reference on it.
 *
 * The lookup, the stop check, the reference increment and the read of the task pointer all
 * happen while the meta's read latch is held. The original read *ppTask again after the latch
 * had been released; ppTask points into hash-owned storage, so the pointer is now captured
 * before unlocking.
 *
 * @param pMeta  meta to search
 * @param taskId id of the wanted task
 * @return the task with refCnt incremented, or NULL when the id is unknown or
 *         streamTaskShouldStop() reports true for the task. Each successful acquire is expected
 *         to be paired with streamMetaReleaseTask().
 */
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
  SStreamTask* pAcquired = NULL;

  taosRLockLatch(&pMeta->lock);

  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
  if (ppTask != NULL && !streamTaskShouldStop(&(*ppTask)->status)) {
    pAcquired = *ppTask;
    atomic_add_fetch_32(&pAcquired->refCnt, 1);
  }

  taosRUnLockLatch(&pMeta->lock);
  return pAcquired;
}

/**
 * Drop one reference on a task.
 *
 * A negative resulting count is logged as an error. When the count reaches zero the task is
 * asserted to be in a should-stop state and is freed.
 *
 * @param pMeta unused by this function
 * @param pTask task whose refCnt is decremented
 */
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
  const int32_t remaining = atomic_sub_fetch_32(&pTask->refCnt, 1);

  if (remaining > 0) {
    return;  // still referenced elsewhere
  }

  if (remaining < 0) {
    qError("task ref is invalid, ref:%d, %s", remaining, pTask->id.idStr);
    return;
  }

  // remaining == 0: last reference gone
  ASSERT(streamTaskShouldStop(&pTask->status));
  tFreeStreamTask(pTask);
}

230
/**
 * Remove a task from the meta: delete it from the task hash and the task table, mark it as
 * stopped, drop its id from the task-id list and release one reference on it.
 *
 * The whole operation runs under the meta's write latch, which is taken exactly once. The
 * original acquired it a second time inside the `if (ppTask)` branch while releasing it only
 * once at the end.
 *
 * @param pMeta  meta to modify
 * @param taskId id of the task to remove; an unknown id is a no-op
 */
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
  taosWLockLatch(&pMeta->lock);

  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
  if (ppTask) {
    // copy the pointer out before the hash entry that holds it is removed
    SStreamTask* pTask = *ppTask;

    taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
    tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);

    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP);

    // drop the first matching id from the task list
    int32_t num = taosArrayGetSize(pMeta->pTaskList);
    for (int32_t i = 0; i < num; ++i) {
      int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
      if (*pTaskId == taskId) {
        taosArrayRemove(pMeta->pTaskList, i);
        break;
      }
    }

    streamMetaReleaseTask(pMeta, pTask);
  }

  taosWUnLockLatch(&pMeta->lock);
}

L
Liu Jicong 已提交
259
/**
 * Begin a new transaction on the meta's db and store it in pMeta->txn.
 * The transaction is opened with TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED.
 *
 * @return 0 on success, -1 if tdbBegin fails
 */
int32_t streamMetaBegin(SStreamMeta* pMeta) {
  const int32_t rc = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                              TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  return (rc < 0) ? -1 : 0;
}

/**
 * Commit the meta's current transaction, run the post-commit step, then begin a fresh
 * transaction (TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) stored in pMeta->txn.
 *
 * @return 0 on success, -1 if any of the three steps fails (the first two failures are logged)
 */
int32_t streamMetaCommit(SStreamMeta* pMeta) {
  if (tdbCommit(pMeta->db, pMeta->txn) < 0) {
    qError("failed to commit stream meta");
    return -1;
  }

  if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) {
    qError("failed to commit stream meta");
    return -1;
  }

  // open the next transaction so the meta always has one in progress after a commit
  const int32_t rc = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                              TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  return (rc < 0) ? -1 : 0;
}

285
/**
 * Abort the meta's current transaction and begin a fresh one
 * (TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) stored in pMeta->txn.
 *
 * @return 0 on success, -1 if the abort or the new begin fails
 */
int32_t streamMetaAbort(SStreamMeta* pMeta) {
  if (tdbAbort(pMeta->db, pMeta->txn) < 0) {
    return -1;
  }

  const int32_t rc = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                              TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  return (rc < 0) ? -1 : 0;
}
296

L
Liu Jicong 已提交
297
/**
 * Load every record of the task table into memory.
 *
 * For each record a task object is allocated, decoded from the stored value, expanded through
 * pMeta->expandFunc (with the task's own chkInfo.version), inserted into the task hash and
 * appended to the task-id list. Tasks with fillHistory set are moved to
 * TASK_STATUS__WAIT_DOWNSTREAM and streamTaskCheckDownstream() is called for them.
 *
 * Changes from the original: the task object is freed when expansion or hash insertion fails
 * (it used to leak), a failed decode now aborts the load, and the cursor and key/value buffers
 * are released at a single exit point.
 *
 * @param pMeta meta to populate
 * @param ver   version handed to streamTaskCheckDownstream for fill-history tasks
 * @return 0 on success, -1 on any failure; tasks loaded before the failure stay in the meta
 */
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
  TBC* pCur = NULL;
  if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
    return -1;
  }

  int32_t  code = 0;
  void*    pKey = NULL;
  int32_t  kLen = 0;
  void*    pVal = NULL;
  int32_t  vLen = 0;
  SDecoder decoder;

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
    if (pTask == NULL) {
      code = -1;
      break;
    }

    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    int32_t decodeRes = tDecodeStreamTask(&decoder, pTask);
    tDecoderClear(&decoder);
    if (decodeRes < 0) {
      tFreeStreamTask(pTask);
      code = -1;
      break;
    }

    if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) {
      tFreeStreamTask(pTask);
      code = -1;
      break;
    }

    if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
      // the hash did not take the task, so it is still owned here
      tFreeStreamTask(pTask);
      code = -1;
      break;
    }

    taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);

    if (pTask->fillHistory) {
      pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
      streamTaskCheckDownstream(pTask, ver);
    }
  }

  // shared cleanup for the success path and every failure path
  tdbFree(pKey);
  tdbFree(pVal);
  if (tdbTbcClose(pCur) < 0) {
    code = -1;
  }

  return code;
}