dataInserter.c 10.0 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "executorimpl.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tqueue.h"

extern SDataSinkStat gDataSinkStat;

D
dapan1121 已提交
27 28 29 30 31
/* Result of submit round-trips as recorded by inserterCallback(). */
typedef struct SSubmitRes {
  int64_t     affectedRows;  // running total: inserterCallback() adds each successful response's affectedRows
  int32_t     code;          // last status stored by inserterCallback(): transport code, decode result or a failing block's code
  SSubmitRsp *pRsp;          // response being decoded; allocated and released inside inserterCallback()
} SSubmitRes;
D
dapan1121 已提交
32 33 34 35

typedef struct SDataInserterHandle {
  SDataSinkHandle     sink;
  SDataSinkManager*   pManager;
D
dapan1121 已提交
36 37 38 39 40
  STSchema*           pSchema;
  SQueryInserterNode* pNode;
  SSubmitRes          submitRes;
  SInserterParam*     pParam;
  SArray*             pDataBlocks;
D
dapan1121 已提交
41
  SHashObj*           pCols;
D
dapan1121 已提交
42 43 44 45 46
  int32_t             status;
  bool                queryEnd;
  uint64_t            useconds;
  uint64_t            cachedSize;
  TdThreadMutex       mutex;
D
dapan1121 已提交
47
  tsem_t              ready;  
D
dapan1121 已提交
48 49
} SDataInserterHandle;

D
dapan1121 已提交
50 51 52
/* Callback context allocated by sendSubmitRequest() and freed by inserterCallback(). */
typedef struct SSubmitRspParam {
  SDataInserterHandle* pInserter;  // inserter whose submitRes/ready the callback updates
} SSubmitRspParam;
D
dapan1121 已提交
53

D
dapan1121 已提交
54 55 56
/*
 * Response callback installed by sendSubmitRequest() for a submit request.
 *
 * Stores the outcome in pInserter->submitRes, adds the response's affected
 * rows to the running total on success, releases `param` and finally posts
 * the `ready` semaphore the sender is waiting on.
 *
 * @param param  SSubmitRspParam* allocated by sendSubmitRequest(); freed here.
 * @param pMsg   response buffer; only read when `code` is TSDB_CODE_SUCCESS.
 * @param code   status of the request as reported by the caller of this callback.
 * @return always TSDB_CODE_SUCCESS; failures are reported through submitRes.code.
 *
 * NOTE(review): pMsg->pData is not released here and the decoder is not
 * explicitly cleared - confirm who owns them.
 */
int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SSubmitRspParam*     pParam = (SSubmitRspParam*)param;
  SDataInserterHandle* pInserter = pParam->pInserter;
  SSubmitRes*          pRes = &pInserter->submitRes;
  SDecoder             coder = {0};

  pRes->code = code;
  if (code != TSDB_CODE_SUCCESS) {
    goto _return;
  }

  pRes->pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp));
  if (NULL == pRes->pRsp) {
    // report the allocation failure instead of decoding into a NULL response
    pRes->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _return;
  }

  tDecoderInit(&coder, pMsg->pData, pMsg->len);
  code = tDecodeSSubmitRsp(&coder, pRes->pRsp);
  if (code) {
    pRes->code = code;
    goto _free_rsp;
  }

  // the first failing block decides the overall result
  for (int32_t i = 0; i < pRes->pRsp->nBlocks; ++i) {
    SSubmitBlkRsp* blk = pRes->pRsp->pBlocks + i;
    if (TSDB_CODE_SUCCESS != blk->code) {
      pRes->code = blk->code;
      goto _free_rsp;
    }
  }

  pRes->affectedRows += pRes->pRsp->affectedRows;
  // the running total is 64-bit, so both values are printed through PRId64
  qDebug("submit rsp received, affectedRows:%" PRId64 ", total:%" PRId64, (int64_t)pRes->pRsp->affectedRows,
         (int64_t)pRes->affectedRows);

_free_rsp:
  tFreeSSubmitRsp(pRes->pRsp);
  pRes->pRsp = NULL;  // do not keep a pointer to the released response

_return:
  taosMemoryFree(param);
  // wake the sender last: nothing in this callback touches the inserter afterwards
  tsem_post(&pInserter->ready);

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
99 100 101 102 103 104 105
/*
 * Sends a serialized submit request asynchronously; the reply is delivered to
 * inserterCallback().
 *
 * @param pInserter     inserter that will receive the result through the callback.
 * @param pMsg          non-NULL request built by dataBlockToSubmit(); its `length`
 *                      field holds the total request size in network byte order.
 * @param pTransporter  transport handle passed through to asyncSendMsgToServer().
 * @param pEpset        endpoint set of the target node.
 * @return result of asyncSendMsgToServer(), or TSDB_CODE_QRY_OUT_OF_MEMORY
 *         (also stored in terrno) if the bookkeeping structures cannot be
 *         allocated; in that case pMsg is released here.
 *
 * NOTE(review): ownership of pMsg/pMsgSendInfo/pParam when asyncSendMsgToServer()
 * itself fails is not visible in this file - confirm.
 */
