tqUtil.c 15.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tq.h"

18 19
// Evaluates to true when the offset type `_t` is negative. tqExtractDataForMq uses this test to decide
// whether a poll request asks for an offset reset instead of carrying a concrete offset.
#define IS_OFFSET_RESET_TYPE(_t)  ((_t) < 0)

dengyihao's avatar
dengyihao 已提交
20 21
// Forward declaration: the function is called by extractDataAndRspForDbStbSubscribe before its definition,
// which appears further down in this file.
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
                                 const SMqMetaRsp* pRsp, int32_t vgId);
22

23 24
/**
 * Build the printable identifier of a stream task, formatted as
 * "0x<streamId in hex>-0x<taskId in hex>".
 *
 * @param streamId  stream identifier, printed as a 64-bit hexadecimal number
 * @param taskId    task identifier, printed as a hexadecimal number
 * @return whatever taosStrdup() returns for the formatted text
 *         (presumably a heap copy the caller must release -- confirm against taosStrdup).
 */
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
  char buf[128] = {0};

  // bounded formatting: the write can never run past the local buffer
  snprintf(buf, sizeof(buf), "0x%" PRIx64 "-0x%x", streamId, taskId);
  return taosStrdup(buf);
}

29
/**
 * Append one item to the input queue of a stream task and then schedule the task for execution.
 *
 * @param pTask       task that receives the item
 * @param pQueueItem  item handed to tAppendDataToInputQueue()
 * @return TSDB_CODE_SUCCESS when the item was queued and the task scheduled,
 *         -1 when either step reports a negative result (the failure is logged).
 */
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) {
  if (tAppendDataToInputQueue(pTask, pQueueItem) < 0) {
    tqError("s-task:%s failed to put into queue, too many", pTask->id.idStr);
    return -1;
  }

  int32_t schedRet = streamSchedExec(pTask);
  if (schedRet < 0) {
    tqError("stream task:%d failed to be launched, code:%s", pTask->id.taskId, tstrerror(terrno));
    return -1;
  }

  return TSDB_CODE_SUCCESS;
}

44
/**
 * Prepare a data poll response: copy the request offset from the poll request and create the
 * (initially empty) block-data and block-length arrays.
 *
 * On allocation failure the array that was created, if any, is destroyed and its field reset to NULL,
 * so the response never keeps a half-built pair of arrays (same policy as tqInitTaosxRsp).
 *
 * @param pRsp  response to initialize
 * @param pReq  poll request supplying the request offset
 * @return 0 on success, -1 when one of the arrays could not be created.
 */
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));

  if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL) {
    // release the array that did get allocated so nothing leaks on the error path
    if (pRsp->blockData != NULL) {
      pRsp->blockData = taosArrayDestroy(pRsp->blockData);
    }

    if (pRsp->blockDataLen != NULL) {
      pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen);
    }

    return -1;
  }

  pRsp->withTbName = 0;
  pRsp->withSchema = false;
  return 0;
}

/**
 * Prepare a taosx poll response: copy the request offset, switch on table names and schemas, and
 * create the four (initially empty) arrays for block data, block lengths, table names and schemas.
 *
 * If any array cannot be created, every array that was created is destroyed again and its field
 * is reset to the value returned by taosArrayDestroy().
 *
 * @param pRsp  response to initialize
 * @param pReq  poll request supplying the request offset
 * @return 0 on success, -1 when an allocation failed.
 */
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
  pRsp->reqOffset = pReq->reqOffset;

  pRsp->withTbName = 1;
  pRsp->withSchema = 1;
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));

  bool allCreated = (pRsp->blockData != NULL) && (pRsp->blockDataLen != NULL) && (pRsp->blockTbName != NULL) &&
                    (pRsp->blockSchema != NULL);
  if (allCreated) {
    return 0;
  }

  // at least one allocation failed: roll back the ones that succeeded
  if (pRsp->blockData != NULL) {
    pRsp->blockData = taosArrayDestroy(pRsp->blockData);
  }

  if (pRsp->blockDataLen != NULL) {
    pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen);
  }

  if (pRsp->blockTbName != NULL) {
    pRsp->blockTbName = taosArrayDestroy(pRsp->blockTbName);
  }

  if (pRsp->blockSchema != NULL) {
    pRsp->blockSchema = taosArrayDestroy(pRsp->blockSchema);
  }

  return -1;
}

