tqRead.c 12.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
L
Liu Jicong 已提交
17

L
Liu Jicong 已提交
18
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
L
Liu Jicong 已提交
19 20 21 22 23
  int32_t code = 0;
  taosThreadMutexLock(&pHandle->pWalReader->mutex);
  int64_t offset = *fetchOffset;

  while (1) {
L
Liu Jicong 已提交
24
    if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
L
Liu Jicong 已提交
25 26 27 28 29 30 31
      tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", pHandle->consumerId,
              pHandle->epoch, TD_VID(pTq->pVnode), offset);
      *fetchOffset = offset - 1;
      code = -1;
      goto END;
    }

L
Liu Jicong 已提交
32 33
    if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pHandle->pWalReader, ppCkHead);
L
Liu Jicong 已提交
34 35 36 37 38 39 40 41 42 43 44

      if (code < 0) {
        ASSERT(0);
        *fetchOffset = offset;
        code = -1;
        goto END;
      }
      *fetchOffset = offset;
      code = 0;
      goto END;
    } else {
L
Liu Jicong 已提交
45
      if (pHandle->fetchMeta) {
L
Liu Jicong 已提交
46
        SWalCont* pHead = &((*ppCkHead)->head);
L
Liu Jicong 已提交
47
        if (IS_META_MSG(pHead->msgType)) {
L
Liu Jicong 已提交
48
          code = walFetchBody(pHandle->pWalReader, ppCkHead);
L
Liu Jicong 已提交
49 50 51 52 53 54 55 56 57 58 59 60

          if (code < 0) {
            ASSERT(0);
            *fetchOffset = offset;
            code = -1;
            goto END;
          }
          *fetchOffset = offset;
          code = 0;
          goto END;
        }
      }
L
Liu Jicong 已提交
61
      code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
L
Liu Jicong 已提交
62 63 64 65 66 67 68 69 70 71 72 73 74 75
      if (code < 0) {
        ASSERT(0);
        *fetchOffset = offset;
        code = -1;
        goto END;
      }
      offset++;
    }
  }
END:
  taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
  return code;
}

L
Liu Jicong 已提交
76 77 78
STqReader* tqOpenReader(SVnode* pVnode) {
  STqReader* pReader = taosMemoryMalloc(sizeof(STqReader));
  if (pReader == NULL) {
L
Liu Jicong 已提交
79 80
    return NULL;
  }
L
Liu Jicong 已提交
81

L
Liu Jicong 已提交
82 83 84 85
  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);
  if (pReader->pWalReader == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pMsg = NULL;
  pReader->ver = -1;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchema = NULL;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;
  return pReader;
}

void tqCloseReader(STqReader* pReader) {
  // close wal reader
  // free cached schema
  // free hash
  taosMemoryFree(pReader);
}

L
Liu Jicong 已提交
106 107 108 109 110
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
  //
  return walReadSeekVer(pReader->pWalReader, ver);
}

L
Liu Jicong 已提交
111 112 113 114 115 116
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
  bool fromProcessedMsg = pReader->pMsg != NULL;

  while (1) {
    if (!fromProcessedMsg) {
      if (walNextValidMsg(pReader->pWalReader) < 0) {
L
Liu Jicong 已提交
117 118
        ret->offset.type = TMQ_OFFSET__LOG;
        ret->offset.version = pReader->ver;
L
Liu Jicong 已提交
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
        ret->fetchType = FETCH_TYPE__NONE;
        return -1;
      }
      void* body = pReader->pWalReader->pHead->head.body;
      if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
        // TODO do filter
        ret->fetchType = FETCH_TYPE__META;
        ret->meta = pReader->pWalReader->pHead->head.body;
        return 0;
      } else {
        tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);
      }
    }

    while (tqNextDataBlock(pReader)) {
      memset(&ret->data, 0, sizeof(SSDataBlock));
      int32_t code = tqRetrieveDataBlock(&ret->data, pReader);
      if (code != 0 || ret->data.info.rows == 0) {
L
Liu Jicong 已提交
137 138
        ASSERT(0);
#if 0
L
Liu Jicong 已提交
139 140 141 142 143 144
        if (fromProcessedMsg) {
          ret->fetchType = FETCH_TYPE__NONE;
          return 0;
        } else {
          break;
        }
L
Liu Jicong 已提交
145
#endif
L
Liu Jicong 已提交
146 147 148 149 150 151
      }
      ret->fetchType = FETCH_TYPE__DATA;
      return 0;
    }

    if (fromProcessedMsg) {
L
Liu Jicong 已提交
152 153
      ret->offset.type = TMQ_OFFSET__LOG;
      ret->offset.version = pReader->ver;
L
Liu Jicong 已提交
154 155 156 157
      ret->fetchType = FETCH_TYPE__NONE;
      return 0;
    }
  }
L
Liu Jicong 已提交
158 159
}

