vnodeRead.c 10.3 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
H
Haojun Liao 已提交
17
//#include <dnode.h>
S
slguan 已提交
18
#include "os.h"
H
Haojun Liao 已提交
19 20

#include "tglobal.h"
S
slguan 已提交
21
#include "taoserror.h"
H
Haojun Liao 已提交
22 23 24
#include "taosmsg.h"
#include "tcache.h"
#include "query.h"
S
slguan 已提交
25 26 27 28
#include "trpc.h"
#include "tsdb.h"
#include "vnode.h"
#include "vnodeInt.h"
29
#include "tqueue.h"
S
slguan 已提交
30

31 32 33
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t  vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t  vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
H
Haojun Liao 已提交
34
static int32_t  vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
S
slguan 已提交
35 36

void vnodeInitReadFp(void) {
J
jtao1735 已提交
37 38
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
S
slguan 已提交
39 40
}

41
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
S
slguan 已提交
42
  SVnodeObj *pVnode = (SVnodeObj *)param;
43
  int msgType = pReadMsg->rpcMsg.msgType;
S
slguan 已提交
44

45
  if (vnodeProcessReadMsgFp[msgType] == NULL) {
46
    vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
47
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
48
  }
S
slguan 已提交
49

50
  if (pVnode->status != TAOS_VN_STATUS_READY) {
51
    vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status);
52
    return TSDB_CODE_VND_INVALID_STATUS; 
53
  }
S
slguan 已提交
54

55 56
  // tsdb may be in reset state  
  if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
57
  if (pVnode->status == TAOS_VN_STATUS_CLOSING)
58 59
    return TSDB_CODE_RPC_NOT_READY;

H
Hui Li 已提交
60
  // TODO: Later, let slave to support query
61
  if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
62
    vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
H
Hui Li 已提交
63
    return TSDB_CODE_RPC_NOT_READY;
64
  }
H
Hui Li 已提交
65

66
  return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
S
slguan 已提交
67 68
}

H
Haojun Liao 已提交
69
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
70 71 72 73
  SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
  pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
  pRead->pCont = qhandle;
  pRead->contLen = 0;
H
Haojun Liao 已提交
74
  pRead->rpcMsg.handle = NULL;
75 76

  atomic_add_fetch_32(&pVnode->refCount, 1);
H
Haojun Liao 已提交
77 78

  vDebug("QInfo:%p add to query task queue for exec, msg:%p", qhandle, pRead);
79 80 81
  taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
}

H
Haojun Liao 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void* handle, bool* freeHandle) {
  bool continueExec = false;

  int32_t code = TSDB_CODE_SUCCESS;
  if ((code = qDumpRetrieveResult(handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
    if (continueExec) {
      vnodePutItemIntoReadQueue(pVnode, handle);
      pRet->qhandle = handle;
      *freeHandle = false;
    } else {
      vDebug("QInfo:%p exec completed", handle);
      *freeHandle = true;
    }
  } else {
    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
    *freeHandle = true;
  }

  return code;
}

static void vnodeBuildNoResultQueryRsp(SRspRet* pRet) {
  pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
  pRet->len = sizeof(SRetrieveTableRsp);

  memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
  SRetrieveTableRsp* pRsp = pRet->rsp;

  pRsp->completed = true;
}

114
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
115
  void    *pCont = pReadMsg->pCont;
116 117 118
  int32_t  contLen = pReadMsg->contLen;
  SRspRet *pRet = &pReadMsg->rspRet;

S
slguan 已提交
119 120 121
  SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont;
  memset(pRet, 0, sizeof(SRspRet));

H
Haojun Liao 已提交
122
  // qHandle needs to be freed correctly
H
Haojun Liao 已提交
123
  if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
H
Haojun Liao 已提交
124 125 126 127
    SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont;
    killQueryMsg->free = htons(killQueryMsg->free);
    killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);

128
    vWarn("QInfo:%p connection %p broken, kill query", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
H
Haojun Liao 已提交
129
    assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
130

131
    void** qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) killQueryMsg->qhandle);
132 133
    if (qhandle == NULL || *qhandle == NULL) {
      vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
H
Haojun Liao 已提交
134
    } else {
135
      assert(*qhandle == (void*) killQueryMsg->qhandle);
H
Haojun Liao 已提交
136

137
      qKillQuery(*qhandle);
138
      qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true);
H
Haojun Liao 已提交
139 140
    }

141
    return TSDB_CODE_TSC_QUERY_CANCELLED;
H
Haojun Liao 已提交
142 143 144
  }

  int32_t code = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
145
  void**  handle = NULL;
H
Haojun Liao 已提交
146

S
slguan 已提交
147
  if (contLen != 0) {
H
Haojun Liao 已提交
148
    qinfo_t pQInfo = NULL;
149
    code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
150

S
slguan 已提交
151
    SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
152 153
    pRsp->code    = code;
    pRsp->qhandle = 0;
154

S
slguan 已提交
155 156
    pRet->len = sizeof(SQueryTableRsp);
    pRet->rsp = pRsp;
B
Bomin Zhang 已提交
157
    int32_t vgId = pVnode->vgId;
H
Haojun Liao 已提交
158

159
    // current connect is broken
H
Haojun Liao 已提交
160
    if (code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
161
      handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo);
162
      if (handle == NULL) {  // failed to register qhandle
163 164
        vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
               tstrerror(pRsp->code));
165
        pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
166
        qDestroyQueryInfo(pQInfo);  // destroy it directly
167 168
      } else {
        assert(*handle == pQInfo);
169
        pRsp->qhandle = htobe64((uint64_t) pQInfo);
170 171
      }

