clientStmt.c 17.4 KB
Newer Older
D
dapan1121 已提交
1 2 3

#include "clientInt.h"
#include "clientLog.h"
D
stmt  
dapan1121 已提交
4
#include "clientStmt.h"
D
dapan1121 已提交
5 6
#include "tdef.h"

D
stmt  
dapan1121 已提交
7
int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
D
dapan1121 已提交
8 9
  int32_t code = 0;
  
D
stmt  
dapan1121 已提交
10
  switch (newStatus) {
D
dapan1121 已提交
11 12
    case STMT_PREPARE:
      break;
D
stmt  
dapan1121 已提交
13
    case STMT_SETTBNAME:
D
dapan1121 已提交
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
      if (STMT_STATUS_NE(PREPARE) && STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(EXECUTE)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;
    case STMT_SETTAGS:
      if (STMT_STATUS_NE(SETTBNAME)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;
    case STMT_FETCH_FIELDS:
      if (STMT_STATUS_EQ(INIT)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;
    case STMT_BIND:
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND_COL)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;
    case STMT_BIND_COL:
      if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
      break;
    case STMT_ADD_BATCH:
      if (STMT_STATUS_NE(BIND) && STMT_STATUS_NE(BIND_COL) && STMT_STATUS_NE(FETCH_FIELDS)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
D
stmt  
dapan1121 已提交
42
      break;
D
dapan1121 已提交
43 44 45 46
    case STMT_EXECUTE:
      if (STMT_STATUS_NE(ADD_BATCH) && STMT_STATUS_NE(FETCH_FIELDS)) {
        code = TSDB_CODE_TSC_STMT_API_ERROR;
      }
D
stmt  
dapan1121 已提交
47
    default:
D
dapan1121 已提交
48
      code = TSDB_CODE_TSC_APP_ERROR;
D
stmt  
dapan1121 已提交
49 50 51
      break;
  }

D
dapan1121 已提交
52
  STMT_ERR_RET(code);
D
stmt  
dapan1121 已提交
53 54 55 56 57 58 59

  pStmt->sql.status = newStatus;

  return TSDB_CODE_SUCCESS;
}


D
stmt  
dapan1121 已提交
60 61 62
int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) {
  STscStmt* pStmt = (STscStmt*)stmt;

D
stmt  
dapan1121 已提交
63
  pStmt->sql.type = STMT_TYPE_MULTI_INSERT;
D
stmt  
dapan1121 已提交
64
  
D
stmt  
dapan1121 已提交
65
  if (NULL == pStmt->bInfo.tbName) {
D
stmt  
dapan1121 已提交
66 67 68 69
    tscError("no table name set");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
  }

D
stmt  
dapan1121 已提交
70
  *tbName = pStmt->bInfo.tbName;
D
stmt  
dapan1121 已提交
71 72 73 74 75 76 77

  return TSDB_CODE_SUCCESS;
}

int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) {
  STscStmt* pStmt = (STscStmt*)stmt;

D
stmt  
dapan1121 已提交
78 79 80 81
  pStmt->bInfo.tbUid = pTableMeta->uid;
  pStmt->bInfo.tbSuid = pTableMeta->suid;
  pStmt->bInfo.tbType = pTableMeta->tableType;
  pStmt->bInfo.boundTags = tags;
D
stmt  
dapan1121 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99

  return TSDB_CODE_SUCCESS;
}

int32_t stmtSetExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
  STscStmt* pStmt = (STscStmt*)stmt;

  pStmt->exec.pVgHash = pVgHash;
  pStmt->exec.pBlockHash = pBlockHash;

  return TSDB_CODE_SUCCESS;
}

int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
  STscStmt* pStmt = (STscStmt*)stmt;

  *pVgHash = pStmt->exec.pVgHash;
  *pBlockHash = pStmt->exec.pBlockHash;
