tqSink.c 21.1 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16 17
#include "tcommon.h"
#include "tmsg.h"
L
Liu Jicong 已提交
18 19
#include "tq.h"

L
Liu Jicong 已提交
20
int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock,
21 22
                         SBatchDeleteReq* deleteReq) {
  int32_t          totRow = pDataBlock->info.rows;
L
Liu Jicong 已提交
23 24
  SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
25
  SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
26 27
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);

28 29
  tqDebug("stream delete msg: row %d", totRow);

30
  for (int32_t row = 0; row < totRow; row++) {
L
Liu Jicong 已提交
31 32
    int64_t startTs = *(int64_t*)colDataGetData(pStartTsCol, row);
    int64_t endTs = *(int64_t*)colDataGetData(pEndTsCol, row);
33
    int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
34
    char*   name;
35 36 37 38 39
    void*   varTbName = NULL;
    if (!colDataIsNull(pTbNameCol, totRow, row, NULL)) {
      varTbName = colDataGetVarData(pTbNameCol, row);
    }

40 41 42 43 44 45
    if (varTbName != NULL && varTbName != (void*)-1) {
      name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
      memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
    } else {
      name = buildCtbNameByGroupId(stbFullName, groupId);
    }
L
Liu Jicong 已提交
46 47
    tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, start ts:%" PRId64 "end ts:%" PRId64,
            pVnode->config.vgId, groupId, name, startTs, endTs);
48
#if 0
49 50 51 52
    SMetaReader mr = {0};
    metaReaderInit(&mr, pVnode->pMeta, 0);
    if (metaGetTableEntryByName(&mr, name) < 0) {
      metaReaderClear(&mr);
53
      tqDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name);
54
      taosMemoryFree(name);
5
54liuyao 已提交
55
      continue;
56 57 58 59 60
    }

    int64_t uid = mr.me.uid;
    metaReaderClear(&mr);
    taosMemoryFree(name);
61
#endif
62
    SSingleDeleteReq req = {
L
Liu Jicong 已提交
63 64
        .startTs = startTs,
        .endTs = endTs,
65
    };
66 67 68
    strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
    taosMemoryFree(name);
    /*tqDebug("stream delete msg, active: vgId:%d, ts:%" PRId64 " name:%s", pVnode->config.vgId, ts, name);*/
69 70 71 72 73
    taosArrayPush(deleteReq->deleteReqs, &req);
  }
  return 0;
}

74 75 76
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema,
                            SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
                            SBatchDeleteReq* pDeleteReq) {
77
  SSubmitReq* ret = NULL;
78 79
  SArray*     schemaReqs = NULL;
  SArray*     schemaReqSz = NULL;
80 81 82 83 84 85 86 87
  SArray*     tagArray = taosArrayInit(1, sizeof(STagVal));
  if (!tagArray) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t sz = taosArrayGetSize(pBlocks);

88 89 90 91 92
  if (createTb) {
    schemaReqs = taosArrayInit(sz, sizeof(void*));
    schemaReqSz = taosArrayInit(sz, sizeof(int32_t));
    for (int32_t i = 0; i < sz; i++) {
      SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
93 94
      if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
        int32_t padding1 = 0;
L
Liu Jicong 已提交
95
        void*   padding2 = NULL;
96 97
        taosArrayPush(schemaReqSz, &padding1);
        taosArrayPush(schemaReqs, &padding2);
L
Liu Jicong 已提交
98
        continue;
99
      }
100

101 102 103 104 105 106 107
      //      STag* pTag = NULL;
      //      taosArrayClear(tagArray);
      //      SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
      //      for(int j = 0; j < pTagSchemaWrapper->nCols; j++){
      //        STagVal tagVal = {
      //            .cid = pTagSchemaWrapper->pSchema[j].colId,
      //            .type = pTagSchemaWrapper->pSchema[j].type,
H
Haojun Liao 已提交
108
      //            .i64 = (int64_t)pDataBlock->info.id.groupId,
109 110 111 112 113 114 115 116 117 118 119 120
      //        };
      //        taosArrayPush(tagArray, &tagVal);
      //        taosArrayPush(tagName, pTagSchemaWrapper->pSchema[j].name);
      //      }
      //
      //      tTagNew(tagArray, 1, false, &pTag);
      //      if (pTag == NULL) {
      //        terrno = TSDB_CODE_OUT_OF_MEMORY;
      //        taosArrayDestroy(tagArray);
      //        taosArrayDestroy(tagName);
      //        return NULL;
      //      }
121

122
      SVCreateTbReq createTbReq = {0};
L
Liu Jicong 已提交
123 124

      // set const
125 126
      createTbReq.flags = 0;
      createTbReq.type = TSDB_CHILD_TABLE;
L
Liu Jicong 已提交
127
      createTbReq.ctb.suid = suid;
128

L
Liu Jicong 已提交
129
      // set super table name
130 131 132 133
      SName name = {0};
      tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
      createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name));  // strdup(stbFullName);

