clientStmt.c 33.6 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5

#include "clientInt.h"
#include "clientLog.h"
#include "tdef.h"

X
Xiaoyu Wang 已提交
6 7
#include "clientStmt.h"

dengyihao's avatar
dengyihao 已提交
8 9
/* Human-readable names of the statement stages, starting with "unknown".
 * NOTE(review): presumably indexed by the STMT_STATUS enum value - confirm against the
 * enum definition before relying on the ordering. */
char* gStmtStatusStr[] = {"unknown",     "init", "prepare", "settbname", "settags",
                          "fetchFields", "bind", "bindCol", "addBatch",  "exec"};
10

11
/*
 * Lazily create the request object backing this statement.
 *
 * Does nothing when pStmt->exec.pRequest already exists. Otherwise builds a request from the
 * stored SQL text, bumps a non-zero reqid so the next request gets a different id, and, on
 * success, marks the request as a synchronous query.
 *
 * Returns 0 when a request already exists, otherwise the code returned by buildRequest().
 */
static int32_t stmtCreateRequest(STscStmt* pStmt) {
  if (NULL != pStmt->exec.pRequest) {
    return 0;
  }

  int32_t rc = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest,
                            pStmt->reqid);

  /* A reqid of 0 is left untouched; any other value advances regardless of rc. */
  if (0 != pStmt->reqid) {
    ++pStmt->reqid;
  }

  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  pStmt->exec.pRequest->syncQuery = true;
  return rc;
}

D
stmt  
dapan1121 已提交
28
/*
 * Validate and apply a transition of the statement to `newStatus`.
 *
 * - Statuses inside [STMT_INIT, STMT_MAX) are recorded through STMT_LOG_SEQ.
 * - Once the statement carries an error code, every transition except STMT_PREPARE is refused
 *   with that stored code; STMT_PREPARE clears it.
 * - Each target status is checked against the current status; a disallowed transition yields
 *   TSDB_CODE_TSC_STMT_API_ERROR and an unrecognised target yields TSDB_CODE_APP_ERROR.
 *
 * On success pStmt->sql.status is updated and TSDB_CODE_SUCCESS is returned.
 */
int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
  int32_t code = 0;
  bool    rejected = false;  // set when the current status does not permit the transition

  if (newStatus >= STMT_INIT && newStatus < STMT_MAX) {
    STMT_LOG_SEQ(newStatus);
  }

  if (pStmt->errCode && newStatus != STMT_PREPARE) {
    STMT_DLOG("stmt already failed with err: %s", tstrerror(pStmt->errCode));
    return pStmt->errCode;
  }

  switch (newStatus) {
    case STMT_PREPARE:
      pStmt->errCode = 0;
      break;

    case STMT_SETTBNAME:
      rejected = STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL);
      break;

    case STMT_SETTAGS:
      rejected = STMT_STATUS_NE(SETTBNAME) && STMT_STATUS_NE(FETCH_FIELDS);
      break;

    case STMT_FETCH_FIELDS:
      rejected = STMT_STATUS_EQ(INIT);
      break;

    case STMT_BIND:
      rejected = STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND_COL);
      break;

    case STMT_BIND_COL:
      rejected = STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND);
      break;

    case STMT_ADD_BATCH:
      rejected = STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(FETCH_FIELDS);
      break;

    case STMT_EXECUTE:
      /* Every statement type may execute after ADD_BATCH or FETCH_FIELDS; queries may
       * additionally execute straight after BIND or BIND_COL. */
      rejected = STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS);
      if (STMT_TYPE_QUERY == pStmt->sql.type) {
        rejected = rejected && STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL);
      }
      break;

    default:
      code = TSDB_CODE_APP_ERROR;
      break;
  }

  if (rejected) {
    code = TSDB_CODE_TSC_STMT_API_ERROR;
  }

  STMT_ERR_RET(code);

  pStmt->sql.status = newStatus;

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
103
/*
 * Hand out the table name previously stored in the statement's bind info.
 *
 * Calling this always switches the statement type to STMT_TYPE_MULTI_INSERT, even when no
 * name has been set. `*tbName` receives a pointer into the statement (not a copy).
 *
 * Returns TSDB_CODE_TSC_STMT_TBNAME_ERROR when the stored name is empty.
 */
