parInsertStmt.c 11.9 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "os.h"
#include "parInsertUtil.h"
#include "parInt.h"
#include "parToken.h"
#include "query.h"
#include "tglobal.h"
#include "ttime.h"
#include "ttypes.h"

// Parameter bundle for key/value (tag) handling.
// NOTE(review): this type is not referenced in the visible part of the file;
// the field notes below are inferred from names only — confirm against its users.
typedef struct SKvParam {
  int16_t  pos;       // presumably a position/offset — TODO confirm
  SArray*  pTagVals;  // presumably the collected tag values — TODO confirm
  SSchema* schema;    // presumably the schema entry being processed — TODO confirm
  char     buf[TSDB_MAX_TAGS_LEN];  // fixed-size buffer of TSDB_MAX_TAGS_LEN bytes
} SKvParam;

/*
 * Build the output data blocks of a prepared INSERT statement.
 *
 * pQuery     query whose pRoot is the modify-op statement receiving the result
 * pVgHash    passed through to insBuildVgDataBlocks()
 * pBlockHash hash of table data contexts; merged only when it is non-empty
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported by the merge/build step.
 */
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash) {
  SVnodeModifOpStmt* pModifStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
  SArray*            pMerged = NULL;
  int32_t            ret = TSDB_CODE_SUCCESS;

  // merge according to vgId; nothing to merge when the hash is empty
  if (taosHashGetSize(pBlockHash) > 0) {
    ret = insMergeTableDataCxt(pBlockHash, &pMerged);
  }

  // only build the per-vgroup blocks when the merge step did not fail
  if (TSDB_CODE_SUCCESS == ret) {
    ret = insBuildVgDataBlocks(pVgHash, pMerged, &pModifStmt->pDataBlocks);
  }

  // the intermediate array is released through the statement's callback, if one is set
  if (NULL != pModifStmt->freeArrayFunc) {
    pModifStmt->freeArrayFunc(pMerged);
  }

  return ret;
}