L
Liu Jicong 已提交
160 161
int32_t tqReaderSetDataMsg(STqReader* pReader, SSubmitReq* pMsg, int64_t ver) {
  pReader->pMsg = pMsg;
L
Liu Jicong 已提交
162

L
Liu Jicong 已提交
163
  if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
L
Liu Jicong 已提交
164
  while (true) {
L
Liu Jicong 已提交
165 166
    if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
    if (pReader->pBlock == NULL) break;
L
Liu Jicong 已提交
167 168
  }

L
Liu Jicong 已提交
169 170 171
  if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
  pReader->ver = ver;
  memset(&pReader->blkIter, 0, sizeof(SSubmitBlkIter));
L
Liu Jicong 已提交
172
  return 0;
L
Liu Jicong 已提交
173 174
}

L
Liu Jicong 已提交
175 176
bool tqNextDataBlock(STqReader* pReader) {
  if (pReader->pMsg == NULL) return false;
L
Liu Jicong 已提交
177
  while (1) {
L
Liu Jicong 已提交
178
    if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) {
L
Liu Jicong 已提交
179 180
      return false;
    }
L
Liu Jicong 已提交
181 182
    if (pReader->pBlock == NULL) {
      pReader->pMsg = NULL;
5
54liuyao 已提交
183 184
      return false;
    }
L
Liu Jicong 已提交
185

L
Liu Jicong 已提交
186
    if (pReader->tbIdHash == NULL) {
L
Liu Jicong 已提交
187 188
      return true;
    }
L
Liu Jicong 已提交
189
    void* ret = taosHashGet(pReader->tbIdHash, &pReader->msgIter.uid, sizeof(int64_t));
190
    /*tqDebug("search uid %ld", pHandle->msgIter.uid);*/
L
Liu Jicong 已提交
191
    if (ret != NULL) {
192
      /*tqDebug("find   uid %ld", pHandle->msgIter.uid);*/
L
Liu Jicong 已提交
193
      return true;
194 195 196 197 198
    }
  }
  return false;
}

L
Liu Jicong 已提交
199
bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) {
200 201 202 203 204 205 206 207 208 209
  while (1) {
    if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
      return false;
    }
    if (pHandle->pBlock == NULL) return false;

    ASSERT(pHandle->tbIdHash == NULL);
    void* ret = taosHashGet(filterOutUids, &pHandle->msgIter.uid, sizeof(int64_t));
    if (ret == NULL) {
      return true;
L
Liu Jicong 已提交
210 211 212 213 214
    }
  }
  return false;
}

L
Liu Jicong 已提交
215
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
216
  // TODO: cache multiple schema
L
Liu Jicong 已提交
217 218 219 220 221 222
  int32_t sversion = htonl(pReader->pBlock->sversion);
  if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion ||
      pReader->cachedSchemaSuid != pReader->msgIter.suid) {
    if (pReader->pSchema) taosMemoryFree(pReader->pSchema);
    pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion);
    if (pReader->pSchema == NULL) {
L
Liu Jicong 已提交
223
      tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table",
L
Liu Jicong 已提交
224
             pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer);
L
Liu Jicong 已提交
225
      /*ASSERT(0);*/
226
      terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
L
Liu Jicong 已提交
227 228
      return -1;
    }
L
Liu Jicong 已提交
229

L
Liu Jicong 已提交
230 231 232
    if (pReader->pSchemaWrapper) tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, true);
    if (pReader->pSchemaWrapper == NULL) {
L
Liu Jicong 已提交
233
      tqWarn("cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table",
L
Liu Jicong 已提交
234
             pReader->msgIter.uid, pReader->cachedSchemaVer);
L
Liu Jicong 已提交
235 236 237 238
      /*ASSERT(0);*/
      terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
      return -1;
    }
L
Liu Jicong 已提交
239 240
    pReader->cachedSchemaVer = sversion;
    pReader->cachedSchemaSuid = pReader->msgIter.suid;
L
Liu Jicong 已提交
241 242
  }

L
Liu Jicong 已提交
243 244
  STSchema*       pTschema = pReader->pSchema;
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
L
Liu Jicong 已提交
245

L
Liu Jicong 已提交
246
  int32_t colNumNeed = taosArrayGetSize(pReader->pColIdList);
L
Liu Jicong 已提交
247

L
Liu Jicong 已提交
248 249 250 251
  if (colNumNeed == 0) {
    int32_t colMeta = 0;
    while (colMeta < pSchemaWrapper->nCols) {
      SSchema*        pColSchema = &pSchemaWrapper->pSchema[colMeta];
252
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
L
Liu Jicong 已提交
253
      int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
254
      if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
255
        goto FAIL;
L
Liu Jicong 已提交
256
      }
L
Liu Jicong 已提交
257
      colMeta++;
L
Liu Jicong 已提交
258 259 260 261 262 263 264 265 266 267 268
    }
  } else {
    if (colNumNeed > pSchemaWrapper->nCols) {
      colNumNeed = pSchemaWrapper->nCols;
    }

    int32_t colMeta = 0;
    int32_t colNeed = 0;
    while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
      SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
      col_id_t colIdSchema = pColSchema->colId;
L
Liu Jicong 已提交
269
      col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, colNeed);