172
      if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
173
        vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
174
        pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
Haojun Liao 已提交
175
        qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
H
Haojun Liao 已提交
176 177 178 179
        return pRsp->code;
      }
    } else {
      assert(pQInfo == NULL);
180
    }
H
Haojun Liao 已提交
181

dengyihao's avatar
dengyihao 已提交
182
    if (handle != NULL) {
H
Haojun Liao 已提交
183 184
      vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle);

H
Haojun Liao 已提交
185 186
      vnodePutItemIntoReadQueue(pVnode, *handle);
      qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
dengyihao's avatar
dengyihao 已提交
187
    }
188

S
slguan 已提交
189
  } else {
190
    assert(pCont != NULL);
H
Haojun Liao 已提交
191 192

    handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont);
H
Haojun Liao 已提交
193
    if (handle == NULL) {
194
      vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle);
H
Haojun Liao 已提交
195 196
      code = TSDB_CODE_QRY_INVALID_QHANDLE;
    } else {
H
Haojun Liao 已提交
197
      vDebug("vgId:%d, QInfo:%p, dnode continue exec query", pVnode->vgId, (void*) pCont);
H
Haojun Liao 已提交
198 199

      bool freehandle = false;
200 201
      bool buildRes = qTableQuery(*handle); // do execute query

H
Haojun Liao 已提交
202 203
      // build query rsp
      if (buildRes) {
H
Haojun Liao 已提交
204 205 206
        // update the connection info according to the retrieve connection
        pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*handle);
        assert(pReadMsg->rpcMsg.handle != NULL);
207

H
Haojun Liao 已提交
208
        vDebug("vgId:%d, QInfo:%p, start to build result rsp after query paused, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
H
Haojun Liao 已提交
209
        code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, *handle, &freehandle);
210

H
Haojun Liao 已提交
211 212 213
        // todo test the error code case
        if (code == TSDB_CODE_SUCCESS) {
          code = TSDB_CODE_QRY_HAS_RSP;
214 215
        }
      }
H
Haojun Liao 已提交
216

H
Haojun Liao 已提交
217
      qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freehandle);
H
Haojun Liao 已提交
218
    }
219
  }
H
Haojun Liao 已提交
220

S
slguan 已提交
221 222 223
  return code;
}

224 225 226 227
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
  void *   pCont = pReadMsg->pCont;
  SRspRet *pRet = &pReadMsg->rspRet;

S
slguan 已提交
228
  SRetrieveTableMsg *pRetrieve = pCont;
H
Haojun Liao 已提交
229
  pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
H
Haojun Liao 已提交
230 231
  pRetrieve->free = htons(pRetrieve->free);

232
  vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void*) pRetrieve->qhandle, pRetrieve->free, pReadMsg->rpcMsg.handle);
233

S
slguan 已提交
234
  memset(pRet, 0, sizeof(SRspRet));
H
Haojun Liao 已提交
235

H
Haojun Liao 已提交
236
  int32_t code = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
237
  void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
238
  if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) {
H
Haojun Liao 已提交
239
    code = TSDB_CODE_QRY_INVALID_QHANDLE;
240
    vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle);
H
Haojun Liao 已提交
241 242
    
    vnodeBuildNoResultQueryRsp(pRet);
H
Haojun Liao 已提交
243
    return code;
H
Haojun Liao 已提交
244
  }
S
slguan 已提交
245

H
Haojun Liao 已提交
246
  if (pRetrieve->free == 1) {
H
Haojun Liao 已提交
247
    vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
248
    qKillQuery(*handle);
H
Haojun Liao 已提交
249
    qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
H
Haojun Liao 已提交
250

H
Haojun Liao 已提交
251
    vnodeBuildNoResultQueryRsp(pRet);
H
Haojun Liao 已提交
252 253 254 255
    return code;
  }

  bool freeHandle = true;
256 257
  bool buildRes   = false;

258
  code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcMsg.handle);
H
Haojun Liao 已提交
259 260 261 262
  if (code != TSDB_CODE_SUCCESS) {
    //TODO handle malloc failure
    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
H
Haojun Liao 已提交
263
  } else { // result is not ready, return immediately
264
    if (!buildRes) {
H
Haojun Liao 已提交
265
      qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
266 267 268
      return TSDB_CODE_QRY_NOT_READY;
    }

H
Haojun Liao 已提交
269
    code = vnodeDumpQueryResult(pRet, pVnode, *handle, &freeHandle);
S
slguan 已提交
270
  }
271

H
Haojun Liao 已提交
272
  qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle);
S
slguan 已提交
273 274
  return code;
}
H
Haojun Liao 已提交
275 276 277 278 279 280 281 282 283 284 285 286

// notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately.
int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) {
  SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
  killQueryMsg->qhandle = htobe64((uint64_t) qhandle);
  killQueryMsg->free = htons(1);
  killQueryMsg->header.vgId = htonl(vgId);
  killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

  vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
  return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg));
287
}