static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMsg, void* pTransporter, SEpSet* pEpset) {
  SMsgSendInfo*    pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  SSubmitRspParam* pParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam));
  if (NULL == pMsgSendInfo || NULL == pParam) {
    taosMemoryFree(pMsgSendInfo);
    taosMemoryFree(pParam);
    // the request buffer is obtained from rpcMallocCont(), so it goes back through rpcFreeCont()
    rpcFreeCont(pMsg);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }

  pParam->pInserter = pInserter;

  pMsgSendInfo->param = pParam;
  pMsgSendInfo->msgInfo.pData = pMsg;
  // send the whole request (header + blocks + rows), not just the fixed-size header
  pMsgSendInfo->msgInfo.len = ntohl(pMsg->length);
  pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
  pMsgSendInfo->fp = inserterCallback;

  int64_t transporterId = 0;
  return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo);
}


D
dapan1121 已提交
122 123 124 125 126 127
SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) {
  const SArray* pBlocks = pInserter->pDataBlocks;
  const STSchema* pTSchema = pInserter->pSchema; 
  int64_t uid = pInserter->pNode->tableId; 
  int64_t suid = pInserter->pNode->stableId; 
  int32_t vgId = pInserter->pNode->vgId;
D
dapan1121 已提交
128
  bool fullCol = (pInserter->pNode->pCols->length == pTSchema->numOfCols);
D
dapan1121 已提交
129

D
dapan1121 已提交
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
  SSubmitReq* ret = NULL;
  int32_t sz = taosArrayGetSize(pBlocks);

  // cal size
  int32_t cap = sizeof(SSubmitReq);
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
    int32_t      rows = pDataBlock->info.rows;
    // TODO min
    int32_t rowSize = pDataBlock->info.rowSize;
    int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);

    cap += sizeof(SSubmitBlk) + rows * maxLen;
  }

  // assign data
  // TODO
  ret = rpcMallocCont(cap);
  ret->header.vgId = vgId;
D
dapan1121 已提交
149
  ret->version = htonl(pTSchema->version);
D
dapan1121 已提交
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
  ret->length = sizeof(SSubmitReq);
  ret->numOfBlocks = htonl(sz);

  SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);

    blkHead->numOfRows = htons(pDataBlock->info.rows);
    blkHead->sversion = htonl(pTSchema->version);
    // TODO
    blkHead->suid = htobe64(suid);
    blkHead->uid = htobe64(uid);
    blkHead->schemaLen = htonl(0);

    int32_t rows = pDataBlock->info.rows;
    int32_t dataLen = 0;
    STSRow* rowData = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
    for (int32_t j = 0; j < rows; j++) {
      SRowBuilder rb = {0};
      tdSRowInit(&rb, pTSchema->version);
      tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
      tdSRowResetBuf(&rb, rowData);

      for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
        const STColumn*  pColumn = &pTSchema->columns[k];
D
dapan1121 已提交
175 176 177 178 179 180 181 182 183 184 185 186
        SColumnInfoData* pColData = NULL;
        int16_t colIdx = k;
        if (!fullCol) {
          int16_t *slotId = taosHashGet(pInserter->pCols, &pColumn->colId, sizeof(pColumn->colId));
          if (NULL == slotId) {
            continue;
          }
          
          colIdx = *slotId;
        }

        pColData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
D
dapan1121 已提交
187
        if (colDataIsNull_s(pColData, j)) {
D
dapan1121 已提交
188
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
D
dapan1121 已提交
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
        } else {
          void* data = colDataGetData(pColData, j);
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
        }
      }
      int32_t rowLen = TD_ROW_LEN(rowData);
      rowData = POINTER_SHIFT(rowData, rowLen);
      dataLen += rowLen;
    }
    
    blkHead->dataLen = htonl(dataLen);

    ret->length += sizeof(SSubmitBlk) + dataLen;
    blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen);
  }

  ret->length = htonl(ret->length);

  return ret;
}


D
dapan1121 已提交
211 212 213
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  taosArrayPush(pInserter->pDataBlocks, pInput->pData);
D
dapan1121 已提交
214
  SSubmitReq* pMsg = dataBlockToSubmit(pInserter);
D
dapan1121 已提交
215

D
dapan1121 已提交
216 217 218
  int32_t code = sendSubmitRequest(pInserter, pMsg, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet);
  if (code) {
    return code;
D
dapan1121 已提交
219 220
  }

D
dapan1121 已提交
221
  tsem_wait(&pInserter->ready);
D
dapan1121 已提交
222