L
Liu Jicong 已提交
134 135 136 137 138
      // set tag content
      taosArrayClear(tagArray);
      STagVal tagVal = {
          .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
          .type = TSDB_DATA_TYPE_UBIGINT,
H
Haojun Liao 已提交
139
          .i64 = (int64_t)pDataBlock->info.id.groupId,
L
Liu Jicong 已提交
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
      };
      taosArrayPush(tagArray, &tagVal);
      createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);

      STag* pTag = NULL;
      tTagNew(tagArray, 1, false, &pTag);
      if (pTag == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        taosArrayDestroy(tagArray);
        taosArrayDestroyP(schemaReqs, taosMemoryFree);
        taosArrayDestroy(schemaReqSz);
        return NULL;
      }
      createTbReq.ctb.pTag = (uint8_t*)pTag;

      // set tag name
      SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
      char    tagNameStr[TSDB_COL_NAME_LEN] = {0};
      strcpy(tagNameStr, "group_id");
      taosArrayPush(tagName, tagNameStr);
      createTbReq.ctb.tagName = tagName;

      // set table name
163 164 165
      if (pDataBlock->info.parTbName[0]) {
        createTbReq.name = strdup(pDataBlock->info.parTbName);
      } else {
H
Haojun Liao 已提交
166
        createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
167 168
      }

L
Liu Jicong 已提交
169
      // save schema len
170
      int32_t code;
171
      int32_t schemaLen;
172 173
      tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
      if (code < 0) {
174
        tdDestroySVCreateTbReq(&createTbReq);
175
        taosArrayDestroy(tagArray);
L
Liu Jicong 已提交
176 177
        taosArrayDestroyP(schemaReqs, taosMemoryFree);
        taosArrayDestroy(schemaReqSz);
178 179
        return NULL;
      }
L
Liu Jicong 已提交
180
      taosArrayPush(schemaReqSz, &schemaLen);
181

L
Liu Jicong 已提交
182
      // save schema str
183 184 185
      void* schemaStr = taosMemoryMalloc(schemaLen);
      if (schemaStr == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
186
        tdDestroySVCreateTbReq(&createTbReq);
L
Liu Jicong 已提交
187 188 189
        taosArrayDestroy(tagArray);
        taosArrayDestroyP(schemaReqs, taosMemoryFree);
        taosArrayDestroy(schemaReqSz);
190 191 192 193 194 195 196 197 198
        return NULL;
      }
      taosArrayPush(schemaReqs, &schemaStr);

      SEncoder encoder = {0};
      tEncoderInit(&encoder, schemaStr, schemaLen);
      code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
      if (code < 0) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
199
        tdDestroySVCreateTbReq(&createTbReq);
L
Liu Jicong 已提交
200 201 202 203
        taosArrayDestroy(tagArray);
        taosArrayDestroyP(schemaReqs, taosMemoryFree);
        taosArrayDestroy(schemaReqSz);
        tEncoderClear(&encoder);
204 205
        return NULL;
      }
206 207
      tEncoderClear(&encoder);
      tdDestroySVCreateTbReq(&createTbReq);
208
    }
209 210
  }
  taosArrayDestroy(tagArray);
211

212 213 214 215
  // cal size
  int32_t cap = sizeof(SSubmitReq);
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
216 217 218 219
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
      continue;
    }
    int32_t rows = pDataBlock->info.rows;
L
Liu Jicong 已提交
220
    /*int32_t rowSize = pDataBlock->info.rowSize;*/
221 222 223 224 225 226
    int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);

    int32_t schemaLen = 0;
    if (createTb) {
      schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i);
    }
227 228 229 230 231
    cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
  }

  // assign data
  ret = rpcMallocCont(cap);
232
  ret->header.vgId = pVnode->config.vgId;
233 234 235
  ret->length = sizeof(SSubmitReq);
  ret->numOfBlocks = htonl(sz);

236
  SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
237 238
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
239 240
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
      pDeleteReq->suid = suid;
K
kailixu 已提交
241
      pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
