dataInserter.c 11.2 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "executorimpl.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tqueue.h"

extern SDataSinkStat gDataSinkStat;

D
dapan1121 已提交
27 28 29
typedef struct SSubmitRes {
  int64_t     affectedRows;
  int32_t     code;
H
Hongze Cheng 已提交
30
  SSubmitRsp* pRsp;
D
dapan1121 已提交
31
} SSubmitRes;
D
dapan1121 已提交
32 33 34 35

typedef struct SDataInserterHandle {
  SDataSinkHandle     sink;
  SDataSinkManager*   pManager;
D
dapan1121 已提交
36 37 38 39 40
  STSchema*           pSchema;
  SQueryInserterNode* pNode;
  SSubmitRes          submitRes;
  SInserterParam*     pParam;
  SArray*             pDataBlocks;
D
dapan1121 已提交
41
  SHashObj*           pCols;
D
dapan1121 已提交
42 43 44 45 46
  int32_t             status;
  bool                queryEnd;
  uint64_t            useconds;
  uint64_t            cachedSize;
  TdThreadMutex       mutex;
H
Hongze Cheng 已提交
47
  tsem_t              ready;
D
dapan1121 已提交
48 49
} SDataInserterHandle;

D
dapan1121 已提交
50 51 52
/* Context attached to an outgoing submit request and handed back to inserterCallback. */
typedef struct SSubmitRspParam {
  SDataInserterHandle* pInserter;  // inserter that issued the request
} SSubmitRspParam;
D
dapan1121 已提交
53

D
dapan1121 已提交
54
/**
 * Completion callback for a TDMT_VND_SUBMIT request sent by sendSubmitRequest().
 *
 * Stores the outcome in pInserter->submitRes.code. On success the response is
 * decoded, every block's code is checked, and the reported affectedRows is
 * added to the running total. Finally pInserter->ready is posted so that
 * putDataBlock() can resume.
 *
 * @param param  SSubmitRspParam* identifying the inserter that sent the request
 * @param pMsg   response buffer; pMsg->pData is released here
 * @param code   result code of the request
 * @return always TSDB_CODE_SUCCESS; the actual outcome is in submitRes.code
 */
int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SSubmitRspParam*     pParam = (SSubmitRspParam*)param;
  SDataInserterHandle* pInserter = pParam->pInserter;

  pInserter->submitRes.code = code;

  if (code == TSDB_CODE_SUCCESS) {
    SSubmitRsp* pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp));
    if (NULL == pRsp) {
      // Without a response object there is nothing to decode into; report the failure to the waiter.
      pInserter->submitRes.code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    } else {
      pInserter->submitRes.pRsp = pRsp;

      SDecoder coder = {0};
      tDecoderInit(&coder, pMsg->pData, pMsg->len);
      code = tDecodeSSubmitRsp(&coder, pRsp);

      // The first block that reports a failure decides the result of the whole submit.
      for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < pRsp->nBlocks; ++i) {
        SSubmitBlkRsp* blk = pRsp->pBlocks + i;
        code = blk->code;
      }

      if (TSDB_CODE_SUCCESS == code) {
        pInserter->submitRes.affectedRows += pRsp->affectedRows;
        qDebug("submit rsp received, affectedRows:%d, total:%"PRId64, pRsp->affectedRows,
               pInserter->submitRes.affectedRows);
      } else {
        pInserter->submitRes.code = code;
      }

      tFreeSSubmitRsp(pRsp);
      // Do not leave a dangling pointer behind in the handle.
      pInserter->submitRes.pRsp = NULL;
    }
  }

  taosMemoryFree(pMsg->pData);

  // Post last: the inserter is not touched by this callback once the waiter has been released.
  tsem_post(&pInserter->ready);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
