tqSink.c 18.0 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16 17
#include "tcommon.h"
#include "tmsg.h"
L
Liu Jicong 已提交
18 19
#include "tq.h"

H
Haojun Liao 已提交
20
// Size threshold of the per-task table-sink info cache: once the cache holds more than this
// many entries, tqPutTableInfo() refuses to insert further ones.
#define MAX_CACHE_TABLE_INFO_NUM 10240
L
liuyao 已提交
21

22
typedef struct STableSinkInfo {
L
liuyao 已提交
23 24
  uint64_t uid;
  char     tbName[TSDB_TABLE_NAME_LEN];
25
} STableSinkInfo;
L
liuyao 已提交
26

H
Haojun Liao 已提交
27 28 29
/**
 * Convert every row of a delete-result data block into a SSingleDeleteReq and append it to
 * deleteReq->deleteReqs.
 *
 * For each row the target table name is taken from the table-name column when that cell is
 * present; otherwise it is derived from the super table name and the row's group id via
 * buildCtbNameByGroupId(). The name is written straight into the (zero-initialised) request
 * and is truncated to TSDB_TABLE_NAME_LEN - 1 bytes so it is always NUL-terminated.
 *
 * @param stbFullName  full name of the destination super table
 * @param pDataBlock   block holding start-ts, end-ts, group-id and table-name columns
 * @param deleteReq    batch request whose deleteReqs array receives one entry per row;
 *                     the array stays owned by the caller, also on failure
 * @param pIdStr       stream task id string, used for logging only
 * @return 0 on success, TSDB_CODE_FAILED when deleteReq has no request array,
 *         TSDB_CODE_OUT_OF_MEMORY when a table name could not be built (rows handled
 *         before the failure remain in the array)
 */
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
                         const char* pIdStr) {
  if (deleteReq->deleteReqs == NULL) {
    return TSDB_CODE_FAILED;
  }

  int32_t          totalRows = pDataBlock->info.rows;
  SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);

  tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName);

  for (int32_t row = 0; row < totalRows; row++) {
    int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row);
    int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row);
    int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);

    void* varTbName = NULL;
    if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) {
      varTbName = colDataGetVarData(pTbNameCol, row);
    }

    // All members not named here (including tbname) are zero-initialised, so a bounded
    // copy of at most TSDB_TABLE_NAME_LEN - 1 bytes always leaves a terminated string.
    SSingleDeleteReq req = {.startTs = skey, .endTs = ekey};

    if (varTbName != NULL && varTbName != (void*)-1) {
      // The var-data length comes from the block; clamp it so an oversized cell cannot
      // overflow the fixed-size name buffer.
      int32_t nameLen = (int32_t)varDataLen(varTbName);
      if (nameLen > TSDB_TABLE_NAME_LEN - 1) {
        nameLen = TSDB_TABLE_NAME_LEN - 1;
      }
      if (nameLen > 0) {
        strncpy(req.tbname, varDataVal(varTbName), nameLen);
      }
    } else {
      char* ctbName = buildCtbNameByGroupId(stbFullName, groupId);
      if (ctbName == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      strncpy(req.tbname, ctbName, TSDB_TABLE_NAME_LEN - 1);
      taosMemoryFree(ctbName);
    }

    tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64,
            pIdStr, groupId, req.tbname, skey, ekey);

    taosArrayPush(deleteReq->deleteReqs, &req);
  }

  return 0;
}

68
/**
 * Serialise a batch create-table request into a freshly allocated RPC buffer.
 *
 * The buffer starts with a SMsgHead (vgId as given, contLen in network byte order)
 * followed by the encoded request.
 *
 * @param pReqs    batch request to encode
 * @param vgId     vnode group id stored in the message head
 * @param pBuf     out: buffer obtained from rpcMallocCont(); reset to NULL when encoding fails
 * @param contLen  out: total length including the head; reset to 0 when encoding fails
 * @return 0 on success, -1 on failure
 */
