tqRead.c 12.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
L
Liu Jicong 已提交
17

L
Liu Jicong 已提交
18
// Walk the WAL forward from *fetchOffset until an entry that should be handed to the
// consumer is found, and load that entry's body into *ppCkHead.
//
// An entry is returned when it is a TDMT_VND_SUBMIT message, or when pHandle->fetchMeta is
// set and IS_META_MSG() accepts its type. Every other entry has its body skipped and the
// scan moves on to the next version.
//
// The whole scan runs with pHandle->pWalReader->mutex held.
//
// On return *fetchOffset is:
//   - the version of the entry whose body was fetched (or failed to be fetched/skipped), or
//   - one less than the version whose head could not be fetched.
//
// Returns 0 when a body was fetched, -1 otherwise.
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
  int32_t code = 0;
  taosThreadMutexLock(&pHandle->pWalReader->mutex);
  int64_t curVer = *fetchOffset;

  for (;;) {
    if (walFetchHead(pHandle->pWalReader, curVer, *ppCkHead) < 0) {
      tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
              pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), curVer);
      *fetchOffset = curVer - 1;
      code = -1;
      break;
    }

    // pHead is only valid until the body is fetched: walFetchBody takes ppCkHead by address
    SWalCont* pHead = &((*ppCkHead)->head);
    bool      wanted = (pHead->msgType == TDMT_VND_SUBMIT) || (pHandle->fetchMeta && IS_META_MSG(pHead->msgType));

    if (wanted) {
      int32_t fetchRes = walFetchBody(pHandle->pWalReader, ppCkHead);
      if (fetchRes < 0) {
        ASSERT(0);
      }
      *fetchOffset = curVer;
      code = (fetchRes < 0) ? -1 : 0;
      break;
    }

    // not interesting for this consumer: step over the body and try the next version
    code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
    if (code < 0) {
      ASSERT(0);
      *fetchOffset = curVer;
      code = -1;
      break;
    }
    curVer++;
  }

  taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
  return code;
}

L
Liu Jicong 已提交
76 77 78
// Allocate a TQ reader for a vnode.
//
// The reader gets its own WAL reader opened on pVnode->pWal and borrows pVnode->pMeta for
// schema lookups. All cached state (message, schema, column list, table-uid hash) starts
// out empty and ver starts at -1.
//
// Returns the new reader, or NULL when the allocation or walOpenReader() fails.
STqReader* tqOpenReader(SVnode* pVnode) {
  STqReader* pReader = taosMemoryMalloc(sizeof(STqReader));
  if (pReader == NULL) {
    return NULL;
  }

  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);
  if (pReader->pWalReader == NULL) {
    // release the half-built reader instead of leaking it
    taosMemoryFree(pReader);
    return NULL;
  }

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pMsg = NULL;
  pReader->ver = -1;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchema = NULL;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;
  return pReader;
}

// Destroy a reader created by tqOpenReader() together with the resources it acquired:
// the WAL reader, the cached schema / schema wrapper filled in by tqRetrieveDataBlock(),
// and the table-uid hash built by tqReaderSetTbUidList() / tqReaderAddTbUidList().
//
// pColIdList is installed from outside through tqReaderSetColIdList(); who owns it is not
// visible in this file, so it is deliberately left untouched here.
//
// Passing NULL is a no-op.
void tqCloseReader(STqReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // close wal reader
  if (pReader->pWalReader != NULL) {
    walCloseReader(pReader->pWalReader);
  }

  // free cached schema
  if (pReader->pSchema != NULL) {
    taosMemoryFree(pReader->pSchema);
  }
  if (pReader->pSchemaWrapper != NULL) {
    tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
  }

  // free hash
  if (pReader->tbIdHash != NULL) {
    taosHashCleanup(pReader->tbIdHash);
  }

  taosMemoryFree(pReader);
}

L
Liu Jicong 已提交
106
// Move the reader's WAL cursor to version `ver`.
//
// Returns 0 when walReadSeekVer() succeeds and -1 when it returns a negative value.
// In both cases the WAL reader is asserted to report `ver` as its current version; on
// failure it is additionally asserted to have flagged the cursor as invalid.
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
  const bool seekFailed = walReadSeekVer(pReader->pWalReader, ver) < 0;

  if (seekFailed) {
    ASSERT(pReader->pWalReader->curInvalid);
  }
  ASSERT(pReader->pWalReader->curVersion == ver);

  return seekFailed ? -1 : 0;
}