99 100 101 102 103 104 105
/**
 * Send a serialised submit request asynchronously; the reply is delivered to
 * inserterCallback().
 *
 * pMsg is released here when the send info or the callback parameter cannot be
 * allocated. Otherwise it is attached to the send info that is passed to
 * asyncSendMsgToServer().
 *
 * @param pInserter     inserter to notify when the response arrives
 * @param pMsg          request buffer; its `length` field is in network byte order
 * @param pTransporter  transport handle passed through to asyncSendMsgToServer
 * @param pEpset        endpoint set of the destination
 * @return TSDB_CODE_QRY_OUT_OF_MEMORY on allocation failure (terrno is set as well),
 *         otherwise the result of asyncSendMsgToServer
 */
static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMsg, void* pTransporter, SEpSet* pEpset) {
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    taosMemoryFreeClear(pMsg);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }

  SSubmitRspParam* pParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam));
  if (NULL == pParam) {
    // Nothing has been handed to the transport yet, so release everything allocated so far.
    taosMemoryFreeClear(pMsgSendInfo);
    taosMemoryFreeClear(pMsg);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }
  pParam->pInserter = pInserter;

  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->msgInfo.pData = pMsg;
  pMsgSendInfo->msgInfo.len = ntohl(pMsg->length);
  pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
  pMsgSendInfo->fp = inserterCallback;

  int64_t transporterId = 0;
  return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo);
}

D
dapan1121 已提交
122
/**
 * Serialise all data blocks queued in pInserter->pDataBlocks into one
 * SSubmitReq containing one SSubmitBlk per data block.
 *
 * Within a block a row is skipped when its first-column timestamp is NULL or
 * equal to the timestamp of the previously accepted row. When the insert does
 * not cover every schema column, columns are looked up through
 * pInserter->pCols (colId -> slotId) and schema columns without a mapping are
 * left unset.
 *
 * @param pInserter  inserter providing the schema, target table ids and blocks
 * @param pReq       receives the newly allocated request on success; it is not
 *                   written on failure
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY if the request buffer
 *         cannot be allocated, or TSDB_CODE_APP_ERROR if a block column's type
 *         differs from the schema (terrno is set in both failure cases)
 */
int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
  const SArray*   pBlocks = pInserter->pDataBlocks;
  const STSchema* pTSchema = pInserter->pSchema;
  int64_t         uid = pInserter->pNode->tableId;
  int64_t         suid = pInserter->pNode->stableId;
  int32_t         vgId = pInserter->pNode->vgId;
  bool            fullCol = (pInserter->pNode->pCols->length == pTSchema->numOfCols);

  int32_t sz = taosArrayGetSize(pBlocks);

  // cal size: reserve the schema's maximum row length for every row of every block
  // TODO min
  int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
  int32_t cap = sizeof(SSubmitReq);
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);
    int32_t      rows = pDataBlock->info.rows;

    cap += sizeof(SSubmitBlk) + rows * maxLen;
  }

  // assign data
  // TODO
  SSubmitReq* ret = taosMemoryCalloc(1, cap);
  if (NULL == ret) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  ret->header.vgId = htonl(vgId);
  ret->version = htonl(pTSchema->version);
  ret->length = sizeof(SSubmitReq);  // accumulated in host order, converted once at the end
  ret->numOfBlocks = htonl(sz);

  SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);

    blkHead->sversion = htonl(pTSchema->version);
    // TODO
    blkHead->suid = htobe64(suid);
    blkHead->uid = htobe64(uid);
    blkHead->schemaLen = htonl(0);

    int32_t rows = 0;
    int32_t dataLen = 0;
    STSRow* rowData = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
    int64_t lastTs = TSKEY_MIN;
    bool    ignoreRow = false;
    for (int32_t j = 0; j < pDataBlock->info.rows; j++) {
      SRowBuilder rb = {0};
      tdSRowInit(&rb, pTSchema->version);
      tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
      tdSRowResetBuf(&rb, rowData);

      ignoreRow = false;
      for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
        const STColumn*  pColumn = &pTSchema->columns[k];
        SColumnInfoData* pColData = NULL;
        int16_t          colIdx = k;
        if (!fullCol) {
          int16_t* slotId = taosHashGet(pInserter->pCols, &pColumn->colId, sizeof(pColumn->colId));
          if (NULL == slotId) {
            continue;
          }

          colIdx = *slotId;
        }

        pColData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
        if (pColData->info.type != pColumn->type) {
          qError("col type mis-match, schema type:%d, type in block:%d", pColumn->type, pColData->info.type);
          // The partially built request is not handed to the caller, so release it here.
          taosMemoryFree(ret);
          terrno = TSDB_CODE_APP_ERROR;
          return TSDB_CODE_APP_ERROR;
        }

        if (colDataIsNull_s(pColData, j)) {
          if (0 == k && TSDB_DATA_TYPE_TIMESTAMP == pColumn->type) {
            // a row without a timestamp is dropped
            ignoreRow = true;
            break;
          }

          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
        } else {
          void* data = colDataGetData(pColData, j);
          if (0 == k && TSDB_DATA_TYPE_TIMESTAMP == pColumn->type) {
            if (*(int64_t*)data == lastTs) {
              // same timestamp as the previously accepted row: drop this one
              ignoreRow = true;
              break;
            } else {
              lastTs = *(int64_t*)data;
            }
          }
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
        }
      }
      if (!fullCol) {
        rb.hasNone = true;
      }
      tdSRowEnd(&rb);

      if (ignoreRow) {
        // rowData is not advanced, so the next row overwrites the dropped one
        continue;
      }

      rows++;
      int32_t rowLen = TD_ROW_LEN(rowData);
      rowData = POINTER_SHIFT(rowData, rowLen);
      dataLen += rowLen;
    }

    blkHead->dataLen = htonl(dataLen);
    blkHead->numOfRows = htonl(rows);

    ret->length += sizeof(SSubmitBlk) + dataLen;
    blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen);
  }

  ret->length = htonl(ret->length);

  *pReq = ret;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
