vnodeRead.c 14.2 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
H
Haojun Liao 已提交
17
#define _NON_BLOCKING_RETRIEVE  0
S
slguan 已提交
18
#include "os.h"
H
Haojun Liao 已提交
19
#include "tglobal.h"
S
slguan 已提交
20
#include "taoserror.h"
H
Haojun Liao 已提交
21 22
#include "taosmsg.h"
#include "query.h"
S
slguan 已提交
23 24 25 26
#include "trpc.h"
#include "tsdb.h"
#include "vnode.h"
#include "vnodeInt.h"
27
#include "tqueue.h"
S
slguan 已提交
28

S
TD-1915  
Shengliang Guan 已提交
29 30 31
// Dispatch table indexed by message type. Being a static array, entries that are never assigned stay NULL,
// which vnodeProcessRead() treats as "no handler for this message type".
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead);
// Handlers installed into the table by vnodeInitReadFp().
static int32_t  vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t  vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
H
Haojun Liao 已提交
32
// Builds a kill-query message for qhandle and hands it to rpcReportProgress() for the given connection handle.
static int32_t  vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
S
slguan 已提交
33 34

void vnodeInitReadFp(void) {
J
jtao1735 已提交
35 36
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
  vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
S
slguan 已提交
37 38
}

S
TD-1768  
Shengliang Guan 已提交
39 40 41 42 43
//
// After a fetch request enters the vnode queue, its process function must still run even if the vnode can no
// longer provide service; otherwise there will be a deadlock. So no check is done here: the check code is placed
// before the request enters the queue.
//
S
TD-1899  
Shengliang Guan 已提交
44 45
/**
 * Dispatch a read message to the handler registered for its message type.
 *
 * @param vparam  the vnode object (used as SVnodeObj *)
 * @param pRead   the read message; pRead->msgType selects the handler
 * @return        the handler's return code, or TSDB_CODE_VND_MSG_NOT_PROCESSED when the message type is
 *                out of range or has no registered handler
 */
int32_t vnodeProcessRead(void *vparam, SVReadMsg *pRead) {
  SVnodeObj *pVnode = vparam;
  int32_t    msgType = pRead->msgType;

  // vnodeProcessReadMsgFp[] has TSDB_MSG_TYPE_MAX entries; reject anything outside before indexing
  if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
    vDebug("vgId:%d, msgType:%d not processed, invalid msg type", pVnode->vgId, msgType);
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
  }

  if (vnodeProcessReadMsgFp[msgType] == NULL) {
    vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
    return TSDB_CODE_VND_MSG_NOT_PROCESSED;
  }

  return (*vnodeProcessReadMsgFp[msgType])(pVnode, pRead);
}

S
TD-2166  
Shengliang Guan 已提交
56
/*
 * Decide whether the vnode can accept a read request right now.
 *
 * Returns TSDB_CODE_SUCCESS when the status is TAOS_VN_STATUS_READY, the tsdb pointer is set and the
 * role is either slave or master; otherwise logs the reason and returns TSDB_CODE_APP_NOT_READY.
 */
static int32_t vnodeCheckRead(SVnodeObj *pVnode) {
  // 1. the vnode itself must be in ready status
  if (!(pVnode->status == TAOS_VN_STATUS_READY)) {
    vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
           pVnode->refCount, pVnode);
    return TSDB_CODE_APP_NOT_READY;
  }

  // 2. tsdb may be in reset state
  if (!pVnode->tsdb) {
    vDebug("vgId:%d, tsdb is null, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
    return TSDB_CODE_APP_NOT_READY;
  }

  // 3. only slave and master roles serve reads
  if (!(pVnode->role == TAOS_SYNC_ROLE_SLAVE || pVnode->role == TAOS_SYNC_ROLE_MASTER)) {
    vDebug("vgId:%d, replica:%d role:%s, refCount:%d pVnode:%p", pVnode->vgId, pVnode->syncCfg.replica,
           syncRole[pVnode->role], pVnode->refCount, pVnode);
    return TSDB_CODE_APP_NOT_READY;
  }

  return TSDB_CODE_SUCCESS;
}
S
TD-1661  
Shengliang Guan 已提交
77

S
TD-1899  
Shengliang Guan 已提交
78 79 80 81 82 83 84 85 86 87
/*
 * Dispose of a read message that came out of the vnode read queue: decrement the queued-message
 * counter, free the queue item and release the vnode.
 */
