/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "executor.h"
#include "streamInc.h"
#include "ttimer.h"

/**
 * Open the stream metadata store located at "<path>/stream".
 *
 * Opens the tdb environment, creates the "checkpoints" sub-directory, opens the
 * "task.db" and "checkpoint.db" tables, creates the in-memory task hash and
 * begins the first transaction via streamMetaBegin().
 *
 * @param path        parent directory; "/stream" is appended to it.
 * @param ahandle     opaque handle stored on the meta object.
 * @param expandFunc  callback stored on the meta object.
 * @return the new SStreamMeta, or NULL on failure. On failure everything
 *         recorded on the meta object so far is released.
 */
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
  if (pMeta == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // The longest string written is "<path>/stream/checkpoints": strlen(path) + 19 chars + NUL.
  int32_t len = strlen(path) + 20;
  char*   streamPath = taosMemoryCalloc(1, len);
  if (streamPath == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  snprintf(streamPath, len, "%s/%s", path, "stream");
  pMeta->path = strdup(streamPath);
  if (pMeta->path == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(streamPath);
    goto _err;
  }

  if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) {
    taosMemoryFree(streamPath);
    goto _err;
  }

  // Reuse the scratch buffer for the checkpoint directory; the mkdir result is not checked.
  snprintf(streamPath, len, "%s/%s", pMeta->path, "checkpoints");
  taosMulModeMkDir(streamPath, 0755);
  taosMemoryFree(streamPath);

  if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) {
    goto _err;
  }

  if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb) < 0) {
    goto _err;
  }

  pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  if (pMeta->pTasks == NULL) {
    goto _err;
  }

  if (streamMetaBegin(pMeta) < 0) {
    goto _err;
  }

  pMeta->ahandle = ahandle;
  pMeta->expandFunc = expandFunc;

  return pMeta;

_err:
  // pMeta was zero-initialised, so each field is non-NULL only if it was acquired above.
  if (pMeta->path) taosMemoryFree(pMeta->path);
  if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
  if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
  if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
  if (pMeta->db) tdbClose(pMeta->db);
  taosMemoryFree(pMeta);
  return NULL;
}

/**
 * Shut down a stream metadata store.
 *
 * Commits the current transaction, closes both tables and the tdb environment,
 * then frees every task stored in the hash, the hash itself, the path string
 * and finally the meta object.
 */
void streamMetaClose(SStreamMeta* pMeta) {
  tdbCommit(pMeta->db, &pMeta->txn);
  tdbTbClose(pMeta->pTaskDb);
  tdbTbClose(pMeta->pCheckpointDb);
  tdbClose(pMeta->db);

  // Each hash value is a pointer to a task owned by the meta object.
  for (void* pEntry = taosHashIterate(pMeta->pTasks, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pMeta->pTasks, pEntry)) {
    tFreeSStreamTask(*(SStreamTask**)pEntry);
  }

  taosHashCleanup(pMeta->pTasks);
  taosMemoryFree(pMeta->path);
  taosMemoryFree(pMeta);
}

/**
 * Decode a serialized task, expand it, register it in the in-memory hash and
 * persist the serialized bytes in the task table.
 *
 * @param pMeta     stream metadata store.
 * @param startVer  not used by this function.
 * @param msg       serialized task; stored as-is in the task table.
 * @param msgLen    length of msg in bytes.
 * @return 0 on success, -1 on failure. On failure the task allocated here is freed.
 */
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen) {
  SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeCode = tDecodeSStreamTask(&decoder, pTask);
  // Clear the decoder on both outcomes; previously it was skipped when decoding failed.
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    ASSERT(0);
    goto FAIL;
  }

  if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
    ASSERT(0);
    goto FAIL;
  }

  if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
    goto FAIL;
  }

  if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) {
    // Undo the hash insertion so the hash does not keep a pointer to the task freed below.
    taosHashRemove(pMeta->pTasks, &pTask->taskId, sizeof(int32_t));
    ASSERT(0);
    goto FAIL;
  }
  return 0;

FAIL:
  if (pTask) tFreeSStreamTask(pTask);
  return -1;
}

/**
 * Expand a task, register it in the in-memory hash, and persist its encoded
 * form in the task table.
 *
 * @param pMeta  stream metadata store.
 * @param pTask  task to add; the hash stores this pointer.
 * @return 0 on success, -1 on failure. On failure after the hash insertion the
 *         task is removed from the hash again; pTask itself is not freed here.
 */
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
  void*    buf = NULL;
  int32_t  len = 0;
  int32_t  code = 0;
  SEncoder encoder;

  if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
    return -1;
  }
  if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
    return -1;
  }

  tEncodeSize(tEncodeSStreamTask, pTask, len, code);
  if (code < 0) {
    goto FAIL;
  }

  // Allocate the full encoded size; sizeof(len) would only give room for one int32_t.
  buf = taosMemoryCalloc(1, len);
  if (buf == NULL) {
    goto FAIL;
  }

  tEncoderInit(&encoder, buf, len);
  code = tEncodeSStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);
  if (code < 0) {
    goto FAIL;
  }

  if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, &pMeta->txn) < 0) {
    ASSERT(0);
    goto FAIL;
  }

  // NOTE(review): assumes tdbTbUpsert copies the value, as implied by
  // streamMetaAddSerializedTask passing a caller-owned buffer - confirm.
  taosMemoryFree(buf);
  return 0;

