tqRead.c 13.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
L
Liu Jicong 已提交
17

L
Liu Jicong 已提交
18
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
L
Liu Jicong 已提交
19 20 21 22 23
  int32_t code = 0;
  taosThreadMutexLock(&pHandle->pWalReader->mutex);
  int64_t offset = *fetchOffset;

  while (1) {
L
Liu Jicong 已提交
24
    if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
L
Liu Jicong 已提交
25 26
      tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
              pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
L
Liu Jicong 已提交
27 28 29 30 31
      *fetchOffset = offset - 1;
      code = -1;
      goto END;
    }

L
Liu Jicong 已提交
32 33
    if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pHandle->pWalReader, ppCkHead);
L
Liu Jicong 已提交
34 35 36 37 38 39 40 41 42 43 44

      if (code < 0) {
        ASSERT(0);
        *fetchOffset = offset;
        code = -1;
        goto END;
      }
      *fetchOffset = offset;
      code = 0;
      goto END;
    } else {
L
Liu Jicong 已提交
45
      if (pHandle->fetchMeta) {
L
Liu Jicong 已提交
46
        SWalCont* pHead = &((*ppCkHead)->head);
L
Liu Jicong 已提交
47
        if (IS_META_MSG(pHead->msgType)) {
L
Liu Jicong 已提交
48
          code = walFetchBody(pHandle->pWalReader, ppCkHead);
L
Liu Jicong 已提交
49 50 51 52 53 54 55 56 57 58 59 60

          if (code < 0) {
            ASSERT(0);
            *fetchOffset = offset;
            code = -1;
            goto END;
          }
          *fetchOffset = offset;
          code = 0;
          goto END;
        }
      }
L
Liu Jicong 已提交
61
      code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
L
Liu Jicong 已提交
62 63 64 65 66 67 68 69 70 71 72 73 74 75
      if (code < 0) {
        ASSERT(0);
        *fetchOffset = offset;
        code = -1;
        goto END;
      }
      offset++;
    }
  }
END:
  taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
  return code;
}

L
Liu Jicong 已提交
76 77 78
STqReader* tqOpenReader(SVnode* pVnode) {
  STqReader* pReader = taosMemoryMalloc(sizeof(STqReader));
  if (pReader == NULL) {
L
Liu Jicong 已提交
79 80
    return NULL;
  }
L
Liu Jicong 已提交
81

L
Liu Jicong 已提交
82 83 84 85
  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);
  if (pReader->pWalReader == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pMsg = NULL;
  pReader->ver = -1;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchema = NULL;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;
  return pReader;
}

void tqCloseReader(STqReader* pReader) {
  // close wal reader
101 102 103
  if (pReader->pWalReader) {
    walCloseReader(pReader->pWalReader);
  }
L
Liu Jicong 已提交
104
  // free cached schema
105 106 107 108 109 110 111 112 113
  if (pReader->pSchema) {
    taosMemoryFree(pReader->pSchema);
  }
  if (pReader->pSchemaWrapper) {
    tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
  }
  if (pReader->pColIdList) {
    taosArrayDestroy(pReader->pColIdList);
  }
L
Liu Jicong 已提交
114
  // free hash
115
  taosHashCleanup(pReader->tbIdHash);
L
Liu Jicong 已提交
116 117 118
  taosMemoryFree(pReader);
}

L
Liu Jicong 已提交
119
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
L
Liu Jicong 已提交
120 121 122 123 124 125 126
  if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
    ASSERT(pReader->pWalReader->curInvalid);
    ASSERT(pReader->pWalReader->curVersion == ver);
    return -1;
  }
  ASSERT(pReader->pWalReader->curVersion == ver);
  return 0;
L
Liu Jicong 已提交
127 128
}

