tqSink.c 21.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16 17
#include "tcommon.h"
#include "tmsg.h"
L
Liu Jicong 已提交
18 19
#include "tq.h"

L
Liu Jicong 已提交
20
int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock,
21 22 23 24 25
                         SBatchDeleteReq* deleteReq) {
  ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT);
  int32_t          totRow = pDataBlock->info.rows;
  SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
26 27
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);

28 29
  tqDebug("stream delete msg: row %d", totRow);

30 31
  for (int32_t row = 0; row < totRow; row++) {
    int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
32
    int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
33
    char*   name;
34 35 36 37 38
    void*   varTbName = NULL;
    if (!colDataIsNull(pTbNameCol, totRow, row, NULL)) {
      varTbName = colDataGetVarData(pTbNameCol, row);
    }

39 40 41 42 43 44
    if (varTbName != NULL && varTbName != (void*)-1) {
      name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
      memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
    } else {
      name = buildCtbNameByGroupId(stbFullName, groupId);
    }
45 46 47
    tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, ts:%" PRId64, pVnode->config.vgId, groupId,
            name, ts);
#if 0
48 49 50 51
    SMetaReader mr = {0};
    metaReaderInit(&mr, pVnode->pMeta, 0);
    if (metaGetTableEntryByName(&mr, name) < 0) {
      metaReaderClear(&mr);
52
      tqDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name);
53
      taosMemoryFree(name);
5
54liuyao 已提交
54
      continue;
55 56 57 58 59
    }

    int64_t uid = mr.me.uid;
    metaReaderClear(&mr);
    taosMemoryFree(name);
60
#endif
61 62 63
    SSingleDeleteReq req = {
        .ts = ts,
    };
64 65 66
    strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
    taosMemoryFree(name);
    /*tqDebug("stream delete msg, active: vgId:%d, ts:%" PRId64 " name:%s", pVnode->config.vgId, ts, name);*/
67 68 69 70 71
    taosArrayPush(deleteReq->deleteReqs, &req);
  }
  return 0;
}

72 73 74
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema,
                            SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
                            SBatchDeleteReq* pDeleteReq) {
75
  SSubmitReq* ret = NULL;
76 77
  SArray*     schemaReqs = NULL;
  SArray*     schemaReqSz = NULL;
78 79 80 81 82 83 84 85
  SArray*     tagArray = taosArrayInit(1, sizeof(STagVal));
  if (!tagArray) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t sz = taosArrayGetSize(pBlocks);

86 87 88 89 90
  if (createTb) {
    schemaReqs = taosArrayInit(sz, sizeof(void*));
    schemaReqSz = taosArrayInit(sz, sizeof(int32_t));
    for (int32_t i = 0; i < sz; i++) {
      SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
91 92
      if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
        int32_t padding1 = 0;
L
Liu Jicong 已提交
93
        void*   padding2 = NULL;
94 95
        taosArrayPush(schemaReqSz, &padding1);
        taosArrayPush(schemaReqs, &padding2);
L
Liu Jicong 已提交
96
        continue;
97
      }
98

99 100 101 102 103 104 105
      //      STag* pTag = NULL;
      //      taosArrayClear(tagArray);
      //      SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
      //      for(int j = 0; j < pTagSchemaWrapper->nCols; j++){
      //        STagVal tagVal = {
      //            .cid = pTagSchemaWrapper->pSchema[j].colId,
      //            .type = pTagSchemaWrapper->pSchema[j].type,
H
Haojun Liao 已提交
106
      //            .i64 = (int64_t)pDataBlock->info.id.groupId,
107 108 109 110 111 112 113 114 115 116 117 118
      //        };
      //        taosArrayPush(tagArray, &tagVal);
      //        taosArrayPush(tagName, pTagSchemaWrapper->pSchema[j].name);
      //      }
      //
      //      tTagNew(tagArray, 1, false, &pTag);
      //      if (pTag == NULL) {
      //        terrno = TSDB_CODE_OUT_OF_MEMORY;
      //        taosArrayDestroy(tagArray);
      //        taosArrayDestroy(tagName);
      //        return NULL;
      //      }
119

120
      SVCreateTbReq createTbReq = {0};
L
Liu Jicong 已提交
121 122

      // set const
123 124
      createTbReq.flags = 0;
      createTbReq.type = TSDB_CHILD_TABLE;
L
Liu Jicong 已提交
125
      createTbReq.ctb.suid = suid;
126

L
Liu Jicong 已提交
127
      // set super table name
128 129 130 131
      SName name = {0};
      tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
      createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name));  // strdup(stbFullName);

