tqSink.c 9.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16 17
#include "tcommon.h"
#include "tmsg.h"
L
Liu Jicong 已提交
18 19
#include "tq.h"

20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
/**
 * Convert a STREAM_DELETE_RESULT block into single-row delete requests.
 *
 * For each row of the block, the timestamp is read from the column at
 * START_TS_COLUMN_INDEX, the child-table name is derived from stbFullName via
 * buildCtbNameByGroupId(), that name is resolved to a table uid through the
 * vnode's meta, and a SSingleDeleteReq {ts, uid} is appended to
 * deleteReq->deleteReqs.
 *
 * @param pVnode       vnode whose pMeta is used for the name -> uid lookup
 * @param stbFullName  super table full name used to derive the child table name
 * @param pDataBlock   data block; its info.type must be STREAM_DELETE_RESULT
 * @param deleteReq    output request; deleteReq->deleteReqs must already be a
 *                     valid array of SSingleDeleteReq
 * @return 0 when every row was converted; -1 when the table name could not be
 *         built, the table could not be found in meta, or the request could
 *         not be appended. Requests appended before the failing row are left
 *         in deleteReq.
 */
int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock,
                         SBatchDeleteReq* deleteReq) {
  ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT);
  int32_t          totRow = pDataBlock->info.rows;
  SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);

  for (int32_t row = 0; row < totRow; row++) {
    int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
    // NOTE: the per-row group id column (GROUPID_COLUMN_INDEX) is not consulted
    // here; every row is resolved with group id 0.
    int64_t groupId = 0;
    char*   name = buildCtbNameByGroupId(stbFullName, groupId);
    if (name == NULL) {
      // presumably an allocation failure inside buildCtbNameByGroupId -- the
      // result is heap memory (it is released with taosMemoryFree below)
      return -1;
    }
    // cast so the format specifier matches on platforms where long is 32-bit
    tqDebug("delete msg: groupId :%lld, name: %s", (long long)groupId, name);

    SMetaReader mr = {0};
    metaReaderInit(&mr, pVnode->pMeta, 0);
    if (metaGetTableEntryByName(&mr, name) < 0) {
      metaReaderClear(&mr);
      taosMemoryFree(name);
      return -1;
    }

    // copy the uid out before the reader (which owns the entry) is cleared
    int64_t uid = mr.me.uid;
    metaReaderClear(&mr);
    taosMemoryFree(name);

    SSingleDeleteReq req = {
        .ts = ts,
        .uid = uid,
    };
    if (taosArrayPush(deleteReq->deleteReqs, &req) == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }
  return 0;
}

SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb,
                            int64_t suid, const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq) {
54
  SSubmitReq* ret = NULL;
55 56
  SArray*     schemaReqs = NULL;
  SArray*     schemaReqSz = NULL;
57 58 59 60 61 62 63 64
  SArray*     tagArray = taosArrayInit(1, sizeof(STagVal));
  if (!tagArray) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t sz = taosArrayGetSize(pBlocks);

65 66 67 68 69
  if (createTb) {
    schemaReqs = taosArrayInit(sz, sizeof(void*));
    schemaReqSz = taosArrayInit(sz, sizeof(int32_t));
    for (int32_t i = 0; i < sz; i++) {
      SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
70 71 72 73 74
      if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
        int32_t padding1 = 0;
        void*   padding2 = taosMemoryMalloc(1);
        taosArrayPush(schemaReqSz, &padding1);
        taosArrayPush(schemaReqs, &padding2);
75
      }
76

77 78 79 80
      STagVal tagVal = {
          .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
          .type = TSDB_DATA_TYPE_UBIGINT,
          .i64 = (int64_t)pDataBlock->info.groupId,
L
Liu Jicong 已提交
81 82
      };
      STag* pTag = NULL;
83 84 85 86
      taosArrayClear(tagArray);
      taosArrayPush(tagArray, &tagVal);
      tTagNew(tagArray, 1, false, &pTag);
      if (pTag == NULL) {
L
Liu Jicong 已提交
87
        terrno = TSDB_CODE_OUT_OF_MEMORY;
88 89 90
        taosArrayDestroy(tagArray);
        return NULL;
      }
91 92

      SVCreateTbReq createTbReq = {0};
93 94 95
      SName         name = {0};
      tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);

96
      createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
97
      createTbReq.ctb.name = strdup((char*)tNameGetTableName(&name));  // strdup(stbFullName);
98 99 100
      createTbReq.flags = 0;
      createTbReq.type = TSDB_CHILD_TABLE;
      createTbReq.ctb.suid = suid;
101 102 103
      createTbReq.ctb.pTag = (uint8_t*)pTag;

      int32_t code;
104
      int32_t schemaLen;
105 106
      tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
      if (code < 0) {
107
        tdDestroySVCreateTbReq(&createTbReq);
108
        taosArrayDestroy(tagArray);
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
        taosMemoryFreeClear(ret);
        return NULL;
      }

      void* schemaStr = taosMemoryMalloc(schemaLen);
      if (schemaStr == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return NULL;
      }
      taosArrayPush(schemaReqs, &schemaStr);
      taosArrayPush(schemaReqSz, &schemaLen);

      SEncoder encoder = {0};
      tEncoderInit(&encoder, schemaStr, schemaLen);
      code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
      if (code < 0) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
126 127
        return NULL;
      }