L
Liu Jicong 已提交
129 130 131 132 133 134
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
  bool fromProcessedMsg = pReader->pMsg != NULL;

  while (1) {
    if (!fromProcessedMsg) {
      if (walNextValidMsg(pReader->pWalReader) < 0) {
135 136
        pReader->ver =
            pReader->pWalReader->curVersion - (pReader->pWalReader->curInvalid | pReader->pWalReader->curStopped);
L
Liu Jicong 已提交
137 138
        ret->offset.type = TMQ_OFFSET__LOG;
        ret->offset.version = pReader->ver;
L
Liu Jicong 已提交
139
        ret->fetchType = FETCH_TYPE__NONE;
S
Shengliang Guan 已提交
140
        tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version);
L
Liu Jicong 已提交
141
        ASSERT(ret->offset.version >= 0);
L
Liu Jicong 已提交
142 143 144 145 146 147 148 149 150 151 152 153 154 155
        return -1;
      }
      void* body = pReader->pWalReader->pHead->head.body;
      if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
        // TODO do filter
        ret->fetchType = FETCH_TYPE__META;
        ret->meta = pReader->pWalReader->pHead->head.body;
        return 0;
      } else {
        tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);
      }
    }

    while (tqNextDataBlock(pReader)) {
L
Liu Jicong 已提交
156
      // TODO mem free
L
Liu Jicong 已提交
157 158 159
      memset(&ret->data, 0, sizeof(SSDataBlock));
      int32_t code = tqRetrieveDataBlock(&ret->data, pReader);
      if (code != 0 || ret->data.info.rows == 0) {
L
Liu Jicong 已提交
160
        ASSERT(0);
L
Liu Jicong 已提交
161
        continue;
L
Liu Jicong 已提交
162 163 164 165 166 167
      }
      ret->fetchType = FETCH_TYPE__DATA;
      return 0;
    }

    if (fromProcessedMsg) {
L
Liu Jicong 已提交
168 169
      ret->offset.type = TMQ_OFFSET__LOG;
      ret->offset.version = pReader->ver;
L
Liu Jicong 已提交
170
      ASSERT(pReader->ver >= 0);
L
Liu Jicong 已提交
171
      ret->fetchType = FETCH_TYPE__NONE;
S
Shengliang Guan 已提交
172
      tqDebug("return offset %" PRId64 ", processed finish", ret->offset.version);
L
Liu Jicong 已提交
173 174 175
      return 0;
    }
  }
L
Liu Jicong 已提交
176 177
}

L
Liu Jicong 已提交
178 179
int32_t tqReaderSetDataMsg(STqReader* pReader, SSubmitReq* pMsg, int64_t ver) {
  pReader->pMsg = pMsg;
L
Liu Jicong 已提交
180

L
Liu Jicong 已提交
181
  if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
L
Liu Jicong 已提交
182
  while (true) {
L
Liu Jicong 已提交
183 184
    if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
    if (pReader->pBlock == NULL) break;
L
Liu Jicong 已提交
185 186
  }

L
Liu Jicong 已提交
187 188 189
  if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
  pReader->ver = ver;
  memset(&pReader->blkIter, 0, sizeof(SSubmitBlkIter));
L
Liu Jicong 已提交
190
  return 0;
L
Liu Jicong 已提交
191 192
}

L
Liu Jicong 已提交
193 194
bool tqNextDataBlock(STqReader* pReader) {
  if (pReader->pMsg == NULL) return false;
L
Liu Jicong 已提交
195
  while (1) {
L
Liu Jicong 已提交
196
    if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) {
L
Liu Jicong 已提交
197 198
      return false;
    }
L
Liu Jicong 已提交
199 200
    if (pReader->pBlock == NULL) {
      pReader->pMsg = NULL;
5
54liuyao 已提交
201 202
      return false;
    }
L
Liu Jicong 已提交
203

L
Liu Jicong 已提交
204
    if (pReader->tbIdHash == NULL) {
L
Liu Jicong 已提交
205 206
      return true;
    }
L
Liu Jicong 已提交
207
    void* ret = taosHashGet(pReader->tbIdHash, &pReader->msgIter.uid, sizeof(int64_t));
S
Shengliang Guan 已提交
208
    /*tqDebug("search uid %" PRId64, pHandle->msgIter.uid);*/
L
Liu Jicong 已提交
209
    if (ret != NULL) {
S
Shengliang Guan 已提交
210
      /*tqDebug("find   uid %" PRId64, pHandle->msgIter.uid);*/
L
Liu Jicong 已提交
211
      return true;
212 213 214 215 216
    }
  }
  return false;
}

