parInsertStmt.c 11.8 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
#include "parInsertUtil.h"
#include "parInt.h"
#include "parToken.h"
#include "query.h"
#include "tglobal.h"
#include "ttime.h"
#include "ttypes.h"

typedef struct SKvParam {
  int16_t  pos;
  SArray*  pTagVals;
  SSchema* schema;
  char     buf[TSDB_MAX_TAGS_LEN];
} SKvParam;

int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
X
Xiaoyu Wang 已提交
33 34
  int32_t code = TSDB_CODE_SUCCESS;
  SArray* pVgDataBlocks = NULL;
D
dapan1121 已提交
35 36
  SVnodeModifOpStmt *pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
  
X
Xiaoyu Wang 已提交
37
  // merge according to vgId
X
Xiaoyu Wang 已提交
38
  if (taosHashGetSize(pBlockHash) > 0) {
D
dapan1121 已提交
39
    code = insMergeTableDataCxt(pBlockHash, &pVgDataBlocks);
X
Xiaoyu Wang 已提交
40
  }
X
Xiaoyu Wang 已提交
41
  if (TSDB_CODE_SUCCESS == code) {
D
dapan1121 已提交
42 43 44 45 46
    code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks);
  }
  
  if (pStmt->freeArrayFunc) {
    pStmt->freeArrayFunc(pVgDataBlocks);
X
Xiaoyu Wang 已提交
47 48
  }
  return code;
X
Xiaoyu Wang 已提交
49 50 51 52
}

int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
                           TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
D
dapan1121 已提交
53
  STableDataCxt*   pDataBlock = (STableDataCxt*)pBlock;
X
Xiaoyu Wang 已提交
54
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
D
dapan1121 已提交
55
  int32_t  code = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
56 57 58 59 60 61 62 63 64 65 66 67
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
  if (!pTagArray) {
    return buildInvalidOperationMsg(&pBuf, "out of memory");
  }

  SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
  if (!tagName) {
D
dapan1121 已提交
68 69
    code = buildInvalidOperationMsg(&pBuf, "out of memory");
    goto end;
X
Xiaoyu Wang 已提交
70 71
  }

D
dapan1121 已提交
72
  SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta);
X
Xiaoyu Wang 已提交
73 74 75 76 77 78 79 80 81 82 83 84 85

  bool  isJson = false;
  STag* pTag = NULL;

  for (int c = 0; c < tags->numOfBound; ++c) {
    if (bind[c].is_null && bind[c].is_null[0]) {
      continue;
    }

    SSchema* pTagSchema = &pSchema[tags->boundColumns[c]];
    int32_t  colLen = pTagSchema->bytes;
    if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
      colLen = bind[c].length[0];
D
dapan1121 已提交
86 87 88 89
      if ((colLen + VARSTR_HEADER_SIZE) > pTagSchema->bytes) {
        code = buildInvalidOperationMsg(&pBuf, "tag length is too big");
        goto end;
      }
X
Xiaoyu Wang 已提交
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
    }
    taosArrayPush(tagName, pTagSchema->name);
    if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
      if (colLen > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        code = buildSyntaxErrMsg(&pBuf, "json string too long than 4095", bind[c].buffer);
        goto end;
      }

      isJson = true;
      char* tmp = taosMemoryCalloc(1, colLen + 1);
      memcpy(tmp, bind[c].buffer, colLen);
      code = parseJsontoTagData(tmp, pTagArray, &pTag, &pBuf);
      taosMemoryFree(tmp);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
    } else {
      STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
      //      strcpy(val.colName, pTagSchema->name);
      if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) {
        val.pData = (uint8_t*)bind[c].buffer;
        val.nData = colLen;
      } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t output = 0;
        void*   p = taosMemoryCalloc(1, colLen * TSDB_NCHAR_SIZE);
        if (p == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto end;
        }
        if (!taosMbsToUcs4(bind[c].buffer, colLen, (TdUcs4*)(p), colLen * TSDB_NCHAR_SIZE, &output)) {
          if (errno == E2BIG) {
            taosMemoryFree(p);
            code = generateSyntaxErrMsg(&pBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
            goto end;
          }
          char buf[512] = {0};
          snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
          taosMemoryFree(p);
          code = buildSyntaxErrMsg(&pBuf, buf, bind[c].buffer);
          goto end;
        }
        val.pData = p;
        val.nData = output;
      } else {
        memcpy(&val.i64, bind[c].buffer, colLen);
      }
      taosArrayPush(pTagArray, &val);
    }
  }

  if (!isJson && (code = tTagNew(pTagArray, 1, false, &pTag)) != TSDB_CODE_SUCCESS) {
    goto end;
  }

D
dapan1121 已提交
144
  insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pMeta->tableInfo.numOfTags);
X
Xiaoyu Wang 已提交
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159

end:
  for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
    STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
    if (p->type == TSDB_DATA_TYPE_NCHAR) {
      taosMemoryFreeClear(p->pData);
    }
  }
  taosArrayDestroy(pTagArray);
  taosArrayDestroy(tagName);

  return code;
}