D
stmt  
dapan1121 已提交
100 101 102 103

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
104 105 106 107
int32_t stmtCacheBlock(STscStmt *pStmt) {
  if (pStmt->sql.type != STMT_TYPE_MULTI_INSERT) {
    return TSDB_CODE_SUCCESS;
  }
D
stmt  
dapan1121 已提交
108

D
dapan1121 已提交
109 110
  uint64_t uid = pStmt->bInfo.tbUid; 
  uint64_t tuid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;
D
stmt  
dapan1121 已提交
111

D
dapan1121 已提交
112
  if (taosHashGet(pStmt->sql.pTableCache, &tuid, sizeof(tuid))) {
D
stmt  
dapan1121 已提交
113 114 115
    return TSDB_CODE_SUCCESS;
  }

D
stmt  
dapan1121 已提交
116
  STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid));
D
stmt  
dapan1121 已提交
117 118
  STableDataBlocks* pDst = NULL;
  
D
stmt  
dapan1121 已提交
119
  STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc));
D
stmt  
dapan1121 已提交
120 121 122

  SStmtTableCache cache = {
    .pDataBlock = pDst,
D
stmt  
dapan1121 已提交
123
    .boundTags = pStmt->bInfo.boundTags,
D
stmt  
dapan1121 已提交
124 125
  };

D
dapan1121 已提交
126
  if (taosHashPut(pStmt->sql.pTableCache, &tuid, sizeof(tuid), &cache, sizeof(cache))) {
D
stmt  
dapan1121 已提交
127 128
    return TSDB_CODE_OUT_OF_MEMORY;
  }
D
stmt  
dapan1121 已提交
129

D
stmt  
dapan1121 已提交
130
  pStmt->bInfo.boundTags = NULL;
D
stmt  
dapan1121 已提交
131 132 133 134

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
135 136 137 138 139 140 141 142
int32_t stmtParseSql(STscStmt* pStmt) {
  SStmtCallback stmtCb = {
    .pStmt = pStmt, 
    .getTbNameFn = stmtGetTbName, 
    .setBindInfoFn = stmtSetBindInfo,
    .setExecInfoFn = stmtSetExecInfo,
    .getExecInfoFn = stmtGetExecInfo,
  };
D
stmt  
dapan1121 已提交
143 144 145 146

  if (NULL == pStmt->exec.pRequest) {
    STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
  }
D
stmt  
dapan1121 已提交
147 148 149
  
  STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));

D
stmt  
dapan1121 已提交
150
  pStmt->bInfo.needParse = false;
D
stmt  
dapan1121 已提交
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
  
  switch (nodeType(pStmt->sql.pQuery->pRoot)) {
    case QUERY_NODE_VNODE_MODIF_STMT:
      if (0 == pStmt->sql.type) {
        pStmt->sql.type = STMT_TYPE_INSERT;
      }
      break;
    case QUERY_NODE_SELECT_STMT:
      pStmt->sql.type = STMT_TYPE_QUERY;
      break;
    default:
      tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot));
      STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR);
  }

  STMT_ERR_RET(stmtCacheBlock(pStmt));

  return TSDB_CODE_SUCCESS;
}
D
stmt  
dapan1121 已提交
170

D
stmt  
dapan1121 已提交
171
int32_t stmtCleanBindInfo(STscStmt* pStmt) {
D
stmt  
dapan1121 已提交
172 173 174 175
  pStmt->bInfo.tbUid = 0;
  pStmt->bInfo.tbSuid = 0;
  pStmt->bInfo.tbType = 0;
  pStmt->bInfo.needParse = true;
D
stmt  
dapan1121 已提交
176

D
stmt  
dapan1121 已提交
177 178 179
  taosMemoryFreeClear(pStmt->bInfo.tbName);
  destroyBoundColumnInfo(pStmt->bInfo.boundTags);
  taosMemoryFreeClear(pStmt->bInfo.boundTags);
D
stmt  
dapan1121 已提交
180 181

  return TSDB_CODE_SUCCESS;
D
stmt  
dapan1121 已提交
182 183
}

D
dapan1121 已提交
184 185 186 187 188
int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
  if (STMT_TYPE_QUERY != pStmt->sql.type || freeRequest) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }
