vnodeRead.c 7.9 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
H
Haojun Liao 已提交
18 19

#include "tglobal.h"
S
slguan 已提交
20
#include "taoserror.h"
H
Haojun Liao 已提交
21 22 23
#include "taosmsg.h"
#include "tcache.h"
#include "query.h"
S
slguan 已提交
24 25 26 27 28
#include "trpc.h"
#include "tsdb.h"
#include "vnode.h"
#include "vnodeInt.h"

29 30 31
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t  vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t  vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
H
Haojun Liao 已提交
32
static int32_t  vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
S
slguan 已提交
33 34

void vnodeInitReadFp(void) {
J
jtao1735 已提交
35 36
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
S
slguan 已提交
37 38
}

39
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
S
slguan 已提交
40
  SVnodeObj *pVnode = (SVnodeObj *)param;
41
  int msgType = pReadMsg->rpcMsg.msgType;
S
slguan 已提交
42

43
  if (vnodeProcessReadMsgFp[msgType] == NULL) {
44
    vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
45
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
46
  }
S
slguan 已提交
47

48
  if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) {
49
    vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status);
50
    return TSDB_CODE_VND_INVALID_VGROUP_ID; 
51
  }
S
slguan 已提交
52

H
Hui Li 已提交
53
  // TODO: Later, let slave to support query
54
  if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
55
    vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
H
Hui Li 已提交
56
    return TSDB_CODE_RPC_NOT_READY;
57
  }
H
Hui Li 已提交
58

59
  return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
S
slguan 已提交
60 61
}

62 63 64 65 66
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  void *   pCont = pReadMsg->pCont;
  int32_t  contLen = pReadMsg->contLen;
  SRspRet *pRet = &pReadMsg->rspRet;

S
slguan 已提交
67 68 69
  SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont;
  memset(pRet, 0, sizeof(SRspRet));

H
Haojun Liao 已提交
70
  // qHandle needs to be freed correctly
H
Haojun Liao 已提交
71
  if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
H
Haojun Liao 已提交
72 73 74 75
    SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont;
    killQueryMsg->free = htons(killQueryMsg->free);
    killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);

B
Bomin Zhang 已提交
76
    vWarn("QInfo:%p connection %p broken, kill query", (void*)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
H
Haojun Liao 已提交
77
    assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
78

H
Haojun Liao 已提交
79 80
    // this message arrived here by means of the *query* message, so release the vnode is necessary
    void** qhandle = taosCacheAcquireByKey(pVnode->qHandlePool, (void*) &killQueryMsg->qhandle, sizeof(killQueryMsg->qhandle));
81 82
    if (qhandle == NULL || *qhandle == NULL) {
      vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
H
Haojun Liao 已提交
83 84 85 86 87
    } else {
      taosCacheRelease(pVnode->qHandlePool, (void**) &qhandle, true);
    }

    vnodeRelease(pVnode);
88
    return TSDB_CODE_TSC_QUERY_CANCELLED;
H
Haojun Liao 已提交
89 90 91
  }

  int32_t code = TSDB_CODE_SUCCESS;
92
  qinfo_t pQInfo = NULL;
H
Haojun Liao 已提交
93
  void**  handle = NULL;
H
Haojun Liao 已提交
94

S
slguan 已提交
95
  if (contLen != 0) {
H
Haojun Liao 已提交
96
    code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, vnodeRelease, &pQInfo);
97

S
slguan 已提交
98
    SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
99 100
    pRsp->code    = code;
    pRsp->qhandle = 0;
101

S
slguan 已提交
102 103
    pRet->len = sizeof(SQueryTableRsp);
    pRet->rsp = pRsp;
H
Haojun Liao 已提交
104

105
    // current connect is broken
H
Haojun Liao 已提交
106 107 108 109 110 111 112
    if (code == TSDB_CODE_SUCCESS) {
      if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId) != TSDB_CODE_SUCCESS) {
        vError("vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p", pVnode->vgId, pQInfo,
               pReadMsg->rpcMsg.handle);
        pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;

        // NOTE: there two refcount, needs to kill twice, todo refactor
H
Haojun Liao 已提交
113 114 115
        // query has not been put into qhandle pool, kill it directly.
        qKillQuery(pQInfo);
        qKillQuery(pQInfo);
H
Haojun Liao 已提交
116 117 118 119

        return pRsp->code;
      }

