diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 252129e2d79f0ec4e85a898207ba1a7cd4223e4d..93a68a4999a7181da7c0fd172177ccba0a685891 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -294,7 +294,7 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, * There is not response callback function for submit response. * The actual inserted number of points is the first number. */ - if (type == TSDB_MSG_TYPE_DNODE_SUBMIT_RSP) { + if (type == TSDB_MSG_TYPE_SUBMIT_RSP) { pRes->numOfRows += *(int32_t *)pRes->pRsp; tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code, @@ -512,8 +512,6 @@ int tscProcessSql(SSqlObj *pSql) { return pSql->res.code; } - //TODO change the connect info in metadata - return TSDB_CODE_OTHERS; // if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { // pSql->index = pMeterMetaInfo->pMeterMeta->index; // } else { // it must be the parent SSqlObj for super table query @@ -1240,14 +1238,14 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pShellMsg = (SShellSubmitMsg *)pMsg; pShellMsg->import = htons(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT) ? 0 : 1); - //pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode); + pShellMsg->vnode = 0; //htons(pMeterMeta->vpeerDesc[pMeterMeta->index].vnode); pShellMsg->numOfSid = htonl(pSql->cmd.numOfTablesInSubmit); // number of meters to be inserted // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here - pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_SUBMIT; - //tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip), - // htons(pShellMsg->vnode)); - + pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; + tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip), + htons(pShellMsg->vnode)); + return TSDB_CODE_SUCCESS; } @@ -1642,10 +1640,12 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tscTrace("%p msg built success,len:%d bytes", pSql, msgLen); pCmd->payloadLen = msgLen; - pSql->cmd.msgType = TSDB_MSG_TYPE_DNODE_QUERY; + pSql->cmd.msgType = TSDB_MSG_TYPE_QUERY; assert(msgLen + minMsgSize() <= size); + memmove(pSql->cmd.payload, pStart, pSql->cmd.payloadLen - tsRpcHeadSize); + return TSDB_CODE_SUCCESS; } @@ -3337,9 +3337,9 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) { } void tscInitMsgs() { - tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;// - tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;// - tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg;// + tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg; + tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg; + tscBuildMsg[TSDB_SQL_FETCH] = tscBuildRetrieveMsg; tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg; tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 280b091cdadfccd7ee9d0c17f052709aec0a6753..26569313f300b916a7b17aa13ab7ab719ca4f2cf 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -853,7 +853,8 @@ char *taos_errstr(TAOS *taos) { STscObj *pObj = (STscObj *)taos; uint8_t code; - if (pObj == NULL || pObj->signature != pObj) return tstrerror(globalCode); + if (pObj == NULL || pObj->signature != pObj) + return (char*)tstrerror(globalCode); SSqlObj* pSql = pObj->pSql; @@ -866,7 +867,7 @@ char *taos_errstr(TAOS *taos) { if (hasAdditionalErrorInfo(code, &pSql->cmd)) { return pSql->cmd.payload; } else { - return tstrerror(code); + return (char*)tstrerror(code); } } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index e1ecaa28d87a975f10179a892a6ff4530b28221d..624220c3734cdf642ea4c977adda77cf4baea931 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -118,7 +118,7 @@ bool tscQueryOnMetric(SSqlCmd* pCmd) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); return ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) == TSDB_QUERY_TYPE_STABLE_QUERY) && - (pCmd->msgType == TSDB_MSG_TYPE_DNODE_QUERY); + (pCmd->msgType == TSDB_MSG_TYPE_QUERY); } bool tscQueryMetricTags(SQueryInfo* pQueryInfo) { diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 5158b12ae295be94c71b0936300cdd9de76dff38..399b4b9920625a766f1bead360237673d539b6f0 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -30,8 +30,8 @@ void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen); void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); -void dnodeSendVpeerCfgMsg(int32_t vnode); -void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid); +void dnodeSendVnodeCfgMsg(int32_t vnode); +void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid); diff --git a/src/dnode/inc/dnodeWrite.h b/src/dnode/inc/dnodeWrite.h index 03fd81640154da61c3216f3b70928582d1fb5a56..42c94a440c4f370fe0b2b3c3b3d2698c7060b75f 100644 --- a/src/dnode/inc/dnodeWrite.h +++ b/src/dnode/inc/dnodeWrite.h @@ -49,12 +49,12 @@ int32_t dnodeDropTable(SDRemoveTableMsg *pTable); * Create stream * if stream already exist, update it */ -int32_t dnodeCreateStream(SAlterStreamMsg *stream); +int32_t dnodeCreateStream(SDAlterStreamMsg *pStream); /* * Remove all child tables of supertable from local repository */ -int32_t dnodeDropSuperTable(uint64_t stableUid); +int32_t dnodeDropSuperTable(SDRemoveSuperTableMsg *pStable); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index c9e2b7ce0350840a7abf1e66a1d7b5677192a542..f5b108b81067a95cb4534df44e92ae41e2457ebf 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -121,23 +121,6 @@ void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void //rpcFreeCont(pCont); } -void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { - int32_t code = htonl(*((int32_t *) pCont)); - - if (code == TSDB_CODE_SUCCESS) { - SDCreateTableMsg *table = (SDCreateTableMsg *) (pCont + sizeof(int32_t)); - dnodeCreateTable(table); - } else if (code == TSDB_CODE_INVALID_TABLE_ID) { - SDRemoveTableMsg *pTable = (SDRemoveTableMsg *) (pCont + sizeof(int32_t)); - pTable->sid = htonl(pTable->sid); - pTable->uid = htobe64(pTable->uid); - dError("table:%s, sid:%d table is not configured, remove it", pTable->tableId, pTable->sid); - dnodeDropTable(pTable); - } else { - dError("code:%d invalid message", code); - } -} - void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { SDCreateTableMsg *pTable = pCont; pTable->numOfColumns = htons(pTable->numOfColumns); @@ -170,8 +153,14 @@ void dnodeProcessCreateTableRequest(void *pCont, int32_t contLen, int8_t msgType } void dnodeProcessAlterStreamRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { - SAlterStreamMsg *stream = (SAlterStreamMsg *) pCont; - int32_t code = dnodeCreateStream(stream); + SDAlterStreamMsg *pStream = pCont; + pStream->uid = htobe64(pStream->uid); + pStream->stime = htobe64(pStream->stime); + pStream->vnode = htonl(pStream->vnode); + pStream->sid = htonl(pStream->sid); + pStream->status = htonl(pStream->status); + + int32_t code = dnodeCreateStream(pStream); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } @@ -206,8 +195,26 @@ void dnodeProcessVPeerCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void } } +void dnodeProcessTableCfgRsp(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { + int32_t code = htonl(*((int32_t *) pCont)); + + if (code == TSDB_CODE_SUCCESS) { + SDCreateTableMsg *table = (SDCreateTableMsg *) (pCont + sizeof(int32_t)); + dnodeCreateTable(table); + } else if (code == TSDB_CODE_INVALID_TABLE_ID) { + SDRemoveTableMsg *pTable = (SDRemoveTableMsg *) (pCont + sizeof(int32_t)); + pTable->sid = htonl(pTable->sid); + pTable->uid = htobe64(pTable->uid); + dError("table:%s, sid:%d table is not configured, remove it", pTable->tableId, pTable->sid); + dnodeDropTable(pTable); + } else { + dError("code:%d invalid message", code); + } +} + void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { SCreateVnodeMsg *pVnode = (SCreateVnodeMsg *) pCont; + int32_t code = dnodeCreateVnode(pVnode); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } @@ -215,17 +222,19 @@ void dnodeProcessCreateVnodeRequest(void *pCont, int32_t contLen, int8_t msgType void dnodeProcessFreeVnodeRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { SFreeVnodeMsg *pVnode = (SFreeVnodeMsg *) pCont; int32_t vnode = htonl(pVnode->vnode); + int32_t code = dnodeDropVnode(vnode); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } void dnodeProcessDnodeCfgRequest(void *pCont, int32_t contLen, int8_t msgType, void *pConn) { SCfgDnodeMsg *pCfg = (SCfgDnodeMsg *)pCont; + int32_t code = tsCfgDynamicOptions(pCfg->config); dnodeSendRspToMnode(pConn, msgType + 1, code, NULL, 0); } -void dnodeSendVpeerCfgMsg(int32_t vnode) { +void dnodeSendVnodeCfgMsg(int32_t vnode) { SVpeerCfgMsg *cfg = (SVpeerCfgMsg *) rpcMallocCont(sizeof(SVpeerCfgMsg)); if (cfg == NULL) { return; @@ -235,7 +244,7 @@ void dnodeSendVpeerCfgMsg(int32_t vnode) { dnodeSendMsgToMnode(TSDB_MSG_TYPE_VNODE_CFG, cfg, sizeof(SVpeerCfgMsg)); } -void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { +void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid) { STableCfgMsg *cfg = (STableCfgMsg *) rpcMallocCont(sizeof(STableCfgMsg)); if (cfg == NULL) { return; @@ -248,8 +257,8 @@ void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { void dnodeInitProcessShellMsg() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_TABLE] = dnodeProcessCreateTableRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_REMOVE_TABLE] = dnodeProcessRemoveTableRequest; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CREATE_VNODE] = dnodeProcessCreateVnodeRequest; - dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_FREE_VNODE] = dnodeProcessFreeVnodeRequest; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_CREATE_VNODE] = dnodeProcessCreateVnodeRequest; + dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_FREE_VNODE] = dnodeProcessFreeVnodeRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_DNODE_CFG] = dnodeProcessDnodeCfgRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = dnodeProcessAlterStreamRequest; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_VNODE_CFG_RSP] = dnodeProcessVPeerCfgRsp; diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index f590b3d869b2a6a80d6ce13655435aaf0842978c..246fc39a0c5aadb67a0b247ebb00b7ef55ad103f 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -40,34 +40,32 @@ static void *tsDnodeShellServer = NULL; static int32_t tsDnodeQueryReqNum = 0; static int32_t tsDnodeSubmitReqNum = 0; -void* dnodeProcessMsgFromShell(int8_t msgType, void *pCont, int32_t contLen, void *handle, int32_t index) { +void dnodeProcessMsgFromShell(char msgType, void *pCont, int contLen, void *handle, int32_t code) { assert(handle != NULL); if (pCont == NULL || contLen == 0) { dnodeFreeQInfo(handle); dTrace("conn:%p, free query info", handle); - return NULL; + return; } if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { rpcSendResponse(handle, TSDB_CODE_NOT_READY, 0, 0); dTrace("conn:%p, query msg is ignored since dnode not running", handle); - return NULL; + return; } - dTrace("conn:%p, msg:%s is received", handle, taosMsg[msgType]); + dTrace("conn:%p, msg:%s is received", handle, taosMsg[(int8_t)msgType]); - if (msgType == TSDB_MSG_TYPE_DNODE_QUERY) { + if (msgType == TSDB_MSG_TYPE_QUERY) { dnodeProcessQueryRequest(pCont, contLen, handle); } else if (msgType == TSDB_MSG_TYPE_RETRIEVE) { dnodeProcessRetrieveRequest(pCont, contLen, handle); - } else if (msgType == TSDB_MSG_TYPE_DNODE_SUBMIT) { + } else if (msgType == TSDB_MSG_TYPE_SUBMIT) { dnodeProcessShellSubmitRequest(pCont, contLen, handle); } else { - dError("conn:%p, msg:%s is not processed", handle, taosMsg[msgType]); + dError("conn:%p, msg:%s is not processed", handle, taosMsg[(int8_t)msgType]); } - - return NULL; } int32_t dnodeInitShell() { @@ -83,7 +81,7 @@ int32_t dnodeInitShell() { rpcInit.localPort = tsVnodeShellPort; rpcInit.label = "DND-shell"; rpcInit.numOfThreads = numOfThreads; - rpcInit.cfp = dnodeProcessMsgFromShell; + rpcInit.cfp = dnodeProcessMsgFromShell; rpcInit.sessions = TSDB_SESSIONS_PER_DNODE; rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 2000; @@ -187,9 +185,9 @@ void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pConn) { for (int i = 0; i < submitRsp->numOfFailedBlocks; ++i) { SShellSubmitRspBlock *block = &submitRsp->failedBlocks[i]; if (block->code == TSDB_CODE_NOT_ACTIVE_VNODE || block->code == TSDB_CODE_INVALID_VNODE_ID) { - dnodeSendVpeerCfgMsg(block->vnode); + dnodeSendVnodeCfgMsg(block->vnode); } else if (block->code == TSDB_CODE_INVALID_TABLE_ID || block->code == TSDB_CODE_NOT_ACTIVE_TABLE) { - dnodeSendMeterCfgMsg(block->vnode, block->sid); + dnodeSendTableCfgMsg(block->vnode, block->sid); } block->index = htonl(block->index); block->vnode = htonl(block->vnode); diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 6caaa34a1990e75e4afcac3acc47d4ef52cd2237..238fb58e7571fba44464f9182fb901cf5d865b49 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -22,6 +22,8 @@ #include "dnodeVnodeMgmt.h" void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShellSubmitRspMsg *rsp, void *pConn)) { + dTrace("submit msg is disposed, affectrows:1"); + SShellSubmitRspMsg result = {0}; int32_t numOfSid = htonl(pSubmit->numOfSid); @@ -31,7 +33,11 @@ void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShe callback(&result, pConn); } - //TODO: submit implementation + result.code = 0; + result.numOfRows = 1; + result.affectedRows = 1; + result.numOfFailedBlocks = 0; + callback(&result, pConn); } int32_t dnodeCreateTable(SDCreateTableMsg *pTable) { @@ -65,7 +71,6 @@ int32_t dnodeCreateTable(SDCreateTableMsg *pTable) { return TSDB_CODE_SUCCESS; } - /* * Remove table from local repository */ @@ -78,24 +83,16 @@ int32_t dnodeDropTable(SDRemoveTableMsg *pTable) { * Create stream * if stream already exist, update it */ -int32_t dnodeCreateStream(SAlterStreamMsg *stream) { - int32_t vnode = htonl(stream->vnode); - int32_t sid = htonl(stream->sid); - uint64_t uid = htobe64(stream->uid); - - if (!dnodeCheckTableExist(vnode, sid, uid)) { - return TSDB_CODE_INVALID_TABLE; - } - - //TODO create or remove stream - - return 0; +int32_t dnodeCreateStream(SDAlterStreamMsg *pStream) { + dPrint("stream:%s, is created, ", pStream->tableId); + return TSDB_CODE_SUCCESS; } /* * Remove all child tables of supertable from local repository */ -int32_t dnodeDropSuperTable(uint64_t stableUid) { +int32_t dnodeDropSuperTable(SDRemoveSuperTableMsg *pStable) { + dPrint("stable:%s, is removed", pStable->tableId); return TSDB_CODE_SUCCESS; } diff --git a/src/inc/.fuse_hidden0000e2ae00000244 b/src/inc/.fuse_hidden0000e2ae00000244 new file mode 100644 index 0000000000000000000000000000000000000000..6c704221906284c996e6a0fda354bd26c9d5655c --- /dev/null +++ b/src/inc/.fuse_hidden0000e2ae00000244 @@ -0,0 +1,816 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TAOSMSG_H +#define TDENGINE_TAOSMSG_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include + +#include "taosdef.h" +#include "taoserror.h" +#include "taosdef.h" +#include "trpc.h" + +// message type +#define TSDB_MSG_TYPE_REG 1 +#define TSDB_MSG_TYPE_REG_RSP 2 +#define TSDB_MSG_TYPE_DNODE_SUBMIT 3 +#define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4 +#define TSDB_MSG_TYPE_DNODE_QUERY 5 +#define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6 +#define TSDB_MSG_TYPE_RETRIEVE 7 +#define TSDB_MSG_TYPE_RETRIEVE_RSP 8 +#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE 9 +#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10 +#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11 +#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP 12 +#define TSDB_MSG_TYPE_DNODE_CREATE_VNODE 13 +#define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 14 +#define TSDB_MSG_TYPE_DNODE_FREE_VNODE 15 +#define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 16 +#define TSDB_MSG_TYPE_DNODE_CFG 17 +#define TSDB_MSG_TYPE_DNODE_CFG_RSP 18 +#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 19 +#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_RSP 20 +#define TSDB_MSG_TYPE_SDB_SYNC 21 +#define TSDB_MSG_TYPE_SDB_SYNC_RSP 22 +#define TSDB_MSG_TYPE_SDB_FORWARD 23 +#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 24 +#define TSDB_MSG_TYPE_CONNECT 31 +#define TSDB_MSG_TYPE_CONNECT_RSP 32 +#define TSDB_MSG_TYPE_CREATE_ACCT 33 +#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 34 +#define TSDB_MSG_TYPE_ALTER_ACCT 35 +#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 36 +#define TSDB_MSG_TYPE_DROP_ACCT 37 +#define TSDB_MSG_TYPE_DROP_ACCT_RSP 38 +#define TSDB_MSG_TYPE_CREATE_USER 39 +#define TSDB_MSG_TYPE_CREATE_USER_RSP 40 +#define TSDB_MSG_TYPE_ALTER_USER 41 +#define TSDB_MSG_TYPE_ALTER_USER_RSP 42 +#define TSDB_MSG_TYPE_DROP_USER 43 +#define TSDB_MSG_TYPE_DROP_USER_RSP 44 +#define TSDB_MSG_TYPE_CREATE_MNODE 45 +#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 46 +#define TSDB_MSG_TYPE_DROP_MNODE 47 +#define TSDB_MSG_TYPE_DROP_MNODE_RSP 48 +#define TSDB_MSG_TYPE_CREATE_DNODE 49 +#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 50 +#define TSDB_MSG_TYPE_DROP_DNODE 51 +#define TSDB_MSG_TYPE_DROP_DNODE_RSP 52 +#define TSDB_MSG_TYPE_ALTER_DNODE 53 +#define TSDB_MSG_TYPE_ALTER_DNODE_RSP 54 +#define TSDB_MSG_TYPE_CREATE_DB 55 +#define TSDB_MSG_TYPE_CREATE_DB_RSP 56 +#define TSDB_MSG_TYPE_DROP_DB 57 +#define TSDB_MSG_TYPE_DROP_DB_RSP 58 +#define TSDB_MSG_TYPE_USE_DB 59 +#define TSDB_MSG_TYPE_USE_DB_RSP 60 +#define TSDB_MSG_TYPE_ALTER_DB 61 +#define TSDB_MSG_TYPE_ALTER_DB_RSP 62 +#define TSDB_MSG_TYPE_CREATE_TABLE 63 +#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 64 +#define TSDB_MSG_TYPE_DROP_TABLE 65 +#define TSDB_MSG_TYPE_DROP_TABLE_RSP 66 +#define TSDB_MSG_TYPE_ALTER_TABLE 67 +#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 68 +#define TSDB_MSG_TYPE_VNODE_CFG 69 +#define TSDB_MSG_TYPE_VNODE_CFG_RSP 70 +#define TSDB_MSG_TYPE_TABLE_CFG 71 +#define TSDB_MSG_TYPE_TABLE_CFG_RSP 72 +#define TSDB_MSG_TYPE_TABLE_META 73 +#define TSDB_MSG_TYPE_TABLE_META_RSP 74 +#define TSDB_MSG_TYPE_STABLE_META 75 +#define TSDB_MSG_TYPE_STABLE_META_RSP 76 +#define TSDB_MSG_TYPE_MULTI_TABLE_META 77 +#define TSDB_MSG_TYPE_MULTI_TABLE_META_RSP 78 +#define TSDB_MSG_TYPE_ALTER_STREAM 79 +#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 80 +#define TSDB_MSG_TYPE_SHOW 81 +#define TSDB_MSG_TYPE_SHOW_RSP 82 +#define TSDB_MSG_TYPE_CFG_MNODE 83 +#define TSDB_MSG_TYPE_CFG_MNODE_RSP 84 +#define TSDB_MSG_TYPE_KILL_QUERY 85 +#define TSDB_MSG_TYPE_KILL_QUERY_RSP 86 +#define TSDB_MSG_TYPE_KILL_STREAM 87 +#define TSDB_MSG_TYPE_KILL_STREAM_RSP 88 +#define TSDB_MSG_TYPE_KILL_CONNECTION 89 +#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 90 +#define TSDB_MSG_TYPE_HEARTBEAT 91 +#define TSDB_MSG_TYPE_HEARTBEAT_RSP 92 +#define TSDB_MSG_TYPE_STATUS 93 +#define TSDB_MSG_TYPE_STATUS_RSP 94 +#define TSDB_MSG_TYPE_GRANT 95 +#define TSDB_MSG_TYPE_GRANT_RSP 96 +#define TSDB_MSG_TYPE_MAX 97 + +// IE type +#define TSDB_IE_TYPE_SEC 1 +#define TSDB_IE_TYPE_META 2 +#define TSDB_IE_TYPE_MGMT_IP 3 +#define TSDB_IE_TYPE_DNODE_CFG 4 +#define TSDB_IE_TYPE_NEW_VERSION 5 +#define TSDB_IE_TYPE_DNODE_EXT 6 +#define TSDB_IE_TYPE_DNODE_STATE 7 + +enum _mgmt_table { + TSDB_MGMT_TABLE_ACCT, + TSDB_MGMT_TABLE_USER, + TSDB_MGMT_TABLE_DB, + TSDB_MGMT_TABLE_TABLE, + TSDB_MGMT_TABLE_DNODE, + TSDB_MGMT_TABLE_MNODE, + TSDB_MGMT_TABLE_VGROUP, + TSDB_MGMT_TABLE_METRIC, + TSDB_MGMT_TABLE_MODULE, + TSDB_MGMT_TABLE_QUERIES, + TSDB_MGMT_TABLE_STREAMS, + TSDB_MGMT_TABLE_CONFIGS, + TSDB_MGMT_TABLE_CONNS, + TSDB_MGMT_TABLE_SCORES, + TSDB_MGMT_TABLE_GRANTS, + TSDB_MGMT_TABLE_VNODES, + TSDB_MGMT_TABLE_MAX, +}; + +#define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1 +#define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2 +#define TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN 3 +#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 + +#define TSDB_ALTER_TABLE_ADD_COLUMN 5 +#define TSDB_ALTER_TABLE_DROP_COLUMN 6 + +#define TSDB_INTERPO_NONE 0 +#define TSDB_INTERPO_NULL 1 +#define TSDB_INTERPO_SET_VALUE 2 +#define TSDB_INTERPO_LINEAR 3 +#define TSDB_INTERPO_PREV 4 + +#define TSDB_ALTER_USER_PASSWD 0x1 +#define TSDB_ALTER_USER_PRIVILEGES 0x2 + +#define TSDB_KILL_MSG_LEN 30 + +typedef enum { + TSDB_TABLE_TYPE_SUPER_TABLE = 0, // super table + TSDB_TABLE_TYPE_CHILD_TABLE = 1, // table created from super table + TSDB_TABLE_TYPE_NORMAL_TABLE = 2, // ordinary table + TSDB_TABLE_TYPE_STREAM_TABLE = 3, // table created from stream computing + TSDB_TABLE_TYPE_MAX = 4 +} ETableType; + + +#define TSDB_VN_READ_ACCCESS ((char)0x1) +#define TSDB_VN_WRITE_ACCCESS ((char)0x2) +#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) + +#define TSDB_COL_NORMAL 0x0U +#define TSDB_COL_TAG 0x1U +#define TSDB_COL_JOIN 0x2U + +extern char *taosMsg[]; + +#pragma pack(push, 1) + +typedef struct { + int32_t vnode; + int32_t sid; + int32_t sversion; + uint64_t uid; + int16_t numOfRows; + char payLoad[]; +} SShellSubmitBlock; + +typedef struct { + int8_t import; + int8_t reserved[3]; + int32_t numOfSid; /* total number of sid */ + char blks[]; /* numOfSid blocks, each blocks for one table */ +} SShellSubmitMsg; + +typedef struct { + int32_t index; // index of failed block in submit blocks + int32_t vnode; // vnode index of failed block + int32_t sid; // table index of failed block + int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table +} SShellSubmitRspBlock; + +typedef struct { + int32_t code; // 0-success, > 0 error code + int32_t numOfRows; // number of records the client is trying to write + int32_t affectedRows; // number of records actually written + int32_t failedRows; // number of failed records (exclude duplicate records) + int32_t numOfFailedBlocks; + SShellSubmitRspBlock failedBlocks[]; +} SShellSubmitRspMsg; + +typedef struct SSchema { + uint8_t type; + char name[TSDB_COL_NAME_LEN + 1]; + int16_t colId; + int16_t bytes; +} SSchema; + +typedef struct { + int32_t vnode; //the index of vnode + uint32_t ip; +} SVPeerDesc; + +typedef struct { + int8_t tableType; + int16_t numOfColumns; + int16_t numOfTags; + int32_t sid; + int32_t sversion; + int32_t tagDataLen; + int32_t sqlDataLen; + int32_t contLen; + int32_t numOfVPeers; + uint64_t uid; + uint64_t superTableUid; + uint64_t createdTime; + SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; + char tableId[TSDB_TABLE_ID_LEN + 1]; + char superTableId[TSDB_TABLE_ID_LEN + 1]; + char data[]; +} SDCreateTableMsg; + +typedef struct { + char tableId[TSDB_TABLE_ID_LEN + 1]; + char db[TSDB_DB_NAME_LEN + 1]; + int8_t igExists; + int16_t numOfTags; + int16_t numOfColumns; + int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string + int16_t reserved[16]; + SSchema schema[]; +} SCreateTableMsg; + +typedef struct { + char tableId[TSDB_TABLE_ID_LEN + 1]; + int8_t igNotExists; +} SDropTableMsg; + +typedef struct { + char tableId[TSDB_TABLE_ID_LEN + 1]; + char db[TSDB_DB_NAME_LEN + 1]; + int16_t type; /* operation type */ + char tagVal[TSDB_MAX_BYTES_PER_ROW]; + int8_t numOfCols; /* number of schema */ + SSchema schema[]; +} SAlterTableMsg; + +typedef struct { + char clientVersion[TSDB_VERSION_LEN]; + char msgVersion[TSDB_VERSION_LEN]; + char db[TSDB_TABLE_ID_LEN + 1]; +} SConnectMsg; + +typedef struct { + char acctId[TSDB_ACCT_LEN + 1]; + char serverVersion[TSDB_VERSION_LEN]; + int8_t writeAuth; + int8_t superAuth; + SRpcIpSet ipList; +} SConnectRsp; + +typedef struct { + int32_t maxUsers; + int32_t maxDbs; + int32_t maxTimeSeries; + int32_t maxConnections; + int32_t maxStreams; + int32_t maxPointsPerSecond; + int64_t maxStorage; // In unit of GB + int64_t maxQueryTime; // In unit of hour + int64_t maxInbound; + int64_t maxOutbound; + int8_t accessState; // Configured only by command +} SAcctCfg; + +typedef struct { + char user[TSDB_USER_LEN + 1]; + char pass[TSDB_KEY_LEN + 1]; + SAcctCfg cfg; +} SCreateAcctMsg, SAlterAcctMsg; + +typedef struct { + char user[TSDB_USER_LEN + 1]; +} SDropUserMsg, SDropAcctMsg; + +typedef struct { + char user[TSDB_USER_LEN + 1]; + char pass[TSDB_KEY_LEN + 1]; + int8_t privilege; + int8_t flag; +} SCreateUserMsg, SAlterUserMsg; + +typedef struct { + char db[TSDB_TABLE_ID_LEN + 1]; +} SMgmtHead; + +typedef struct { + int32_t sid; + int32_t numOfVPeers; + uint64_t uid; + SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; + char tableId[TSDB_TABLE_ID_LEN + 1]; +} SDRemoveTableMsg; + +typedef struct { + char tableId[TSDB_TABLE_ID_LEN + 1]; + int64_t uid; +} SDRemoveSuperTableMsg; + +typedef struct { + int32_t vnode; +} SFreeVnodeMsg; + +typedef struct SColIndexEx { + int16_t colId; + /* + * colIdx is the index of column in latest schema of table + * it is available in the client side. Also used to determine + * whether current table schema is up-to-date. + * + * colIdxInBuf is used to denote the index of column in pQuery->colList, + * this value is invalid in client side, as well as in cache block of vnode either. + */ + int16_t colIdx; + int16_t colIdxInBuf; + uint16_t flag; // denote if it is a tag or not +} SColIndexEx; + +/* sql function msg, to describe the message to vnode about sql function + * operations in select clause */ +typedef struct SSqlFuncExprMsg { + int16_t functionId; + int16_t numOfParams; + + SColIndexEx colInfo; + struct ArgElem { + int16_t argType; + int16_t argBytes; + union { + double d; + int64_t i64; + char * pz; + } argValue; + } arg[3]; +} SSqlFuncExprMsg; + +typedef struct SSqlBinaryExprInfo { + struct tSQLBinaryExpr *pBinExpr; /* for binary expression */ + int32_t numOfCols; /* binary expression involves the readed number of columns*/ + SColIndexEx * pReqColumns; /* source column list */ +} SSqlBinaryExprInfo; + +typedef struct SSqlFunctionExpr { + SSqlFuncExprMsg pBase; + SSqlBinaryExprInfo pBinExprInfo; + int16_t resBytes; + int16_t resType; + int16_t interResBytes; +} SSqlFunctionExpr; + +typedef struct SColumnFilterInfo { + int16_t lowerRelOptr; + int16_t upperRelOptr; + int16_t filterOnBinary; /* denote if current column is binary */ + + union { + struct { + int64_t lowerBndi; + int64_t upperBndi; + }; + struct { + double lowerBndd; + double upperBndd; + }; + struct { + int64_t pz; + int64_t len; + }; + }; +} SColumnFilterInfo; + +/* + * for client side struct, we only need the column id, type, bytes are not necessary + * But for data in vnode side, we need all the following information. + */ +typedef struct SColumnInfo { + int16_t colId; + int16_t type; + int16_t bytes; + int16_t numOfFilters; + SColumnFilterInfo *filters; +} SColumnInfo; + +/* + * enable vnode to understand how to group several tables with different tag; + */ +typedef struct STableSidExtInfo { + int32_t sid; + int64_t uid; + TSKEY key; // key for subscription + char tags[]; +} STableSidExtInfo; + +/* + * the outputCols is equalled to or larger than numOfCols + * e.g., select min(colName), max(colName), avg(colName) from table + * the outputCols will be 3 while the numOfCols is 1. + */ +typedef struct { + int16_t vnode; + int32_t numOfSids; + uint64_t pSidExtInfo; // table id & tag info ptr, in windows pointer may + + uint64_t uid; + TSKEY skey; + TSKEY ekey; + + int16_t order; + int16_t orderColId; + + int16_t numOfCols; // the number of columns will be load from vnode + char intervalTimeUnit; // time interval type, for revisement of interval(1d) + + int64_t nAggTimeInterval; // time interval for aggregation, in million second + int64_t slidingTime; // value for sliding window + + // tag schema, used to parse tag information in pSidExtInfo + uint64_t pTagSchema; + + int16_t numOfTagsCols; // required number of tags + int16_t tagLength; // tag length in current query + + int16_t numOfGroupCols; // num of group by columns + int16_t orderByIdx; + int16_t orderType; // used in group by xx order by xxx + uint64_t groupbyTagIds; + + int64_t limit; + int64_t offset; + + int16_t queryType; // denote another query process + int16_t numOfOutputCols; // final output columns numbers + + int16_t interpoType; // interpolate type + uint64_t defaultVal; // default value array list + + int32_t colNameLen; + int64_t colNameList; + + int64_t pSqlFuncExprs; + + int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed + int32_t tsLen; // total length of ts comp block + int32_t tsNumOfBlocks; // ts comp block numbers + int32_t tsOrder; // ts comp block order + SColumnInfo colList[]; +} SQueryTableMsg; + +typedef struct { + char code; + uint64_t qhandle; +} SQueryTableRsp; + +typedef struct { + uint64_t qhandle; + uint16_t free; +} SRetrieveTableMsg; + +typedef struct { + int32_t numOfRows; + int16_t precision; + int64_t offset; // updated offset value for multi-vnode projection query + int64_t useconds; + char data[]; +} SRetrieveTableRsp; + +typedef struct { + uint32_t vnode; + uint32_t vgId; + uint8_t status; + uint8_t dropStatus; + uint8_t accessState; + int64_t totalStorage; + int64_t compStorage; + int64_t pointsWritten; + uint8_t syncStatus; + uint8_t reserved[15]; +} SVnodeLoad; + +typedef struct { + uint32_t vnode; + char accessState; +} SVnodeAccess; + +/* + * NOTE: sizeof(SVnodeCfg) < TSDB_FILE_HEADER_LEN / 4 + */ +typedef struct { + char acct[TSDB_USER_LEN + 1]; + char db[TSDB_DB_NAME_LEN + 1]; + uint32_t vgId; + int32_t maxSessions; + int32_t cacheBlockSize; + union { + int32_t totalBlocks; + float fraction; + } cacheNumOfBlocks; + int32_t daysPerFile; + int32_t daysToKeep1; + int32_t daysToKeep2; + int32_t daysToKeep; + int32_t commitTime; + int32_t rowsInFileBlock; + int16_t blocksPerTable; + int8_t compression; + int8_t commitLog; + int8_t replications; + int8_t repStrategy; + int8_t loadLatest; // load into mem or not + uint8_t precision; // time resolution + int8_t reserved[16]; +} SVnodeCfg, SCreateDbMsg, SDbCfg, SAlterDbMsg; + +typedef struct { + char db[TSDB_TABLE_ID_LEN + 1]; + uint8_t ignoreNotExists; +} SDropDbMsg, SUseDbMsg; + +// IMPORTANT: sizeof(SVnodeStatisticInfo) should not exceed +// TSDB_FILE_HEADER_LEN/4 - TSDB_FILE_HEADER_VERSION_SIZE +typedef struct { + int64_t pointsWritten; // In unit of points + int64_t totalStorage; // In unit of bytes + int64_t compStorage; // In unit of bytes + int64_t queryTime; // In unit of second ?? + char reserved[64]; +} SVnodeStatisticInfo; + +typedef struct { + uint32_t version; + uint32_t publicIp; + uint32_t lastReboot; // time stamp for last reboot + uint16_t numOfCores; + uint8_t alternativeRole; + uint8_t reserve; + uint16_t numOfTotalVnodes; // from config file + uint16_t unused; + float diskAvailable; // GB + uint32_t openVnodes; + char reserved[16]; + SVnodeLoad load[]; +} SStatusMsg; + +typedef struct { + int32_t code; + SRpcIpSet ipList; +} SStatusRsp; + +typedef struct { + uint32_t moduleStatus; + uint32_t createdTime; + uint32_t numOfVnodes; + uint32_t reserved; +} SDnodeState; + +// internal message +typedef struct { + uint32_t destId; + uint32_t destIp; + char tableId[TSDB_UNI_LEN + 1]; + char empty[3]; + uint8_t msgType; + int32_t msgLen; + uint8_t content[0]; +} SIntMsg; + +typedef struct { + char spi; + char encrypt; + char secret[TSDB_KEY_LEN]; // key is changed if updated + char cipheringKey[TSDB_KEY_LEN]; +} SSecIe; + +typedef struct { + int32_t numOfVPeers; + SVPeerDesc vpeerDesc[]; +} SVpeerDescArray; + +typedef struct { + int32_t vnode; + SVnodeCfg cfg; + SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS]; +} SCreateVnodeMsg; + +typedef struct { + char tableId[TSDB_TABLE_ID_LEN + 1]; + int16_t createFlag; + char tags[]; +} STableInfoMsg; + +typedef struct { + int32_t numOfTables; + char tableIds[]; +} SMultiTableInfoMsg; + +typedef struct { + char tableId[TSDB_TABLE_ID_LEN + 1]; +} SSuperTableInfoMsg; + +typedef struct { + int32_t numOfDnodes; + uint32_t dnodeIps[]; +} SSuperTableInfoRsp; + +typedef struct { + int16_t elemLen; + + char tableId[TSDB_TABLE_ID_LEN + 1]; + int16_t orderIndex; + int16_t orderType; // used in group by xx order by xxx + + int16_t rel; // denotes the relation between condition and table list + + int32_t tableCond; // offset value of table name condition + int32_t tableCondLen; + + int32_t cond; // offset of column query condition + int32_t condLen; + + int16_t tagCols[TSDB_MAX_TAGS + 1]; // required tag columns, plus one is for table name + int16_t numOfTags; // required number of tags + + int16_t numOfGroupCols; // num of group by columns + int32_t groupbyTagColumnList; +} SSuperTableMetaElemMsg; + +typedef struct { + int32_t numOfTables; + int32_t join; + int32_t joinCondLen; // for join condition + int32_t metaElem[TSDB_MAX_JOIN_TABLE_NUM]; +} SSuperTableMetaMsg; + +typedef struct { + SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT]; + int16_t index; // used locally + int32_t numOfSids; + int32_t pSidExtInfoList[]; // offset value of STableSidExtInfo +} SVnodeSidList; + +typedef struct { + int32_t numOfTables; + int32_t numOfVnodes; + uint16_t tagLen; /* tag value length */ + int32_t list[]; /* offset of SVnodeSidList, compared to the SSuperTableMeta struct */ +} SSuperTableMeta; + +typedef struct STableMeta { + char tableId[TSDB_TABLE_ID_LEN + 1]; // note: This field must be at the front + int32_t contLen; + uint8_t numOfTags : 6; + uint8_t precision : 2; + uint8_t tableType : 4; + uint8_t index : 4; // used locally + int16_t numOfColumns; + int16_t rowSize; // used locally, calculated in client + int16_t sversion; + int8_t numOfVpeers; + SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT]; + int32_t sid; + int32_t vgid; + uint64_t uid; + SSchema schema[]; +} STableMeta; + +typedef struct SMultiTableMeta { + int32_t numOfTables; + int32_t contLen; + STableMeta metas[]; +} SMultiTableMeta; + +typedef struct { + char name[TSDB_TABLE_ID_LEN + 1]; + char data[TSDB_MAX_TAGS_LEN]; +} STagData; + +/* + * sql: show tables like '%a_%' + * payload is the query condition, e.g., '%a_%' + * payloadLen is the length of payload + */ +typedef struct { + int8_t type; + char db[TSDB_DB_NAME_LEN + 1]; + uint16_t payloadLen; + char payload[]; +} SShowMsg; + +typedef struct { + uint64_t qhandle; + STableMeta tableMeta; +} SShowRsp; + +typedef struct { + char ip[32]; +} SCreateMnodeMsg, SDropMnodeMsg, SCreateDnodeMsg, SDropDnodeMsg; + +typedef struct { + uint32_t dnode; + int32_t vnode; + int32_t sid; +} STableCfgMsg; + +typedef struct { + uint32_t dnode; + int32_t vnode; +} SVpeerCfgMsg; + +typedef struct { + char ip[32]; + char config[64]; +} SCfgDnodeMsg; + +typedef struct { + char sql[TSDB_SHOW_SQL_LEN + 1]; + uint32_t queryId; + int64_t useconds; + int64_t stime; +} SQueryDesc; + +typedef struct { + char sql[TSDB_SHOW_SQL_LEN + 1]; + uint32_t streamId; + int64_t num; // number of computing/cycles + int64_t useconds; + int64_t ctime; + int64_t stime; + int64_t slidingTime; + int64_t interval; +} SStreamDesc; + +typedef struct { + int32_t numOfQueries; + SQueryDesc qdesc[]; +} SQqueryList; + +typedef struct { + int32_t numOfStreams; + SStreamDesc sdesc[]; +} SStreamList; + +typedef struct { + SQqueryList qlist; + SStreamList slist; +} SHeartBeatMsg; + +typedef struct { + uint32_t queryId; + uint32_t streamId; + int8_t killConnection; + SRpcIpSet ipList; +} SHeartBeatRsp; + +typedef struct { + char queryId[TSDB_KILL_MSG_LEN + 1]; +} SKillQueryMsg, SKillStreamMsg, SKillConnectionMsg; + +typedef struct { + int32_t vnode; + int32_t sid; + uint64_t uid; + uint64_t stime; // stream starting time + int32_t status; + char tableId[TSDB_TABLE_ID_LEN + 1]; +} SDAlterStreamMsg; + +#pragma pack(pop) + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 0a68c8b4f94e93fb11eb4ba61a0de5aff0f3ae7a..daf49cc50a29bcc57fee805fa9758824cbfb2973 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -31,20 +31,20 @@ extern "C" { // message type #define TSDB_MSG_TYPE_REG 1 #define TSDB_MSG_TYPE_REG_RSP 2 -#define TSDB_MSG_TYPE_DNODE_SUBMIT 3 -#define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4 -#define TSDB_MSG_TYPE_DNODE_QUERY 5 -#define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6 +#define TSDB_MSG_TYPE_SUBMIT 3 +#define TSDB_MSG_TYPE_SUBMIT_RSP 4 +#define TSDB_MSG_TYPE_QUERY 5 +#define TSDB_MSG_TYPE_QUERY_RSP 6 #define TSDB_MSG_TYPE_RETRIEVE 7 #define TSDB_MSG_TYPE_RETRIEVE_RSP 8 #define TSDB_MSG_TYPE_DNODE_CREATE_TABLE 9 #define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10 #define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11 #define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP 12 -#define TSDB_MSG_TYPE_DNODE_CREATE_VNODE 13 -#define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 14 -#define TSDB_MSG_TYPE_DNODE_FREE_VNODE 15 -#define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 16 +#define TSDB_MSG_TYPE_CREATE_VNODE 13 +#define TSDB_MSG_TYPE_CREATE_VNODE_RSP 14 +#define TSDB_MSG_TYPE_FREE_VNODE 15 +#define TSDB_MSG_TYPE_FREE_VNODE_RSP 16 #define TSDB_MSG_TYPE_DNODE_CFG 17 #define TSDB_MSG_TYPE_DNODE_CFG_RSP 18 #define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 19 @@ -195,13 +195,13 @@ typedef struct { int32_t sid; int32_t sversion; uint64_t uid; - short numOfRows; + int16_t numOfRows; char payLoad[]; } SShellSubmitBlock; typedef struct { - int8_t import; - int8_t reserved[3]; + int16_t import; + int16_t vnode; int32_t numOfSid; /* total number of sid */ char blks[]; /* numOfSid blocks, each blocks for one table */ } SShellSubmitMsg; @@ -219,7 +219,7 @@ typedef struct { int32_t affectedRows; // number of records actually written int32_t failedRows; // number of failed records (exclude duplicate records) int32_t numOfFailedBlocks; - SShellSubmitRspBlock *failedBlocks; + SShellSubmitRspBlock failedBlocks[]; } SShellSubmitRspMsg; typedef struct SSchema { @@ -336,7 +336,8 @@ typedef struct { } SDRemoveTableMsg; typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; + char tableId[TSDB_TABLE_ID_LEN + 1]; + int64_t uid; } SDRemoveSuperTableMsg; typedef struct { @@ -697,6 +698,7 @@ typedef struct STableMeta { int16_t numOfColumns; int16_t rowSize; // used locally, calculated in client int16_t sversion; + int8_t numOfVpeers; SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT]; int32_t sid; int32_t vgid; @@ -802,7 +804,8 @@ typedef struct { uint64_t uid; uint64_t stime; // stream starting time int32_t status; -} SAlterStreamMsg; + char tableId[TSDB_TABLE_ID_LEN + 1]; +} SDAlterStreamMsg; #pragma pack(pop) diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 94220b74da39ee046aa17736f9b567a3c0037b68..eba047df1048320671bca5c46e012f37123e7a4a 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -86,7 +86,7 @@ int main(int argc, char* argv[]) { { printf("=== this a test for debug usage\n"); void *taos = taos_connect(NULL, "root", "taosdata", NULL, 0); - taos_query(taos, "create table d1.c2 using d1.st2 tags(1)"); + taos_query(taos, "insert into d1.t14 values(now, 1)"); while (1) { sleep(1000); } diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index 34f3fa2e8dd742a7221706d2d9c423b0104c31a0..48de49601d1144a06a1583aefa40b4248dbea095 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -359,7 +359,7 @@ int32_t mgmtCreateChildTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj * return TSDB_CODE_SERV_OUT_OF_MEMORY; } - *pTableOut = pTable; + *pTableOut = (STableInfo *) pTable; mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid); @@ -467,6 +467,7 @@ int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *p pMeta->numOfColumns = htons(pTable->superTable->numOfColumns); pMeta->tableType = pTable->type; pMeta->contLen = sizeof(STableMeta) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable->superTable); + strcpy(pMeta->tableId, pTable->tableId); SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { @@ -481,6 +482,7 @@ int32_t mgmtGetChildTableMeta(SDbObj *pDb, SChildTableObj *pTable, STableMeta *p pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); } } + pMeta->numOfVpeers = pVgroup->numOfVnodes; return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index d3e139539f5caf124ff30f258ce81d4f72151be4..821a0585a4ed5813e8f37da88ea953d611a86e25 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -106,8 +106,9 @@ static void mgmtProcessTableCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLe mgmtSendRspToDnode(thandle, msgType + 1, TSDB_CODE_SUCCESS, NULL, 0); + //TODO SRpcIpSet ipSet = mgmtGetIpSetFromIp(pCfg->dnode); - mgmtSendCreateTableMsg(pTable, &ipSet, NULL); + mgmtSendCreateTableMsg(NULL, &ipSet, NULL); } static void mgmtProcessVnodeCfgMsg(int8_t msgType, int8_t *pCont, int32_t contLen, void *pConn) { @@ -216,7 +217,7 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, vo mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle); SCreateVnodeMsg *pVpeer = mgmtBuildVpeersMsg(pVgroup, vnode); if (pVpeer != NULL) { - mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_CREATE_VNODE, pVpeer, sizeof(SCreateVnodeMsg), ahandle); + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_CREATE_VNODE, pVpeer, sizeof(SCreateVnodeMsg), ahandle); } } @@ -226,7 +227,7 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p return; } - mTrace("msg:%d:%s is received from dnode, pConn:%p", msgType, taosMsg[msgType], pConn); + mTrace("msg:%d:%s is received from dnode, pConn:%p", msgType, taosMsg[(int8_t)msgType], pConn); if (msgType == TSDB_MSG_TYPE_TABLE_CFG) { mgmtProcessTableCfgMsg(msgType, pCont, contLen, pConn); @@ -236,9 +237,9 @@ void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *p mgmtProcessCreateTableRsp(msgType, pCont, contLen, pConn, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP) { mgmtProcessRemoveTableRsp(msgType, pCont, contLen, pConn, code); - } else if (msgType == TSDB_MSG_TYPE_DNODE_VPEERS_RSP) { + } else if (msgType == TSDB_MSG_TYPE_CREATE_VNODE_RSP) { mgmtProcessCreateVnodeRsp(msgType, pCont, contLen, pConn, code); - } else if (msgType == TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP) { + } else if (msgType == TSDB_MSG_TYPE_FREE_VNODE_RSP) { mgmtProcessFreeVnodeRsp(msgType, pCont, contLen, pConn, code); } else if (msgType == TSDB_MSG_TYPE_DNODE_CFG_RSP) { } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) { @@ -261,7 +262,7 @@ void mgmtSendOneFreeVnodeMsg(int32_t vnode, SRpcIpSet *ipSet, void *ahandle) { SFreeVnodeMsg *pFreeVnode = rpcMallocCont(sizeof(SFreeVnodeMsg)); if (pFreeVnode != NULL) { pFreeVnode->vnode = htonl(vnode); - mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_DNODE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), ahandle); + mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_FREE_VNODE, pFreeVnode, sizeof(SFreeVnodeMsg), ahandle); } } diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index bc1d3f8c32ad819f9a9dd99dddd5b5fcf821ceb8..eff640c209bb4e3085e81d7dd36801ee7d4c2fac 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -395,7 +395,7 @@ int32_t mgmtCreateNormalTable(SCreateTableMsg *pCreate, int32_t contLen, SVgObj return TSDB_CODE_SERV_OUT_OF_MEMORY; } - *pTableOut = pTable; + *pTableOut = (STableInfo *) pTable; mTrace("table:%s, create table in vgroup, vgroup:%d sid:%d vnode:%d uid:%" PRIu64 , pTable->tableId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid); @@ -564,6 +564,7 @@ int32_t mgmtGetNormalTableMeta(SDbObj *pDb, SNormalTableObj *pTable, STableMeta pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); } } + pMeta->numOfVpeers = pVgroup->numOfVnodes; return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index fa3d591680d94e952cc28e6cd727613c35209df6..6a74f7616d224749bf9e574c00542d50e34d24d7 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -631,6 +631,7 @@ int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMeta *p pMeta->numOfColumns = htons(pTable->numOfColumns); pMeta->tableType = pTable->type; pMeta->contLen = sizeof(STableMeta) + mgmtSetSchemaFromSuperTable(pMeta->schema, pTable); + strcpy(pMeta->tableId, pTable->tableId); return TSDB_CODE_SUCCESS; } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index f456b82aaf0e26acf1f8d38a92980847e87bcc6e..84ae74f3f635c68ed5768b90783a0e45aabbeb78 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -355,7 +355,7 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, char type, void *pCont, in // connection type is application specific. // for TDengine, all the query, show commands shall have TCP connection - if (type == TSDB_MSG_TYPE_DNODE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE || + if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_RETRIEVE || type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_MULTI_TABLE_META || type == TSDB_MSG_TYPE_SHOW ) pContext->connType = RPC_CONN_TCPC;