int32_t stmtGetTbName(TAOS_STMT* stmt, char** tbName) {
  STscStmt* pStmt = (STscStmt*)stmt;
  char*     storedName = pStmt->bInfo.tbName;

  pStmt->sql.type = STMT_TYPE_MULTI_INSERT;

  if (storedName[0] == '\0') {
    tscError("no table name set");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
  }

  *tbName = storedName;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
118
/*
 * Snapshot the result-set description of the current request (column count, precision and
 * both field arrays) into pStmt->sql.queryRes.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when either copy buffer cannot be allocated.
 */
int32_t stmtBackupQueryFields(STscStmt* pStmt) {
  SStmtQueryResInfo* pBackup = &pStmt->sql.queryRes;

  pBackup->numOfCols = pStmt->exec.pRequest->body.resInfo.numOfCols;
  pBackup->precision = pStmt->exec.pRequest->body.resInfo.precision;

  int32_t nBytes = pBackup->numOfCols * sizeof(TAOS_FIELD);

  /* Both buffers are requested before either result is inspected. */
  pBackup->fields = taosMemoryMalloc(nBytes);
  pBackup->userFields = taosMemoryMalloc(nBytes);
  if (!pBackup->fields || !pBackup->userFields) {
    STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  memcpy(pBackup->fields, pStmt->exec.pRequest->body.resInfo.fields, nBytes);
  memcpy(pBackup->userFields, pStmt->exec.pRequest->body.resInfo.userFields, nBytes);

  return TSDB_CODE_SUCCESS;
}

/*
 * Copy the result-set description saved in pStmt->sql.queryRes back into the current request.
 *
 * Column count and precision are always overwritten. Each field array is allocated and filled
 * only when the request does not already have one.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when an allocation fails.
 */
int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
  SStmtQueryResInfo* pSaved = &pStmt->sql.queryRes;
  int32_t            nBytes = pSaved->numOfCols * sizeof(TAOS_FIELD);

  pStmt->exec.pRequest->body.resInfo.numOfCols = pSaved->numOfCols;
  pStmt->exec.pRequest->body.resInfo.precision = pSaved->precision;

  if (pStmt->exec.pRequest->body.resInfo.fields == NULL) {
    pStmt->exec.pRequest->body.resInfo.fields = taosMemoryMalloc(nBytes);
    if (pStmt->exec.pRequest->body.resInfo.fields == NULL) {
      STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    memcpy(pStmt->exec.pRequest->body.resInfo.fields, pSaved->fields, nBytes);
  }

  if (pStmt->exec.pRequest->body.resInfo.userFields == NULL) {
    pStmt->exec.pRequest->body.resInfo.userFields = taosMemoryMalloc(nBytes);
    if (pStmt->exec.pRequest->body.resInfo.userFields == NULL) {
      STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    memcpy(pStmt->exec.pRequest->body.resInfo.userFields, pSaved->userFields, nBytes);
  }

  return TSDB_CODE_SUCCESS;
}

161 162
/*
 * Record the table the statement is currently bound to.
 *
 * Stores the structured name, the full table name derived from it, the uid/suid/table type
 * taken from `pTableMeta`, the bound-tag descriptor and the super-table name. The uid is
 * stored as 0 when `autoCreateTbl` is set. `tagsCached` is reset to false; `tags` is stored
 * by pointer, not copied.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, const char* sTableName,
                           bool autoCreateTbl) {
  STscStmt* pStmt = (STscStmt*)stmt;

  char fullName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(tbName, fullName);

  memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));

  /* Bounded copy with explicit termination of the last byte. */
  strncpy(pStmt->bInfo.tbFName, fullName, sizeof(pStmt->bInfo.tbFName) - 1);
  pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;

  if (autoCreateTbl) {
    pStmt->bInfo.tbUid = 0;
  } else {
    pStmt->bInfo.tbUid = pTableMeta->uid;
  }
  pStmt->bInfo.tbSuid = pTableMeta->suid;
  pStmt->bInfo.tbType = pTableMeta->tableType;

  pStmt->bInfo.boundTags = tags;
  pStmt->bInfo.tagsCached = false;

  tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
181
/*
 * Store the vgroup hash and the data-block hash on the statement.
 * The previous pointers are overwritten without being released here.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
  STscStmt* pSelf = (STscStmt*)stmt;

  pSelf->exec.pBlockHash = pBlockHash;
  pSelf->sql.pVgHash = pVgHash;

  return TSDB_CODE_SUCCESS;
}

190
/*
 * Combined update of bind info and exec info for the statement (this is the function
 * registered as `setInfoFn` in stmtParseSql), followed by recording whether the target
 * table is auto-created.
 *
 * Returns the first non-success code from stmtUpdateBindInfo / stmtUpdateExecInfo,
 * otherwise TSDB_CODE_SUCCESS.
 */
int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, bool autoCreateTbl,
                       SHashObj* pVgHash, SHashObj* pBlockHash, const char* sTableName) {
  STscStmt* pStmt = (STscStmt*)stmt;

  /* Bind info first, exec info second; either failure aborts before the flag is stored. */
  STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbName, sTableName, autoCreateTbl));
  STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash));

  pStmt->sql.autoCreateTbl = autoCreateTbl;
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
202 203 204
/*
 * Hand the vgroup hash and the data-block hash over to the caller.
 *
 * Both pointers are written to the out-parameters and then cleared on the statement, so the
 * statement no longer references them afterwards. Always returns TSDB_CODE_SUCCESS.
 */
int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
  STscStmt* pSelf = (STscStmt*)stmt;

  SHashObj* vgHash = pSelf->sql.pVgHash;
  pSelf->sql.pVgHash = NULL;
  *pVgHash = vgHash;

  SHashObj* blockHash = pSelf->exec.pBlockHash;
  pSelf->exec.pBlockHash = NULL;
  *pBlockHash = blockHash;

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
214
/*
 * Cache a reusable copy of the current table's data context in pStmt->sql.pTableCache.
 *
 * Only applies to STMT_TYPE_MULTI_INSERT statements. The cache key is the super-table uid for
 * child tables and the table's own uid otherwise; nothing is done when that key is already
 * cached. The cached entry holds a clone of the exec-side data context plus the current
 * bound-tag descriptor.
 *
 * After a successful insert the bound-tag descriptor is referenced by the cache entry:
 * for auto-create statements the bind info keeps its pointer and is flagged `tagsCached`,
 * otherwise the bind info's pointer is cleared.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the table is missing from the exec block hash or the
 * cache insertion fails, or the error from qCloneStmtDataBlock().
 */
int32_t stmtCacheBlock(STscStmt* pStmt) {
  if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
    return TSDB_CODE_SUCCESS;
  }

  uint64_t uid = pStmt->bInfo.tbUid;
  uint64_t cacheUid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;

  if (taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid))) {
    return TSDB_CODE_SUCCESS;
  }

  STableDataCxt** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
  if (!pSrc) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  STableDataCxt* pDst = NULL;
  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc, true));

  SStmtTableCache cache = {
      .pDataCtx = pDst,
      .boundTags = pStmt->bInfo.boundTags,
  };

  if (taosHashPut(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid), &cache, sizeof(cache))) {
    /* The clone was never handed to the cache, so nothing else references it: release it
     * here (same destructor stmtCleanSQLInfo uses for cached contexts) instead of leaking. */
    qDestroyStmtDataBlock(pDst);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (pStmt->sql.autoCreateTbl) {
    pStmt->bInfo.tagsCached = true;
  } else {
    pStmt->bInfo.boundTags = NULL;
  }

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
252
/*
 * Parse the statement's SQL text and classify the statement.
 *
 * Clears the current data block, makes sure a request object exists, and runs parseSql() with
 * callbacks that let the parser read the table name and exchange bind/exec info with this
 * statement. After a successful parse `needParse` is cleared and the statement type is set:
 * STMT_TYPE_INSERT when the query has a root and no type was chosen yet, otherwise
 * STMT_TYPE_QUERY when the query has a prepare root.
 *
 * Returns the first error from stmtCreateRequest() / parseSql(), else TSDB_CODE_SUCCESS.
 */
int32_t stmtParseSql(STscStmt* pStmt) {
  SStmtCallback parserHooks = {
      .pStmt = pStmt,
      .getTbNameFn = stmtGetTbName,
      .setInfoFn = stmtUpdateInfo,
      .getExecInfoFn = stmtGetExecInfo,
  };

  pStmt->exec.pCurrBlock = NULL;

  STMT_ERR_RET(stmtCreateRequest(pStmt));
  STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &parserHooks));

  pStmt->bInfo.needParse = false;

  /* A type already set during parsing is only overridden by the query case. */
  const bool untypedInsert = pStmt->sql.pQuery->pRoot && 0 == pStmt->sql.type;
  if (untypedInsert) {
    pStmt->sql.type = STMT_TYPE_INSERT;
  } else if (pStmt->sql.pQuery->pPrepareRoot) {
    pStmt->sql.type = STMT_TYPE_QUERY;
  }

  return TSDB_CODE_SUCCESS;
}
D
stmt  
dapan1121 已提交
276