L
Liu Jicong 已提交
242
      tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
243 244
      continue;
    }
245

246
    blkHead->numOfRows = htonl(pDataBlock->info.rows);
247 248 249 250 251 252 253
    blkHead->sversion = htonl(pTSchema->version);
    blkHead->suid = htobe64(suid);
    // uid is assigned by vnode
    blkHead->uid = 0;

    int32_t rows = pDataBlock->info.rows;

L
Liu Jicong 已提交
254
    tqDebug("tq sink, convert block1 %d, rows: %d", i, rows);
L
Liu Jicong 已提交
255

256
    int32_t dataLen = 0;
257
    int32_t schemaLen = 0;
L
Liu Jicong 已提交
258
    void*   blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
259
    if (createTb) {
260 261 262
      schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i);
      void* schemaStr = taosArrayGetP(schemaReqs, i);
      memcpy(blkSchema, schemaStr, schemaLen);
263 264 265
    }
    blkHead->schemaLen = htonl(schemaLen);

266
    STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
267 268 269 270 271 272 273 274 275 276
    for (int32_t j = 0; j < rows; j++) {
      SRowBuilder rb = {0};
      tdSRowInit(&rb, pTSchema->version);
      tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
      tdSRowResetBuf(&rb, rowData);

      for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
        const STColumn*  pColumn = &pTSchema->columns[k];
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
        if (colDataIsNull_s(pColData, j)) {
277
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
278 279 280 281 282
        } else {
          void* data = colDataGetData(pColData, j);
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
        }
      }
283
      tdSRowEnd(&rb);
284 285
      int32_t rowLen = TD_ROW_LEN(rowData);
      rowData = POINTER_SHIFT(rowData, rowLen);
286
      dataLen += rowLen;
287 288 289 290
    }
    blkHead->dataLen = htonl(dataLen);

    ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
291
    blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + schemaLen + dataLen);
292 293 294
  }

  ret->length = htonl(ret->length);
295

L
Liu Jicong 已提交
296
  taosArrayDestroyP(schemaReqs, taosMemoryFree);
297 298
  taosArrayDestroy(schemaReqSz);

299 300 301
  return ret;
}

L
Liu Jicong 已提交
302
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
L
Liu Jicong 已提交
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
  const SArray*   pBlocks = (const SArray*)data;
  SVnode*         pVnode = (SVnode*)vnode;
  int64_t         suid = pTask->tbSink.stbUid;
  char*           stbFullName = pTask->tbSink.stbFullName;
  STSchema*       pTSchema = pTask->tbSink.pTSchema;
  SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper;

  int32_t blockSz = taosArrayGetSize(pBlocks);

  SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
  if (!tagArray) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz);
  for (int32_t i = 0; i < blockSz; i++) {
320
    bool         createTb = true;
L
Liu Jicong 已提交
321 322 323 324 325 326
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
      SBatchDeleteReq deleteReq = {0};
      deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
      deleteReq.suid = suid;
      tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq);
327 328 329 330
      if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
        taosArrayDestroy(deleteReq.deleteReqs);
        continue;
      }