91
/**
 * Resolve the offset to start from when the poll request carries a "reset" offset type.
 *
 * - If an offset is already stored for the request's subKey, it is copied to *pOffsetVal.
 * - Otherwise the request's reset type decides:
 *   - TMQ_OFFSET__RESET_EARLIEST: with useSnapshot, reset to meta (when the handle fetches meta) or to
 *     data; without it, reference the first WAL version and reset to that log version.
 *   - TMQ_OFFSET__RESET_LATEST: reference the last WAL version and immediately send a response that
 *     carries no data blocks and whose rspOffset is the version after it; *pBlockReturned becomes true.
 *   - TMQ_OFFSET__RESET_NONE: fail with terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET.
 *   - any other type: *pOffsetVal is left untouched and 0 is returned.
 *
 * @param pOffsetVal      [out] resolved offset (not written on the LATEST, NONE and "other" paths)
 * @param pTq             tq instance owning the offset store and the vnode
 * @param pHandle         subscription handle
 * @param pRequest        poll request
 * @param pMsg            rpc message of the request, used when a response is sent from here
 * @param pBlockReturned  [out] true when a response has already been sent by this function
 * @return 0 on success, the send result on the LATEST path, -1 on failure
 *         (no committed offset, or the response could not be initialized).
 */
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
  uint64_t     consumerId = pRequest->consumerId;
  STqOffsetVal reqOffset = pRequest->reqOffset;
  STqOffset*   pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
  int32_t      vgId = TD_VID(pTq->pVnode);

  *pBlockReturned = false;
  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
  if (pOffset != NULL) {
    *pOffsetVal = pOffset->val;

    char formatBuf[TSDB_OFFSET_LEN] = {0};
    tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
    tqDebug("tmq poll: consumer:0x%" PRIx64
            ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%" PRIx64,
            consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
    return 0;
  } else {
    // no poll occurs in this vnode for this topic, let's seek to the right offset value.
    if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
      if (pRequest->useSnapshot) {
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
                consumerId, pHandle->subKey, vgId);

        if (pHandle->fetchMeta) {
          tqOffsetResetToMeta(pOffsetVal, 0);
        } else {
          tqOffsetResetToData(pOffsetVal, 0, 0);
        }
      } else {
        walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
        tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
      }
    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
      walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
      if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
        SMqDataRsp dataRsp = {0};
        if (tqInitDataRsp(&dataRsp, pRequest) != 0) {
          // do not send a half-initialized response; release it the same way the success path does
          tDeleteMqDataRsp(&dataRsp);
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return -1;
        }

        tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer + 1);
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
                pHandle->subKey, vgId, dataRsp.rspOffset.version);
        int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
        tDeleteMqDataRsp(&dataRsp);

        *pBlockReturned = true;
        return code;
      } else {
        STaosxRsp taosxRsp = {0};
        if (tqInitTaosxRsp(&taosxRsp, pRequest) != 0) {
          // tqInitTaosxRsp has already destroyed whatever it managed to allocate
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          return -1;
        }

        tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer + 1);
        int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
        tDeleteSTaosxRsp(&taosxRsp);

        *pBlockReturned = true;
        return code;
      }
    } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
      tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
              " in vg %d, subkey %s, reset none failed",
              pHandle->subKey, consumerId, vgId, pRequest->subKey);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return -1;
    }
  }

  return 0;
}

160 161 162 163 164 165
/**
 * For a log-type offset, set its version to `ver + 1`; offsets of any other type are left unchanged.
 *
 * @param offset  offset to update in place
 * @param ver     version the new value is derived from
 */
