tqRead.c 12.8 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
L
Liu Jicong 已提交
17

L
Liu Jicong 已提交
18
/*
 * Scan the WAL forward from *fetchOffset until an entry that should be handed
 * to the consumer is found.
 *
 * An entry is handed out when it is a submit message, or when it is a meta
 * message (per IS_META_MSG) and pHandle->fetchMeta is set. The body of every
 * other entry is skipped and the scan advances to the next offset. The WAL
 * reader mutex is held for the whole scan.
 *
 * On return *fetchOffset is:
 *   - the offset of the entry whose body was fetched (or failed to be
 *     fetched/skipped), or
 *   - one less than the offset whose head could not be read.
 *
 * Returns 0 when *ppCkHead holds a fetched entry, -1 otherwise.
 */
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
  int32_t code = 0;
  taosThreadMutexLock(&pHandle->pWalReader->mutex);
  int64_t offset = *fetchOffset;

  for (;; offset++) {
    if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
      tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
              pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
      *fetchOffset = offset - 1;
      code = -1;
      break;
    }

    SWalCont* pHead = &((*ppCkHead)->head);
    bool      wanted = (pHead->msgType == TDMT_VND_SUBMIT) || (pHandle->fetchMeta && IS_META_MSG(pHead->msgType));

    if (wanted) {
      // submit and meta entries share the same handling: load the body and stop
      code = walFetchBody(pHandle->pWalReader, ppCkHead);
      if (code < 0) {
        ASSERT(0);
        code = -1;
      } else {
        code = 0;
      }
      *fetchOffset = offset;
      break;
    }

    // not interesting for this handle: skip the body and look at the next entry
    code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);
    if (code < 0) {
      ASSERT(0);
      *fetchOffset = offset;
      code = -1;
      break;
    }
  }

  taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
  return code;
}

L
Liu Jicong 已提交
76 77 78
/*
 * Allocate a TQ reader bound to the given vnode.
 *
 * A WAL reader is opened on pVnode->pWal and the vnode's meta handle is
 * recorded; all cached state (message, column id list, schema cache, table id
 * hash) starts out empty.
 *
 * Returns the new reader, or NULL when either the allocation or opening the
 * WAL reader fails. Release the reader with tqCloseReader().
 */
STqReader* tqOpenReader(SVnode* pVnode) {
  STqReader* pReader = taosMemoryMalloc(sizeof(STqReader));
  if (pReader == NULL) {
    return NULL;
  }

  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL);
  if (pReader->pWalReader == NULL) {
    // do not leak the half-constructed reader
    taosMemoryFree(pReader);
    return NULL;
  }

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pMsg = NULL;
  pReader->ver = -1;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchema = NULL;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;
  return pReader;
}

/*
 * Tear down a reader created by tqOpenReader(): closes the WAL reader, drops
 * the cached schema objects and column id list, cleans up the table id hash
 * and finally frees the reader itself.
 */
void tqCloseReader(STqReader* pReader) {
  // WAL reader
  if (pReader->pWalReader != NULL) {
    walCloseReader(pReader->pWalReader);
  }

  // cached schema and the column id list handed over via tqReaderSetColIdList()
  if (pReader->pSchema != NULL) {
    taosMemoryFree(pReader->pSchema);
  }
  if (pReader->pSchemaWrapper != NULL) {
    tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
  }
  if (pReader->pColIdList != NULL) {
    taosArrayDestroy(pReader->pColIdList);
  }

  // table id filter
  taosHashCleanup(pReader->tbIdHash);

  taosMemoryFree(pReader);
}

L
Liu Jicong 已提交
119
/*
 * Position the underlying WAL reader at version `ver`.
 *
 * Returns 0 when the seek succeeds and -1 when it fails. In both cases the WAL
 * reader is asserted to report `ver` as its current version; on failure it is
 * additionally asserted to be flagged as curInvalid.
 */
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
  int32_t code = 0;

  if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
    ASSERT(pReader->pWalReader->curInvalid);
    code = -1;
  }

  ASSERT(pReader->pWalReader->curVersion == ver);
  return code;
}

