tqRead.c 10.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Hongze Cheng 已提交
16
#include "tq.h"
L
Liu Jicong 已提交
17

L
Liu Jicong 已提交
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead** ppHeadWithCkSum) {
  int32_t code = 0;
  taosThreadMutexLock(&pHandle->pWalReader->mutex);
  int64_t offset = *fetchOffset;

  while (1) {
    if (walFetchHead(pHandle->pWalReader, offset, *ppHeadWithCkSum) < 0) {
      tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", pHandle->consumerId,
              pHandle->epoch, TD_VID(pTq->pVnode), offset);
      *fetchOffset = offset - 1;
      code = -1;
      goto END;
    }

    if ((*ppHeadWithCkSum)->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum);

      if (code < 0) {
        ASSERT(0);
        *fetchOffset = offset;
        code = -1;
        goto END;
      }
      *fetchOffset = offset;
      code = 0;
      goto END;
    } else {
L
Liu Jicong 已提交
45 46
      if (pHandle->fetchMeta) {
        SWalReadHead* pHead = &((*ppHeadWithCkSum)->head);
L
Liu Jicong 已提交
47
        if (IS_META_MSG(pHead->msgType)) {
L
Liu Jicong 已提交
48 49 50 51 52 53 54 55 56 57 58 59 60
          code = walFetchBody(pHandle->pWalReader, ppHeadWithCkSum);

          if (code < 0) {
            ASSERT(0);
            *fetchOffset = offset;
            code = -1;
            goto END;
          }
          *fetchOffset = offset;
          code = 0;
          goto END;
        }
      }
L
Liu Jicong 已提交
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
      code = walSkipFetchBody(pHandle->pWalReader, *ppHeadWithCkSum);
      if (code < 0) {
        ASSERT(0);
        *fetchOffset = offset;
        code = -1;
        goto END;
      }
      offset++;
    }
  }
END:
  taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
  return code;
}

L
Liu Jicong 已提交
76 77
SStreamReader* tqInitSubmitMsgScanner(SMeta* pMeta) {
  SStreamReader* pReadHandle = taosMemoryMalloc(sizeof(SStreamReader));
L
Liu Jicong 已提交
78 79 80 81 82 83 84
  if (pReadHandle == NULL) {
    return NULL;
  }
  pReadHandle->pVnodeMeta = pMeta;
  pReadHandle->pMsg = NULL;
  pReadHandle->ver = -1;
  pReadHandle->pColIdList = NULL;
L
Liu Jicong 已提交
85 86
  pReadHandle->cachedSchemaVer = 0;
  pReadHandle->cachedSchemaSuid = 0;
L
Liu Jicong 已提交
87 88
  pReadHandle->pSchema = NULL;
  pReadHandle->pSchemaWrapper = NULL;
L
Liu Jicong 已提交
89
  pReadHandle->tbIdHash = NULL;
L
Liu Jicong 已提交
90 91 92
  return pReadHandle;
}

L
Liu Jicong 已提交
93
int32_t tqReadHandleSetMsg(SStreamReader* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
L
Liu Jicong 已提交
94
  pReadHandle->pMsg = pMsg;
L
Liu Jicong 已提交
95

C
Cary Xu 已提交
96
  if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
L
Liu Jicong 已提交
97
  while (true) {
C
Cary Xu 已提交
98
    if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
L
Liu Jicong 已提交
99 100 101
    if (pReadHandle->pBlock == NULL) break;
  }

C
Cary Xu 已提交
102
  if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
L
Liu Jicong 已提交
103 104
  pReadHandle->ver = ver;
  memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
L
Liu Jicong 已提交
105
  return 0;
L
Liu Jicong 已提交
106 107
}

L
Liu Jicong 已提交
108
bool tqNextDataBlock(SStreamReader* pHandle) {
5
54liuyao 已提交
109
  if (pHandle->pMsg == NULL) return false;
L
Liu Jicong 已提交
110
  while (1) {
C
Cary Xu 已提交
111
    if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
L
Liu Jicong 已提交
112 113
      return false;
    }
5
54liuyao 已提交
114 115 116 117
    if (pHandle->pBlock == NULL) {
      pHandle->pMsg = NULL;
      return false;
    }
L
Liu Jicong 已提交
118

L
Liu Jicong 已提交
119 120 121
    if (pHandle->tbIdHash == NULL) {
      return true;
    }
C
Cary Xu 已提交
122
    void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t));