D
stmt  
dapan1121 已提交
189 190 191 192

  void *pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
  while (pIter) {
    STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter;    
D
stmt  
dapan1121 已提交
193 194
    uint64_t *key = taosHashGetKey(pIter, NULL);
    
D
stmt  
dapan1121 已提交
195
    if (keepTable && (*key == pStmt->bInfo.tbUid)) {
D
stmt  
dapan1121 已提交
196
      STMT_ERR_RET(qResetStmtDataBlock(pBlocks, true));
D
stmt  
dapan1121 已提交
197 198 199 200 201
      
      pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
      continue;
    }

D
stmt  
dapan1121 已提交
202 203
    qFreeStmtDataBlock(pBlocks);
    taosHashRemove(pStmt->exec.pBlockHash, key, sizeof(*key));
D
stmt  
dapan1121 已提交
204 205 206 207 208 209 210 211 212 213

    pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
  }

  if (keepTable) {
    return TSDB_CODE_SUCCESS;
  }

  taosHashCleanup(pStmt->exec.pBlockHash);
  pStmt->exec.pBlockHash = NULL;
D
stmt  
dapan1121 已提交
214 215 216 217 218 219 220 221 222

  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

  return TSDB_CODE_SUCCESS;
}

int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
  taosMemoryFree(pStmt->sql.sqlStr);
  qDestroyQuery(pStmt->sql.pQuery);
D
dapan1121 已提交
223 224 225
  qDestroyQueryPlan(pStmt->sql.pQueryPlan);
  taosArrayDestroy(pStmt->sql.nodeList);
  
D
stmt  
dapan1121 已提交
226 227
  void *pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
  while (pIter) {
D
stmt  
dapan1121 已提交
228
    SStmtTableCache* pCache = (SStmtTableCache*)pIter;    
D
stmt  
dapan1121 已提交
229

D
stmt  
dapan1121 已提交
230
    qDestroyStmtDataBlock(pCache->pDataBlock);
D
stmt  
dapan1121 已提交
231
    destroyBoundColumnInfo(pCache->boundTags);
D
stmt  
dapan1121 已提交
232
    
D
stmt  
dapan1121 已提交
233 234 235 236 237 238 239
    pIter = taosHashIterate(pStmt->sql.pTableCache, pIter);
  }
  taosHashCleanup(pStmt->sql.pTableCache);
  pStmt->sql.pTableCache = NULL;

  memset(&pStmt->sql, 0, sizeof(pStmt->sql));

D
dapan1121 已提交
240
  STMT_ERR_RET(stmtCleanExecInfo(pStmt, false, true));
D
stmt  
dapan1121 已提交
241
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));
D
stmt  
dapan1121 已提交
242 243 244 245 246

  return TSDB_CODE_SUCCESS;
}

int32_t stmtGetFromCache(STscStmt* pStmt) {
D
stmt  
dapan1121 已提交
247 248
  pStmt->bInfo.needParse = true;

D
stmt  
dapan1121 已提交
249
  if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
D
stmt  
dapan1121 已提交
250 251 252 253 254 255 256 257 258
    return TSDB_CODE_SUCCESS;
  }

  if (NULL == pStmt->pCatalog) {
    STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &pStmt->pCatalog));
  }

  STableMeta *pTableMeta = NULL;
  SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
D
stmt  
dapan1121 已提交
259
  STMT_ERR_RET(catalogGetTableMeta(pStmt->pCatalog, pStmt->taos->pAppInfo->pTransporter, &ep, &pStmt->bInfo.sname, &pTableMeta));
D
stmt  
dapan1121 已提交
260

D
stmt  
dapan1121 已提交
261 262
  if (pTableMeta->uid == pStmt->bInfo.tbUid) {
    pStmt->bInfo.needParse = false;
D
stmt  
dapan1121 已提交
263
    
D
stmt  
dapan1121 已提交
264 265 266
    return TSDB_CODE_SUCCESS;
  }

D
stmt  
dapan1121 已提交
267 268 269 270 271 272 273
  if (taosHashGet(pStmt->exec.pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid))) {
    SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid));
    if (NULL == pCache) {
      tscError("table uid %" PRIx64 "found in exec blockHash, but not in sql blockHash", pTableMeta->uid);
      STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
    }
    
D
stmt  
dapan1121 已提交
274
    pStmt->bInfo.needParse = false;
D
stmt  
dapan1121 已提交
275
    