L
Liu Jicong 已提交
129 130 131 132 133 134
/*
 * Produce the next item for a fetch: a data block, a meta message, or an
 * "nothing more" marker carrying the log offset.
 *
 * If the reader still has a submit message attached (pReader->pMsg != NULL)
 * only the remaining blocks of that message are drained; once they are
 * exhausted FETCH_TYPE__NONE is returned together with the reader's version.
 * Otherwise WAL messages are pulled one after another until a data block or a
 * non-submit (meta) message is found.
 *
 * Returns 0 with ret->fetchType set to DATA, META or NONE, or -1 (fetchType
 * NONE, offset filled in) when the WAL has no further valid message.
 */
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
  const bool resuming = (pReader->pMsg != NULL);

  for (;;) {
    if (!resuming) {
      if (walNextValidMsg(pReader->pWalReader) < 0) {
        // end of log: report where the consumer should continue from
        pReader->ver = pReader->pWalReader->curVersion - pReader->pWalReader->curInvalid;
        ret->offset.type = TMQ_OFFSET__LOG;
        ret->offset.version = pReader->ver;
        ret->fetchType = FETCH_TYPE__NONE;
        ASSERT(ret->offset.version >= 0);
        return -1;
      }

      void* body = pReader->pWalReader->pHead->head.body;
      if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
        // TODO do filter
        ret->fetchType = FETCH_TYPE__META;
        ret->meta = body;
        return 0;
      }
      tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);
    }

    while (tqNextDataBlock(pReader)) {
      // TODO mem free
      memset(&ret->data, 0, sizeof(SSDataBlock));
      int32_t code = tqRetrieveDataBlock(&ret->data, pReader);
      if (code == 0 && ret->data.info.rows != 0) {
        ret->fetchType = FETCH_TYPE__DATA;
        return 0;
      }
      // retrieval failed or yielded an empty block: flagged, then try the next block
      ASSERT(0);
    }

    if (resuming) {
      // the message we were asked to continue with is fully consumed
      ret->offset.type = TMQ_OFFSET__LOG;
      ret->offset.version = pReader->ver;
      ASSERT(pReader->ver >= 0);
      ret->fetchType = FETCH_TYPE__NONE;
      return 0;
    }
  }
}

L
Liu Jicong 已提交
175 176
/*
 * Attach a submit message to the reader and prepare it for block iteration.
 *
 * The message is walked once from start to end with the message iterator so
 * that an iteration error surfaces here; the iterator is then re-initialised,
 * `ver` is recorded and the block iterator is zeroed.
 *
 * Returns 0 on success, -1 when initialising or advancing the message iterator
 * fails. Note that pReader->pMsg is assigned before any of these checks.
 */
int32_t tqReaderSetDataMsg(STqReader* pReader, SSubmitReq* pMsg, int64_t ver) {
  pReader->pMsg = pMsg;

  if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) {
    return -1;
  }

  // dry run over every block of the message
  do {
    if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) {
      return -1;
    }
  } while (pReader->pBlock != NULL);

  // rewind for the real iteration
  if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) {
    return -1;
  }

  pReader->ver = ver;
  memset(&pReader->blkIter, 0, sizeof(SSubmitBlkIter));
  return 0;
}

L
Liu Jicong 已提交
190 191
/*
 * Advance to the next block of the attached submit message that passes the
 * table id filter.
 *
 * With no filter installed (tbIdHash == NULL) every block qualifies; otherwise
 * only blocks whose uid is present in tbIdHash do. When the message runs out
 * of blocks it is detached from the reader (pMsg is reset to NULL).
 *
 * Returns true when pReader->pBlock points at a qualifying block, false when
 * no message is attached, the iterator fails, or the message is exhausted.
 */
bool tqNextDataBlock(STqReader* pReader) {
  if (pReader->pMsg == NULL) {
    return false;
  }

  for (;;) {
    if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) {
      return false;
    }

    if (pReader->pBlock == NULL) {
      pReader->pMsg = NULL;
      return false;
    }

    if (pReader->tbIdHash == NULL ||
        taosHashGet(pReader->tbIdHash, &pReader->msgIter.uid, sizeof(int64_t)) != NULL) {
      return true;
    }
  }
}

L
Liu Jicong 已提交
214
/*
 * Advance to the next block of the current submit message whose uid is NOT
 * listed in `filterOutUids`.
 *
 * The reader is asserted to have no table id filter of its own installed.
 *
 * Returns true when pHandle->pBlock points at such a block, false when the
 * iterator fails or there are no more blocks.
 */
bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) {
  for (;;) {
    if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0 || pHandle->pBlock == NULL) {
      return false;
    }

    ASSERT(pHandle->tbIdHash == NULL);

    if (taosHashGet(filterOutUids, &pHandle->msgIter.uid, sizeof(int64_t)) == NULL) {
      return true;
    }
  }
}

L
Liu Jicong 已提交
230
/*
 * Convert the submit block the reader currently points at into `pBlock`.
 *
 * Steps:
 *   1. Make sure the cached schema (STSchema + schema wrapper) matches the
 *      block's schema version and the message iterator's suid, reloading both
 *      from the vnode meta when it does not.
 *   2. Declare the output columns: every schema column when no column id list
 *      is set, otherwise the columns whose ids appear in both the schema and
 *      pReader->pColIdList (both are walked in step, which presumes ascending
 *      column ids - TODO confirm against the callers).
 *   3. Reserve capacity for msgIter.numOfRows rows and copy the rows over.
 *
 * Returns 0 on success. Returns -1 with terrno set to
 * TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when a schema cannot be loaded, and -1
 * after releasing the block's resources when building the block fails.
 */
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
  // TODO: cache multiple schema
  int32_t sversion = htonl(pReader->pBlock->sversion);
  if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion ||
      pReader->cachedSchemaSuid != pReader->msgIter.suid) {
    // Drop the cache key before touching the cached objects. If either lookup
    // below fails, the schema pointers are NULL or out of sync; a later call
    // must not treat them as a valid cache hit and dereference them.
    pReader->cachedSchemaSuid = 0;

    if (pReader->pSchema) taosMemoryFree(pReader->pSchema);
    pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion);
    if (pReader->pSchema == NULL) {
      // report the version that was actually requested, not the stale cached one
      tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table",
             pReader->msgIter.uid, pReader->msgIter.suid, sversion);
      /*ASSERT(0);*/
      terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
      return -1;
    }

    if (pReader->pSchemaWrapper) tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, true);
    if (pReader->pSchemaWrapper == NULL) {
      // the lookup is keyed by uid, so that is what gets logged
      tqWarn("cannot found schema wrapper for table: uid:%" PRId64 ", version %d, possibly dropped table",
             pReader->msgIter.uid, sversion);
      /*ASSERT(0);*/
      terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
      return -1;
    }

    // both objects loaded: the cache is valid again
    pReader->cachedSchemaVer = sversion;
    pReader->cachedSchemaSuid = pReader->msgIter.suid;
  }

  STSchema*       pTschema = pReader->pSchema;
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;

  int32_t colNumNeed = taosArrayGetSize(pReader->pColIdList);

  if (colNumNeed == 0) {
    // no projection requested: expose every column of the schema
    for (int32_t colMeta = 0; colMeta < pSchemaWrapper->nCols; colMeta++) {
      SSchema*        pColSchema = &pSchemaWrapper->pSchema[colMeta];
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
      int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != TSDB_CODE_SUCCESS) {
        goto FAIL;
      }
    }
  } else {
    if (colNumNeed > pSchemaWrapper->nCols) {
      colNumNeed = pSchemaWrapper->nCols;
    }

    // merge-style walk over the schema columns and the requested column ids
    int32_t colMeta = 0;
    int32_t colNeed = 0;
    while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
      SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
      col_id_t colIdSchema = pColSchema->colId;
      col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, colNeed);
      if (colIdSchema < colIdNeed) {
        colMeta++;
      } else if (colIdSchema > colIdNeed) {
        colNeed++;
      } else {
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
        if (code != TSDB_CODE_SUCCESS) {
          goto FAIL;
        }
        colMeta++;
        colNeed++;
      }
    }
  }

  if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows) < 0) {
    goto FAIL;
  }

  int32_t colActual = blockDataGetNumOfCols(pBlock);

  STSRowIter iter = {0};
  tdSTSRowIterInit(&iter, pTschema);
  STSRow* row;
  int32_t curRow = 0;

  tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter);

  pBlock->info.uid = pReader->msgIter.uid;
  pBlock->info.rows = pReader->msgIter.numOfRows;

  while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) {
    tdSTSRowIterReset(&iter, row);
    // get all wanted col of that block
    for (int32_t i = 0; i < colActual; i++) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
      SCellVal         sVal = {0};
      if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
        // the row iterator has nothing for this column: stop filling this row
        break;
      }
      if (colDataAppend(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) {
        goto FAIL;
      }
    }
    curRow++;
  }
  return 0;