L
Liu Jicong 已提交
132 133 134 135 136
      // set tag content
      taosArrayClear(tagArray);
      STagVal tagVal = {
          .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
          .type = TSDB_DATA_TYPE_UBIGINT,
H
Haojun Liao 已提交
137
          .i64 = (int64_t)pDataBlock->info.id.groupId,
L
Liu Jicong 已提交
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
      };
      taosArrayPush(tagArray, &tagVal);
      createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);

      STag* pTag = NULL;
      tTagNew(tagArray, 1, false, &pTag);
      if (pTag == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        taosArrayDestroy(tagArray);
        taosArrayDestroyP(schemaReqs, taosMemoryFree);
        taosArrayDestroy(schemaReqSz);
        return NULL;
      }
      createTbReq.ctb.pTag = (uint8_t*)pTag;

      // set tag name
      SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
      char    tagNameStr[TSDB_COL_NAME_LEN] = {0};
      strcpy(tagNameStr, "group_id");
      taosArrayPush(tagName, tagNameStr);
      createTbReq.ctb.tagName = tagName;

      // set table name
161 162 163
      if (pDataBlock->info.parTbName[0]) {
        createTbReq.name = strdup(pDataBlock->info.parTbName);
      } else {
H
Haojun Liao 已提交
164
        createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
165 166
      }

L
Liu Jicong 已提交
167
      // save schema len
168
      int32_t code;
169
      int32_t schemaLen;
170 171
      tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
      if (code < 0) {
172
        tdDestroySVCreateTbReq(&createTbReq);
173
        taosArrayDestroy(tagArray);
L
Liu Jicong 已提交
174 175
        taosArrayDestroyP(schemaReqs, taosMemoryFree);
        taosArrayDestroy(schemaReqSz);
176 177
        return NULL;
      }
L
Liu Jicong 已提交
178
      taosArrayPush(schemaReqSz, &schemaLen);
179

L
Liu Jicong 已提交
180
      // save schema str
181 182 183
      void* schemaStr = taosMemoryMalloc(schemaLen);
      if (schemaStr == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
184
        tdDestroySVCreateTbReq(&createTbReq);
L
Liu Jicong 已提交
185 186 187
        taosArrayDestroy(tagArray);
        taosArrayDestroyP(schemaReqs, taosMemoryFree);
        taosArrayDestroy(schemaReqSz);
188 189 190 191 192 193 194 195 196
        return NULL;
      }
      taosArrayPush(schemaReqs, &schemaStr);

      SEncoder encoder = {0};
      tEncoderInit(&encoder, schemaStr, schemaLen);
      code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
      if (code < 0) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
197
        tdDestroySVCreateTbReq(&createTbReq);
L
Liu Jicong 已提交
198 199 200 201
        taosArrayDestroy(tagArray);
        taosArrayDestroyP(schemaReqs, taosMemoryFree);
        taosArrayDestroy(schemaReqSz);
        tEncoderClear(&encoder);
202 203
        return NULL;
      }
204 205
      tEncoderClear(&encoder);
      tdDestroySVCreateTbReq(&createTbReq);
206
    }
207 208
  }
  taosArrayDestroy(tagArray);
209

210 211 212 213
  // cal size
  int32_t cap = sizeof(SSubmitReq);
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
214 215 216 217
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
      continue;
    }
    int32_t rows = pDataBlock->info.rows;
L
Liu Jicong 已提交
218
    /*int32_t rowSize = pDataBlock->info.rowSize;*/
219 220 221 222 223 224
    int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);

    int32_t schemaLen = 0;
    if (createTb) {
      schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i);
    }
225 226 227 228 229
    cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
  }

  // assign data
  ret = rpcMallocCont(cap);
230
  ret->header.vgId = pVnode->config.vgId;
231 232 233
  ret->length = sizeof(SSubmitReq);
  ret->numOfBlocks = htonl(sz);

234
  SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
235 236
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
237 238
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
      pDeleteReq->suid = suid;
K
kailixu 已提交
239
      pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
L
Liu Jicong 已提交
240
      tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
241 242
      continue;
    }
243

244
    blkHead->numOfRows = htonl(pDataBlock->info.rows);
245 246 247 248 249 250 251
    blkHead->sversion = htonl(pTSchema->version);
    blkHead->suid = htobe64(suid);
    // uid is assigned by vnode
    blkHead->uid = 0;

    int32_t rows = pDataBlock->info.rows;

