dataInserter.c 14.6 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "executorimpl.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tqueue.h"

extern SDataSinkStat gDataSinkStat;

D
dapan1121 已提交
27
typedef struct SSubmitRes {
X
Xiaoyu Wang 已提交
28 29
  int64_t      affectedRows;
  int32_t      code;
H
Haojun Liao 已提交
30
  SSubmitRsp2* pRsp;
D
dapan1121 已提交
31
} SSubmitRes;
D
dapan1121 已提交
32 33 34 35

typedef struct SDataInserterHandle {
  SDataSinkHandle     sink;
  SDataSinkManager*   pManager;
D
dapan1121 已提交
36 37 38 39 40
  STSchema*           pSchema;
  SQueryInserterNode* pNode;
  SSubmitRes          submitRes;
  SInserterParam*     pParam;
  SArray*             pDataBlocks;
D
dapan1121 已提交
41
  SHashObj*           pCols;
D
dapan1121 已提交
42 43
  int32_t             status;
  bool                queryEnd;
44
  bool                fullOrderColList;
D
dapan1121 已提交
45 46 47
  uint64_t            useconds;
  uint64_t            cachedSize;
  TdThreadMutex       mutex;
H
Hongze Cheng 已提交
48
  tsem_t              ready;
X
Xiaoyu Wang 已提交
49
  bool                explain;
D
dapan1121 已提交
50 51
} SDataInserterHandle;

D
dapan1121 已提交
52 53 54
/* Context attached to an outgoing submit message and handed back to inserterCallback. */
typedef struct SSubmitRspParam {
  SDataInserterHandle* pInserter;  // inserter that issued the request
} SSubmitRspParam;
D
dapan1121 已提交
55

D
dapan1121 已提交
56
/**
 * Response callback for a submit request sent by sendSubmitRequest().
 *
 * Records the result in pInserter->submitRes, adds the affected-row count of a
 * successfully decoded response to the running total, releases the response
 * payload and finally posts the `ready` semaphore that putDataBlock() waits on.
 *
 * @param param SSubmitRspParam* carrying the owning inserter
 * @param pMsg  response buffer; pMsg->pData is freed here
 * @param code  transport/server result code for the request
 * @return always TSDB_CODE_SUCCESS; the real outcome is stored in submitRes.code
 */
int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SSubmitRspParam*     pParam = (SSubmitRspParam*)param;
  SDataInserterHandle* pInserter = pParam->pInserter;

  pInserter->submitRes.code = code;

  if (code == TSDB_CODE_SUCCESS) {
    SSubmitRsp2* pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp2));
    if (NULL == pRsp) {
      // cannot decode without a response object; report the failure to the waiter
      pInserter->submitRes.code = TSDB_CODE_OUT_OF_MEMORY;
      goto _return;
    }
    pInserter->submitRes.pRsp = pRsp;

    SDecoder coder = {0};
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    code = tDecodeSSubmitRsp2(&coder, pRsp);

    if (TSDB_CODE_SUCCESS == code && pRsp->affectedRows > 0) {
      // any failed auto-create-table entry fails the whole submit
      SArray* pCreateTbList = pRsp->aCreateTbRsp;
      int32_t numOfTables = taosArrayGetSize(pCreateTbList);

      for (int32_t i = 0; i < numOfTables; ++i) {
        SVCreateTbRsp* pTbRsp = taosArrayGet(pCreateTbList, i);
        if (TSDB_CODE_SUCCESS != pTbRsp->code) {
          code = pTbRsp->code;
          break;
        }
      }
    }

    if (TSDB_CODE_SUCCESS == code) {
      pInserter->submitRes.affectedRows += pRsp->affectedRows;
      qDebug("submit rsp received, affectedRows:%d, total:%" PRId64, pRsp->affectedRows,
             pInserter->submitRes.affectedRows);
    } else {
      pInserter->submitRes.code = code;
    }

    // release the decoder on every path, not only on success
    tDecoderClear(&coder);
    // NOTE(review): only the response struct itself is freed here, as before;
    // confirm whether aCreateTbRsp needs a dedicated destroy call.
    taosMemoryFree(pRsp);
    pInserter->submitRes.pRsp = NULL;  // do not leave a dangling pointer behind
  }

