vnodeReadMsg.c 11.0 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
H
Haojun Liao 已提交
18
#include "taosmsg.h"
19
#include "tglobal.h"
20
// #include "query.h"
21
#include "vnodeStatus.h"
22 23
#include "vnodeRead.h"
#include "vnodeReadMsg.h"
S
slguan 已提交
24

25 26 27 28 29 30 31 32
#if 0
// notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately.
static int32_t vnodeNotifyCurrentQhandle(void *handle, uint64_t qId, void *qhandle, int32_t vgId) {
  SRetrieveTableMsg *pMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
  pMsg->qId = htobe64(qId);
  pMsg->header.vgId = htonl(vgId);
  pMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
S
TD-1915  
Shengliang Guan 已提交
33

34 35
  vTrace("QInfo:0x%" PRIx64 "-%p register qhandle to connect:%p", qId, qhandle, handle);
  return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg));
S
TD-1915  
Shengliang Guan 已提交
36 37
}

H
Haojun Liao 已提交
38 39 40 41 42 43 44 45
/**
 * @param pRet         response message object
 * @param pVnode       the vnode object
 * @param handle       qhandle for executing query
 * @param freeHandle   free qhandle or not
 * @param ahandle      sqlObj address at client side
 * @return
 */
46 47
static int32_t vnodeDumpQueryResult(SVnRsp *pRet, void *pVnode, uint64_t qId, void **handle, bool *freeHandle,
                                    void *ahandle) {
H
Haojun Liao 已提交
48 49 50
  bool continueExec = false;

  int32_t code = TSDB_CODE_SUCCESS;
51 52
  if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) ==
      TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
53
    if (continueExec) {
H
Haojun Liao 已提交
54
      *freeHandle = false;
55
      code = vnodeReputPutToRQueue(pVnode, handle, ahandle);
S
TD-1768  
Shengliang Guan 已提交
56 57 58 59 60 61
      if (code != TSDB_CODE_SUCCESS) {
        *freeHandle = true;
        return code;
      } else {
        pRet->qhandle = *handle;
      }
H
Haojun Liao 已提交
62 63
    } else {
      *freeHandle = true;
64
      vTrace("QInfo:0x%" PRIx64 "-%p exec completed, free handle:%d", qId, *handle, *freeHandle);
H
Haojun Liao 已提交
65 66
    }
  } else {
67
    SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
68 69 70 71
    memset(pRsp, 0, sizeof(SRetrieveTableRsp));
    pRsp->completed = true;

    pRet->rsp = pRsp;
72
    pRet->len = sizeof(SRetrieveTableRsp);
H
Haojun Liao 已提交
73 74 75 76 77 78
    *freeHandle = true;
  }

  return code;
}

79
static void vnodeBuildNoResultQueryRsp(SVnRsp *pRet) {
H
Haojun Liao 已提交
80 81 82 83
  pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
  pRet->len = sizeof(SRetrieveTableRsp);

  memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
S
TD-1652  
Shengliang Guan 已提交
84
  SRetrieveTableRsp *pRsp = pRet->rsp;
H
Haojun Liao 已提交
85 86 87

  pRsp->completed = true;
}
88
#endif
H
Haojun Liao 已提交
89

90 91
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
#if 0  
S
TD-1915  
Shengliang Guan 已提交
92 93
  void *   pCont = pRead->pCont;
  int32_t  contLen = pRead->contLen;
94
  SVnRsp *pRet = &pRead->rspRet;
95

S
TD-1652  
Shengliang Guan 已提交
96
  SQueryTableMsg *pQueryTableMsg = (SQueryTableMsg *)pCont;
97
  memset(pRet, 0, sizeof(SVnRsp));
S
slguan 已提交
98

H
Haojun Liao 已提交
99
  // qHandle needs to be freed correctly
100 101 102
  if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    vError("error rpc msg in query, %s", tstrerror(pRead->code));
  }
H
Haojun Liao 已提交
103 104

  int32_t code = TSDB_CODE_SUCCESS;
S
TD-1652  
Shengliang Guan 已提交
105
  void ** handle = NULL;
H
Haojun Liao 已提交
106

S
slguan 已提交
107
  if (contLen != 0) {
108
    qinfo_t  pQInfo = NULL;
D
dapan1121 已提交
109
    uint64_t qId = genQueryId();
H
Haojun Liao 已提交
110
    code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, qId);
111

S
TD-1652  
Shengliang Guan 已提交
112 113
    SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
    pRsp->code = code;
114
    pRsp->qId = 0;
115

S
slguan 已提交
116 117
    pRet->len = sizeof(SQueryTableRsp);
    pRet->rsp = pRsp;
B
Bomin Zhang 已提交
118
    int32_t vgId = pVnode->vgId;
H
Haojun Liao 已提交
119

120
    // current connect is broken