L
Liu Jicong 已提交
252
    tqDebug("tq sink, convert block1 %d, rows: %d", i, rows);
L
Liu Jicong 已提交
253

254
    int32_t dataLen = 0;
255
    int32_t schemaLen = 0;
L
Liu Jicong 已提交
256
    void*   blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
257
    if (createTb) {
258 259 260
      schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i);
      void* schemaStr = taosArrayGetP(schemaReqs, i);
      memcpy(blkSchema, schemaStr, schemaLen);
261 262 263
    }
    blkHead->schemaLen = htonl(schemaLen);

264
    STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
265 266 267 268 269 270 271 272 273 274
    for (int32_t j = 0; j < rows; j++) {
      SRowBuilder rb = {0};
      tdSRowInit(&rb, pTSchema->version);
      tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
      tdSRowResetBuf(&rb, rowData);

      for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
        const STColumn*  pColumn = &pTSchema->columns[k];
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
        if (colDataIsNull_s(pColData, j)) {
275
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
276 277 278 279 280
        } else {
          void* data = colDataGetData(pColData, j);
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
        }
      }
281
      tdSRowEnd(&rb);
282 283
      int32_t rowLen = TD_ROW_LEN(rowData);
      rowData = POINTER_SHIFT(rowData, rowLen);
284
      dataLen += rowLen;
285 286 287 288
    }
    blkHead->dataLen = htonl(dataLen);

    ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
289
    blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + schemaLen + dataLen);
290 291 292
  }

  ret->length = htonl(ret->length);
293

L
Liu Jicong 已提交
294
  taosArrayDestroyP(schemaReqs, taosMemoryFree);
295 296
  taosArrayDestroy(schemaReqSz);

297 298 299
  return ret;
}

L
Liu Jicong 已提交
300
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
L
Liu Jicong 已提交
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
  const SArray*   pBlocks = (const SArray*)data;
  SVnode*         pVnode = (SVnode*)vnode;
  int64_t         suid = pTask->tbSink.stbUid;
  char*           stbFullName = pTask->tbSink.stbFullName;
  STSchema*       pTSchema = pTask->tbSink.pTSchema;
  SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper;

  int32_t blockSz = taosArrayGetSize(pBlocks);

  SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
  if (!tagArray) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz);
  for (int32_t i = 0; i < blockSz; i++) {
318
    bool         createTb = true;
L
Liu Jicong 已提交
319 320 321 322 323 324
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
      SBatchDeleteReq deleteReq = {0};
      deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
      deleteReq.suid = suid;
      tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq);
325 326 327 328
      if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
        taosArrayDestroy(deleteReq.deleteReqs);
        continue;
      }
L
Liu Jicong 已提交
329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355

      int32_t len;
      int32_t code;
      tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
      if (code < 0) {
        //
        ASSERT(0);
      }
      SEncoder encoder;
      void*    serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
      void*    abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
      tEncoderInit(&encoder, abuf, len);
      tEncodeSBatchDeleteReq(&encoder, &deleteReq);
      tEncoderClear(&encoder);
      taosArrayDestroy(deleteReq.deleteReqs);

      ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;

      SRpcMsg msg = {
          .msgType = TDMT_VND_BATCH_DEL,
          .pCont = serializedDeleteReq,
          .contLen = len + sizeof(SMsgHead),
      };
      if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
        tqDebug("failed to put delete req into write-queue since %s", terrstr());
      }
    } else {
356 357
      char* ctbName = NULL;
      // set child table name
L
Liu Jicong 已提交
358
      if (pDataBlock->info.parTbName[0]) {
359
        ctbName = strdup(pDataBlock->info.parTbName);
L
Liu Jicong 已提交
360
      } else {
H
Haojun Liao 已提交
361
        ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
L
Liu Jicong 已提交
362 363
      }

364 365 366 367 368 369 370
      int32_t schemaLen = 0;
      void*   schemaStr = NULL;

      int64_t     uid = 0;
      SMetaReader mr = {0};
      metaReaderInit(&mr, pVnode->pMeta, 0);
      if (metaGetTableEntryByName(&mr, ctbName) < 0) {
L
Liu Jicong 已提交
371
        metaReaderClear(&mr);
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392
        tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);

        SVCreateTbReq createTbReq = {0};

        // set const
        createTbReq.flags = 0;
        createTbReq.type = TSDB_CHILD_TABLE;
        createTbReq.ctb.suid = suid;

        // set super table name
        SName name = {0};
        tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
        createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name));  // strdup(stbFullName);
        createTbReq.name = ctbName;
        ctbName = NULL;

        // set tag content
        taosArrayClear(tagArray);
        STagVal tagVal = {
            .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
            .type = TSDB_DATA_TYPE_UBIGINT,
H
Haojun Liao 已提交
393
            .i64 = (int64_t)pDataBlock->info.id.groupId,
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421
        };
        taosArrayPush(tagArray, &tagVal);
        createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);

        STag* pTag = NULL;
        tTagNew(tagArray, 1, false, &pTag);
        if (pTag == NULL) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          taosArrayDestroy(tagArray);
          tdDestroySVCreateTbReq(&createTbReq);
          return;
        }
        createTbReq.ctb.pTag = (uint8_t*)pTag;

        // set tag name
        SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
        char    tagNameStr[TSDB_COL_NAME_LEN] = {0};
        strcpy(tagNameStr, "group_id");
        taosArrayPush(tagName, tagNameStr);
        createTbReq.ctb.tagName = tagName;

        int32_t code;
        tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
        if (code < 0) {
          tdDestroySVCreateTbReq(&createTbReq);
          taosArrayDestroy(tagArray);
          return;
        }