244 245
/**
 * Sink `fPut` callback: serialise the incoming data block into a submit
 * request, send it, and block until inserterCallback() reports the result.
 *
 * @param pHandle    the inserter, passed as its SDataSinkHandle base
 * @param pInput     input whose pData block is to be written
 * @param pContinue  set to true when the block was submitted successfully;
 *                   left untouched on failure
 * @return TSDB_CODE_SUCCESS, or the first error from queuing, serialising,
 *         sending, or from the submit response
 */
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  SSubmitReq* pMsg = NULL;
  int32_t     code = dataBlockToSubmit(pInserter, &pMsg);

  // The queued block has been consumed (or rejected). Drop it so that the next
  // call does not serialise and submit the same block pointer again.
  taosArrayClear(pInserter->pDataBlocks);

  if (code) {
    return code;
  }

  code = sendSubmitRequest(pInserter, pMsg, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet);
  if (code) {
    return code;
  }

  // inserterCallback posts `ready` once the response has been processed.
  tsem_wait(&pInserter->ready);

  if (pInserter->submitRes.code) {
    return pInserter->submitRes.code;
  }

  *pContinue = true;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
269 270 271 272 273 274 275 276
/**
 * Sink `fEndPut` callback: flag the inserter as finished and remember the
 * reported time, both under the inserter's mutex.
 *
 * @param pHandle   the inserter, passed as its SDataSinkHandle base
 * @param useconds  value stored in the handle's `useconds` field
 */
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
  SDataInserterHandle* pSelf = (SDataInserterHandle*)pHandle;

  taosThreadMutexLock(&pSelf->mutex);
  pSelf->queryEnd = true;
  pSelf->useconds = useconds;
  taosThreadMutexUnlock(&pSelf->mutex);
}

D
dapan1121 已提交
277 278 279
/**
 * Sink `fGetLen` callback: report the total number of affected rows
 * accumulated so far.
 *
 * @param pHandle    the inserter, passed as its SDataSinkHandle base
 * @param pLen       receives submitRes.affectedRows
 * @param pQueryEnd  not written by this implementation
 */
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;

  *pLen = pInserter->submitRes.affectedRows;
  qDebug("got total affectedRows %" PRId64, *pLen);
}