128 129
      tEncoderClear(&encoder);
      tdDestroySVCreateTbReq(&createTbReq);
130
    }
131 132
  }
  taosArrayDestroy(tagArray);
133

134 135 136 137
  // cal size
  int32_t cap = sizeof(SSubmitReq);
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
138 139 140 141
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
      continue;
    }
    int32_t rows = pDataBlock->info.rows;
142 143 144 145 146 147 148 149
    // TODO min
    int32_t rowSize = pDataBlock->info.rowSize;
    int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);

    int32_t schemaLen = 0;
    if (createTb) {
      schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i);
    }
150 151 152 153 154 155 156 157 158 159
    cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
  }

  // assign data
  // TODO
  ret = rpcMallocCont(cap);
  ret->header.vgId = vgId;
  ret->length = sizeof(SSubmitReq);
  ret->numOfBlocks = htonl(sz);

160
  SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
161 162
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
163 164 165 166 167
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
      pDeleteReq->suid = suid;
      tdBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
      continue;
    }
168

169
    blkHead->numOfRows = htonl(pDataBlock->info.rows);
170 171 172 173 174 175 176 177
    blkHead->sversion = htonl(pTSchema->version);
    // TODO
    blkHead->suid = htobe64(suid);
    // uid is assigned by vnode
    blkHead->uid = 0;

    int32_t rows = pDataBlock->info.rows;

L
Liu Jicong 已提交
178 179
    tqDebug("tq sink, convert block %d, rows: %d", i, rows);

180
    int32_t dataLen = 0;
181

182
    void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
183 184 185

    int32_t schemaLen = 0;
    if (createTb) {
186 187 188
      schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i);
      void* schemaStr = taosArrayGetP(schemaReqs, i);
      memcpy(blkSchema, schemaStr, schemaLen);
189 190 191
    }
    blkHead->schemaLen = htonl(schemaLen);

192
    STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
193 194 195 196 197 198 199 200 201 202
    for (int32_t j = 0; j < rows; j++) {
      SRowBuilder rb = {0};
      tdSRowInit(&rb, pTSchema->version);
      tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
      tdSRowResetBuf(&rb, rowData);

      for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
        const STColumn*  pColumn = &pTSchema->columns[k];
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
        if (colDataIsNull_s(pColData, j)) {
203
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
204 205 206 207 208 209 210
        } else {
          void* data = colDataGetData(pColData, j);
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
        }
      }
      int32_t rowLen = TD_ROW_LEN(rowData);
      rowData = POINTER_SHIFT(rowData, rowLen);
211
      dataLen += rowLen;
212 213 214 215
    }
    blkHead->dataLen = htonl(dataLen);

    ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
216
    blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + schemaLen + dataLen);
217 218 219
  }

  ret->length = htonl(ret->length);
220 221 222 223

  if (schemaReqs) taosArrayDestroyP(schemaReqs, taosMemoryFree);
  taosArrayDestroy(schemaReqSz);

224 225 226
  return ret;
}

L
Liu Jicong 已提交
227
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
228 229 230
  const SArray*   pRes = (const SArray*)data;
  SVnode*         pVnode = (SVnode*)vnode;
  SBatchDeleteReq deleteReq = {0};
L
Liu Jicong 已提交
231

L
Liu Jicong 已提交
232
  tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size);
L
Liu Jicong 已提交
233

L
Liu Jicong 已提交
234
  ASSERT(pTask->tbSink.pTSchema);
235
  deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
236
  SSubmitReq* pReq = tdBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
237
                                     pTask->tbSink.stbFullName, pVnode->config.vgId, &deleteReq);
L
Liu Jicong 已提交
238 239 240

  tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);

241 242 243 244 245 246 247 248
  int32_t code;
  int32_t len;
  tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
  if (code < 0) {
    //
    ASSERT(0);
  }
  SEncoder encoder;
249
  void*    buf = rpcMallocCont(len + sizeof(SMsgHead));
250 251 252 253 254
  void*    abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tEncoderInit(&encoder, abuf, len);
  tEncodeSBatchDeleteReq(&encoder, &deleteReq);
  tEncoderClear(&encoder);

255 256
  ((SMsgHead*)buf)->vgId = pVnode->config.vgId;

257 258 259 260 261 262 263 264 265 266 267 268
  if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
    SRpcMsg msg = {
        .msgType = TDMT_VND_BATCH_DEL,
        .pCont = buf,
        .contLen = len + sizeof(SMsgHead),
    };
    if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
      tqDebug("failed to put into write-queue since %s", terrstr());
    }
  }
  taosArrayDestroy(deleteReq.deleteReqs);

L
Liu Jicong 已提交
269 270 271 272 273 274 275 276
  /*tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema);*/
  // build write msg
  SRpcMsg msg = {
      .msgType = TDMT_VND_SUBMIT,
      .pCont = pReq,
      .contLen = ntohl(pReq->length),
  };

S
Shengliang Guan 已提交
277 278 279
  if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
    tqDebug("failed to put into write-queue since %s", terrstr());
  }
L
Liu Jicong 已提交
280
}