L
Liu Jicong 已提交
123 124
    if (ret != NULL) {
      return true;
125 126 127 128 129
    }
  }
  return false;
}

L
Liu Jicong 已提交
130
bool tqNextDataBlockFilterOut(SStreamReader* pHandle, SHashObj* filterOutUids) {
131 132 133 134 135 136 137 138 139 140
  while (1) {
    if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
      return false;
    }
    if (pHandle->pBlock == NULL) return false;

    ASSERT(pHandle->tbIdHash == NULL);
    void* ret = taosHashGet(filterOutUids, &pHandle->msgIter.uid, sizeof(int64_t));
    if (ret == NULL) {
      return true;
L
Liu Jicong 已提交
141 142 143 144 145
    }
  }
  return false;
}

L
Liu Jicong 已提交
146
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, SStreamReader* pHandle) {
147
  // TODO: cache multiple schema
148
  int32_t sversion = htonl(pHandle->pBlock->sversion);
L
Liu Jicong 已提交
149 150
  if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion ||
      pHandle->cachedSchemaSuid != pHandle->msgIter.suid) {
L
Liu Jicong 已提交
151
    if (pHandle->pSchema) taosMemoryFree(pHandle->pSchema);
C
Cary Xu 已提交
152
    pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
L
Liu Jicong 已提交
153
    if (pHandle->pSchema == NULL) {
L
Liu Jicong 已提交
154
      tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table",
L
Liu Jicong 已提交
155
             pHandle->msgIter.uid, pHandle->msgIter.suid, pHandle->cachedSchemaVer);
L
Liu Jicong 已提交
156
      /*ASSERT(0);*/
157
      terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
L
Liu Jicong 已提交
158 159
      return -1;
    }
L
Liu Jicong 已提交
160

L
Liu Jicong 已提交
161
    if (pHandle->pSchemaWrapper) tDeleteSSchemaWrapper(pHandle->pSchemaWrapper);
H
Hongze Cheng 已提交
162
    pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion, true);
L
Liu Jicong 已提交
163 164
    if (pHandle->pSchemaWrapper == NULL) {
      tqWarn("cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table",
H
Hongze Cheng 已提交
165
             pHandle->msgIter.uid, pHandle->cachedSchemaVer);
L
Liu Jicong 已提交
166 167 168 169
      /*ASSERT(0);*/
      terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
      return -1;
    }
L
Liu Jicong 已提交
170
    pHandle->cachedSchemaVer = sversion;
L
Liu Jicong 已提交
171
    pHandle->cachedSchemaSuid = pHandle->msgIter.suid;
L
Liu Jicong 已提交
172 173 174 175 176 177 178
  }

  STSchema*       pTschema = pHandle->pSchema;
  SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;

  int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);

L
Liu Jicong 已提交
179 180 181 182
  if (colNumNeed == 0) {
    int32_t colMeta = 0;
    while (colMeta < pSchemaWrapper->nCols) {
      SSchema*        pColSchema = &pSchemaWrapper->pSchema[colMeta];
183
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
L
Liu Jicong 已提交
184
      int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
185
      if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
186
        goto FAIL;
L
Liu Jicong 已提交
187
      }
L
Liu Jicong 已提交
188
      colMeta++;
L
Liu Jicong 已提交
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
    }
  } else {
    if (colNumNeed > pSchemaWrapper->nCols) {
      colNumNeed = pSchemaWrapper->nCols;
    }

    int32_t colMeta = 0;
    int32_t colNeed = 0;
    while (colMeta < pSchemaWrapper->nCols && colNeed < colNumNeed) {
      SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta];
      col_id_t colIdSchema = pColSchema->colId;
      col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pHandle->pColIdList, colNeed);
      if (colIdSchema < colIdNeed) {
        colMeta++;
      } else if (colIdSchema > colIdNeed) {
        colNeed++;
      } else {
206
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
L
Liu Jicong 已提交
207
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
208
        if (code != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
209 210 211 212 213
          goto FAIL;
        }
        colMeta++;
        colNeed++;
      }
L
Liu Jicong 已提交
214 215
    }
  }