L
Liu Jicong 已提交
116 117 118 119 120 121
// Produce the next item for a consumer and describe it in `ret`.
//
// If the reader already holds a submit message (pMsg != NULL on entry) only the remaining
// blocks of that message are served; otherwise messages are pulled from the WAL one by one.
//
// Outcomes:
//   - FETCH_TYPE__DATA, return 0: ret->data holds the next data block.
//   - FETCH_TYPE__META, return 0: the WAL entry is not a TDMT_VND_SUBMIT message and
//     ret->meta points at its body.
//   - FETCH_TYPE__NONE, return 0: the message held on entry is exhausted; ret->offset is the
//     reader's version.
//   - FETCH_TYPE__NONE, return -1: walNextValidMsg() failed; ret->offset is derived from the
//     WAL cursor (curVersion - curInvalid).
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
  const bool resumeMsg = (pReader->pMsg != NULL);

  for (;;) {
    if (!resumeMsg) {
      if (walNextValidMsg(pReader->pWalReader) < 0) {
        pReader->ver = pReader->pWalReader->curVersion - pReader->pWalReader->curInvalid;
        ret->fetchType = FETCH_TYPE__NONE;
        ret->offset.type = TMQ_OFFSET__LOG;
        ret->offset.version = pReader->ver;
        ASSERT(ret->offset.version >= 0);
        return -1;
      }

      void* body = pReader->pWalReader->pHead->head.body;
      if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
        // TODO do filter
        ret->fetchType = FETCH_TYPE__META;
        ret->meta = body;
        return 0;
      }
      tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);
    }

    while (tqNextDataBlock(pReader)) {
      // TODO mem free
      memset(&ret->data, 0, sizeof(SSDataBlock));
      int32_t code = tqRetrieveDataBlock(&ret->data, pReader);
      if (code == 0 && ret->data.info.rows != 0) {
        ret->fetchType = FETCH_TYPE__DATA;
        return 0;
      }
      // a failed or empty block is not expected; move on to the next block
      ASSERT(0);
    }

    if (resumeMsg) {
      ret->offset.type = TMQ_OFFSET__LOG;
      ret->offset.version = pReader->ver;
      ASSERT(pReader->ver >= 0);
      ret->fetchType = FETCH_TYPE__NONE;
      return 0;
    }
  }
}

L
Liu Jicong 已提交
162 163
// Attach a submit message to the reader and position it before the first block.
//
// The message is first walked once from start to end with tGetSubmitMsgNext(); any
// iterator failure aborts with -1. The message iterator is then re-initialised, the
// reader's version is set to `ver` and the block iterator is zeroed.
//
// Note that pReader->pMsg is assigned before the walk, so it stays set when -1 is returned.
//
// Returns 0 on success, -1 if initialising or advancing the message iterator fails.
int32_t tqReaderSetDataMsg(STqReader* pReader, SSubmitReq* pMsg, int64_t ver) {
  pReader->pMsg = pMsg;

  // first pass: run the iterator over every block of the message
  if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) {
    return -1;
  }
  do {
    if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) {
      return -1;
    }
  } while (pReader->pBlock != NULL);

  // rewind so that tqNextDataBlock() starts from the first block
  if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) {
    return -1;
  }
  pReader->ver = ver;
  memset(&pReader->blkIter, 0, sizeof(SSubmitBlkIter));
  return 0;
}

L
Liu Jicong 已提交
177 178
// Advance to the next block of the current submit message that the reader should emit.
//
// Returns true with pReader->pBlock / pReader->msgIter describing the block when
//   - no table-uid filter is installed (tbIdHash == NULL), or
//   - the block's uid is present in tbIdHash.
// Blocks whose uid is not in the filter are skipped.
//
// Returns false when no message is attached, when the iterator fails, or when the message
// is exhausted; in the last case pReader->pMsg is reset to NULL.
bool tqNextDataBlock(STqReader* pReader) {
  if (pReader->pMsg == NULL) {
    return false;
  }

  for (;;) {
    if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) {
      return false;
    }

    if (pReader->pBlock == NULL) {
      // end of message: detach it so the caller fetches a new one
      pReader->pMsg = NULL;
      return false;
    }

    if (pReader->tbIdHash == NULL) {
      return true;
    }

    if (taosHashGet(pReader->tbIdHash, &pReader->msgIter.uid, sizeof(int64_t)) != NULL) {
      return true;
    }
  }
}