D
dapan1121 已提交
223 224
  if (pInserter->submitRes.code) {
    return pInserter->submitRes.code;
D
dapan1121 已提交
225
  }
D
dapan1121 已提交
226 227

  *pContinue = true;
D
dapan1121 已提交
228 229 230 231
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
232 233 234 235 236 237 238 239
/*
 * Sink "end put" callback: records, under the handle's mutex, that the query
 * has finished and the elapsed time reported by the caller.
 */
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
  SDataInserterHandle* pSelf = (SDataInserterHandle*)pHandle;

  taosThreadMutexLock(&pSelf->mutex);
  pSelf->useconds = useconds;
  pSelf->queryEnd = true;
  taosThreadMutexUnlock(&pSelf->mutex);
}

D
dapan1121 已提交
240 241 242 243 244 245 246
/*
 * Sink "get length" callback: for the inserter the reported length is the
 * total number of affected rows accumulated so far.
 * Note that pQueryEnd is not written by this function.
 */
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  int64_t              total = pInserter->submitRes.affectedRows;

  *pLen = total;
  qDebug("got total affectedRows %" PRId64, total);
}


D
dapan1121 已提交
247
/*
 * Sink "destroy" callback: releases the resources created in
 * createDataInserter() and removes this sink's cached size from the global
 * statistics. The handle memory itself is not freed here.
 *
 * @return always TSDB_CODE_SUCCESS.
 */
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize);
  taosArrayDestroy(pInserter->pDataBlocks);
  taosMemoryFree(pInserter->pSchema);
  // the colId -> slotId map and the semaphore are created per handle and must go with it
  taosHashCleanup(pInserter->pCols);
  taosThreadMutexDestroy(&pInserter->mutex);
  tsem_destroy(&pInserter->ready);
  return TSDB_CODE_SUCCESS;
}

static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
D
dapan1121 已提交
257
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
D
dapan1121 已提交
258 259 260 261 262 263 264 265 266 267 268 269

  *size = atomic_load_64(&pDispatcher->cachedSize);
  return TSDB_CODE_SUCCESS;
}

/*
 * Creates a data-inserter sink for the table described by the plan node.
 *
 * @param pManager   sink manager; stored in the handle.
 * @param pDataSink  plan node; treated as a SQueryInserterNode.
 * @param pHandle    receives the new handle; written only on success.
 * @param pParam     SInserterParam* supplying readHandle; stored, not copied.
 * @return TSDB_CODE_SUCCESS, the error from tsdbGetTableSchema(),
 *         TSDB_CODE_TDB_INVALID_TABLE_ID when the table's super-table id does
 *         not match the plan node, or TSDB_CODE_QRY_OUT_OF_MEMORY.
 *         On failure everything allocated here is released again.
 */
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam) {
  SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  if (NULL == inserter) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  SQueryInserterNode* pInserterNode = (SQueryInserterNode *)pDataSink;
  inserter->sink.fPut = putDataBlock;
  inserter->sink.fEndPut = endPut;
  inserter->sink.fGetLen = getDataLength;
  inserter->sink.fGetData = NULL;
  inserter->sink.fDestroy = destroyDataSinker;
  inserter->sink.fGetCacheSize = getCacheSize;
  inserter->pManager = pManager;
  inserter->pNode = pInserterNode;
  inserter->pParam = pParam;
  inserter->status = DS_BUF_EMPTY;
  inserter->queryEnd = false;

  int64_t suid = 0;
  int32_t code = tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
  if (code) {
    goto _error;
  }

  // the table found in the vnode must belong to the super table the plan was built for
  if (pInserterNode->stableId != suid) {
    code = TSDB_CODE_TDB_INVALID_TABLE_ID;
    terrno = code;
    goto _error;
  }

  // elements are SSDataBlock pointers
  inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  if (NULL == inserter->pDataBlocks) {
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    terrno = code;
    goto _error;
  }

  // colId -> slotId lookup used when only part of the schema's columns is inserted
  inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
  if (NULL == inserter->pCols) {
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    terrno = code;
    goto _error;
  }

  SNode* pNode = NULL;
  FOREACH(pNode, pInserterNode->pCols) {
    SColumnNode* pCol = (SColumnNode*)pNode;
    taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId));
  }

  // synchronisation objects are created last so the error path never has to tear them down
  taosThreadMutexInit(&inserter->mutex, NULL);
  tsem_init(&inserter->ready, 0, 0);

  *pHandle = inserter;
  return TSDB_CODE_SUCCESS;

_error:
  // pCols is still NULL on every path that reaches this label
  if (NULL != inserter->pDataBlocks) {
    taosArrayDestroy(inserter->pDataBlocks);
  }
  taosMemoryFree(inserter->pSchema);
  taosMemoryFree(inserter);
  return code;
}