int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
D
dapan1121 已提交
160 161 162
  STableDataCxt*      pDataBlock = (STableDataCxt*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pMeta);
  SBoundColInfo*      boundInfo = &pDataBlock->boundColsInfo;
X
Xiaoyu Wang 已提交
163 164 165
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
  int32_t             rowNum = bind->num;

D
dapan1121 已提交
166 167 168
  for (int c = 0; c < boundInfo->numOfBound; ++c) {
    SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]];
    SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, c);
X
Xiaoyu Wang 已提交
169

D
dapan1121 已提交
170 171
    if (bind[c].num != rowNum) {
      return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
X
Xiaoyu Wang 已提交
172
    }
D
dapan1121 已提交
173 174 175

    if (bind[c].buffer_type != pColSchema->type) {
      return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
X
Xiaoyu Wang 已提交
176
    }
D
dapan1121 已提交
177 178

    tColDataAddValueByBind(pCol, bind + c);
X
Xiaoyu Wang 已提交
179 180
  }

D
dapan1121 已提交
181
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
182 183 184 185
}

int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
                                int32_t rowNum) {
D
dapan1121 已提交
186 187 188
  STableDataCxt*      pDataBlock = (STableDataCxt*)pBlock;
  SSchema*            pSchema = getTableColumnSchema(pDataBlock->pMeta);
  SBoundColInfo*      boundInfo = &pDataBlock->boundColsInfo;
X
Xiaoyu Wang 已提交
189
  SMsgBuf             pBuf = {.buf = msgBuf, .len = msgBufLen};
D
dapan1121 已提交
190 191 192 193 194
  SSchema*            pColSchema = &pSchema[boundInfo->pColIndex[colIdx]];
  SColData*           pCol = taosArrayGet(pDataBlock->pData->aCol, colIdx);
  
  if (bind[colIdx].num != rowNum) {
    return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
X
Xiaoyu Wang 已提交
195
  }
D
dapan1121 已提交
196 197 198
  
  if (bind[colIdx].buffer_type != pColSchema->type) {
    return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
X
Xiaoyu Wang 已提交
199
  }
D
dapan1121 已提交
200 201
  
  tColDataAddValueByBind(pCol, bind);
X
Xiaoyu Wang 已提交
202 203 204 205

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
206
int32_t buildBoundFields(int32_t numOfBound, int16_t boundColumns, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields,
X
Xiaoyu Wang 已提交
207 208
                         uint8_t timePrec) {
  if (fields) {
D
dapan1121 已提交
209
    *fields = taosMemoryCalloc(numOfBound, sizeof(TAOS_FIELD));
X
Xiaoyu Wang 已提交
210 211 212 213
    if (NULL == *fields) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

D
dapan1121 已提交
214
    SSchema* schema = &pSchema[boundColumns[0]];
X
Xiaoyu Wang 已提交
215 216 217 218
    if (TSDB_DATA_TYPE_TIMESTAMP == schema->type) {
      (*fields)[0].precision = timePrec;
    }

D
dapan1121 已提交
219 220
    for (int32_t i = 0; i < numOfBound; ++i) {
      schema = &pSchema[boundColumns[i]];
X
Xiaoyu Wang 已提交
221 222 223 224 225 226
      strcpy((*fields)[i].name, schema->name);
      (*fields)[i].type = schema->type;
      (*fields)[i].bytes = schema->bytes;
    }
  }

D
dapan1121 已提交
227
  *fieldNum = numOfBound;
X
Xiaoyu Wang 已提交
228 229 230 231 232

  return TSDB_CODE_SUCCESS;
}

int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) {
D
dapan1121 已提交
233
  STableDataCxt*   pDataBlock = (STableDataCxt*)pBlock;
X
Xiaoyu Wang 已提交
234 235 236 237 238
  SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

D
dapan1121 已提交
239
  if (pDataBlock->pMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pMeta->tableType != TSDB_CHILD_TABLE) {
X
Xiaoyu Wang 已提交
240 241 242
    return TSDB_CODE_TSC_STMT_API_ERROR;
  }

D
dapan1121 已提交
243
  SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta);
X
Xiaoyu Wang 已提交
244 245 246 247 248 249 250
  if (tags->numOfBound <= 0) {
    *fieldNum = 0;
    *fields = NULL;

    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
251
  CHECK_CODE(buildBoundFields(tags->numOfBound, tags->boundColumns, pSchema, fieldNum, fields, 0));
X
Xiaoyu Wang 已提交
252 253 254 255 256

  return TSDB_CODE_SUCCESS;
}