D
stmt  
dapan1121 已提交
276 277 278 279
    pStmt->bInfo.tbUid = pTableMeta->uid;
    pStmt->bInfo.tbSuid = pTableMeta->suid;
    pStmt->bInfo.tbType = pTableMeta->tableType;
    pStmt->bInfo.boundTags = pCache->boundTags;
D
stmt  
dapan1121 已提交
280 281 282 283
    
    return TSDB_CODE_SUCCESS;
  }

D
stmt  
dapan1121 已提交
284 285
  SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &pTableMeta->uid, sizeof(pTableMeta->uid));
  if (pCache) {
D
stmt  
dapan1121 已提交
286
    pStmt->bInfo.needParse = false;
D
stmt  
dapan1121 已提交
287

D
stmt  
dapan1121 已提交
288 289 290 291
    pStmt->bInfo.tbUid = pTableMeta->uid;
    pStmt->bInfo.tbSuid = pTableMeta->suid;
    pStmt->bInfo.tbType = pTableMeta->tableType;
    pStmt->bInfo.boundTags = pCache->boundTags;
D
stmt  
dapan1121 已提交
292

D
stmt  
dapan1121 已提交
293
    STableDataBlocks* pNewBlock = NULL;
D
stmt  
dapan1121 已提交
294
    STMT_ERR_RET(qRebuildStmtDataBlock(&pNewBlock, pCache->pDataBlock));
D
stmt  
dapan1121 已提交
295

D
stmt  
dapan1121 已提交
296
    if (taosHashPut(pStmt->exec.pBlockHash, &pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid), &pNewBlock, POINTER_BYTES)) {
D
stmt  
dapan1121 已提交
297 298
      STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
stmt  
dapan1121 已提交
299 300 301 302
    
    return TSDB_CODE_SUCCESS;
  }

D
stmt  
dapan1121 已提交
303 304
  STMT_ERR_RET(stmtCleanBindInfo(pStmt));

D
stmt  
dapan1121 已提交
305 306 307
  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
308 309 310 311 312 313 314 315 316 317 318 319 320 321
int32_t stmtResetStmt(STscStmt* pStmt) {
  STMT_ERR_RET(stmtCleanSQLInfo(pStmt));

  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == pStmt->sql.pTableCache) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    STMT_ERR_RET(terrno);
  }

  pStmt->sql.status = STMT_INIT;

  return TSDB_CODE_SUCCESS;
}

D
stmt  
dapan1121 已提交
322

D
dapan1121 已提交
323 324 325 326 327
TAOS_STMT *stmtInit(TAOS *taos) {
  STscObj* pObj = (STscObj*)taos;
  STscStmt* pStmt = NULL;

  pStmt = taosMemoryCalloc(1, sizeof(STscStmt));
D
stmt  
dapan1121 已提交
328
  if (NULL == pStmt) {
D
dapan1121 已提交
329 330 331
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }
D
stmt  
dapan1121 已提交
332

D
stmt  
dapan1121 已提交
333 334
  pStmt->sql.pTableCache = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  if (NULL == pStmt->sql.pTableCache) {
D
stmt  
dapan1121 已提交
335 336 337 338
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    taosMemoryFree(pStmt);
    return NULL;
  }
D
stmt  
dapan1121 已提交
339

D
dapan1121 已提交
340
  pStmt->taos = pObj;
D
stmt  
dapan1121 已提交
341
  pStmt->bInfo.needParse = true;
D
stmt  
dapan1121 已提交
342
  pStmt->sql.status = STMT_INIT;
D
stmt  
dapan1121 已提交
343
  
D
stmt  
dapan1121 已提交
344 345
  return pStmt;
}
D
dapan1121 已提交
346

D
stmt  
dapan1121 已提交
347 348 349
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
  STscStmt* pStmt = (STscStmt*)stmt;

D
stmt  
dapan1121 已提交
350
  if (pStmt->sql.status >= STMT_PREPARE) {
D
stmt  
dapan1121 已提交
351
    STMT_ERR_RET(stmtResetStmt(pStmt));
D
stmt  
dapan1121 已提交
352 353
  }

D
stmt  
dapan1121 已提交
354 355 356 357 358
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_PREPARE));

  if (length <= 0) {
    length = strlen(sql);
  }
