streamMeta.c 8.5 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executor.h"
17
#include "streamInc.h"
L
Liu Jicong 已提交
18
#include "ttimer.h"
L
Liu Jicong 已提交
19

20
/*
 * Create and open the stream meta store rooted at "<path>/stream".
 *
 * Opens the tdb environment, creates the "<path>/stream/checkpoints"
 * directory, opens the "task.db" and "checkpoint.db" tables, creates the
 * in-memory task hash and begins the first write transaction.
 *
 * path       - parent directory of the stream meta directory
 * ahandle    - opaque handle stored in the meta and later passed to expandFunc
 * expandFunc - callback stored in the meta, used to expand tasks
 * vgId       - vgroup id stored in the meta
 *
 * Returns the new meta object, or NULL on failure. On failure every resource
 * acquired so far is released.
 */
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
  int32_t      code = -1;
  char*        streamPath = NULL;
  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
  if (pMeta == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // 20 = strlen("/stream") + strlen("/checkpoints") + 1 for the terminator,
  // i.e. exactly enough for the longest string formatted into streamPath.
  int32_t len = strlen(path) + 20;
  streamPath = taosMemoryCalloc(1, len);
  if (streamPath == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  snprintf(streamPath, (size_t)len, "%s/%s", path, "stream");
  pMeta->path = taosStrdup(streamPath);
  if (pMeta->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
    goto _err;
  }

  snprintf(streamPath, (size_t)len, "%s/%s", pMeta->path, "checkpoints");
  code = taosMulModeMkDir(streamPath, 0755);
  if (code != 0) {
    terrno = TAOS_SYSTEM_ERROR(code);
    goto _err;
  }

  // the scratch buffer is no longer needed; clear the pointer so the error
  // path below does not release it a second time
  taosMemoryFree(streamPath);
  streamPath = NULL;

  if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
    goto _err;
  }

  if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
    goto _err;
  }

  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
  pMeta->pTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK);
  if (pMeta->pTasks == NULL) {
    goto _err;
  }

  if (streamMetaBegin(pMeta) < 0) {
    goto _err;
  }

  pMeta->vgId = vgId;
  pMeta->ahandle = ahandle;
  pMeta->expandFunc = expandFunc;
  taosInitRWLatch(&pMeta->lock);
  return pMeta;

_err:
  // pMeta was zero-initialised, so every member that was never set is NULL
  if (streamPath) taosMemoryFree(streamPath);
  if (pMeta->path) taosMemoryFree(pMeta->path);
  if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
  if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
  if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
  if (pMeta->db) tdbClose(pMeta->db);
  taosMemoryFree(pMeta);
  return NULL;
}

/*
 * Tear down a stream meta object.
 *
 * Aborts the current transaction, closes both tables and the database, then
 * walks the task hash: each task has its timer stopped (when one is set) and
 * is freed. Finally the hash, the stored path and the meta itself are released.
 */
void streamMetaClose(SStreamMeta* pMeta) {
  tdbAbort(pMeta->db, pMeta->txn);
  tdbTbClose(pMeta->pTaskDb);
  tdbTbClose(pMeta->pCheckpointDb);
  tdbClose(pMeta->db);

  for (void* pEntry = taosHashIterate(pMeta->pTasks, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pMeta->pTasks, pEntry)) {
    // each hash value is a pointer to the task
    SStreamTask* pCurTask = *(SStreamTask**)pEntry;

    if (pCurTask->timer) {
      taosTmrStop(pCurTask->timer);
      pCurTask->timer = NULL;
    }

    tFreeStreamTask(pCurTask);
  }

  taosHashCleanup(pMeta->pTasks);
  taosMemoryFree(pMeta->path);
  taosMemoryFree(pMeta);
}

