dataInserter.c 11.0 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "executorimpl.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tqueue.h"

extern SDataSinkStat gDataSinkStat;

D
dapan1121 已提交
27 28 29 30 31
/* Accumulated outcome of the submit requests issued by one inserter. */
typedef struct SSubmitRes {
  int64_t     affectedRows;  // running total of affectedRows over all successful submit responses
  int32_t     code;          // result code of the most recent submit round-trip (set by inserterCallback)
  SSubmitRsp *pRsp;          // response being decoded inside inserterCallback; freed there before it returns
} SSubmitRes;
D
dapan1121 已提交
32 33 34 35

typedef struct SDataInserterHandle {
  SDataSinkHandle     sink;
  SDataSinkManager*   pManager;
D
dapan1121 已提交
36 37 38 39 40
  STSchema*           pSchema;
  SQueryInserterNode* pNode;
  SSubmitRes          submitRes;
  SInserterParam*     pParam;
  SArray*             pDataBlocks;
D
dapan1121 已提交
41
  SHashObj*           pCols;
D
dapan1121 已提交
42 43 44 45 46
  int32_t             status;
  bool                queryEnd;
  uint64_t            useconds;
  uint64_t            cachedSize;
  TdThreadMutex       mutex;
D
dapan1121 已提交
47
  tsem_t              ready;  
D
dapan1121 已提交
48 49
} SDataInserterHandle;

D
dapan1121 已提交
50 51 52
/* Per-request context attached to a submit message and handed back to inserterCallback. */
typedef struct SSubmitRspParam {
  SDataInserterHandle* pInserter;  // inserter that sent the request
} SSubmitRspParam;
D
dapan1121 已提交
53

D
dapan1121 已提交
54 55 56
/*
 * Response handler for the TDMT_VND_SUBMIT request sent by sendSubmitRequest.
 *
 * Decodes the submit response and records the outcome in pInserter->submitRes:
 * `code` receives the transport code, a decode error, or the first failing
 * per-block code; on success `affectedRows` is increased by the response's
 * affected rows. Finally it posts pInserter->ready to wake the thread waiting
 * in putDataBlock, and frees pMsg->pData on every path.
 *
 * @param param SSubmitRspParam* identifying the inserter.
 * @param pMsg  raw response buffer (pData is freed here).
 * @param code  transport-level result of the request.
 * @return always TSDB_CODE_SUCCESS; the real outcome is in submitRes.code.
 */
int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SSubmitRspParam*     pParam = (SSubmitRspParam*)param;
  SDataInserterHandle* pInserter = pParam->pInserter;

  pInserter->submitRes.code = code;

  if (code == TSDB_CODE_SUCCESS) {
    SSubmitRsp* pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp));
    if (NULL == pRsp) {
      pInserter->submitRes.code = TSDB_CODE_QRY_OUT_OF_MEMORY;
      goto _return;
    }
    pInserter->submitRes.pRsp = pRsp;

    SDecoder coder = {0};
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    code = tDecodeSSubmitRsp(&coder, pRsp);
    if (code) {
      pInserter->submitRes.code = code;
      goto _free_rsp;
    }

    // Any failing block fails the whole request.
    for (int32_t i = 0; i < pRsp->nBlocks; ++i) {
      SSubmitBlkRsp* blk = pRsp->pBlocks + i;
      if (TSDB_CODE_SUCCESS != blk->code) {
        pInserter->submitRes.code = blk->code;
        goto _free_rsp;
      }
    }

    pInserter->submitRes.affectedRows += pRsp->affectedRows;
    // Both values printed as int64_t: passing an int64_t to %d is undefined behaviour.
    qDebug("submit rsp received, affectedRows:%" PRId64 ", total:%" PRId64, (int64_t)pRsp->affectedRows,
           pInserter->submitRes.affectedRows);

  _free_rsp:
    // Release memory the decoder allocated for itself, then the response.
    tDecoderClear(&coder);
    tFreeSSubmitRsp(pRsp);
    pInserter->submitRes.pRsp = NULL;  // do not leave a dangling pointer in the handle
  }