D
stmt  
dapan1121 已提交
359
  
D
stmt  
dapan1121 已提交
360 361
  pStmt->sql.sqlStr = strndup(sql, length);
  pStmt->sql.sqlLen = length;
D
stmt  
dapan1121 已提交
362 363 364 365 366

  return TSDB_CODE_SUCCESS;
}


D
stmt  
dapan1121 已提交
367
int stmtSetTbName(TAOS_STMT *stmt, const char *tbName) {
D
stmt  
dapan1121 已提交
368 369
  STscStmt* pStmt = (STscStmt*)stmt;

D
stmt  
dapan1121 已提交
370
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTBNAME));
D
stmt  
dapan1121 已提交
371

D
dapan1121 已提交
372 373 374 375 376 377 378
  int32_t insert = 0;
  stmtIsInsert(stmt, &insert);
  if (0 == insert) {
    tscError("set tb name not available for none insert statement");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

D
stmt  
dapan1121 已提交
379 380
  if (NULL == pStmt->exec.pRequest) {
    STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
D
dapan1121 已提交
381
  }
D
stmt  
dapan1121 已提交
382
  
D
stmt  
dapan1121 已提交
383
  STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
D
stmt  
dapan1121 已提交
384
    
D
stmt  
dapan1121 已提交
385
  STMT_ERR_RET(stmtGetFromCache(pStmt));
D
stmt  
dapan1121 已提交
386

D
stmt  
dapan1121 已提交
387 388 389 390 391
  if (pStmt->bInfo.needParse) {
    taosMemoryFree(pStmt->bInfo.tbName);
    pStmt->bInfo.tbName = strdup(tbName);
  }

D
stmt  
dapan1121 已提交
392 393 394
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
395
int stmtSetTbTags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags) {
D
stmt  
dapan1121 已提交
396
  STscStmt* pStmt = (STscStmt*)stmt;
D
dapan1121 已提交
397

D
stmt  
dapan1121 已提交
398
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
D
stmt  
dapan1121 已提交
399

D
stmt  
dapan1121 已提交
400
  if (pStmt->bInfo.needParse) {
D
stmt  
dapan1121 已提交
401 402 403
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

D
stmt  
dapan1121 已提交
404
  STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
D
stmt  
dapan1121 已提交
405
  if (NULL == pDataBlock) {
D
stmt  
dapan1121 已提交
406
    tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
D
stmt  
dapan1121 已提交
407 408 409
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }
  
D
stmt  
dapan1121 已提交
410
  STMT_ERR_RET(qBindStmtTagsValue(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, &pStmt->bInfo.sname, tags, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
D
stmt  
dapan1121 已提交
411

D
stmt  
dapan1121 已提交
412 413 414 415
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
416
int32_t stmtFetchTagFields(STscStmt* pStmt, int32_t *fieldNum, TAOS_FIELD** fields) {
D
stmt  
dapan1121 已提交
417 418 419 420 421
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query tag fileds");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

D
stmt  
dapan1121 已提交
422
  STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
D
stmt  
dapan1121 已提交
423
  if (NULL == pDataBlock) {
D
stmt  
dapan1121 已提交
424
    tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
D
stmt  
dapan1121 已提交
425 426 427
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

D
stmt  
dapan1121 已提交
428
  STMT_ERR_RET(qBuildStmtTagFields(*pDataBlock, pStmt->bInfo.boundTags, fieldNum, fields));
D
stmt  
dapan1121 已提交
429

D
stmt  
dapan1121 已提交
430 431
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
432

D
dapan1121 已提交
433
int32_t stmtFetchColFields(STscStmt* pStmt, int32_t *fieldNum, TAOS_FIELD** fields) {
D
stmt  
dapan1121 已提交
434 435 436 437 438
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    tscError("invalid operation to get query column fileds");
    STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
  }

D
stmt  
dapan1121 已提交
439
  STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
D
stmt  
dapan1121 已提交
440
  if (NULL == pDataBlock) {
D
stmt  
dapan1121 已提交
441
    tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
D
stmt  
dapan1121 已提交
442 443 444
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

D
stmt  
dapan1121 已提交
445
  STMT_ERR_RET(qBuildStmtColFields(*pDataBlock, fieldNum, fields));
D
stmt  
dapan1121 已提交
446 447 448 449

  return TSDB_CODE_SUCCESS;  
}

D
dapan1121 已提交
450
int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx) {
D
stmt  
dapan1121 已提交
451 452
  STscStmt* pStmt = (STscStmt*)stmt;

D
stmt  
dapan1121 已提交
453
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_BIND));
D
stmt  
dapan1121 已提交
454

D
stmt  
dapan1121 已提交
455 456
  if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
    pStmt->bInfo.needParse = false;
D
stmt  
dapan1121 已提交
457
  }
D
stmt  
dapan1121 已提交
458

D
dapan1121 已提交
459 460 461 462 463
  if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
    taos_free_result(pStmt->exec.pRequest);
    pStmt->exec.pRequest = NULL;
  }
  
D
stmt  
dapan1121 已提交
464 465
  if (NULL == pStmt->exec.pRequest) {
    STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
D
stmt  
dapan1121 已提交
466 467
  }

D
stmt  
dapan1121 已提交
468
  if (pStmt->bInfo.needParse) {
D
stmt  
dapan1121 已提交
469 470
    STMT_ERR_RET(stmtParseSql(pStmt));
  }
D
stmt  
dapan1121 已提交
471

D
dapan1121 已提交
472 473 474 475 476 477 478 479 480 481
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    if (NULL == pStmt->sql.pQueryPlan) {
      STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList));
      pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag;
      pStmt->exec.pRequest->body.pDag = NULL;
    }
    
    STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx));
  }
  