113 114
#if 0
/*
 * Disabled code path, kept for reference only (compiled out by the guard).
 *
 * Decodes a serialized task from msg, expands it, registers it in the task
 * hash and upserts the raw message into the task table.
 * Returns 0 on success, -1 on failure (the task is freed on failure).
 */
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, int32_t msgLen) {
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decoded = tDecodeStreamTask(&decoder, pTask);
  tDecoderClear(&decoder);
  if (decoded < 0) {
    goto FAIL;
  }

  if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
    ASSERT(0);
    goto FAIL;
  }

  if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
    goto FAIL;
  }

  if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) {
    // undo the hash registration before bailing out
    taosHashRemove(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t));
    ASSERT(0);
    goto FAIL;
  }

  return 0;

FAIL:
  tFreeStreamTask(pTask);
  return -1;
}
#endif
L
Liu Jicong 已提交
149

L
Liu Jicong 已提交
150 151
/*
 * Serialize pTask and upsert it into the task table, keyed by its task id,
 * inside the meta's current transaction.
 *
 * Returns 0 on success, -1 when sizing the encoding, allocating the buffer
 * or the upsert fails. The temporary encode buffer is released on every path.
 */
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
  int32_t len = 0;
  int32_t code = 0;

  // first pass: compute the encoded size only
  tEncodeSize(tEncodeStreamTask, pTask, len, code);
  if (code < 0) {
    return -1;
  }

  void* buf = taosMemoryCalloc(1, len);
  if (buf == NULL) {
    return -1;
  }

  // second pass: encode into the buffer
  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, len);
  tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  int32_t ret = tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), buf, len, pMeta->txn);

  // release the buffer whether or not the upsert succeeded
  taosMemoryFree(buf);
  return (ret < 0) ? -1 : 0;
}

176 177
/*
 * Add a deployed task to the ready tasks hash map (not the restored tasks
 * hash map).
 *
 * The task is expanded through pMeta->expandFunc, persisted with
 * streamMetaSaveTask and then registered in pMeta->pTasks under its task id.
 *
 * Returns 0 on success, -1 if any of the three steps fails.
 * NOTE(review): when the hash insert fails the record written by
 * streamMetaSaveTask is still part of the current transaction; confirm
 * whether callers abort the transaction in that case.
 */
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
  if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
    return -1;
  }

  if (streamMetaSaveTask(pMeta, pTask) < 0) {
    return -1;
  }

  // do not report success when the task did not make it into the map
  if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES) < 0) {
    return -1;
  }

  return 0;
}
189 190

int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) {
191
  return (int32_t) taosHashGetSize(pMeta->pTasks);
192
}
L
Liu Jicong 已提交
193

L
Liu Jicong 已提交
194 195 196 197
/*
 * Look up a task by id under the meta's read latch and take a reference on it.
 *
 * Returns the task with its refCnt incremented, or NULL when the id is not in
 * the hash or the task's status is TASK_STATUS__DROPPING.
 */
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
  SStreamTask* pAcquired = NULL;

  taosRLockLatch(&pMeta->lock);

  SStreamTask** ppEntry = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
  if (ppEntry != NULL) {
    SStreamTask* pCandidate = *ppEntry;
    // a task that is being dropped must not be handed out again
    if (atomic_load_8(&(pCandidate->status.taskStatus)) != TASK_STATUS__DROPPING) {
      atomic_add_fetch_32(&pCandidate->refCnt, 1);
      pAcquired = pCandidate;
    }
  }

  taosRUnLockLatch(&pMeta->lock);
  return pAcquired;
}

/*
 * Drop one reference on pTask. When the count reaches zero the task is
 * asserted to be in TASK_STATUS__DROPPING and is freed.
 */
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
  int32_t remaining = atomic_sub_fetch_32(&pTask->refCnt, 1);
  ASSERT(remaining >= 0);

  if (remaining != 0) {
    return;  // other holders still reference the task
  }

  ASSERT(atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING);
  tFreeStreamTask(pTask);
}