/*
 * Bind the tag values of a prepared INSERT statement and fill the create-table
 * request of the table data context.
 *
 * pBlock      table data context (STableDataCxt*)
 * boundTags   bound tag info (SBoundColInfo*); NULL yields TSDB_CODE_QRY_APP_ERROR
 * suid        forwarded to insBuildCreateTbReq()
 * sTableName  forwarded to insBuildCreateTbReq()
 * tName       forwarded to insBuildCreateTbReq()
 * bind        one entry per bound tag; entries flagged is_null are skipped
 * msgBuf      buffer receiving a human readable error message
 * msgBufLen   size of msgBuf
 *
 * Returns TSDB_CODE_SUCCESS or an error code. Buffers allocated here for NCHAR
 * conversion are released before returning, on success and on failure alike.
 */
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const char* sTableName, char* tName,
                           TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
  SMsgBuf        pBuf = {.buf = msgBuf, .len = msgBufLen};
  int32_t        code = TSDB_CODE_SUCCESS;
  SBoundColInfo* tags = (SBoundColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SArray* pTagArray = taosArrayInit(tags->numOfBound, sizeof(STagVal));
  if (!pTagArray) {
    return buildInvalidOperationMsg(&pBuf, "out of memory");
  }

  SArray* tagName = taosArrayInit(8, TSDB_COL_NAME_LEN);
  if (!tagName) {
    code = buildInvalidOperationMsg(&pBuf, "out of memory");
    goto end;
  }

  SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta);

  bool  isJson = false;
  STag* pTag = NULL;

  for (int c = 0; c < tags->numOfBound; ++c) {
    // NULL tags are simply not recorded
    if (bind[c].is_null && bind[c].is_null[0]) {
      continue;
    }

    SSchema* pTagSchema = &pSchema[tags->pColIndex[c]];
    int32_t  colLen = pTagSchema->bytes;
    if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
      // variable-length types take their length from the bind, bounded by the schema
      colLen = bind[c].length[0];
      if ((colLen + VARSTR_HEADER_SIZE) > pTagSchema->bytes) {
        code = buildInvalidOperationMsg(&pBuf, "tag length is too big");
        goto end;
      }
    }

    if (NULL == taosArrayPush(tagName, pTagSchema->name)) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto end;
    }

    if (pTagSchema->type == TSDB_DATA_TYPE_JSON) {
      if (colLen > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
        code = buildSyntaxErrMsg(&pBuf, "json string too long than 4095", bind[c].buffer);
        goto end;
      }

      isJson = true;
      // the bind buffer is not NUL-terminated; parse from a terminated copy
      char* tmp = taosMemoryCalloc(1, colLen + 1);
      if (NULL == tmp) {
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
      memcpy(tmp, bind[c].buffer, colLen);
      code = parseJsontoTagData(tmp, pTagArray, &pTag, &pBuf);
      taosMemoryFree(tmp);
      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
    } else {
      STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
      if (pTagSchema->type == TSDB_DATA_TYPE_BINARY) {
        // binary data is referenced in place; the bind buffer stays owned by the caller
        val.pData = (uint8_t*)bind[c].buffer;
        val.nData = colLen;
      } else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t output = 0;
        void*   p = taosMemoryCalloc(1, colLen * TSDB_NCHAR_SIZE);
        if (p == NULL) {
          code = TSDB_CODE_OUT_OF_MEMORY;
          goto end;
        }
        if (!taosMbsToUcs4(bind[c].buffer, colLen, (TdUcs4*)(p), colLen * TSDB_NCHAR_SIZE, &output)) {
          if (errno == E2BIG) {
            taosMemoryFree(p);
            code = generateSyntaxErrMsg(&pBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pTagSchema->name);
            goto end;
          }
          char buf[512] = {0};
          snprintf(buf, tListLen(buf), " taosMbsToUcs4 error:%s", strerror(errno));
          taosMemoryFree(p);
          code = buildSyntaxErrMsg(&pBuf, buf, bind[c].buffer);
          goto end;
        }
        val.pData = p;
        val.nData = output;
      } else {
        memcpy(&val.i64, bind[c].buffer, colLen);
      }

      if (NULL == taosArrayPush(pTagArray, &val)) {
        // the converted NCHAR buffer is not tracked by the array yet, free it here
        if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
          taosMemoryFree(val.pData);
        }
        code = TSDB_CODE_OUT_OF_MEMORY;
        goto end;
      }
    }
  }

  // for JSON the tag was already produced by parseJsontoTagData()
  if (!isJson && (code = tTagNew(pTagArray, 1, false, &pTag)) != TSDB_CODE_SUCCESS) {
    goto end;
  }

  insBuildCreateTbReq(pDataBlock->pData->pCreateTbReq, tName, pTag, suid, sTableName, tagName,
                      pDataBlock->pMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);

end:
  // only NCHAR entries own their pData; release those before dropping the array
  for (int i = 0; i < taosArrayGetSize(pTagArray); ++i) {
    STagVal* p = (STagVal*)taosArrayGet(pTagArray, i);
    if (p->type == TSDB_DATA_TYPE_NCHAR) {
      taosMemoryFreeClear(p->pData);
    }
  }
  taosArrayDestroy(pTagArray);
  taosArrayDestroy(tagName);

  return code;
}