L
Liu Jicong 已提交
422

423 424 425 426 427 428 429 430
        // set schema str
        schemaStr = taosMemoryMalloc(schemaLen);
        if (schemaStr == NULL) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          tdDestroySVCreateTbReq(&createTbReq);
          taosArrayDestroy(tagArray);
          return;
        }
L
Liu Jicong 已提交
431

432 433 434 435 436 437 438 439 440 441 442
        SEncoder encoder = {0};
        tEncoderInit(&encoder, schemaStr, schemaLen);
        code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
        if (code < 0) {
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          tdDestroySVCreateTbReq(&createTbReq);
          taosArrayDestroy(tagArray);
          tEncoderClear(&encoder);
          taosMemoryFree(schemaStr);
          return;
        }
L
Liu Jicong 已提交
443
        tEncoderClear(&encoder);
444 445 446 447 448 449 450 451 452 453
        tdDestroySVCreateTbReq(&createTbReq);
      } else {
        if (mr.me.type != TSDB_CHILD_TABLE) {
          tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
                  mr.me.type);
          metaReaderClear(&mr);
          taosMemoryFree(ctbName);
          continue;
        }
        if (mr.me.ctbEntry.suid != suid) {
454 455
          tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64
                  ", actual suid %" PRId64 "",
L
Liu Jicong 已提交
456
                  TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
457 458 459 460 461 462 463 464
          metaReaderClear(&mr);
          taosMemoryFree(ctbName);
          continue;
        }

        createTb = false;
        uid = mr.me.uid;
        metaReaderClear(&mr);
L
Liu Jicong 已提交
465

466 467
        tqDebug("vgId:%d, stream write, table %s, uid %" PRId64 " already exist, skip create", TD_VID(pVnode), ctbName,
                uid);
L
Liu Jicong 已提交
468

469
        taosMemoryFreeClear(ctbName);
L
Liu Jicong 已提交
470 471 472 473 474 475 476 477 478
      }

      int32_t cap = sizeof(SSubmitReq);

      int32_t rows = pDataBlock->info.rows;
      int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);

      cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;

479 480 481 482
      SSubmitReq* pSubmit = rpcMallocCont(cap);
      pSubmit->header.vgId = pVnode->config.vgId;
      pSubmit->length = sizeof(SSubmitReq);
      pSubmit->numOfBlocks = htonl(1);
L
Liu Jicong 已提交
483

484
      SSubmitBlk* blkHead = POINTER_SHIFT(pSubmit, sizeof(SSubmitReq));
L
Liu Jicong 已提交
485 486 487 488 489 490 491 492

      blkHead->numOfRows = htonl(pDataBlock->info.rows);
      blkHead->sversion = htonl(pTSchema->version);
      blkHead->suid = htobe64(suid);
      // uid is assigned by vnode
      blkHead->uid = 0;
      blkHead->schemaLen = 0;

L
Liu Jicong 已提交
493
      tqDebug("tq sink, convert block2 %d, rows: %d", i, rows);