H
Haojun Liao 已提交
120 121
      handle = taosCachePut(pVnode->qHandlePool, pQInfo, sizeof(pQInfo), &pQInfo, sizeof(pQInfo), tsShellActivityTimer * 2);
      assert(*handle == pQInfo);
122
      pRsp->qhandle = htobe64((uint64_t) (handle));
H
Haojun Liao 已提交
123 124 125
    } else {
      assert(pQInfo == NULL);
      vnodeRelease(pVnode);
126
    }
H
Haojun Liao 已提交
127

128
    vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
S
slguan 已提交
129
  } else {
130
    assert(pCont != NULL);
131 132
    pQInfo = *(void**)(pCont);
    handle = pCont;
133
    code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
134

135
    vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, pQInfo);
S
slguan 已提交
136 137
  }

138
  if (pQInfo != NULL) {
139 140 141
    qTableQuery(pQInfo); // do execute query

    assert(handle != NULL);
H
Haojun Liao 已提交
142
    taosCacheRelease(pVnode->qHandlePool, (void**) &handle, false);
143 144
  }

S
slguan 已提交
145 146 147
  return code;
}

148 149 150 151
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  void *   pCont = pReadMsg->pCont;
  SRspRet *pRet = &pReadMsg->rspRet;

S
slguan 已提交
152
  SRetrieveTableMsg *pRetrieve = pCont;
153
  void **pQInfo = (void*) htobe64(pRetrieve->qhandle);
H
Haojun Liao 已提交
154 155
  pRetrieve->free = htons(pRetrieve->free);

156 157
  vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, *pQInfo);

S
slguan 已提交
158
  memset(pRet, 0, sizeof(SRspRet));
H
Haojun Liao 已提交
159 160
  int32_t ret = 0;

161 162
  void** handle = taosCacheAcquireByKey(pVnode->qHandlePool, pQInfo, sizeof(pQInfo));
  if (handle == NULL || handle != pQInfo) {
H
Haojun Liao 已提交
163 164
    ret = TSDB_CODE_QRY_INVALID_QHANDLE;
  }
S
slguan 已提交
165

H
Haojun Liao 已提交
166
  if (pRetrieve->free == 1) {
167 168
    if (ret == TSDB_CODE_SUCCESS) {
      vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pQInfo);
H
Haojun Liao 已提交
169

170 171 172
      taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true);
      pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
      pRet->len = sizeof(SRetrieveTableRsp);
H
Haojun Liao 已提交
173

174 175 176 177 178 179
      memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
      SRetrieveTableRsp* pRsp = pRet->rsp;
      pRsp->numOfRows = 0;
      pRsp->completed = true;
      pRsp->useconds  = 0;
    } else { // todo handle error
H
Haojun Liao 已提交
180

181
    }
H
Haojun Liao 已提交
182 183 184
    return ret;
  }

185
  vDebug("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, *pQInfo);
H
Haojun Liao 已提交
186

187
  int32_t code = qRetrieveQueryResultInfo(*pQInfo);
188
  if (code != TSDB_CODE_SUCCESS) {
S
slguan 已提交
189 190 191 192 193
    //TODO
    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
  } else {
    // todo check code and handle error in build result set
194
    code = qDumpRetrieveResult(*pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
S
slguan 已提交
195

196 197
    if (qHasMoreResultsToRetrieve(*pQInfo)) {
      pRet->qhandle = handle;
198
      code = TSDB_CODE_VND_ACTION_NEED_REPROCESSED;
H
Haojun Liao 已提交
199
    } else { // no further execution invoked, release the ref to vnode
H
Haojun Liao 已提交
200
      taosCacheRelease(pVnode->qHandlePool, (void**) &handle, true);
S
slguan 已提交
201 202 203 204 205
    }
  }
  
  return code;
}
H
Haojun Liao 已提交
206 207 208 209 210 211 212 213 214 215 216 217 218

// notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately.
int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) {
  SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
  killQueryMsg->qhandle = htobe64((uint64_t) qhandle);
  killQueryMsg->free = htons(1);
  killQueryMsg->header.vgId = htonl(vgId);
  killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

  vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
  return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg));
}