diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index e76cc0446c8b7c665e39a96af5e902b88abd1f21..85b9462660b98a49eac6d6c4d79ff3f6f0bcc7c9 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -660,7 +660,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { } // projection query on metric, pipeline retrieve data from vnode list, - // instead of two-stage mergevnodeProcessMsgFromShell free qhandle + // instead of two-stage mergednodeProcessMsgFromShell free qhandle nRows = taos_fetch_block_impl(res, rows); // current subclause is completed, try the next subclause diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index f92475f8ae659dda302d28edd15a93c38e92d055..53cef1412f346c44bcc00ef45e62b1e7424af22f 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -20,6 +20,8 @@ extern "C" { #endif +#include +#include #include "tsched.h" #include "dnode.h" @@ -30,6 +32,9 @@ void dnodeDistributeMsgFromMgmt(char *content, int msgLen, int msgType, SMgmtObj extern void *dmQhandle; +void dnodeSendVpeerCfgMsg(int32_t vnode); +void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid); + #ifdef __cplusplus } #endif diff --git a/src/dnode/inc/dnodeRead.h b/src/dnode/inc/dnodeRead.h index b002d0eefb3d44b3bd5406eb984881384606cdd4..9e8183ca7190f5c466100af85d2fd024868ff86a 100644 --- a/src/dnode/inc/dnodeRead.h +++ b/src/dnode/inc/dnodeRead.h @@ -20,12 +20,24 @@ extern "C" { #endif +#include +#include +#include "taosdef.h" +#include "taosmsg.h" +#include "dnodeShell.h" + +void dnodeFreeQInfoInQueue(SShellObj *pShellObj); + /* * Dnode handle read messages * The processing result is returned by callback function with pShellObj parameter */ int32_t dnodeReadData(SQueryMeterMsg *msg, void *pShellObj, void (*callback)(SQueryMeterRsp *rspMsg, void *pShellObj)); +typedef void (*SDnodeRetrieveCallbackFp)(int32_t code, SRetrieveMeterRsp *pRetrieveRspMsg, void *pShellObj); + +void dnodeRetrieveData(SRetrieveMeterMsg *pMsg, int32_t msgLen, void *pShellObj, SDnodeRetrieveCallbackFp callback); + #ifdef __cplusplus } #endif diff --git a/src/dnode/inc/dnodeShell.h b/src/dnode/inc/dnodeShell.h index bb04767eb8ea6f59aeebed74f5a67df61f717439..a1fa8875abf1cfc2805bfcb067e3ffbd4e5d9312 100644 --- a/src/dnode/inc/dnodeShell.h +++ b/src/dnode/inc/dnodeShell.h @@ -26,7 +26,6 @@ extern "C" { typedef struct { int sid; - int vnode; uint32_t ip; uint16_t port; int32_t count; // track the number of imports @@ -38,6 +37,8 @@ typedef struct { int32_t dnodeInitShell(); +void dnodeCleanupShell(); + //SDnodeStatisInfo dnodeGetStatisInfo() #ifdef __cplusplus diff --git a/src/dnode/inc/dnodeSystem.h b/src/dnode/inc/dnodeSystem.h index 98a9f582f73b143379e4f693f7c6264e98476fad..7e94a642e78aa34305952164b2d24948d29e9408 100644 --- a/src/dnode/inc/dnodeSystem.h +++ b/src/dnode/inc/dnodeSystem.h @@ -37,7 +37,8 @@ extern int32_t (*dnodeInitStorage)(); extern void (*dnodeCleanupStorage)(); extern void (*dnodeParseParameterK)(); extern int32_t tsMaxQueues; - +extern void ** tsRpcQhandle; +extern void *tsQueryQhandle; int32_t dnodeInitSystem(); void dnodeCleanUpSystem(); diff --git a/src/dnode/inc/dnodeUtil.h b/src/dnode/inc/dnodeUtil.h new file mode 100644 index 0000000000000000000000000000000000000000..dfb34b4b74a239ec74d0b3f128f0647051972449 --- /dev/null +++ b/src/dnode/inc/dnodeUtil.h @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_DNODE_UTIL_H +#define TDENGINE_DNODE_UTIL_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include "taosdef.h" +#include "taosmsg.h" +#include "tstatus.h" + +EVnodeStatus dnodeGetVnodeStatus(int32_t vnode); + +bool dnodeCheckVnodeExist(int32_t vnode); + +void *dnodeGetVnodeObj(int32_t vnode); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/dnode/inc/dnodeWrite.h b/src/dnode/inc/dnodeWrite.h index e323f48020cf3938914846db754166cc88ecc422..d10ff8570c5b39eea1618c3958b997ad5148339c 100644 --- a/src/dnode/inc/dnodeWrite.h +++ b/src/dnode/inc/dnodeWrite.h @@ -26,11 +26,13 @@ extern "C" { #include "taosmsg.h" /* - * Write data based on dnode - * If >= 0, it is affect rows - * If < 0, get error code from terrno + * Write data based on dnode, the detail result can be fetched from rsponse + * pSubmitMsg: Data to be written + * pShellObj: Used to pass a communication handle + * callback: Pass the write result through a callback function, possibly in a different thread space + * rsp: will not be freed by callback function */ -int32_t dnodeWriteData(SShellSubmitMsg *msg); +void dnodeWriteData(SShellSubmitMsg *pMsg, void *pShellObj, void (*callback)(SShellSubmitRspMsg *rsp, void *pShellObj)); /* * Check if table already exists diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index d69ca7c2f36ac8b545ea0a6beae6400f8fafc47f..6d7dc1d0326b61e2764fbd150d33491a6f874f43 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -548,7 +548,7 @@ int vnodeProcessCfgDnodeRequest(char *cont, int contLen, SMgmtObj *pMgmtObj) { return 0; } -void vnodeSendVpeerCfgMsg(int vnode) { +void dnodeSendVpeerCfgMsg(int32_t vnode) { char * pMsg, *pStart; int msgLen; SVpeerCfgMsg *pCfg; @@ -566,7 +566,7 @@ void vnodeSendVpeerCfgMsg(int vnode) { taosSendMsgToMnode(pObj, pStart, msgLen); } -int vnodeSendMeterCfgMsg(int vnode, int sid) { +void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid) { char * pMsg, *pStart; int msgLen; SMeterCfgMsg *pCfg; diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 4a2012aa35cb2219d62e1bbf8c1ce696ddbc76ab..6ed150994a301f9dbaad429302be4035cc44ccc3 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -14,4 +14,48 @@ */ #define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "tlog.h" +#include "dnodeWrite.h" +#include "dnode.h" +#include "dnodeRead.h" +#include "dnodeSystem.h" +void dnodeFreeQInfoInQueue(SShellObj *pShellObj) { +} + + +void dnodeExecuteRetrieveData(SSchedMsg *pSched) { + SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *)pSched->msg; + SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle; + SShellObj *pObj = (SShellObj *)pSched->ahandle; + SRetrieveMeterRsp result = {0}; + + /* + * in case of server restart, apps may hold qhandle created by server before restart, + * which is actually invalid, therefore, signature check is required. + */ + if (pRetrieve->qhandle != (uint64_t)pObj->qhandle) { + // if free flag is set, client wants to clean the resources + dError("QInfo:%p, qhandle:%p is not matched with saved:%p", pObj->qhandle, pRetrieve->qhandle, pObj->qhandle); + int32_t code = TSDB_CODE_INVALID_QHANDLE; + (*callback)(code, &result, pObj); + } + + //TODO build response here + + free(pSched->msg); +} + +void dnodeRetrieveData(SRetrieveMeterMsg *pMsg, int32_t msgLen, void *pShellObj, SDnodeRetrieveCallbackFp callback) { + int8_t *msg = malloc(msgLen); + memcpy(msg, pMsg, msgLen); + + SSchedMsg schedMsg; + schedMsg.msg = msg; + schedMsg.ahandle = pShellObj; + schedMsg.thandle = callback; + schedMsg.fp = dnodeExecuteRetrieveData; + taosScheduleTask(tsQueryQhandle, &schedMsg); +} diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index f02c7849e7a2a8407b74936864ca7e805fd16548..000b51f244769adcf31ac7b227e837a4a86626ba 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -19,37 +19,33 @@ #include "taosdef.h" #include "taosmsg.h" #include "tlog.h" +#include "tsocket.h" #include "tschemautil.h" #include "textbuffer.h" #include "trpc.h" +#include "http.h" #include "dnode.h" +#include "dnodeMgmt.h" +#include "dnodeRead.h" #include "dnodeSystem.h" #include "dnodeShell.h" +#include "dnodeUtil.h" +#include "dnodeWrite.h" +static void dnodeProcessRetrieveRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj); +static void dnodeProcessQueryRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj); +static void dnodeProcessShellSubmitRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj); +static void *tsDnodeShellServer = NULL; +static SShellObj *tsDnodeShellList = NULL; +static int32_t tsDnodeSelectReqNum = 0; +static int32_t tsDnodeInsertReqNum = 0; +static int32_t tsDnodeShellConns = 0; -int32_t vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj); -int32_t vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj); -int32_t vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj); +#define NUM_OF_SESSIONS_PER_VNODE (300) +#define NUM_OF_SESSIONS_PER_DNODE (NUM_OF_SESSIONS_PER_VNODE * TSDB_MAX_VNODES) -static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId); - -static void *pShellServer = NULL; -static SShellObj **shellList = NULL; -static int32_t dnodeSelectReqNum = 0; -static int32_t dnodeInsertReqNum = 0; - -typedef struct { - int32_t import; - int32_t vnode; - int32_t numOfSid; - int32_t ssid; // Start sid - SShellObj *pObj; - int64_t offset; // offset relative the blks - char blks[]; -} SBatchSubmitInfo; - -void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { +void *dnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { int sid, vnode; SShellObj *pObj = (SShellObj *)ahandle; SIntMsg * pMsg = (SIntMsg *)msg; @@ -61,11 +57,10 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { if (pObj) { pObj->thandle = NULL; dTrace("QInfo:%p %s free qhandle", pObj->qhandle, __FUNCTION__); - vnodeFreeQInfoInQueue(pObj->qhandle); + dnodeFreeQInfoInQueue(pObj); pObj->qhandle = NULL; - vnodeList[pObj->vnode].shellConns--; - dTrace("vid:%d, shell connection:%d is gone, shellConns:%d", pObj->vnode, pObj->sid, - vnodeList[pObj->vnode].shellConns); + tsDnodeShellConns--; + dTrace("shell connection:%d is gone, shellConns:%d", pObj->sid, tsDnodeShellConns); } return NULL; } @@ -73,53 +68,34 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { taosGetRpcConnInfo(thandle, &peerId, &peerIp, &peerPort, &vnode, &sid); if (pObj == NULL) { - if (shellList[vnode]) { - pObj = shellList[vnode] + sid; - pObj->thandle = thandle; - pObj->sid = sid; - pObj->vnode = vnode; - pObj->ip = peerIp; - tinet_ntoa(ipstr, peerIp); - vnodeList[pObj->vnode].shellConns++; - dTrace("vid:%d, shell connection:%d from ip:%s is created, shellConns:%d", vnode, sid, ipstr, - vnodeList[pObj->vnode].shellConns); - } else { - dError("vid:%d, vnode not there, shell connection shall be closed", vnode); - return NULL; - } + pObj = tsDnodeShellList + sid; + pObj->thandle = thandle; + pObj->sid = sid; + pObj->ip = peerIp; + tinet_ntoa(ipstr, peerIp); + tsDnodeShellConns--; + dTrace("shell connection:%d from ip:%s is created, shellConns:%d", sid, ipstr, tsDnodeShellConns); } else { - if (pObj != shellList[vnode] + sid) { - dError("vid:%d, shell connection:%d, pObj:%p is not matched with:%p", vnode, sid, pObj, shellList[vnode] + sid); + if (pObj != tsDnodeShellList + sid) { + dError("shell connection:%d, pObj:%p is not matched with:%p", sid, pObj, tsDnodeShellList + sid); return NULL; } } dTrace("vid:%d sid:%d, msg:%s is received pConn:%p", vnode, sid, taosMsg[pMsg->msgType], thandle); + if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) { + taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY); + dTrace("sid:%d, shell query msg is ignored since dnode not running", sid); + return pObj; + } + if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) { - if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER || vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_SLAVE) { - vnodeProcessQueryRequest((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); - } else { - taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY); - dTrace("vid:%d sid:%d, shell query msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); - } + dnodeProcessQueryRequest(pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); } else if (pMsg->msgType == TSDB_MSG_TYPE_RETRIEVE) { - if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER || vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_SLAVE) { - vnodeProcessRetrieveRequest((char *) pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); - } else { - taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY); - dTrace("vid:%d sid:%d, shell retrieve msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); - } + dnodeProcessRetrieveRequest(pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { - if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER) { - vnodeProcessShellSubmitRequest((char *) pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); - } else if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_SLAVE) { - taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_REDIRECT); - dTrace("vid:%d sid:%d, shell submit msg is redirect since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); - } else { - taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY); - dTrace("vid:%d sid:%d, shell submit msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); - } + dnodeProcessShellSubmitRequest(pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); } else { dError("%s is not processed", taosMsg[pMsg->msgType]); } @@ -128,17 +104,13 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { } int32_t dnodeInitShell() { - int size; SRpcInit rpcInit; - size = TSDB_MAX_VNODES * sizeof(SShellObj *); - shellList = (SShellObj **)malloc(size); - if (shellList == NULL) return -1; - memset(shellList, 0, size); - int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore; numOfThreads = (1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0; - if (numOfThreads < 1) numOfThreads = 1; + if (numOfThreads < 1) { + numOfThreads = 1; + } memset(&rpcInit, 0, sizeof(rpcInit)); @@ -147,92 +119,51 @@ int32_t dnodeInitShell() { rpcInit.localPort = tsVnodeShellPort; rpcInit.label = "DND-shell"; rpcInit.numOfThreads = numOfThreads; - rpcInit.fp = vnodeProcessMsgFromShell; + rpcInit.fp = dnodeProcessMsgFromShell; rpcInit.bits = TSDB_SHELL_VNODE_BITS; rpcInit.numOfChanns = TSDB_MAX_VNODES; rpcInit.sessionsPerChann = 16; rpcInit.idMgmt = TAOS_ID_FREE; rpcInit.connType = TAOS_CONN_SOCKET_TYPE_S(); rpcInit.idleTime = tsShellActivityTimer * 2000; - rpcInit.qhandle = rpcQhandle[0]; - rpcInit.efp = vnodeSendVpeerCfgMsg; + rpcInit.qhandle = tsRpcQhandle[0]; + //rpcInit.efp = vnodeSendVpeerCfgMsg; - pShellServer = taosOpenRpc(&rpcInit); - if (pShellServer == NULL) { + tsDnodeShellServer = taosOpenRpc(&rpcInit); + if (tsDnodeShellServer == NULL) { dError("failed to init connection to shell"); return -1; } - return 0; -} - -int vnodeOpenShellVnode(int vnode) { - if (shellList[vnode] != NULL) { - dError("vid:%d, shell is already opened", vnode); - return -1; - } - - const int32_t MIN_NUM_OF_SESSIONS = 300; - SVnodeCfg *pCfg = &vnodeList[vnode].cfg; - int32_t sessions = (int32_t) MAX(pCfg->maxSessions * 1.1, MIN_NUM_OF_SESSIONS); - - size_t size = sessions * sizeof(SShellObj); - shellList[vnode] = (SShellObj *)calloc(1, size); - if (shellList[vnode] == NULL) { - dError("vid:%d, sessions:%d, failed to allocate shellObj, size:%d", vnode, pCfg->maxSessions, size); + const int32_t size = NUM_OF_SESSIONS_PER_DNODE * sizeof(SShellObj); + tsDnodeShellList = (SShellObj *)malloc(size); + if (tsDnodeShellList == NULL) { + dError("failed to allocate shellObj, sessions:%d", NUM_OF_SESSIONS_PER_DNODE); return -1; } + memset(tsDnodeShellList, 0, size); - if(taosOpenRpcChannWithQ(pShellServer, vnode, sessions, rpcQhandle[(vnode+1)%tsMaxQueues]) != TSDB_CODE_SUCCESS) { - dError("vid:%d, sessions:%d, failed to open shell", vnode, pCfg->maxSessions); + // TODO re initialize tsRpcQhandle + if(taosOpenRpcChannWithQ(tsDnodeShellServer, 0, NUM_OF_SESSIONS_PER_DNODE, tsRpcQhandle) != TSDB_CODE_SUCCESS) { + dError("sessions:%d, failed to open shell", NUM_OF_SESSIONS_PER_DNODE); return -1; } - dPrint("vid:%d, sessions:%d, shell is opened", vnode, pCfg->maxSessions); + dError("sessions:%d, shell is opened", NUM_OF_SESSIONS_PER_DNODE); return TSDB_CODE_SUCCESS; } -static void vnodeDelayedFreeResource(void *param, void *tmrId) { - int32_t vnode = *(int32_t*) param; - dTrace("vid:%d, start to free resources for 500ms arrived", vnode); - - taosCloseRpcChann(pShellServer, vnode); // close connection - tfree(shellList[vnode]); //free SShellObj - tfree(param); - - memset(vnodeList + vnode, 0, sizeof(SVnodeObj)); - dTrace("vid:%d, status set to %s", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); - - vnodeCalcOpenVnodes(); -} - -void vnodeCloseShellVnode(int vnode) { - if (shellList[vnode] == NULL) return; - - for (int i = 0; i < vnodeList[vnode].cfg.maxSessions; ++i) { - void* qhandle = shellList[vnode][i].qhandle; - if (qhandle != NULL) { - vnodeDecRefCount(qhandle); - } +void dnodeCleanupShell() { + if (tsDnodeShellServer) { + taosCloseRpc(tsDnodeShellServer); } - int32_t* v = malloc(sizeof(int32_t)); - *v = vnode; - - /* - * free the connection related resource after 5sec. - * 1. The msg, as well as SRpcConn may be in the task queue, free it immediate will cause crash - * 2. Free connection may cause *(SRpcConn*)pObj->thandle to be invalid to access. - */ - dTrace("vid:%d, free resources in 500ms", vnode); - taosTmrStart(vnodeDelayedFreeResource, 500, v, vnodeTmrCtrl); -} - -void vnodeCleanUpShell() { - if (pShellServer) taosCloseRpc(pShellServer); + for (int i = 0; i < NUM_OF_SESSIONS_PER_DNODE; ++i) { + dnodeFreeQInfoInQueue(tsDnodeShellList+i); + } - tfree(shellList); + //tfree(tsDnodeShellList); } int vnodeSendQueryRspMsg(SShellObj *pObj, int code, void *qhandle) { @@ -255,7 +186,7 @@ int vnodeSendQueryRspMsg(SShellObj *pObj, int code, void *qhandle) { return msgLen; } -int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints) { +int32_t dnodeSendShellSubmitRspMsg(SShellObj *pObj, int32_t code, int32_t numOfPoints) { char *pMsg, *pStart; int msgLen; @@ -395,344 +326,73 @@ _query_over: vnodeFreeColumnInfo(&pQueryMsg->colList[i]); } - atomic_fetch_add_32(&dnodeSelectReqNum, 1); + atomic_fetch_add_32(&tsDnodeSelectReqNum, 1); return ret; } void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { - char * pMsg = pSched->msg; - int msgLen; - SShellObj *pObj = (SShellObj *)pSched->ahandle; - - SRetrieveMeterMsg *pRetrieve; - SRetrieveMeterRsp *pRsp; - int numOfRows = 0, rowSize = 0, size = 0; - int16_t timePrec = TSDB_TIME_PRECISION_MILLI; - - char *pStart; - - int code = 0; - pRetrieve = (SRetrieveMeterMsg *)pMsg; - SQInfo* pQInfo = (SQInfo*)pRetrieve->qhandle; - pRetrieve->free = htons(pRetrieve->free); - - if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { - dTrace("retrieve msg, handle:%p, free:%d", pRetrieve->qhandle, pRetrieve->free); - } else { - dTrace("retrieve msg to free resource from client, handle:%p, free:%d", pRetrieve->qhandle, pRetrieve->free); - } - - /* - * in case of server restart, apps may hold qhandle created by server before restart, - * which is actually invalid, therefore, signature check is required. - */ - if (pRetrieve->qhandle == (uint64_t)pObj->qhandle) { - // if free flag is set, client wants to clean the resources - if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { - code = vnodeRetrieveQueryInfo((void *)(pRetrieve->qhandle), &numOfRows, &rowSize, &timePrec); - } - } else { - dError("QInfo:%p, qhandle:%p is not matched with saved:%p", pObj->qhandle, pRetrieve->qhandle, pObj->qhandle); - code = TSDB_CODE_INVALID_QHANDLE; - } - if (code == TSDB_CODE_SUCCESS) { - size = vnodeGetResultSize((void *)(pRetrieve->qhandle), &numOfRows); - - // buffer size for progress information, including meter count, - // and for each meter, including 'uid' and 'TSKEY'. - int progressSize = 0; - if (pQInfo->pMeterQuerySupporter != NULL) - progressSize = pQInfo->pMeterQuerySupporter->numOfMeters * (sizeof(int64_t) + sizeof(TSKEY)) + sizeof(int32_t); - else if (pQInfo->pObj != NULL) - progressSize = sizeof(int64_t) + sizeof(TSKEY) + sizeof(int32_t); - - pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, progressSize + size + 100); - if (pStart == NULL) { - taosSendSimpleRsp(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY); - goto _exit; - } - } - - pMsg = pStart; - - *pMsg = code; - pMsg++; - - pRsp = (SRetrieveMeterRsp *)pMsg; - pRsp->numOfRows = htonl(numOfRows); - pRsp->precision = htons(timePrec); - - if (code == TSDB_CODE_SUCCESS) { - pRsp->offset = htobe64(vnodeGetOffsetVal((void*)pRetrieve->qhandle)); - pRsp->useconds = htobe64(((SQInfo *)(pRetrieve->qhandle))->useconds); - } else { - pRsp->offset = 0; - pRsp->useconds = 0; - } - - pMsg = pRsp->data; - - if (numOfRows > 0 && code == TSDB_CODE_SUCCESS) { - vnodeSaveQueryResult((void *)(pRetrieve->qhandle), pRsp->data, &size); - } - - pMsg += size; - - // write the progress information of each meter to response - // this is required by subscriptions - if (numOfRows > 0 && code == TSDB_CODE_SUCCESS) { - if (pQInfo->pMeterQuerySupporter != NULL && pQInfo->pMeterQuerySupporter->pMeterSidExtInfo != NULL) { - *((int32_t *)pMsg) = htonl(pQInfo->pMeterQuerySupporter->numOfMeters); - pMsg += sizeof(int32_t); - for (int32_t i = 0; i < pQInfo->pMeterQuerySupporter->numOfMeters; i++) { - *((int64_t *)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->uid); - pMsg += sizeof(int64_t); - *((TSKEY *)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->key); - pMsg += sizeof(TSKEY); - } - } else if (pQInfo->pObj != NULL) { - *((int32_t *)pMsg) = htonl(1); - pMsg += sizeof(int32_t); - *((int64_t *)pMsg) = htobe64(pQInfo->pObj->uid); - pMsg += sizeof(int64_t); - if (pQInfo->pointsRead > 0) { - *((TSKEY *)pMsg) = htobe64(pQInfo->query.lastKey + 1); - } else { - *((TSKEY *)pMsg) = htobe64(pQInfo->query.lastKey); - } - pMsg += sizeof(TSKEY); - } - } - - msgLen = pMsg - pStart; - - assert(code != TSDB_CODE_ACTION_IN_PROGRESS); - - if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS) && - pRetrieve->qhandle != 0) { - dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); - vnodeDecRefCount(pObj->qhandle); - pObj->qhandle = NULL; - } - - taosSendMsgToPeer(pObj->thandle, pStart, msgLen); - -_exit: - free(pSched->msg); -} - -int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj) { - SSchedMsg schedMsg; - - char *msg = malloc(msgLen); - memcpy(msg, pMsg, msgLen); - schedMsg.msg = msg; - schedMsg.ahandle = pObj; - schedMsg.fp = vnodeExecuteRetrieveReq; - taosScheduleTask(queryQhandle, &schedMsg); - - return msgLen; } -static int vnodeCheckSubmitBlockContext(SShellSubmitBlock *pBlocks, SVnodeObj *pVnode) { - int32_t sid = htonl(pBlocks->sid); - uint64_t uid = htobe64(pBlocks->uid); - - if (sid >= pVnode->cfg.maxSessions || sid <= 0) { - dError("vid:%d sid:%d, sid is out of range", pVnode->vnode, sid); - return TSDB_CODE_INVALID_TABLE_ID; - } - - SMeterObj *pMeterObj = pVnode->meterList[sid]; - if (pMeterObj == NULL) { - dError("vid:%d sid:%d, not active table", pVnode->vnode, sid); - vnodeSendMeterCfgMsg(pVnode->vnode, sid); - return TSDB_CODE_NOT_ACTIVE_TABLE; +void dnodeProcessRetrieveRequestCb(int code, SRetrieveMeterRsp *result, SShellObj *pObj) { + if (pObj == NULL || result == NULL || code == TSDB_CODE_ACTION_IN_PROGRESS) { + return; } - - if (pMeterObj->uid != uid) { - dError("vid:%d sid:%d id:%s, uid:%" PRIu64 ", uid in msg:%" PRIu64 ", uid mismatch", pVnode->vnode, sid, pMeterObj->meterId, - pMeterObj->uid, uid); - return TSDB_CODE_INVALID_SUBMIT_MSG; - } - - return TSDB_CODE_SUCCESS; } -static int vnodeDoSubmitJob(SVnodeObj *pVnode, int import, int32_t *ssid, int32_t esid, SShellSubmitBlock **ppBlocks, - TSKEY now, SShellObj *pObj) { - SShellSubmitBlock *pBlocks = *ppBlocks; - int code = TSDB_CODE_SUCCESS; - int32_t numOfPoints = 0; - int32_t i = 0; - SShellSubmitBlock tBlock; - - for (i = *ssid; i < esid; i++) { - numOfPoints = 0; - tBlock = *pBlocks; - - code = vnodeCheckSubmitBlockContext(pBlocks, pVnode); - if (code != TSDB_CODE_SUCCESS) break; - - SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]); - - // dont include sid, vid - int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint; - int32_t sversion = htonl(pBlocks->sversion); - - if (import) { - code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj, - sversion, &numOfPoints, now); - pObj->numOfTotalPoints += numOfPoints; - - // records for one table should be consecutive located in the payload buffer, which is guaranteed by client - if (code == TSDB_CODE_SUCCESS) { - pObj->count--; - } - } else { - code = vnodeInsertPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL, - sversion, &numOfPoints, now); - pObj->numOfTotalPoints += numOfPoints; - } - - if (code != TSDB_CODE_SUCCESS) break; - - pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) + - htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint); - } - - *ssid = i; - *ppBlocks = pBlocks; - /* Since the pBlock part can be changed by the vnodeForwardToPeer interface, - * which is also possible to be used again. For that case, we just copy the original - * block content back. - */ - if (import && (code == TSDB_CODE_ACTION_IN_PROGRESS)) { - memcpy((void *)pBlocks, (void *)&tBlock, sizeof(SShellSubmitBlock)); - } - - return code; +static void dnodeProcessRetrieveRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj) { + SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *) pMsg; + dnodeRetrieveData(pRetrieve, msgLen, pObj, dnodeProcessRetrieveRequestCb); } -int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { - int code = 0, ret = 0; - int32_t i = 0; - SShellSubmitMsg shellSubmit = *(SShellSubmitMsg *)pMsg; - SShellSubmitMsg *pSubmit = &shellSubmit; - SShellSubmitBlock *pBlocks = NULL; - - pSubmit->import = htons(pSubmit->import); - pSubmit->vnode = htons(pSubmit->vnode); - pSubmit->numOfSid = htonl(pSubmit->numOfSid); - - if (pSubmit->numOfSid <= 0) { - dError("invalid num of meters:%d", pSubmit->numOfSid); - code = TSDB_CODE_INVALID_QUERY_MSG; - goto _submit_over; +void dnodeProcessShellSubmitRequestCb(SShellSubmitRspMsg *result, void *pObj) { + if (pObj == NULL || result == NULL || result->code == TSDB_CODE_ACTION_IN_PROGRESS) { + return; } - if (pSubmit->vnode >= TSDB_MAX_VNODES || pSubmit->vnode < 0) { - dTrace("vnode:%d is out of range", pSubmit->vnode); - code = TSDB_CODE_INVALID_VNODE_ID; - goto _submit_over; + SShellObj *pShellObj = (SShellObj *) pObj; + int32_t msgLen = sizeof(SShellSubmitRspMsg) + result->numOfFailedBlocks * sizeof(SShellSubmitRspBlock); + SShellSubmitRspMsg *submitRsp = (SShellSubmitRspMsg *) taosBuildRspMsgWithSize(pShellObj->thandle, + TSDB_MSG_TYPE_SUBMIT_RSP, msgLen); + if (submitRsp == NULL) { + return; } - SVnodeObj *pVnode = vnodeList + pSubmit->vnode; - if (pVnode->cfg.maxSessions == 0 || pVnode->meterList == NULL) { - dError("vid:%d is not activated for submit", pSubmit->vnode); - vnodeSendVpeerCfgMsg(pSubmit->vnode); - code = TSDB_CODE_NOT_ACTIVE_VNODE; - goto _submit_over; - } + dTrace("code:%d, numOfRows:%d affectedRows:%d", result->code, result->numOfRows, result->affectedRows); + memcpy(submitRsp, result, msgLen); - if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) { - code = TSDB_CODE_NO_WRITE_ACCESS; - goto _submit_over; - } - - if (tsAvailDataDirGB < tsMinimalDataDirGB) { - dError("server disk space remain %.3f GB, need at least %.3f GB, stop writing", tsAvailDataDirGB, tsMinimalDataDirGB); - code = TSDB_CODE_SERV_NO_DISKSPACE; - goto _submit_over; - } - - pObj->count = pSubmit->numOfSid; // for import - pObj->code = 0; // for import - pObj->numOfTotalPoints = 0; - - TSKEY now = taosGetTimestamp(pVnode->cfg.precision); - - pBlocks = (SShellSubmitBlock *)(pMsg + sizeof(SShellSubmitMsg)); - i = 0; - code = vnodeDoSubmitJob(pVnode, pSubmit->import, &i, pSubmit->numOfSid, &pBlocks, now, pObj); - -_submit_over: - ret = 0; - if (pSubmit->import) { // Import case - if (code == TSDB_CODE_ACTION_IN_PROGRESS) { - - SBatchSubmitInfo *pSubmitInfo = - (SBatchSubmitInfo *)calloc(1, sizeof(SBatchSubmitInfo) + msgLen - sizeof(SShellSubmitMsg)); - if (pSubmitInfo == NULL) { - code = TSDB_CODE_SERV_OUT_OF_MEMORY; - ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints); - } else { // Start a timer to process the next part of request - pSubmitInfo->import = 1; - pSubmitInfo->vnode = pSubmit->vnode; - pSubmitInfo->numOfSid = pSubmit->numOfSid; - pSubmitInfo->ssid = i; // start from this position, not the initial position - pSubmitInfo->pObj = pObj; - pSubmitInfo->offset = ((char *)pBlocks) - (pMsg + sizeof(SShellSubmitMsg)); - assert(pSubmitInfo->offset >= 0); - memcpy((void *)(pSubmitInfo->blks), (void *)(pMsg + sizeof(SShellSubmitMsg)), msgLen - sizeof(SShellSubmitMsg)); - taosTmrStart(vnodeProcessBatchSubmitTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl); - } - } else { - if (code == TSDB_CODE_SUCCESS) assert(pObj->count == 0); - ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints); + for (int i = 0; i < submitRsp->numOfFailedBlocks; ++i) { + SShellSubmitRspBlock *block = &submitRsp->failedBlocks[i]; + if (block->code == TSDB_CODE_NOT_ACTIVE_VNODE || block->code == TSDB_CODE_INVALID_VNODE_ID) { + dnodeSendVpeerCfgMsg(block->vnode); + } else if (block->code == TSDB_CODE_INVALID_TABLE_ID || block->code == TSDB_CODE_NOT_ACTIVE_TABLE) { + dnodeSendMeterCfgMsg(block->vnode, block->sid); } - } else { // Insert case - ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints); + block->vnode = htonl(block->vnode); + block->sid = htonl(block->sid); + block->code = htonl(block->code); } + submitRsp->code = htonl(submitRsp->code); + submitRsp->numOfRows = htonl(submitRsp->numOfRows); + submitRsp->affectedRows = htonl(submitRsp->affectedRows); + submitRsp->failedRows = htonl(submitRsp->failedRows); + submitRsp->numOfFailedBlocks = htonl(submitRsp->numOfFailedBlocks); - atomic_fetch_add_32(&dnodeInsertReqNum, 1); - return ret; + taosSendMsgToPeer(pShellObj->thandle, (int8_t*)submitRsp, msgLen); } -static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId) { - SBatchSubmitInfo *pSubmitInfo = (SBatchSubmitInfo *)param; - assert(pSubmitInfo != NULL && pSubmitInfo->import); - - int32_t i = 0; - int32_t code = TSDB_CODE_SUCCESS; - - SShellObj * pShell = pSubmitInfo->pObj; - SVnodeObj * pVnode = &vnodeList[pSubmitInfo->vnode]; - SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pSubmitInfo->blks + pSubmitInfo->offset); - TSKEY now = taosGetTimestamp(pVnode->cfg.precision); - i = pSubmitInfo->ssid; - - code = vnodeDoSubmitJob(pVnode, pSubmitInfo->import, &i, pSubmitInfo->numOfSid, &pBlocks, now, pShell); - - if (code == TSDB_CODE_ACTION_IN_PROGRESS) { - pSubmitInfo->ssid = i; - pSubmitInfo->offset = ((char *)pBlocks) - pSubmitInfo->blks; - taosTmrStart(vnodeProcessBatchSubmitTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl); - } else { - if (code == TSDB_CODE_SUCCESS) assert(pShell->count == 0); - tfree(param); - vnodeSendShellSubmitRspMsg(pShell, code, pShell->numOfTotalPoints); - } +static void dnodeProcessShellSubmitRequest(int8_t *pMsg, int32_t msgLen, SShellObj *pObj) { + SShellSubmitMsg *pSubmit = (SShellSubmitMsg *) pMsg; + dnodeWriteData(pSubmit, pObj, dnodeProcessShellSubmitRequestCb); + atomic_fetch_add_32(&tsDnodeInsertReqNum, 1); } - SDnodeStatisInfo dnodeGetStatisInfo() { SDnodeStatisInfo info = {0}; if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) { - info.httpReqNum = httpGetReqCount(); - info.selectReqNum = atomic_exchange_32(&dnodeSelectReqNum, 0); - info.insertReqNum = atomic_exchange_32(&dnodeInsertReqNum, 0); + info.httpReqNum = httpGetReqCount(); + info.selectReqNum = atomic_exchange_32(&tsDnodeSelectReqNum, 0); + info.insertReqNum = atomic_exchange_32(&tsDnodeInsertReqNum, 0); } return info; diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 600a0c38d7f9867a7e762f2ad9225adf2abc38f7..3d77cf2e83118adb2f432ad30e15773b96ddeb76 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -52,9 +52,9 @@ static int32_t dnodeInitTmrCtl(); void *tsStatusTimer = NULL; void *vnodeTmrCtrl; -void **rpcQhandle; +void **tsRpcQhandle; void *dmQhandle; -void *queryQhandle; +void *tsQueryQhandle; int32_t tsVnodePeers = TSDB_VNODES_SUPPORT - 1; int32_t tsMaxQueues; uint32_t tsRebootTime; @@ -95,6 +95,7 @@ void dnodeCleanUpSystem() { tsStatusTimer = NULL; } + dnodeCleanupShell(); dnodeCleanUpModules(); dnodeCleanupVnodes(); taosCloseLogger(); @@ -269,7 +270,7 @@ static int32_t dnodeInitQueryQHandle() { int32_t maxQueueSize = tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode; dTrace("query task queue initialized, max slot:%d, task threads:%d", maxQueueSize, numOfThreads); - queryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", vnodeTmrCtrl); + tsQueryQhandle = taosInitSchedulerWithInfo(maxQueueSize, numOfThreads, "query", vnodeTmrCtrl); return 0; } @@ -291,10 +292,10 @@ static int32_t dnodeInitRpcQHandle() { tsMaxQueues = 1; } - rpcQhandle = malloc(tsMaxQueues * sizeof(void *)); + tsRpcQhandle = malloc(tsMaxQueues * sizeof(void *)); for (int32_t i = 0; i < tsMaxQueues; ++i) { - rpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode"); + tsRpcQhandle[i] = taosInitScheduler(tsSessionsPerVnode, 1, "dnode"); } dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt"); diff --git a/src/dnode/src/dnodeUtil.c b/src/dnode/src/dnodeUtil.c new file mode 100644 index 0000000000000000000000000000000000000000..b1d2fc0cb6ec33279bc7543821561b4f2c58563e --- /dev/null +++ b/src/dnode/src/dnodeUtil.c @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dnodeUtil.h" + +EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) { + return TSDB_VN_STATUS_MASTER; +} + +bool dnodeCheckVnodeExist(int32_t vnode) { + return true; +} diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 268c843d0d0df53ee558f0565abb4aa05bcdb0c3..75ef1be7413588952288333a07ba0995493a051b 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -14,15 +14,26 @@ */ #define _DEFAULT_SOURCE -#include "dnodeWrite.h" +#include "os.h" #include "taoserror.h" +#include "tlog.h" +#include "dnodeWrite.h" int32_t dnodeCheckTableExist(char *tableId) { return 0; } -int32_t dnodeWriteData(SShellSubmitMsg *msg) { - return 0; +void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pShellObj, void (*callback)(SShellSubmitRspMsg *, void *)) { + SShellSubmitRspMsg result = {0}; + + int32_t numOfSid = htonl(pSubmit->numOfSid); + if (numOfSid <= 0) { + dError("invalid num of tables:%d", numOfSid); + result.code = TSDB_CODE_INVALID_QUERY_MSG; + callback(&result, pShellObj); + } + + //TODO: submit implementation } int32_t dnodeCreateNormalTable(SCreateNormalTableMsg *table) { diff --git a/src/inc/http.h b/src/inc/http.h index d77f27d3eab662ed98a4d70e6ed091d6809b05d2..9ef36560e13e91e8d8652a9233ef160c6ede72c2 100644 --- a/src/inc/http.h +++ b/src/inc/http.h @@ -16,6 +16,10 @@ #ifndef TDENGINE_HTTP_H #define TDENGINE_HTTP_H +#ifdef __cplusplus +extern "C" { +#endif + #include "tglobalcfg.h" #include "tlog.h" @@ -44,4 +48,8 @@ int32_t httpGetReqCount(); +#ifdef __cplusplus +} +#endif + #endif diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index a180a963d8e4497ed903c9d545166f4a47b817c9..2c3d6da6f8f419bbbb93e4021b690f2cafe931ee 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -271,6 +271,7 @@ typedef struct { } SSubmitMsg; typedef struct { + int32_t vnode; int32_t sid; int32_t sversion; uint64_t uid; @@ -279,12 +280,28 @@ typedef struct { } SShellSubmitBlock; typedef struct { - short import; - short vnode; + int8_t import; + int8_t reserved[3]; int32_t numOfSid; /* total number of sid */ char blks[]; /* numOfSid blocks, each blocks for one meter */ } SShellSubmitMsg; +typedef struct { + int32_t vnode; // vnode index of failed block + int32_t sid; // table index of failed block + int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table +} SShellSubmitRspBlock; + +typedef struct { + int32_t code; // 0-success, 1-inprogress, > 1 error code + int32_t numOfRows; // number of records the client is trying to write + int32_t affectedRows; // number of records actually written + int32_t failedRows; // number of failed records (exclude duplicate records) + int32_t numOfFailedBlocks; + SShellSubmitRspBlock *failedBlocks; +} SShellSubmitRspMsg; + + typedef struct SSchema { uint8_t type; char name[TSDB_COL_NAME_LEN]; diff --git a/src/mnode/src/mgmtSystem.c b/src/mnode/src/mgmtSystem.c index f63215ce1b192316c7b314e75788af8fcdb5495a..5148975c95971e8a7a7c997e259c058ece8498f5 100644 --- a/src/mnode/src/mgmtSystem.c +++ b/src/mnode/src/mgmtSystem.c @@ -40,7 +40,7 @@ void * mgmtStatisticTimer = NULL; int mgmtShellConns = 0; int mgmtDnodeConns = 0; extern void * pShellConn; -extern void ** rpcQhandle; +extern void ** tsRpcQhandle; extern SMgmtIpList mgmtIpList; extern SMgmtIpList mgmtPublicIpList; extern char mgmtIpStr[TSDB_MAX_MGMT_IPS][20]; diff --git a/src/util/inc/tstatus.h b/src/util/inc/tstatus.h index 8e2f82e024a0560cea5570b9ebd11292b875ecd0..03a882e2836ec4a5614d601e3d54963c73891d3c 100644 --- a/src/util/inc/tstatus.h +++ b/src/util/inc/tstatus.h @@ -40,7 +40,7 @@ enum _TSDB_DB_STATUS { TSDB_DB_STATUS_DROP_FROM_SDB }; -enum _TSDB_VN_STATUS { +typedef enum _TSDB_VN_STATUS { TSDB_VN_STATUS_OFFLINE, TSDB_VN_STATUS_CREATING, TSDB_VN_STATUS_UNSYNCED, @@ -48,7 +48,7 @@ enum _TSDB_VN_STATUS { TSDB_VN_STATUS_MASTER, TSDB_VN_STATUS_CLOSING, TSDB_VN_STATUS_DELETING, -}; +} EVnodeStatus; enum _TSDB_VN_SYNC_STATUS { TSDB_VN_SYNC_STATUS_INIT, diff --git a/src/vnode/detail/inc/vnode.h b/src/vnode/detail/inc/vnode.h index 3b502a9f7995e1c329c36fa11827e4ce2268efd1..b3cb67b34bd84d1b13ea5610562d8d10e0a611c5 100644 --- a/src/vnode/detail/inc/vnode.h +++ b/src/vnode/detail/inc/vnode.h @@ -302,9 +302,8 @@ typedef struct { // internal globals extern int tsMeterSizeOnFile; -extern void ** rpcQhandle; -extern void * queryQhandle; +extern void * tsQueryQhandle; extern int tsVnodePeers; extern int tsMaxVnode; extern int tsMaxQueues; diff --git a/src/vnode/detail/src/vnodeRead.c b/src/vnode/detail/src/vnodeRead.c index 71dd088ae97b8110c85cc3733c6500ddc52e06f4..e4a8b898ec3266c0a89b24357a4eec08632644a1 100644 --- a/src/vnode/detail/src/vnodeRead.c +++ b/src/vnode/detail/src/vnodeRead.c @@ -696,7 +696,7 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo, pQInfo->refCount); - taosScheduleTask(queryQhandle, &schedMsg); + taosScheduleTask(tsQueryQhandle, &schedMsg); return pQInfo; _error: @@ -812,7 +812,7 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE dTrace("QInfo:%p set query flag and prepare runtime environment completed, wait for schedule", pQInfo); - taosScheduleTask(queryQhandle, &schedMsg); + taosScheduleTask(tsQueryQhandle, &schedMsg); return pQInfo; _error: @@ -912,7 +912,7 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) { schedMsg.msg = NULL; schedMsg.thandle = (void *)1; schedMsg.ahandle = pQInfo; - taosScheduleTask(queryQhandle, &schedMsg); + taosScheduleTask(tsQueryQhandle, &schedMsg); } }