streamMeta.c 8.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executor.h"
17
#include "streamInc.h"
L
Liu Jicong 已提交
18
#include "ttimer.h"
L
Liu Jicong 已提交
19

20
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
dengyihao's avatar
dengyihao 已提交
21
  int32_t      code = -1;
L
Liu Jicong 已提交
22 23 24 25 26
  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
  if (pMeta == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
27

L
Liu Jicong 已提交
28 29
  int32_t len = strlen(path) + 20;
  char*   streamPath = taosMemoryCalloc(1, len);
30
  sprintf(streamPath, "%s/%s", path, "stream");
31
  pMeta->path = taosStrdup(streamPath);
32
  if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
L
Liu Jicong 已提交
33
    taosMemoryFree(streamPath);
L
Liu Jicong 已提交
34 35 36
    goto _err;
  }

L
Liu Jicong 已提交
37
  sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
dengyihao's avatar
dengyihao 已提交
38
  code = taosMulModeMkDir(streamPath, 0755);
dengyihao's avatar
dengyihao 已提交
39 40 41 42 43
  if (code != 0) {
    terrno = TAOS_SYSTEM_ERROR(code);
    taosMemoryFree(streamPath);
    goto _err;
  }
L
Liu Jicong 已提交
44
  taosMemoryFree(streamPath);
45

46
  if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
L
Liu Jicong 已提交
47 48 49
    goto _err;
  }

50
  if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
L
Liu Jicong 已提交
51 52 53
    goto _err;
  }

54 55
  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
  pMeta->pTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK);
L
Liu Jicong 已提交
56 57 58 59 60 61 62 63
  if (pMeta->pTasks == NULL) {
    goto _err;
  }

  if (streamMetaBegin(pMeta) < 0) {
    goto _err;
  }

64
  pMeta->vgId = vgId;
L
Liu Jicong 已提交
65 66
  pMeta->ahandle = ahandle;
  pMeta->expandFunc = expandFunc;
67
  taosInitRWLatch(&pMeta->lock);
68
  return pMeta;
L
Liu Jicong 已提交
69

L
Liu Jicong 已提交
70
_err:
L
Liu Jicong 已提交
71
  taosMemoryFree(pMeta->path);
L
Liu Jicong 已提交
72 73
  if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
  if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
74
  if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
L
Liu Jicong 已提交
75 76
  if (pMeta->db) tdbClose(pMeta->db);
  taosMemoryFree(pMeta);
L
Liu Jicong 已提交
77 78 79 80
  return NULL;
}

void streamMetaClose(SStreamMeta* pMeta) {
81
  tdbAbort(pMeta->db, pMeta->txn);
82
  tdbTbClose(pMeta->pTaskDb);
83
  tdbTbClose(pMeta->pCheckpointDb);
84
  tdbClose(pMeta->db);
L
Liu Jicong 已提交
85 86 87 88

  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pMeta->pTasks, pIter);
89 90 91 92
    if (pIter == NULL) {
      break;
    }

L
Liu Jicong 已提交
93
    SStreamTask* pTask = *(SStreamTask**)pIter;
L
Liu Jicong 已提交
94 95 96 97
    if (pTask->timer) {
      taosTmrStop(pTask->timer);
      pTask->timer = NULL;
    }
98

99
    tFreeStreamTask(pTask);
L
Liu Jicong 已提交
100
  }
101

L
Liu Jicong 已提交
102 103 104 105 106
  taosHashCleanup(pMeta->pTasks);
  taosMemoryFree(pMeta->path);
  taosMemoryFree(pMeta);
}

107 108
#if 0
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, int32_t msgLen) {
L
Liu Jicong 已提交
109 110 111 112 113 114
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
115
  if (tDecodeStreamTask(&decoder, pTask) < 0) {
L
Liu Jicong 已提交
116
    tDecoderClear(&decoder);
L
Liu Jicong 已提交
117 118 119 120
    goto FAIL;
  }
  tDecoderClear(&decoder);

121
  if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
L
Liu Jicong 已提交
122 123 124 125
    ASSERT(0);
    goto FAIL;
  }

126
  if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
L
Liu Jicong 已提交
127 128
    goto FAIL;
  }
L
Liu Jicong 已提交
129

130 131
  if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) {
    taosHashRemove(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t));
L
Liu Jicong 已提交
132
    ASSERT(0);
L
Liu Jicong 已提交
133
    goto FAIL;
L
Liu Jicong 已提交
134
  }
L
Liu Jicong 已提交
135

L
Liu Jicong 已提交
136 137 138
  return 0;

FAIL:
139
  if (pTask) tFreeStreamTask(pTask);
L
Liu Jicong 已提交
140
  return -1;
L
Liu Jicong 已提交
141
}
142
#endif
L
Liu Jicong 已提交
143

L
Liu Jicong 已提交
144 145
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
  void*   buf = NULL;
L
Liu Jicong 已提交
146 147
  int32_t len;
  int32_t code;
148
  tEncodeSize(tEncodeStreamTask, pTask, len, code);
L
Liu Jicong 已提交
149 150 151
  if (code < 0) {
    return -1;
  }
L
Liu Jicong 已提交
152
  buf = taosMemoryCalloc(1, len);