D
dapan1121 已提交
283
/**
 * Sink `fDestroy` callback: release everything the inserter owns. The handle
 * structure itself is not freed here.
 *
 * @param pHandle  the inserter, passed as its SDataSinkHandle base
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataInserterHandle* pSelf = (SDataInserterHandle*)pHandle;

  // Take this sink's share out of the global cache accounting.
  atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pSelf->cachedSize);

  // Owned containers and buffers.
  taosArrayDestroy(pSelf->pDataBlocks);
  taosMemoryFree(pSelf->pSchema);
  taosMemoryFree(pSelf->pParam);
  taosHashCleanup(pSelf->pCols);

  taosThreadMutexDestroy(&pSelf->mutex);

  return TSDB_CODE_SUCCESS;
}

static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
D
dapan1121 已提交
295
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
D
dapan1121 已提交
296 297 298 299 300

  *size = atomic_load_64(&pDispatcher->cachedSize);
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
301 302
/**
 * Create a data-inserter sink for the given inserter plan node.
 *
 * The target table's schema is loaded and checked against the node's stable
 * id, and the colId -> slotId lookup used during serialisation is built from
 * the node's column list.
 *
 * Every failure after the handle has been allocated releases the handle and
 * everything attached to it through destroyDataSinker(). That includes pParam.
 *
 * @param pManager   owning sink manager, stored in the handle
 * @param pDataSink  plan node; must actually be a SQueryInserterNode
 * @param pHandle    receives the new sink on success
 * @param pParam     SInserterParam*; released by destroyDataSinker()
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY,
 *         TSDB_CODE_TDB_INVALID_TABLE_ID, or the error from tsdbGetTableSchema
 */
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
                           void* pParam) {
  SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  if (NULL == inserter) {
    // NOTE(review): pParam is not released on this path (unchanged behaviour) - confirm who owns it here.
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  SQueryInserterNode* pInserterNode = (SQueryInserterNode*)pDataSink;
  SNode*              pNode = NULL;
  int64_t             suid = 0;
  int32_t             code = TSDB_CODE_SUCCESS;

  inserter->sink.fPut = putDataBlock;
  inserter->sink.fEndPut = endPut;
  inserter->sink.fGetLen = getDataLength;
  inserter->sink.fGetData = NULL;
  inserter->sink.fDestroy = destroyDataSinker;
  inserter->sink.fGetCacheSize = getCacheSize;
  inserter->pManager = pManager;
  inserter->pNode = pInserterNode;
  inserter->pParam = pParam;
  inserter->status = DS_BUF_EMPTY;
  inserter->queryEnd = false;

  // Initialise the mutex before anything can fail, so that the destroyDataSinker()
  // call on the error path never destroys an uninitialised mutex.
  taosThreadMutexInit(&inserter->mutex, NULL);

  code = tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
  if (code) {
    goto _error;
  }

  if (pInserterNode->stableId != suid) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    code = terrno;
    goto _error;
  }

  inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  if (NULL == inserter->pDataBlocks) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _error;
  }

  inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT),
                                 false, HASH_NO_LOCK);
  if (NULL == inserter->pCols) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _error;
  }

  FOREACH(pNode, pInserterNode->pCols) {
    SColumnNode* pCol = (SColumnNode*)pNode;
    taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId));
  }

  // Starts at 0: putDataBlock blocks on it until inserterCallback posts.
  tsem_init(&inserter->ready, 0, 0);

  *pHandle = inserter;
  return TSDB_CODE_SUCCESS;

_error:
  // destroyDataSinker releases the members; the handle itself still has to be freed.
  destroyDataSinker((SDataSinkHandle*)inserter);
  taosMemoryFree(inserter);
  return code;
}