提交 16378507 编写于 作者: D dapan1121

support auto create table

上级 13004a57
......@@ -99,6 +99,8 @@ static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClause
return pCmd->pQueryInfo[subClauseIndex];
}
int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len);
SQueryInfo* tscGetActiveQueryInfo(SSqlCmd* pCmd);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
......
......@@ -46,9 +46,13 @@ typedef struct SNormalStmt {
typedef struct SMultiTbStmt {
bool nameSet;
bool tagSet;
uint64_t currentUid;
uint32_t tbNum;
SStrToken tbname;
SStrToken stbname;
SStrToken values;
SArray *tags;
SHashObj *pTableHash;
SHashObj *pTableBlockHashList; // data block for each table
} SMultiTbStmt;
......@@ -1201,6 +1205,185 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
return pStmt->pSql->res.code;
}
int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
SSqlCmd *pCmd = &pSql->cmd;
int32_t ret = TSDB_CODE_SUCCESS;
if ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS) {
return ret;
}
int32_t index = 0;
SStrToken sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n == 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
if (sToken.n == 1 && sToken.type == TK_QUESTION) {
pStmt->multiTbInsert = true;
pStmt->mtb.tbname = sToken;
pStmt->mtb.nameSet = false;
if (pStmt->mtb.pTableHash == NULL) {
pStmt->mtb.pTableHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
}
if (pStmt->mtb.pTableBlockHashList == NULL) {
pStmt->mtb.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
}
pStmt->mtb.tagSet = true;
sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n > 0 && sToken.type == TK_VALUES) {
return TSDB_CODE_SUCCESS;
}
if (sToken.n <= 0 || sToken.type != TK_USING) {
return TSDB_CODE_TSC_INVALID_SQL;
}
sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n <= 0 || ((sToken.type != TK_ID) && (sToken.type != TK_STRING))) {
return TSDB_CODE_TSC_INVALID_SQL;
}
pStmt->mtb.stbname = sToken;
sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n <= 0 || sToken.type != TK_TAGS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n <= 0 || sToken.type != TK_LP) {
return TSDB_CODE_TSC_INVALID_SQL;
}
pStmt->mtb.tags = taosArrayInit(4, sizeof(SStrToken));
int32_t loopCont = 1;
while (loopCont) {
sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n <= 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
switch (sToken.type) {
case TK_RP:
loopCont = 0;
break;
case TK_VALUES:
return TSDB_CODE_TSC_INVALID_SQL;
case TK_QUESTION:
pStmt->mtb.tagSet = false; //continue
default:
taosArrayPush(pStmt->mtb.tags, &sToken);
break;
}
}
if (taosArrayGetSize(pStmt->mtb.tags) <= 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n <= 0 || sToken.type != TK_VALUES) {
return TSDB_CODE_TSC_INVALID_SQL;
}
pStmt->mtb.values = sToken;
}
return TSDB_CODE_SUCCESS;
}
int stmtGenInsertStatement(SSqlObj* pSql, STscStmt* pStmt, const char* name, TAOS_BIND* tags) {
size_t tagNum = taosArrayGetSize(pStmt->mtb.tags);
size_t size = 1048576;
char *str = calloc(1, size);
size_t len = 0;
int32_t ret = 0;
int32_t j = 0;
while (1) {
len = (size_t)snprintf(str, size - 1, "insert into %s using %.*s tags(", name, pStmt->mtb.stbname.n, pStmt->mtb.stbname.z);
if (len >= (size -1)) {
size *= 2;
free(str);
str = calloc(1, size);
continue;
}
j = 0;
for (size_t i = 0; i < tagNum && len < (size - 1); ++i) {
SStrToken *t = taosArrayGet(pStmt->mtb.tags, i);
if (t->type == TK_QUESTION) {
int32_t l = 0;
if (i > 0) {
str[len++] = ',';
}
if (tags[j].is_null && (*tags[j].is_null)) {
ret = converToStr(str + len, TSDB_DATA_TYPE_NULL, NULL, -1, &l);
} else {
if (tags[j].buffer == NULL) {
free(str);
tscError("empty");
return TSDB_CODE_TSC_APP_ERROR;
}
ret = converToStr(str + len, tags[j].buffer_type, tags[j].buffer, tags[j].length ? *tags[j].length : -1, &l);
}
++j;
if (ret) {
free(str);
return ret;
}
len += l;
} else {
len += (size_t)snprintf(str + len, size - len - 1, i > 0 ? ",%.*s" : "%.*s", t->n, t->z);
}
}
if (len >= (size - 1)) {
size *= 2;
free(str);
str = calloc(1, size);
continue;
}
strcat(str, ") ");
len += 2;
if ((len + strlen(pStmt->mtb.values.z)) >= (size - 1)) {
size *= 2;
free(str);
str = calloc(1, size);
continue;
}
strcat(str, pStmt->mtb.values.z);
break;
}
free(pSql->sqlstr);
pSql->sqlstr = str;
return TSDB_CODE_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////
// interface functions
......@@ -1292,34 +1475,15 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
registerSqlObj(pSql);
int32_t ret = TSDB_CODE_SUCCESS;
if ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS) {
int32_t ret = stmtParseInsertTbTags(pSql, pStmt);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
int32_t index = 0;
SStrToken sToken = tStrGetToken(pCmd->curSql, &index, false);
if (sToken.n == 0) {
return TSDB_CODE_TSC_INVALID_SQL;
}
if (sToken.n == 1 && sToken.type == TK_QUESTION) {
pStmt->multiTbInsert = true;
pStmt->mtb.tbname = sToken;
pStmt->mtb.nameSet = false;
if (pStmt->mtb.pTableHash == NULL) {
pStmt->mtb.pTableHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
}
if (pStmt->mtb.pTableBlockHashList == NULL) {
pStmt->mtb.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
}
return TSDB_CODE_SUCCESS;
if (pStmt->multiTbInsert) {
return TSDB_CODE_SUCCESS;
}
pStmt->multiTbInsert = false;
memset(&pStmt->mtb, 0, sizeof(pStmt->mtb));
int32_t code = tsParseSql(pSql, true);
......@@ -1337,7 +1501,8 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
}
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags) {
STscStmt* pStmt = (STscStmt*)stmt;
SSqlObj* pSql = pStmt->pSql;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1384,9 +1549,23 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
tscDebug("0x%"PRIx64" table:%s is already prepared, uid:%" PRIu64, pSql->self, name, pStmt->mtb.currentUid);
return TSDB_CODE_SUCCESS;
}
if (pStmt->mtb.tagSet) {
pStmt->mtb.tbname = tscReplaceStrToken(&pSql->sqlstr, &pStmt->mtb.tbname, name);
} else {
if (tags == NULL) {
tscError("No tags set");
return TSDB_CODE_TSC_APP_ERROR;
}
int32_t ret = stmtGenInsertStatement(pSql, pStmt, name, tags);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
}
pStmt->mtb.tbname = tscReplaceStrToken(&pSql->sqlstr, &pStmt->mtb.tbname, name);
pStmt->mtb.nameSet = true;
pStmt->mtb.tagSet = true;
tscDebug("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
......@@ -1435,6 +1614,12 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
return code;
}
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
return taos_stmt_set_tbname_tags(stmt, name, NULL);
}
int taos_stmt_close(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
if (!pStmt->isInsert) {
......@@ -1453,6 +1638,7 @@ int taos_stmt_close(TAOS_STMT* stmt) {
pStmt->mtb.pTableBlockHashList = tscDestroyBlockHashTable(pStmt->mtb.pTableBlockHashList, true);
taosHashCleanup(pStmt->pSql->cmd.pTableBlockHashList);
pStmt->pSql->cmd.pTableBlockHashList = NULL;
taosArrayDestroy(pStmt->mtb.tags);
}
}
......
......@@ -32,6 +32,67 @@
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta);
int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len) {
int32_t n = 0;
switch (type) {
case TSDB_DATA_TYPE_NULL:
n = sprintf(str, "null");
break;
case TSDB_DATA_TYPE_BOOL:
n = sprintf(str, (*(int8_t*)buf) ? "true" : "false");
break;
case TSDB_DATA_TYPE_TINYINT:
n = sprintf(str, "%d", *(int8_t*)buf);
break;
case TSDB_DATA_TYPE_SMALLINT:
n = sprintf(str, "%d", *(int16_t*)buf);
break;
case TSDB_DATA_TYPE_INT:
n = sprintf(str, "%d", *(int32_t*)buf);
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
n = sprintf(str, "%" PRId64, *(int64_t*)buf);
break;
case TSDB_DATA_TYPE_FLOAT:
n = sprintf(str, "%f", GET_FLOAT_VAL(buf));
break;
case TSDB_DATA_TYPE_DOUBLE:
n = sprintf(str, "%f", GET_DOUBLE_VAL(buf));
break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
if (bufSize < 0) {
tscError("invalid buf size");
return TSDB_CODE_TSC_INVALID_VALUE;
}
*str = '"';
memcpy(str + 1, buf, bufSize);
*(str + bufSize + 1) = '"';
n = bufSize + 2;
break;
default:
tscError("unsupported type:%d", type);
return TSDB_CODE_TSC_INVALID_VALUE;
}
*len = n;
return TSDB_CODE_SUCCESS;
}
static void tscStrToLower(char *str, int32_t n) {
if (str == NULL || n <= 0) { return;}
for (int32_t i = 0; i < n; i++) {
......
......@@ -112,6 +112,7 @@ typedef struct TAOS_MULTI_BIND {
TAOS_STMT *taos_stmt_init(TAOS *taos);
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags);
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name);
int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册