_return:
  tsem_post(&pInserter->ready);
  taosMemoryFree(pMsg->pData);
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
101 102
/**
 * Send an encoded submit message asynchronously; the response is delivered to
 * inserterCallback().
 *
 * @param pInserter    inserter that owns the request
 * @param pMsg         encoded message buffer; freed here if the send info cannot be built
 * @param msgLen       length of pMsg in bytes
 * @param pTransporter transport handle passed to asyncSendMsgToServer
 * @param pEpset       endpoint set of the target
 * @return result of asyncSendMsgToServer, or TSDB_CODE_OUT_OF_MEMORY (also stored in terrno)
 */
static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, void* pMsg, int32_t msgLen, void* pTransporter,
                                 SEpSet* pEpset) {
  // build the send info for the submit request
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    taosMemoryFreeClear(pMsg);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  SSubmitRspParam* pParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam));
  if (NULL == pParam) {
    // nothing has been handed to the transport yet, so release everything here
    taosMemoryFree(pMsgSendInfo);
    taosMemoryFreeClear(pMsg);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }
  pParam->pInserter = pInserter;

  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->msgInfo.pData = pMsg;
  pMsgSendInfo->msgInfo.len = msgLen;
  pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
  pMsgSendInfo->fp = inserterCallback;

  int64_t transporterId = 0;
  return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo);
}

125
/**
 * Serialize a submit request into a newly allocated message buffer that starts
 * with an SMsgHead (vgId and total length in network byte order).
 *
 * @param vgId  target vgroup id written into the header
 * @param pReq  request to encode
 * @param pData receives the buffer on success (untouched on failure)
 * @param pLen  receives the total buffer length, header included, on success
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the encoder's error code
 */
static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int32_t* pLen) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t len = 0;

  // first pass: measure the encoded size
  tEncodeSize(tEncodeSSubmitReq2, pReq, len, code);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  len += sizeof(SMsgHead);
  void* pBuf = taosMemoryMalloc(len);
  if (NULL == pBuf) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SMsgHead* pHead = (SMsgHead*)pBuf;
  pHead->vgId = htonl(vgId);
  pHead->contLen = htonl(len);

  // second pass: encode the payload right behind the header
  SEncoder encoder;
  tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
  code = tEncodeSSubmitReq2(&encoder, pReq);
  tEncoderClear(&encoder);

  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(pBuf);
    return code;
  }

  *pData = pBuf;
  *pLen = len;
  return code;
}
D
dapan1121 已提交
152

X
Xiaoyu Wang 已提交
153 154
/**
 * Convert one data block into an SSubmitTbData entry and append it to a submit
 * request, creating the request when *ppReq is NULL.
 *
 * Rows whose primary timestamp equals the last accepted timestamp are skipped.
 * If a timestamp smaller than the last accepted one is seen, the rows are
 * sorted and merged before being appended.
 *
 * @param pInserter  inserter providing the column mapping (pCols / fullOrderColList)
 * @param ppReq      in/out request; set to NULL (and the request destroyed) on failure
 * @param pDataBlock source block
 * @param pTSchema   schema of the target table
 * @param uid        target table uid
 * @param vgId       unused by this function
 * @param suid       super table uid
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED with the cause stored in terrno
 */