L
Liu Jicong 已提交
201
// Advance to the next block of the current submit message whose table uid is NOT listed
// in filterOutUids.
//
// The reader is asserted to have no table-uid filter of its own (tbIdHash == NULL).
//
// Returns true when such a block is found, false when the iterator fails or the message
// has no more blocks.
bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) {
  for (;;) {
    if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0 || pHandle->pBlock == NULL) {
      return false;
    }

    ASSERT(pHandle->tbIdHash == NULL);

    if (taosHashGet(filterOutUids, &pHandle->msgIter.uid, sizeof(int64_t)) == NULL) {
      return true;
    }
  }
}

L
Liu Jicong 已提交
217
// Convert the reader's current submit block (pReader->pBlock) into a columnar SSDataBlock.
//
// Steps:
//   1. Refresh the cached STSchema / SSchemaWrapper when the block's schema version or
//      super-table uid differs from the cached one.
//   2. Add column descriptors to pBlock: every schema column when pColIdList is empty,
//      otherwise the columns whose ids appear in both the schema and pColIdList (both are
//      walked in step, which relies on them being in ascending colId order).
//   3. Reserve room for msgIter.numOfRows rows and copy every row of the block cell by cell.
//
// Returns 0 on success. Returns -1 when the schema cannot be found (terrno is set to
// TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) or when building the block fails, in which case
// pBlock is released with tDeleteSSDataBlock().
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
  // TODO: cache multiple schema
  int32_t sversion = htonl(pReader->pBlock->sversion);
  if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion ||
      pReader->cachedSchemaSuid != pReader->msgIter.suid) {
    // Drop the cache key before touching the cached objects. If either lookup below fails,
    // pSchema / pSchemaWrapper no longer match the old key (one of them may be NULL), and a
    // later block carrying the old key must not skip the refresh and use them.
    pReader->cachedSchemaSuid = 0;

    if (pReader->pSchema) taosMemoryFree(pReader->pSchema);
    pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion);
    if (pReader->pSchema == NULL) {
      // report the version that was actually looked up, not the previously cached one
      tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table",
             pReader->msgIter.uid, pReader->msgIter.suid, sversion);
      /*ASSERT(0);*/
      terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
      return -1;
    }

    if (pReader->pSchemaWrapper) tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, true);
    if (pReader->pSchemaWrapper == NULL) {
      tqWarn("cannot found schema wrapper for table: uid:%" PRId64 " (suid:%" PRId64
             "), version %d, possibly dropped table",
             pReader->msgIter.uid, pReader->msgIter.suid, sversion);
      /*ASSERT(0);*/
      terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
      return -1;
    }

    // both objects are valid for (suid, sversion): publish the new cache key
    pReader->cachedSchemaVer = sversion;
    pReader->cachedSchemaSuid = pReader->msgIter.suid;
  }

  STSchema*       pTschema = pReader->pSchema;
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;

  int32_t colNumNeed = taosArrayGetSize(pReader->pColIdList);

  if (colNumNeed == 0) {
    // no projection requested: expose every column of the schema
    for (int32_t colMeta = 0; colMeta < pSchemaWrapper->nCols; colMeta++) {
      SSchema*        pColSchema = &pSchemaWrapper->pSchema[colMeta];
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
      int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != TSDB_CODE_SUCCESS) {
        goto FAIL;
      }
    }
  } else {
    if (colNumNeed > pSchemaWrapper->nCols) {
      colNumNeed = pSchemaWrapper->nCols;
    }

    // merge-walk the schema columns and the requested ids, keeping the ids present in both
    int32_t colMeta = 0;
    int32_t colNeed = 0;
    while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
      SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
      col_id_t colIdSchema = pColSchema->colId;
      col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, colNeed);
      if (colIdSchema < colIdNeed) {
        colMeta++;
      } else if (colIdSchema > colIdNeed) {
        colNeed++;
      } else {
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
        if (code != TSDB_CODE_SUCCESS) {
          goto FAIL;
        }
        colMeta++;
        colNeed++;
      }
    }
  }

  if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows) < 0) {
    goto FAIL;
  }

  int32_t colActual = blockDataGetNumOfCols(pBlock);

  STSRowIter iter = {0};
  tdSTSRowIterInit(&iter, pTschema);
  STSRow* row;
  int32_t curRow = 0;

  tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter);

  pBlock->info.uid = pReader->msgIter.uid;
  pBlock->info.rows = pReader->msgIter.numOfRows;

  while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) {
    tdSTSRowIterReset(&iter, row);
    // get all wanted col of that block
    for (int32_t i = 0; i < colActual; i++) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
      SCellVal         sVal = {0};
      if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
        // the row iterator has nothing more for this row; remaining columns are left as-is
        break;
      }
      // any value type other than TD_VTYPE_NORM is appended as NULL
      if (colDataAppend(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) {
        goto FAIL;
      }
    }
    curRow++;
  }
  return 0;