D
stmt  
dapan1121 已提交
277
/*
 * Reset the statement's bind info to its "nothing bound" state.
 *
 * Table ids, type and names are cleared, `needParse` is set and `inExecCache` is cleared.
 * The bound-tag descriptor is destroyed and freed only when it is not flagged as cached
 * (stmtCacheBlock sets that flag once the table cache also references the descriptor).
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t stmtCleanBindInfo(STscStmt* pStmt) {
  pStmt->bInfo.needParse = true;
  pStmt->bInfo.inExecCache = false;

  pStmt->bInfo.tbType = 0;
  pStmt->bInfo.tbSuid = 0;
  pStmt->bInfo.tbUid = 0;

  pStmt->bInfo.tbFName[0] = 0;
  pStmt->bInfo.tbName[0] = 0;

  if (!pStmt->bInfo.tagsCached) {
    qDestroyBoundColInfo(pStmt->bInfo.boundTags);
    taosMemoryFreeClear(pStmt->bInfo.boundTags);
  }

  memset(pStmt->bInfo.stbFName, 0, TSDB_TABLE_FNAME_LEN);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
294 295
/*
 * Release execution-side state of the statement.
 *
 * - The request is freed unless this is a query statement and `deepClean` is false.
 * - Every data context in the exec block hash is destroyed and removed, except that with
 *   `keepTable` the current block is kept: its data is swapped with pCurrTbData and the
 *   block is reset for reuse.
 * - With `keepTable` the function stops there. Otherwise the hash itself, the current table
 *   data and the bind info are released as well.
 *
 * Returns the error from qResetStmtDataBlock() / stmtCleanBindInfo(), else TSDB_CODE_SUCCESS.
 */
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
  if (deepClean || STMT_TYPE_QUERY != pStmt->sql.type) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  size_t keyLen = 0;
  for (void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter)) {
    STableDataCxt* pCxt = *(STableDataCxt**)pIter;
    char*          key = taosHashGetKey(pIter, &keyLen);
    STableMeta*    pMeta = qGetTableMetaInDataBlock(pCxt);  // looked up but not used below

    if (keepTable && pCxt == pStmt->exec.pCurrBlock) {
      /* Keep the current table's context: park its data in pCurrTbData and reset it. */
      TSWAP(pCxt->pData, pStmt->exec.pCurrTbData);
      STMT_ERR_RET(qResetStmtDataBlock(pCxt, false));
      continue;
    }

    qDestroyStmtDataBlock(pCxt);
    taosHashRemove(pStmt->exec.pBlockHash, key, keyLen);
  }

  if (keepTable) {
    return TSDB_CODE_SUCCESS;
  }

  taosHashCleanup(pStmt->exec.pBlockHash);
  pStmt->exec.pBlockHash = NULL;

  tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
  taosMemoryFreeClear(pStmt->exec.pCurrTbData);

  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  return TSDB_CODE_SUCCESS;
}

/*
 * Release everything hanging off pStmt->sql and zero the structure.
 *
 * Frees the saved query field arrays, the SQL text, the parsed query, the node list and the
 * vgroup hash; destroys every cached table entry (data context and bound-tag descriptor) and
 * the table cache itself; then performs a deep exec-info clean and a bind-info clean before
 * wiping pStmt->sql with zeros.
 *
 * Returns the error from stmtCleanExecInfo() / stmtCleanBindInfo(), else TSDB_CODE_SUCCESS.
 */
int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
  STMT_DLOG_E("start to free SQL info");

  taosMemoryFree(pStmt->sql.queryRes.fields);
  taosMemoryFree(pStmt->sql.queryRes.userFields);
  taosMemoryFree(pStmt->sql.sqlStr);
  qDestroyQuery(pStmt->sql.pQuery);
  taosArrayDestroy(pStmt->sql.nodeList);
  taosHashCleanup(pStmt->sql.pVgHash);
  pStmt->sql.pVgHash = NULL;

  /* Cache entries are stored by value, so the iterator points directly at each entry. */
  for (void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL); pIter != NULL;
       pIter = taosHashIterate(pStmt->sql.pTableCache, pIter)) {
    SStmtTableCache* pEntry = (SStmtTableCache*)pIter;

    qDestroyStmtDataBlock(pEntry->pDataCtx);
    qDestroyBoundColInfo(pEntry->boundTags);
    taosMemoryFreeClear(pEntry->boundTags);
  }
  taosHashCleanup(pStmt->sql.pTableCache);
  pStmt->sql.pTableCache = NULL;

  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  memset(&pStmt->sql, 0, sizeof(pStmt->sql));

  STMT_DLOG_E("end to free SQL info");

  return TSDB_CODE_SUCCESS;
}

