dataInserter.c 15.0 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "dataSinkInt.h"
#include "dataSinkMgt.h"
18
#include "executorInt.h"
D
dapan1121 已提交
19
#include "planner.h"
20
#include "storageapi.h"
D
dapan1121 已提交
21 22 23 24 25 26 27
#include "tcompression.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tqueue.h"

extern SDataSinkStat gDataSinkStat;

D
dapan1121 已提交
28
typedef struct SSubmitRes {
X
Xiaoyu Wang 已提交
29 30
  int64_t      affectedRows;
  int32_t      code;
H
Haojun Liao 已提交
31
  SSubmitRsp2* pRsp;
D
dapan1121 已提交
32
} SSubmitRes;
D
dapan1121 已提交
33 34 35 36

typedef struct SDataInserterHandle {
  SDataSinkHandle     sink;
  SDataSinkManager*   pManager;
D
dapan1121 已提交
37 38 39 40 41
  STSchema*           pSchema;
  SQueryInserterNode* pNode;
  SSubmitRes          submitRes;
  SInserterParam*     pParam;
  SArray*             pDataBlocks;
D
dapan1121 已提交
42
  SHashObj*           pCols;
D
dapan1121 已提交
43 44
  int32_t             status;
  bool                queryEnd;
45
  bool                fullOrderColList;
D
dapan1121 已提交
46 47 48
  uint64_t            useconds;
  uint64_t            cachedSize;
  TdThreadMutex       mutex;
H
Hongze Cheng 已提交
49
  tsem_t              ready;
X
Xiaoyu Wang 已提交
50
  bool                explain;
D
dapan1121 已提交
51 52
} SDataInserterHandle;

D
dapan1121 已提交
53 54 55
// Context attached to an outgoing submit message; the response callback
// receives it back as its `param` argument.
typedef struct SSubmitRspParam {
  SDataInserterHandle* pInserter;  // inserter that issued the request
} SSubmitRspParam;
D
dapan1121 已提交
56

D
dapan1121 已提交
57
/*
 * Response callback for a submit request.
 *
 * param : SSubmitRspParam* identifying the inserter that sent the request
 * pMsg  : response buffer; pMsg->pData is released here
 * code  : transport/server result for the request
 *
 * The outcome is stored in pInserter->submitRes: `code` holds the first
 * failure found (transport code, allocation failure, decode failure or a
 * failed create-table entry) and `affectedRows` is increased on success.
 * The `ready` semaphore is always posted so the waiting thread resumes.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SSubmitRspParam*     pParam = (SSubmitRspParam*)param;
  SDataInserterHandle* pInserter = pParam->pInserter;
  SSubmitRes*          pRes = &pInserter->submitRes;

  pRes->code = code;

  if (code == TSDB_CODE_SUCCESS) {
    SSubmitRsp2* pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
    pRes->pRsp = pRsp;

    if (NULL == pRsp) {
      // nothing to decode into; report the failure instead of dereferencing NULL
      pRes->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      SDecoder coder = {0};
      tDecoderInit(&coder, pMsg->pData, pMsg->len);
      code = tDecodeSSubmitRsp2(&coder, pRsp);

      if (TSDB_CODE_SUCCESS == code && pRsp->affectedRows > 0) {
        // any failed create-table entry fails the whole submit
        SArray* pCreateTbList = pRsp->aCreateTbRsp;
        int32_t numOfTables = taosArrayGetSize(pCreateTbList);

        for (int32_t i = 0; i < numOfTables; ++i) {
          SVCreateTbRsp* pCreateRsp = taosArrayGet(pCreateTbList, i);
          if (TSDB_CODE_SUCCESS != pCreateRsp->code) {
            code = pCreateRsp->code;
            break;
          }
        }
      }

      if (TSDB_CODE_SUCCESS == code) {
        pRes->affectedRows += pRsp->affectedRows;
        qDebug("submit rsp received, affectedRows:%d, total:%" PRId64, pRsp->affectedRows, pRes->affectedRows);
      } else {
        pRes->code = code;
      }

      // single cleanup point: the decoder is cleared on every path and the
      // stored pointer is reset so it never dangles after the free
      // NOTE(review): only the top-level response struct is released here; if
      // the decoder allocates nested members they may need a dedicated
      // destructor -- confirm against tDecodeSSubmitRsp2.
      tDecoderClear(&coder);
      taosMemoryFree(pRsp);
      pRes->pRsp = NULL;
    }
  }

  tsem_post(&pInserter->ready);
  taosMemoryFree(pMsg->pData);
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
102 103
/*
 * Sends an encoded submit message asynchronously; inserterCallback is
 * registered as the response handler.
 *
 * pMsg/msgLen  : encoded message buffer. It is attached to the send-info on
 *                success and released here if set-up fails.
 * pTransporter : transport handle passed through to asyncSendMsgToServer
 * pEpset       : destination endpoint set
 *
 * Returns the result of asyncSendMsgToServer, or TSDB_CODE_OUT_OF_MEMORY
 * (also stored in terrno) when a bookkeeping allocation fails.
 */