int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** ppReq, const SSDataBlock* pDataBlock,
                                const STSchema* pTSchema, int64_t uid, int32_t vgId, tb_uid_t suid) {
  SSubmitReq2* pReq = *ppReq;
  SArray*      pVals = NULL;

  terrno = TSDB_CODE_SUCCESS;

  if (NULL == pReq) {
    if (!(pReq = taosMemoryMalloc(sizeof(SSubmitReq2)))) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }

    if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _end;
    }
  }

  int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
  int32_t rows = pDataBlock->info.rows;

  SSubmitTbData tbData = {0};
  if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
    // terrno must be set, otherwise _end would report success
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }
  tbData.suid = suid;
  tbData.uid = uid;
  tbData.sver = pTSchema->version;

  if (!(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
    taosArrayDestroy(tbData.aRowP);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _end;
  }

  int64_t lastTs = TSKEY_MIN;
  bool    ignoreRow = false;
  bool    disorderTs = false;

  for (int32_t j = 0; j < rows; ++j) {  // iterate by row
    taosArrayClear(pVals);

    for (int32_t k = 0; k < pTSchema->numOfCols; ++k) {  // iterate by column
      int16_t         colIdx = k;
      const STColumn* pCol = &pTSchema->columns[k];
      if (!pInserter->fullOrderColList) {
        // schema column not present in the insert column list: leave it out of the row
        int16_t* slotId = taosHashGet(pInserter->pCols, &pCol->colId, sizeof(pCol->colId));
        if (NULL == slotId) {
          continue;
        }

        colIdx = *slotId;
      }

      SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
      void*            var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);

      switch (pColInfoData->info.type) {
        case TSDB_DATA_TYPE_NCHAR:
        case TSDB_DATA_TYPE_VARCHAR: {  // TSDB_DATA_TYPE_BINARY
          ASSERT(pColInfoData->info.type == pCol->type);
          if (colDataIsNull_s(pColInfoData, j)) {
            SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
            taosArrayPush(pVals, &cv);
          } else {
            void*   data = colDataGetVarData(pColInfoData, j);
            SValue  sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)};  // address copy, no value
            SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
            taosArrayPush(pVals, &cv);
          }
          break;
        }
        case TSDB_DATA_TYPE_VARBINARY:
        case TSDB_DATA_TYPE_DECIMAL:
        case TSDB_DATA_TYPE_BLOB:
        case TSDB_DATA_TYPE_JSON:
        case TSDB_DATA_TYPE_MEDIUMBLOB:
          uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
          ASSERT(0);
          break;
        default:
          if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
            if (colDataIsNull_s(pColInfoData, j)) {
              SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);  // should use pCol->type
              taosArrayPush(pVals, &cv);
            } else {
              if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
                if (*(int64_t*)var == lastTs) {
                  ignoreRow = true;  // same timestamp as the last accepted row
                } else if (*(int64_t*)var < lastTs) {
                  disorderTs = true;  // rows need sorting before submit
                } else {
                  lastTs = *(int64_t*)var;
                }
              }

              SValue sv;
              memcpy(&sv.val, var, tDataTypes[pCol->type].bytes);
              SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv);
              taosArrayPush(pVals, &cv);
            }
          } else {
            uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
            ASSERT(0);
          }
          break;
      }

      if (ignoreRow) {
        break;
      }
    }

    if (ignoreRow) {
      ignoreRow = false;
      continue;
    }

    SRow* pRow = NULL;
    if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
      tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
      goto _end;
    }
    taosArrayPush(tbData.aRowP, &pRow);
  }

  if (disorderTs) {
    tRowSort(tbData.aRowP);
    if ((terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) {
      // tbData has not been handed to pReq yet, so it must be released here
      tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
      goto _end;
    }
  }

  if (NULL == taosArrayPush(pReq->aSubmitTbData, &tbData)) {
    // the request did not take ownership of tbData
    tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
  }

_end:
  taosArrayDestroy(pVals);
  if (terrno != 0) {
    *ppReq = NULL;
    if (pReq) {
      tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
      taosMemoryFree(pReq);
    }
    return TSDB_CODE_FAILED;
  }
  *ppReq = pReq;
  return TSDB_CODE_SUCCESS;
}

304 305 306 307 308 309 310 311
/**
 * Build one submit request from every block queued in pInserter->pDataBlocks
 * and encode it into a message buffer.
 *
 * @param pInserter inserter holding the queued blocks, schema and target ids
 * @param pMsg      receives the encoded message (see submitReqToMsg)
 * @param msgLen    receives the encoded message length
 * @return 0 on success, otherwise the first error returned while building or encoding
 */