370 371
/*
 * Build a fresh data context for the currently bound table from a cached one.
 *
 * Looks up the vgroup that hosts the table named in pStmt->bInfo.sname, records that vgroup
 * in pStmt->sql.pVgHash (keyed by vgId), and asks qRebuildStmtDataBlock() to produce
 * `*newBlock` from `pDataBlock` for the given uid/suid.
 *
 * Returns the first non-success code from the catalog lookup, the hash insertion or the
 * rebuild; otherwise TSDB_CODE_SUCCESS.
 */
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataCxt* pDataBlock, STableDataCxt** newBlock, uint64_t uid,
                             uint64_t suid) {
  SEpSet           ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);  // fetched but not used below
  SVgroupInfo      vgInfo = {0};
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
                           .requestId = pStmt->exec.pRequest->requestId,
                           .requestObjRefId = pStmt->exec.pRequest->self,
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};

  int32_t lookupCode = catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo);
  STMT_ERR_RET(lookupCode);

  int32_t putCode =
      taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo));
  STMT_ERR_RET(putCode);

  STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, suid, vgInfo.vgId, pStmt->sql.autoCreateTbl));

  STMT_DLOG("tableDataCxt rebuilt, uid:%" PRId64 ", vgId:%d", uid, vgInfo.vgId);

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
390
int32_t stmtGetFromCache(STscStmt* pStmt) {
D
stmt  
dapan1121 已提交
391
  pStmt->bInfo.needParse = true;
D
dapan 已提交
392
  pStmt->bInfo.inExecCache = false;
393

394
  STableDataCxt** pCxtInExec = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
D
dapan1121 已提交
395
  if (pCxtInExec) {
D
dapan1121 已提交
396
    pStmt->bInfo.needParse = false;
D
dapan 已提交
397 398
    pStmt->bInfo.inExecCache = true;

D
dapan1121 已提交
399
    pStmt->exec.pCurrBlock = *pCxtInExec;
D
dapan 已提交
400 401

    if (pStmt->sql.autoCreateTbl) {
D
dapan1121 已提交
402
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
D
dapan 已提交
403 404 405
      return TSDB_CODE_SUCCESS;
    }
  }
D
stmt  
dapan1121 已提交
406

D
stmt  
dapan1121 已提交
407
  if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
D
dapan 已提交
408
    if (pStmt->bInfo.inExecCache) {
X
Xiaoyu Wang 已提交
409
      if (ASSERT(taosHashGetSize(pStmt->exec.pBlockHash) == 1)) {
410 411 412
        tscError("stmtGetFromCache error");
        return TSDB_CODE_TSC_STMT_CACHE_ERROR;
      }
D
dapan 已提交
413
      pStmt->bInfo.needParse = false;
D
dapan1121 已提交
414
      tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
D
dapan 已提交
415 416
      return TSDB_CODE_SUCCESS;
    }
417

D
dapan1121 已提交
418
    tscDebug("no stmt block cache for tb %s", pStmt->bInfo.tbFName);
D
stmt  
dapan1121 已提交
419 420 421
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
422 423 424 425
  if (NULL == pStmt->pCatalog) {
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
  }

D
dapan 已提交
426 427 428 429
  if (pStmt->sql.autoCreateTbl) {
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pStmt->bInfo.tbSuid, sizeof(pStmt->bInfo.tbSuid));
    if (pCache) {
      pStmt->bInfo.needParse = false;
D
dapan1121 已提交
430
      pStmt->bInfo.tbUid = 0;
431

D
dapan1121 已提交
432
      STableDataCxt* pNewBlock = NULL;
D
dapan1121 已提交
433
      STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, 0, pStmt->bInfo.tbSuid));
434 435 436

      if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
                      POINTER_BYTES)) {
D
dapan 已提交
437 438
        STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }
439

D
dapan1121 已提交
440 441
      pStmt->exec.pCurrBlock = pNewBlock;

H
Hongze Cheng 已提交
442
      tscDebug("reuse stmt block for tb %s in sqlBlock, suid:0x%" PRIx64, pStmt->bInfo.tbFName, pStmt->bInfo.tbSuid);
D
dapan1121 已提交
443

D
dapan 已提交
444 445
      return TSDB_CODE_SUCCESS;
    }
446

D
dapan 已提交
447 448
    STMT_RET(stmtCleanBindInfo(pStmt));
  }
D
stmt  
dapan1121 已提交
449

450 451
  STableMeta*      pTableMeta = NULL;
  SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
D
dapan1121 已提交
452 453 454
                           .requestId = pStmt->exec.pRequest->requestId,
                           .requestObjRefId = pStmt->exec.pRequest->self,
                           .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
455
  int32_t          code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
D
dapan1121 已提交
456
  if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
D
dapan1121 已提交
457
    tscDebug("tb %s not exist", pStmt->bInfo.tbFName);
D
dapan1121 已提交
458
    stmtCleanBindInfo(pStmt);
D
dapan1121 已提交
459

D
dapan1121 已提交
460
    STMT_ERR_RET(code);
D
dapan1121 已提交
461 462 463
  }

  STMT_ERR_RET(code);
X
Xiaoyu Wang 已提交
464

D
dapan1121 已提交
465 466
  uint64_t uid = pTableMeta->uid;
  uint64_t suid = pTableMeta->suid;
X
Xiaoyu Wang 已提交
467
  int8_t   tableType = pTableMeta->tableType;
D
dapan1121 已提交
468
  taosMemoryFree(pTableMeta);
D
dapan 已提交
469
  uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;
470