static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
  int32_t sizeRes = 0;

  // First pass: compute the encoded size only.
  tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, sizeRes);
  if (sizeRes < 0) {
    return -1;
  }

  *contLen += sizeof(SMsgHead);
  *pBuf = rpcMallocCont(*contLen);
  if (*pBuf == NULL) {
    return -1;
  }

  ((SMsgHead*)(*pBuf))->vgId = vgId;
  ((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);

  // Second pass: encode the payload right behind the message head.
  SEncoder encoder = {0};
  tEncoderInit(&encoder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
  if (tEncodeSVCreateTbBatchReq(&encoder, pReqs) >= 0) {
    tEncoderClear(&encoder);
    return 0;
  }

  // Encoding failed: hand nothing back to the caller.
  rpcFreeCont(*pBuf);
  *pBuf = NULL;
  *contLen = 0;
  tEncoderClear(&encoder);
  return -1;
}

100 101
/**
 * Look up the cached sink-table info for a group id.
 *
 * @param pTableInfoMap  map from group id to STableSinkInfo pointer
 * @param groupId        key to look up
 * @param pInfo          out: receives the cached pointer; left untouched on a miss
 * @return TSDB_CODE_SUCCESS on a hit, TSDB_CODE_FAILED on a miss
 */
static int32_t tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSinkInfo** pInfo) {
  // The map stores the pointer itself as the value, so the lookup yields a pointer-to-pointer.
  STableSinkInfo** ppCached = (STableSinkInfo**)tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t));
  if (ppCached == NULL) {
    return TSDB_CODE_FAILED;
  }

  *pInfo = *ppCached;
  return TSDB_CODE_SUCCESS;
}

110
/**
 * Insert a sink-table info pointer into the cache, keyed by group id.
 *
 * @param tblInfo  map from group id to STableSinkInfo pointer
 * @param groupId  key under which the pointer is stored
 * @param pTbl     pointer to store (the pointer value itself is copied into the map)
 * @return TSDB_CODE_FAILED when the cache already holds more than
 *         MAX_CACHE_TABLE_INFO_NUM entries, otherwise the result of tSimpleHashPut()
 */
int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) {
  if (tSimpleHashGetSize(tblInfo) <= MAX_CACHE_TABLE_INFO_NUM) {
    return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES);
  }

  // Cache is over its size threshold: refuse the insert.
  return TSDB_CODE_FAILED;
}

118
/**
 * Encode a batch create-table request and post it to the vnode's write queue.
 *
 * @param pVnode  vnode whose write queue receives the message
 * @param pReqs   batch create-table request to send
 * @return TSDB_CODE_FAILED when the request could not be encoded (nothing is enqueued),
 *         TSDB_CODE_SUCCESS otherwise. A failure to enqueue is only logged, as before.
 */
int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
  void*   buf = NULL;
  int32_t tlen = 0;

  // Do not enqueue a message without a payload: when encoding fails the helper leaves
  // buf NULL, so report the failure to the caller instead.
  if (encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen) < 0 || buf == NULL) {
    tqError("vgId:%d, failed to encode create child table req", TD_VID(pVnode));
    return TSDB_CODE_FAILED;
  }

  SRpcMsg msg = { .msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen };
  if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
    // NOTE(review): buf is not released here, matching the other tmsgPutToQueue call sites
    // in this file -- confirm that tmsgPutToQueue disposes of pCont on failure.
    tqError("failed to put into write-queue since %s", terrstr());
  }

  return TSDB_CODE_SUCCESS;
}
130

H
Haojun Liao 已提交
131
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
L
Liu Jicong 已提交
132 133 134 135 136
  const SArray* pBlocks = (const SArray*)data;
  SVnode*       pVnode = (SVnode*)vnode;
  int64_t       suid = pTask->tbSink.stbUid;
  char*         stbFullName = pTask->tbSink.stbFullName;
  STSchema*     pTSchema = pTask->tbSink.pTSchema;
L
Liu Jicong 已提交
137 138 139

  int32_t blockSz = taosArrayGetSize(pBlocks);

140
  tqDebug("vgId:%d, s-task:%s write results %d blocks into table", TD_VID(pVnode), pTask->id.idStr, blockSz);
L
Liu Jicong 已提交
141