L
Liu Jicong 已提交
153 154 155 156
  if (buf == NULL) {
    return -1;
  }

157
  SEncoder encoder = {0};
L
Liu Jicong 已提交
158
  tEncoderInit(&encoder, buf, len);
159
  tEncodeStreamTask(&encoder, pTask);
160
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
161

162
  if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) {
L
Liu Jicong 已提交
163 164 165
    return -1;
  }

L
Liu Jicong 已提交
166
  taosMemoryFree(buf);
L
Liu Jicong 已提交
167 168 169
  return 0;
}

170 171
// add to the ready tasks hash map, not the restored tasks hash map
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
L
Liu Jicong 已提交
172
  if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
L
liuyao 已提交
173
    tFreeStreamTask(pTask);
L
Liu Jicong 已提交
174 175 176 177
    return -1;
  }

  if (streamMetaSaveTask(pMeta, pTask) < 0) {
L
liuyao 已提交
178
    tFreeStreamTask(pTask);
L
Liu Jicong 已提交
179 180 181
    return -1;
  }

182
  taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
L
Liu Jicong 已提交
183 184
  return 0;
}
185 186

int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) {
187
  return (int32_t) taosHashGetSize(pMeta->pTasks);
188
}
L
Liu Jicong 已提交
189

L
Liu Jicong 已提交
190 191 192 193
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
  taosRLockLatch(&pMeta->lock);

  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
194
  if (ppTask != NULL) {
195
    if (!streamTaskShouldStop(&(*ppTask)->status)) {
196 197 198 199
      atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
      taosRUnLockLatch(&pMeta->lock);
      return *ppTask;
    }
L
Liu Jicong 已提交
200
  }
201

L
Liu Jicong 已提交
202 203 204 205 206 207 208 209
  taosRUnLockLatch(&pMeta->lock);
  return NULL;
}

void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
  int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1);
  ASSERT(left >= 0);
  if (left == 0) {
210
    ASSERT(streamTaskShouldStop(&pTask->status));
211
    tFreeStreamTask(pTask);
L
Liu Jicong 已提交
212 213 214
  }
}

215
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
L
Liu Jicong 已提交
216 217 218 219
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
  if (ppTask) {
    SStreamTask* pTask = *ppTask;
    taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
L
Liu Jicong 已提交
220
    tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
221 222

    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP);
L
Liu Jicong 已提交
223 224 225 226 227 228 229

    taosWLockLatch(&pMeta->lock);
    streamMetaReleaseTask(pMeta, pTask);
    taosWUnLockLatch(&pMeta->lock);
  }
}

L
Liu Jicong 已提交
230
int32_t streamMetaBegin(SStreamMeta* pMeta) {
231 232
  if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
L
Liu Jicong 已提交
233 234 235 236 237 238
    return -1;
  }
  return 0;
}

int32_t streamMetaCommit(SStreamMeta* pMeta) {
239
  if (tdbCommit(pMeta->db, pMeta->txn) < 0) {
240
    ASSERT(0);
L
Liu Jicong 已提交
241 242
    return -1;
  }
243

244
  if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) {
245
    ASSERT(0);
246 247
    return -1;
  }
248 249 250

  if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
251 252
    return -1;
  }
L
Liu Jicong 已提交
253 254 255
  return 0;
}

256
int32_t streamMetaAbort(SStreamMeta* pMeta) {
257
  if (tdbAbort(pMeta->db, pMeta->txn) < 0) {
258 259
    return -1;
  }
260 261 262

  if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
               TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
263 264
    return -1;
  }
L
Liu Jicong 已提交
265 266
  return 0;
}
267

L
Liu Jicong 已提交
268
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
L
Liu Jicong 已提交
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
  TBC* pCur = NULL;
  if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
    return -1;
  }

  void*    pKey = NULL;
  int32_t  kLen = 0;
  void*    pVal = NULL;
  int32_t  vLen = 0;
  SDecoder decoder;

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
    if (pTask == NULL) {
L
Liu Jicong 已提交
285 286
      tdbFree(pKey);
      tdbFree(pVal);
5
54liuyao 已提交
287
      tdbTbcClose(pCur);
L
Liu Jicong 已提交
288 289
      return -1;
    }
290

L
Liu Jicong 已提交
291
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
292
    tDecodeStreamTask(&decoder, pTask);
L
Liu Jicong 已提交
293
    tDecoderClear(&decoder);
294

295
    if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) {
L
Liu Jicong 已提交
296 297
      tdbFree(pKey);
      tdbFree(pVal);
5
54liuyao 已提交
298
      tdbTbcClose(pCur);
299 300 301
      return -1;
    }

302
    if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
L
Liu Jicong 已提交
303 304
      tdbFree(pKey);
      tdbFree(pVal);
5
54liuyao 已提交
305
      tdbTbcClose(pCur);
306 307
      return -1;
    }
308

L
Liu Jicong 已提交
309
    if (pTask->fillHistory) {
310
      pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
L
Liu Jicong 已提交
311 312
      streamTaskCheckDownstream(pTask, ver);
    }
313 314
  }

315 316
  tdbFree(pKey);
  tdbFree(pVal);
317 318
  if (tdbTbcClose(pCur) < 0) {
    return -1;
L
Liu Jicong 已提交
319 320 321 322
  }

  return 0;
}