dnodeShell.c 8.6 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
17
#include "os.h"
S
slguan 已提交
18 19
#include "taoserror.h"
#include "taosdef.h"
H
hzcheng 已提交
20
#include "taosmsg.h"
S
slguan 已提交
21
#include "tglobal.h"
22
#include "tutil.h"
S
slguan 已提交
23
#include "http.h"
S
Shengliang Guan 已提交
24
#include "mnode.h"
S
slguan 已提交
25
#include "dnode.h"
26 27 28
#include "dnodeInt.h"
#include "dnodeVRead.h"
#include "dnodeVWrite.h"
S
Shengliang Guan 已提交
29 30
#include "dnodeMRead.h"
#include "dnodeMWrite.h"
S
slguan 已提交
31
#include "dnodeShell.h"
S
slguan 已提交
32

S
slguan 已提交
33
static void  (*dnodeProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
34
static void    dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *);
S
slguan 已提交
35 36
static int     dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static void  * tsDnodeShellRpc = NULL;
S
slguan 已提交
37 38
static int32_t tsDnodeQueryReqNum  = 0;
static int32_t tsDnodeSubmitReqNum = 0;
H
hzcheng 已提交
39

S
slguan 已提交
40
int32_t dnodeInitShell() {
41 42 43
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT]         = dnodeDispatchToVnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY]          = dnodeDispatchToVnodeReadQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH]          = dnodeDispatchToVnodeReadQueue;
44 45
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVnodeWriteQueue;
  
46
  // the following message shall be treated as mnode write
S
Shengliang Guan 已提交
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = dnodeDispatchToMnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT]  = dnodeDispatchToMnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT]   = dnodeDispatchToMnodeWriteQueue; 
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = dnodeDispatchToMnodeWriteQueue; 
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER]  = dnodeDispatchToMnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER]   = dnodeDispatchToMnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE]= dnodeDispatchToMnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE]  = dnodeDispatchToMnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB]   = dnodeDispatchToMnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB]     = dnodeDispatchToMnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB]    = dnodeDispatchToMnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= dnodeDispatchToMnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE]  = dnodeDispatchToMnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = dnodeDispatchToMnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM]= dnodeDispatchToMnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY]  = dnodeDispatchToMnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMnodeWriteQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN]   = dnodeDispatchToMnodeWriteQueue;
65 66
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMnodeWriteQueue;
  
67
  // the following message shall be treated as mnode query 
68
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT]   = dnodeDispatchToMnodeReadQueue;
S
Shengliang Guan 已提交
69 70 71 72 73 74 75
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONNECT]     = dnodeDispatchToMnodeReadQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_USE_DB]      = dnodeDispatchToMnodeReadQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META]  = dnodeDispatchToMnodeReadQueue; 
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP]= dnodeDispatchToMnodeReadQueue;   
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMnodeReadQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW]        = dnodeDispatchToMnodeReadQueue;
  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE]    = dnodeDispatchToMnodeReadQueue;
76

S
slguan 已提交
77
  int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore;
S
slguan 已提交
78
  numOfThreads = (int32_t) ((1.0 - tsRatioOfQueryThreads) * numOfThreads / 2.0);
S
slguan 已提交
79 80 81
  if (numOfThreads < 1) {
    numOfThreads = 1;
  }
H
hzcheng 已提交
82

S
slguan 已提交
83
  SRpcInit rpcInit;
H
hzcheng 已提交
84
  memset(&rpcInit, 0, sizeof(rpcInit));
J
jtao1735 已提交
85
  rpcInit.localPort    = tsDnodeShellPort;
J
jtao1735 已提交
86
  rpcInit.label        = "SHELL";
H
hzcheng 已提交
87
  rpcInit.numOfThreads = numOfThreads;
S
slguan 已提交
88
  rpcInit.cfp          = dnodeProcessMsgFromShell;
89
  rpcInit.sessions     = tsMaxShellConns;
S
slguan 已提交
90
  rpcInit.connType     = TAOS_CONN_SERVER;
91
  rpcInit.idleTime     = tsShellActivityTimer * 1000;
S
slguan 已提交
92
  rpcInit.afp          = dnodeRetrieveUserAuthInfo;
H
hzcheng 已提交
93

S
slguan 已提交
94 95
  tsDnodeShellRpc = rpcOpen(&rpcInit);
  if (tsDnodeShellRpc == NULL) {
S
slguan 已提交
96
    dError("failed to init shell rpc server");
S
slguan 已提交
97 98
    return -1;
  }
H
hzcheng 已提交
99

100
  dInfo("shell rpc server is opened");
S
slguan 已提交
101
  return 0;
H
hzcheng 已提交
102 103
}

S
slguan 已提交
104
void dnodeCleanupShell() {
S
slguan 已提交
105 106
  if (tsDnodeShellRpc) {
    rpcClose(tsDnodeShellRpc);
S
slguan 已提交
107
    tsDnodeShellRpc = NULL;
H
hzcheng 已提交
108
  }
S
slguan 已提交
109 110
}

111
void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
112 113 114 115 116
  SRpcMsg rpcMsg = {
    .handle  = pMsg->handle,
    .pCont   = NULL,
    .contLen = 0
  };
S
slguan 已提交
117

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
118 119
  if (pMsg->pCont == NULL) return;