L
Liu Jicong 已提交
142 143 144
  void*   pBuf = NULL;
  SArray* tagArray = NULL;
  SArray* pVals = NULL;
145
  SArray* crTblArray = NULL;
L
Liu Jicong 已提交
146 147 148

  for (int32_t i = 0; i < blockSz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
5
54liuyao 已提交
149
    int32_t      rows = pDataBlock->info.rows;
H
Haojun Liao 已提交
150

L
Liu Jicong 已提交
151
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
H
Haojun Liao 已提交
152 153 154
      SBatchDeleteReq deleteReq = {.suid = suid, .deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq))};

      tqBuildDeleteReq(stbFullName, pDataBlock, &deleteReq, pTask->id.idStr);
L
Liu Jicong 已提交
155 156 157 158 159 160 161 162
      if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
        taosArrayDestroy(deleteReq.deleteReqs);
        continue;
      }

      int32_t len;
      int32_t code;
      tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
H
Haojun Liao 已提交
163 164
      if (code != TSDB_CODE_SUCCESS) {
        qError("s-task:%s failed to encode delete request", pTask->id.idStr);
L
Liu Jicong 已提交
165
      }
H
Haojun Liao 已提交
166

L
Liu Jicong 已提交
167 168 169 170 171 172 173 174 175 176
      SEncoder encoder;
      void*    serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
      void*    abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
      tEncoderInit(&encoder, abuf, len);
      tEncodeSBatchDeleteReq(&encoder, &deleteReq);
      tEncoderClear(&encoder);
      taosArrayDestroy(deleteReq.deleteReqs);

      ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;

H
Haojun Liao 已提交
177
      SRpcMsg msg = { .msgType = TDMT_VND_BATCH_DEL, .pCont = serializedDeleteReq, .contLen = len + sizeof(SMsgHead) };
L
Liu Jicong 已提交
178 179 180
      if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
        tqDebug("failed to put delete req into write-queue since %s", terrstr());
      }
5
54liuyao 已提交
181
    } else if (pDataBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
182 183 184 185 186
      SVCreateTbBatchReq reqs = {0};
      crTblArray = reqs.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
      if (NULL == reqs.pArray) {
        goto _end;
      }
H
Haojun Liao 已提交
187

5
54liuyao 已提交
188
      for (int32_t rowId = 0; rowId < rows; rowId++) {
189
        SVCreateTbReq  createTbReq = {0};
190
        SVCreateTbReq* pCreateTbReq = &createTbReq;
5
54liuyao 已提交
191 192 193 194 195 196 197 198 199

        // set const
        pCreateTbReq->flags = 0;
        pCreateTbReq->type = TSDB_CHILD_TABLE;
        pCreateTbReq->ctb.suid = suid;

        // set super table name
        SName name = {0};
        tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
200
        pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name));  // taosStrdup(stbFullName);
5
54liuyao 已提交
201 202 203 204 205 206

        // set tag content
        int32_t size = taosArrayGetSize(pDataBlock->pDataBlock);
        if (size == 2) {
          tagArray = taosArrayInit(1, sizeof(STagVal));
          if (!tagArray) {
wmmhello's avatar
wmmhello 已提交
207
            tdDestroySVCreateTbReq(pCreateTbReq);
5
54liuyao 已提交
208 209
            goto _end;
          }
H
Haojun Liao 已提交
210

5
54liuyao 已提交
211 212 213 214 215
          STagVal tagVal = {
              .cid = pTSchema->numOfCols + 1,
              .type = TSDB_DATA_TYPE_UBIGINT,
              .i64 = (int64_t)pDataBlock->info.id.groupId,
          };
H
Haojun Liao 已提交
216

5
54liuyao 已提交
217 218 219 220 221 222 223 224 225 226
          taosArrayPush(tagArray, &tagVal);

          // set tag name
          SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
          char    tagNameStr[TSDB_COL_NAME_LEN] = "group_id";
          taosArrayPush(tagName, tagNameStr);
          pCreateTbReq->ctb.tagName = tagName;
        } else {
          tagArray = taosArrayInit(size - 1, sizeof(STagVal));
          if (!tagArray) {
wmmhello's avatar
wmmhello 已提交
227
            tdDestroySVCreateTbReq(pCreateTbReq);
5
54liuyao 已提交
228 229 230 231
            goto _end;
          }
          for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
            SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
232 233 234

            STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type};
            void*   pData = colDataGetData(pTagData, rowId);