int32_t dataBlocksToSubmitReq(SDataInserterHandle* pInserter, void** pMsg, int32_t* msgLen) {
  const SArray*   pQueued = pInserter->pDataBlocks;
  const STSchema* pSchema = pInserter->pSchema;
  int64_t         tableUid = pInserter->pNode->tableId;
  int64_t         superUid = pInserter->pNode->stableId;
  int32_t         vgroupId = pInserter->pNode->vgId;
  int32_t         numBlocks = taosArrayGetSize(pQueued);
  SSubmitReq2*    pSubmit = NULL;

  int32_t idx = 0;
  while (idx < numBlocks) {
    SSDataBlock* pBlock = taosArrayGetP(pQueued, idx);

    int32_t rc = buildSubmitReqFromBlock(pInserter, &pSubmit, pBlock, pSchema, tableUid, vgroupId, superUid);
    if (rc) {
      // release whatever request is still attached before bailing out
      if (pSubmit) {
        tDestroySSubmitReq2(pSubmit, TSDB_MSG_FLG_ENCODE);
        taosMemoryFree(pSubmit);
      }
      return rc;
    }

    idx++;
  }

  // the encoded message is independent of the request, which is released right away
  int32_t code = submitReqToMsg(vgroupId, pSubmit, pMsg, msgLen);
  tDestroySSubmitReq2(pSubmit, TSDB_MSG_FLG_ENCODE);
  taosMemoryFree(pSubmit);

  return code;
}

D
dapan1121 已提交
335 336
/**
 * Sink callback: convert the incoming block into a submit request, send it and
 * block until inserterCallback() signals that the response has been processed.
 * In explain mode nothing is built or sent.
 *
 * @param pHandle   the inserter (an SDataInserterHandle)
 * @param pInput    input carrying the data block pointer
 * @param pContinue set to true on success
 * @return TSDB_CODE_SUCCESS, or the first error from queuing, encoding, sending or the response
 */
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
  if (!pInserter->explain) {
    if (NULL == taosArrayPush(pInserter->pDataBlocks, &pInput->pData)) {
      // without the queued block there is nothing to convert
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return terrno;
    }

    void*   pMsg = NULL;
    int32_t msgLen = 0;
    int32_t code = dataBlocksToSubmitReq(pInserter, &pMsg, &msgLen);

    // the array only borrows the block pointer; drop it whether or not the
    // conversion worked so a failed block is never picked up again
    taosArrayClear(pInserter->pDataBlocks);

    if (code) {
      return code;
    }

    code = sendSubmitRequest(pInserter, pMsg, msgLen, pInserter->pParam->readHandle->pMsgCb->clientRpc,
                             &pInserter->pNode->epSet);
    if (code) {
      return code;
    }

    // wait for inserterCallback to post the result
    tsem_wait(&pInserter->ready);

    if (pInserter->submitRes.code) {
      return pInserter->submitRes.code;
    }
  }

  *pContinue = true;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
366 367 368 369 370 371 372 373
/**
 * Sink callback: mark the query as finished and remember the reported
 * `useconds` value. Both fields are written while holding the inserter mutex.
 */
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
  SDataInserterHandle* pSelf = (SDataInserterHandle*)pHandle;

  taosThreadMutexLock(&pSelf->mutex);
  pSelf->useconds = useconds;
  pSelf->queryEnd = true;
  taosThreadMutexUnlock(&pSelf->mutex);
}

D
dapan1121 已提交
374 375 376
/**
 * Sink callback: report the total number of affected rows through *pLen.
 * pQueryEnd is not written by this implementation.
 */
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
  SDataInserterHandle* pSelf = (SDataInserterHandle*)pHandle;
  int64_t              totalRows = pSelf->submitRes.affectedRows;

  *pLen = totalRows;
  qDebug("got total affectedRows %" PRId64, totalRows);
}