D
stmt  
dapan1121 已提交
482
  STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
D
stmt  
dapan1121 已提交
483
  if (NULL == pDataBlock) {
D
stmt  
dapan1121 已提交
484
    tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
D
stmt  
dapan1121 已提交
485 486
    STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }
D
stmt  
dapan1121 已提交
487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503

  if (colIdx < 0) {
    qBindStmtColsValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen);
  } else {
    if (colIdx != (pStmt->bInfo.sBindLastIdx + 1) && colIdx != 0) {
      tscError("bind column index not in sequence");
      STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
    }

    pStmt->bInfo.sBindLastIdx = colIdx;
    
    if (0 == colIdx) {
      pStmt->bInfo.sBindRowNum = bind->num;
    }
    
    qBindStmtSingleColValue(*pDataBlock, bind, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen, colIdx, pStmt->bInfo.sBindRowNum);
  }
D
stmt  
dapan1121 已提交
504 505
  
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
506 507
}

D
stmt  
dapan1121 已提交
508 509 510 511

int stmtAddBatch(TAOS_STMT *stmt) {
  STscStmt* pStmt = (STscStmt*)stmt;

D
stmt  
dapan1121 已提交
512
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_ADD_BATCH));
D
stmt  
dapan1121 已提交
513

D
stmt  
dapan1121 已提交
514
  STMT_ERR_RET(stmtCacheBlock(pStmt));
D
stmt  
dapan1121 已提交
515
  
D
stmt  
dapan1121 已提交
516 517 518 519
  return TSDB_CODE_SUCCESS;
}

int stmtExec(TAOS_STMT *stmt) {
D
stmt  
dapan1121 已提交
520 521 522
  STscStmt* pStmt = (STscStmt*)stmt;
  int32_t code = 0;

D
stmt  
dapan1121 已提交
523
  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_EXECUTE));
D
stmt  
dapan1121 已提交
524

D
dapan1121 已提交
525 526 527 528 529 530 531
  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    scheduleQuery(pStmt->exec.pRequest, pStmt->sql.pQueryPlan, pStmt->sql.nodeList);
  } else {
    STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash));
    launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, TSDB_CODE_SUCCESS, true);
  }
  
D
stmt  
dapan1121 已提交
532
  STMT_ERR_JRET(pStmt->exec.pRequest->code);
D
stmt  
dapan1121 已提交
533

D
dapan1121 已提交
534 535
  pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest);
  pStmt->affectedRows += pStmt->exec.affectedRows;
D
stmt  
dapan1121 已提交
536