_return:
  tsem_post(&pInserter->ready);
  taosMemoryFree(pMsg->pData);
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
99 100 101 102 103 104 105
/*
 * Sends pMsg to pEpset as an asynchronous TDMT_VND_SUBMIT request whose
 * response is handled by inserterCallback.
 *
 * Takes ownership of pMsg: it is freed here if building the send info fails,
 * otherwise it is attached to the send info passed to asyncSendMsgToServer.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_OUT_OF_MEMORY, or the result of
 *         asyncSendMsgToServer.
 */
static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMsg, void* pTransporter, SEpSet* pEpset) {
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    taosMemoryFreeClear(pMsg);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }

  SSubmitRspParam* pParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam));
  if (NULL == pParam) {
    taosMemoryFree(pMsgSendInfo);
    taosMemoryFreeClear(pMsg);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return terrno;
  }
  pParam->pInserter = pInserter;

  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;  // free function for pParam
  pMsgSendInfo->msgInfo.pData = pMsg;
  pMsgSendInfo->msgInfo.len = ntohl(pMsg->length);  // length was stored in network byte order
  pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
  pMsgSendInfo->fp = inserterCallback;

  int64_t transporterId = 0;
  return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo);
}


D
dapan1121 已提交
123
/*
 * Encodes every block queued in pInserter->pDataBlocks into one SSubmitReq
 * addressed to the plan node's table (one SSubmitBlk per data block).
 *
 * When the first schema column is a timestamp, rows whose value there is NULL,
 * or equal to the previously kept row of the same block, are skipped.
 * Header fields are written in network byte order.
 *
 * @param pInserter inserter holding the schema, plan node and queued blocks.
 * @param pReq      on success receives the newly allocated request (caller owns it).
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_QRY_OUT_OF_MEMORY if the request cannot
 *         be allocated; TSDB_CODE_APP_ERROR if a block column is missing or its
 *         type does not match the schema. On failure terrno is set and nothing
 *         is leaked.
 */
int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
  const SArray*   pBlocks = pInserter->pDataBlocks;
  const STSchema* pTSchema = pInserter->pSchema;
  int64_t         uid = pInserter->pNode->tableId;
  int64_t         suid = pInserter->pNode->stableId;
  int32_t         vgId = pInserter->pNode->vgId;
  // If the plan lists every schema column, block column k is schema column k;
  // otherwise the colId -> slotId map in pInserter->pCols is consulted.
  bool            fullCol = (pInserter->pNode->pCols->length == pTSchema->numOfCols);
  int32_t         sz = taosArrayGetSize(pBlocks);

  // Worst-case size: every row encoded at the schema's maximum row length.
  // Accumulated in 64 bits so large blocks cannot overflow the estimate.
  int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
  int64_t cap = sizeof(SSubmitReq);
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);
    cap += sizeof(SSubmitBlk) + (int64_t)pDataBlock->info.rows * maxLen;
  }

  SSubmitReq* ret = taosMemoryCalloc(1, cap);
  if (NULL == ret) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }
  ret->header.vgId = htonl(vgId);
  ret->version = htonl(pTSchema->version);
  ret->length = sizeof(SSubmitReq);  // accumulated in host order, converted at the end
  ret->numOfBlocks = htonl(sz);

  SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);

    blkHead->sversion = htonl(pTSchema->version);
    blkHead->suid = htobe64(suid);
    blkHead->uid = htobe64(uid);
    blkHead->schemaLen = htonl(0);

    int32_t rows = 0;
    int32_t dataLen = 0;
    STSRow* rowData = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
    int64_t lastTs = TSKEY_MIN;
    for (int32_t j = 0; j < pDataBlock->info.rows; j++) {
      SRowBuilder rb = {0};
      tdSRowInit(&rb, pTSchema->version);
      tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
      tdSRowResetBuf(&rb, rowData);

      bool ignoreRow = false;
      for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
        const STColumn* pColumn = &pTSchema->columns[k];
        int16_t         colIdx = k;
        if (!fullCol) {
          int16_t* slotId = taosHashGet(pInserter->pCols, &pColumn->colId, sizeof(pColumn->colId));
          if (NULL == slotId) {
            continue;  // schema column not listed in the plan node
          }
          colIdx = *slotId;
        }

        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
        if (NULL == pColData) {
          qError("column slot %d not found in data block", colIdx);
          goto _error;
        }
        if (pColData->info.type != pColumn->type) {
          qError("col type mis-match, schema type:%d, type in block:%d", pColumn->type, pColData->info.type);
          goto _error;
        }

        if (colDataIsNull_s(pColData, j)) {
          if (0 == k && TSDB_DATA_TYPE_TIMESTAMP == pColumn->type) {
            ignoreRow = true;  // NULL timestamp: drop the row
            break;
          }
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
        } else {
          void* data = colDataGetData(pColData, j);
          if (0 == k && TSDB_DATA_TYPE_TIMESTAMP == pColumn->type) {
            if (*(int64_t*)data == lastTs) {
              ignoreRow = true;  // same timestamp as the previous kept row: drop it
              break;
            }
            lastTs = *(int64_t*)data;
          }
          tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
        }
      }
      tdSRowEnd(&rb);

      if (ignoreRow) {
        continue;  // rowData is not advanced, so the next row overwrites this one
      }

      rows++;
      int32_t rowLen = TD_ROW_LEN(rowData);
      rowData = POINTER_SHIFT(rowData, rowLen);
      dataLen += rowLen;
    }

    blkHead->dataLen = htonl(dataLen);
    blkHead->numOfRows = htonl(rows);

    ret->length += sizeof(SSubmitBlk) + dataLen;
    blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen);
  }

  ret->length = htonl(ret->length);

  *pReq = ret;
  return TSDB_CODE_SUCCESS;