D
dapan1121 已提交
471
  if (uid == pStmt->bInfo.tbUid) {
D
stmt  
dapan1121 已提交
472
    pStmt->bInfo.needParse = false;
D
dapan1121 已提交
473

D
dapan1121 已提交
474 475
    tscDebug("tb %s is current table", pStmt->bInfo.tbFName);

D
stmt  
dapan1121 已提交
476 477 478
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
479
  if (pStmt->bInfo.inExecCache) {
D
dapan 已提交
480
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
D
stmt  
dapan1121 已提交
481
    if (NULL == pCache) {
482 483 484
      tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash",
               pStmt->bInfo.tbFName, uid, cacheUid);

S
Shengliang Guan 已提交
485
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
D
stmt  
dapan1121 已提交
486
    }
X
Xiaoyu Wang 已提交
487

D
stmt  
dapan1121 已提交
488
    pStmt->bInfo.needParse = false;
X
Xiaoyu Wang 已提交
489

D
dapan1121 已提交
490 491 492
    pStmt->bInfo.tbUid = uid;
    pStmt->bInfo.tbSuid = suid;
    pStmt->bInfo.tbType = tableType;
D
stmt  
dapan1121 已提交
493
    pStmt->bInfo.boundTags = pCache->boundTags;
D
dapan1121 已提交
494
    pStmt->bInfo.tagsCached = true;
D
dapan1121 已提交
495

D
dapan1121 已提交
496 497
    tscDebug("tb %s in execBlock list, set to current", pStmt->bInfo.tbFName);

D
stmt  
dapan1121 已提交
498 499 500
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
501
  SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
D
stmt  
dapan1121 已提交
502
  if (pCache) {
D
stmt  
dapan1121 已提交
503
    pStmt->bInfo.needParse = false;
D
stmt  
dapan1121 已提交
504

D
dapan1121 已提交
505 506 507
    pStmt->bInfo.tbUid = uid;
    pStmt->bInfo.tbSuid = suid;
    pStmt->bInfo.tbType = tableType;
D
stmt  
dapan1121 已提交
508
    pStmt->bInfo.boundTags = pCache->boundTags;
D
dapan1121 已提交
509
    pStmt->bInfo.tagsCached = true;
D
stmt  
dapan1121 已提交
510

D
dapan1121 已提交
511
    STableDataCxt* pNewBlock = NULL;
D
dapan1121 已提交
512
    STMT_ERR_RET(stmtRebuildDataBlock(pStmt, pCache->pDataCtx, &pNewBlock, uid, suid));
D
stmt  
dapan1121 已提交
513

514 515
    if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock,
                    POINTER_BYTES)) {
D
stmt  
dapan1121 已提交
516 517
      STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
X
Xiaoyu Wang 已提交
518

D
dapan1121 已提交
519 520
    pStmt->exec.pCurrBlock = pNewBlock;

D
dapan1121 已提交
521 522
    tscDebug("tb %s in sqlBlock list, set to current", pStmt->bInfo.tbFName);

D
stmt  
dapan1121 已提交
523 524 525
    return TSDB_CODE_SUCCESS;
  }

D
stmt  
dapan1121 已提交
526 527
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

D
stmt  
dapan1121 已提交
528 529 530
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
531 532 533 534 535
/*
 * Return the statement to its freshly-initialised SQL state: all SQL info is released, a new
 * empty table cache is created and the status goes back to STMT_INIT.
 *
 * Returns the error from stmtCleanSQLInfo(), or TSDB_CODE_OUT_OF_MEMORY (also stored in
 * terrno) when the table cache cannot be created.
 */
int32_t stmtResetStmt(STscStmt* pStmt) {
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));

  SHashObj* pFreshCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  pStmt->sql.pTableCache = pFreshCache;
  if (pFreshCache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    STMT_ERR_RET(terrno);
  }

  pStmt->sql.status = STMT_INIT;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
545
/*
 * Allocate and initialise a statement object bound to connection `taos`.
 *
 * The object is zero-allocated, given an empty table cache, flagged as needing a parse, put
 * in status STMT_INIT and seeded with `reqid`.
 *
 * Returns the new statement, or NULL with terrno set to TSDB_CODE_OUT_OF_MEMORY when either
 * allocation fails.
 */
TAOS_STMT* stmtInit(STscObj* taos, int64_t reqid) {
  STscStmt* pStmt = taosMemoryCalloc(1, sizeof(STscStmt));
  if (pStmt == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (pStmt->sql.pTableCache == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    taosMemoryFree(pStmt);
    return NULL;
  }

  pStmt->taos = (STscObj*)taos;
  pStmt->reqid = reqid;
  pStmt->sql.status = STMT_INIT;
  pStmt->bInfo.needParse = true;

  STMT_LOG_SEQ(STMT_INIT);

  tscDebug("stmt:%p initialized", pStmt);

  return pStmt;
}
D
dapan1121 已提交
573

X
Xiaoyu Wang 已提交
574
/*
 * Attach SQL text to the statement.
 *
 * A statement that has already been prepared (status >= STMT_PREPARE) is reset first. The
 * status then moves to STMT_PREPARE and a private copy of at most `length` bytes of `sql` is
 * stored; a `length` of 0 means "use strlen(sql)".
 *
 * Returns the error from stmtResetStmt() / stmtSwitchStatus(), TSDB_CODE_OUT_OF_MEMORY when
 * the SQL text cannot be duplicated, else TSDB_CODE_SUCCESS.
 */
int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to prepare");

  if (pStmt->sql.status >= STMT_PREPARE) {
    STMT_ERR_RET(stmtResetStmt(pStmt));
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));

  /* `length` is unsigned, so "not provided" can only mean zero. */
  if (0 == length) {
    length = strlen(sql);
  }

  pStmt->sql.sqlStr = strndup(sql, length);
  if (NULL == pStmt->sql.sqlStr) {
    /* Without this check later stages would run with a NULL SQL string and a non-zero
     * length; fail the prepare instead and leave sqlLen untouched. */
    STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  pStmt->sql.sqlLen = length;

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
595
/*
 * Bind the statement to table `tbName`.
 *
 * Moves the status to STMT_SETTBNAME, refuses non-insert statements, makes sure a request
 * exists, resolves the structured and full table names, and then tries the caches via
 * stmtGetFromCache(). If a parse is still needed afterwards, the plain table name is stored
 * and the SQL is parsed.
 *
 * Returns TSDB_CODE_TSC_STMT_API_ERROR for non-insert statements, or the first error from the
 * steps above; otherwise TSDB_CODE_SUCCESS.
 */
int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG("start to set tbName: %s", tbName);

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));

  int32_t isInsert = 0;
  stmtIsInsert(stmt, &isInsert);
  if (!isInsert) {
    tscError("set tb name not available for none insert statement");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
                            pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
  tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName);

  STMT_ERR_RET(stmtGetFromCache(pStmt));

  if (!pStmt->bInfo.needParse) {
    return TSDB_CODE_SUCCESS;
  }

  /* Cache miss: remember the name so the parser can read it back, then parse. */
  strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1);
  pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;

  STMT_ERR_RET(stmtParseSql(pStmt));

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
627
/*
 * Bind tag values for the currently selected table.
 *
 * Moves the status to STMT_SETTAGS. Nothing further is done when the table was found in the
 * exec cache. Otherwise the table's data context is looked up in the exec block hash and the
 * tag values are bound into it.
 *
 * Returns TSDB_CODE_APP_ERROR when the table is missing from the exec block hash, or the
 * error from stmtSwitchStatus() / qBindStmtTagsValue(); otherwise TSDB_CODE_SUCCESS.
 */
