tqSink.c 6.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "tq.h"

C
Cary Xu 已提交
18
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
19
                            const char* stbFullName, int32_t vgId) {
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
  SSubmitReq* ret = NULL;
  SArray*     tagArray = taosArrayInit(1, sizeof(STagVal));
  if (!tagArray) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // cal size
  int32_t cap = sizeof(SSubmitReq);
  int32_t sz = taosArrayGetSize(pBlocks);
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
    int32_t      rows = pDataBlock->info.rows;
    // TODO min
    int32_t rowSize = pDataBlock->info.rowSize;
    int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
    int32_t schemaLen = 0;

    if (createTb) {
      SVCreateTbReq createTbReq = {0};
      char*         cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
      createTbReq.name = cname;
      createTbReq.flags = 0;
      createTbReq.type = TSDB_CHILD_TABLE;
      createTbReq.ctb.suid = suid;

L
Liu Jicong 已提交
46
      STagVal tagVal = {
47
          .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
L
Liu Jicong 已提交
48 49 50 51
          .type = TSDB_DATA_TYPE_UBIGINT,
          .i64 = (int64_t)pDataBlock->info.groupId,
      };
      STag* pTag = NULL;
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
      taosArrayClear(tagArray);
      taosArrayPush(tagArray, &tagVal);
      tTagNew(tagArray, 1, false, &pTag);
      if (pTag == NULL) {
        tdDestroySVCreateTbReq(&createTbReq);
        taosArrayDestroy(tagArray);
        return NULL;
      }
      createTbReq.ctb.pTag = (uint8_t*)pTag;

      int32_t code;
      tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);

      tdDestroySVCreateTbReq(&createTbReq);
      if (code < 0) {
        taosArrayDestroy(tagArray);
        return NULL;
      }
    }

    cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
  }

  // assign data
  // TODO
  ret = rpcMallocCont(cap);
  ret->header.vgId = vgId;
  ret->version = htonl(1);
  ret->length = sizeof(SSubmitReq);
  ret->numOfBlocks = htonl(sz);

83
  SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
84 85 86 87 88 89 90 91 92 93 94 95
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);

    blkHead->numOfRows = htons(pDataBlock->info.rows);
    blkHead->sversion = htonl(pTSchema->version);
    // TODO
    blkHead->suid = htobe64(suid);
    // uid is assigned by vnode
    blkHead->uid = 0;

    int32_t rows = pDataBlock->info.rows;

96
    int32_t dataLen = 0;
97

98
    void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
99 100 101 102 103 104 105 106 107 108

    int32_t schemaLen = 0;
    if (createTb) {
      SVCreateTbReq createTbReq = {0};
      char*         cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
      createTbReq.name = cname;
      createTbReq.flags = 0;
      createTbReq.type = TSDB_CHILD_TABLE;
      createTbReq.ctb.suid = suid;

L
Liu Jicong 已提交
109
      STagVal tagVal = {
110
          .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
L
Liu Jicong 已提交
111 112 113
          .type = TSDB_DATA_TYPE_UBIGINT,
          .i64 = (int64_t)pDataBlock->info.groupId,
      };
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
      taosArrayClear(tagArray);
      taosArrayPush(tagArray, &tagVal);
      STag* pTag = NULL;
      tTagNew(tagArray, 1, false, &pTag);
      if (pTag == NULL) {
        tdDestroySVCreateTbReq(&createTbReq);
        taosArrayDestroy(tagArray);
        taosMemoryFreeClear(ret);
        return NULL;
      }
      createTbReq.ctb.pTag = (uint8_t*)pTag;

      int32_t code;
      tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
      if (code < 0) {
        tdDestroySVCreateTbReq(&createTbReq);
        taosArrayDestroy(tagArray);
        taosMemoryFreeClear(ret);
        return NULL;
      }

      SEncoder encoder = {0};
136
      tEncoderInit(&encoder, blkSchema, schemaLen);
137 138 139 140 141 142 143 144 145 146 147 148
      code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
      tEncoderClear(&encoder);
      tdDestroySVCreateTbReq(&createTbReq);

      if (code < 0) {
        taosArrayDestroy(tagArray);
        taosMemoryFreeClear(ret);
        return NULL;
      }
    }
    blkHead->schemaLen = htonl(schemaLen);

149
    STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168

    for (int32_t j = 0; j < rows; j++) {
      SRowBuilder rb = {0};
      tdSRowInit(&rb, pTSchema->version);
      tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
      tdSRowResetBuf(&rb, rowData);

      for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
        const STColumn*  pColumn = &pTSchema->columns[k];
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
        if (colDataIsNull_s(pColData, j)) {
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, pColumn->offset, k);
        } else {
          void* data = colDataGetData(pColData, j);
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
        }
      }
      int32_t rowLen = TD_ROW_LEN(rowData);
      rowData = POINTER_SHIFT(rowData, rowLen);
169
      dataLen += rowLen;
170 171 172 173
    }
    blkHead->dataLen = htonl(dataLen);

    ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
174
    blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + schemaLen + dataLen);
175 176 177 178 179 180 181
  }

  ret->length = htonl(ret->length);
  taosArrayDestroy(tagArray);
  return ret;
}

L
Liu Jicong 已提交
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
  const SArray* pRes = (const SArray*)data;
  SVnode*       pVnode = (SVnode*)vnode;

  ASSERT(pTask->tbSink.pTSchema);
  SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
                                     pTask->tbSink.stbFullName, pVnode->config.vgId);
  /*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
  // build write msg
  SRpcMsg msg = {
      .msgType = TDMT_VND_SUBMIT,
      .pCont = pReq,
      .contLen = ntohl(pReq->length),
  };

  ASSERT(tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) == 0);
}