/*
 * Bind one batch of values for every bound column of the table data context.
 *
 * pBlock     table data context (STableDataCxt*)
 * bind       array with one entry per bound column; bind[0].num is the reference row count
 * msgBuf     buffer receiving a human readable error message
 * msgBufLen  size of msgBuf
 *
 * Returns TSDB_CODE_SUCCESS, or the code produced by buildInvalidOperationMsg()
 * when a bind has a different row count or a buffer type that does not match
 * the column schema. Columns processed before the failing one stay bound.
 */
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
  STableDataCxt* pCxt = (STableDataCxt*)pBlock;
  SSchema*       pAllCols = getTableColumnSchema(pCxt->pMeta);
  SBoundColInfo* pBound = &pCxt->boundColsInfo;
  SMsgBuf        errBuf = {.buf = msgBuf, .len = msgBufLen};
  int32_t        expectedRows = bind->num;

  int idx = 0;
  while (idx < pBound->numOfBound) {
    TAOS_MULTI_BIND* pCurBind = bind + idx;
    SSchema*         pCurSchema = &pAllCols[pBound->pColIndex[idx]];
    SColData*        pColData = taosArrayGet(pCxt->pData->aCol, idx);

    // every column must supply the same number of rows as the first bind
    if (pCurBind->num != expectedRows) {
      return buildInvalidOperationMsg(&errBuf, "row number in each bind param should be the same");
    }

    // the client buffer type has to match the column type exactly
    if (pCurBind->buffer_type != pCurSchema->type) {
      return buildInvalidOperationMsg(&errBuf, "column type mis-match with buffer type");
    }

    tColDataAddValueByBind(pColData, pCurBind);
    ++idx;
  }

  qDebug("stmt all %d columns bind %d rows data", pBound->numOfBound, expectedRows);

  return TSDB_CODE_SUCCESS;
}

/*
 * Bind one batch of values for a single bound column of the table data context.
 *
 * pBlock     table data context (STableDataCxt*)
 * bind       the bind describing the values of this column
 * msgBuf     buffer receiving a human readable error message
 * msgBufLen  size of msgBuf
 * colIdx     index into the bound columns; must be in [0, numOfBound)
 * rowNum     row count the bind is required to carry
 *
 * Returns TSDB_CODE_SUCCESS, or the code produced by buildInvalidOperationMsg()
 * when colIdx is out of range, the row count differs from rowNum, or the buffer
 * type does not match the column schema.
 */
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
                                int32_t rowNum) {
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
  SSchema*       pSchema = getTableColumnSchema(pDataBlock->pMeta);
  SBoundColInfo* boundInfo = &pDataBlock->boundColsInfo;
  SMsgBuf        pBuf = {.buf = msgBuf, .len = msgBufLen};

  // colIdx indexes both pColIndex and aCol; reject it before touching either
  if (colIdx < 0 || colIdx >= boundInfo->numOfBound) {
    return buildInvalidOperationMsg(&pBuf, "column index out of range");
  }

  SSchema*  pColSchema = &pSchema[boundInfo->pColIndex[colIdx]];
  SColData* pCol = taosArrayGet(pDataBlock->pData->aCol, colIdx);

  if (bind->num != rowNum) {
    return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
  }

  if (bind->buffer_type != pColSchema->type) {
    return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
  }

  tColDataAddValueByBind(pCol, bind);

  qDebug("stmt col %d bind %d rows data", colIdx, rowNum);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
210
/*
 * Describe the bound columns as an array of TAOS_FIELD_E.
 *
 * numOfBound    number of entries in boundColumns
 * boundColumns  indexes into pSchema, one per bound column
 * pSchema       schema array the indexes refer to
 * fieldNum      out: receives numOfBound
 * fields        out, optional: receives a newly allocated array of numOfBound
 *               entries (NULL when numOfBound <= 0). When fields itself is
 *               NULL only the count is reported.
 * timePrec      stored as precision of the first field when that field is a timestamp
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
 * NOTE(review): the array is allocated with taosMemoryCalloc(); releasing it is
 * presumably up to the caller — confirm.
 */
int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSchema, int32_t* fieldNum, TAOS_FIELD_E** fields,
                         uint8_t timePrec) {
  if (fields) {
    *fields = NULL;

    // nothing to describe (and boundColumns[0] must not be read) without bound columns
    if (numOfBound > 0) {
      // size the elements from the destination type so the allocation always matches it
      *fields = taosMemoryCalloc(numOfBound, sizeof(**fields));
      if (NULL == *fields) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      SSchema* schema = &pSchema[boundColumns[0]];
      if (TSDB_DATA_TYPE_TIMESTAMP == schema->type) {
        (*fields)[0].precision = timePrec;
      }

      for (int32_t i = 0; i < numOfBound; ++i) {
        schema = &pSchema[boundColumns[i]];
        // bounded copy: never write past the fixed-size name buffer
        snprintf((*fields)[i].name, sizeof((*fields)[i].name), "%s", schema->name);
        (*fields)[i].type = schema->type;
        (*fields)[i].bytes = schema->bytes;
      }
    }
  }

  *fieldNum = numOfBound;

  return TSDB_CODE_SUCCESS;
}