void vnodeFreeFromRQueue(void *vparam, SVReadMsg *pRead) {
  SVnodeObj *pVnode = (SVnodeObj *)vparam;

  // one fewer message pending in the read queue
  atomic_sub_fetch_32(&pVnode->queuedRMsg, 1);
  vTrace("vgId:%d, free from vrqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);

  // free the item first, then release the vnode
  // NOTE(review): presumably vnodeRelease() pairs with the refCount increment in vnodeWriteToRQueue() - confirm
  taosFreeQitem(pRead);
  vnodeRelease(pVnode);
}

S
TD-1915  
Shengliang Guan 已提交
88 89
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) {
  SVnodeObj *pVnode = vparam;
90

S
TD-1915  
Shengliang Guan 已提交
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
  if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_QUERY) {
    int32_t code = vnodeCheckRead(pVnode);
    if (code != TSDB_CODE_SUCCESS) return code;
  }

  int32_t size = sizeof(SVReadMsg) + contLen;
  SVReadMsg *pRead = taosAllocateQitem(size);
  if (pRead == NULL) {
    return TSDB_CODE_VND_OUT_OF_MEMORY;
  }

  if (rparam != NULL) {
    SRpcMsg *pRpcMsg = rparam;
    pRead->rpcHandle = pRpcMsg->handle;
    pRead->rpcAhandle = pRpcMsg->ahandle;
    pRead->msgType = pRpcMsg->msgType;
    pRead->code = pRpcMsg->code;
  }

  if (contLen != 0) {
    pRead->contLen = contLen;
    memcpy(pRead->pCont, pCont, contLen);
  } else {
    pRead->qhandle = pCont;
  }

  pRead->qtype = qtype;
H
Haojun Liao 已提交
118

S
TD-1915  
Shengliang Guan 已提交
119
  atomic_add_fetch_32(&pVnode->refCount, 1);
S
TD-1899  
Shengliang Guan 已提交
120 121
  atomic_add_fetch_32(&pVnode->queuedRMsg, 1);
  vTrace("vgId:%d, write into vrqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
S
TD-1768  
Shengliang Guan 已提交
122

S
TD-1915  
Shengliang Guan 已提交
123
  taosWriteQitem(pVnode->rqueue, qtype, pRead);
S
TD-1768  
Shengliang Guan 已提交
124
  return TSDB_CODE_SUCCESS;
125 126
}

S
TD-1915  
Shengliang Guan 已提交
127 128 129 130 131 132 133 134 135 136 137 138 139
/*
 * Queue a qhandle into the vnode read queue as a TSDB_MSG_TYPE_QUERY item with zero content length,
 * so that the query handler picks it up as a "continue executing" request.
 * Returns the code from vnodeWriteToRQueue().
 */
static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
  SRpcMsg rpcMsg = {0};
  rpcMsg.ahandle = ahandle;
  rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;

  int32_t code = vnodeWriteToRQueue(pVnode, qhandle, 0, TAOS_QTYPE_QUERY, &rpcMsg);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  vDebug("QInfo:%p add to vread queue for exec query", *qhandle);
  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
140 141 142 143 144 145 146 147 148 149
/**
 * Dump the current result of a query into the response and decide what happens to the qhandle next.
 *
 * @param pRet         response message object; rsp/len are filled in, and qhandle is set when the query
 *                     is put back into the read queue
 * @param pVnode       the vnode object
 * @param handle       qhandle for executing query
 * @param freeHandle   output: true when the caller should free the qhandle, false when the qhandle was
 *                     put into the read queue to continue executing
 * @param ahandle      sqlObj address at client side
 * @return             TSDB_CODE_SUCCESS, the error from qDumpRetrieveResult(), or the error from
 *                     vnodePutItemIntoReadQueue()
 */
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, bool *freeHandle, void *ahandle) {
  bool continueExec = false;

  int32_t code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec);
  if (code != TSDB_CODE_SUCCESS) {
    // dumping failed: reply with an empty response marked completed and let the caller free the qhandle
    *freeHandle = true;

    SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (pRsp == NULL) {
      // no memory for the response body: report the original error with an empty response
      pRet->rsp = NULL;
      pRet->len = 0;
      return code;
    }

    memset(pRsp, 0, sizeof(SRetrieveTableRsp));
    pRsp->completed = true;

    pRet->rsp = pRsp;
    pRet->len = sizeof(SRetrieveTableRsp);
    return code;
  }

  if (!continueExec) {
    *freeHandle = true;
    vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle);
    return code;
  }

  // the query has more work to do: hand the qhandle back to the read queue
  *freeHandle = false;
  code = vnodePutItemIntoReadQueue(pVnode, handle, ahandle);
  if (code != TSDB_CODE_SUCCESS) {
    // could not be queued, so the caller has to free the qhandle
    *freeHandle = true;
    return code;
  }

  pRet->qhandle = *handle;
  return code;
}

