tqSink.c 10.6 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16 17
#include "tcommon.h"
#include "tmsg.h"
L
Liu Jicong 已提交
18 19
#include "tq.h"

L
Liu Jicong 已提交
20
int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBlock* pDataBlock,
21 22 23 24 25 26 27
                         SBatchDeleteReq* deleteReq) {
  ASSERT(pDataBlock->info.type == STREAM_DELETE_RESULT);
  int32_t          totRow = pDataBlock->info.rows;
  SColumnInfoData* pTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  for (int32_t row = 0; row < totRow; row++) {
    int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
28
    int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
29
    char*   name = buildCtbNameByGroupId(stbFullName, groupId);
L
Liu Jicong 已提交
30
    tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name);
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
    SMetaReader mr = {0};
    metaReaderInit(&mr, pVnode->pMeta, 0);
    if (metaGetTableEntryByName(&mr, name) < 0) {
      metaReaderClear(&mr);
      taosMemoryFree(name);
      return -1;
    }

    int64_t uid = mr.me.uid;
    metaReaderClear(&mr);
    taosMemoryFree(name);
    SSingleDeleteReq req = {
        .ts = ts,
        .uid = uid,
    };
    taosArrayPush(deleteReq->deleteReqs, &req);
  }
  return 0;
}

51
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb,
52
                            int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) {
53
  SSubmitReq* ret = NULL;
54 55
  SArray*     schemaReqs = NULL;
  SArray*     schemaReqSz = NULL;
56 57 58 59 60 61 62 63
  SArray*     tagArray = taosArrayInit(1, sizeof(STagVal));
  if (!tagArray) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t sz = taosArrayGetSize(pBlocks);

64 65 66 67 68
  if (createTb) {
    schemaReqs = taosArrayInit(sz, sizeof(void*));
    schemaReqSz = taosArrayInit(sz, sizeof(int32_t));
    for (int32_t i = 0; i < sz; i++) {
      SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
69 70
      if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
        int32_t padding1 = 0;
L
Liu Jicong 已提交
71
        void*   padding2 = NULL;
72 73
        taosArrayPush(schemaReqSz, &padding1);
        taosArrayPush(schemaReqs, &padding2);
L
Liu Jicong 已提交
74
        continue;
75
      }
76

77 78 79 80
      STagVal tagVal = {
          .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
          .type = TSDB_DATA_TYPE_UBIGINT,
          .i64 = (int64_t)pDataBlock->info.groupId,
L
Liu Jicong 已提交
81 82
      };
      STag* pTag = NULL;
83 84 85 86
      taosArrayClear(tagArray);
      taosArrayPush(tagArray, &tagVal);
      tTagNew(tagArray, 1, false, &pTag);
      if (pTag == NULL) {
L
Liu Jicong 已提交
87
        terrno = TSDB_CODE_OUT_OF_MEMORY;
88 89 90
        taosArrayDestroy(tagArray);
        return NULL;
      }
91

92
      SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
93 94 95
      char     tagNameStr[TSDB_COL_NAME_LEN] = {0};
      strcpy(tagNameStr, "group_id");
      taosArrayPush(tagName, tagNameStr);
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117

//      STag* pTag = NULL;
//      taosArrayClear(tagArray);
//      SArray *tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
//      for(int j = 0; j < pTagSchemaWrapper->nCols; j++){
//        STagVal tagVal = {
//            .cid = pTagSchemaWrapper->pSchema[j].colId,
//            .type = pTagSchemaWrapper->pSchema[j].type,
//            .i64 = (int64_t)pDataBlock->info.groupId,
//        };
//        taosArrayPush(tagArray, &tagVal);
//        taosArrayPush(tagName, pTagSchemaWrapper->pSchema[j].name);
//      }
//
//      tTagNew(tagArray, 1, false, &pTag);
//      if (pTag == NULL) {
//        terrno = TSDB_CODE_OUT_OF_MEMORY;
//        taosArrayDestroy(tagArray);
//        taosArrayDestroy(tagName);
//        return NULL;
//      }

118
      SVCreateTbReq createTbReq = {0};
119 120 121
      SName         name = {0};
      tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);

122
      createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
123
      createTbReq.ctb.name = strdup((char*)tNameGetTableName(&name));  // strdup(stbFullName);
124 125 126
      createTbReq.flags = 0;
      createTbReq.type = TSDB_CHILD_TABLE;
      createTbReq.ctb.suid = suid;
127
      createTbReq.ctb.pTag = (uint8_t*)pTag;
128 129
      createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);
      createTbReq.ctb.tagName = tagName;