L
Liu Jicong 已提交
217
bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) {
218 219 220 221 222 223 224 225 226 227
  while (1) {
    if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
      return false;
    }
    if (pHandle->pBlock == NULL) return false;

    ASSERT(pHandle->tbIdHash == NULL);
    void* ret = taosHashGet(filterOutUids, &pHandle->msgIter.uid, sizeof(int64_t));
    if (ret == NULL) {
      return true;
L
Liu Jicong 已提交
228 229 230 231 232
    }
  }
  return false;
}

L
Liu Jicong 已提交
233
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
234
  // TODO: cache multiple schema
L
Liu Jicong 已提交
235 236 237 238 239 240
  int32_t sversion = htonl(pReader->pBlock->sversion);
  if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion ||
      pReader->cachedSchemaSuid != pReader->msgIter.suid) {
    if (pReader->pSchema) taosMemoryFree(pReader->pSchema);
    pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion);
    if (pReader->pSchema == NULL) {
S
Shengliang Guan 已提交
241
      tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table",
L
Liu Jicong 已提交
242
             pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer);
L
Liu Jicong 已提交
243
      /*ASSERT(0);*/
244
      terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
L
Liu Jicong 已提交
245 246
      return -1;
    }
L
Liu Jicong 已提交
247

L
Liu Jicong 已提交
248 249 250
    if (pReader->pSchemaWrapper) tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, true);
    if (pReader->pSchemaWrapper == NULL) {
S
Shengliang Guan 已提交
251
      tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
L
Liu Jicong 已提交
252
             pReader->msgIter.uid, pReader->cachedSchemaVer);
L
Liu Jicong 已提交
253 254 255 256
      /*ASSERT(0);*/
      terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
      return -1;
    }
L
Liu Jicong 已提交
257 258
    pReader->cachedSchemaVer = sversion;
    pReader->cachedSchemaSuid = pReader->msgIter.suid;
L
Liu Jicong 已提交
259 260
  }

L
Liu Jicong 已提交
261 262
  STSchema*       pTschema = pReader->pSchema;
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
L
Liu Jicong 已提交
263

L
Liu Jicong 已提交
264
  int32_t colNumNeed = taosArrayGetSize(pReader->pColIdList);
L
Liu Jicong 已提交
265

L
Liu Jicong 已提交
266 267 268 269
  if (colNumNeed == 0) {
    int32_t colMeta = 0;
    while (colMeta < pSchemaWrapper->nCols) {
      SSchema*        pColSchema = &pSchemaWrapper->pSchema[colMeta];
270
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
L
Liu Jicong 已提交
271
      int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
272
      if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
273
        goto FAIL;
L
Liu Jicong 已提交
274
      }
L
Liu Jicong 已提交
275
      colMeta++;
L
Liu Jicong 已提交
276 277 278 279 280 281 282 283 284 285 286
    }
  } else {
    if (colNumNeed > pSchemaWrapper->nCols) {
      colNumNeed = pSchemaWrapper->nCols;
    }

    int32_t colMeta = 0;
    int32_t colNeed = 0;
    while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
      SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
      col_id_t colIdSchema = pColSchema->colId;
L
Liu Jicong 已提交
287
      col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, colNeed);
L
Liu Jicong 已提交
288 289 290 291 292
      if (colIdSchema < colIdNeed) {
        colMeta++;
      } else if (colIdSchema > colIdNeed) {
        colNeed++;
      } else {
293
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
L
Liu Jicong 已提交
294
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
295
        if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
296 297 298 299 300
          goto FAIL;
        }
        colMeta++;
        colNeed++;
      }
L
Liu Jicong 已提交
301 302
    }
  }
L
Liu Jicong 已提交
303

L
Liu Jicong 已提交
304
  if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows) < 0) {
305
    terrno = TSDB_CODE_OUT_OF_MEMORY;
306 307 308 309
    goto FAIL;
  }

  int32_t colActual = blockDataGetNumOfCols(pBlock);