L
Liu Jicong 已提交
270 271 272 273 274
      if (colIdSchema < colIdNeed) {
        colMeta++;
      } else if (colIdSchema > colIdNeed) {
        colNeed++;
      } else {
275
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
L
Liu Jicong 已提交
276
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
277
        if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
278 279 280 281 282
          goto FAIL;
        }
        colMeta++;
        colNeed++;
      }
L
Liu Jicong 已提交
283 284
    }
  }
L
Liu Jicong 已提交
285

L
Liu Jicong 已提交
286
  if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows) < 0) {
287 288 289 290
    goto FAIL;
  }

  int32_t colActual = blockDataGetNumOfCols(pBlock);
L
Liu Jicong 已提交
291

L
Liu Jicong 已提交
292 293 294 295
  STSRowIter iter = {0};
  tdSTSRowIterInit(&iter, pTschema);
  STSRow* row;
  int32_t curRow = 0;
296

L
Liu Jicong 已提交
297
  tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter);
298

L
Liu Jicong 已提交
299 300
  pBlock->info.uid = pReader->msgIter.uid;
  pBlock->info.rows = pReader->msgIter.numOfRows;
301

L
Liu Jicong 已提交
302
  while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) {
L
Liu Jicong 已提交
303 304
    tdSTSRowIterReset(&iter, row);
    // get all wanted col of that block
L
Liu Jicong 已提交
305
    for (int32_t i = 0; i < colActual; i++) {
306
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
L
Liu Jicong 已提交
307 308 309 310
      SCellVal         sVal = {0};
      if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
        break;
      }
311
      if (colDataAppend(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) {
L
Liu Jicong 已提交
312
        goto FAIL;
L
Liu Jicong 已提交
313
      }
L
Liu Jicong 已提交
314 315 316
    }
    curRow++;
  }
L
Liu Jicong 已提交
317
  return 0;
318

L
Liu Jicong 已提交
319 320
FAIL:
  tDeleteSSDataBlock(pBlock);
L
Liu Jicong 已提交
321
  return -1;
L
Liu Jicong 已提交
322
}
H
Hongze Cheng 已提交
323

L
Liu Jicong 已提交
324
void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
H
Hongze Cheng 已提交
325

L
Liu Jicong 已提交
326
int tqReaderSetTbUidList(STqReader* pHandle, const SArray* tbUidList) {
H
Hongze Cheng 已提交
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
  if (pHandle->tbIdHash) {
    taosHashClear(pHandle->tbIdHash);
  }

  pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (pHandle->tbIdHash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
    taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
  }

  return 0;
}

L
Liu Jicong 已提交
345
int tqReaderAddTbUidList(STqReader* pHandle, const SArray* tbUidList) {
H
Hongze Cheng 已提交
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360
  if (pHandle->tbIdHash == NULL) {
    pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    if (pHandle->tbIdHash == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
    taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
  }

  return 0;
}
361

L
Liu Jicong 已提交
362
int tqReaderRemoveTbUidList(STqReader* pHandle, const SArray* tbUidList) {
363 364
  ASSERT(pHandle->tbIdHash != NULL);

L
Liu Jicong 已提交
365 366
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
367 368 369 370 371
    taosHashRemove(pHandle->tbIdHash, pKey, sizeof(int64_t));
  }

  return 0;
}
L
Liu Jicong 已提交
372 373 374 375 376 377 378 379 380

int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pTq->handles, pIter);
    if (pIter == NULL) break;
    STqHandle* pExec = (STqHandle*)pIter;
    if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      for (int32_t i = 0; i < 5; i++) {
L
Liu Jicong 已提交
381
        int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task[i], tbUidList, isAdd);
L
Liu Jicong 已提交
382 383 384 385 386 387 388
        ASSERT(code == 0);
      }
    } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      if (!isAdd) {
        int32_t sz = taosArrayGetSize(tbUidList);
        for (int32_t i = 0; i < sz; i++) {
          int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
L
Liu Jicong 已提交
389
          taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
L
Liu Jicong 已提交
390 391 392 393 394 395 396 397 398
        }
      }
    } else {
      // tq update id
    }
  }
  while (1) {
    pIter = taosHashIterate(pTq->pStreamTasks, pIter);
    if (pIter == NULL) break;
399 400
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->isDataScan) {
L
Liu Jicong 已提交
401 402 403 404 405 406
      int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
      ASSERT(code == 0);
    }
  }
  return 0;
}