L
Liu Jicong 已提交
331 332 333 334 335

      int32_t len;
      int32_t code;
      tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
      if (code < 0) {
L
Liu Jicong 已提交
336 337
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return;
L
Liu Jicong 已提交
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357
      }
      SEncoder encoder;
      void*    serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
      void*    abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
      tEncoderInit(&encoder, abuf, len);
      tEncodeSBatchDeleteReq(&encoder, &deleteReq);
      tEncoderClear(&encoder);
      taosArrayDestroy(deleteReq.deleteReqs);

      ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;

      SRpcMsg msg = {
          .msgType = TDMT_VND_BATCH_DEL,
          .pCont = serializedDeleteReq,
          .contLen = len + sizeof(SMsgHead),
      };
      if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
        tqDebug("failed to put delete req into write-queue since %s", terrstr());
      }
    } else {
358 359
      char* ctbName = NULL;
      // set child table name
L
Liu Jicong 已提交
360
      if (pDataBlock->info.parTbName[0]) {
361
        ctbName = strdup(pDataBlock->info.parTbName);
L
Liu Jicong 已提交
362
      } else {
H
Haojun Liao 已提交
363
        ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
L
Liu Jicong 已提交
364 365
      }

366 367 368 369 370 371 372
      int32_t schemaLen = 0;
      void*   schemaStr = NULL;

      int64_t     uid = 0;
      SMetaReader mr = {0};
      metaReaderInit(&mr, pVnode->pMeta, 0);
      if (metaGetTableEntryByName(&mr, ctbName) < 0) {
L
Liu Jicong 已提交
373
        metaReaderClear(&mr);
374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394
        tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);

        SVCreateTbReq createTbReq = {0};

        // set const
        createTbReq.flags = 0;
        createTbReq.type = TSDB_CHILD_TABLE;
        createTbReq.ctb.suid = suid;

        // set super table name
        SName name = {0};
        tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
        createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name));  // strdup(stbFullName);
        createTbReq.name = ctbName;
        ctbName = NULL;

        // set tag content
        taosArrayClear(tagArray);
        STagVal tagVal = {
            .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
            .type = TSDB_DATA_TYPE_UBIGINT,
H
Haojun Liao 已提交
395
            .i64 = (int64_t)pDataBlock->info.id.groupId,
396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423
        };
        taosArrayPush(tagArray, &tagVal);
        createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);

        STag* pTag = NULL;
        tTagNew(tagArray, 1, false, &pTag);
        if (pTag == NULL) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          taosArrayDestroy(tagArray);
          tdDestroySVCreateTbReq(&createTbReq);
          return;
        }
        createTbReq.ctb.pTag = (uint8_t*)pTag;

        // set tag name
        SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
        char    tagNameStr[TSDB_COL_NAME_LEN] = {0};
        strcpy(tagNameStr, "group_id");
        taosArrayPush(tagName, tagNameStr);
        createTbReq.ctb.tagName = tagName;

        int32_t code;
        tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
        if (code < 0) {
          tdDestroySVCreateTbReq(&createTbReq);
          taosArrayDestroy(tagArray);
          return;
        }
L
Liu Jicong 已提交
424

425 426 427 428 429 430 431 432
        // set schema str
        schemaStr = taosMemoryMalloc(schemaLen);
        if (schemaStr == NULL) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          tdDestroySVCreateTbReq(&createTbReq);
          taosArrayDestroy(tagArray);
          return;
        }
L
Liu Jicong 已提交
433

434 435 436 437 438 439 440 441 442 443 444
        SEncoder encoder = {0};
        tEncoderInit(&encoder, schemaStr, schemaLen);
        code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
        if (code < 0) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          tdDestroySVCreateTbReq(&createTbReq);
          taosArrayDestroy(tagArray);
          tEncoderClear(&encoder);
          taosMemoryFree(schemaStr);
          return;
        }
L
Liu Jicong 已提交
445
        tEncoderClear(&encoder);
446 447 448 449 450 451 452 453 454 455
        tdDestroySVCreateTbReq(&createTbReq);
      } else {
        if (mr.me.type != TSDB_CHILD_TABLE) {
          tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
                  mr.me.type);
          metaReaderClear(&mr);
          taosMemoryFree(ctbName);
          continue;
        }
        if (mr.me.ctbEntry.suid != suid) {
456 457
          tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64
                  ", actual suid %" PRId64 "",
L
Liu Jicong 已提交
458
                  TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
459 460 461 462 463 464 465 466
          metaReaderClear(&mr);
          taosMemoryFree(ctbName);
          continue;
        }

        createTb = false;
        uid = mr.me.uid;
        metaReaderClear(&mr);
L
Liu Jicong 已提交
467

468 469
        tqDebug("vgId:%d, stream write, table %s, uid %" PRId64 " already exist, skip create", TD_VID(pVnode), ctbName,
                uid);
L
Liu Jicong 已提交
470

471
        taosMemoryFreeClear(ctbName);
L
Liu Jicong 已提交
472 473 474 475 476 477 478 479 480
      }

      int32_t cap = sizeof(SSubmitReq);

      int32_t rows = pDataBlock->info.rows;
      int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);

      cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;

481 482 483 484
      SSubmitReq* pSubmit = rpcMallocCont(cap);
      pSubmit->header.vgId = pVnode->config.vgId;
      pSubmit->length = sizeof(SSubmitReq);
      pSubmit->numOfBlocks = htonl(1);
L
Liu Jicong 已提交
485

486
      SSubmitBlk* blkHead = POINTER_SHIFT(pSubmit, sizeof(SSubmitReq));
L
Liu Jicong 已提交
487 488 489 490 491 492 493 494

      blkHead->numOfRows = htonl(pDataBlock->info.rows);
      blkHead->sversion = htonl(pTSchema->version);
      blkHead->suid = htobe64(suid);
      // uid is assigned by vnode
      blkHead->uid = 0;
      blkHead->schemaLen = 0;

