diff --git a/documentation/webdocs/markdowndocs/TAOS SQL-ch.md b/documentation/webdocs/markdowndocs/TAOS SQL-ch.md index 0be39ab8e46c2836bd804563f7401d8614c04c1e..343ce80422ea3d973907adbe438bfdd3021b2ddb 100644 --- a/documentation/webdocs/markdowndocs/TAOS SQL-ch.md +++ b/documentation/webdocs/markdowndocs/TAOS SQL-ch.md @@ -480,9 +480,9 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数 - **LEASTSQUARES** ```mysql - SELECT LEASTSQUARES(field_name) FROM tb_name [WHERE clause] + SELECT LEASTSQUARES(field_name, start_val, step_val) FROM tb_name [WHERE clause] ``` - 功能说明:统计表中某列的值是主键(时间戳)的拟合直线方程。 + 功能说明:统计表中某列的值是主键(时间戳)的拟合直线方程。start_val是自变量初始值,step_val是自变量的步长值。 返回结果数据类型:字符串表达式(斜率, 截距)。 应用字段:不能应用在timestamp、binary、nchar、bool类型字段。 说明:自变量是时间戳,因变量是该列的值。 diff --git a/documentation/webdocs/markdowndocs/TAOS SQL.md b/documentation/webdocs/markdowndocs/TAOS SQL.md index 72e41dbec49c29b51515e5f1dd56030cf200edab..c0d35e9afcbf5853273f0db4c2181a0075f1e844 100644 --- a/documentation/webdocs/markdowndocs/TAOS SQL.md +++ b/documentation/webdocs/markdowndocs/TAOS SQL.md @@ -412,7 +412,7 @@ TDengine supports aggregations over numerical values, they are listed below: SELECT PERCENTILE(field_name, P) FROM { tb_name | stb_name } [WHERE clause] ``` Function: the value of the specified column below which `P` percent of the data points fall. - Return Data Type: the same data type. + Return Data Type: double. Applicable Data Types: all types except `timestamp`, `binary`, `nchar`, `bool`. Applied to: table/STable. Note: The range of `P` is `[0, 100]`. When `P=0` , `PERCENTILE` returns the equal value as `MIN`; when `P=100`, `PERCENTILE` returns the equal value as `MAX`. @@ -446,7 +446,7 @@ TDengine supports aggregations over numerical values, they are listed below: SELECT SPREAD(field_name) FROM { tb_name | stb_name } [WHERE clause] ``` Function: return the difference between the maximum and the mimimum value. - Return Data Type: the same data type. + Return Data Type: double. Applicable Data Types: all types except `timestamp`, `binary`, `nchar`, `bool`. Applied to: table/STable. Note: spread gives the range of data variation in a table/supertable; it is equivalent to `MAX()` - `MIN()` diff --git a/src/client/inc/tscLog.h b/src/client/inc/tscLog.h index c3959517422103c2bf7013ffe4f80de56f2e8414..94adcfe17ade52ef912b023924af49accfc07b6a 100644 --- a/src/client/inc/tscLog.h +++ b/src/client/inc/tscLog.h @@ -31,9 +31,7 @@ extern int32_t tscEmbedded; #define tscInfo(...) { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC INFO ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} #define tscDebug(...) { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC DEBUG ", cDebugFlag, __VA_ARGS__); }} #define tscTrace(...) { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC TRACE ", cDebugFlag, __VA_ARGS__); }} - -#define tscDebugDump(...) { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC DEBUG ", cDebugFlag, __VA_ARGS__); }} -#define tscTraceDump(...) { if (cDebugFlag & DEBUG_TRACE) { taosPrintLongString("TSC TRACE ", cDebugFlag, __VA_ARGS__); }} +#define tscDebugL(...){ if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC DEBUG ", cDebugFlag, __VA_ARGS__); }} #ifdef __cplusplus } diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c index d070fad11b47cafd5344e761b3da3afefb052bce..eb9b1cb479ca2d81889f94311c3423f97e1626a3 100644 --- a/src/client/src/TSDBJNIConnector.c +++ b/src/client/src/TSDBJNIConnector.c @@ -29,9 +29,6 @@ #define jniDebug(...) { if (jniDebugFlag & DEBUG_DEBUG) { taosPrintLog("JNI DEBUG ", jniDebugFlag, __VA_ARGS__); }} #define jniTrace(...) { if (jniDebugFlag & DEBUG_TRACE) { taosPrintLog("JNI TRACE ", jniDebugFlag, __VA_ARGS__); }} -#define jniDebugDump(...) { if (jniDebugFlag & DEBUG_DEBUG) { taosPrintLongString("JNI DEBUG ", jniDebugFlag, __VA_ARGS__); }} -#define jniTraceDump(...) { if (jniDebugFlag & DEBUG_TRACE) { taosPrintLongString("JNI DEBUG ", jniDebugFlag, __VA_ARGS__); }} - int __init = 0; JavaVM *g_vm = NULL; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 3fed3e4d6719d508ec629a215b3d0033f2a6eb27..da24e3691a43b887afe71fbb02eb526fde70f6b0 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -55,7 +55,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const strtolower(pSql->sqlstr, sqlstr); - tscDebugDump("%p SQL: %s", pSql, pSql->sqlstr); + tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); pSql->cmd.curSql = pSql->sqlstr; int32_t code = tsParseSql(pSql, true); @@ -471,7 +471,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { } // in case of insert, redo parsing the sql string and build new submit data block for two reasons: - // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated + // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly. // 2. vnode may need the schema information along with submit block to update its local table schema. if (pCmd->command == TSDB_SQL_INSERT) { tscDebug("%p redo parse sql string to build submit block", pSql); diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 22b6be1c57657177279d423cbbac2844031e7dad..9f0d1a26abd93f8dfb52db0e03feb20529c07904 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -538,7 +538,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { pRes->numOfRows = 1; strtolower(pSql->sqlstr, sql); - tscDebugDump("%p SQL: %s", pSql, pSql->sqlstr); + tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); if (tscIsInsertData(pSql->sqlstr)) { pStmt->isInsert = true; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 22e91bd658d5769e80163dea3388e567294ae6c5..ea88e4a935159d59a439d0f0218ef247803ea1ed 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4490,7 +4490,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { pUpdateMsg->tid = htonl(pTableMeta->sid); pUpdateMsg->uid = htobe64(pTableMeta->uid); pUpdateMsg->colId = htons(pTagsSchema->colId); - pUpdateMsg->type = htons(pTagsSchema->type); + pUpdateMsg->type = pTagsSchema->type; pUpdateMsg->bytes = htons(pTagsSchema->bytes); pUpdateMsg->tversion = htons(pTableMeta->tversion); pUpdateMsg->numOfTags = htons(numOfTags); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d73983e77c704e66c1c0cf491c6b8205d749a570..584e70ba6c5cee0527af66003e1bf21797bbf93a 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -247,7 +247,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { } else { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); if (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID || - rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { if (pCmd->command == TSDB_SQL_CONNECT) { rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcFreeCont(rpcMsg->pCont); @@ -260,7 +260,12 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { // get table meta query will not retry, do nothing } else { tscWarn("%p it shall renew table meta, code:%s, retry:%d", pSql, tstrerror(rpcMsg->code), ++pSql->retry); - + + // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema + if (rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { + pSql->cmd.submitSchema = 1; + } + pSql->res.code = rpcMsg->code; // keep the previous error code if (pSql->retry > pSql->maxRetry) { tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index a6d111dbc7e7f050863f148d8d27a6b2859777aa..cc703e0f9338f50f765d6c1679ca2f541930a1d8 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -503,7 +503,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p } strtolower(pSql->sqlstr, sqlstr); - tscDebugDump("%p SQL: %s", pSql, pSql->sqlstr); + tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); tsem_init(&pSql->rspSem, 0, 0); int32_t code = tsParseSql(pSql, true); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 9b6eff71232f609c18d02ab34c0c3d2ccf76312d..791073f7a8b5f00af93b5c35fc1f778907471537 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -579,9 +579,9 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta); for(int32_t j = 0; j < numOfCols; ++j) { STColumn* pCol = (STColumn*) pDataBlock; - pCol->colId = pSchema[j].colId; + pCol->colId = htons(pSchema[j].colId); pCol->type = pSchema[j].type; - pCol->bytes = pSchema[j].bytes; + pCol->bytes = htons(pSchema[j].bytes); pCol->offset = 0; pDataBlock += sizeof(STColumn); @@ -663,7 +663,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { } SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; - int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize; + int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta); if (dataBuf->nAllocSize < destSize) { while (dataBuf->nAllocSize < destSize) { @@ -691,7 +691,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { tscDebug("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId, pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey)); - int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize); + int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta); pBlocks->tid = htonl(pBlocks->tid); pBlocks->uid = htobe64(pBlocks->uid); diff --git a/src/common/inc/tulog.h b/src/common/inc/tulog.h index 63c838be695fc25722c9c7416872f809bb706d21..6365b21ef96483f2c774b1af618ebf356e2d1677 100644 --- a/src/common/inc/tulog.h +++ b/src/common/inc/tulog.h @@ -32,9 +32,6 @@ extern int32_t tscEmbedded; #define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL DEBUG ", uDebugFlag, __VA_ARGS__); }} #define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL TRACE ", uDebugFlag, __VA_ARGS__); }} -#define uDebugDump(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLongString("UTL DEBUG ", uDebugFlag, __VA_ARGS__); }} -#define uTraceDump(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLongString("UTL TRACE ", uDebugFlag, __VA_ARGS__); }} - #define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); } #define pPrint(...) { taosPrintLog("APP INFO ", 255, __VA_ARGS__); } diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 67c104878a9c924181e1d2cbb0883bf7dd366ebe..684fb71af925202dde1954e1ec7fdb6a8b3fdf3c 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -1210,7 +1210,7 @@ void taosInitGlobalCfg() { } bool taosCheckGlobalCfg() { - if (debugFlag & DEBUG_TRACE || debugFlag & DEBUG_DEBUG) { + if (debugFlag & DEBUG_TRACE || debugFlag & DEBUG_DEBUG || debugFlag & DEBUG_DUMP) { taosSetAllDebugFlag(); } diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index b20e6c9749bedbef99777b1ff2d8e6c3372b3706..546e8cecb9dc166687d10e078be764d4f93baa21 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -210,6 +210,7 @@ static void *dnodeProcessWriteQueue(void *param) { int32_t numOfMsgs; int type; void *pVnode, *item; + SRspRet *pRspRet; dDebug("write worker:%d is running", pWorker->workerId); @@ -222,9 +223,11 @@ static void *dnodeProcessWriteQueue(void *param) { for (int32_t i = 0; i < numOfMsgs; ++i) { pWrite = NULL; + pRspRet = NULL; taosGetQitem(pWorker->qall, &type, &item); if (type == TAOS_QTYPE_RPC) { pWrite = (SWriteMsg *)item; + pRspRet = &pWrite->rspRet; pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead)); pHead->msgType = pWrite->rpcMsg.msgType; pHead->version = 0; @@ -234,7 +237,7 @@ static void *dnodeProcessWriteQueue(void *param) { pHead = (SWalHead *)item; } - int32_t code = vnodeProcessWrite(pVnode, type, pHead, item); + int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet); if (pWrite) pWrite->rpcMsg.code = code; } @@ -247,6 +250,11 @@ static void *dnodeProcessWriteQueue(void *param) { if (type == TAOS_QTYPE_RPC) { pWrite = (SWriteMsg *)item; dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code); + } else if (type == TAOS_QTYPE_FWD) { + pHead = (SWalHead *)item; + vnodeConfirmForward(pVnode, pHead->version, 0); + taosFreeQitem(item); + vnodeRelease(pVnode); } else { taosFreeQitem(item); vnodeRelease(pVnode); diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 4a7d86c43426bcbbde18e52e2bedf6b5d56fab48..fc433463c5e522edfcf825b2682064651dff0c6d 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -180,7 +180,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_DISK_PERMISSIONS, 0, 0x0506, "vnode no d TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, 0, 0x0507, "vnode no such file or directory") TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, 0, 0x0508, "vnode out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "vnode app error") -TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0214, "vnode no write auth") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_STATUS, 0, 0x0510, "vnode not in ready state") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, 0, 0x0511, "vnode not in synced state") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, 0, 0x0512, "vnode no write auth") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "tsdb invalid table id") @@ -200,6 +202,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_ACTION, 0, 0x060D, "tsdb inval TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb invalid create table message") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "tsdb file already exists") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, 0, 0x0611, "tsdb need to reconfigure table") // query TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle") diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 184979797026d45e627f8d39716b8d3827c83ec7..8c987b6ef18750c2f422bd53684cc6fcd89ef0c7 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -203,8 +203,7 @@ typedef struct SSubmitBlk { typedef struct SSubmitMsg { SMsgHead header; int32_t length; - int32_t compressed : 2; - int32_t numOfBlocks : 30; + int32_t numOfBlocks; SSubmitBlk blocks[]; } SSubmitMsg; @@ -285,7 +284,7 @@ typedef struct { int32_t tid; int16_t tversion; int16_t colId; - int16_t type; + int8_t type; int16_t bytes; int32_t tagValLen; int16_t numOfTags; diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 6a6341a5225ae2767b4d756b05d7cbfde3ba403d..b8cc1768e85376182a8a57adbc5d159485151048 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -115,7 +115,7 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId); -int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); +int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg); TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid); void tsdbStartStream(TSDB_REPO_T *repo); diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 151ea3f56352272a1c852c476983a97e2a424593..1e6cfa97006e110f5ce1e36073f165540adeda26 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -60,6 +60,7 @@ void* vnodeGetWal(void *pVnode); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); void vnodeBuildStatusMsg(void *param); +void vnodeConfirmForward(void *param, uint64_t version, int32_t code); void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes); int32_t vnodeInitResources(); diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index c1b8256a06763414d96e90a11dd62a3530fe9365..4707616ba85c303bc92c21eee8d63d9d2578c60c 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -334,7 +334,7 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { if (pStatus->dnodeId == 0) { mDebug("dnode:%d %s, first access", pDnode->dnodeId, pDnode->dnodeEp); } else { - //mDebug("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess); + mTrace("dnode:%d, status received, access times %d", pDnode->dnodeId, pDnode->lastAccess); } int32_t openVnodes = htons(pStatus->openVnodes); diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 9a041aa4fdd2b50164493d85624c2a61d8e56f6e..3a9076335a31e4c5e46db32a6e677541b50b341c 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -165,10 +165,18 @@ static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) { } mnodeDecDnodeRef(pDnode); } + + free(pNew); } mnodeVgroupUpdateIdPool(pVgroup); + // reset vgid status on vgroup changed + mDebug("vgId:%d, reset sync status to unsynced", pVgroup->vgId); + for (int32_t v = 0; v < pVgroup->numOfVnodes; ++v) { + pVgroup->vnodeGid[v].role = TAOS_SYNC_ROLE_UNSYNCED; + } + mnodeDecVgroupRef(pVgroup); mDebug("vgId:%d, is updated, numOfVnode:%d", pVgroup->vgId, pVgroup->numOfVnodes); @@ -300,6 +308,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; if (pVgid->pDnode == pDnode) { + mTrace("dnode:%d, receive status from dnode, vgId:%d status is %d", pVgroup->vgId, pDnode->dnodeId, pVgid->role); pVgid->role = pVload->role; if (pVload->role == TAOS_SYNC_ROLE_MASTER) { pVgroup->inUse = i; diff --git a/src/plugins/http/inc/httpLog.h b/src/plugins/http/inc/httpLog.h index 3712360a1c8e6b6e8d2f1d4951f6547e36b26e9f..f4c20a40d553d8e87bf4599b36e418fe04a21716 100644 --- a/src/plugins/http/inc/httpLog.h +++ b/src/plugins/http/inc/httpLog.h @@ -26,8 +26,6 @@ extern int32_t httpDebugFlag; #define httpInfo(...) { if (httpDebugFlag & DEBUG_INFO) { taosPrintLog("HTP INFO ", 255, __VA_ARGS__); }} #define httpDebug(...) { if (httpDebugFlag & DEBUG_DEBUG) { taosPrintLog("HTP DEBUG ", httpDebugFlag, __VA_ARGS__); }} #define httpTrace(...) { if (httpDebugFlag & DEBUG_TRACE) { taosPrintLog("HTP TRACE ", httpDebugFlag, __VA_ARGS__); }} - -#define httpDebugDump(...) { if (httpDebugFlag & DEBUG_DEBUG) { taosPrintLongString("HTP DEBUG ", httpDebugFlag, __VA_ARGS__); }} -#define httpTraceDump(...) { if (httpDebugFlag & DEBUG_TRACE) { taosPrintLongString("HTP TRACE ", httpDebugFlag, __VA_ARGS__); }} +#define httpTraceL(...){ if (httpDebugFlag & DEBUG_TRACE) { taosPrintLongString("HTP TRACE ", httpDebugFlag, __VA_ARGS__); }} #endif diff --git a/src/plugins/http/src/httpHandle.c b/src/plugins/http/src/httpHandle.c index 056fe425d4a8db454c8141aa4cae775f6fe2bd6f..a89ea7d8f1acf2db542a08a83edfa929f41fbad2 100644 --- a/src/plugins/http/src/httpHandle.c +++ b/src/plugins/http/src/httpHandle.c @@ -313,9 +313,9 @@ bool httpParseRequest(HttpContext* pContext) { return true; } - httpTraceDump("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:\n%s", pContext, pContext->fd, - pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, pContext->parser.bufsize, - pContext->parser.buffer); + httpTraceL("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:\n%s", pContext, pContext->fd, + pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, pContext->parser.bufsize, + pContext->parser.buffer); if (!httpGetHttpMethod(pContext)) { return false; diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index a5009c2347a4a582984338b8257c015196d7b582..d7d7da6668dfbc325ab760fcb9a1d230fbb0925e 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -108,7 +108,7 @@ bool httpReadDataImp(HttpContext *pContext) { static bool httpDecompressData(HttpContext *pContext) { if (pContext->contentEncoding != HTTP_COMPRESS_GZIP) { - httpTraceDump("context:%p, fd:%d, ip:%s, content:%s", pContext, pContext->fd, pContext->ipstr, pContext->parser.data.pos); + httpTraceL("context:%p, fd:%d, ip:%s, content:%s", pContext, pContext->fd, pContext->ipstr, pContext->parser.data.pos); return true; } @@ -124,8 +124,8 @@ static bool httpDecompressData(HttpContext *pContext) { if (ret == 0) { memcpy(pContext->parser.data.pos, decompressBuf, decompressBufLen); pContext->parser.data.pos[decompressBufLen] = 0; - httpTraceDump("context:%p, fd:%d, ip:%s, rawSize:%d, decompressSize:%d, content:%s", - pContext, pContext->fd, pContext->ipstr, pContext->parser.data.len, decompressBufLen, decompressBuf); + httpTraceL("context:%p, fd:%d, ip:%s, rawSize:%d, decompressSize:%d, content:%s", pContext, pContext->fd, + pContext->ipstr, pContext->parser.data.len, decompressBufLen, decompressBuf); pContext->parser.data.len = decompressBufLen; } else { httpError("context:%p, fd:%d, ip:%s, failed to decompress data, rawSize:%d, error:%d", diff --git a/src/plugins/http/src/httpSql.c b/src/plugins/http/src/httpSql.c index 9d3efca01d4c97d5db7cc21df8a2c67ff1e39251..c43d928d1b694a812c504d1c5fc70658e4c48c4e 100644 --- a/src/plugins/http/src/httpSql.c +++ b/src/plugins/http/src/httpSql.c @@ -166,8 +166,8 @@ void httpProcessMultiSql(HttpContext *pContext) { HttpSqlCmd *cmd = multiCmds->cmds + multiCmds->pos; char *sql = httpGetCmdsString(pContext, cmd->sql); - httpTraceDump("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, start query, sql:%s", pContext, pContext->fd, - pContext->ipstr, pContext->user, multiCmds->pos, sql); + httpTraceL("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, start query, sql:%s", pContext, pContext->fd, + pContext->ipstr, pContext->user, multiCmds->pos, sql); taosNotePrintHttp(sql); taos_query_a(pContext->session->taos, sql, httpProcessMultiSqlCallBack, (void *)pContext); } @@ -306,8 +306,8 @@ void httpProcessSingleSqlCmd(HttpContext *pContext) { return; } - httpTraceDump("context:%p, fd:%d, ip:%s, user:%s, start query, sql:%s", pContext, pContext->fd, pContext->ipstr, - pContext->user, sql); + httpTraceL("context:%p, fd:%d, ip:%s, user:%s, start query, sql:%s", pContext, pContext->fd, pContext->ipstr, + pContext->user, sql); taosNotePrintHttp(sql); taos_query_a(pSession->taos, sql, httpProcessSingleSqlCallBack, (void *)pContext); } diff --git a/src/plugins/monitor/src/monitorMain.c b/src/plugins/monitor/src/monitorMain.c index 11b701e0ea19aaee51bc07b0bc48eeb543b7c3b7..36468b3fdb692359ae3392b902f4fd31921fd0b6 100644 --- a/src/plugins/monitor/src/monitorMain.c +++ b/src/plugins/monitor/src/monitorMain.c @@ -35,9 +35,6 @@ #define monitorDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON DEBUG ", monitorDebugFlag, __VA_ARGS__); }} #define monitorTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON TRACE ", monitorDebugFlag, __VA_ARGS__); }} -#define monitorDebugDump(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLongString("MON DEBUG ", monitorDebugFlag, __VA_ARGS__); }} -#define monitorTraceDump(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLongString("MON TRACE ", monitorDebugFlag, __VA_ARGS__); }} - #define SQL_LENGTH 1024 #define LOG_LEN_STR 100 #define IP_LEN_STR 18 diff --git a/src/plugins/mqtt/inc/mqttLog.h b/src/plugins/mqtt/inc/mqttLog.h index c0515c2c26fb0492d1711ce7aa048b9714adc709..5d5f98a13b3edcd16e47dba4a38b587c95707a6c 100644 --- a/src/plugins/mqtt/inc/mqttLog.h +++ b/src/plugins/mqtt/inc/mqttLog.h @@ -27,7 +27,4 @@ extern int32_t mqttDebugFlag; #define mqttDebug(...) { if (mqttDebugFlag & DEBUG_DEBUG) { taosPrintLog("MQT DEBUG ", mqttDebugFlag, __VA_ARGS__); }} #define mqttTrace(...) { if (mqttDebugFlag & DEBUG_TRACE) { taosPrintLog("MQT TRACE ", mqttDebugFlag, __VA_ARGS__); }} -#define mqttDebugDump(...) { if (mqttDebugFlag & DEBUG_DEBUG) { taosPrintLongString("MQT DEBUG ", mqttDebugFlag, __VA_ARGS__); }} -#define mqttTraceDump(...) { if (mqttDebugFlag & DEBUG_TRACE) { taosPrintLongString("MQT DEBUG ", mqttDebugFlag, __VA_ARGS__); }} - #endif diff --git a/src/rpc/inc/rpcLog.h b/src/rpc/inc/rpcLog.h index 0504ddac434012286127ac70b960891f6e050a31..f0f5c84ff91384dca93199dd9287755b599662e8 100644 --- a/src/rpc/inc/rpcLog.h +++ b/src/rpc/inc/rpcLog.h @@ -31,9 +31,7 @@ extern int32_t tscEmbedded; #define tInfo(...) { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC INFO ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }} #define tDebug(...) { if (rpcDebugFlag & DEBUG_DEBUG) { taosPrintLog("RPC DEBUG ", rpcDebugFlag, __VA_ARGS__); }} #define tTrace(...) { if (rpcDebugFlag & DEBUG_TRACE) { taosPrintLog("RPC TRACE ", rpcDebugFlag, __VA_ARGS__); }} - -#define tDebugDump(x, y) { if (rpcDebugFlag & DEBUG_DEBUG) { taosDumpData((unsigned char *)x, y); }} -#define tTraceDump(x, y) { if (rpcDebugFlag & DEBUG_TRACE) { taosDumpData((unsigned char *)x, y); }} +#define tDump(x, y) { if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); }} #ifdef __cplusplus } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index c05c8c76e16fc96d5a513608e26fcd24956bc7b6..0d5b80e8f3e56ac680b2ab2605740a2fc7d0a9b1 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -538,6 +538,7 @@ void rpcCancelRequest(void *handle) { if (pContext->pConn) { tDebug("%s, app trys to cancel request", pContext->pConn->info); + pContext->pConn->pReqMsg = NULL; rpcCloseConn(pContext->pConn); pContext->pConn = NULL; rpcFreeCont(pContext->pCont); @@ -602,8 +603,13 @@ static void rpcReleaseConn(SRpcConn *pConn) { rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg pConn->pRspMsg = NULL; - if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); - } + // if server has ever reported progress, free content + if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg); // do not use rpcFreeMsg + } else { + // if there is an outgoing message, free it + if (pConn->outType && pConn->pReqMsg) + rpcFreeMsg(pConn->pReqMsg); + } // memset could not be used, since lockeBy can not be reset pConn->inType = 0; @@ -959,6 +965,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { if (pConn->outType) { SRpcReqContext *pContext = pConn->pContext; pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + pConn->pReqMsg = NULL; taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); } @@ -973,7 +980,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle; SRpcConn *pConn = (SRpcConn *)pRecv->thandle; - tTraceDump(pRecv->msg, pRecv->msgLen); + tDump(pRecv->msg, pRecv->msgLen); // underlying UDP layer does not know it is server or client pRecv->connType = pRecv->connType | pRpc->connType; @@ -1061,6 +1068,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { SRpcReqContext *pContext = pConn->pContext; rpcMsg.handle = pContext; pConn->pContext = NULL; + pConn->pReqMsg = NULL; // for UDP, port may be changed by server, the port in ipSet shall be used for cache if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { @@ -1247,7 +1255,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno)); } - tTraceDump(msg, msgLen); + tDump(msg, msgLen); } static void rpcProcessConnError(void *param, void *id) { @@ -1297,6 +1305,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); if (pConn->pContext) { pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + pConn->pReqMsg = NULL; taosTmrStart(rpcProcessConnError, 0, pConn->pContext, pRpc->tmrCtrl); rpcReleaseConn(pConn); } diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index e1bed0125518e038f6e328c50462486babd38b8a..7d1a5a52de8cf4982f85e68b388007718a7aa741 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -195,7 +195,6 @@ typedef struct { typedef struct { uint32_t len; uint32_t offset; - // uint32_t padding; uint32_t hasLast : 2; uint32_t numOfBlocks : 30; uint64_t uid; @@ -224,7 +223,7 @@ typedef struct { typedef struct { int16_t colId; - int16_t len; + int32_t len; int32_t type : 8; int32_t offset : 24; int64_t sum; @@ -294,18 +293,64 @@ typedef struct { #define TABLE_SUID(t) (t)->suid #define TABLE_LASTKEY(t) (t)->lastKey -static FORCE_INLINE STSchema *tsdbGetTableSchema(STable *pTable) { - if (pTable->type == TSDB_CHILD_TABLE) { // check child table first - STable *pSuper = pTable->pSuper; - if (pSuper == NULL) return NULL; - return pSuper->schema[pSuper->numOfSchemas - 1]; - } else if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) { - return pTable->schema[pTable->numOfSchemas - 1]; +STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg); +void tsdbFreeMeta(STsdbMeta* pMeta); +int tsdbOpenMeta(STsdbRepo* pRepo); +int tsdbCloseMeta(STsdbRepo* pRepo); +STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid); +STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version); +int tsdbWLockRepoMeta(STsdbRepo* pRepo); +int tsdbRLockRepoMeta(STsdbRepo* pRepo); +int tsdbUnlockRepoMeta(STsdbRepo* pRepo); +void tsdbRefTable(STable* pTable); +void tsdbUnRefTable(STable* pTable); +void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct); + +static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { + if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { + return -1; + } else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) { + return 1; } else { - return NULL; + return 0; } } +static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t version) { + STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; + STSchema* pSchema = NULL; + STSchema* pTSchema = NULL; + + if (lock) taosRLockLatch(&(pDTable->latch)); + if (version < 0) { // get the latest version of schema + pTSchema = pDTable->schema[pDTable->numOfSchemas - 1]; + } else { // get the schema with version + void* ptr = taosbsearch(&version, pDTable->schema, pDTable->numOfSchemas, sizeof(STSchema*), + tsdbCompareSchemaVersion, TD_EQ); + if (ptr == NULL) { + terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + goto _exit; + } + pTSchema = *(STSchema**)ptr; + } + + ASSERT(pTSchema != NULL); + + if (copy) { + if ((pSchema = tdDupSchema(pTSchema)) == NULL) terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + } else { + pSchema = pTSchema; + } + +_exit: + if (lock) taosRUnLockLatch(&(pDTable->latch)); + return pSchema; +} + +static FORCE_INLINE STSchema* tsdbGetTableSchema(STable* pTable) { + return tsdbGetTableSchemaImpl(pTable, false, false, -1); +} + static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { if (pTable->type == TSDB_CHILD_TABLE) { // check child table first STable *pSuper = pTable->pSuper; @@ -318,19 +363,6 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { } } -STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg); -void tsdbFreeMeta(STsdbMeta* pMeta); -int tsdbOpenMeta(STsdbRepo* pRepo); -int tsdbCloseMeta(STsdbRepo* pRepo); -STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid); -STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version); -int tsdbUpdateTable(STsdbRepo* pRepo, STable* pTable, STableCfg* pCfg); -int tsdbWLockRepoMeta(STsdbRepo* pRepo); -int tsdbRLockRepoMeta(STsdbRepo* pRepo); -int tsdbUnlockRepoMeta(STsdbRepo* pRepo); -void tsdbRefTable(STable* pTable); -void tsdbUnRefTable(STable* pTable); - // ------------------ tsdbBuffer.c STsdbBufPool* tsdbNewBufPool(); void tsdbFreeBufPool(STsdbBufPool* pBufPool); @@ -405,8 +437,9 @@ int tsdbLoadCompIdx(SRWHelper* pHelper, void* target); int tsdbLoadCompInfo(SRWHelper* pHelper, void* target); int tsdbLoadCompData(SRWHelper* phelper, SCompBlock* pcompblock, void* target); void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols); -int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, int16_t* colIds, int numOfColIds); -int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock); +int tsdbLoadBlockDataCols(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo, int16_t* colIds, + int numOfColIds); +int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SCompInfo* pCompInfo); // ------------------ tsdbMain.c #define REPO_ID(r) (r)->config.tsdbId diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 4b9e977a1b741c6998b3c6bca8b7e7ec2cae6a27..6b3160070514b13d8b3b961854346d98d2b400a3 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -41,9 +41,9 @@ typedef struct { } SSubmitBlkIter; typedef struct { - int32_t totalLen; - int32_t len; - SSubmitBlk *pBlock; + int32_t totalLen; + int32_t len; + void * pMsg; } SSubmitMsgIter; static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); @@ -56,7 +56,7 @@ static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg); static void tsdbFreeRepo(STsdbRepo *pRepo); static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows); -static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter); +static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock); static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); static int tsdbRestoreInfo(STsdbRepo *pRepo); static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); @@ -68,6 +68,7 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup); static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg); static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); +static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); // Function declaration int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { @@ -164,6 +165,13 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * STsdbRepo * pRepo = (STsdbRepo *)repo; SSubmitMsgIter msgIter = {0}; + if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) { + if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) { + tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno)); + } + return -1; + } + if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) { tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; @@ -173,12 +181,14 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * int32_t affectedrows = 0; TSKEY now = taosGetTimestamp(pRepo->config.precision); - - while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) { + while (true) { + tsdbGetSubmitMsgNext(&msgIter, &pBlock); + if (pBlock == NULL) break; if (tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows) < 0) { return -1; } } + if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows); return 0; } @@ -263,7 +273,7 @@ void tsdbStartStream(TSDB_REPO_T *repo) { STable *pTable = pMeta->tables[i]; if (pTable && pTable->type == TSDB_STREAM_TABLE) { pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, - tsdbGetTableSchema(pTable)); + tsdbGetTableSchemaImpl(pTable, false, false, -1)); } } } @@ -694,17 +704,12 @@ static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) { return -1; } - pMsg->length = htonl(pMsg->length); - pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - pMsg->compressed = htonl(pMsg->compressed); - pIter->totalLen = pMsg->length; - pIter->len = TSDB_SUBMIT_MSG_HEAD_SIZE; + pIter->len = 0; + pIter->pMsg = pMsg; if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) { terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; return -1; - } else { - pIter->pBlock = pMsg->blocks; } return 0; @@ -714,26 +719,8 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY STsdbMeta *pMeta = pRepo->tsdbMeta; int64_t points = 0; - STable *pTable = tsdbGetTableByUid(pMeta, pBlock->uid); - if (pTable == NULL || TABLE_TID(pTable) != pBlock->tid) { - tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, - pBlock->tid); - terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; - return -1; - } - - if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { - tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable)); - terrno = TSDB_CODE_TDB_INVALID_ACTION; - return -1; - } - - // Check schema version and update schema if needed - if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) { - tsdbError("vgId:%d failed to insert data to table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), - tstrerror(terrno)); - return -1; - } + STable *pTable = pMeta->tables[pBlock->tid]; + ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid); SSubmitBlkIter blkIter = {0}; SDataRow row = NULL; @@ -764,27 +751,23 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY return 0; } -static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) { - SSubmitBlk *pBlock = pIter->pBlock; - if (pBlock == NULL) return NULL; - - pBlock->dataLen = htonl(pBlock->dataLen); - pBlock->schemaLen = htonl(pBlock->schemaLen); - pBlock->numOfRows = htons(pBlock->numOfRows); - pBlock->uid = htobe64(pBlock->uid); - pBlock->tid = htonl(pBlock->tid); - - pBlock->sversion = htonl(pBlock->sversion); - pBlock->padding = htonl(pBlock->padding); - - pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->dataLen; - if (pIter->len >= pIter->totalLen) { - pIter->pBlock = NULL; +static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { + if (pIter->len == 0) { + pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE; } else { - pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->dataLen + sizeof(SSubmitBlk)); + SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); + pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen); } - return pBlock; + if (pIter->len > pIter->totalLen) { + terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; + *pPBlock = NULL; + return -1; + } + + *pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); + + return 0; } static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { @@ -969,42 +952,64 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) { static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) { ASSERT(pTable != NULL); - STSchema *pSchema = tsdbGetTableSchema(pTable); + STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); int sversion = schemaVersion(pSchema); - if (pBlock->sversion == sversion) return 0; - if (pBlock->sversion > sversion) { // need to config - tsdbDebug("vgId:%d table %s tid %d has version %d smaller than client version %d, try to config", REPO_ID(pRepo), - TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), sversion, pBlock->sversion); - if (pRepo->appH.configFunc) { - void *msg = (*pRepo->appH.configFunc)(REPO_ID(pRepo), TABLE_TID(pTable)); - if (msg == NULL) { - tsdbError("vgId:%d failed to config table %s tid %d since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), - TABLE_TID(pTable), tstrerror(terrno)); + if (pBlock->sversion == sversion) { + return 0; + } else { + if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) { // stream table is not allowed to change schema + terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + return -1; + } + } + + if (pBlock->sversion > sversion) { // may need to update table schema + if (pBlock->schemaLen > 0) { + tsdbDebug( + "vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, update...", + REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion); + ASSERT(pBlock->schemaLen % sizeof(STColumn) == 0); + int numOfCols = pBlock->schemaLen / sizeof(STColumn); + STColumn *pTCol = (STColumn *)pBlock->data; + + STSchemaBuilder schemaBuilder = {0}; + if (tdInitTSchemaBuilder(&schemaBuilder, pBlock->sversion) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + tstrerror(terrno)); return -1; } - STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg); - if (pTableCfg == NULL) { - rpcFreeCont(msg); - return -1; + for (int i = 0; i < numOfCols; i++) { + if (tdAddColToSchema(&schemaBuilder, pTCol[i].type, htons(pTCol[i].colId), htons(pTCol[i].bytes)) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + tstrerror(terrno)); + tdDestroyTSchemaBuilder(&schemaBuilder); + return -1; + } } - if (tsdbUpdateTable(pRepo, (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable, pTableCfg) < 0) { - tsdbError("vgId:%d failed to update table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), - tstrerror(terrno)); - tsdbClearTableCfg(pTableCfg); - rpcFreeCont(msg); + STSchema *pNSchema = tdGetSchemaFromBuilder(&schemaBuilder); + if (pNSchema == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tdDestroyTSchemaBuilder(&schemaBuilder); return -1; } - tsdbClearTableCfg(pTableCfg); - rpcFreeCont(msg); + + tdDestroyTSchemaBuilder(&schemaBuilder); + tsdbUpdateTableSchema(pRepo, pTable, pNSchema, true); } else { - terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + tsdbDebug( + "vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, reconfigure...", + REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion); + terrno = TSDB_CODE_TDB_TABLE_RECONFIGURE; return -1; } } else { - if (tsdbGetTableSchemaByVersion(pTable, pBlock->sversion) == NULL) { + ASSERT(pBlock->sversion >= 0); + if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion) == NULL) { tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo), pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable)); } @@ -1013,7 +1018,64 @@ static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pT } return 0; - } +} + +static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { + ASSERT(pMsg != NULL); + STsdbMeta * pMeta = pRepo->tsdbMeta; + SSubmitMsgIter msgIter = {0}; + SSubmitBlk * pBlock = NULL; + + terrno = TSDB_CODE_SUCCESS; + pMsg->length = htonl(pMsg->length); + pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + + if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; + while (true) { + if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; + if (pBlock == NULL) break; + + pBlock->uid = htobe64(pBlock->uid); + pBlock->tid = htonl(pBlock->tid); + pBlock->sversion = htonl(pBlock->sversion); + pBlock->dataLen = htonl(pBlock->dataLen); + pBlock->schemaLen = htonl(pBlock->schemaLen); + pBlock->numOfRows = htons(pBlock->numOfRows); + + if (pBlock->tid <= 0 || pBlock->tid >= pRepo->config.maxTables) { + tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, + pBlock->tid); + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; + return -1; + } + + STable *pTable = pMeta->tables[pBlock->tid]; + if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) { + tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, + pBlock->tid); + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; + return -1; + } + + if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) { + tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable)); + terrno = TSDB_CODE_TDB_INVALID_ACTION; + return -1; + } + + // Check schema version and update schema if needed + if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) { + if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) { + continue; + } else { + return -1; + } + } + } + + if (terrno != TSDB_CODE_SUCCESS) return -1; + return 0; +} static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) { // TODO diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index d3b9081a36b5ddc2e16a23d5e4eb7e75e19ae1c3..af86de5aa80db13054dbd0c47d343fe2cbc3e027 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -538,10 +538,12 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe SCommitIter *pIter = iters + tid; if (pIter->pTable == NULL) continue; + taosRLockLatch(&(pIter->pTable->latch)); + tsdbSetHelperTable(pHelper, pIter->pTable, pRepo); if (pIter->pIter != NULL) { - tdInitDataCols(pDataCols, tsdbGetTableSchema(pIter->pTable)); + tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)); int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; int nLoop = 0; @@ -557,6 +559,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols); ASSERT(rowsWritten != 0); if (rowsWritten < 0) { + taosRUnLockLatch(&(pIter->pTable->latch)); tsdbError("vgId:%d failed to write data block to table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable), tstrerror(terrno)); @@ -571,6 +574,8 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe ASSERT(pDataCols->numOfRows == 0); } + taosRUnLockLatch(&(pIter->pTable->latch)); + // Move the last block to the new .l file if neccessary if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno)); @@ -680,10 +685,10 @@ static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIter if (dataRowKey(row) > maxKey) break; if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { - pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); + pSchema = tsdbGetTableSchemaImpl(pTable, true, false, dataRowVersion(row)); if (pSchema == NULL) { // TODO: deal with the error here - ASSERT(false); + ASSERT(0); } } diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 8b79d95af11f03317bc4949060b1ba3d5920f90b..9ebd63ba7e58149077fe4d69d85e394efe6b35c6 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -29,10 +29,9 @@ static void tsdbOrgMeta(void *pHandle); static char * getTagIndexKey(const void *pData); static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper); static void tsdbFreeTable(STable *pTable); -static int tsdbUpdateTableTagSchema(STable *pTable, STSchema *newSchema); -static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx); +static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, bool lock); static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFromIdx, bool lock); -static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable); +static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper); static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid); static int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup); @@ -76,7 +75,7 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { // TODO if (super->type != TSDB_SUPER_TABLE) return -1; if (super->tableId.uid != pCfg->superUid) return -1; - tsdbUpdateTable(pRepo, super, pCfg); + // tsdbUpdateTable(pRepo, super, pCfg); } } @@ -84,10 +83,18 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { if (table == NULL) goto _err; // Register to meta + tsdbWLockRepoMeta(pRepo); if (newSuper) { - if (tsdbAddTableToMeta(pRepo, super, true) < 0) goto _err; + if (tsdbAddTableToMeta(pRepo, super, true, false) < 0) { + tsdbUnlockRepoMeta(pRepo); + goto _err; + } + } + if (tsdbAddTableToMeta(pRepo, table, true, false) < 0) { + tsdbUnlockRepoMeta(pRepo); + goto _err; } - if (tsdbAddTableToMeta(pRepo, table, true) < 0) goto _err; + tsdbUnlockRepoMeta(pRepo); // Write to memtable action int tlen1 = (newSuper) ? tsdbGetTableEncodeSize(TSDB_UPDATE_META, super) : 0; @@ -255,7 +262,7 @@ _err: return NULL; } -static int32_t colIdCompar(const void* left, const void* right) { +static UNUSED_FUNC int32_t colIdCompar(const void* left, const void* right) { int16_t colId = *(int16_t*) left; STColumn* p2 = (STColumn*) right; @@ -266,91 +273,118 @@ static int32_t colIdCompar(const void* left, const void* right) { return (colId < p2->colId)? -1:1; } -int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { +int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbMeta *pMeta = pRepo->tsdbMeta; + STSchema * pNewSchema = NULL; pMsg->uid = htobe64(pMsg->uid); pMsg->tid = htonl(pMsg->tid); pMsg->tversion = htons(pMsg->tversion); pMsg->colId = htons(pMsg->colId); - pMsg->type = htons(pMsg->type); pMsg->bytes = htons(pMsg->bytes); pMsg->tagValLen = htonl(pMsg->tagValLen); pMsg->numOfTags = htons(pMsg->numOfTags); pMsg->schemaLen = htonl(pMsg->schemaLen); - assert(pMsg->schemaLen == sizeof(STColumn) * pMsg->numOfTags); - - char* d = pMsg->data; - for(int32_t i = 0; i < pMsg->numOfTags; ++i) { - STColumn* pCol = (STColumn*) d; - pCol->colId = htons(pCol->colId); - pCol->bytes = htons(pCol->bytes); - pCol->offset = 0; - - d += sizeof(STColumn); + for (int i = 0; i < pMsg->numOfTags; i++) { + STColumn *pTCol = (STColumn *)pMsg->data + i; + pTCol->bytes = htons(pTCol->bytes); + pTCol->colId = htons(pTCol->colId); } STable *pTable = tsdbGetTableByUid(pMeta, pMsg->uid); - if (pTable == NULL) { - terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; - return -1; - } - if (TABLE_TID(pTable) != pMsg->tid) { + if (pTable == NULL || TABLE_TID(pTable) != pMsg->tid) { + tsdbError("vgId:%d failed to update table tag value since invalid table id %d uid %" PRIu64, REPO_ID(pRepo), + pMsg->tid, pMsg->uid); terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; return -1; } if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) { - tsdbError("vgId:%d failed to update tag value of table %s since its type is %d", REPO_ID(pRepo), - TABLE_CHAR_NAME(pTable), TABLE_TYPE(pTable)); + tsdbError("vgId:%d try to update tag value of a non-child table, invalid action", REPO_ID(pRepo)); terrno = TSDB_CODE_TDB_INVALID_ACTION; return -1; } - if (schemaVersion(tsdbGetTableTagSchema(pTable)) < pMsg->tversion) { - tsdbDebug("vgId:%d server tag version %d is older than client tag version %d, try to config", REPO_ID(pRepo), - schemaVersion(tsdbGetTableTagSchema(pTable)), pMsg->tversion); - void *msg = (*pRepo->appH.configFunc)(pRepo->config.tsdbId, pMsg->tid); - if (msg == NULL) return -1; + if (schemaVersion(pTable->pSuper->tagSchema) > pMsg->tversion) { + tsdbError( + "vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag " + "version %d", + REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), pMsg->tversion, schemaVersion(pTable->tagSchema)); + terrno = TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE; + return -1; + } - // Deal with error her - STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg); - STable * super = tsdbGetTableByUid(pMeta, pTableCfg->superUid); - ASSERT(super != NULL); + if (schemaVersion(pTable->pSuper->tagSchema) < pMsg->tversion) { // tag schema out of data, + tsdbDebug("vgId:%d need to update tag schema of table %s tid %d uid %" PRIu64 + " since out of date, current version %d new version %d", + REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), + schemaVersion(pTable->pSuper->tagSchema), pMsg->tversion); - int32_t code = tsdbUpdateTable(pRepo, super, pTableCfg); - if (code != TSDB_CODE_SUCCESS) { - tsdbClearTableCfg(pTableCfg); - return code; + STSchemaBuilder schemaBuilder = {0}; + + STColumn *pTCol = (STColumn *)pMsg->data; + ASSERT(pMsg->schemaLen % sizeof(STColumn) == 0 && pTCol[0].colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0))); + if (tdInitTSchemaBuilder(&schemaBuilder, pMsg->tversion) < 0) { + tsdbDebug("vgId:%d failed to update tag schema of table %s tid %d uid %" PRIu64 " since out of memory", + REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable)); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; } - tsdbClearTableCfg(pTableCfg); - rpcFreeCont(msg); + for (int i = 0; i < (pMsg->schemaLen / sizeof(STColumn)); i++) { + if (tdAddColToSchema(&schemaBuilder, pTCol[i].type, pTCol[i].colId, pTCol[i].bytes) < 0) { + tdDestroyTSchemaBuilder(&schemaBuilder); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + } + pNewSchema = tdGetSchemaFromBuilder(&schemaBuilder); + if (pNewSchema == NULL) { + tdDestroyTSchemaBuilder(&schemaBuilder); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + tdDestroyTSchemaBuilder(&schemaBuilder); } - STSchema *pTagSchema = tsdbGetTableTagSchema(pTable); - - if (schemaVersion(pTagSchema) > pMsg->tversion) { - tsdbError( - "vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag " - "version %d", - REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), pMsg->tversion, schemaVersion(pTable->tagSchema)); - return TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE; + // Chage in memory + if (pNewSchema != NULL) { // change super table tag schema + taosWLockLatch(&(pTable->pSuper->latch)); + STSchema *pOldSchema = pTable->pSuper->tagSchema; + pTable->pSuper->tagSchema = pNewSchema; + tdFreeSchema(pOldSchema); + taosWUnLockLatch(&(pTable->pSuper->latch)); } - if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) { + + bool isChangeIndexCol = (pMsg->colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0))); + // STColumn *pCol = bsearch(&(pMsg->colId), pMsg->data, pMsg->numOfTags, sizeof(STColumn), colIdCompar); + // ASSERT(pCol != NULL); + + if (isChangeIndexCol) { + tsdbWLockRepoMeta(pRepo); tsdbRemoveTableFromIndex(pMeta, pTable); } - // TODO: remove table from index if it is the first column of tag - - // TODO: convert the tag schema from client, and then extract the type and bytes from schema according to colId - STColumn* res = bsearch(&pMsg->colId, pMsg->data, pMsg->numOfTags, sizeof(STColumn), colIdCompar); - assert(res != NULL); + taosWLockLatch(&(pTable->latch)); + tdSetKVRowDataOfCol(&(pTable->tagVal), pMsg->colId, pMsg->type, POINTER_SHIFT(pMsg->data, pMsg->schemaLen)); + taosWUnLockLatch(&(pTable->latch)); + if (isChangeIndexCol) { + tsdbAddTableIntoIndex(pMeta, pTable, false); + tsdbUnlockRepoMeta(pRepo); + } - tdSetKVRowDataOfCol(&pTable->tagVal, pMsg->colId, res->type, pMsg->data + pMsg->schemaLen); - if (schemaColAt(pTagSchema, DEFAULT_TAG_INDEX_COLUMN)->colId == pMsg->colId) { - tsdbAddTableIntoIndex(pMeta, pTable); + // Update on file + int tlen1 = (pNewSchema) ? tsdbGetTableEncodeSize(TSDB_UPDATE_META, pTable->pSuper) : 0; + int tlen2 = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pTable); + void *buf = tsdbAllocBytes(pRepo, tlen1+tlen2); + ASSERT(buf != NULL); + if (pNewSchema) { + void *pBuf = tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable->pSuper); + ASSERT(POINTER_DISTANCE(pBuf, buf) == tlen1); + buf = pBuf; } - return TSDB_CODE_SUCCESS; + tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable); + + return 0; } // ------------------ INTERNAL FUNCTIONS ------------------ @@ -460,56 +494,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) { } STSchema *tsdbGetTableSchemaByVersion(STable *pTable, int16_t version) { - STable *pSearchTable = (pTable->type == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; - if (pSearchTable == NULL) return NULL; - - void *ptr = taosbsearch(&version, pSearchTable->schema, pSearchTable->numOfSchemas, sizeof(STSchema *), - tsdbCompareSchemaVersion, TD_EQ); - if (ptr == NULL) return NULL; - - return *(STSchema **)ptr; -} - -int tsdbUpdateTable(STsdbRepo *pRepo, STable *pTable, STableCfg *pCfg) { - // TODO: this function can only be called when there is no query and commit on this table - ASSERT(TABLE_TYPE(pTable) != TSDB_CHILD_TABLE); - bool changed = false; - STsdbMeta *pMeta = pRepo->tsdbMeta; - - if ((pTable->type == TSDB_SUPER_TABLE) && (schemaVersion(pTable->tagSchema) < schemaVersion(pCfg->tagSchema))) { - if (tsdbUpdateTableTagSchema(pTable, pCfg->tagSchema) < 0) { - tsdbError("vgId:%d failed to update table %s tag schema since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), - tstrerror(terrno)); - return -1; - } - changed = true; - } - - STSchema *pTSchema = tsdbGetTableSchema(pTable); - if (schemaVersion(pTSchema) < schemaVersion(pCfg->schema)) { - if (pTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) { - pTable->schema[pTable->numOfSchemas++] = tdDupSchema(pCfg->schema); - } else { - ASSERT(pTable->numOfSchemas == TSDB_MAX_TABLE_SCHEMAS); - STSchema *tSchema = tdDupSchema(pCfg->schema); - tdFreeSchema(pTable->schema[0]); - memmove(pTable->schema, pTable->schema + 1, sizeof(STSchema *) * (TSDB_MAX_TABLE_SCHEMAS - 1)); - pTable->schema[pTable->numOfSchemas - 1] = tSchema; - } - - pMeta->maxRowBytes = MAX(pMeta->maxRowBytes, dataRowMaxBytesFromSchema(pCfg->schema)); - pMeta->maxCols = MAX(pMeta->maxCols, schemaNCols(pCfg->schema)); - - changed = true; - } - - if (changed) { - int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pTable); - void *buf = tsdbAllocBytes(pRepo, tlen); - tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable); - } - - return 0; + return tsdbGetTableSchemaImpl(pTable, true, false, version); } int tsdbWLockRepoMeta(STsdbRepo *pRepo) { @@ -553,7 +538,7 @@ void tsdbRefTable(STable *pTable) { void tsdbUnRefTable(STable *pTable) { int32_t ref = T_REF_DEC(pTable); - tsdbTrace("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); + tsdbDebug("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref); if (ref == 0) { // tsdbDebug("destory table name:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable)); @@ -565,17 +550,36 @@ void tsdbUnRefTable(STable *pTable) { } } -// ------------------ LOCAL FUNCTIONS ------------------ -static int tsdbCompareSchemaVersion(const void *key1, const void *key2) { - if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { - return -1; - } else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) { - return 1; +void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema, bool insertAct) { + ASSERT(TABLE_TYPE(pTable) != TSDB_STREAM_TABLE && TABLE_TYPE(pTable) != TSDB_SUPER_TABLE); + STsdbMeta *pMeta = pRepo->tsdbMeta; + + STable *pCTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable; + ASSERT(schemaVersion(pSchema) > schemaVersion(pCTable->schema[pCTable->numOfSchemas - 1])); + + taosWLockLatch(&(pCTable->latch)); + if (pCTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) { + pCTable->schema[pCTable->numOfSchemas++] = pSchema; } else { - return 0; + ASSERT(pCTable->numOfSchemas == TSDB_MAX_TABLE_SCHEMAS); + tdFreeSchema(pCTable->schema[0]); + memmove(pCTable->schema, pCTable->schema + 1, sizeof(STSchema *) * (TSDB_MAX_TABLE_SCHEMAS - 1)); + pCTable->schema[pCTable->numOfSchemas - 1] = pSchema; + } + + if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema); + if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema); + taosWUnLockLatch(&(pCTable->latch)); + + if (insertAct) { + int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pCTable); + void *buf = tsdbAllocBytes(pRepo, tlen); + ASSERT(buf != NULL); + tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable); } } +// ------------------ LOCAL FUNCTIONS ------------------ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { STsdbRepo *pRepo = (STsdbRepo *)pHandle; STable * pTable = NULL; @@ -587,7 +591,7 @@ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { tsdbDecodeTable(cont, &pTable); - if (tsdbAddTableToMeta(pRepo, pTable, false) < 0) { + if (tsdbAddTableToMeta(pRepo, pTable, false, false) < 0) { tsdbFreeTable(pTable); return -1; } @@ -605,7 +609,7 @@ static void tsdbOrgMeta(void *pHandle) { for (int i = 1; i < pCfg->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable != NULL && pTable->type == TSDB_CHILD_TABLE) { - tsdbAddTableIntoIndex(pMeta, pTable); + tsdbAddTableIntoIndex(pMeta, pTable, true); } } } @@ -715,7 +719,7 @@ _err: static void tsdbFreeTable(STable *pTable) { if (pTable) { - tsdbDebug("table %s is destroyed", TABLE_CHAR_NAME(pTable)); + if (pTable->name != NULL) tsdbDebug("table %s is destroyed", TABLE_CHAR_NAME(pTable)); tfree(TABLE_NAME(pTable)); if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) { for (int i = 0; i < TSDB_MAX_TABLE_SCHEMAS; i++) { @@ -735,25 +739,10 @@ static void tsdbFreeTable(STable *pTable) { } } -static int tsdbUpdateTableTagSchema(STable *pTable, STSchema *newSchema) { - ASSERT(pTable->type == TSDB_SUPER_TABLE); - ASSERT(schemaVersion(pTable->tagSchema) < schemaVersion(newSchema)); - STSchema *pOldSchema = pTable->tagSchema; - STSchema *pNewSchema = tdDupSchema(newSchema); - if (pNewSchema == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - pTable->tagSchema = pNewSchema; - tdFreeSchema(pOldSchema); - - return 0; -} - -static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) { +static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, bool lock) { STsdbMeta *pMeta = pRepo->tsdbMeta; - if (addIdx && tsdbWLockRepoMeta(pRepo) < 0) { + if (lock && tsdbWLockRepoMeta(pRepo) < 0) { tsdbError("vgId:%d failed to add table %s to meta since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tstrerror(terrno)); return -1; @@ -768,7 +757,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) { } } else { if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index - if (tsdbAddTableIntoIndex(pMeta, pTable) < 0) { + if (tsdbAddTableIntoIndex(pMeta, pTable, true) < 0) { tsdbDebug("vgId:%d failed to add table %s to meta while add table to index since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tstrerror(terrno)); goto _err; @@ -787,14 +776,15 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) { } if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) { - STSchema *pSchema = tsdbGetTableSchema(pTable); + STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema); if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema); } - if (addIdx && tsdbUnlockRepoMeta(pRepo) < 0) return -1; + if (lock && tsdbUnlockRepoMeta(pRepo) < 0) return -1; if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) { - pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, tsdbGetTableSchema(pTable)); + pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, + tsdbGetTableSchemaImpl(pTable, false, false, -1)); } tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), @@ -803,7 +793,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) { _err: tsdbRemoveTableFromMeta(pRepo, pTable, false, false); - if (addIdx) tsdbUnlockRepoMeta(pRepo); + if (lock) tsdbUnlockRepoMeta(pRepo); return -1; } @@ -814,7 +804,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro STable * tTable = NULL; STsdbCfg * pCfg = &(pRepo->config); - STSchema *pSchema = tsdbGetTableSchema(pTable); + STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); int maxCols = schemaNCols(pSchema); int maxRowBytes = schemaTLen(pSchema); @@ -848,7 +838,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro for (int i = 0; i < pCfg->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable != NULL) { - pSchema = tsdbGetTableSchema(pTable); + pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); maxCols = MAX(maxCols, schemaNCols(pSchema)); maxRowBytes = MAX(maxRowBytes, schemaTLen(pSchema)); } @@ -860,7 +850,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro tsdbUnRefTable(pTable); } -static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) { +static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable, bool refSuper) { ASSERT(pTable->type == TSDB_CHILD_TABLE && pTable != NULL); STable *pSTable = tsdbGetTableByUid(pMeta, TABLE_SUID(pTable)); ASSERT(pSTable != NULL); @@ -884,7 +874,7 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) { memcpy(SL_GET_NODE_DATA(pNode), &pTable, sizeof(STable *)); tSkipListPut(pSTable->pIndex, pNode); - T_REF_INC(pSTable); + if (refSuper) T_REF_INC(pSTable); return 0; } @@ -1252,4 +1242,4 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) { } return 0; -} +} \ No newline at end of file diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index eab9a5e0565a301d5f6d2f30a16c876c1acb781f..326da07a3662fe6b037a5ce2722ee191719cd0bd 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -218,7 +218,7 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { pHelper->tableInfo.tid = pTable->tableId.tid; pHelper->tableInfo.uid = pTable->tableId.uid; - STSchema *pSchema = tsdbGetTableSchema(pTable); + STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); pHelper->tableInfo.sversion = schemaVersion(pSchema); tdInitDataCols(pHelper->pDataCols[0], pSchema); @@ -318,7 +318,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { ASSERT(pCompBlock->last); if (pCompBlock->numOfSubBlocks > 1) { - if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1)) < 0) return -1; + if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1; ASSERT(pHelper->pDataCols[0]->numOfRows > 0 && pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock); if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0) @@ -577,11 +577,12 @@ void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols) } } -int tsdbLoadBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds) { +int tsdbLoadBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, SCompInfo *pCompInfo, int16_t *colIds, int numOfColIds) { ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block int numOfSubBlocks = pCompBlock->numOfSubBlocks; - if (numOfSubBlocks > 1) pCompBlock = (SCompBlock *)POINTER_SHIFT(pHelper->pCompInfo, pCompBlock->offset); + if (numOfSubBlocks > 1) + pCompBlock = (SCompBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset); tdResetDataCols(pHelper->pDataCols[0]); if (tsdbLoadBlockDataColsImpl(pHelper, pCompBlock, pHelper->pDataCols[0], colIds, numOfColIds) < 0) goto _err; @@ -598,10 +599,10 @@ _err: return -1; } -int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock) { - +int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SCompInfo *pCompInfo) { int numOfSubBlock = pCompBlock->numOfSubBlocks; - if (numOfSubBlock > 1) pCompBlock = (SCompBlock *)POINTER_SHIFT(pHelper->pCompInfo, pCompBlock->offset); + if (numOfSubBlock > 1) + pCompBlock = (SCompBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset); tdResetDataCols(pHelper->pDataCols[0]); if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[0]) < 0) goto _err; @@ -703,6 +704,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa } // Add checksum + ASSERT(pCompCol->len > 0); pCompCol->len += sizeof(TSCKSUM); taosCalcChecksumAppend(0, (uint8_t *)tptr, pCompCol->len); @@ -792,7 +794,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; } else { // Load - if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx)) < 0) goto _err; + if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; ASSERT(pHelper->pDataCols[0]->numOfRows <= blockAtIdx(pHelper, blkIdx)->numOfRows); // Merge if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err; @@ -848,7 +850,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; } else { // Load-Merge-Write // Load - if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx)) < 0) goto _err; + if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false; rowsWritten = rows3; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 6dccb4194ee4f4b8acb2fc2181fdffa824be2706..2b0d6ba7a68520143e0c1a2129f4ba4245fd7dd5 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -593,9 +593,12 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); } - tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj)); + STSchema* pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj); + tdInitDataCols(pCheckInfo->pDataCols, pSchema); + tdInitDataCols(pQueryHandle->rhelper.pDataCols[0], pSchema); + tdInitDataCols(pQueryHandle->rhelper.pDataCols[1], pSchema); - if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock) == 0) { + if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, pCheckInfo->pCompInfo) == 0) { SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup; diff --git a/src/util/inc/tlog.h b/src/util/inc/tlog.h index af4e6ae1c22dc282bdf4bbfda21d781ea19c8c7c..1f6a81d4b49a8a44f5c60da9a6c901be96e5d8f6 100644 --- a/src/util/inc/tlog.h +++ b/src/util/inc/tlog.h @@ -26,6 +26,7 @@ extern "C" { #define DEBUG_INFO DEBUG_WARN #define DEBUG_DEBUG 4U #define DEBUG_TRACE 8U +#define DEBUG_DUMP 16U #define DEBUG_SCREEN 64U #define DEBUG_FILE 128U diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index b469a5836ef21b1c804df109bce35af21644748c..1c96ba543caf79fe22e2594658c0e7037448fb76 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -160,6 +160,13 @@ int32_t vnodeDrop(int32_t vgId) { int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { SVnodeObj *pVnode = param; + + if (pVnode->status != TAOS_VN_STATUS_READY) + return TSDB_CODE_VND_INVALID_STATUS; + + if (pVnode->syncCfg.replica > 1 && pVnode->role == TAOS_SYNC_ROLE_UNSYNCED) + return TSDB_CODE_VND_NOT_SYNCED; + pVnode->status = TAOS_VN_STATUS_UPDATING; int32_t code = vnodeSaveCfg(pVnodeCfg); @@ -408,10 +415,19 @@ void *vnodeGetWal(void *pVnode) { } static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) { - if (pVnode->status == TAOS_VN_STATUS_DELETING) return; + int64_t totalStorage = 0; + int64_t compStorage = 0; + int64_t pointsWritten = 0; + + if (pVnode->status != TAOS_VN_STATUS_READY) return; if (pStatus->openVnodes >= TSDB_MAX_VNODES) return; - int64_t totalStorage, compStorage, pointsWritten = 0; - tsdbReportStat(pVnode->tsdb, &pointsWritten, &totalStorage, &compStorage); + + // still need report status when unsynced + if (pVnode->syncCfg.replica > 1 && pVnode->role == TAOS_SYNC_ROLE_UNSYNCED) { + } else if (pVnode->tsdb == NULL) { + } else { + tsdbReportStat(pVnode->tsdb, &pointsWritten, &totalStorage, &compStorage); + } SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++]; pLoad->vgId = htonl(pVnode->vgId); diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 57dbf4dede3edc65a5fe828bbb855f38b5f23eab..c8a6a4ce4e5184ccd3b3b5728d22f7a9ec7f9214 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -46,9 +46,9 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return TSDB_CODE_VND_MSG_NOT_PROCESSED; } - if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) { + if (pVnode->status != TAOS_VN_STATUS_READY) { vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status); - return TSDB_CODE_VND_INVALID_VGROUP_ID; + return TSDB_CODE_VND_INVALID_STATUS; } // TODO: Later, let slave to support query @@ -234,4 +234,4 @@ int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) { vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle); return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg)); -} \ No newline at end of file +} diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 171557acb61b78b3ac8ac7dff7531a21a654daf6..dad079c4b87b8d1f3823f58650caafd72dc746d7 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -58,7 +58,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { if (pHead->version == 0) { // from client or CQ if (pVnode->status != TAOS_VN_STATUS_READY) - return TSDB_CODE_VND_INVALID_VGROUP_ID; // it may be in deleting or closing state + return TSDB_CODE_VND_INVALID_STATUS; // it may be in deleting or closing state if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) return TSDB_CODE_RPC_NOT_READY; @@ -89,21 +89,25 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { return syncCode; } +void vnodeConfirmForward(void *param, uint64_t version, int32_t code) { + SVnodeObj *pVnode = (SVnodeObj *)param; + syncConfirmForward(pVnode->sync, version, code); +} + static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { int32_t code = TSDB_CODE_SUCCESS; + vTrace("vgId:%d, submit msg is processed", pVnode->vgId); + // save insert result into item + SShellSubmitRspMsg *pRsp = NULL; + if (pRet) { + pRet->len = sizeof(SShellSubmitRspMsg); + pRet->rsp = rpcMallocCont(pRet->len); + pRsp = pRet->rsp; + } - vTrace("vgId:%d, submit msg is processed", pVnode->vgId); - - pRet->len = sizeof(SShellSubmitRspMsg); - pRet->rsp = rpcMallocCont(pRet->len); - SShellSubmitRspMsg *pRsp = pRet->rsp; if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno; - pRsp->numOfFailedBlocks = 0; //TODO - //pRet->len += pRsp->numOfFailedBlocks * sizeof(SShellSubmitRspBlock); //TODO - pRsp->code = 0; - pRsp->numOfRows = htonl(1); return code; } @@ -158,7 +162,7 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet } static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { - if (tsdbUpdateTagValue(pVnode->tsdb, (SUpdateTableTagValMsg *)pCont) < 0) { + if (tsdbUpdateTableTagValue(pVnode->tsdb, (SUpdateTableTagValMsg *)pCont) < 0) { return terrno; } return TSDB_CODE_SUCCESS; diff --git a/tests/pytest/alter/alter_table_crash.py b/tests/pytest/alter/alter_table_crash.py index aefe9ff26e1493c189122f6827d7cd6d9f546d41..d1af022e35e636b15bc6e0fa175c7c41ed1ff640 100644 --- a/tests/pytest/alter/alter_table_crash.py +++ b/tests/pytest/alter/alter_table_crash.py @@ -78,5 +78,6 @@ class TDTestCase: tdSql.close() tdLog.success("%s successfully executed" % __file__) + tdCases.addWindows(__file__, TDTestCase()) tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/client/client.py b/tests/pytest/client/client.py index 21df7e6a86b83287a9a6d740c8b7a9dbe260b597..9bb1887216b6675c4ac8469ab16a89905fc788e9 100644 --- a/tests/pytest/client/client.py +++ b/tests/pytest/client/client.py @@ -40,6 +40,9 @@ class TDTestCase: ret = tdSql.query('select server_status() as result') tdSql.checkData(0, 0, 1) + ret = tdSql.execute('alter dnode 127.0.0.1 debugFlag 135') + tdLog.info("alter dnode ret: %d" % ret) + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index 066dda5d979bbdb971a5bacaec929a6b3724f0a4..b98e5c3d393e5d6210b358ffc995a02d6a509848 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -1,7 +1,6 @@ #!/bin/bash ulimit -c unlimited -python3 ./test.py -f client/client.py python3 ./test.py -f insert/basic.py python3 ./test.py -f insert/int.py python3 ./test.py -f insert/float.py @@ -151,3 +150,6 @@ python3 ./test.py -f stream/stream2.py #alter table python3 ./test.py -f alter/alter_table_crash.py + +# client +python3 ./test.py -f client/client.py diff --git a/tests/pytest/insert/writeDBNonStop.py b/tests/pytest/insert/writeDBNonStop.py index c89853ffb66023bbce3dcda55b8a2acabc72f1fd..bdc93f0469d8a774a3f8a8675eb65728ccb8aa8d 100644 --- a/tests/pytest/insert/writeDBNonStop.py +++ b/tests/pytest/insert/writeDBNonStop.py @@ -67,7 +67,7 @@ class DBWriteNonStop: self.cursor.execute( "select first(ts), last(ts), min(speed), max(speed), avg(speed), count(*) from st") data = self.cursor.fetchall() - end = datetime.now() + end = datetime.now() self.writeDataToCSVFile(data, (end - start).seconds) time.sleep(.001) @@ -75,8 +75,9 @@ class DBWriteNonStop: self.cursor.close() self.conn.close() + test = DBWriteNonStop() test.connectDB() test.createTable() test.insertData() -test.closeConn() \ No newline at end of file +test.closeConn() diff --git a/tests/pytest/query/filterAllIntTypes.py b/tests/pytest/query/filterAllIntTypes.py index a2bab63c886558d87eb63e3b26b222c83afdebc0..4d91168dae3440443be6daa96747386a687a0f58 100644 --- a/tests/pytest/query/filterAllIntTypes.py +++ b/tests/pytest/query/filterAllIntTypes.py @@ -89,17 +89,25 @@ class TDTestCase: tdSql.checkRows(101) # range for int type on column - tdSql.query("select * from st%s where num > 50 and num < 100" % curType) - tdSql.checkRows(49) + tdSql.query( + "select * from st%s where num > 50 and num < 100" % + curType) + tdSql.checkRows(49) - tdSql.query("select * from st%s where num >= 50 and num < 100" % curType) - tdSql.checkRows(50) + tdSql.query( + "select * from st%s where num >= 50 and num < 100" % + curType) + tdSql.checkRows(50) - tdSql.query("select * from st%s where num > 50 and num <= 100" % curType) - tdSql.checkRows(50) + tdSql.query( + "select * from st%s where num > 50 and num <= 100" % + curType) + tdSql.checkRows(50) - tdSql.query("select * from st%s where num >= 50 and num <= 100" % curType) - tdSql.checkRows(51) + tdSql.query( + "select * from st%s where num >= 50 and num <= 100" % + curType) + tdSql.checkRows(51) # > for int type on tag tdSql.query("select * from st%s where id > 5" % curType) @@ -135,16 +143,22 @@ class TDTestCase: # range for int type on tag tdSql.query("select * from st%s where id > 5 and id < 7" % curType) - tdSql.checkRows(10) + tdSql.checkRows(10) - tdSql.query("select * from st%s where id >= 5 and id < 7" % curType) - tdSql.checkRows(20) + tdSql.query( + "select * from st%s where id >= 5 and id < 7" % + curType) + tdSql.checkRows(20) - tdSql.query("select * from st%s where id > 5 and id <= 7" % curType) - tdSql.checkRows(20) + tdSql.query( + "select * from st%s where id > 5 and id <= 7" % + curType) + tdSql.checkRows(20) - tdSql.query("select * from st%s where id >= 5 and id <= 7" % curType) - tdSql.checkRows(30) + tdSql.query( + "select * from st%s where id >= 5 and id <= 7" % + curType) + tdSql.checkRows(30) print( "======= Verify filter for %s type finished =========" % diff --git a/tests/pytest/query/filterCombo.py b/tests/pytest/query/filterCombo.py index f72b913c92fa1d967aac1e15e050bda23fc7342d..e769addb525433bb05c7a287590c07f88ce86714 100644 --- a/tests/pytest/query/filterCombo.py +++ b/tests/pytest/query/filterCombo.py @@ -52,8 +52,7 @@ class TDTestCase: # illegal condition tdSql.error( - "select * from db.st where ts != '2020-05-13 10:00:00.002' OR tagtype < 2") - tdSql.error("select * from db.st where tagtype <> 1 OR tagtype < 2") + "select * from db.st where ts != '2020-05-13 10:00:00.002' OR tagtype < 2") def stop(self): tdSql.close() diff --git a/tests/pytest/query/queryJoin.py b/tests/pytest/query/queryJoin.py index 6d135e1006ba705d348bf2417605c0defd6b31a8..6ea240a334b4f4baf267aaacf53cfa0940c6c388 100644 --- a/tests/pytest/query/queryJoin.py +++ b/tests/pytest/query/queryJoin.py @@ -77,7 +77,7 @@ class TDTestCase: # join queries tdSql.query( "select * from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id") - tdSql.checkRows(6) + tdSql.checkRows(6) tdSql.error( "select ts, pressure, temperature, id, dscrption from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id") @@ -108,7 +108,7 @@ class TDTestCase: tdSql.query("select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_t.id, stb_p.dscrption, stb_p.pressure from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id") tdSql.checkRows(6) - + tdSql.query("select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_t.id, stb_p.dscrption, stb_p.pressure from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id") tdSql.checkRows(6) diff --git a/tests/pytest/query/queryMetaData.py b/tests/pytest/query/queryMetaData.py index 7b95e4a81cb3807062050099008aa3f73fbb4dab..67df20cb9ad6c399715927c6be5925932e4c2f71 100755 --- a/tests/pytest/query/queryMetaData.py +++ b/tests/pytest/query/queryMetaData.py @@ -55,9 +55,9 @@ class MetadataQuery: def createTablesAndInsertData(self, threadID): cursor = self.connectDB() - cursor.execute("use test") + cursor.execute("use test") - tablesPerThread = int (self.tables / self.numOfTherads) + tablesPerThread = int(self.tables / self.numOfTherads) base = threadID * tablesPerThread for i in range(tablesPerThread): cursor.execute( @@ -68,24 +68,72 @@ class MetadataQuery: %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d')''' % - (base + i + 1, - (base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100, - (base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100, - (base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100, - (base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100, - (base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100, - (base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100)) - + (base + i + 1, (base + i) % + 100, (base + i) % + 10000, (base + i) % + 1000000, (base + i) % + 100000000, (base + i) % + 100 * 1.1, (base + i) % + 100 * 2.3, (base + i) % + 2, (base + i) % + 100, (base + i) % + 100, (base + i) % + 100, (base + i) % + 10000, (base + i) % + 1000000, (base + i) % + 100000000, (base + i) % + 100 * 1.1, (base + i) % + 100 * 2.3, (base + i) % + 2, (base + i) % + 100, (base + i) % + 100, (base + i) % + 100, (base + i) % + 10000, (base + i) % + 1000000, (base + i) % + 100000000, (base + i) % + 100 * 1.1, (base + i) % + 100 * 2.3, (base + i) % + 2, (base + i) % + 100, (base + i) % + 100, (base + i) % + 100, (base + i) % + 10000, (base + i) % + 1000000, (base + i) % + 100000000, (base + i) % + 100 * 1.1, (base + i) % + 100 * 2.3, (base + i) % + 2, (base + i) % + 100, (base + i) % + 100, (base + i) % + 100, (base + i) % + 10000, (base + i) % + 1000000, (base + i) % + 100000000, (base + i) % + 100 * 1.1, (base + i) % + 100 * 2.3, (base + i) % + 2, (base + i) % + 100, (base + i) % + 100, (base + i) % + 100, (base + i) % + 10000, (base + i) % + 1000000, (base + i) % + 100000000, (base + i) % + 100 * 1.1, (base + i) % + 100 * 2.3, (base + i) % + 2, (base + i) % + 100, (base + i) % + 100)) + cursor.execute( "insert into t%d values(%d, 1) (%d, 2) (%d, 3) (%d, 4) (%d, 5)" % (base + i + 1, self.ts + 1, self.ts + 2, self.ts + 3, self.ts + 4, self.ts + 5)) - cursor.close() + cursor.close() def queryData(self, query): cursor = self.connectDB() cursor.execute("use test") - print("================= query tag data =================") + print("================= query tag data =================") startTime = datetime.now() cursor.execute(query) cursor.fetchall() @@ -107,15 +155,15 @@ if __name__ == '__main__': print( "================= Create %d tables and insert %d records into each table =================" % (t.tables, t.records)) - startTime = datetime.now() + startTime = datetime.now() threads = [] for i in range(t.numOfTherads): thread = threading.Thread( target=t.createTablesAndInsertData, args=(i,)) thread.start() threads.append(thread) - - for th in threads: + + for th in threads: th.join() endTime = datetime.now() diff --git a/tests/pytest/query/queryMetaPerformace.py b/tests/pytest/query/queryMetaPerformace.py index 0570311b08bf0c50667897b3f7901784cbb17ff5..8ab7105c9d92c57ff06e39d308c9e80eb6f09814 100644 --- a/tests/pytest/query/queryMetaPerformace.py +++ b/tests/pytest/query/queryMetaPerformace.py @@ -19,6 +19,7 @@ import time from datetime import datetime import numpy as np + class MyThread(threading.Thread): def __init__(self, func, args=()): @@ -35,17 +36,23 @@ class MyThread(threading.Thread): except Exception: return None + class MetadataQuery: def initConnection(self): self.tables = 100 self.records = 10 - self.numOfTherads =5 + self.numOfTherads = 5 self.ts = 1537146000000 self.host = "127.0.0.1" self.user = "root" self.password = "taosdata" self.config = "/etc/taos" - self.conn = taos.connect( self.host, self.user, self.password, self.config) + self.conn = taos.connect( + self.host, + self.user, + self.password, + self.config) + def connectDB(self): return self.conn.cursor() @@ -69,7 +76,7 @@ class MetadataQuery: cursor.execute("use test") base = threadID * self.tables - tablesPerThread = int (self.tables / self.numOfTherads) + tablesPerThread = int(self.tables / self.numOfTherads) for i in range(tablesPerThread): cursor.execute( '''create table t%d using meters tags( @@ -79,20 +86,69 @@ class MetadataQuery: %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d')''' % - (base + i + 1, - (base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100, - (base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100, - (base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100, - (base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100, - (base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100, - (base + i) %100, (base + i) %10000, (base + i) %1000000, (base + i) %100000000, (base + i) %100 * 1.1, (base + i) %100 * 2.3, (base + i) %2, (base + i) %100, (base + i) %100)) + (base + i + 1, (base + i) % + 100, (base + i) % + 10000, (base + i) % + 1000000, (base + i) % + 100000000, (base + i) % + 100 * 1.1, (base + i) % + 100 * 2.3, (base + i) % + 2, (base + i) % + 100, (base + i) % + 100, (base + i) % + 100, (base + i) % + 10000, (base + i) % + 1000000, (base + i) % + 100000000, (base + i) % + 100 * 1.1, (base + i) % + 100 * 2.3, (base + i) % + 2, (base + i) % + 100, (base + i) % + 100, (base + i) % + 100, (base + i) % + 10000, (base + i) % + 1000000, (base + i) % + 100000000, (base + i) % + 100 * 1.1, (base + i) % + 100 * 2.3, (base + i) % + 2, (base + i) % + 100, (base + i) % + 100, (base + i) % + 100, (base + i) % + 10000, (base + i) % + 1000000, (base + i) % + 100000000, (base + i) % + 100 * 1.1, (base + i) % + 100 * 2.3, (base + i) % + 2, (base + i) % + 100, (base + i) % + 100, (base + i) % + 100, (base + i) % + 10000, (base + i) % + 1000000, (base + i) % + 100000000, (base + i) % + 100 * 1.1, (base + i) % + 100 * 2.3, (base + i) % + 2, (base + i) % + 100, (base + i) % + 100, (base + i) % + 100, (base + i) % + 10000, (base + i) % + 1000000, (base + i) % + 100000000, (base + i) % + 100 * 1.1, (base + i) % + 100 * 2.3, (base + i) % + 2, (base + i) % + 100, (base + i) % + 100)) for j in range(self.records): cursor.execute( "insert into t%d values(%d, %d)" % (base + i + 1, self.ts + j, j)) cursor.close() - def queryWithTagId(self, threadId, tagId, queryNum): - print("---------thread%d start-----------"%threadId) + + def queryWithTagId(self, threadId, tagId, queryNum): + print("---------thread%d start-----------" % threadId) query = '''select tgcol1, tgcol2, tgcol3, tgcol4, tgcol5, tgcol6, tgcol7, tgcol8, tgcol9, tgcol10, tgcol11, tgcol12, tgcol13, tgcol14, tgcol15, tgcol16, tgcol17, tgcol18, tgcol19, tgcol20, tgcol21, tgcol22, tgcol23, tgcol24, tgcol25, tgcol26, tgcol27, @@ -103,18 +159,19 @@ class MetadataQuery: latancy = [] cursor = self.connectDB() cursor.execute("use test") - for i in range(queryNum): + for i in range(queryNum): startTime = time.time() - cursor.execute(query.format(id = tagId, condition = i)) + cursor.execute(query.format(id=tagId, condition=i)) cursor.fetchall() - latancy.append((time.time() - startTime)) - print("---------thread%d end-----------"%threadId) + latancy.append((time.time() - startTime)) + print("---------thread%d end-----------" % threadId) return latancy + def queryData(self, query): cursor = self.connectDB() cursor.execute("use test") - print("================= query tag data =================") + print("================= query tag data =================") startTime = datetime.now() cursor.execute(query) cursor.fetchall() @@ -124,7 +181,7 @@ class MetadataQuery: (endTime - startTime).seconds) cursor.close() - #self.conn.close() + # self.conn.close() if __name__ == '__main__': @@ -132,18 +189,33 @@ if __name__ == '__main__': t = MetadataQuery() t.initConnection() - latancys = [] - threads = [] + latancys = [] + threads = [] tagId = 1 - queryNum = 1000 + queryNum = 1000 for i in range(t.numOfTherads): - thread = MyThread(t.queryWithTagId, args = (i, tagId, queryNum)) - threads.append(thread) + thread = MyThread(t.queryWithTagId, args=(i, tagId, queryNum)) + threads.append(thread) thread.start() - for i in range(t.numOfTherads): + for i in range(t.numOfTherads): threads[i].join() - latancys.extend(threads[i].get_result()) - print("Total query: %d"%(queryNum * t.numOfTherads)) - print("statistic(s): mean= %f, P50 = %f, P75 = %f, P95 = %f, P99 = %f" - %(sum(latancys)/(queryNum * t.numOfTherads), np.percentile(latancys, 50), np.percentile(latancys, 75), np.percentile(latancys, 95), np.percentile(latancys, 99))) - + latancys.extend(threads[i].get_result()) + print("Total query: %d" % (queryNum * t.numOfTherads)) + print( + "statistic(s): mean= %f, P50 = %f, P75 = %f, P95 = %f, P99 = %f" % + (sum(latancys) / + ( + queryNum * + t.numOfTherads), + np.percentile( + latancys, + 50), + np.percentile( + latancys, + 75), + np.percentile( + latancys, + 95), + np.percentile( + latancys, + 99))) diff --git a/tests/pytest/query/queryNormal.py b/tests/pytest/query/queryNormal.py index 814c627d89790dd730d1e86d9101a612095678be..712a56d2d726419ee2687512189d08bfc31d4772 100644 --- a/tests/pytest/query/queryNormal.py +++ b/tests/pytest/query/queryNormal.py @@ -36,18 +36,17 @@ class TDTestCase: "insert into tb2 using stb1 tags(2,'tb2', '表2') values ('2020-04-18 15:00:02.000', 3, 2.1), ('2020-04-18 15:00:03.000', 4, 2.2)") # inner join --- bug - tdSql.query("select * from tb1 a, tb2 b where a.ts = b.ts") - tdSql.checkRows(1) + tdSql.error("select * from tb1 a, tb2 b where a.ts = b.ts") # join 3 tables -- bug exists - tdSql.query("select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_p.id, stb_p.dscrption, stb_p.pressure,stb_v.velocity from stb_p, stb_t, stb_v where stb_p.ts=stb_t.ts and stb_p.ts=stb_v.ts and stb_p.id = stb_t.id") + tdSql.error("select stb_t.ts, stb_t.dscrption, stb_t.temperature, stb_p.id, stb_p.dscrption, stb_p.pressure,stb_v.velocity from stb_p, stb_t, stb_v where stb_p.ts=stb_t.ts and stb_p.ts=stb_v.ts and stb_p.id = stb_t.id") # query show stable tdSql.query("show stables") tdSql.checkRows(1) # query show tables - tdSql.query("show table") + tdSql.query("show tables") tdSql.checkRows(2) # query count @@ -71,16 +70,13 @@ class TDTestCase: tdSql.checkRows(2) # query first ... as - tdSql.query("select first(*) as begin from stb1") - tdSql.checkData(0, 1, 1) + tdSql.error("select first(*) as begin from stb1") # query last ... as - tdSql.query("select last(*) as end from stb1") - tdSql.checkData(0, 1, 4) + tdSql.error("select last(*) as end from stb1") # query last_row ... as - tdSql.query("select last_row(*) as end from stb1") - tdSql.checkData(0, 1, 4) + tdSql.error("select last_row(*) as end from stb1") # query group .. by tdSql.query("select sum(c1), t2 from stb1 group by t2") @@ -95,8 +91,7 @@ class TDTestCase: tdSql.checkRows(1) # query ... alias for table ---- bug - tdSql.query("select t.ts from tb1 t") - tdSql.checkRows(2) + tdSql.error("select t.ts from tb1 t") # query ... tbname tdSql.query("select tbname from stb1") @@ -104,7 +99,7 @@ class TDTestCase: # query ... tbname count ---- bug tdSql.query("select count(tbname) from stb1") - tdSql.checkRows(2) + tdSql.checkData(0, 0, 2) # query ... select database ---- bug tdSql.query("SELECT database()") diff --git a/tests/pytest/query/select_last_crash.py b/tests/pytest/query/select_last_crash.py index 9aeb122f82693853d682497885cc143470a08b3f..9b580a24acd145e2e90ae6ee513e946d72820f2c 100644 --- a/tests/pytest/query/select_last_crash.py +++ b/tests/pytest/query/select_last_crash.py @@ -23,7 +23,6 @@ class TDTestCase: tdLog.debug("start to execute %s" % __file__) tdSql.init(conn.cursor()) - self.rowNum = 5000 self.ts = 1537146000000 @@ -36,15 +35,13 @@ class TDTestCase: "create table t1 using st tags('dev_001')") for i in range(self.rowNum): - tdSql.execute("insert into t1 values(%d, 'taosdata%d', %d)" % (self.ts + i, i + 1, i + 1)) + tdSql.execute( + "insert into t1 values(%d, 'taosdata%d', %d)" % + (self.ts + i, i + 1, i + 1)) tdSql.query("select last(*) from st") tdSql.checkRows(1) - - print( - "======= Verify filter for %s type finished =========" % - curType) - + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) diff --git a/tests/pytest/random-test/random-test-multi-threading-3.py b/tests/pytest/random-test/random-test-multi-threading-3.py index 7079a5c118b5beaaf4efecd9de4b93bc788b7064..a8e2c26ae5fa4f4dffa7967b09919abfc670c8bf 100644 --- a/tests/pytest/random-test/random-test-multi-threading-3.py +++ b/tests/pytest/random-test/random-test-multi-threading-3.py @@ -330,7 +330,6 @@ class Test (Thread): self.q.put(-1) tdLog.exit("second thread failed, first thread exit too") - elif (self.threadId == 2): while True: self.dbEvent.wait() diff --git a/tests/pytest/regressiontest.sh b/tests/pytest/regressiontest.sh index eada5f67f7dae0d9395d869cfd0e368bb5d71f5a..d3a8deaf47984b144e2318a5edd24f8fcb80909a 100755 --- a/tests/pytest/regressiontest.sh +++ b/tests/pytest/regressiontest.sh @@ -1,7 +1,6 @@ #!/bin/bash ulimit -c unlimited -python3 ./test.py -f client/client.py python3 ./test.py -f insert/basic.py python3 ./test.py -f insert/int.py python3 ./test.py -f insert/float.py @@ -138,6 +137,9 @@ python3 ./test.py -f query/filterOtherTypes.py python3 ./test.py -f query/queryError.py python3 ./test.py -f query/querySort.py python3 ./test.py -f query/queryJoin.py +python3 ./test.py -f query/filterCombo.py +python3 ./test.py -f query/queryNormal.py +python3 ./test.py -f query/select_last_crash.py #stream python3 ./test.py -f stream/stream1.py @@ -146,4 +148,5 @@ python3 ./test.py -f stream/stream2.py #alter table python3 ./test.py -f alter/alter_table_crash.py - +# client +python3 ./test.py -f client/client.py diff --git a/tests/pytest/smoketest.sh b/tests/pytest/smoketest.sh index 6b21912dd59714edcb22a44f0b10a2c2a7f3d0e6..7c14b673e5fac1f5d3b19dd583acc8462e5abab3 100755 --- a/tests/pytest/smoketest.sh +++ b/tests/pytest/smoketest.sh @@ -1,10 +1,6 @@ #!/bin/bash ulimit -c unlimited -# client -python3 ./test.py $1 -f client/client.py -python3 ./test.py $1 -s && sleep 1 - # insert python3 ./test.py $1 -f insert/basic.py python3 ./test.py $1 -s && sleep 1 @@ -35,3 +31,7 @@ python3 ./test.py $1 -s && sleep 1 python3 ./test.py $1 -f query/filter.py python3 ./test.py $1 -s && sleep 1 +# client +python3 ./test.py $1 -f client/client.py +python3 ./test.py $1 -s && sleep 1 + diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index dc0366b21428f350c157e33eef3a6308eb39f6f4..d6644df3ac26a33189addc2fe8bf9bf24ea83d25 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -198,7 +198,7 @@ class TDSql: "%s failed: sql:%s, affectedRows:%d != expect:%d" % (callerFilename, self.sql, self.affectedRows, expectAffectedRows)) tdLog.info("sql:%s, affectedRows:%d == expect:%d" % - (self.sql, self.affectedRows, expectAffectedRows)) + (self.sql, self.affectedRows, expectAffectedRows)) tdSql = TDSql()