/*
 * Report the bound tags of a prepared statement as an array of TAOS_FIELD_E.
 *
 * pBlock     table data context (STableDataCxt*)
 * boundTags  bound tag info (SBoundColInfo*); NULL yields TSDB_CODE_QRY_APP_ERROR
 * fieldNum   out: number of bound tags
 * fields     out, optional: receives the field array built by buildBoundFields(),
 *            or NULL when no tag is bound
 *
 * Returns TSDB_CODE_TSC_STMT_API_ERROR when the table is neither a super table
 * nor a child table, otherwise TSDB_CODE_SUCCESS or the error from buildBoundFields().
 */
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
  SBoundColInfo* tags = (SBoundColInfo*)boundTags;
  if (NULL == tags) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  if (pDataBlock->pMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pMeta->tableType != TSDB_CHILD_TABLE) {
    return TSDB_CODE_TSC_STMT_API_ERROR;
  }

  SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta);
  if (tags->numOfBound <= 0) {
    *fieldNum = 0;
    // fields is optional, same as in qBuildStmtColFields() and buildBoundFields()
    if (fields) {
      *fields = NULL;
    }

    return TSDB_CODE_SUCCESS;
  }

  // tags carry no time precision, hence 0
  CHECK_CODE(buildBoundFields(tags->numOfBound, tags->pColIndex, pSchema, fieldNum, fields, 0));

  return TSDB_CODE_SUCCESS;
}

/*
 * Report the bound columns of a prepared statement as an array of TAOS_FIELD_E.
 *
 * pBlock    table data context (STableDataCxt*)
 * fieldNum  out: number of bound columns
 * fields    out, optional: receives the field array built by buildBoundFields(),
 *           or NULL when no column is bound
 *
 * Returns TSDB_CODE_SUCCESS or the error propagated from buildBoundFields().
 */