L
Liu Jicong 已提交
310

L
Liu Jicong 已提交
311 312 313 314
  STSRowIter iter = {0};
  tdSTSRowIterInit(&iter, pTschema);
  STSRow* row;
  int32_t curRow = 0;
315

L
Liu Jicong 已提交
316
  tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter);
317

L
Liu Jicong 已提交
318 319
  pBlock->info.uid = pReader->msgIter.uid;
  pBlock->info.rows = pReader->msgIter.numOfRows;
320
  pBlock->info.version = pReader->pMsg->version;
321

L
Liu Jicong 已提交
322
  while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) {
L
Liu Jicong 已提交
323 324
    tdSTSRowIterReset(&iter, row);
    // get all wanted col of that block
L
Liu Jicong 已提交
325
    for (int32_t i = 0; i < colActual; i++) {
326
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
L
Liu Jicong 已提交
327 328 329 330
      SCellVal         sVal = {0};
      if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
        break;
      }
331
      if (colDataAppend(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) {
L
Liu Jicong 已提交
332
        goto FAIL;
L
Liu Jicong 已提交
333
      }
L
Liu Jicong 已提交
334 335 336
    }
    curRow++;
  }
L
Liu Jicong 已提交
337
  return 0;
338

L
Liu Jicong 已提交
339
FAIL:
340
  blockDataFreeRes(pBlock);
L
Liu Jicong 已提交
341
  return -1;
L
Liu Jicong 已提交
342
}
H
Hongze Cheng 已提交
343

L
Liu Jicong 已提交
344
void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
H
Hongze Cheng 已提交
345

L
Liu Jicong 已提交
346 347 348 349 350
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
  if (pReader->tbIdHash) {
    taosHashClear(pReader->tbIdHash);
  } else {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
H
Hongze Cheng 已提交
351 352
  }

L
Liu Jicong 已提交
353
  if (pReader->tbIdHash == NULL) {
H
Hongze Cheng 已提交
354 355 356 357 358 359
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
L
Liu Jicong 已提交
360
    taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
H
Hongze Cheng 已提交
361 362 363 364 365
  }

  return 0;
}

L
Liu Jicong 已提交
366 367 368 369
int tqReaderAddTbUidList(STqReader* pReader, const SArray* tbUidList) {
  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    if (pReader->tbIdHash == NULL) {
H
Hongze Cheng 已提交
370 371 372 373 374 375 376
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
L
Liu Jicong 已提交
377
    taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
H
Hongze Cheng 已提交
378 379 380 381
  }

  return 0;
}
382

L
Liu Jicong 已提交
383 384
int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  ASSERT(pReader->tbIdHash != NULL);
385

L
Liu Jicong 已提交
386 387
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
L
Liu Jicong 已提交
388
    taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t));
389 390 391 392
  }

  return 0;
}
L
Liu Jicong 已提交
393 394 395 396 397 398 399 400

int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pTq->handles, pIter);
    if (pIter == NULL) break;
    STqHandle* pExec = (STqHandle*)pIter;
    if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
L
Liu Jicong 已提交
401 402
      int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task, tbUidList, isAdd);
      ASSERT(code == 0);
L
Liu Jicong 已提交
403 404 405 406 407
    } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      if (!isAdd) {
        int32_t sz = taosArrayGetSize(tbUidList);
        for (int32_t i = 0; i < sz; i++) {
          int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
L
Liu Jicong 已提交
408
          taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
L
Liu Jicong 已提交
409 410 411 412 413 414 415 416 417
        }
      }
    } else {
      // tq update id
    }
  }
  while (1) {
    pIter = taosHashIterate(pTq->pStreamTasks, pIter);
    if (pIter == NULL) break;
418
    SStreamTask* pTask = *(SStreamTask**)pIter;
419
    if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
L
Liu Jicong 已提交
420 421 422 423 424 425
      int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
      ASSERT(code == 0);
    }
  }
  return 0;
}