FAIL:
  // Same rollback as streamMetaAddSerializedTask: do not leave the task registered.
  taosHashRemove(pMeta->pTasks, &pTask->taskId, sizeof(int32_t));
  if (buf) taosMemoryFree(buf);
  return -1;
}

/**
 * Look up a task by id in the in-memory hash.
 *
 * @return the stored task pointer, or NULL when no entry exists for taskId.
 */
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) {
  SStreamTask** ppFound = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
  if (ppFound == NULL) {
    return NULL;
  }

  // The stored task must carry the id it was looked up under.
  ASSERT((*ppFound)->taskId == taskId);
  return *ppFound;
}

/**
 * Remove a task from the in-memory hash and from the task table, then free it
 * once its scheduling status is inactive.
 *
 * Always returns 0, including when no task with this id exists.
 */
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
  SStreamTask** ppEntry = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
  if (ppEntry == NULL) {
    return 0;
  }

  SStreamTask* pTask = *ppEntry;
  taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
  atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);

  // A failed delete from the task table is ignored.
  (void)tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn);

  if (pTask->triggerParam != 0) {
    taosTmrStop(pTask->timer);
  }

  // Try to move schedStatus from INACTIVE to DROPPING, polling every 10 ms.
  for (;;) {
    int8_t prevStatus =
        atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__DROPPING);
    if (prevStatus == TASK_SCHED_STATUS__INACTIVE) {
      // The exchange succeeded, so the task is freed here.
      tFreeSStreamTask(pTask);
      break;
    }
    if (prevStatus == TASK_SCHED_STATUS__DROPPING) {
      // Already marked as dropping; stop waiting without freeing.
      break;
    }
    taosMsleep(10);
  }

  return 0;
}

/**
 * Open the meta object's transaction in write / read-uncommitted mode and
 * begin it on the tdb environment.
 *
 * @return 0 on success, -1 if either step fails.
 */
int32_t streamMetaBegin(SStreamMeta* pMeta) {
  // tdbBegin only runs when tdbTxnOpen succeeded (short-circuit evaluation).
  if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0 ||
      tdbBegin(pMeta->db, &pMeta->txn) < 0) {
    return -1;
  }
  return 0;
}

/**
 * Commit the current transaction, then zero it, reopen it and begin a new one.
 *
 * @return 0 on success, -1 if the commit, the reopen or the begin fails.
 */
int32_t streamMetaCommit(SStreamMeta* pMeta) {
  if (tdbCommit(pMeta->db, &pMeta->txn) < 0) {
    return -1;
  }

  // Reset the transaction object before reopening it.
  memset(&pMeta->txn, 0, sizeof(TXN));

  if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0 ||
      tdbBegin(pMeta->db, &pMeta->txn) < 0) {
    return -1;
  }
  return 0;
}

/**
 * Abort the current transaction, then zero it, reopen it and begin a new one.
 *
 * @return 0 on success, -1 if the abort, the reopen or the begin fails.
 */
int32_t streamMetaAbort(SStreamMeta* pMeta) {
  if (tdbAbort(pMeta->db, &pMeta->txn) < 0) {
    return -1;
  }

  // Reset the transaction object before reopening it.
  memset(&pMeta->txn, 0, sizeof(TXN));

  if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0 ||
      tdbBegin(pMeta->db, &pMeta->txn) < 0) {
    return -1;
  }
  return 0;
}
/**
 * Load every task stored in the task table into the in-memory hash.
 *
 * Each record is decoded into a freshly allocated SStreamTask, expanded through
 * pMeta->expandFunc and inserted into pMeta->pTasks keyed by its taskId.
 *
 * @return 0 on success, -1 on the first failure. Tasks inserted before the
 *         failure stay in the hash. The cursor and the key/value buffers are
 *         released on every path.
 */
int32_t streamLoadTasks(SStreamMeta* pMeta) {
  TBC* pCur = NULL;
  if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
    ASSERT(0);
    return -1;
  }

  int32_t  code = 0;
  void*    pKey = NULL;
  int32_t  kLen = 0;
  void*    pVal = NULL;
  int32_t  vLen = 0;
  SDecoder decoder;

  tdbTbcMoveToFirst(pCur);

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
    if (pTask == NULL) {
      code = -1;
      break;
    }

    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    int32_t decodeCode = tDecodeSStreamTask(&decoder, pTask);
    tDecoderClear(&decoder);

    // Same failure handling as streamMetaAddSerializedTask: free the task and stop.
    if (decodeCode < 0 || pMeta->expandFunc(pMeta->ahandle, pTask) < 0 ||
        taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
      tFreeSStreamTask(pTask);
      code = -1;
      break;
    }
  }

  // Single exit path so the cursor is closed on errors too; early returns used to leak it.
  tdbFree(pKey);
  tdbFree(pVal);
  if (tdbTbcClose(pCur) < 0) {
    code = -1;
  }

  return code;
}