int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to set tbTags");

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));

  if (pStmt->bInfo.inExecCache) {
    return TSDB_CODE_SUCCESS;
  }

  const char*     tbKey = pStmt->bInfo.tbFName;
  STableDataCxt** ppCxt = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, tbKey, strlen(tbKey));
  if (ppCxt == NULL) {
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
    STMT_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  tscDebug("start to bind stmt tag values");
  STMT_ERR_RET(qBindStmtTagsValue(*ppCxt, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
                                  pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
                                  pStmt->exec.pRequest->msgBufLen));

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
653
/*
 * Produce the list of tag fields for the currently selected table.
 *
 * Not available for query statements. The table's data context is looked up in the exec block
 * hash by full table name and passed, together with the bound-tag descriptor, to
 * qBuildStmtTagFields(), which fills `fieldNum` / `fields`.
 *
 * Returns TSDB_CODE_TSC_STMT_API_ERROR for query statements, TSDB_CODE_APP_ERROR when the
 * table is missing from the exec block hash, or the builder's error; otherwise
 * TSDB_CODE_SUCCESS.
 */
int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  if (pStmt->sql.type == STMT_TYPE_QUERY) {
    tscError("invalid operation to get query tag fileds");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  const char*     tbKey = pStmt->bInfo.tbFName;
  STableDataCxt** ppCxt = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, tbKey, strlen(tbKey));
  if (ppCxt == NULL) {
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
    STMT_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  STMT_ERR_RET(qBuildStmtTagFields(*ppCxt, pStmt->bInfo.boundTags, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
670

D
dapan1121 已提交
671
/*
 * Produce the list of column fields for the currently selected table.
 *
 * Not available for query statements. The table's data context is looked up in the exec block
 * hash by full table name and passed to qBuildStmtColFields(), which fills
 * `fieldNum` / `fields`.
 *
 * Returns TSDB_CODE_TSC_STMT_API_ERROR for query statements, TSDB_CODE_APP_ERROR when the
 * table is missing from the exec block hash, or the builder's error; otherwise
 * TSDB_CODE_SUCCESS.
 */
int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
  if (pStmt->sql.type == STMT_TYPE_QUERY) {
    tscError("invalid operation to get query column fileds");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  const char*     tbKey = pStmt->bInfo.tbFName;
  STableDataCxt** ppCxt = (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, tbKey, strlen(tbKey));
  if (ppCxt == NULL) {
    tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
    STMT_ERR_RET(TSDB_CODE_APP_ERROR);
  }

  STMT_ERR_RET(qBuildStmtColFields(*ppCxt, fieldNum, fields));

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
689
/*
 * Bind one batch of parameter values to the statement.
 *
 * stmt   - statement handle, used as STscStmt*.
 * bind   - values to bind. Query statements hand it to qStmtBindParams; insert statements hand
 *          it to qBindStmtColsValue (all columns) or qBindStmtSingleColValue (one column).
 * colIdx - negative: bind all columns at once. Otherwise the index of the single column to bind;
 *          it must be 0 or exactly one past the previously bound index.
 *
 * Returns TSDB_CODE_SUCCESS, or the code of the first step that failed.
 */
int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG("start to bind stmt data, colIdx: %d", colIdx);

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));

  // a statement with a known, non multi-insert type that has already run is not parsed again
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  // a query being re-run drops its previous request so that a new one is created below
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_ERR_RET(qStmtBindParams(pStmt->sql.pQuery, bind, colIdx));

    SParseContext ctx = {.requestId = pStmt->exec.pRequest->requestId,
                         .acctId = pStmt->taos->acctId,
                         .db = pStmt->exec.pRequest->pDb,
                         .topicQuery = false,
                         .pSql = pStmt->sql.sqlStr,
                         .sqlLen = pStmt->sql.sqlLen,
                         .pMsg = pStmt->exec.pRequest->msgBuf,
                         .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                         .pTransporter = pStmt->taos->pAppInfo->pTransporter,
                         .pStmtCb = NULL,
                         .pUser = pStmt->taos->user};
    ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog));

    STMT_ERR_RET(qStmtParseQuerySql(&ctx, pStmt->sql.pQuery));

    if (pStmt->sql.pQuery->haveResultSet) {
      setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
                       pStmt->sql.pQuery->numOfResCols);
      taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
      setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
    }

    // hand the db/table lists collected by the parser over to the request
    TSWAP(pStmt->exec.pRequest->dbList, pStmt->sql.pQuery->pDbList);
    TSWAP(pStmt->exec.pRequest->tableList, pStmt->sql.pQuery->pTableList);
    TSWAP(pStmt->exec.pRequest->targetTableList, pStmt->sql.pQuery->pTargetTableList);

    // if (STMT_TYPE_QUERY == pStmt->sql.queryRes) {
    //   STMT_ERR_RET(stmtRestoreQueryFields(pStmt));
    // }

    // STMT_ERR_RET(stmtBackupQueryFields(pStmt));

    return TSDB_CODE_SUCCESS;
  }

  // insert path: locate the data context to bind into, preferring the cached current block
  STableDataCxt** pDataBlock = NULL;

  if (pStmt->exec.pCurrBlock) {
    pDataBlock = &pStmt->exec.pCurrBlock;
  } else {
    pDataBlock =
        (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
    if (NULL == pDataBlock) {
      tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CACHE_ERROR);
    }
    pStmt->exec.pCurrBlock = *pDataBlock;
  }

  if (colIdx < 0) {
    int32_t code = qBindStmtColsValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen);
    if (code) {
      tscError("qBindStmtColsValue failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  } else {
    if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
      tscError("bind column index not in sequence");
      STMT_ERR_RET(TSDB_CODE_APP_ERROR);
    }

    pStmt->bInfo.sBindLastIdx = colIdx;

    // the first column of a round fixes the row count used for the remaining columns
    if (0 == colIdx) {
      pStmt->bInfo.sBindRowNum = bind->num;
    }

    // report a failed single-column bind instead of dropping it, as the all-columns path does
    int32_t code = qBindStmtSingleColValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf,
                                           pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum);
    if (code) {
      tscError("qBindStmtSingleColValue failed, error:%s", tstrerror(code));
      STMT_ERR_RET(code);
    }
  }

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
790
/*
 * Close the current batch of bound rows: moves the statement to STMT_ADD_BATCH and then
 * calls stmtCacheBlock.
 *
 * Returns TSDB_CODE_SUCCESS, or the code of the step that failed.
 */