5
54liuyao 已提交
235
            if (colDataIsNull_s(pTagData, rowId)) {
5
54liuyao 已提交
236
              continue;
5
54liuyao 已提交
237 238 239 240 241 242 243 244 245
            } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) {
              tagVal.nData = varDataLen(pData);
              tagVal.pData = varDataVal(pData);
            } else {
              memcpy(&tagVal.i64, pData, pTagData->info.bytes);
            }
            taosArrayPush(tagArray, &tagVal);
          }
        }
5
54liuyao 已提交
246
        pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1);
5
54liuyao 已提交
247 248 249 250 251

        STag* pTag = NULL;
        tTagNew(tagArray, 1, false, &pTag);
        tagArray = taosArrayDestroy(tagArray);
        if (pTag == NULL) {
wmmhello's avatar
wmmhello 已提交
252
          tdDestroySVCreateTbReq(pCreateTbReq);
5
54liuyao 已提交
253 254 255
          terrno = TSDB_CODE_OUT_OF_MEMORY;
          goto _end;
        }
256

257

5
54liuyao 已提交
258 259 260
        pCreateTbReq->ctb.pTag = (uint8_t*)pTag;

        // set table name
5
54liuyao 已提交
261
        if (!pDataBlock->info.parTbName[0]) {
5
54liuyao 已提交
262
          SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
263
          void*            pGpIdData = colDataGetData(pGpIdColInfo, rowId);
5
54liuyao 已提交
264 265
          pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData);
        } else {
266
          pCreateTbReq->name = taosStrdup(pDataBlock->info.parTbName);
5
54liuyao 已提交
267
        }
268
        taosArrayPush(reqs.pArray, pCreateTbReq);
5
54liuyao 已提交
269
      }
270 271 272 273 274 275 276
      reqs.nReqs = taosArrayGetSize(reqs.pArray);
      if (tqPutReqToQueue(pVnode, &reqs) != TSDB_CODE_SUCCESS) {
        goto _end;
      }
      tagArray = taosArrayDestroy(tagArray);
      taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
      crTblArray = NULL;
L
Liu Jicong 已提交
277
    } else {
L
Liu Jicong 已提交
278
      SSubmitTbData tbData = {0};
279
      tqDebug("tq sink pipe, convert block:%d, rows:%d", i, rows);
L
Liu Jicong 已提交
280 281

      if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
L
Liu Jicong 已提交
282 283
        goto _end;
      }
L
Liu Jicong 已提交
284 285 286 287 288

      tbData.suid = suid;
      tbData.uid = 0;  // uid is assigned by vnode
      tbData.sver = pTSchema->version;

289 290
      STableSinkInfo* pTableSinkInfo = NULL;
      int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
L
liuyao 已提交
291
      if (res != TSDB_CODE_SUCCESS) {
292
        pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo));
L
Liu Jicong 已提交
293 294
      }

L
liuyao 已提交
295 296 297
      char* ctbName = pDataBlock->info.parTbName;
      if (!ctbName[0]) {
        if (res == TSDB_CODE_SUCCESS) {
298
          memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName));
L
liuyao 已提交
299 300 301
        } else {
          char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
          memcpy(ctbName, tmp, strlen(tmp));
302
          memcpy(pTableSinkInfo->tbName, tmp, strlen(tmp));
L
liuyao 已提交
303
          taosMemoryFree(tmp);
304
          tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode),
L
liuyao 已提交
305 306 307
                  pDataBlock->info.id.groupId);
        }
      }
L
Liu Jicong 已提交
308