static void setRequestVersion(STqOffsetVal* offset, int64_t ver) {
  if (offset->type != TMQ_OFFSET__LOG) {
    return;
  }

  offset->version = ver + 1;
}

166 167 168 169
/**
 * Serve a poll request of a column subscription: scan data starting at *pOffset and either send a
 * poll response or, when nothing is available yet, register the handle for a later push.
 *
 * Flow:
 *  1. initialize the response (fails with -1 / TSDB_CODE_OUT_OF_MEMORY when that is not possible);
 *  2. run tqScanData(); a non-zero result is fatal unless terrno is TSDB_CODE_WAL_LOG_NOT_EXIST;
 *  3. if the WAL had no such log and no block was produced, re-check the committed WAL version under
 *     the tq write latch and register a push handle when the offsets have caught up with it;
 *  4. otherwise send the response.
 * The response is always released before returning.
 *
 * @param pTq       tq instance
 * @param pHandle   subscription handle
 * @param pRequest  poll request
 * @param pMsg      rpc message of the request
 * @param pOffset   offset to scan from (passed on to tqScanData)
 * @return result of the last step that ran (scan, push registration or send), or -1 on init failure.
 */
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
  uint64_t consumerId = pRequest->consumerId;
  int32_t  vgId = TD_VID(pTq->pVnode);
  int32_t  code = 0;
  terrno = 0;

  SMqDataRsp dataRsp = {0};
  if (tqInitDataRsp(&dataRsp, pRequest) != 0) {
    // never scan into / send a response whose arrays could not be created
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    code = -1;
    goto end;
  }
  dataRsp.reqOffset.type = pOffset->type;  // store origin type for getting offset in tmq_get_vgroup_offset

  qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
  if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
    goto end;
  }

  //   till now, all data has been transferred to consumer, new data needs to push client once arrived.
  if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
    // lock
    taosWLockLatch(&pTq->lock);
    int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
    if (pOffset->version >= ver ||
        dataRsp.rspOffset.version >= ver) {  // check if there are data again to avoid lost data
      code = tqRegisterPushHandle(pTq, pHandle, pMsg);
      taosWUnLockLatch(&pTq->lock);
      goto end;
    }
    taosWUnLockLatch(&pTq->lock);
  }

  setRequestVersion(&dataRsp.reqOffset, pOffset->version);
  code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);

end : {
  char buf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64
          " code:%d",
          consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
  tDeleteMqDataRsp(&dataRsp);
  return code;
  }
}