int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  STableDataCxt* pCxt = (STableDataCxt*)pBlock;
  SSchema*       pColSchemas = getTableColumnSchema(pCxt->pMeta);

  if (pCxt->boundColsInfo.numOfBound > 0) {
    // the table precision is applied to a leading timestamp column by buildBoundFields()
    CHECK_CODE(buildBoundFields(pCxt->boundColsInfo.numOfBound, pCxt->boundColsInfo.pColIndex, pColSchemas, fieldNum,
                                fields, pCxt->pMeta->tableInfo.precision));
    return TSDB_CODE_SUCCESS;
  }

  // no bound columns: report an empty result
  *fieldNum = 0;
  if (NULL != fields) {
    *fields = NULL;
  }
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
278 279 280
int32_t qResetStmtDataBlock(void* block, bool deepClear) {
  STableDataCxt* pBlock = (STableDataCxt*)block;
  int32_t colNum = taosArrayGetSize(pBlock->pData->aCol);
X
Xiaoyu Wang 已提交
281

D
dapan1121 已提交
282 283 284 285 286 287
  for (int32_t i = 0; i < colNum; ++i) {
    SColData *pCol = (SColData*)taosArrayGet(pBlock->pData->aCol, i);
    if (deepClear) {
      tColDataDeepClear(pCol);
    } else {
      tColDataClear(pCol);
X
Xiaoyu Wang 已提交
288 289 290 291 292 293
    }
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
294 295 296 297
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc, bool reset) {
  int32_t code = 0;
  
  *pDst = taosMemoryCalloc(1, sizeof(STableDataCxt));
X
Xiaoyu Wang 已提交
298 299 300 301
  if (NULL == *pDst) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

D
dapan1121 已提交
302 303 304 305
  STableDataCxt* pNewCxt = (STableDataCxt*)*pDst;
  STableDataCxt* pCxt = (STableDataCxt*)pSrc;
  pNewCxt->pSchema = NULL;
  pNewCxt->pValues = NULL;
X
Xiaoyu Wang 已提交
306

D
dapan1121 已提交
307 308
  if (pCxt->pMeta) {
    void* pNewMeta = taosMemoryMalloc(TABLE_META_SIZE(pCxt->pMeta));
X
Xiaoyu Wang 已提交
309
    if (NULL == pNewMeta) {
D
dapan1121 已提交
310
      insDestroyTableDataCxt(*pDst);
X
Xiaoyu Wang 已提交
311 312
      return TSDB_CODE_OUT_OF_MEMORY;
    }
D
dapan1121 已提交
313 314
    memcpy(pNewMeta, pCxt->pMeta, TABLE_META_SIZE(pCxt->pMeta));
    pNewCxt->pMeta = pNewMeta;
X
Xiaoyu Wang 已提交
315 316
  }

D
dapan1121 已提交
317 318 319 320 321 322 323 324 325 326 327
  memcpy(&pNewCxt->boundColsInfo, &pCxt->boundColsInfo, sizeof(pCxt->boundColsInfo));
  pNewCxt->boundColsInfo.pColIndex = NULL;
  
  if (pCxt->boundColsInfo.pColIndex) {
    void* pNewColIdx = taosMemoryMalloc(pCxt->boundColsInfo.numOfBound * sizeof(*pCxt->boundColsInfo.pColIndex));
    if (NULL == pNewColIdx) {
      insDestroyTableDataCxt(*pDst);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    memcpy(pNewColIdx, pCxt->boundColsInfo.pColIndex, pCxt->boundColsInfo.numOfBound * sizeof(*pCxt->boundColsInfo.pColIndex));
    pNewCxt->boundColsInfo.pColIndex = pNewColIdx;
X
Xiaoyu Wang 已提交
328 329
  }

D
dapan1121 已提交
330 331 332 333 334 335
  if (pCxt->pData) {
    SSubmitTbData *pNewTb = (SSubmitTbData*)taosMemoryMalloc(sizeof(SSubmitTbData));
    if (NULL == pNewTb) {
      insDestroyTableDataCxt(*pDst);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
336

D
dapan1121 已提交
337 338
    memcpy(pNewTb, pCxt->pData, sizeof(*pCxt->pData));
    pNewTb->pCreateTbReq = NULL;
X
Xiaoyu Wang 已提交
339

D
dapan1121 已提交
340 341 342 343 344
    pNewTb->aCol = taosArrayDup(pCxt->pData->aCol, NULL);
    if (NULL == pNewTb) {
      insDestroyTableDataCxt(*pDst);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
D
dapan1121 已提交
345 346

    pNewCxt->pData = pNewTb;
D
dapan1121 已提交
347 348 349 350
    
    if (reset) {
      code = qResetStmtDataBlock(*pDst, true);
    }
X
Xiaoyu Wang 已提交
351 352
  }

D
dapan1121 已提交
353 354
  
  return code;
X
Xiaoyu Wang 已提交
355 356
}

D
dapan1121 已提交
357 358 359 360 361
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid, int32_t vgId) {
  int32_t code = qCloneStmtDataBlock(pDst, pSrc, false);
  if (code) {
    return code;
  }
X
Xiaoyu Wang 已提交
362

D
dapan1121 已提交
363 364 365 366
  STableDataCxt* pBlock = (STableDataCxt*)*pDst;
  if (pBlock->pMeta) {
    pBlock->pMeta->uid = uid;
    pBlock->pMeta->vgId = vgId;
X
Xiaoyu Wang 已提交
367 368
  }

D
dapan1121 已提交
369
  return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
370 371
}

D
dapan1121 已提交
372 373
STableMeta* qGetTableMetaInDataBlock(void* pDataBlock) { return ((STableDataCxt*)pDataBlock)->pMeta; }

X
Xiaoyu Wang 已提交
374 375 376 377 378
void qDestroyStmtDataBlock(void* pBlock) {
  if (pBlock == NULL) {
    return;
  }

D
dapan1121 已提交
379 380
  STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
  insDestroyTableDataCxt(pDataBlock);
X
Xiaoyu Wang 已提交
381
}