static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* pMsg, int32_t msgLen, void* pTransporter,
                                 SEpSet* pEpset) {
  // build the send-info describing the submit request
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    taosMemoryFreeClear(pMsg);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  SSubmitRspParam* pParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam));
  if (NULL == pParam) {
    // undo what was set up so far instead of writing through a NULL pointer
    taosMemoryFree(pMsgSendInfo);
    taosMemoryFreeClear(pMsg);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }
  pParam->pInserter = pInserter;

  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->msgInfo.pData = pMsg;
  pMsgSendInfo->msgInfo.len = msgLen;
  pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
  pMsgSendInfo->fp = inserterCallback;

  int64_t transporterId = 0;
  return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo);
}

126
/*
 * Serialises a submit request into a newly allocated message buffer:
 * a SSubmitReq2Msg header followed by the encoded request body.
 *
 * On success *pData receives the buffer and *pLen its total length
 * (header + body). On failure nothing is stored and the error is returned.
 */
static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t bodyLen = 0;

  // first pass: measure the encoded body
  tEncodeSize(tEncodeSubmitReq, pReq, bodyLen, code);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  const int32_t headLen = sizeof(SSubmitReq2Msg);
  const int32_t totalLen = bodyLen + headLen;

  void* pBuf = taosMemoryMalloc(totalLen);
  if (NULL == pBuf) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // header fields are stored in network byte order
  SSubmitReq2Msg* pHead = (SSubmitReq2Msg*)pBuf;
  pHead->header.vgId = htonl(vgId);
  pHead->header.contLen = htonl(totalLen);
  pHead->version = htobe64(1);

  // second pass: encode the body right behind the header
  SEncoder encoder;
  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, headLen), bodyLen);
  code = tEncodeSubmitReq(&encoder, pReq);
  tEncoderClear(&encoder);

  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pBuf);
    return code;
  }

  *pData = pBuf;
  *pLen = totalLen;
  return code;
}
D
dapan1121 已提交
154

X
Xiaoyu Wang 已提交
155 156
/*
 * Converts one data block into a SSubmitTbData entry and appends it to the
 * submit request.
 *
 * pInserter  : supplies the colId -> slotId map when the column list is not
 *              a full, in-order match of the schema
 * ppReq      : in/out request. Created here when *ppReq is NULL. On failure
 *              the request is destroyed and *ppReq is set to NULL.
 * pDataBlock : source rows
 * pTSchema   : schema of the target table
 * uid, suid  : table and super-table ids stored in the entry
 * vgId       : currently unused
 *
 * Rows whose primary timestamp equals the largest timestamp seen so far are
 * skipped. If a smaller timestamp is met, the rows are sorted and merged
 * before being appended.
 *
 * Returns TSDB_CODE_SUCCESS, or the error that is also left in terrno.
 */