S
TD-1652  
Shengliang Guan 已提交
180
/*
 * Fill pRet with an empty SRetrieveTableRsp whose `completed` flag is set.
 * If the response buffer cannot be allocated, pRet->rsp is set to NULL and pRet->len to 0.
 */
static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) {
  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
  if (pRsp == NULL) {
    // allocation failed: leave an explicitly empty response instead of writing through NULL
    pRet->rsp = NULL;
    pRet->len = 0;
    return;
  }

  memset(pRsp, 0, sizeof(SRetrieveTableRsp));
  pRsp->completed = true;

  pRet->rsp = pRsp;
  pRet->len = sizeof(SRetrieveTableRsp);
}

S
TD-1915  
Shengliang Guan 已提交
190 191 192 193
/**
 * Handle a TSDB_MSG_TYPE_QUERY item taken from the read queue. Three cases are distinguished:
 *   1. pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL: the content is a kill-query message; the matching
 *      query is killed and TSDB_CODE_TSC_QUERY_CANCELLED is returned.
 *   2. pRead->contLen != 0: a new query message; a QInfo is created and registered, an SQueryTableRsp is
 *      built in pRead->rspRet, and the qhandle is put into the read queue for execution.
 *   3. pRead->contLen == 0: pRead->qhandle refers to an existing query which is executed further.
 *
 * @param pVnode  the vnode object
 * @param pRead   the read message; pRead->rspRet is reset and filled with the response
 * @return        TSDB_CODE_SUCCESS, TSDB_CODE_QRY_HAS_RSP when a retrieve response was built here, or an
 *                error code
 */
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
  void *   pCont = pRead->pCont;
  int32_t  contLen = pRead->contLen;
  SRspRet *pRet = &pRead->rspRet;

  SQueryTableMsg *pQueryTableMsg = (SQueryTableMsg *)pCont;
  memset(pRet, 0, sizeof(SRspRet));

  // qHandle needs to be freed correctly
  if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pRead->pCont;
    killQueryMsg->free = htons(killQueryMsg->free);
    killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);

    vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pRead->rpcHandle);
    assert(pRead->contLen > 0 && killQueryMsg->free == 1);

    void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle);
    if (qhandle == NULL || *qhandle == NULL) {
      vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle,
            pRead->rpcHandle);
    } else {
      assert(*qhandle == (void *)killQueryMsg->qhandle);

      qKillQuery(*qhandle);
      qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
    }

    return TSDB_CODE_TSC_QUERY_CANCELLED;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  void ** handle = NULL;

  if (contLen != 0) {
    // allocate the response first, so that an allocation failure leaves nothing to clean up
    SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
    if (pRsp == NULL) {
      vError("vgId:%d, failed to allocate query rsp, query msg discarded", pVnode->vgId);
      return TSDB_CODE_VND_OUT_OF_MEMORY;
    }

    qinfo_t pQInfo = NULL;
    code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);

    pRsp->code = code;
    pRsp->qhandle = 0;

    pRet->len = sizeof(SQueryTableRsp);
    pRet->rsp = pRsp;
    int32_t vgId = pVnode->vgId;

    // the QInfo was created: register it, then report it to the client connection
    if (code == TSDB_CODE_SUCCESS) {
      handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo);
      if (handle == NULL) {  // failed to register qhandle
        pRsp->code = terrno;
        terrno = 0;
        vError("vgId:%d, QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
               tstrerror(pRsp->code));
        qDestroyQueryInfo(pQInfo);  // destroy it directly
        return pRsp->code;
      } else {
        assert(*handle == pQInfo);
        pRsp->qhandle = htobe64((uint64_t)pQInfo);
      }

      // handle is known to be non-NULL here; a failure means the current connection is broken
      if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
        vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle,
               pRead->rpcHandle);
        pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
        qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
        return pRsp->code;
      }
    } else {
      assert(pQInfo == NULL);
    }

    if (handle != NULL) {
      vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
      code = vnodePutItemIntoReadQueue(pVnode, handle, pRead->rpcHandle);
      if (code != TSDB_CODE_SUCCESS) {
        pRsp->code = code;
        qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
        return pRsp->code;
      }
    }
  } else {
    assert(pCont != NULL);
    void **qhandle = (void **)pRead->qhandle;

    vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle);

    // In the retrieve blocking model, only 50% CPU will be used in query processing
    if (tsHalfCoresForQuery) {
      qTableQuery(*qhandle);  // do execute query
      qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, false);
    } else {
      bool freehandle = false;
      bool buildRes = qTableQuery(*qhandle);  // do execute query

      // build query rsp, the retrieve request has reached here already
      if (buildRes) {
        // update the connection info according to the retrieve connection
        pRead->rpcHandle = qGetResultRetrieveMsg(*qhandle);
        assert(pRead->rpcHandle != NULL);

        vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle,
               pRead->rpcHandle);

        // set the real rsp error code
        pRead->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qhandle, &freehandle, pRead->rpcHandle);

        // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
        code = TSDB_CODE_QRY_HAS_RSP;
      } else {
        void *h1 = qGetResultRetrieveMsg(*qhandle);
        assert(h1 == NULL);
        freehandle = qQueryCompleted(*qhandle);
      }

      // NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle.
      // If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle
      if (freehandle || (!buildRes)) {
        qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
      }
    }
  }

  return code;
}