L
liuyao 已提交
309
      if (res == TSDB_CODE_SUCCESS) {
310
        tbData.uid = pTableSinkInfo->uid;
L
liuyao 已提交
311 312 313 314 315
      } else {
        SMetaReader mr = {0};
        metaReaderInit(&mr, pVnode->pMeta, 0);
        if (metaGetTableEntryByName(&mr, ctbName) < 0) {
          metaReaderClear(&mr);
316
          taosMemoryFree(pTableSinkInfo);
L
liuyao 已提交
317
          tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
L
Liu Jicong 已提交
318

L
liuyao 已提交
319
          SVCreateTbReq* pCreateTbReq = NULL;
L
Liu Jicong 已提交
320

L
liuyao 已提交
321 322 323
          if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
            goto _end;
          };
L
Liu Jicong 已提交
324

L
liuyao 已提交
325 326 327 328
          // set const
          pCreateTbReq->flags = 0;
          pCreateTbReq->type = TSDB_CHILD_TABLE;
          pCreateTbReq->ctb.suid = suid;
L
Liu Jicong 已提交
329

L
liuyao 已提交
330 331 332 333
          // set super table name
          SName name = {0};
          tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
          pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name));  // taosStrdup(stbFullName);
L
Liu Jicong 已提交
334

L
liuyao 已提交
335 336 337 338 339 340 341 342 343 344 345 346 347
          // set tag content
          tagArray = taosArrayInit(1, sizeof(STagVal));
          if (!tagArray) {
            tdDestroySVCreateTbReq(pCreateTbReq);
            goto _end;
          }
          STagVal tagVal = {
              .cid = pTSchema->numOfCols + 1,
              .type = TSDB_DATA_TYPE_UBIGINT,
              .i64 = (int64_t)pDataBlock->info.id.groupId,
          };
          taosArrayPush(tagArray, &tagVal);
          pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
L
Liu Jicong 已提交
348

L
liuyao 已提交
349 350 351 352 353 354 355 356 357
          STag* pTag = NULL;
          tTagNew(tagArray, 1, false, &pTag);
          tagArray = taosArrayDestroy(tagArray);
          if (pTag == NULL) {
            tdDestroySVCreateTbReq(pCreateTbReq);
            terrno = TSDB_CODE_OUT_OF_MEMORY;
            goto _end;
          }
          pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
L
Liu Jicong 已提交
358

L
liuyao 已提交
359 360 361 362 363 364
          // set tag name
          SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
          char    tagNameStr[TSDB_COL_NAME_LEN] = {0};
          strcpy(tagNameStr, "group_id");
          taosArrayPush(tagName, tagNameStr);
          pCreateTbReq->ctb.tagName = tagName;
L
Liu Jicong 已提交
365

L
liuyao 已提交
366 367 368 369 370 371 372 373 374 375
          // set table name
          pCreateTbReq->name = taosStrdup(ctbName);

          tbData.pCreateTbReq = pCreateTbReq;
          tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
        } else {
          if (mr.me.type != TSDB_CHILD_TABLE) {
            tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
                    mr.me.type);
            metaReaderClear(&mr);
376
            taosMemoryFree(pTableSinkInfo);
L
liuyao 已提交
377 378 379 380 381 382 383 384
            continue;
          }

          if (mr.me.ctbEntry.suid != suid) {
            tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64
                    ", actual suid %" PRId64 "",
                    TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
            metaReaderClear(&mr);
385
            taosMemoryFree(pTableSinkInfo);
L
liuyao 已提交
386 387
            continue;
          }
L
Liu Jicong 已提交
388

L
liuyao 已提交
389
          tbData.uid = mr.me.uid;
390
          pTableSinkInfo->uid = mr.me.uid;
H
Haojun Liao 已提交
391 392 393 394
          int32_t code = tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTableSinkInfo);
          if (code != TSDB_CODE_SUCCESS) {
            taosMemoryFreeClear(pTableSinkInfo);
          }
L
Liu Jicong 已提交
395 396
          metaReaderClear(&mr);
        }
L
Liu Jicong 已提交
397 398
      }

L
Liu Jicong 已提交
399
      // rows
L
Liu Jicong 已提交
400
      if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
L
Liu Jicong 已提交
401
        taosArrayDestroy(tbData.aRowP);