int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** ppReq, const SSDataBlock* pDataBlock,
                                const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid) {
  SSubmitReq2*  pReq = *ppReq;
  SArray*       pVals = NULL;
  SSubmitTbData tbData = {0};
  bool          tbDataHandedOver = false;  // true once pReq holds a copy of tbData
  int32_t       colNum = taosArrayGetSize(pDataBlock->pDataBlock);
  int32_t       rows = pDataBlock->info.rows;
  int64_t       lastTs = TSKEY_MIN;
  bool          ignoreRow = false;
  bool          disorderTs = false;

  terrno = TSDB_CODE_SUCCESS;

  if (NULL == pReq) {
    // zero-initialised so the error path never destroys an indeterminate struct
    if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;  // otherwise the failure would be reported as success
    goto _end;
  }
  tbData.suid = suid;
  tbData.uid = uid;
  tbData.sver = pTSchema->version;

  if (!(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  for (int32_t j = 0; j < rows; ++j) {  // iterate by row
    taosArrayClear(pVals);

    for (int32_t k = 0; k < pTSchema->numOfCols; ++k) {  // iterate by column
      int16_t         colIdx = k;
      const STColumn* pCol = &pTSchema->columns[k];
      if (!pInserter->fullOrderColList) {
        // schema columns missing from the insert column list are skipped
        int16_t* slotId = taosHashGet(pInserter->pCols, &pCol->colId, sizeof(pCol->colId));
        if (NULL == slotId) {
          continue;
        }

        colIdx = *slotId;
      }

      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
      void*            var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);

      switch (pColInfoData->info.type) {
        case TSDB_DATA_TYPE_NCHAR:
        case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
          ASSERT(pColInfoData->info.type == pCol->type);
          if (colDataIsNull_s(pColInfoData, j)) {
            SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
            taosArrayPush(pVals, &cv);
          } else {
            void*   data = colDataGetVarData(pColInfoData, j);
            SValue  sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)};  // address copy, no value
            SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
            taosArrayPush(pVals, &cv);
          }
          break;
        }
        case TSDB_DATA_TYPE_VARBINARY:
        case TSDB_DATA_TYPE_DECIMAL:
        case TSDB_DATA_TYPE_BLOB:
        case TSDB_DATA_TYPE_JSON:
        case TSDB_DATA_TYPE_MEDIUMBLOB:
          qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
          terrno = TSDB_CODE_APP_ERROR;
          goto _end;
        default:
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
            if (colDataIsNull_s(pColInfoData, j)) {
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
                qError("NULL value for primary key");
                terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
                goto _end;
              }

              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);  // should use pCol->type
              taosArrayPush(pVals, &cv);
            } else {
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
                if (*(int64_t*)var == lastTs) {
                  ignoreRow = true;  // same timestamp as the latest accepted row
                } else if (*(int64_t*)var < lastTs) {
                  disorderTs = true;  // rows need sorting/merging afterwards
                } else {
                  lastTs = *(int64_t*)var;
                }
              }

              SValue sv;
              memcpy(&sv.val, var, tDataTypes[pCol->type].bytes);
              SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
              taosArrayPush(pVals, &cv);
            }
          } else {
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
            terrno = TSDB_CODE_APP_ERROR;
            goto _end;
          }
          break;
      }

      if (ignoreRow) {
        break;
      }
    }

    if (ignoreRow) {
      ignoreRow = false;
      continue;
    }

    SRow* pRow = NULL;
    if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
      goto _end;
    }
    taosArrayPush(tbData.aRowP, &pRow);
  }

  if (disorderTs) {
    tRowSort(tbData.aRowP);
    if ((terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) {
      goto _end;
    }
  }

  taosArrayPush(pReq->aSubmitTbData, &tbData);
  tbDataHandedOver = true;

_end:
  taosArrayDestroy(pVals);
  if (terrno != 0) {
    // rows built so far still belong to the local tbData unless it was
    // already copied into the request; release them here on every error path
    if (!tbDataHandedOver && NULL != tbData.aRowP) {
      tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
    }
    *ppReq = NULL;
    if (pReq) {
      tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
      taosMemoryFree(pReq);
    }
    return terrno;
  }
  *ppReq = pReq;
  return TSDB_CODE_SUCCESS;
}