int stmtAddBatch(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to add batch");

  // the status switch must succeed before the block is cached
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
  STMT_ERR_RET(stmtCacheBlock(pStmt));

  return TSDB_CODE_SUCCESS;
}

802
/*
 * Fill in the table uid of every cached data block whose table meta still has uid == 0.
 *
 * For each such block the uid is taken from the matching entry of the submit response (matched
 * by full table name against the hash key). When the response has no matching entry, the table
 * meta is fetched through the catalog instead.
 *
 * pStmt - statement object; iterates exec.pBlockHash and updates bInfo.tbUid.
 * pRsp  - submit response; reads nBlocks and pBlocks[i].tblFName / uid.
 *
 * A failure on one block does not stop the iteration. Returns 0 when every step succeeded,
 * otherwise the most recent non-zero error code.
 */
int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
  tscDebug("stmt start to update tbUid, blockNum: %d", pRsp->nBlocks);

  int32_t finalCode = 0;
  size_t  keyLen = 0;

  // the for-increment advances the iterator, so every `continue` below moves to the next entry
  for (void* pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter)) {
    STableDataCxt* pBlock = *(STableDataCxt**)pIter;
    char*          key = taosHashGetKey(pIter, &keyLen);

    STableMeta* pMeta = qGetTableMetaInDataBlock(pBlock);
    if (pMeta->uid) {
      continue;  // uid already known
    }

    // look for the response entry whose table name equals the hash key
    SSubmitBlkRsp* blkRsp = NULL;
    int32_t        i = 0;
    for (; i < pRsp->nBlocks; ++i) {
      blkRsp = pRsp->pBlocks + i;
      if (strlen(blkRsp->tblFName) == keyLen && 0 == strncmp(blkRsp->tblFName, key, keyLen)) {
        break;
      }
    }

    if (i < pRsp->nBlocks) {
      tscDebug("auto created table %s uid updated from %" PRIx64 " to %" PRIx64, blkRsp->tblFName, pMeta->uid,
               blkRsp->uid);

      pMeta->uid = blkRsp->uid;
      pStmt->bInfo.tbUid = blkRsp->uid;
      continue;
    }

    tscDebug("table %s not found in submit rsp, will update from catalog", pStmt->bInfo.tbFName);

    int32_t code = 0;
    if (NULL == pStmt->pCatalog) {
      code = catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog);
      if (code) {
        finalCode = code;
        continue;
      }
    }

    code = stmtCreateRequest(pStmt);
    if (code) {
      finalCode = code;
      continue;
    }

    STableMeta*      pTableMeta = NULL;
    SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
                             .requestId = pStmt->exec.pRequest->requestId,
                             .requestObjRefId = pStmt->exec.pRequest->self,
                             .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
    code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);

    // the request was only needed for the catalog lookup
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;

    if (code || NULL == pTableMeta) {
      // only record real failures, so a success code never erases an earlier error
      if (code) {
        finalCode = code;
      }
      taosMemoryFree(pTableMeta);
      continue;
    }

    pMeta->uid = pTableMeta->uid;
    pStmt->bInfo.tbUid = pTableMeta->uid;
    taosMemoryFree(pTableMeta);
  }

  return finalCode;
}

886 887 888 889
/*
 * Execute the prepared statement.
 *
 * Insert statements first rebuild exec.pCurrTbData from the current block and build the submit
 * output; both statement kinds are then launched through launchQueryImpl. When the request ends
 * with a code accepted by NEED_CLIENT_HANDLE_ERROR, the meta is refreshed and, if that works,
 * the statement is reset and TSDB_CODE_NEED_RETRY is reported to the caller.
 *
 * Note: the early STMT_ERR_RET exits do not pass through the `_return` cleanup below.
 *
 * Returns the resulting code through STMT_RET.
 */
int stmtExec(TAOS_STMT* stmt) {
  STscStmt*   pStmt = (STscStmt*)stmt;
  int32_t     code = 0;
  SSubmitRsp* pRsp = NULL;

  STMT_DLOG_E("start to exec");

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));

  // only non-query statements need their table data prepared before launching
  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    tDestroySubmitTbData(pStmt->exec.pCurrTbData, TSDB_MSG_FLG_ENCODE);
    taosMemoryFreeClear(pStmt->exec.pCurrTbData);

    STMT_ERR_RET(qCloneCurrentTbData(pStmt->exec.pCurrBlock, &pStmt->exec.pCurrTbData));

    STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
  }

  launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);

  if (pStmt->exec.pRequest->code && NEED_CLIENT_HANDLE_ERROR(pStmt->exec.pRequest->code)) {
    code = refreshMeta(pStmt->exec.pRequest->pTscObj, pStmt->exec.pRequest);
    if (code) {
      // refresh failed: surface that failure as the request result
      pStmt->exec.pRequest->code = code;
    } else {
      // refresh worked: reset the statement and ask the caller to retry
      tFreeSSubmitRsp(pRsp);
      STMT_ERR_RET(stmtResetStmt(pStmt));
      STMT_ERR_RET(TSDB_CODE_NEED_RETRY);
    }
  }

  STMT_ERR_JRET(pStmt->exec.pRequest->code);

  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
  pStmt->affectedRows += pStmt->exec.affectedRows;

_return:

  // second argument is true only when execution succeeded
  stmtCleanExecInfo(pStmt, (0 == code), false);

  tFreeSSubmitRsp(pRsp);

  ++pStmt->sql.runTimes;

  STMT_RET(code);
}