D
stmt  
dapan1121 已提交
537 538
_return:

D
dapan1121 已提交
539
  stmtCleanExecInfo(pStmt, (code ? false : true), false);
D
stmt  
dapan1121 已提交
540 541
  
  ++pStmt->sql.runTimes;
D
stmt  
dapan1121 已提交
542 543
  
  STMT_RET(code);
D
stmt  
dapan1121 已提交
544 545 546
}


D
stmt  
dapan1121 已提交
547
int stmtClose(TAOS_STMT *stmt) {
D
stmt  
dapan1121 已提交
548 549 550
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_RET(stmtCleanSQLInfo(pStmt));
D
stmt  
dapan1121 已提交
551 552
}

D
stmt  
dapan1121 已提交
553
const char *stmtErrstr(TAOS_STMT *stmt) {
D
stmt  
dapan1121 已提交
554
  STscStmt* pStmt = (STscStmt*)stmt;
D
stmt  
dapan1121 已提交
555

D
stmt  
dapan1121 已提交
556 557 558 559
  if (stmt == NULL) {
    return (char*) tstrerror(terrno);
  }

D
stmt  
dapan1121 已提交
560 561 562 563
  if (pStmt->exec.pRequest) {
    pStmt->exec.pRequest->code = terrno;
  }

D
stmt  
dapan1121 已提交
564
  return taos_errstr(pStmt->exec.pRequest);
D
stmt  
dapan1121 已提交
565 566
}

D
stmt  
dapan1121 已提交
567
int stmtAffectedRows(TAOS_STMT *stmt) {
D
stmt  
dapan1121 已提交
568
  return ((STscStmt*)stmt)->affectedRows;
D
stmt  
dapan1121 已提交
569 570
}

D
dapan1121 已提交
571 572 573 574
int stmtAffectedRowsOnce(TAOS_STMT *stmt) {
  return ((STscStmt*)stmt)->exec.affectedRows;
}

D
stmt  
dapan1121 已提交
575
int stmtIsInsert(TAOS_STMT *stmt, int *insert) {
D
stmt  
dapan1121 已提交
576 577
  STscStmt* pStmt = (STscStmt*)stmt;

D
stmt  
dapan1121 已提交
578 579 580 581 582
  if (pStmt->sql.type) {
    *insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
  } else {
    *insert = isInsertSql(pStmt->sql.sqlStr, 0);
  }
D
stmt  
dapan1121 已提交
583
  
D
stmt  
dapan1121 已提交
584 585 586 587
  return TSDB_CODE_SUCCESS;
}

int stmtGetParamNum(TAOS_STMT *stmt, int *nums) {
D
dapan1121 已提交
588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606
  STscStmt* pStmt = (STscStmt*)stmt;

  STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));

  if (pStmt->bInfo.needParse) {
    STMT_ERR_RET(stmtParseSql(pStmt));
  }

  if (STMT_TYPE_QUERY == pStmt->sql.type) {
    if (NULL == pStmt->sql.pQueryPlan) {
      STMT_ERR_RET(getQueryPlan(pStmt->exec.pRequest, pStmt->sql.pQuery, &pStmt->sql.nodeList));
      pStmt->sql.pQueryPlan = pStmt->exec.pRequest->body.pDag;
      pStmt->exec.pRequest->body.pDag = NULL;
    }

    *nums = (pStmt->sql.pQueryPlan->pPlaceholderValues) ? pStmt->sql.pQueryPlan->pPlaceholderValues->length : 0;
  } else {
    STMT_ERR_RET(stmtFetchColFields(stmt, nums, NULL));
  }
D
stmt  
dapan1121 已提交
607
  
D
stmt  
dapan1121 已提交
608 609 610 611
  return TSDB_CODE_SUCCESS;
}

TAOS_RES *stmtUseResult(TAOS_STMT *stmt) {
D
dapan1121 已提交
612 613 614 615 616 617 618 619
  STscStmt* pStmt = (STscStmt*)stmt;

  if (STMT_TYPE_QUERY != pStmt->sql.type) {
    tscError("useResult only for query statement");
    return NULL;
  }

  return pStmt->exec.pRequest;
D
stmt  
dapan1121 已提交
620 621 622 623
}