未验证 提交 0b28267f 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #8518 from taosdata/fix/TD-10823

[TD-10823]<fix>: after calling taos_stmt_execute, clean up the cache …
...@@ -81,6 +81,10 @@ tests/comparisonTest/opentsdb/opentsdbtest/.settings/ ...@@ -81,6 +81,10 @@ tests/comparisonTest/opentsdb/opentsdbtest/.settings/
tests/examples/JDBC/JDBCDemo/.classpath tests/examples/JDBC/JDBCDemo/.classpath
tests/examples/JDBC/JDBCDemo/.project tests/examples/JDBC/JDBCDemo/.project
tests/examples/JDBC/JDBCDemo/.settings/ tests/examples/JDBC/JDBCDemo/.settings/
tests/script/api/batchprepare
tests/script/api/stmt
tests/script/api/stmtBatchTest
tests/script/api/stmtTest
# Emacs # Emacs
# -*- mode: gitignore; -*- # -*- mode: gitignore; -*-
......
...@@ -108,6 +108,7 @@ typedef struct SBlockKeyInfo { ...@@ -108,6 +108,7 @@ typedef struct SBlockKeyInfo {
int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len); int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
int32_t tscCreateDataBlockData(STableDataBlocks* dataBuf, size_t defaultSize, int32_t rowSize, int32_t startOffset);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta); void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf); void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf);
int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo); int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo);
......
...@@ -48,12 +48,14 @@ typedef struct SMultiTbStmt { ...@@ -48,12 +48,14 @@ typedef struct SMultiTbStmt {
bool nameSet; bool nameSet;
bool tagSet; bool tagSet;
bool subSet; bool subSet;
bool tagColSet;
uint64_t currentUid; uint64_t currentUid;
char *sqlstr; char *sqlstr;
uint32_t tbNum; uint32_t tbNum;
SStrToken tbname; SStrToken tbname;
SStrToken stbname; SStrToken stbname;
SStrToken values; SStrToken values;
SStrToken tagCols;
SArray *tags; SArray *tags;
STableDataBlocks *lastBlock; STableDataBlocks *lastBlock;
SHashObj *pTableHash; SHashObj *pTableHash;
...@@ -1246,6 +1248,12 @@ static void insertBatchClean(STscStmt* pStmt) { ...@@ -1246,6 +1248,12 @@ static void insertBatchClean(STscStmt* pStmt) {
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks); pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks);
pCmd->insertParam.numOfTables = 0; pCmd->insertParam.numOfTables = 0;
STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL);
while(p) {
tfree((*p)->pData);
p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p);
}
taosHashClear(pCmd->insertParam.pTableBlockHashList); taosHashClear(pCmd->insertParam.pTableBlockHashList);
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
tscFreeSubobj(pSql); tscFreeSubobj(pSql);
...@@ -1337,9 +1345,40 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) { ...@@ -1337,9 +1345,40 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
pStmt->mtb.stbname = sToken; pStmt->mtb.stbname = sToken;
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false); sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
if (sToken.n <= 0 || sToken.type != TK_TAGS) { if (sToken.n <= 0 || ((sToken.type != TK_TAGS) && (sToken.type != TK_LP))) {
tscError("keyword TAGS expected, sql:%s", pCmd->insertParam.sql); tscError("invalid token, sql:%s", pCmd->insertParam.sql);
return tscSQLSyntaxErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z ? sToken.z : pCmd->insertParam.sql); return tscSQLSyntaxErrMsg(pCmd->payload, "invalid token", sToken.z ? sToken.z : pCmd->insertParam.sql);
}
// ... (tag_col_list) TAGS(tag_val_list) ...
int32_t tagColsCnt = 0;
if (sToken.type == TK_LP) {
pStmt->mtb.tagColSet = true;
pStmt->mtb.tagCols = sToken;
int32_t tagColsStart = index;
while (1) {
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
if (sToken.type == TK_ILLEGAL) {
return tscSQLSyntaxErrMsg(pCmd->payload, "unrecognized token", sToken.z);
}
if (sToken.type == TK_ID) {
++tagColsCnt;
}
if (sToken.type == TK_RP) {
break;
}
}
if (tagColsCnt == 0) {
tscError("tag column list expected, sql:%s", pCmd->insertParam.sql);
return tscSQLSyntaxErrMsg(pCmd->payload, "tag column list expected", pCmd->insertParam.sql);
}
pStmt->mtb.tagCols.n = index - tagColsStart + 1;
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
if (sToken.n <= 0 || sToken.type != TK_TAGS) {
tscError("keyword TAGS expected, sql:%s", pCmd->insertParam.sql);
return tscSQLSyntaxErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z ? sToken.z : pCmd->insertParam.sql);
}
} }
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false); sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
...@@ -1379,6 +1418,11 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) { ...@@ -1379,6 +1418,11 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
return tscSQLSyntaxErrMsg(pCmd->payload, "no tags", pCmd->insertParam.sql); return tscSQLSyntaxErrMsg(pCmd->payload, "no tags", pCmd->insertParam.sql);
} }
if (tagColsCnt > 0 && taosArrayGetSize(pStmt->mtb.tags) != tagColsCnt) {
tscError("not match tags, sql:%s", pCmd->insertParam.sql);
return tscSQLSyntaxErrMsg(pCmd->payload, "not match tags", pCmd->insertParam.sql);
}
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false); sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
if (sToken.n <= 0 || (sToken.type != TK_VALUES && sToken.type != TK_LP)) { if (sToken.n <= 0 || (sToken.type != TK_VALUES && sToken.type != TK_LP)) {
tscError("sql error, sql:%s", pCmd->insertParam.sql); tscError("sql error, sql:%s", pCmd->insertParam.sql);
...@@ -1401,7 +1445,13 @@ int stmtGenInsertStatement(SSqlObj* pSql, STscStmt* pStmt, const char* name, TAO ...@@ -1401,7 +1445,13 @@ int stmtGenInsertStatement(SSqlObj* pSql, STscStmt* pStmt, const char* name, TAO
int32_t j = 0; int32_t j = 0;
while (1) { while (1) {
len = (size_t)snprintf(str, size - 1, "insert into %s using %.*s tags(", name, pStmt->mtb.stbname.n, pStmt->mtb.stbname.z); if (pStmt->mtb.tagColSet) {
len = (size_t)snprintf(str, size - 1, "insert into %s using %.*s %.*s tags(",
name, pStmt->mtb.stbname.n, pStmt->mtb.stbname.z, pStmt->mtb.tagCols.n, pStmt->mtb.tagCols.z);
} else {
len = (size_t)snprintf(str, size - 1, "insert into %s using %.*s tags(", name, pStmt->mtb.stbname.n, pStmt->mtb.stbname.z);
}
if (len >= (size -1)) { if (len >= (size -1)) {
size *= 2; size *= 2;
free(str); free(str);
...@@ -1637,6 +1687,13 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags ...@@ -1637,6 +1687,13 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
STMT_RET(TSDB_CODE_TSC_APP_ERROR); STMT_RET(TSDB_CODE_TSC_APP_ERROR);
} }
if ((*t1)->pData == NULL) {
code = tscCreateDataBlockData(*t1, TSDB_PAYLOAD_SIZE, (*t1)->pTableMeta->tableInfo.rowSize, sizeof(SSubmitBlk));
if (code != TSDB_CODE_SUCCESS) {
STMT_RET(code);
}
}
SSubmitBlk* pBlk = (SSubmitBlk*) (*t1)->pData; SSubmitBlk* pBlk = (SSubmitBlk*) (*t1)->pData;
pCmd->batchSize = pBlk->numOfRows; pCmd->batchSize = pBlk->numOfRows;
if (pBlk->numOfRows == 0) { if (pBlk->numOfRows == 0) {
...@@ -1695,7 +1752,6 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags ...@@ -1695,7 +1752,6 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
STMT_RET(TSDB_CODE_SUCCESS); STMT_RET(TSDB_CODE_SUCCESS);
} }
if (pStmt->mtb.tagSet) { if (pStmt->mtb.tagSet) {
pStmt->mtb.tbname = tscReplaceStrToken(&pSql->sqlstr, &pStmt->mtb.tbname, name); pStmt->mtb.tbname = tscReplaceStrToken(&pSql->sqlstr, &pStmt->mtb.tbname, name);
} else { } else {
...@@ -1762,7 +1818,6 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags ...@@ -1762,7 +1818,6 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
STMT_RET(code); STMT_RET(code);
} }
int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name) { int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHECK STMT_CHECK
...@@ -1770,8 +1825,6 @@ int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name) { ...@@ -1770,8 +1825,6 @@ int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name) {
return taos_stmt_set_tbname_tags(stmt, name, NULL); return taos_stmt_set_tbname_tags(stmt, name, NULL);
} }
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) { int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHECK STMT_CHECK
...@@ -1779,7 +1832,6 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) { ...@@ -1779,7 +1832,6 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
return taos_stmt_set_tbname_tags(stmt, name, NULL); return taos_stmt_set_tbname_tags(stmt, name, NULL);
} }
int taos_stmt_close(TAOS_STMT* stmt) { int taos_stmt_close(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
if (pStmt == NULL || pStmt->taos == NULL) { if (pStmt == NULL || pStmt->taos == NULL) {
...@@ -1846,7 +1898,6 @@ int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) { ...@@ -1846,7 +1898,6 @@ int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) {
} }
} }
int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind) { int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
...@@ -1910,8 +1961,6 @@ int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, in ...@@ -1910,8 +1961,6 @@ int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, in
STMT_RET(insertStmtBindParamBatch(pStmt, bind, colIdx)); STMT_RET(insertStmtBindParamBatch(pStmt, bind, colIdx));
} }
int taos_stmt_add_batch(TAOS_STMT* stmt) { int taos_stmt_add_batch(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHECK STMT_CHECK
...@@ -2052,7 +2101,6 @@ int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes) { ...@@ -2052,7 +2101,6 @@ int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
} }
} }
char *taos_stmt_errstr(TAOS_STMT *stmt) { char *taos_stmt_errstr(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
...@@ -2063,8 +2111,6 @@ char *taos_stmt_errstr(TAOS_STMT *stmt) { ...@@ -2063,8 +2111,6 @@ char *taos_stmt_errstr(TAOS_STMT *stmt) {
return taos_errstr(pStmt->pSql); return taos_errstr(pStmt->pSql);
} }
const char *taos_data_type(int type) { const char *taos_data_type(int type) {
switch (type) { switch (type) {
case TSDB_DATA_TYPE_NULL: return "TSDB_DATA_TYPE_NULL"; case TSDB_DATA_TYPE_NULL: return "TSDB_DATA_TYPE_NULL";
...@@ -2081,4 +2127,3 @@ const char *taos_data_type(int type) { ...@@ -2081,4 +2127,3 @@ const char *taos_data_type(int type) {
default: return "UNKNOWN"; default: return "UNKNOWN";
} }
} }
...@@ -1797,6 +1797,32 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff ...@@ -1797,6 +1797,32 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
int32_t code = tscCreateDataBlockData(dataBuf, defaultSize, rowSize, startOffset);
if (code != TSDB_CODE_SUCCESS) {
tfree(dataBuf);
return code;
}
//Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf->pTableMeta = tscTableMetaDup(pTableMeta);
SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
SSchema* pSchema = tscGetTableSchema(dataBuf->pTableMeta);
tscSetBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);
dataBuf->vgId = dataBuf->pTableMeta->vgId;
tNameAssign(&dataBuf->tableName, name);
assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
*dataBlocks = dataBuf;
return TSDB_CODE_SUCCESS;
}
int32_t tscCreateDataBlockData(STableDataBlocks* dataBuf, size_t defaultSize, int32_t rowSize, int32_t startOffset) {
assert(dataBuf != NULL);
dataBuf->nAllocSize = (uint32_t)defaultSize; dataBuf->nAllocSize = (uint32_t)defaultSize;
dataBuf->headerSize = startOffset; dataBuf->headerSize = startOffset;
...@@ -1809,30 +1835,16 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff ...@@ -1809,30 +1835,16 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff
dataBuf->pData = malloc(dataBuf->nAllocSize); dataBuf->pData = malloc(dataBuf->nAllocSize);
if (dataBuf->pData == NULL) { if (dataBuf->pData == NULL) {
tscError("failed to allocated memory, reason:%s", strerror(errno)); tscError("failed to allocated memory, reason:%s", strerror(errno));
tfree(dataBuf);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
memset(dataBuf->pData, 0, sizeof(SSubmitBlk)); memset(dataBuf->pData, 0, sizeof(SSubmitBlk));
//Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf->pTableMeta = tscTableMetaDup(pTableMeta);
SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
SSchema* pSchema = tscGetTableSchema(dataBuf->pTableMeta);
tscSetBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);
dataBuf->ordered = true; dataBuf->ordered = true;
dataBuf->prevTS = INT64_MIN; dataBuf->prevTS = INT64_MIN;
dataBuf->rowSize = rowSize; dataBuf->rowSize = rowSize;
dataBuf->size = startOffset; dataBuf->size = startOffset;
dataBuf->tsSource = -1; dataBuf->tsSource = -1;
dataBuf->vgId = dataBuf->pTableMeta->vgId;
tNameAssign(&dataBuf->tableName, name);
assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
*dataBlocks = dataBuf;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -1761,7 +1761,7 @@ int stmt_funcb_autoctb_e1(TAOS_STMT *stmt) { ...@@ -1761,7 +1761,7 @@ int stmt_funcb_autoctb_e1(TAOS_STMT *stmt) {
int code = taos_stmt_prepare(stmt, sql, 0); int code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0){ if (code != 0){
printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt)); printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
return -1; exit(1);
} }
int id = 0; int id = 0;
...@@ -1797,9 +1797,44 @@ int stmt_funcb_autoctb_e1(TAOS_STMT *stmt) { ...@@ -1797,9 +1797,44 @@ int stmt_funcb_autoctb_e1(TAOS_STMT *stmt) {
return 0; return 0;
} }
int stmt_multi_insert_check(TAOS_STMT *stmt) {
char *sql;
// The number of tag column list is not equal to the number of tag value list
sql = "insert into ? using stb1 (id1) tags(1,?) values(?,?,?,?,?,?,?,?,?,?)";
if (0 == taos_stmt_prepare(stmt, sql, 0)) {
printf("failed to check taos_stmt_prepare. sql:%s\n", sql);
exit(1);
}
// The number of column list is not equal to the number of value list
sql = "insert into ? using stb1 tags(1,?,2,?,4,?,6.0,?,'b') "
"(ts, b, v1, v2, v4, v8, f4, f8, bin) values(?,?,?,?,?,?,?,?,?,?)";
if (0 == taos_stmt_prepare(stmt, sql, 0)) {
printf("failed to check taos_stmt_prepare. sql:%s\n", sql);
exit(1);
}
sql = "insert into ? using stb1 () tags(1,?) values(?,?,?,?,?,?,?,?,?,?)";
if (0 == taos_stmt_prepare(stmt, sql, 0)) {
printf("failed to check taos_stmt_prepare. sql:%s\n", sql);
exit(1);
}
sql = "insert into ? using stb1 ( tags(1,?) values(?,?,?,?,?,?,?,?,?,?)";
if (0 == taos_stmt_prepare(stmt, sql, 0)) {
printf("failed to check taos_stmt_prepare. sql:%s\n", sql);
exit(1);
}
sql = "insert into ? using stb1 ) tags(1,?) values(?,?,?,?,?,?,?,?,?,?)";
if (0 == taos_stmt_prepare(stmt, sql, 0)) {
printf("failed to check taos_stmt_prepare. sql:%s\n", sql);
exit(1);
}
return 0;
}
//1 tables 10 records //1 tables 10 records
int stmt_funcb_autoctb_e2(TAOS_STMT *stmt) { int stmt_funcb_autoctb_e2(TAOS_STMT *stmt) {
...@@ -4505,7 +4540,6 @@ void* runcase(void *par) { ...@@ -4505,7 +4540,6 @@ void* runcase(void *par) {
(void)idx; (void)idx;
#if 1 #if 1
prepare(taos, 1, 1); prepare(taos, 1, 1);
...@@ -4819,6 +4853,16 @@ void* runcase(void *par) { ...@@ -4819,6 +4853,16 @@ void* runcase(void *par) {
#endif #endif
#if 1
prepare(taos, 1, 0);
stmt = taos_stmt_init(taos);
printf("stmt_multi_insert_check start\n");
stmt_multi_insert_check(stmt);
printf("stmt_multi_insert_check end\n");
taos_stmt_close(stmt);
#endif
#if 1 #if 1
prepare(taos, 1, 1); prepare(taos, 1, 1);
...@@ -5007,7 +5051,6 @@ void* runcase(void *par) { ...@@ -5007,7 +5051,6 @@ void* runcase(void *par) {
printf("check result end\n"); printf("check result end\n");
#endif #endif
#if 1 #if 1
preparem(taos, 0, idx); preparem(taos, 0, idx);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册