L
Liu Jicong 已提交
494 495 496 497 498 499 500 501

      int32_t dataLen = 0;
      void*   blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
      STSRow* rowData = blkSchema;
      if (createTb) {
        memcpy(blkSchema, schemaStr, schemaLen);
        blkHead->schemaLen = htonl(schemaLen);
        rowData = POINTER_SHIFT(blkSchema, schemaLen);
502
      } else {
L
Liu Jicong 已提交
503
        blkHead->uid = htobe64(uid);
L
Liu Jicong 已提交
504 505
      }

506
      taosMemoryFreeClear(schemaStr);
L
Liu Jicong 已提交
507 508 509 510 511 512 513 514 515 516 517 518 519 520

      for (int32_t j = 0; j < rows; j++) {
        SRowBuilder rb = {0};
        tdSRowInit(&rb, pTSchema->version);
        tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
        tdSRowResetBuf(&rb, rowData);

        for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
          const STColumn*  pColumn = &pTSchema->columns[k];
          SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
          if (colDataIsNull_s(pColData, j)) {
            tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
          } else {
            void* colData = colDataGetData(pColData, j);
L
Liu Jicong 已提交
521 522 523
            if (k == 0) {
              tqDebug("tq sink, row %d ts %" PRId64, j, *(int64_t*)colData);
            }
L
Liu Jicong 已提交
524 525 526 527 528 529 530 531 532 533
            tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, pColumn->offset, k);
          }
        }
        tdSRowEnd(&rb);
        int32_t rowLen = TD_ROW_LEN(rowData);
        rowData = POINTER_SHIFT(rowData, rowLen);
        dataLen += rowLen;
      }
      blkHead->dataLen = htonl(dataLen);

534 535
      pSubmit->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
      pSubmit->length = htonl(pSubmit->length);
L
Liu Jicong 已提交
536 537 538

      SRpcMsg msg = {
          .msgType = TDMT_VND_SUBMIT,
539 540
          .pCont = pSubmit,
          .contLen = ntohl(pSubmit->length),
L
Liu Jicong 已提交
541 542 543 544 545 546 547 548 549 550
      };

      if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
        tqDebug("failed to put into write-queue since %s", terrstr());
      }
    }
  }
  taosArrayDestroy(tagArray);
}

551
#if 0
L
Liu Jicong 已提交
552
void tqSinkToTableMerge(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
553 554 555
  const SArray*   pRes = (const SArray*)data;
  SVnode*         pVnode = (SVnode*)vnode;
  SBatchDeleteReq deleteReq = {0};
L
Liu Jicong 已提交
556

L
Liu Jicong 已提交
557
  tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size);
L
Liu Jicong 已提交
558

L
Liu Jicong 已提交
559
  ASSERT(pTask->tbSink.pTSchema);
560
  deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
561 562
  SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true,
                                          pTask->tbSink.stbUid, pTask->tbSink.stbFullName, &deleteReq);
L
Liu Jicong 已提交
563 564 565

  tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);

L
Liu Jicong 已提交
566 567 568 569 570 571 572 573 574 575 576 577 578 579
  if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
    int32_t code;
    int32_t len;
    tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
    if (code < 0) {
      //
      ASSERT(0);
    }
    SEncoder encoder;
    void*    serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
    void*    abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
    tEncoderInit(&encoder, abuf, len);
    tEncodeSBatchDeleteReq(&encoder, &deleteReq);
    tEncoderClear(&encoder);
580

L
Liu Jicong 已提交
581
    ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;
582

583 584
    SRpcMsg msg = {
        .msgType = TDMT_VND_BATCH_DEL,
L
Liu Jicong 已提交
585
        .pCont = serializedDeleteReq,
586 587 588
        .contLen = len + sizeof(SMsgHead),
    };
    if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
L
Liu Jicong 已提交
589
      rpcFreeCont(serializedDeleteReq);
590 591 592 593 594
      tqDebug("failed to put into write-queue since %s", terrstr());
    }
  }
  taosArrayDestroy(deleteReq.deleteReqs);

L
Liu Jicong 已提交
595 596 597 598
  /*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
  // build write msg
  SRpcMsg msg = {
      .msgType = TDMT_VND_SUBMIT,
L
Liu Jicong 已提交
599 600
      .pCont = submitReq,
      .contLen = ntohl(submitReq->length),
L
Liu Jicong 已提交
601 602
  };

S
Shengliang Guan 已提交
603 604 605
  if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
    tqDebug("failed to put into write-queue since %s", terrstr());
  }
L
Liu Jicong 已提交
606
}
607
#endif