wmmhello's avatar
wmmhello 已提交
402
        tdDestroySVCreateTbReq(tbData.pCreateTbReq);
L
Liu Jicong 已提交
403 404 405 406 407
        goto _end;
      }

      for (int32_t j = 0; j < rows; j++) {
        taosArrayClear(pVals);
408
        int32_t dataIndex = 0;
L
Liu Jicong 已提交
409
        for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
410
          const STColumn* pCol = &pTSchema->columns[k];
L
Liu Jicong 已提交
411
          if (k == 0) {
412
            SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
413
            void*            colData = colDataGetData(pColData, j);
L
Liu Jicong 已提交
414 415
            tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
          }
416
          if (IS_SET_NULL(pCol)) {
L
Liu Jicong 已提交
417 418
            SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
            taosArrayPush(pVals, &cv);
419
          } else {
420 421 422
            SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
            if (colDataIsNull_s(pColData, j)) {
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
L
Liu Jicong 已提交
423
              taosArrayPush(pVals, &cv);
424
              dataIndex++;
L
Liu Jicong 已提交
425
            } else {
426 427
              void* colData = colDataGetData(pColData, j);
              if (IS_STR_DATA_TYPE(pCol->type)) {
H
Haojun Liao 已提交
428 429
                // address copy, no value
                SValue  sv = (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)};
430 431 432 433 434 435 436 437 438
                SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
                taosArrayPush(pVals, &cv);
              } else {
                SValue sv;
                memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes);
                SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
                taosArrayPush(pVals, &cv);
              }
              dataIndex++;
L
Liu Jicong 已提交
439 440 441 442 443
            }
          }
        }
        SRow* pRow = NULL;
        if ((terrno = tRowBuild(pVals, (STSchema*)pTSchema, &pRow)) < 0) {
444
          tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
L
Liu Jicong 已提交
445 446 447
          goto _end;
        }
        ASSERT(pRow);
L
Liu Jicong 已提交
448
        taosArrayPush(tbData.aRowP, &pRow);
L
Liu Jicong 已提交
449 450
      }

L
Liu Jicong 已提交
451 452
      SSubmitReq2 submitReq = {0};
      if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
453
        tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
L
Liu Jicong 已提交
454 455 456 457
        goto _end;
      }

      taosArrayPush(submitReq.aSubmitTbData, &tbData);
L
Liu Jicong 已提交
458 459

      // encode
L
Liu Jicong 已提交
460
      int32_t len;
L
Liu Jicong 已提交
461
      int32_t code;
462
      tEncodeSize(tEncodeSubmitReq, &submitReq, len, code);
L
Liu Jicong 已提交
463
      SEncoder encoder;
464
      len += sizeof(SSubmitReq2Msg);
L
Liu Jicong 已提交
465 466
      pBuf = rpcMallocCont(len);
      if (NULL == pBuf) {
467
        tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
L
Liu Jicong 已提交
468 469
        goto _end;
      }
470 471 472 473
      ((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode);
      ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
      ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
      tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
474
      if (tEncodeSubmitReq(&encoder, &submitReq) < 0) {
L
Liu Jicong 已提交
475 476
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        tqError("failed to encode submit req since %s", terrstr());
L
Liu Jicong 已提交
477 478
        tEncoderClear(&encoder);
        rpcFreeCont(pBuf);
479
        tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
L
Liu Jicong 已提交
480
        continue;
L
Liu Jicong 已提交
481 482
      }
      tEncoderClear(&encoder);
483
      tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
L
Liu Jicong 已提交
484

485
      SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len };
L
Liu Jicong 已提交
486 487 488 489 490
      if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
        tqDebug("failed to put into write-queue since %s", terrstr());
      }
    }
  }
491 492 493

  tqDebug("vgId:%d, s-task:%s write results completed", TD_VID(pVnode), pTask->id.idStr);

L
Liu Jicong 已提交
494 495 496
_end:
  taosArrayDestroy(tagArray);
  taosArrayDestroy(pVals);
497
  taosArrayDestroyEx(crTblArray, (FDelete)tdDestroySVCreateTbReq);
L
Liu Jicong 已提交
498 499
  // TODO: change
}