X
Xiaoyu Wang 已提交
934
/*
 * Release a statement: cleans its SQL info via stmtCleanSQLInfo and frees the statement object
 * itself. The handle must not be used after this call.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int stmtClose(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to free stmt");

  stmtCleanSQLInfo(pStmt);
  taosMemoryFree(stmt);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
945
/*
 * Return the error text for the statement.
 *
 * Without a statement or without a request, the text for the current terrno is returned.
 * Otherwise terrno is copied into the request's code and the text comes from taos_errstr.
 */
const char* stmtErrstr(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  if (NULL == stmt || NULL == pStmt->exec.pRequest) {
    return tstrerror(terrno);
  }

  // the request code is overwritten with terrno before the text is looked up
  pStmt->exec.pRequest->code = terrno;
  return taos_errstr(pStmt->exec.pRequest);
}

X
Xiaoyu Wang 已提交
957
/* Return the statement's accumulated affectedRows counter. */
int stmtAffectedRows(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;
  return pStmt->affectedRows;
}
D
stmt  
dapan1121 已提交
958

X
Xiaoyu Wang 已提交
959
/* Return exec.affectedRows, the per-execution counter of the statement. */
int stmtAffectedRowsOnce(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;
  return pStmt->exec.affectedRows;
}
D
dapan1121 已提交
960

X
Xiaoyu Wang 已提交
961
/*
 * Tell whether the statement is an insert.
 *
 * insert - output; set to non-zero for insert statements, zero otherwise.
 *
 * When sql.type is still unset (0) the SQL text is inspected with qIsInsertValuesSql;
 * otherwise the stored type decides. Always returns TSDB_CODE_SUCCESS.
 */
int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start is insert");

  if (0 == pStmt->sql.type) {
    // type not determined yet: decide from the SQL text
    *insert = qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
  } else {
    *insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
975
/*
 * Public entry for fetching the tag fields of an insert statement.
 *
 * Switches to STMT_FETCH_FIELDS, makes sure a request exists and the SQL is parsed if needed,
 * then delegates to stmtFetchTagFields. The statement's errCode is saved on entry and restored
 * on exit, so the value it had before the call is kept whatever happens here.
 *
 * Returns 0 on success or the code of the failing step.
 */
int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
  int32_t   code = 0;
  STscStmt* pStmt = (STscStmt*)stmt;
  int32_t   preCode = pStmt->errCode;

  STMT_DLOG_E("start to get tag fields");

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // a statement with a known, non multi-insert type that has already run is not parsed again
  if (pStmt->bInfo.needParse) {
    if (pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
      pStmt->bInfo.needParse = false;
    }
  }

  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

  STMT_ERRI_JRET(stmtFetchTagFields(stmt, nums, fields));

_return:

  // put back the error state seen on entry
  pStmt->errCode = preCode;

  return code;
}

/*
 * Public entry for fetching the column fields of an insert statement.
 *
 * Switches to STMT_FETCH_FIELDS, makes sure a request exists and the SQL is parsed if needed,
 * then delegates to stmtFetchColFields. The statement's errCode is saved on entry and restored
 * on exit, so the value it had before the call is kept whatever happens here.
 *
 * Returns 0 on success or the code of the failing step.
 */
int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
  int32_t   code = 0;
  STscStmt* pStmt = (STscStmt*)stmt;
  int32_t   preCode = pStmt->errCode;

  STMT_DLOG_E("start to get col fields");

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // a statement with a known, non multi-insert type that has already run is not parsed again
  if (pStmt->bInfo.needParse) {
    if (pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
      pStmt->bInfo.needParse = false;
    }
  }

  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERRI_JRET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERRI_JRET(stmtParseSql(pStmt));
  }

  STMT_ERRI_JRET(stmtFetchColFields(stmt, nums, fields));

_return:

  // put back the error state seen on entry
  pStmt->errCode = preCode;

  return code;
}

X
Xiaoyu Wang 已提交
1051
/*
 * Report the number of parameters of the statement.
 *
 * nums - output; for query statements the size of sql.pQuery->pPlaceholderValues, otherwise
 *        the count produced by stmtFetchColFields.
 *
 * Returns TSDB_CODE_SUCCESS, or the code of the step that failed.
 */
int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to get param num");

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // a statement with a known, non multi-insert type that has already run is not parsed again
  if (pStmt->bInfo.needParse) {
    if (pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
      pStmt->bInfo.needParse = false;
    }
  }

  // a query being re-run drops its previous request so that a new one is created below
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));
  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    // insert statements: only the count is wanted, so no field array is requested
    STMT_ERR_RET(stmtFetchColFields(stmt, nums, NULL));
    return TSDB_CODE_SUCCESS;
  }

  *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
  return TSDB_CODE_SUCCESS;
}

1082
/*
 * Report the type and byte width of one parameter (column) of an insert statement.
 *
 * idx   - zero-based parameter index; must satisfy 0 <= idx < number of column fields.
 * type  - output; receives the field's type.
 * bytes - output; receives the field's bytes.
 *
 * Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_TSC_STMT_API_ERROR for query statements,
 * TSDB_CODE_INVALID_PARA when idx is out of range, or the code of any failing step.
 */
int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to get param");

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  // a statement with a known, non multi-insert type that has already run is not parsed again
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
      STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
  }

  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }

  STMT_ERR_RET(stmtCreateRequest(pStmt));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  int32_t       nums = 0;
  TAOS_FIELD_E* pField = NULL;
  STMT_ERR_RET(stmtFetchColFields(stmt, &nums, &pField));

  // reject negative indexes too: pField[idx] below would otherwise read before the array
  if (idx < 0 || idx >= nums) {
    tscError("idx %d is out of range, param num: %d", idx, nums);
    taosMemoryFree(pField);
    STMT_ERR_RET(TSDB_CODE_INVALID_PARA);
  }

  *type = pField[idx].type;
  *bytes = pField[idx].bytes;

  // the field array was produced for this call only and is released here
  taosMemoryFree(pField);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
1126
/*
 * Return the result handle of a query statement, i.e. its exec.pRequest.
 * Non-query statements log an error and yield NULL.
 */
TAOS_RES* stmtUseResult(TAOS_STMT* stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_DLOG_E("start to use result");

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    return pStmt->exec.pRequest;
  }

  tscError("useResult only for query statement");
  return NULL;
}