D
dapan1121 已提交
380
/**
 * Sink callback: release everything the inserter owns (block array, schema,
 * creation parameter, column map, mutex) and subtract its cached size from the
 * global sink statistics. The handle itself is not freed here.
 *
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataInserterHandle* pSelf = (SDataInserterHandle*)pHandle;

  atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pSelf->cachedSize);

  // containers
  taosArrayDestroy(pSelf->pDataBlocks);
  taosHashCleanup(pSelf->pCols);

  // plain heap objects
  taosMemoryFree(pSelf->pSchema);
  taosMemoryFree(pSelf->pParam);

  taosThreadMutexDestroy(&pSelf->mutex);
  return TSDB_CODE_SUCCESS;
}

static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
D
dapan1121 已提交
392
  SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
D
dapan1121 已提交
393 394 395 396 397

  *size = atomic_load_64(&pDispatcher->cachedSize);
  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
398 399
/**
 * Create a data inserter sink for a query-insert plan node.
 *
 * Loads the target table schema, verifies the super table id recorded in the
 * node, builds the column-id -> slot-id map and decides whether the node's
 * column list matches the schema one-to-one in order (fullOrderColList).
 *
 * @param pManager  sink manager stored in the handle
 * @param pDataSink plan node; used as an SQueryInserterNode
 * @param pHandle   receives the new handle on success
 * @param pParam    inserter parameter stored in the handle; on every failure
 *                  after the handle is allocated it is released together with
 *                  the handle by destroyDataSinker
 * @return TSDB_CODE_SUCCESS or an error code
 */
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
                           void* pParam) {
  SDataInserterHandle* inserter = taosMemoryCalloc(1, sizeof(SDataInserterHandle));
  if (NULL == inserter) {
    // NOTE(review): pParam is not released on this path, unlike the later failure paths — confirm ownership
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // initialise the mutex first: every failure path below runs destroyDataSinker,
  // which destroys it
  taosThreadMutexInit(&inserter->mutex, NULL);

  SQueryInserterNode* pInserterNode = (SQueryInserterNode*)pDataSink;
  inserter->sink.fPut = putDataBlock;
  inserter->sink.fEndPut = endPut;
  inserter->sink.fGetLen = getDataLength;
  inserter->sink.fGetData = NULL;
  inserter->sink.fDestroy = destroyDataSinker;
  inserter->sink.fGetCacheSize = getCacheSize;
  inserter->pManager = pManager;
  inserter->pNode = pInserterNode;
  inserter->pParam = pParam;
  inserter->status = DS_BUF_EMPTY;
  inserter->queryEnd = false;
  inserter->explain = pInserterNode->explain;

  int64_t suid = 0;
  int32_t code =
      tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
  if (code) {
    destroyDataSinker((SDataSinkHandle*)inserter);
    taosMemoryFree(inserter);
    return code;
  }

  if (pInserterNode->stableId != suid) {
    destroyDataSinker((SDataSinkHandle*)inserter);
    taosMemoryFree(inserter);
    terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
    return terrno;
  }

  inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
  if (NULL == inserter->pDataBlocks) {
    destroyDataSinker((SDataSinkHandle*)inserter);
    taosMemoryFree(inserter);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  inserter->fullOrderColList = pInserterNode->pCols->length == inserter->pSchema->numOfCols;

  inserter->pCols = taosHashInit(pInserterNode->pCols->length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT),
                                 false, HASH_NO_LOCK);
  if (NULL == inserter->pCols) {
    destroyDataSinker((SDataSinkHandle*)inserter);
    taosMemoryFree(inserter);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNode*  pNode = NULL;
  int32_t i = 0;
  FOREACH(pNode, pInserterNode->pCols) {
    SColumnNode* pCol = (SColumnNode*)pNode;
    code = taosHashPut(inserter->pCols, &pCol->colId, sizeof(pCol->colId), &pCol->slotId, sizeof(pCol->slotId));
    if (code) {
      // an incomplete map would make rows silently miss this column
      destroyDataSinker((SDataSinkHandle*)inserter);
      taosMemoryFree(inserter);
      return code;
    }
    // the i-th listed column must be the i-th schema column for the fast path
    if (inserter->fullOrderColList && pCol->colId != inserter->pSchema->columns[i].colId) {
      inserter->fullOrderColList = false;
    }
    ++i;
  }

  tsem_init(&inserter->ready, 0, 0);

  *pHandle = inserter;
  return TSDB_CODE_SUCCESS;
}