L
Liu Jicong 已提交
216

217
  if (blockDataEnsureCapacity(pBlock, pHandle->msgIter.numOfRows) < 0) {
218 219 220 221
    goto FAIL;
  }

  int32_t colActual = blockDataGetNumOfCols(pBlock);
L
Liu Jicong 已提交
222

L
Liu Jicong 已提交
223 224 225 226
  STSRowIter iter = {0};
  tdSTSRowIterInit(&iter, pTschema);
  STSRow* row;
  int32_t curRow = 0;
227

C
Cary Xu 已提交
228
  tInitSubmitBlkIter(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter);
229 230

  pBlock->info.groupId = 0;
L
Liu Jicong 已提交
231
  pBlock->info.uid = pHandle->msgIter.uid;
232
  pBlock->info.rows = pHandle->msgIter.numOfRows;
233

C
Cary Xu 已提交
234
  while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
L
Liu Jicong 已提交
235 236
    tdSTSRowIterReset(&iter, row);
    // get all wanted col of that block
L
Liu Jicong 已提交
237
    for (int32_t i = 0; i < colActual; i++) {
238
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
L
Liu Jicong 已提交
239 240 241 242
      SCellVal         sVal = {0};
      if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
        break;
      }
243
      if (colDataAppend(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) {
L
Liu Jicong 已提交
244
        goto FAIL;
L
Liu Jicong 已提交
245
      }
L
Liu Jicong 已提交
246 247 248
    }
    curRow++;
  }
L
Liu Jicong 已提交
249
  return 0;
250

L
Liu Jicong 已提交
251 252
FAIL:
  tDeleteSSDataBlock(pBlock);
L
Liu Jicong 已提交
253
  return -1;
L
Liu Jicong 已提交
254
}
H
Hongze Cheng 已提交
255

L
Liu Jicong 已提交
256
void tqReadHandleSetColIdList(SStreamReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
H
Hongze Cheng 已提交
257

L
Liu Jicong 已提交
258
int tqReadHandleSetTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
H
Hongze Cheng 已提交
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
  if (pHandle->tbIdHash) {
    taosHashClear(pHandle->tbIdHash);
  }

  pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (pHandle->tbIdHash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
    taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
  }

  return 0;
}

L
Liu Jicong 已提交
277
int tqReadHandleAddTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
H
Hongze Cheng 已提交
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
  if (pHandle->tbIdHash == NULL) {
    pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    if (pHandle->tbIdHash == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
    taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
  }

  return 0;
}
293

L
Liu Jicong 已提交
294
int tqReadHandleRemoveTbUidList(SStreamReader* pHandle, const SArray* tbUidList) {
295 296
  ASSERT(pHandle->tbIdHash != NULL);

L
Liu Jicong 已提交
297 298
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
299 300 301 302 303
    taosHashRemove(pHandle->tbIdHash, pKey, sizeof(int64_t));
  }

  return 0;
}
L
Liu Jicong 已提交
304 305 306 307 308 309 310 311 312

int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
  void* pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pTq->handles, pIter);
    if (pIter == NULL) break;
    STqHandle* pExec = (STqHandle*)pIter;
    if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      for (int32_t i = 0; i < 5; i++) {
L
Liu Jicong 已提交
313
        int32_t code = qUpdateQualifiedTableId(pExec->execHandle.execCol.task[i], tbUidList, isAdd);
L
Liu Jicong 已提交
314 315 316 317 318 319 320
        ASSERT(code == 0);
      }
    } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      if (!isAdd) {
        int32_t sz = taosArrayGetSize(tbUidList);
        for (int32_t i = 0; i < sz; i++) {
          int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
L
Liu Jicong 已提交
321
          taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
L
Liu Jicong 已提交
322 323 324 325 326 327 328 329 330
        }
      }
    } else {
      // tq update id
    }
  }
  while (1) {
    pIter = taosHashIterate(pTq->pStreamTasks, pIter);
    if (pIter == NULL) break;
331 332
    SStreamTask* pTask = *(SStreamTask**)pIter;
    if (pTask->isDataScan) {
L
Liu Jicong 已提交
333 334 335 336 337 338
      int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd);
      ASSERT(code == 0);
    }
  }
  return 0;
}