130 131

      int32_t code;
132
      int32_t schemaLen;
133 134
      tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
      if (code < 0) {
135
        tdDestroySVCreateTbReq(&createTbReq);
136
        taosArrayDestroy(tagArray);
137 138 139 140 141 142 143
        taosMemoryFreeClear(ret);
        return NULL;
      }

      void* schemaStr = taosMemoryMalloc(schemaLen);
      if (schemaStr == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
144
        tdDestroySVCreateTbReq(&createTbReq);
145 146 147 148 149 150 151 152 153 154
        return NULL;
      }
      taosArrayPush(schemaReqs, &schemaStr);
      taosArrayPush(schemaReqSz, &schemaLen);

      SEncoder encoder = {0};
      tEncoderInit(&encoder, schemaStr, schemaLen);
      code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
      if (code < 0) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
155
        tdDestroySVCreateTbReq(&createTbReq);
156 157
        return NULL;
      }
158 159
      tEncoderClear(&encoder);
      tdDestroySVCreateTbReq(&createTbReq);
160
    }
161 162
  }
  taosArrayDestroy(tagArray);
163

164 165 166 167
  // cal size
  int32_t cap = sizeof(SSubmitReq);
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
168 169 170 171
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
      continue;
    }
    int32_t rows = pDataBlock->info.rows;
L
Liu Jicong 已提交
172
    /*int32_t rowSize = pDataBlock->info.rowSize;*/
173 174 175 176 177 178
    int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);

    int32_t schemaLen = 0;
    if (createTb) {
      schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i);
    }
179 180 181 182 183
    cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
  }

  // assign data
  ret = rpcMallocCont(cap);
184
  ret->header.vgId = pVnode->config.vgId;
185 186 187
  ret->length = sizeof(SSubmitReq);
  ret->numOfBlocks = htonl(sz);

188
  SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
189 190
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
191 192
    if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
      pDeleteReq->suid = suid;
L
Liu Jicong 已提交
193
      tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
194 195
      continue;
    }
196

197
    blkHead->numOfRows = htonl(pDataBlock->info.rows);
198 199 200 201 202 203 204
    blkHead->sversion = htonl(pTSchema->version);
    blkHead->suid = htobe64(suid);
    // uid is assigned by vnode
    blkHead->uid = 0;

    int32_t rows = pDataBlock->info.rows;

L
Liu Jicong 已提交
205 206
    tqDebug("tq sink, convert block %d, rows: %d", i, rows);

207
    int32_t dataLen = 0;
208

209
    void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
210 211 212

    int32_t schemaLen = 0;
    if (createTb) {
213 214 215
      schemaLen = *(int32_t*)taosArrayGet(schemaReqSz, i);
      void* schemaStr = taosArrayGetP(schemaReqs, i);
      memcpy(blkSchema, schemaStr, schemaLen);
216 217 218
    }
    blkHead->schemaLen = htonl(schemaLen);

219
    STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