dengyihao's avatar
dengyihao 已提交
209 210 211 212 213 214 215
/**
 * Serve a poll request of a non-column subscription (the taosx path).
 *
 * Phase 1 (offset type is not TMQ_OFFSET__LOG): run tqScanTaosx(). A meta result is sent as a meta
 * response, a non-empty data result as a taosx response; an empty result moves *offset to the scan's
 * response offset and falls through to phase 2 if that offset is a log offset.
 *
 * Phase 2 (offset type is TMQ_OFFSET__LOG): iterate WAL entries from offset->version:
 *  - stop silently when the handle's epoch is newer than the request's epoch;
 *  - when fetching a log entry fails, send what has been collected so far;
 *  - a non-submit entry is sent as a meta response, unless rows were already collected, in which
 *    case those rows are sent first;
 *  - submit entries are scanned into the response, which is sent once the row budget is reached or
 *    a create-table entry is present.
 *
 * @param pTq       tq instance
 * @param pHandle   subscription handle
 * @param pRequest  poll request
 * @param pMsg      rpc message of the request
 * @param offset    [in/out] offset to start from; may be replaced by the scan's response offset
 * @return 0 or the result of the send/scan step that ended the function; -1 on failure.
 */
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
                                                  SRpcMsg* pMsg, STqOffsetVal* offset) {
  // extra bytes behind the WAL head; shared by the head-buffer allocation and walSetReaderCapacity below
  const int32_t walBodyCap = 2048;
  // number of collected rows at which the response is sent
  const int32_t maxRowsPerRsp = 4096;

  int         code = 0;
  int32_t     vgId = TD_VID(pTq->pVnode);
  SWalCkHead* pCkHead = NULL;
  SMqMetaRsp  metaRsp = {0};
  STaosxRsp   taosxRsp = {0};

  if (tqInitTaosxRsp(&taosxRsp, pRequest) != 0) {
    // tqInitTaosxRsp has already destroyed whatever it managed to allocate, nothing left to release
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  taosxRsp.reqOffset.type = offset->type;  // store origin type for getting offset in tmq_get_vgroup_offset

  if (offset->type != TMQ_OFFSET__LOG) {
    if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
      code = -1;
      goto end;
    }

    if (metaRsp.metaRspLen > 0) {
      code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
              ",ts:%" PRId64,
              pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
              metaRsp.rspOffset.ts);
      taosMemoryFree(metaRsp.metaRsp);
      goto end;
    }

    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
            ",ts:%" PRId64,
            pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type,
            taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
    if (taosxRsp.blockNum > 0) {
      code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
      goto end;
    } else {
      // nothing scanned: continue from the offset the scan ended at
      *offset = taosxRsp.rspOffset;
    }
  }

  if (offset->type == TMQ_OFFSET__LOG) {
    walReaderVerifyOffset(pHandle->pWalReader, offset);
    int64_t fetchVer = offset->version;
    pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + walBodyCap);
    if (pCkHead == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      code = -1;
      goto end;
    }

    walSetReaderCapacity(pHandle->pWalReader, walBodyCap);
    int totalRows = 0;
    while (1) {
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
      if (savedEpoch > pRequest->epoch) {
        tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
               ", found new consumer epoch %d, discard req epoch %d",
               pRequest->consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
        break;
      }

      if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
        setRequestVersion(&taosxRsp.reqOffset, offset->version);
        code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
        goto end;
      }

      SWalCont* pHead = &pCkHead->head;
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
              pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);

      // process meta
      if (pHead->msgType != TDMT_VND_SUBMIT) {
        if (totalRows > 0) {
          // rows collected so far go out first; the response offset stays at this (unsent) meta entry
          tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
          setRequestVersion(&taosxRsp.reqOffset, offset->version);
          code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
          goto end;
        }

        tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
        tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
        metaRsp.resMsgType = pHead->msgType;
        metaRsp.metaRspLen = pHead->bodyLen;
        // points into pCkHead, which is released at `end` after the response has been sent
        metaRsp.metaRsp = pHead->body;
        code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
        goto end;
      }

      // process data
      SPackedData submit = {
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
          .ver = pHead->version,
      };

      code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows);
      if (code < 0) {
        tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
                pRequest->subKey);
        goto end;
      }

      if (totalRows >= maxRowsPerRsp || taosxRsp.createTableNum > 0) {
        tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
        setRequestVersion(&taosxRsp.reqOffset, offset->version);
        code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
        goto end;
      } else {
        fetchVer++;
      }
    }
  }

end:

  tDeleteSTaosxRsp(&taosxRsp);
  taosMemoryFreeClear(pCkHead);
  return code;
}

/**
 * Entry point for serving a poll request: determine the offset to start from, then dispatch to the
 * handler that matches the subscription type of the handle.
 *
 * @param pTq       tq instance
 * @param pHandle   subscription handle
 * @param pRequest  poll request
 * @param pMsg      rpc message of the request
 * @return a non-zero code from the offset-reset step, 0 when that step already sent a response,
 *         otherwise the result of the subscription-specific handler.
 */
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
  STqOffsetVal startOffset = {0};

  // 1. decide where to start
  if (!IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
    // the consumer specified a concrete offset; it is taken as is (it is not checked to be monotonically increasing)
    startOffset = pRequest->reqOffset;
  } else {
    // the consumer asked for a reset; resolve it according to the requested reset policy
    bool    rspAlreadySent = false;
    int32_t resetCode = extractResetOffsetVal(&startOffset, pTq, pHandle, pRequest, pMsg, &rspAlreadySent);
    if (resetCode != 0) {
      return resetCode;
    }

    // an empty block has been returned to the consumer, nothing more to do
    if (rspAlreadySent) {
      return 0;
    }
  }

  // 2. dispatch on the subscription type
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
    // for taosx; todo handle the case where re-balance occurs.
    return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &startOffset);
  }

  // normal (column) subscription
  return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &startOffset);
}