S
TD-1915  
Shengliang Guan 已提交
317 318 319
/**
 * Handle a TSDB_MSG_TYPE_FETCH (retrieve) item taken from the read queue.
 *
 * The SRetrieveTableMsg in pRead->pCont is converted to host byte order in place, the qhandle is
 * acquired, and then one of the following happens: the query is killed (free == 1), the retrieve is
 * discarded because the connection is broken, TSDB_CODE_QRY_NOT_READY is returned because the result is
 * not ready (non-blocking model only), or the result is dumped into pRead->rspRet.
 *
 * @param pVnode  the vnode object
 * @param pRead   the read message; pRead->rspRet is reset and filled with the response
 * @return        TSDB_CODE_SUCCESS or an error code
 */
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
  void *   pCont = pRead->pCont;
  SRspRet *pRet = &pRead->rspRet;

  SRetrieveTableMsg *pRetrieve = pCont;
  pRetrieve->free = htons(pRetrieve->free);
  pRetrieve->qhandle = htobe64(pRetrieve->qhandle);

  vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle,
         pRetrieve->free, pRead->rpcHandle);

  memset(pRet, 0, sizeof(SRspRet));

  terrno = TSDB_CODE_SUCCESS;
  int32_t code = TSDB_CODE_SUCCESS;
  void ** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
  if (handle == NULL) {
    code = terrno;
    terrno = TSDB_CODE_SUCCESS;
  } else if ((*handle) != (void *)pRetrieve->qhandle) {
    // NOTE(review): the acquired handle is not released on this mismatch path - confirm whether it should be
    code = TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  if (code != TSDB_CODE_SUCCESS) {
    vError("vgId:%d, invalid handle in retrieving result, code:0x%08x, QInfo:%p", pVnode->vgId, code, (void *)pRetrieve->qhandle);
    vnodeBuildNoResultQueryRsp(pRet);
    return code;
  }

  if (pRetrieve->free == 1) {
    vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
    qKillQuery(*handle);
    qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);

    vnodeBuildNoResultQueryRsp(pRet);
    code = TSDB_CODE_TSC_QUERY_CANCELLED;
    return code;
  }

  // register the qhandle to connect to quit query immediate if connection is broken
  if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
    vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pRead->rpcHandle);
    code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    qKillQuery(*handle);
    qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
    return code;
  }

  bool freeHandle = true;
  bool buildRes = false;

  code = qRetrieveQueryResultInfo(*handle, &buildRes, pRead->rpcHandle);
  if (code != TSDB_CODE_SUCCESS) {
    // reply with an empty response; on allocation failure rsp/len keep the NULL/0 set by the memset above
    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (pRet->rsp != NULL) {
      pRet->len = sizeof(SRetrieveTableRsp);
      memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
    }
    freeHandle = true;
  } else {
    if (!tsHalfCoresForQuery) {
      // Only effects in the non-blocking model: result is not ready, return immediately
      if (!buildRes) {
        assert(pRead->rpcHandle != NULL);

        qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
        return TSDB_CODE_QRY_NOT_READY;
      }
    } else {
      // in the blocking model the result is expected to be ready whenever the call succeeds
      assert(buildRes == true);
    }

    // ahandle is the sqlObj pointer
    code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pRead->rpcHandle);
  }

  // If qhandle is not added into vread queue, the query should be completed already or paused with error.
  // Here free qhandle immediately
  if (freeHandle) {
    qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
  }

  return code;
}
H
Haojun Liao 已提交
400 401 402

// notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately.
S
TD-1652  
Shengliang Guan 已提交
403 404 405
int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) {
  SRetrieveTableMsg *killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
  killQueryMsg->qhandle = htobe64((uint64_t)qhandle);
H
Haojun Liao 已提交
406 407 408 409 410
  killQueryMsg->free = htons(1);
  killQueryMsg->header.vgId = htonl(vgId);
  killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

  vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
S
TD-1652  
Shengliang Guan 已提交
411
  return rpcReportProgress(handle, (char *)killQueryMsg, sizeof(SRetrieveTableMsg));
412
}