L
Liu Jicong 已提交
495
      tqDebug("tq sink, convert block2 %d, rows: %d", i, rows);
L
Liu Jicong 已提交
496 497 498 499 500 501 502 503

      int32_t dataLen = 0;
      void*   blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
      STSRow* rowData = blkSchema;
      if (createTb) {
        memcpy(blkSchema, schemaStr, schemaLen);
        blkHead->schemaLen = htonl(schemaLen);
        rowData = POINTER_SHIFT(blkSchema, schemaLen);
504
      } else {
L
Liu Jicong 已提交
505
        blkHead->uid = htobe64(uid);
L
Liu Jicong 已提交
506 507
      }

508
      taosMemoryFreeClear(schemaStr);
L
Liu Jicong 已提交
509 510 511 512 513 514 515 516 517 518 519 520 521 522

      for (int32_t j = 0; j < rows; j++) {
        SRowBuilder rb = {0};
        tdSRowInit(&rb, pTSchema->version);
        tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
        tdSRowResetBuf(&rb, rowData);

        for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
          const STColumn*  pColumn = &pTSchema->columns[k];
          SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
          if (colDataIsNull_s(pColData, j)) {
            tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
          } else {
            void* colData = colDataGetData(pColData, j);
L
Liu Jicong 已提交
523 524 525
            if (k == 0) {
              tqDebug("tq sink, row %d ts %" PRId64, j, *(int64_t*)colData);
            }
L
Liu Jicong 已提交
526 527 528 529 530 531 532 533 534 535
            tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, pColumn->offset, k);
          }
        }
        tdSRowEnd(&rb);
        int32_t rowLen = TD_ROW_LEN(rowData);
        rowData = POINTER_SHIFT(rowData, rowLen);
        dataLen += rowLen;
      }
      blkHead->dataLen = htonl(dataLen);

536 537
      pSubmit->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
      pSubmit->length = htonl(pSubmit->length);
L
Liu Jicong 已提交
538 539 540

      SRpcMsg msg = {
          .msgType = TDMT_VND_SUBMIT,
541 542
          .pCont = pSubmit,
          .contLen = ntohl(pSubmit->length),
L
Liu Jicong 已提交
543 544 545 546 547 548 549 550 551 552
      };

      if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
        tqDebug("failed to put into write-queue since %s", terrstr());
      }
    }
  }
  taosArrayDestroy(tagArray);
}

553
#if 0
L
Liu Jicong 已提交
554
void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
555 556 557
  const SArray*   pRes = (const SArray*)data;
  SVnode*         pVnode = (SVnode*)vnode;
  SBatchDeleteReq deleteReq = {0};
L
Liu Jicong 已提交
558

L
Liu Jicong 已提交
559
  tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size);
L
Liu Jicong 已提交
560

561
  deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
562 563
  SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true,
                                          pTask->tbSink.stbUid, pTask->tbSink.stbFullName, &deleteReq);
L
Liu Jicong 已提交
564 565 566

  tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);

L
Liu Jicong 已提交
567 568 569 570 571 572 573 574 575 576
  if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
    int32_t code;
    int32_t len;
    tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
    SEncoder encoder;
    void*    serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
    void*    abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
    tEncoderInit(&encoder, abuf, len);
    tEncodeSBatchDeleteReq(&encoder, &deleteReq);
    tEncoderClear(&encoder);
577

L
Liu Jicong 已提交
578
    ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;
579

580 581
    SRpcMsg msg = {
        .msgType = TDMT_VND_BATCH_DEL,
L
Liu Jicong 已提交
582
        .pCont = serializedDeleteReq,
583 584 585
        .contLen = len + sizeof(SMsgHead),
    };
    if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
L
Liu Jicong 已提交
586
      rpcFreeCont(serializedDeleteReq);
587 588 589 590 591
      tqDebug("failed to put into write-queue since %s", terrstr());
    }
  }
  taosArrayDestroy(deleteReq.deleteReqs);

L
Liu Jicong 已提交
592 593 594 595
  /*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
  // build write msg
  SRpcMsg msg = {
      .msgType = TDMT_VND_SUBMIT,
L
Liu Jicong 已提交
596 597
      .pCont = submitReq,
      .contLen = ntohl(submitReq->length),
L
Liu Jicong 已提交
598 599
  };

S
Shengliang Guan 已提交
600 601 602
  if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
    tqDebug("failed to put into write-queue since %s", terrstr());
  }
L
Liu Jicong 已提交
603
}
604
#endif