_error:
  taosMemoryFree(ret);
  terrno = TSDB_CODE_APP_ERROR;
  return TSDB_CODE_APP_ERROR;
}


D
dapan1121 已提交
243 244
/*
 * fPut implementation: encodes pInput->pData into a submit request, sends it
 * and blocks on pInserter->ready until inserterCallback reports the result.
 *
 * @param pHandle   the inserter, as a generic sink handle.
 * @param pInput    input whose pData block is inserted (not retained after return).
 * @param pContinue set to true once the block has been inserted successfully.
 * @return TSDB_CODE_SUCCESS, or the first error from queuing, encoding,
 *         sending, or the submit response.
 */
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;

  if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  SSubmitReq* pMsg = NULL;
  int32_t     code = dataBlockToSubmit(pInserter, &pMsg);
  // The queued pointers belong to the caller and are only needed for the
  // encoding above. Drop them (on success and failure) so the next call does
  // not re-submit earlier blocks or touch pointers that may no longer be valid.
  taosArrayClear(pInserter->pDataBlocks);
  if (code) {
    return code;
  }

  code = sendSubmitRequest(pInserter, pMsg, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet);
  if (code) {
    return code;
  }

  tsem_wait(&pInserter->ready);

  if (pInserter->submitRes.code) {
    return pInserter->submitRes.code;
  }

  *pContinue = true;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
268 269 270 271 272 273 274 275
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  taosThreadMutexLock(&pInserter->mutex);
  pInserter->queryEnd = true;
  pInserter->useconds = useconds;
  taosThreadMutexUnlock(&pInserter->mutex);
}

D
dapan1121 已提交
276 277 278 279 280 281 282
/*
 * fGetLen implementation: reports the total affected-row count accumulated
 * from submit responses.
 * NOTE(review): *pQueryEnd is never written here; callers must initialise it.
 */
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
  const SDataInserterHandle* pInserter = (const SDataInserterHandle*)pHandle;
  int64_t                    totalRows = pInserter->submitRes.affectedRows;

  (void)pQueryEnd;
  *pLen = totalRows;
  qDebug("got total affectedRows %" PRId64, totalRows);
}