314 315 316 317 318 319 320 321
/*
 * Builds one submit request from every block queued in
 * pInserter->pDataBlocks and serialises it.
 *
 * On success *pMsg / *msgLen receive the encoded message. The intermediate
 * request object is always released before returning.
 */
int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32_t* msgLen) {
  const SArray*   pBlocks = pInserter->pDataBlocks;
  const STSchema* pTSchema = pInserter->pSchema;
  int64_t         uid = pInserter->pNode->tableId;
  int64_t         suid = pInserter->pNode->stableId;
  int32_t         vgId = pInserter->pNode->vgId;
  int32_t         numOfBlocks = taosArrayGetSize(pBlocks);
  int32_t         code = 0;
  SSubmitReq2*    pReq = NULL;

  // append every queued block; stop at the first failure
  for (int32_t idx = 0; idx < numOfBlocks; idx++) {
    SSDataBlock* pBlock = taosArrayGetP(pBlocks, idx);

    code = buildSubmitReqFromBlock(pInserter, &pReq, pBlock, pTSchema, uid, vgId, suid);
    if (code) {
      break;
    }
  }

  if (code) {
    if (pReq) {
      tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
      taosMemoryFree(pReq);
    }
    return code;
  }

  code = submitReqToMsg(vgId, pReq, pMsg, msgLen);

  // the request is only needed for encoding
  tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
  taosMemoryFree(pReq);

  return code;
}

D
dapan1121 已提交
345 346
/*
 * Sink "put" callback: converts the incoming block into a submit message,
 * sends it and blocks until the response callback posts the `ready`
 * semaphore.
 *
 * When the inserter runs in explain mode nothing is submitted.
 * *pContinue is set to true only on the success path.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error from conversion, sending or
 * the submit response.
 */
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  if (!pInserter->explain) {
    taosArrayPush(pInserter->pDataBlocks, &pInput->pData);
    void*   pMsg = NULL;
    int32_t msgLen = 0;
    int32_t code = dataBlocksToSubmitReq(pInserter, &pMsg, &msgLen);

    // The queue only borrows the block pointer. Drop it whether or not the
    // conversion worked, so a failed call cannot leave a stale pointer behind
    // for a later call to pick up.
    taosArrayClear(pInserter->pDataBlocks);

    if (code) {
      return code;
    }

    code = sendSubmitRequest(pInserter, pMsg, msgLen, pInserter->pParam->readHandle->pMsgCb->clientRpc,
                             &pInserter->pNode->epSet);
    if (code) {
      return code;
    }

    // wait for inserterCallback to record the outcome
    tsem_wait(&pInserter->ready);

    if (pInserter->submitRes.code) {
      return pInserter->submitRes.code;
    }
  }

  *pContinue = true;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
376 377 378 379 380 381 382 383
// Sink "end put" callback: records, under the inserter mutex, that the query
// has finished and stores the `useconds` value supplied by the caller.
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
  SDataInserterHandle* pSelf = (SDataInserterHandle*)pHandle;

  taosThreadMutexLock(&pSelf->mutex);
  pSelf->queryEnd = true;
  pSelf->useconds = useconds;
  taosThreadMutexUnlock(&pSelf->mutex);
}

D
dapan1121 已提交
384 385 386
// Sink "get length" callback: stores the accumulated affected-row count in
// *pLen. pQueryEnd is not written by this implementation.
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
  SDataInserterHandle* pSelf = (SDataInserterHandle*)pHandle;

  *pLen = pSelf->submitRes.affectedRows;
  qDebug("got total affectedRows %" PRId64, *pLen);
}