217
/*
 * Remove a task by id: take it out of the task hash, delete its record from
 * the task table in the current transaction, mark it TASK_STATUS__DROPPING and
 * release one reference under the meta's write latch.
 *
 * Does nothing when the id is not present in the hash. The task timer is not
 * stopped here.
 */
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
  SStreamTask** ppEntry = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
  if (ppEntry == NULL) {
    return;
  }

  // copy the pointer out before the hash entry goes away
  SStreamTask* pVictim = *ppEntry;

  taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
  tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);

  atomic_store_8(&pVictim->status.taskStatus, TASK_STATUS__DROPPING);

  taosWLockLatch(&pMeta->lock);
  streamMetaReleaseTask(pMeta, pVictim);
  taosWUnLockLatch(&pMeta->lock);
}

L
Liu Jicong 已提交
235
/*
 * Begin a new write transaction (read-uncommitted) on the meta database and
 * store it in pMeta->txn. Returns 0 on success, -1 on failure.
 */
int32_t streamMetaBegin(SStreamMeta* pMeta) {
  int32_t rc = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                        TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  return (rc < 0) ? -1 : 0;
}

/*
 * Commit the current transaction, run the post-commit step, then begin a
 * fresh write transaction in pMeta->txn.
 *
 * Returns 0 on success, -1 on failure. A failed commit or post-commit also
 * trips ASSERT(0).
 */
int32_t streamMetaCommit(SStreamMeta* pMeta) {
  // post-commit is attempted only when the commit itself succeeded
  if (tdbCommit(pMeta->db, pMeta->txn) < 0 || tdbPostCommit(pMeta->db, pMeta->txn) < 0) {
    ASSERT(0);
    return -1;
  }

  int32_t rc = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                        TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  return (rc < 0) ? -1 : 0;
}

261
/*
 * Abort the current transaction and begin a fresh write transaction in
 * pMeta->txn. Returns 0 on success, -1 if either step fails.
 */
int32_t streamMetaAbort(SStreamMeta* pMeta) {
  int32_t rc = tdbAbort(pMeta->db, pMeta->txn);
  if (rc < 0) {
    return -1;
  }

  rc = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (rc < 0) {
    return -1;
  }

  return 0;
}
272

L
Liu Jicong 已提交
273
/*
 * Load every task stored in the task table into the in-memory task hash.
 *
 * For each record: allocate a task, decode it, expand it through
 * pMeta->expandFunc and register it in pMeta->pTasks. Tasks with fillHistory
 * set are put into TASK_STATUS__WAIT_DOWNSTREAM and a downstream check is
 * started with ver.
 *
 * Returns 0 on success, -1 on the first failure. A task that could not be
 * registered is freed, and the cursor and its key/value buffers are released
 * on every path.
 */
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
  TBC* pCur = NULL;
  if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
    return -1;
  }

  int32_t  code = 0;
  void*    pKey = NULL;
  int32_t  kLen = 0;
  void*    pVal = NULL;
  int32_t  vLen = 0;
  SDecoder decoder;

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
    if (pTask == NULL) {
      code = -1;
      break;
    }

    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    int32_t decoded = tDecodeStreamTask(&decoder, pTask);
    tDecoderClear(&decoder);
    if (decoded < 0) {
      // do not continue with a partially decoded task
      tFreeStreamTask(pTask);
      code = -1;
      break;
    }

    // todo set correct initial version.
    if (pMeta->expandFunc(pMeta->ahandle, pTask, 0) < 0) {
      tFreeStreamTask(pTask);
      code = -1;
      break;
    }

    if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
      // the hash did not take the task, so it has to be freed here
      tFreeStreamTask(pTask);
      code = -1;
      break;
    }

    if (pTask->fillHistory) {
      pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
      streamTaskCheckDownstream(pTask, ver);
    }
  }

  // single exit path: release the cursor buffers and the cursor itself
  tdbFree(pKey);
  tdbFree(pVal);
  if (tdbTbcClose(pCur) < 0) {
    code = -1;
  }

  return code;
}