S
slguan 已提交
120
  if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
121
    dError("RPC %p, shell msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]);
S
Shengliang Guan 已提交
122
    rpcMsg.code = TSDB_CODE_APP_NOT_READY;
S
slguan 已提交
123 124
    rpcSendResponse(&rpcMsg);
    rpcFreeCont(pMsg->pCont);
S
slguan 已提交
125 126
    return;
  }
H
hzcheng 已提交
127

S
slguan 已提交
128 129 130 131 132 133
  if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) {
    atomic_fetch_add_32(&tsDnodeQueryReqNum, 1);
  } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
    atomic_fetch_add_32(&tsDnodeSubmitReqNum, 1);
  } else {}

S
slguan 已提交
134 135
  if ( dnodeProcessShellMsgFp[pMsg->msgType] ) {
    (*dnodeProcessShellMsgFp[pMsg->msgType])(pMsg);
S
slguan 已提交
136
  } else {
137
    dError("RPC %p, shell msg:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]);
138
    rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
139 140 141
    rpcSendResponse(&rpcMsg);
    rpcFreeCont(pMsg->pCont);
    return;
H
Hongze Cheng 已提交
142
  }
H
hzcheng 已提交
143 144
}

S
slguan 已提交
145
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
146
  int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
S
Shengliang Guan 已提交
147
  if (code != TSDB_CODE_APP_NOT_READY) return code;
S
Shengliang Guan 已提交
148 149

  SDMAuthMsg *pMsg = rpcMallocCont(sizeof(SDMAuthMsg));
B
Bomin Zhang 已提交
150
  tstrncpy(pMsg->user, user, sizeof(pMsg->user));
S
Shengliang Guan 已提交
151 152 153 154 155 156

  SRpcMsg rpcMsg = {0};
  rpcMsg.pCont = pMsg;
  rpcMsg.contLen = sizeof(SDMAuthMsg);
  rpcMsg.msgType = TSDB_MSG_TYPE_DM_AUTH;
  
157
  dDebug("user:%s, send auth msg to mnode", user);
S
Shengliang Guan 已提交
158 159 160 161 162 163 164
  SRpcMsg rpcRsp = {0};
  dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp);

  if (rpcRsp.code != 0) {
    dError("user:%s, auth msg received from mnode, error:%s", user, tstrerror(rpcRsp.code));
  } else {
    SDMAuthRsp *pRsp = rpcRsp.pCont;
165
    dDebug("user:%s, auth msg received from mnode", user);
S
Shengliang Guan 已提交
166 167 168 169 170 171 172 173
    memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
    memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
    *spi = pRsp->spi;
    *encrypt = pRsp->encrypt;
  }

  rpcFreeCont(rpcRsp.pCont);
  return rpcRsp.code;
S
slguan 已提交
174 175
}

S
Shengliang Guan 已提交
176
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) {
177
  dDebug("vgId:%d, sid:%d send config table msg to mnode", vgId, sid);
S
Shengliang Guan 已提交
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199

  int32_t contLen = sizeof(SDMConfigTableMsg);
  SDMConfigTableMsg *pMsg = rpcMallocCont(contLen);

  pMsg->dnodeId = htonl(dnodeGetDnodeId());
  pMsg->vgId = htonl(vgId);
  pMsg->sid = htonl(sid);

  SRpcMsg rpcMsg = {0};
  rpcMsg.pCont = pMsg;
  rpcMsg.contLen = contLen;
  rpcMsg.msgType = TSDB_MSG_TYPE_DM_CONFIG_TABLE;

  SRpcMsg rpcRsp = {0};
  dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp);
  terrno = rpcRsp.code;
  
  if (rpcRsp.code != 0) {
    rpcFreeCont(rpcRsp.pCont);
    dError("vgId:%d, sid:%d failed to config table from mnode", vgId, sid);
    return NULL;
  } else {
200
    dInfo("vgId:%d, sid:%d config table msg is received", vgId, sid);
S
Shengliang Guan 已提交
201 202 203 204 205 206 207
    
    // delete this after debug finished
    SMDCreateTableMsg *pTable = rpcRsp.pCont;
    int16_t   numOfColumns = htons(pTable->numOfColumns);
    int16_t   numOfTags = htons(pTable->numOfTags);
    int32_t   sid = htonl(pTable->sid);
    uint64_t  uid = htobe64(pTable->uid);
208
    dInfo("table:%s, numOfColumns:%d numOfTags:%d sid:%d uid:%" PRIu64, pTable->tableId, numOfColumns, numOfTags, sid, uid);
S
Shengliang Guan 已提交
209 210 211 212 213

    return rpcRsp.pCont;
  }
}

S
slguan 已提交
214 215 216
SDnodeStatisInfo dnodeGetStatisInfo() {
  SDnodeStatisInfo info = {0};
  if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) {
S
slguan 已提交
217
    info.httpReqNum   = httpGetReqCount();
S
slguan 已提交
218 219 220
    info.queryReqNum  = atomic_exchange_32(&tsDnodeQueryReqNum, 0);
    info.submitReqNum = atomic_exchange_32(&tsDnodeSubmitReqNum, 0);
  }
S
slguan 已提交
221

S
slguan 已提交
222 223
  return info;
}