diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 611f52676ef2354ad89984a55855fae264fd42ca..2dd580dda00651d8d7d4d84505147a8f464d954c 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -62,7 +62,7 @@ typedef struct STableMeta { int8_t numOfVpeers; int16_t sversion; SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT]; - int32_t vgid; // virtual group id, which current table belongs to + int32_t vgId; // virtual group id, which current table belongs to int32_t sid; // the index of one table in a virtual node uint64_t uid; // unique id of a table SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info @@ -182,7 +182,7 @@ typedef struct STableDataBlocks { char tableId[TSDB_TABLE_ID_LEN]; int8_t tsSource; // where does the UNIX timestamp come from, server or client bool ordered; // if current rows are ordered or not - int64_t vgid; // virtual group id + int64_t vgId; // virtual group id int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending int32_t numOfTables; // number of tables in current submit block diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index b96beb35fd654301203f0595465f6dbb685aab9c..8fb6b925efd01fc1e22be509895762868fe28bc6 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -698,7 +698,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(dataBuf->pData); tsSetBlockInfo(pBlocks, pTableMeta, numOfRows); - dataBuf->vgid = pTableMeta->vgid; + dataBuf->vgId = pTableMeta->vgId; dataBuf->numOfTables = 1; /* @@ -1058,7 +1058,6 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { goto _error_clean; } - void *fp = pSql->fp; ptrdiff_t pos = pSql->asyncTblPos - pSql->sqlstr; if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) { @@ -1068,17 +1067,15 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { * And during the getMeterMetaCallback function, the sql string will be parsed from the * interrupted position. */ - if (fp != NULL) { - if (TSDB_CODE_ACTION_IN_PROGRESS == code) { - tscTrace("async insert and waiting to get meter meta, then continue parse sql from offset: %" PRId64, pos); - return code; - } - - // todo add to return - tscError("async insert parse error, code:%d, %s", code, tstrerror(code)); - pSql->asyncTblPos = NULL; + if (TSDB_CODE_ACTION_IN_PROGRESS == code) { + tscTrace("async insert and waiting to get meter meta, then continue parse sql from offset: %" PRId64, pos); + return code; } + // todo add to return + tscError("async insert parse error, code:%d, %s", code, tstrerror(code)); + pSql->asyncTblPos = NULL; + goto _error_clean; // TODO: should _clean or _error_clean to async flow ???? } @@ -1096,15 +1093,13 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { goto _error_clean; } - int32_t numOfCols = tscGetNumOfTags(pTableMetaInfo->pTableMeta); STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta); if (sToken.type == TK_VALUES) { - SParsedDataColInfo spd = {.numOfCols = numOfCols}; + SParsedDataColInfo spd = {.numOfCols = tinfo.numOfColumns}; SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); - - tscSetAssignedColumnInfo(&spd, pSchema, numOfCols); + tscSetAssignedColumnInfo(&spd, pSchema, tinfo.numOfColumns); if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { goto _error_clean; @@ -1243,7 +1238,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { // submit to more than one vnode if (pCmd->pDataBlocks->nSize > 0) { - // merge according to vgid + // merge according to vgId if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { goto _error_clean; } diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index f4fb761ed9efa1057042e039f617252fda6f9a82..be0065ff4a0773c009513beb5bddefe80b67c3e1 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -165,7 +165,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size pTableMeta->sid = pTableMetaMsg->sid; pTableMeta->uid = pTableMetaMsg->uid; - pTableMeta->vgid = pTableMetaMsg->vgid; + pTableMeta->vgId = pTableMetaMsg->vgId; pTableMeta->numOfVpeers = pTableMetaMsg->numOfVpeers; memcpy(pTableMeta->vpeerDesc, pTableMetaMsg->vpeerDesc, sizeof(SVnodeDesc) * pTableMeta->numOfVpeers); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8058d7f054b126fdc479c720ca0dc89d4766dff8..8fb3df94d5844734ec07ee83e50d193fc0e7c7fa 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -341,11 +341,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { * the tscShouldFreeAsyncSqlObj will success and tscFreeSqlObj free it immediately. */ bool shouldFree = tscShouldFreeAsyncSqlObj(pSql); - if (command == TSDB_SQL_INSERT) { // handle multi-vnode insertion situation - (*pSql->fp)(pSql, taosres, rpcMsg->code); - } else { - (*pSql->fp)(pSql->param, taosres, rpcMsg->code); - } + (*pSql->fp)(pSql->param, taosres, rpcMsg->code); if (shouldFree) { // If it is failed, all objects allocated during execution taos_connect_a should be released @@ -539,22 +535,27 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { char * pMsg, *pStart; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; pStart = pSql->cmd.payload + tsRpcHeadSize; pMsg = pStart; pShellMsg = (SShellSubmitMsg *)pMsg; - + + pShellMsg->desc.numOfVnodes = htonl(1); + pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1); - pShellMsg->vnode = 0; //htons(pTableMeta->vpeerDesc[pTableMeta->index].vnode); - pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted + pShellMsg->header.vgId = htonl(pTableMeta->vgId); + pShellMsg->header.contLen = htonl(pSql->cmd.payloadLen); + + pShellMsg->numOfTables = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; // tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pTableMeta->index].ip), // htons(pShellMsg->vnode)); - pSql->cmd.payloadLen = sizeof(SShellSubmitMsg); +// pSql->cmd.payloadLen = sizeof(SShellSubmitMsg); return TSDB_CODE_SUCCESS; } @@ -676,7 +677,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->uid = pTableMeta->uid; pQueryMsg->numOfTagsCols = 0; - pQueryMsg->vgId = htonl(pTableMeta->vgid); + pQueryMsg->vgId = htonl(pTableMeta->vgId); tscTrace("%p queried tables:%d, table id: %s", pSql, 1, pTableMetaInfo->name); } else { // query on super table if (pTableMetaInfo->vnodeIndex < 0) { @@ -1849,12 +1850,12 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { pMetaMsg->sid = htonl(pMetaMsg->sid); pMetaMsg->sversion = htons(pMetaMsg->sversion); - pMetaMsg->vgid = htonl(pMetaMsg->vgid); + pMetaMsg->vgId = htonl(pMetaMsg->vgId); pMetaMsg->uid = htobe64(pMetaMsg->uid); pMetaMsg->contLen = htons(pMetaMsg->contLen); - if (pMetaMsg->sid < 0 || pMetaMsg->vgid < 0) { - tscError("invalid meter vgid:%d, sid%d", pMetaMsg->vgid, pMetaMsg->sid); + if (pMetaMsg->sid < 0 || pMetaMsg->vgId < 0) { + tscError("invalid meter vgId:%d, sid%d", pMetaMsg->vgId, pMetaMsg->sid); return TSDB_CODE_INVALID_VALUE; } @@ -1948,11 +1949,11 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { pMeta->sid = htonl(pMeta->sid); pMeta->sversion = htons(pMeta->sversion); - pMeta->vgid = htonl(pMeta->vgid); + pMeta->vgId = htonl(pMeta->vgId); pMeta->uid = htobe64(pMeta->uid); - if (pMeta->sid <= 0 || pMeta->vgid < 0) { - tscError("invalid meter vgid:%d, sid%d", pMeta->vgid, pMeta->sid); + if (pMeta->sid <= 0 || pMeta->vgId < 0) { + tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid); pSql->res.code = TSDB_CODE_INVALID_VALUE; pSql->res.numOfTotal = i; return TSDB_CODE_OTHERS; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index c3cc6ec9a825bce9e34c3bfcc1499cae4ac97954..5d93e44c7712910c8a34805def9dfe9c2ed0aa82 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -130,7 +130,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con pSql->pTscObj = pObj; pSql->signature = pSql; tsem_init(&pSql->rspSem, 0, 0); -// tsem_init(&pSql->emptyRspSem, 0, 1); + pObj->pSql = pSql; pSql->fp = fp; pSql->param = param; @@ -146,6 +146,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con return NULL; } + // tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid + tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg); return pObj; } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index faf5dd94b8aeef33b86af5856ad0198e2050f571..c594e047152313cb0a1554ee6e2fe5b43e4541b1 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -34,7 +34,6 @@ void * pTscMgmtConn; void * pSlaveConn; void * tscCacheHandle; int32_t globalCode = 0; -int initialized = 0; int slaveIndex; void * tscTmr; void * tscQhandle; @@ -187,9 +186,7 @@ void taos_init_imp() { if (tscCacheHandle == NULL) tscCacheHandle = taosCacheInit(tscTmr, refreshTime); - initialized = 1; tscTrace("client is initialized successfully"); - tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg); } void taos_init() { pthread_once(&tscinit, taos_init_imp); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index ffbddf31da864fe40e8bd13bec2b55c938f531a7..aa4e5c93aa7a067c00cc4221e574a34e66c56c1f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -614,7 +614,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { */ pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize; - assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + 100); + assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + 100 && pCmd->payloadLen > 0); return TSDB_CODE_SUCCESS; } @@ -705,8 +705,9 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi STableDataBlocks* pOneTableBlock = pTableDataBlockList->pData[i]; STableDataBlocks* dataBuf = NULL; - int32_t ret = - tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgid, TSDB_PAYLOAD_SIZE, + + int32_t ret = + tscGetDataBlockFromList(pVnodeDataBlockHashList, pVnodeDataBlockList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf); if (ret != TSDB_CODE_SUCCESS) { tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret); diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 1deefd4f532df6eb6a4f14c5736b41f296c9e967..7ed731c953f0046f9918fc337746eb75e9eda1ce 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -275,7 +275,9 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { pRsp->numOfRows = htonl(1); pRsp->affectedRows = htonl(1); pRsp->numOfFailedBlocks = 0; - + + // todo write to tsdb + SRpcMsg rpcRsp = { .handle = pMsg->rpcMsg.handle, .pCont = pRsp, diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 24f3a84227e1fd1a6397e3462180231442a5c5b1..467c2a2995ac15407364ed0413aa3979e4a43676 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -198,10 +198,20 @@ typedef struct { } SShellSubmitBlock; typedef struct { + int32_t numOfVnodes; +} SMsgDesc; + +typedef struct SMsgHead { + int32_t contLen; + int32_t vgId; +} SMsgHead; + +typedef struct { + SMsgDesc desc; + SMsgHead header; int16_t import; - int16_t vnode; - int32_t numOfSid; /* total number of sid */ - char blks[]; /* numOfSid blocks, each blocks for one table */ + int32_t numOfTables; // total number of sid + char blks[]; // number of data blocks, each table has at least one data block } SShellSubmitMsg; typedef struct { @@ -232,15 +242,6 @@ typedef struct { uint32_t ip; } SVnodeDesc; -typedef struct { - int32_t numOfVnodes; -} SMsgDesc; - -typedef struct { - int32_t contLen; - int32_t vgId; -} SMsgHead; - typedef struct { int32_t contLen; int32_t vgId; @@ -688,7 +689,7 @@ typedef struct STableMetaMsg { int8_t numOfVpeers; SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT]; int32_t sid; - int32_t vgid; + int32_t vgId; uint64_t uid; SSchema schema[]; } STableMetaMsg; diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index c9fa6e36d8451e057e825a2e9fd458fa6c939d5b..d5f0e7c85378d2b775a323a3d1a1bad783088a3f 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -445,7 +445,7 @@ int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) { pMeta->uid = htobe64(pTable->uid); pMeta->sid = htonl(pTable->sid); - pMeta->vgid = htonl(pTable->vgId); + pMeta->vgId = htonl(pTable->vgId); pMeta->sversion = htons(pTable->superTable->sversion); pMeta->precision = pDb->cfg.precision; pMeta->numOfTags = pTable->superTable->numOfTags; diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index f0cfa06a2226b87bd70645ecffc887330a681a73..f076b69f36323f08c0e69195d6d1ab08b4ac4150 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -524,7 +524,7 @@ static int32_t mgmtSetSchemaFromNormalTable(SSchema *pSchema, SNormalTableObj *p int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) { pMeta->uid = htobe64(pTable->uid); pMeta->sid = htonl(pTable->sid); - pMeta->vgid = htonl(pTable->vgId); + pMeta->vgId = htonl(pTable->vgId); pMeta->sversion = htons(pTable->sversion); pMeta->precision = pDb->cfg.precision; pMeta->numOfTags = 0; diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index df787ae697aafaa3ca3d8c19f7dc903b84936da7..bd697ade93288917e38bb1388049d98717bdd9c3 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -654,7 +654,7 @@ int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMetaMsg *pMeta, bool usePublicIp) { pMeta->uid = htobe64(pTable->uid); pMeta->sid = htonl(pTable->sid); - pMeta->vgid = htonl(pTable->vgId); + pMeta->vgId = htonl(pTable->vgId); pMeta->sversion = htons(pTable->sversion); pMeta->precision = pDb->cfg.precision; pMeta->numOfTags = pTable->numOfTags;