D
dapan1121 已提交
390
// Sink "destroy" callback: removes this sink's cached size from the global
// statistic and releases everything the handle owns, including the manager.
// The handle structure itself is not freed here. Always returns
// TSDB_CODE_SUCCESS.
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataInserterHandle* pSelf = (SDataInserterHandle*)pHandle;

  atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pSelf->cachedSize);

  // containers
  taosArrayDestroy(pSelf->pDataBlocks);
  taosMemoryFree(pSelf->pSchema);
  taosMemoryFree(pSelf->pParam);
  taosHashCleanup(pSelf->pCols);

  // synchronisation
  taosThreadMutexDestroy(&pSelf->mutex);

  // the manager is released together with the sink
  taosMemoryFree(pSelf->pManager);
  return TSDB_CODE_SUCCESS;
}

static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
D
dapan1121 已提交
404
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
D
dapan1121 已提交
405 406 407 408 409

  *size = atomic_load_64(&pDispatcher->cachedSize);
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
410 411
/*
 * Creates an inserter data sink for the given plan node.
 *
 * pManager  : sink manager; owned by the new sink and released on failure
 * pDataSink : plan node, treated as a SQueryInserterNode
 * pHandle   : receives the new sink on success
 * pParam    : inserter parameter; owned by the new sink and released on failure
 *
 * Returns TSDB_CODE_SUCCESS, or an error code that is also left in terrno.
 * On any failure everything allocated here, plus pManager and pParam, is
 * released and *pHandle is left untouched.
 */
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
                           void* pParam) {
  SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  if (NULL == inserter) {
    taosMemoryFree(pParam);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _return;
  }

  // Initialise the mutex before anything can fail: every error path below
  // goes through destroyDataSinker, which destroys it.
  taosThreadMutexInit(&inserter->mutex, NULL);

  SQueryInserterNode* pInserterNode = (SQueryInserterNode*)pDataSink;
  inserter->sink.fPut = putDataBlock;
  inserter->sink.fEndPut = endPut;
  inserter->sink.fGetLen = getDataLength;
  inserter->sink.fGetData = NULL;
  inserter->sink.fDestroy = destroyDataSinker;
  inserter->sink.fGetCacheSize = getCacheSize;
  inserter->pManager = pManager;
  inserter->pNode = pInserterNode;
  inserter->pParam = pParam;
  inserter->status = DS_BUF_EMPTY;
  inserter->queryEnd = false;
  inserter->explain = pInserterNode->explain;

  int64_t suid = 0;
  int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId,
                                                       &inserter->pSchema, &suid);
  if (code) {
    terrno = code;
    goto _return;
  }

  // the plan must target the same super table the schema belongs to
  if (pInserterNode->stableId != suid) {
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    goto _return;
  }

  inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  if (NULL == inserter->pDataBlocks) {
    // go through the common cleanup instead of returning and leaking the handle
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _return;
  }

  inserter->fullOrderColList = pInserterNode->pCols->length == inserter->pSchema->numOfCols;

  inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT),
                                 false, HASH_NO_LOCK);
  if (NULL == inserter->pCols) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _return;
  }

  // Fill the colId -> slotId map and check whether the node's columns follow
  // the schema order exactly.
  SNode*  pNode = NULL;
  int32_t i = 0;
  FOREACH(pNode, pInserterNode->pCols) {
    SColumnNode* pCol = (SColumnNode*)pNode;
    taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId));
    if (inserter->fullOrderColList && pCol->colId != inserter->pSchema->columns[i].colId) {
      inserter->fullOrderColList = false;
    }
    ++i;
  }

  tsem_init(&inserter->ready, 0, 0);

  *pHandle = inserter;
  return TSDB_CODE_SUCCESS;

_return:

  if (inserter) {
    // releases pSchema, pParam, pDataBlocks, pCols, the mutex and pManager
    destroyDataSinker((SDataSinkHandle*)inserter);
    taosMemoryFree(inserter);
  } else {
    taosMemoryFree(pManager);
  }

  return terrno;
}