H
Haojun Liao 已提交
121
    if (code == TSDB_CODE_SUCCESS) {
122
      handle = qRegisterQInfo(pVnode->qMgmt, qId, pQInfo);
H
Haojun Liao 已提交
123
      if (handle == NULL) {  // failed to register qhandle
124 125
        pRsp->code = terrno;
        terrno = 0;
126

127 128
        vError("vgId:%d, QInfo:0x%" PRIx64 "-%p register qhandle failed, return to app, code:%s,", pVnode->vgId, qId,
               (void *)pQInfo, tstrerror(pRsp->code));
129
        qDestroyQueryInfo(pQInfo);  // destroy it directly
130
        return pRsp->code;
131 132
      } else {
        assert(*handle == pQInfo);
H
Haojun Liao 已提交
133
        pRsp->qId = htobe64(qId);
134 135
      }

S
TD-1652  
Shengliang Guan 已提交
136
      if (handle != NULL &&
H
Haojun Liao 已提交
137
          vnodeNotifyCurrentQhandle(pRead->rpcHandle, qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
138
        vError("vgId:%d, QInfo:0x%" PRIx64 "-%p, query discarded since link is broken, %p", pVnode->vgId, qId, *handle,
S
TD-1915  
Shengliang Guan 已提交
139
               pRead->rpcHandle);
140

141
        pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
S
TD-1652  
Shengliang Guan 已提交
142
        qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
H
Haojun Liao 已提交
143 144
        return pRsp->code;
      }
145

H
Haojun Liao 已提交
146 147
    } else {
      assert(pQInfo == NULL);
148
    }
H
Haojun Liao 已提交
149

dengyihao's avatar
dengyihao 已提交
150
    if (handle != NULL) {
151 152 153
      vTrace("vgId:%d, QInfo:0x%" PRIx64 "-%p, query msg disposed, create qhandle and returns to app", vgId, qId,
             *handle);
      code = vnodeReputPutToRQueue(pVnode, handle, pRead->rpcHandle);
S
TD-1768  
Shengliang Guan 已提交
154 155 156 157 158
      if (code != TSDB_CODE_SUCCESS) {
        pRsp->code = code;
        qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
        return pRsp->code;
      }
dengyihao's avatar
dengyihao 已提交
159
    }
160

161
    int32_t remain = atomic_add_fetch_32(&pVnode->numOfExistQHandle, 1);
162
    vTrace("vgId:%d, new qhandle created, total qhandle:%d", pVnode->vgId, remain);
S
slguan 已提交
163
  } else {
164
    assert(pCont != NULL);
165
    void **  qhandle = (void **)pRead->qhandle;
H
Haojun Liao 已提交
166
    uint64_t qId = 0;
H
Haojun Liao 已提交
167

168
    vTrace("vgId:%d, QInfo:%p, continues to exec query", pVnode->vgId, *qhandle);
H
Haojun Liao 已提交
169

170
    // In the retrieve blocking model, only 50% CPU will be used in query processing
H
Haojun Liao 已提交
171
    if (tsRetrieveBlockingModel) {
H
Haojun Liao 已提交
172
      qTableQuery(*qhandle, &qId);  // do execute query
173 174 175
      qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, false);
    } else {
      bool freehandle = false;
H
Haojun Liao 已提交
176
      bool buildRes = qTableQuery(*qhandle, &qId);  // do execute query
H
Haojun Liao 已提交
177

178 179 180 181 182
      // build query rsp, the retrieve request has reached here already
      if (buildRes) {
        // update the connection info according to the retrieve connection
        pRead->rpcHandle = qGetResultRetrieveMsg(*qhandle);
        assert(pRead->rpcHandle != NULL);
H
Haojun Liao 已提交
183

S
TD-2321  
Shengliang Guan 已提交
184
        vTrace("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle,
185
               pRead->rpcHandle);
186

187
        // set the real rsp error code
H
Haojun Liao 已提交
188
        pRead->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qId, qhandle, &freehandle, pRead->rpcHandle);
H
Haojun Liao 已提交
189

190 191 192
        // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
        code = TSDB_CODE_QRY_HAS_RSP;
      } else {
193
        // void *h1 = qGetResultRetrieveMsg(*qhandle);
D
fix bug  
dapan1121 已提交
194

195 196 197
        /* remove this assert, one possible case that will cause h1 not NULL: query thread unlock pQInfo->lock, and then
         * FETCH thread execute twice before query thread reach here */
        // assert(h1 == NULL);
D
fix bug  
dapan1121 已提交
198

199 200
        freehandle = qQueryCompleted(*qhandle);
      }
H
Haojun Liao 已提交
201

202 203 204
      // NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle.
      // If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle
      if (freehandle || (!buildRes)) {
205
        if (freehandle) {
206
          int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1);
207 208 209
          vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *qhandle, remain);
        }

210 211
        qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
      }
H
Haojun Liao 已提交
212
    }
213
  }
H
Haojun Liao 已提交
214

S
slguan 已提交
215
  return code;
216 217
#endif
  return 0;  
S
slguan 已提交
218 219
}

