提交 005ff0ed 编写于 作者: D dapan

stmt auto create table

上级 1ea3afe7
......@@ -89,7 +89,7 @@ extern char *qtypeStr[];
#define TSDB_PORT_HTTP 11
#define TD_DEBUG_PRINT_ROW
#undef TD_DEBUG_PRINT_ROW
#ifdef __cplusplus
}
......
......@@ -26,8 +26,7 @@ extern "C" {
typedef struct SStmtCallback {
TAOS_STMT* pStmt;
int32_t (*getTbNameFn)(TAOS_STMT*, char**);
int32_t (*setBindInfoFn)(TAOS_STMT*, STableMeta*, void*);
int32_t (*setExecInfoFn)(TAOS_STMT*, SHashObj*, SHashObj*);
int32_t (*setInfoFn)(TAOS_STMT*, STableMeta*, void*, char*, bool, SHashObj*, SHashObj*);
int32_t (*getExecInfoFn)(TAOS_STMT*, SHashObj**, SHashObj**);
} SStmtCallback;
......@@ -89,8 +88,9 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
int32_t qResetStmtDataBlock(void* block, bool keepBuf);
int32_t qCloneStmtDataBlock(void** pDst, void* pSrc);
void qFreeStmtDataBlock(void* pDataBlock);
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc);
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid);
void qDestroyStmtDataBlock(void* pBlock);
STableMeta *qGetTableMetaInDataBlock(void* pDataBlock);
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
......
......@@ -62,7 +62,8 @@ typedef struct SStmtBindInfo {
int8_t tbType;
bool tagsCached;
void* boundTags;
char* tbName;
char tbName[TSDB_TABLE_FNAME_LEN];;
char tbFName[TSDB_TABLE_FNAME_LEN];
SName sname;
} SStmtBindInfo;
......@@ -76,7 +77,7 @@ typedef struct SStmtExecInfo {
typedef struct SStmtSQLInfo {
STMT_TYPE type;
STMT_STATUS status;
bool autoCreate;
bool autoCreateTbl;
uint64_t runTimes;
SHashObj* pTableCache; //SHash<SStmtTableCache>
SQuery* pQuery;
......
......@@ -68,7 +68,7 @@ int32_t stmtGetTbName(TAOS_STMT *stmt, char **tbName) {
pStmt->sql.type = STMT_TYPE_MULTI_INSERT;
if (NULL == pStmt->bInfo.tbName) {
if ('\0' == pStmt->bInfo.tbName[0]) {
tscError("no table name set");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_TBNAME_ERROR);
}
......@@ -121,9 +121,12 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) {
int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName) {
STscStmt* pStmt = (STscStmt*)stmt;
strncpy(pStmt->bInfo.tbFName, tbFName, sizeof(pStmt->bInfo.tbFName) - 1);
pStmt->bInfo.tbFName[sizeof(pStmt->bInfo.tbFName) - 1] = 0;
pStmt->bInfo.tbUid = pTableMeta->uid;
pStmt->bInfo.tbSuid = pTableMeta->suid;
pStmt->bInfo.tbType = pTableMeta->tableType;
......@@ -133,7 +136,7 @@ int32_t stmtSetBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags) {
return TSDB_CODE_SUCCESS;
}
int32_t stmtSetExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->exec.pVgHash = pVgHash;
......@@ -142,6 +145,18 @@ int32_t stmtSetExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash
return TSDB_CODE_SUCCESS;
}
int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char* tbFName, bool autoCreateTbl, SHashObj* pVgHash, SHashObj* pBlockHash) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_ERR_RET(stmtUpdateBindInfo(stmt, pTableMeta, tags, tbFName));
STMT_ERR_RET(stmtUpdateExecInfo(stmt, pVgHash, pBlockHash));
pStmt->sql.autoCreateTbl = autoCreateTbl;
return TSDB_CODE_SUCCESS;
}
int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
STscStmt* pStmt = (STscStmt*)stmt;
......@@ -157,13 +172,13 @@ int32_t stmtCacheBlock(STscStmt *pStmt) {
}
uint64_t uid = pStmt->bInfo.tbUid;
uint64_t tuid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;
uint64_t cacheUid = (TSDB_CHILD_TABLE == pStmt->bInfo.tbType) ? pStmt->bInfo.tbSuid : uid;
if (taosHashGet(pStmt->sql.pTableCache, &tuid, sizeof(tuid))) {
if (taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid))) {
return TSDB_CODE_SUCCESS;
}
STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid));
STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
STableDataBlocks* pDst = NULL;
STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc));
......@@ -173,7 +188,7 @@ int32_t stmtCacheBlock(STscStmt *pStmt) {
.boundTags = pStmt->bInfo.boundTags,
};
if (taosHashPut(pStmt->sql.pTableCache, &tuid, sizeof(tuid), &cache, sizeof(cache))) {
if (taosHashPut(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid), &cache, sizeof(cache))) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -186,8 +201,7 @@ int32_t stmtParseSql(STscStmt* pStmt) {
SStmtCallback stmtCb = {
.pStmt = pStmt,
.getTbNameFn = stmtGetTbName,
.setBindInfoFn = stmtSetBindInfo,
.setExecInfoFn = stmtSetExecInfo,
.setInfoFn = stmtUpdateInfo,
.getExecInfoFn = stmtGetExecInfo,
};
......@@ -222,7 +236,8 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) {
pStmt->bInfo.tbType = 0;
pStmt->bInfo.needParse = true;
taosMemoryFreeClear(pStmt->bInfo.tbName);
pStmt->bInfo.tbName[0] = 0;
pStmt->bInfo.tbFName[0] = 0;
if (!pStmt->bInfo.tagsCached) {
destroyBoundColumnInfo(pStmt->bInfo.boundTags);
taosMemoryFreeClear(pStmt->bInfo.boundTags);
......@@ -237,12 +252,14 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
pStmt->exec.pRequest = NULL;
}
size_t keyLen = 0;
void *pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
while (pIter) {
STableDataBlocks* pBlocks = *(STableDataBlocks**)pIter;
uint64_t *key = taosHashGetKey(pIter, NULL);
char *key = taosHashGetKey(pIter, &keyLen);
STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks);
if (keepTable && (*key == pStmt->bInfo.tbUid)) {
if (keepTable && (pMeta->uid == pStmt->bInfo.tbUid)) {
STMT_ERR_RET(qResetStmtDataBlock(pBlocks, true));
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
......@@ -250,7 +267,7 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool freeRequest) {
}
qFreeStmtDataBlock(pBlocks);
taosHashRemove(pStmt->exec.pBlockHash, key, sizeof(*key));
taosHashRemove(pStmt->exec.pBlockHash, key, keyLen);
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
}
......@@ -322,6 +339,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
uint64_t suid = pTableMeta->suid;
int8_t tableType = pTableMeta->tableType;
taosMemoryFree(pTableMeta);
uint64_t cacheUid = (TSDB_CHILD_TABLE == tableType) ? suid : uid;
if (uid == pStmt->bInfo.tbUid) {
pStmt->bInfo.needParse = false;
......@@ -329,10 +347,10 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
if (taosHashGet(pStmt->exec.pBlockHash, &uid, sizeof(uid))) {
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid));
if (taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName))) {
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
if (NULL == pCache) {
tscError("table uid %" PRIx64 "found in exec blockHash, but not in sql blockHash", uid);
tscError("table [%s, %" PRIx64 ", %" PRIx64 "] found in exec blockHash, but not in sql blockHash", pStmt->bInfo.tbFName, uid, cacheUid);
STMT_ERR_RET(TSDB_CODE_TSC_APP_ERROR);
}
......@@ -348,7 +366,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &uid, sizeof(uid));
SStmtTableCache* pCache = taosHashGet(pStmt->sql.pTableCache, &cacheUid, sizeof(cacheUid));
if (pCache) {
pStmt->bInfo.needParse = false;
......@@ -359,9 +377,9 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
pStmt->bInfo.tagsCached = true;
STableDataBlocks* pNewBlock = NULL;
STMT_ERR_RET(qRebuildStmtDataBlock(&pNewBlock, pCache->pDataBlock));
STMT_ERR_RET(qRebuildStmtDataBlock(&pNewBlock, pCache->pDataBlock, uid));
if (taosHashPut(pStmt->exec.pBlockHash, &pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid), &pNewBlock, POINTER_BYTES)) {
if (taosHashPut(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName), &pNewBlock, POINTER_BYTES)) {
STMT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
......@@ -449,12 +467,13 @@ int stmtSetTbName(TAOS_STMT *stmt, const char *tbName) {
}
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
tNameExtractFullName(&pStmt->bInfo.sname, pStmt->bInfo.tbFName);
STMT_ERR_RET(stmtGetFromCache(pStmt));
if (pStmt->bInfo.needParse) {
taosMemoryFree(pStmt->bInfo.tbName);
pStmt->bInfo.tbName = strdup(tbName);
strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1);
pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
}
return TSDB_CODE_SUCCESS;
......@@ -465,15 +484,15 @@ int stmtSetTbTags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
if (!pStmt->bInfo.needParse) {
if (!pStmt->bInfo.needParse) { // table already cached, no need create table
return TSDB_CODE_SUCCESS;
}
STMT_ERR_RET(stmtParseSql(pStmt));
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
......@@ -489,9 +508,9 @@ int32_t stmtFetchTagFields(STscStmt* pStmt, int32_t *fieldNum, TAOS_FIELD** fiel
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
......@@ -506,9 +525,9 @@ int32_t stmtFetchColFields(STscStmt* pStmt, int32_t *fieldNum, TAOS_FIELD** fiel
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
......@@ -552,9 +571,9 @@ int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx) {
STMT_RET(qStmtBindParam(pStmt->sql.pQueryPlan, bind, colIdx, pStmt->exec.pRequest->requestId));
}
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, (const char*)&pStmt->bInfo.tbUid, sizeof(pStmt->bInfo.tbUid));
STableDataBlocks **pDataBlock = (STableDataBlocks**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pDataBlock) {
tscError("table uid %" PRIx64 "not found in exec blockHash", pStmt->bInfo.tbUid);
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
......
......@@ -20,6 +20,7 @@
#include "mndShow.h"
#include "mndStb.h"
#include "mndUser.h"
#include "mndDnode.h"
#include "tglobal.h"
#include "version.h"
......
......@@ -137,7 +137,7 @@ void destroyBlockArrayList(SArray* pDataBlockList);
void destroyBlockHashmap(SHashObj* pDataBlockHash);
int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo);
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
int32_t getDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset, int32_t rowSize,
STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList, SVCreateTbReq* pCreateTbReq);
int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks);
int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq);
......
......@@ -237,13 +237,8 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
return code;
}
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool isStb) {
static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SName* name, char *dbFname, bool isStb) {
SParseContext* pBasicCtx = pCxt->pComCxt;
SName name = {0};
createSName(&name, pTname, pBasicCtx->acctId, pBasicCtx->db, &pCxt->msg);
char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname);
bool pass = false;
CHECK_CODE(catalogChkAuth(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pBasicCtx->pUser, dbFname, AUTH_TYPE_WRITE, &pass));
......@@ -251,22 +246,22 @@ static int32_t getTableMetaImpl(SInsertParseContext* pCxt, SToken* pTname, bool
return TSDB_CODE_PAR_PERMISSION_DENIED;
}
if (isStb) {
CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name,
CHECK_CODE(catalogGetSTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name,
&pCxt->pTableMeta));
} else {
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name,
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name,
&pCxt->pTableMeta));
SVgroupInfo vg;
CHECK_CODE(
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, name, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
}
return TSDB_CODE_SUCCESS;
}
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { return getTableMetaImpl(pCxt, pTname, false); }
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* name, char *dbFname) { return getTableMetaImpl(pCxt, name, dbFname, false); }
static int32_t getSTableMeta(SInsertParseContext* pCxt, SToken* pTname) { return getTableMetaImpl(pCxt, pTname, true); }
static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char *dbFname) { return getTableMetaImpl(pCxt, name, dbFname, true); }
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
while (start < end) {
......@@ -842,7 +837,7 @@ static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName*
catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTableName, &vg));
CHECK_CODE(taosHashPut(pCxt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)));
pMeta->uid = tGenIdPI64();
pMeta->uid = 0;
pMeta->vgId = vg.vgId;
STableMeta* pBackup = NULL;
......@@ -853,11 +848,7 @@ static int32_t storeTableMeta(SInsertParseContext* pCxt, SHashObj* pHash, SName*
}
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) {
SName name;
createSName(&name, pTbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&name, tbFName);
static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tbFName) {
int32_t len = strlen(tbFName);
STableMeta** pMeta = taosHashGet(pCxt->pSubTableHashObj, tbFName, len);
if (NULL != pMeta) {
......@@ -867,11 +858,17 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
SToken sToken;
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
NEXT_TOKEN(pCxt->pSql, sToken);
CHECK_CODE(getSTableMeta(pCxt, &sToken));
SName sname;
createSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
char stbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&sname, stbFName);
CHECK_CODE(getSTableMeta(pCxt, &sname, stbFName));
if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
}
CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, &name, tbFName, len, pCxt->pTableMeta));
CHECK_CODE(storeTableMeta(pCxt, pCxt->pSubTableHashObj, name, tbFName, len, pCxt->pTableMeta));
SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
setBoundColumnInfo(&pCxt->tags, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
......@@ -891,7 +888,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
if (TK_NK_LP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
}
CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, name.tname));
CHECK_CODE(parseTagsClause(pCxt, pCxt->pTableMeta->schema, getTableInfo(pCxt->pTableMeta).precision, name->tname));
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_NK_RP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
......@@ -1066,6 +1063,8 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
// [...];
static int32_t parseInsertBody(SInsertParseContext* pCxt) {
int32_t tbNum = 0;
char tbFName[TSDB_TABLE_FNAME_LEN];
bool autoCreateTbl = false;
// for each table
while (1) {
......@@ -1095,6 +1094,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
sToken.z = tbName;
sToken.n = strlen(tbName);
autoCreateTbl = true;
} else {
return buildSyntaxErrMsg(&pCxt->msg, "? only used in stmt", sToken.z);
}
......@@ -1103,16 +1104,20 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
SToken tbnameToken = sToken;
NEXT_TOKEN(pCxt->pSql, sToken);
SName name;
createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
tNameExtractFullName(&name, tbFName);
// USING cluase
if (TK_USING == sToken.type) {
CHECK_CODE(parseUsingClause(pCxt, &tbnameToken));
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
NEXT_TOKEN(pCxt->pSql, sToken);
} else {
CHECK_CODE(getTableMeta(pCxt, &tbnameToken));
CHECK_CODE(getTableMeta(pCxt, &name, tbFName));
}
STableDataBlocks* dataBuf = NULL;
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
&dataBuf, NULL, &pCxt->createTblReq));
......@@ -1154,10 +1159,9 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
(*pCxt->pStmtCb->setBindInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags);
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
(*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
(*pCxt->pStmtCb->setExecInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pVgroupsHashObj, pCxt->pTableBlockHashObj);
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
pCxt->pVgroupsHashObj = NULL;
pCxt->pTableBlockHashObj = NULL;
pCxt->pTableMeta = NULL;
......@@ -1194,7 +1198,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
&context.pTableBlockHashObj);
} else {
context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false);
}
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
......@@ -1673,7 +1677,7 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SArray *cols
buildCreateTbReq(&smlHandle->createTblReq, tableName, row, pTableMeta->suid);
STableDataBlocks* pDataBlock = NULL;
ret = getDataBlockFromList(smlHandle->pBlockHash, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
ret = getDataBlockFromList(smlHandle->pBlockHash, &pTableMeta->uid, sizeof(pTableMeta->uid), TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize, pTableMeta,
&pDataBlock, NULL, &smlHandle->createTblReq);
if(ret != TSDB_CODE_SUCCESS){
......
......@@ -185,11 +185,11 @@ int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq)
return TSDB_CODE_SUCCESS;
}
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
int32_t getDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset, int32_t rowSize,
STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList,
SVCreateTbReq* pCreateTbReq) {
*dataBlocks = NULL;
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)id, idLen);
if (t1 != NULL) {
*dataBlocks = *t1;
}
......@@ -207,7 +207,7 @@ int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int3
}
}
taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
taosHashPut(pHashList, (const char*)id, idLen, (char*)dataBlocks, POINTER_BYTES);
if (pBlockList) {
taosArrayPush(pBlockList, dataBlocks);
}
......@@ -457,7 +457,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
STableDataBlocks* dataBuf = NULL;
pOneTableBlock->pTableMeta->vgId = pOneTableBlock->vgId; // for schemaless, restore origin vgId
int32_t ret =
getDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0,
getDataBlockFromList(pVnodeDataBlockHashList, &pOneTableBlock->vgId, sizeof(pOneTableBlock->vgId), TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0,
pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL);
if (ret != TSDB_CODE_SUCCESS) {
taosHashCleanup(pVnodeDataBlockHashList);
......@@ -620,7 +620,7 @@ int32_t qCloneStmtDataBlock(void** pDst, void* pSrc) {
return qResetStmtDataBlock(*pDst, false);
}
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc) {
int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc, uint64_t uid) {
int32_t code = qCloneStmtDataBlock(pDst, pSrc);
if (code) {
return code;
......@@ -633,11 +633,19 @@ int32_t qRebuildStmtDataBlock(void** pDst, void* pSrc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pBlock->pTableMeta) {
pBlock->pTableMeta->uid = uid;
}
memset(pBlock->pData, 0, sizeof(SSubmitBlk));
return TSDB_CODE_SUCCESS;
}
STableMeta *qGetTableMetaInDataBlock(void* pDataBlock) {
return ((STableDataBlocks*)pDataBlock)->pTableMeta;
}
void qFreeStmtDataBlock(void* pDataBlock) {
if (pDataBlock == NULL) {
return;
......
......@@ -48,10 +48,18 @@ int64_t tGenIdPI64(void) {
}
}
int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&tUUIDSerialNo, 1);
int64_t id;
while (true) {
int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&tUUIDSerialNo, 1);
id = ((tUUIDHashId & 0x07FF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
if (id) {
break;
}
}
int64_t id = ((tUUIDHashId & 0x07FF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
return id;
}
......@@ -2988,7 +2988,7 @@ void prepareCheckResultImpl(TAOS * taos, char *tname, bool printr, int expec
}
} else {
printf("!!!expect rows %d mis-match rows %d fetched from %s\n", expected, rows, tname);
exit(1);
//exit(1);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册