220 221 222 223 224 225 226 227 228 229
    for (int32_t j = 0; j < rows; j++) {
      SRowBuilder rb = {0};
      tdSRowInit(&rb, pTSchema->version);
      tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
      tdSRowResetBuf(&rb, rowData);

      for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
        const STColumn*  pColumn = &pTSchema->columns[k];
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
        if (colDataIsNull_s(pColData, j)) {
230
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
231 232 233 234 235
        } else {
          void* data = colDataGetData(pColData, j);
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
        }
      }
236
      tdSRowEnd(&rb);
237 238
      int32_t rowLen = TD_ROW_LEN(rowData);
      rowData = POINTER_SHIFT(rowData, rowLen);
239
      dataLen += rowLen;
240 241 242 243
    }
    blkHead->dataLen = htonl(dataLen);

    ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
244
    blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + schemaLen + dataLen);
245 246 247
  }

  ret->length = htonl(ret->length);
248 249 250 251

  if (schemaReqs) taosArrayDestroyP(schemaReqs, taosMemoryFree);
  taosArrayDestroy(schemaReqSz);

252 253 254
  return ret;
}

L
Liu Jicong 已提交
255
/**
 * Table sink of a stream task: write the task's result blocks into the vnode.
 *
 * The blocks are converted by tqBlockToSubmit into a submit request plus a batch
 * of delete entries. If any delete entries were produced, they are serialized
 * and put on the vnode write queue as TDMT_VND_BATCH_DEL; the submit request is
 * then put on the write queue as TDMT_VND_SUBMIT.
 *
 * @param pTask  stream task; tbSink supplies the schema, super-table uid and name
 * @param vnode  the target SVnode, passed as void*
 * @param ver    unused by this function
 * @param data   the result blocks, an SArray passed as void*
 */
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
  const SArray*   pRes = (const SArray*)data;
  SVnode*         pVnode = (SVnode*)vnode;
  SBatchDeleteReq deleteReq = {0};

  tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, (int32_t)pRes->size);

  ASSERT(pTask->tbSink.pTSchema);
  deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
  if (deleteReq.deleteReqs == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tqDebug("vgId:%d, task %d failed to init delete req since %s", TD_VID(pVnode), pTask->taskId, terrstr());
    return;
  }

  SSubmitReq* submitReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, pTask->tbSink.pSchemaWrapper, true, pTask->tbSink.stbUid,
                                          pTask->tbSink.stbFullName, &deleteReq);

  tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);

  if (taosArrayGetSize(deleteReq.deleteReqs) != 0) {
    int32_t code = 0;
    int32_t len = 0;
    tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
    if (code < 0) {
      //
      ASSERT(0);
    }

    void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
    if (serializedDeleteReq == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tqDebug("failed to build delete msg since %s", terrstr());
    } else {
      // the encoded request follows the message head inside the same buffer
      SEncoder encoder = {0};
      void*    abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
      tEncoderInit(&encoder, abuf, len);
      tEncodeSBatchDeleteReq(&encoder, &deleteReq);
      tEncoderClear(&encoder);

      ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;

      SRpcMsg msg = {
          .msgType = TDMT_VND_BATCH_DEL,
          .pCont = serializedDeleteReq,
          .contLen = len + sizeof(SMsgHead),
      };
      if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
        rpcFreeCont(serializedDeleteReq);
        tqDebug("failed to put into write-queue since %s", terrstr());
      }
    }
  }
  taosArrayDestroy(deleteReq.deleteReqs);

  // tqBlockToSubmit returns NULL on failure; there is nothing to submit then
  if (submitReq == NULL) {
    tqDebug("vgId:%d, task %d failed to convert blocks since %s", TD_VID(pVnode), pTask->taskId, terrstr());
    return;
  }

  // build write msg
  SRpcMsg msg = {
      .msgType = TDMT_VND_SUBMIT,
      .pCont = submitReq,
      .contLen = ntohl(submitReq->length),
  };

  if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
    // release the buffer on failure, same as the delete message above
    rpcFreeCont(submitReq);
    tqDebug("failed to put into write-queue since %s", terrstr());
  }
}