220 221 222 223
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) {
#if 0  
  void *   pCont = pRead->pCont;
  SVnRsp *pRet = &pRead->rspRet;
224

S
slguan 已提交
225
  SRetrieveTableMsg *pRetrieve = pCont;
H
Haojun Liao 已提交
226
  pRetrieve->free = htons(pRetrieve->free);
H
Haojun Liao 已提交
227
  pRetrieve->qId = htobe64(pRetrieve->qId);
H
Haojun Liao 已提交
228

H
Haojun Liao 已提交
229
  vTrace("vgId:%d, qId:0x%" PRIx64 ", retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, pRetrieve->qId,
S
TD-1915  
Shengliang Guan 已提交
230
         pRetrieve->free, pRead->rpcHandle);
231

232
  memset(pRet, 0, sizeof(SVnRsp));
H
Haojun Liao 已提交
233

B
Bomin Zhang 已提交
234
  terrno = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
235
  int32_t code = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
236
  void ** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qId);
B
Bomin Zhang 已提交
237 238 239
  if (handle == NULL) {
    code = terrno;
    terrno = TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
240
  } else if (!checkQIdEqual(*handle, pRetrieve->qId)) {
H
Haojun Liao 已提交
241
    code = TSDB_CODE_QRY_INVALID_QHANDLE;
B
Bomin Zhang 已提交
242
  }
S
TD-1652  
Shengliang Guan 已提交
243

B
Bomin Zhang 已提交
244
  if (code != TSDB_CODE_SUCCESS) {
245 246
    vError("vgId:%d, invalid qId in retrieving result, code:%s, QInfo:%" PRIu64, pVnode->vgId, tstrerror(code),
           pRetrieve->qId);
H
Haojun Liao 已提交
247
    vnodeBuildNoResultQueryRsp(pRet);
H
Haojun Liao 已提交
248
    return code;
H
Haojun Liao 已提交
249
  }
H
Haojun Liao 已提交
250

251 252
  // kill current query and free corresponding resources.
  if (pRetrieve->free == 1) {
253 254 255
    int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1);
    vWarn("vgId:%d, QInfo:%" PRIx64 "-%p, retrieve msg received to kill query and free qhandle, remain qhandle:%d",
          pVnode->vgId, pRetrieve->qId, *handle, remain);
256

257 258 259 260 261 262 263
    qKillQuery(*handle);
    qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);

    vnodeBuildNoResultQueryRsp(pRet);
    code = TSDB_CODE_TSC_QUERY_CANCELLED;
    return code;
  }
H
Haojun Liao 已提交
264

265
  // register the qhandle to connect to quit query immediate if connection is broken
H
Haojun Liao 已提交
266
  if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, pRetrieve->qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
267 268 269
    int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1);
    vError("vgId:%d, QInfo:%" PRIu64 "-%p, retrieve discarded since link is broken, conn:%p, remain qhandle:%d",
           pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle, remain);
270

271
    code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
H
Haojun Liao 已提交
272
    qKillQuery(*handle);
S
TD-1652  
Shengliang Guan 已提交
273
    qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
274 275
    return code;
  }
276

H
Haojun Liao 已提交
277
  bool freeHandle = true;
S
TD-1652  
Shengliang Guan 已提交
278
  bool buildRes = false;
279

S
TD-1915  
Shengliang Guan 已提交
280
  code = qRetrieveQueryResultInfo(*handle, &buildRes, pRead->rpcHandle);
H
Haojun Liao 已提交
281
  if (code != TSDB_CODE_SUCCESS) {
S
TD-1652  
Shengliang Guan 已提交
282
    // TODO handle malloc failure
H
Haojun Liao 已提交
283
    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
284
    pRet->len = sizeof(SRetrieveTableRsp);
H
Haojun Liao 已提交
285
    memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
H
Haojun Liao 已提交
286
    freeHandle = true;
S
TD-1652  
Shengliang Guan 已提交
287
  } else {  // result is not ready, return immediately
H
Haojun Liao 已提交
288
    // Only affects the non-blocking model
H
Haojun Liao 已提交
289
    if (!tsRetrieveBlockingModel) {
290 291 292 293 294
      if (!buildRes) {
        assert(pRead->rpcHandle != NULL);
        qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
        return TSDB_CODE_QRY_NOT_READY;
      }
295 296
    }

H
Haojun Liao 已提交
297
    // ahandle is the sqlObj pointer
H
Haojun Liao 已提交
298
    code = vnodeDumpQueryResult(pRet, pVnode, pRetrieve->qId, handle, &freeHandle, pRead->rpcHandle);
H
Haojun Liao 已提交
299
  }
H
Haojun Liao 已提交
300

301 302
  // If qhandle is not added into vread queue, the query should be completed already or paused with error.
  // Here free qhandle immediately
H
Haojun Liao 已提交
303
  if (freeHandle) {
304
    int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1);
305
    vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *handle, remain);
S
TD-1652  
Shengliang Guan 已提交
306
    qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
S
slguan 已提交
307
  }
308

S
slguan 已提交
309
  return code;
310 311
#endif 
  return 0;
S
slguan 已提交
312
}
H
Haojun Liao 已提交
313