int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fields) {
D
dapan1121 已提交
257 258 259
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
  SSchema*       pSchema = getTableColumnSchema(pDataBlock->pMeta);
  if (pDataBlock->boundColsInfo.numOfBound <= 0) {
X
Xiaoyu Wang 已提交
260 261 262 263 264 265 266 267
    *fieldNum = 0;
    if (fields) {
      *fields = NULL;
    }

    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
268 269
  CHECK_CODE(buildBoundFields(&pDataBlock->boundColsInfo.numOfBound, pDataBlock->boundColsInfo.pColIndex, pSchema, fieldNum, fields,
                              pDataBlock->pMeta->tableInfo.precision));
X
Xiaoyu Wang 已提交
270 271 272 273

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
274 275 276
int32_t qResetStmtDataBlock(void* block, bool deepClear) {
  STableDataCxt* pBlock = (STableDataCxt*)block;
  int32_t colNum = taosArrayGetSize(pBlock->pData->aCol);
X
Xiaoyu Wang 已提交
277

D
dapan1121 已提交
278 279 280 281 282 283
  for (int32_t i = 0; i < colNum; ++i) {
    SColData *pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i);
    if (deepClear) {
      tColDataDeepClear(pCol);
    } else {
      tColDataClear(pCol);
X
Xiaoyu Wang 已提交
284 285 286 287 288 289
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
290 291 292 293
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset) {
  int32_t code = 0;
  
  *pDst = taosMemoryCalloc(1, sizeof(STableDataCxt));
X
Xiaoyu Wang 已提交
294 295 296 297
  if (NULL == *pDst) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

D
dapan1121 已提交
298 299 300 301
  STableDataCxt* pNewCxt = (STableDataCxt*)*pDst;
  STableDataCxt* pCxt = (STableDataCxt*)pSrc;
  pNewCxt->pSchema = NULL;
  pNewCxt->pValues = NULL;
X
Xiaoyu Wang 已提交
302

D
dapan1121 已提交
303 304
  if (pCxt->pMeta) {
    void* pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pCxt->pMeta));
X
Xiaoyu Wang 已提交
305
    if (NULL == pNewMeta) {
D
dapan1121 已提交
306
      insDestroyTableDataCxt(*pDst);
X
Xiaoyu Wang 已提交
307 308
      return TSDB_CODE_OUT_OF_MEMORY;
    }
D
dapan1121 已提交
309 310
    memcpy(pNewMeta, pCxt->pMeta, TABLE_META_SIZE(pCxt->pMeta));
    pNewCxt->pMeta = pNewMeta;
X
Xiaoyu Wang 已提交
311 312
  }

D
dapan1121 已提交
313 314 315 316 317 318 319 320 321 322 323
  memcpy(&pNewCxt->boundColsInfo, &pCxt->boundColsInfo, sizeof(pCxt->boundColsInfo));
  pNewCxt->boundColsInfo.pColIndex = NULL;
  
  if (pCxt->boundColsInfo.pColIndex) {
    void* pNewColIdx = taosMemoryMalloc(pCxt->boundColsInfo.numOfBound * sizeof(*pCxt->boundColsInfo.pColIndex));
    if (NULL == pNewColIdx) {
      insDestroyTableDataCxt(*pDst);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    memcpy(pNewColIdx, pCxt->boundColsInfo.pColIndex, pCxt->boundColsInfo.numOfBound * sizeof(*pCxt->boundColsInfo.pColIndex));
    pNewCxt->boundColsInfo.pColIndex = pNewColIdx;
X
Xiaoyu Wang 已提交
324 325
  }

D
dapan1121 已提交
326 327 328 329 330 331
  if (pCxt->pData) {
    SSubmitTbData *pNewTb = (SSubmitTbData*)taosMemoryMalloc(sizeof(SSubmitTbData));
    if (NULL == pNewTb) {
      insDestroyTableDataCxt(*pDst);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
332

D
dapan1121 已提交
333 334
    memcpy(pNewTb, pCxt->pData, sizeof(*pCxt->pData));
    pNewTb->pCreateTbReq = NULL;
X
Xiaoyu Wang 已提交
335

D
dapan1121 已提交
336 337 338 339 340 341 342 343 344 345 346
    pNewTb->aCol = taosArrayDup(pCxt->pData->aCol, NULL);
    if (NULL == pNewTb) {
      insDestroyTableDataCxt(*pDst);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    
    if (reset) {
      code = qResetStmtDataBlock(*pDst, true);
    }
    
    pNewCxt->pData = pNewTb;
X
Xiaoyu Wang 已提交
347 348
  }

D
dapan1121 已提交
349 350
  
  return code;
X
Xiaoyu Wang 已提交
351 352
}

D
dapan1121 已提交
353 354 355 356 357
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) {
  int32_t code = qCloneStmtDataBlock(pDst, pSrc, false);
  if (code) {
    return code;
  }
X
Xiaoyu Wang 已提交
358

D
dapan1121 已提交
359 360 361 362
  STableDataCxt* pBlock = (STableDataCxt*)*pDst;
  if (pBlock->pMeta) {
    pBlock->pMeta->uid = uid;
    pBlock->pMeta->vgId = vgId;
X
Xiaoyu Wang 已提交
363 364
  }

D
dapan1121 已提交
365
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
366 367
}

D
dapan1121 已提交
368 369
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataCxt*)pDataBlock)->pMeta; }

X
Xiaoyu Wang 已提交
370 371 372 373 374
void qDestroyStmtDataBlock(void* pBlock) {
  if (pBlock == NULL) {
    return;
  }

D
dapan1121 已提交
375 376
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
  insDestroyTableDataCxt(pDataBlock);
X
Xiaoyu Wang 已提交
377
}