FAIL:
  tDeleteSSDataBlock(pBlock);
  return -1;
}
H
Hongze Cheng 已提交
325

L
Liu Jicong 已提交
326
// Install the list of column ids that tqRetrieveDataBlock() should project.
// Only the pointer is stored; the array is not copied.
void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) {
  pReadHandle->pColIdList = pColIdList;
}
H
Hongze Cheng 已提交
327

L
Liu Jicong 已提交
328
// Replace the reader's table-uid filter with exactly the uids in tbUidList.
//
// An existing hash table is emptied and reused; a new one is created only when the reader
// has none yet. (Creating a fresh table on every call would orphan the previous one.)
//
// Returns 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the hash table cannot
// be created.
int tqReaderSetTbUidList(STqReader* pHandle, const SArray* tbUidList) {
  if (pHandle->tbIdHash) {
    taosHashClear(pHandle->tbIdHash);
  } else {
    pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    if (pHandle->tbIdHash == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  // the hash is used as a set: keys are the 64-bit uids, no value is stored
  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
    taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
  }

  return 0;
}

L
Liu Jicong 已提交
347
// Add the uids in tbUidList to the reader's table-uid filter, creating the filter on first
// use. Uids already present in the filter are kept.
//
// Returns 0 on success, -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the hash table cannot
// be created.
int tqReaderAddTbUidList(STqReader* pHandle, const SArray* tbUidList) {
  if (!pHandle->tbIdHash) {
    pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    if (!pHandle->tbIdHash) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  // keys only, no payload: the hash acts as a set of 64-bit uids
  int idx = 0;
  while (idx < taosArrayGetSize(tbUidList)) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    taosHashPut(pHandle->tbIdHash, pUid, sizeof(int64_t), NULL, 0);
    idx++;
  }

  return 0;
}
363

L
Liu Jicong 已提交
364
// Remove every uid in tbUidList from the reader's table-uid filter.
// The filter is asserted to exist. Always returns 0.
int tqReaderRemoveTbUidList(STqReader* pHandle, const SArray* tbUidList) {
  ASSERT(pHandle->tbIdHash != NULL);

  int32_t idx = 0;
  while (idx < taosArrayGetSize(tbUidList)) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    taosHashRemove(pHandle->tbIdHash, pUid, sizeof(int64_t));
    idx++;
  }

  return 0;
}
L
Liu Jicong 已提交
374 375 376 377 378 379 380 381 382

// Propagate a change of the table-uid set (added when isAdd is true, removed otherwise) to
// every consumer of this TQ.
//
//   - Column subscriptions: each of the handle's executor tasks is updated through
//     qUpdateQualifiedTableId().
//   - Database subscriptions: removed uids are recorded in the handle's filter-out set;
//     additions need no action.
//   - Other subscription types: nothing is done.
//   - Stream tasks flagged isDataScan: their executor is updated the same way.
//
// Executor updates are asserted to succeed. Always returns 0.
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
  void* pIter;

  for (pIter = taosHashIterate(pTq->handles, NULL); pIter != NULL; pIter = taosHashIterate(pTq->handles, pIter)) {
    STqHandle* pExec = (STqHandle*)pIter;

    if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      // NOTE(review): 5 is presumably the length of execCol.task[] - confirm against its declaration
      for (int32_t i = 0; i < 5; i++) {
        int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task[i], tbUidList, isAdd);
        ASSERT(code == 0);
      }
      continue;
    }

    if (pExec->execHandle.subType != TOPIC_SUB_TYPE__DB || isAdd) {
      // tq update id
      continue;
    }

    // db subscription, tables removed: remember their uids so their data is filtered out
    int32_t nUids = taosArrayGetSize(tbUidList);
    for (int32_t i = 0; i < nUids; i++) {
      int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
      taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
    }
  }

  for (pIter = taosHashIterate(pTq->pStreamTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamTasks, pIter)) {
    // the stream-task hash stores pointers to tasks, hence the extra dereference
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->isDataScan) {
      int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
      ASSERT(code == 0);
    }
  }

  return 0;
}