FAIL:
  blockDataFreeRes(pBlock);
  return -1;
}
H
Hongze Cheng 已提交
338

L
Liu Jicong 已提交
339
/*
 * Install the list of column ids that tqRetrieveDataBlock() should project.
 * The pointer is stored as-is; tqCloseReader() destroys the stored list.
 */
void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) {
  pReadHandle->pColIdList = pColIdList;
}
H
Hongze Cheng 已提交
340

L
Liu Jicong 已提交
341
/*
 * Replace the reader's table id filter with exactly the uids in `tbUidList`.
 *
 * An existing hash is emptied and reused; it is only created when the reader
 * has none yet. (Previously a fresh hash was always created over the cleared
 * one, leaking the old table.)
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * hash cannot be created.
 */
int tqReaderSetTbUidList(STqReader* pHandle, const SArray* tbUidList) {
  if (pHandle->tbIdHash) {
    taosHashClear(pHandle->tbIdHash);
  } else {
    pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    if (pHandle->tbIdHash == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  int32_t sz = taosArrayGetSize(tbUidList);
  for (int32_t i = 0; i < sz; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
    // only key membership matters, so no value is stored; the result of the
    // put is intentionally not checked, as before
    taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
  }

  return 0;
}

L
Liu Jicong 已提交
360
/*
 * Add the uids in `tbUidList` to the reader's table id filter, creating the
 * filter hash first if the reader does not have one yet.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * hash cannot be created.
 */
int tqReaderAddTbUidList(STqReader* pHandle, const SArray* tbUidList) {
  if (pHandle->tbIdHash == NULL) {
    pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    if (pHandle->tbIdHash == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  int32_t numOfUids = taosArrayGetSize(tbUidList);
  for (int32_t idx = 0; idx < numOfUids; idx++) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    taosHashPut(pHandle->tbIdHash, pUid, sizeof(int64_t), NULL, 0);
  }

  return 0;
}
376

L
Liu Jicong 已提交
377
/*
 * Remove the uids in `tbUidList` from the reader's table id filter.
 * The filter hash is asserted to exist. Always returns 0.
 */
int tqReaderRemoveTbUidList(STqReader* pHandle, const SArray* tbUidList) {
  ASSERT(pHandle->tbIdHash != NULL);

  int32_t numOfUids = taosArrayGetSize(tbUidList);
  for (int32_t idx = 0; idx < numOfUids; idx++) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    taosHashRemove(pHandle->tbIdHash, pUid, sizeof(int64_t));
  }

  return 0;
}
L
Liu Jicong 已提交
387 388 389 390 391 392 393 394 395

/*
 * Propagate a change of table uids (`isAdd` selects add vs. remove) to every
 * subscription handle and stream task of this TQ.
 *
 *   - column subscriptions: each of the handle's 5 executor tasks gets its
 *     qualified table id list updated;
 *   - db subscriptions: removed uids are recorded in the handle's filter-out
 *     set (additions need no action here);
 *   - stream tasks flagged isDataScan: the task's executor is updated.
 *
 * Executor updates are asserted to succeed. Always returns 0.
 */
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
  // subscription handles
  for (void* pIter = taosHashIterate(pTq->handles, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->handles, pIter)) {
    STqHandle* pExec = (STqHandle*)pIter;

    if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      for (int32_t i = 0; i < 5; i++) {
        int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task[i], tbUidList, isAdd);
        ASSERT(code == 0);
      }
    } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB && !isAdd) {
      int32_t numOfUids = taosArrayGetSize(tbUidList);
      for (int32_t i = 0; i < numOfUids; i++) {
        int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
        taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
      }
    }
    // other subscription types: nothing to update yet
  }

  // stream tasks
  for (void* pIter = taosHashIterate(pTq->pStreamTasks, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pStreamTasks, pIter)) {
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->isDataScan) {
      int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
      ASSERT(code == 0);
    }
  }

  return 0;
}