361 362 363 364 365 366 367 368 369
/**
 * Fill the message type, epoch, consumer id and WAL version range of a response head.
 * Fields of the head that are not listed here are not touched.
 *
 * @param pMsgHead    head to fill
 * @param type        stored in mqMsgType
 * @param epoch       stored in epoch
 * @param consumerId  stored in consumerId
 * @param sver        stored in walsver
 * @param ever        stored in walever
 */
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
                          int64_t ever) {
  pMsgHead->mqMsgType = type;
  pMsgHead->epoch = epoch;
  pMsgHead->consumerId = consumerId;

  // WAL version range reported to the consumer
  pMsgHead->walsver = sver;
  pMsgHead->walever = ever;
}

dengyihao's avatar
dengyihao 已提交
370 371
/**
 * Encode a meta response behind a response head and send it as the rpc reply to the poll request.
 *
 * The head carries TMQ_MSG_TYPE__POLL_META_RSP, the request's epoch and consumer id, and the version
 * range reported by walReaderValidVersionRange() for the handle's WAL reader.
 *
 * @param pHandle  subscription handle (supplies the WAL reader)
 * @param pMsg     rpc message of the request; its `info` addresses the reply
 * @param pReq     poll request
 * @param pRsp     meta response to encode
 * @param vgId     vgroup id, used for logging only
 * @return 0 once the reply has been handed to tmsgSendRsp(), -1 when the encoded size cannot be
 *         computed or the reply buffer cannot be allocated.
 */
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
                          int32_t vgId) {
  int32_t len = 0;
  int32_t code = 0;

  // size of the encoded body
  tEncodeSize(tEncodeMqMetaRsp, pRsp, len, code);
  if (code < 0) {
    return -1;
  }

  int32_t totalLen = sizeof(SMqRspHead) + len;
  void*   pCont = rpcMallocCont(totalLen);
  if (pCont == NULL) {
    return -1;
  }

  // head first ...
  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  initMqRspHead(pCont, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);

  // ... then the encoded body right behind it
  void*    pBody = POINTER_SHIFT(pCont, sizeof(SMqRspHead));
  SEncoder encoder = {0};
  tEncoderInit(&encoder, pBody, len);
  tEncodeMqMetaRsp(&encoder, pRsp);
  tEncoderClear(&encoder);

  SRpcMsg resp = {.info = pMsg->info, .pCont = pCont, .contLen = totalLen, .code = 0};
  tmsgSendRsp(&resp);

  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
          pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);

  return 0;
}
403 404

int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
dengyihao's avatar
dengyihao 已提交
405
                        int32_t type, int64_t sver, int64_t ever) {
406 407 408
  int32_t len = 0;
  int32_t code = 0;

409
  if (type == TMQ_MSG_TYPE__POLL_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432
    tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
  }

  if (code < 0) {
    return -1;
  }

  int32_t tlen = sizeof(SMqRspHead) + len;
  void*   buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    return -1;
  }

  SMqRspHead* pHead = (SMqRspHead*)buf;
  initMqRspHead(pHead, type, epoch, consumerId, sver, ever);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));

  SEncoder encoder = {0};
  tEncoderInit(&encoder, abuf, len);

433
  if (type == TMQ_MSG_TYPE__POLL_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
434 435 436 437 438 439
    tEncodeMqDataRsp(&encoder, pRsp);
  } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
    tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
  }

  tEncoderClear(&encoder);
dengyihao's avatar
dengyihao 已提交
440
  SRpcMsg rsp = {.info = *pRpcHandleInfo, .pCont = buf, .contLen = tlen, .code = 0};
441 442 443

  tmsgSendRsp(&rsp);
  return 0;
Y
yihaoDeng 已提交
444
}