D
dapan1121 已提交
283
/*
 * fDestroy implementation: releases every resource owned by the inserter
 * (queued-block array, schema, creation parameters, column map, mutex and the
 * `ready` semaphore) and removes its cachedSize from the global sink stats.
 * The handle memory itself is not freed here.
 */
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize);
  taosArrayDestroy(pInserter->pDataBlocks);
  pInserter->pDataBlocks = NULL;
  taosMemoryFreeClear(pInserter->pSchema);
  taosMemoryFreeClear(pInserter->pParam);
  taosHashCleanup(pInserter->pCols);
  pInserter->pCols = NULL;
  taosThreadMutexDestroy(&pInserter->mutex);
  // createDataInserter initialises `ready` before publishing the handle, so it
  // is always valid here; previously it was never destroyed.
  tsem_destroy(&pInserter->ready);
  return TSDB_CODE_SUCCESS;
}

static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
D
dapan1121 已提交
295
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
D
dapan1121 已提交
296 297 298 299 300 301 302 303 304 305 306 307

  *size = atomic_load_64(&pDispatcher->cachedSize);
  return TSDB_CODE_SUCCESS;
}

/*
 * Creates a data sink that inserts query results into the table described by
 * pDataSink.
 *
 * @param pManager  sink manager stored in the handle.
 * @param pDataSink plan node; treated as an SQueryInserterNode.
 * @param pHandle   receives the new handle on success.
 * @param pParam    SInserterParam*; on success the handle keeps it and
 *                  destroyDataSinker frees it. On failure it is NOT freed here.
 * @return TSDB_CODE_SUCCESS; the error from tsdbGetTableSchema;
 *         TSDB_CODE_TDB_INVALID_TABLE_ID if the table's super-table id differs
 *         from the plan; or TSDB_CODE_QRY_OUT_OF_MEMORY. Everything allocated
 *         here is released on failure.
 */
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam) {
  SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  if (NULL == inserter) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  SQueryInserterNode* pInserterNode = (SQueryInserterNode *)pDataSink;
  inserter->sink.fPut = putDataBlock;
  inserter->sink.fEndPut = endPut;
  inserter->sink.fGetLen = getDataLength;
  inserter->sink.fGetData = NULL;
  inserter->sink.fDestroy = destroyDataSinker;
  inserter->sink.fGetCacheSize = getCacheSize;
  inserter->pManager = pManager;
  inserter->pNode = pInserterNode;
  inserter->pParam = pParam;
  inserter->status = DS_BUF_EMPTY;
  inserter->queryEnd = false;

  bool    mutexInited = false;
  SNode*  pNode = NULL;
  int64_t suid = 0;
  int32_t code = tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
  if (code) {
    goto _error;
  }

  if (pInserterNode->stableId != suid) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    code = terrno;
    goto _error;
  }

  inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  taosThreadMutexInit(&inserter->mutex, NULL);
  mutexInited = true;
  if (NULL == inserter->pDataBlocks) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _error;
  }

  // colId -> slotId, consulted by dataBlockToSubmit when the plan does not
  // supply every schema column.
  inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
  if (NULL == inserter->pCols) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    code = TSDB_CODE_QRY_OUT_OF_MEMORY;
    goto _error;
  }
  FOREACH(pNode, pInserterNode->pCols) {
    SColumnNode* pCol = (SColumnNode*)pNode;
    taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId));
  }

  tsem_init(&inserter->ready, 0, 0);

  *pHandle = inserter;
  return TSDB_CODE_SUCCESS;

_error:
  // Release only what this function acquired; pParam stays with the caller.
  if (NULL != inserter->pCols) {
    taosHashCleanup(inserter->pCols);
  }
  if (NULL != inserter->pDataBlocks) {
    taosArrayDestroy(inserter->pDataBlocks);
  }
  if (mutexInited) {
    taosThreadMutexDestroy(&inserter->mutex);
  }
  taosMemoryFree(inserter->pSchema);
  taosMemoryFree(inserter);
  return code;
}