/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "tglobal.h"
// #include "query.h"
#include "vnodeStatus.h"
#include "vnodeRead.h"
#include "vnodeReadMsg.h"

#if 0
/**
 * Tell the connection `handle` that a qhandle has been created for query `qId`
 * by sending an SRetrieveTableMsg through rpcReportProgress().
 *
 * Original author's note: if the current connection from the client is broken,
 * the query needs to be killed immediately. Callers in this file treat any
 * result other than TSDB_CODE_SUCCESS as a failed notification.
 *
 * @param handle   connection handle passed to rpcReportProgress()
 * @param qId      query id; stored in the message via htobe64()
 * @param qhandle  query handle; only used in the trace log
 * @param vgId     vnode group id; stored in the message header via htonl()
 * @return the value returned by rpcReportProgress(), or -1 when the message
 *         buffer cannot be allocated
 */
static int32_t vnodeNotifyCurrentQhandle(void *handle, uint64_t qId, void *qhandle, int32_t vgId) {
  SRetrieveTableMsg *pMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
  if (pMsg == NULL) {
    // Without a buffer nothing can be reported; fail instead of dereferencing NULL.
    vError("QInfo:0x%" PRIx64 "-%p failed to allocate msg to register qhandle to connect:%p", qId, qhandle, handle);
    return -1;
  }

  pMsg->qId = htobe64(qId);
  pMsg->header.vgId = htonl(vgId);
  pMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));

  vTrace("QInfo:0x%" PRIx64 "-%p register qhandle to connect:%p", qId, qhandle, handle);
  return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg));
}

/**
 * @param pRet         response message object
 * @param pVnode       the vnode object
 * @param handle       qhandle for executing query
 * @param freeHandle   free qhandle or not
 * @param ahandle      sqlObj address at client side
 * @return
 */
46 47
static int32_t vnodeDumpQueryResult(SVnRsp *pRet, void *pVnode, uint64_t qId, void **handle, bool *freeHandle,
                                    void *ahandle) {
H
Haojun Liao 已提交
48 49 50
  bool continueExec = false;

  int32_t code = TSDB_CODE_SUCCESS;
51 52
  if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) ==
      TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
53
    if (continueExec) {
H
Haojun Liao 已提交
54
      *freeHandle = false;
55
      code = vnodeReputPutToRQueue(pVnode, handle, ahandle);
S
TD-1768  
Shengliang Guan 已提交
56 57 58 59 60 61
      if (code != TSDB_CODE_SUCCESS) {
        *freeHandle = true;
        return code;
      } else {
        pRet->qhandle = *handle;
      }
H
Haojun Liao 已提交
62 63
    } else {
      *freeHandle = true;
64
      vTrace("QInfo:0x%" PRIx64 "-%p exec completed, free handle:%d", qId, *handle, *freeHandle);
H
Haojun Liao 已提交
65 66
    }
  } else {
67
    SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
68 69 70 71
    memset(pRsp, 0, sizeof(SRetrieveTableRsp));
    pRsp->completed = true;

    pRet->rsp = pRsp;
72
    pRet->len = sizeof(SRetrieveTableRsp);
H
Haojun Liao 已提交
73 74 75 76 77 78
    *freeHandle = true;
  }

  return code;
}

79
/**
 * Fill `pRet` with an empty retrieve response: a zeroed SRetrieveTableRsp
 * whose `completed` flag is set.
 *
 * If the response buffer cannot be allocated, `pRet->rsp` is left NULL and
 * `pRet->len` is set to 0.
 *
 * @param pRet  response message object to fill
 */
static void vnodeBuildNoResultQueryRsp(SVnRsp *pRet) {
  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));

  pRet->rsp = pRsp;
  if (pRsp == NULL) {
    // Do not advertise a payload that does not exist.
    pRet->len = 0;
    return;
  }

  pRet->len = sizeof(SRetrieveTableRsp);
  memset(pRsp, 0, sizeof(SRetrieveTableRsp));
  pRsp->completed = true;
}
#endif

/**
 * Process a query message for a vnode.
 *
 * NOTE: the implementation is currently compiled out with `#if 0`; the active
 * build simply returns 0. The disabled body is kept for when the query path is
 * re-enabled.
 *
 * Disabled behaviour, as written:
 *  - contLen != 0 (new query): create the query info, register it as a
 *    qhandle, notify the connection, put the qhandle into the read queue and
 *    build an SQueryTableRsp carrying the code and the query id.
 *  - contLen == 0 (continuation): run the query on the qhandle carried by
 *    `pRead`; in the non-blocking model also build the retrieve response when
 *    qTableQuery() asks for it.
 *
 * @param pVnode  the vnode object
 * @param pRead   the read message; its rspRet receives the response
 * @return 0 in the active build. In the disabled body: TSDB_CODE_SUCCESS, a
 *         failure code from query creation / registration / re-queueing,
 *         TSDB_CODE_RPC_NETWORK_UNAVAIL when the notification fails,
 *         TSDB_CODE_QRY_HAS_RSP when a retrieve response was built, or -1
 *         when the response buffer cannot be allocated.
 */
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
#if 0
  void *  pCont = pRead->pCont;
  int32_t contLen = pRead->contLen;
  SVnRsp *pRet = &pRead->rspRet;

  SQueryTableMsg *pQueryTableMsg = (SQueryTableMsg *)pCont;
  memset(pRet, 0, sizeof(SVnRsp));

  // qHandle needs to be freed correctly
  if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
    vError("error rpc msg in query, %s", tstrerror(pRead->code));
  }

  int32_t code = TSDB_CODE_SUCCESS;
  void ** handle = NULL;

  if (contLen != 0) {
    qinfo_t  pQInfo = NULL;
    uint64_t qId = genQueryId();
    code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, qId);

    SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
    if (pRsp == NULL) {
      // No response buffer: drop the not-yet-registered query info instead of
      // dereferencing NULL below.
      vError("vgId:%d, QInfo:0x%" PRIx64 "-%p failed to allocate query rsp", pVnode->vgId, qId, (void *)pQInfo);
      if (pQInfo != NULL) {
        qDestroyQueryInfo(pQInfo);
      }
      return (code != TSDB_CODE_SUCCESS) ? code : -1;
    }

    pRsp->code = code;
    pRsp->qId = 0;

    pRet->len = sizeof(SQueryTableRsp);
    pRet->rsp = pRsp;
    int32_t vgId = pVnode->vgId;

    if (code == TSDB_CODE_SUCCESS) {
      handle = qRegisterQInfo(pVnode->qMgmt, qId, pQInfo);
      if (handle == NULL) {  // failed to register qhandle
        pRsp->code = terrno;
        terrno = 0;

        vError("vgId:%d, QInfo:0x%" PRIx64 "-%p register qhandle failed, return to app, code:%s,", pVnode->vgId, qId,
               (void *)pQInfo, tstrerror(pRsp->code));
        qDestroyQueryInfo(pQInfo);  // destroy it directly
        return pRsp->code;
      }

      assert(*handle == pQInfo);
      pRsp->qId = htobe64(qId);

      // current connect is broken
      if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
        vError("vgId:%d, QInfo:0x%" PRIx64 "-%p, query discarded since link is broken, %p", pVnode->vgId, qId, *handle,
               pRead->rpcHandle);

        pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
        qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
        return pRsp->code;
      }
    } else {
      assert(pQInfo == NULL);
    }

    if (handle != NULL) {
      vTrace("vgId:%d, QInfo:0x%" PRIx64 "-%p, query msg disposed, create qhandle and returns to app", vgId, qId,
             *handle);
      code = vnodeReputPutToRQueue(pVnode, handle, pRead->rpcHandle);
      if (code != TSDB_CODE_SUCCESS) {
        pRsp->code = code;
        qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
        return pRsp->code;
      }

      // Count the qhandle only once it really exists and is queued; counting
      // failed creations would never be balanced by a matching decrement.
      int32_t remain = atomic_add_fetch_32(&pVnode->numOfQHandle, 1);
      vTrace("vgId:%d, new qhandle created, total qhandle:%d", pVnode->vgId, remain);
    }
  } else {
    assert(pCont != NULL);
    void **  qhandle = (void **)pRead->qhandle;
    uint64_t qId = 0;

    vTrace("vgId:%d, QInfo:%p, continues to exec query", pVnode->vgId, *qhandle);

    // In the retrieve blocking model, only 50% CPU will be used in query processing
    if (tsRetrieveBlockingModel) {
      qTableQuery(*qhandle, &qId);  // do execute query
      qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, false);
    } else {
      bool freehandle = false;
      bool buildRes = qTableQuery(*qhandle, &qId);  // do execute query

      // build query rsp, the retrieve request has reached here already
      if (buildRes) {
        // update the connection info according to the retrieve connection
        pRead->rpcHandle = qGetResultRetrieveMsg(*qhandle);
        assert(pRead->rpcHandle != NULL);

        vTrace("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle,
               pRead->rpcHandle);

        // set the real rsp error code
        pRead->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qId, qhandle, &freehandle, pRead->rpcHandle);

        // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
        code = TSDB_CODE_QRY_HAS_RSP;
      } else {
        // Original author's note: do not assert that qGetResultRetrieveMsg() is
        // NULL here. The query thread may unlock pQInfo->lock and the FETCH
        // thread may then execute twice before the query thread reaches this point.
        freehandle = qQueryCompleted(*qhandle);
      }

      // NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle.
      // If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle
      if (freehandle || (!buildRes)) {
        if (freehandle) {
          int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1);
          vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *qhandle, remain);
        }

        qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
      }
    }
  }

  return code;
#else
  // Query processing is disabled in this build.
  (void)pVnode;
  (void)pRead;
  return 0;
#endif
}

// mq related
/**
 * Process a message-queue consume request.
 *
 * The request's head carries a client id that is used to look up a group
 * handle in the vnode's TQ. If the request's commit offset equals the handle's
 * consume offset nothing is moved; otherwise the offset is advanced with
 * tqMoveOffsetToNext() and tqFetchMsg() is called for the handle.
 *
 * @param pVnode  the vnode object; its pTQ is searched for the group handle
 * @param pRead   the read message; pCont is interpreted as a tmqConsumeReq
 * @return 0 on the normal paths, -1 when the message has no content or no
 *         group handle is found for the client id
 */
int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) {
  // parse message and optionally move offset
  tmqConsumeReq *pConsumeMsg = (tmqConsumeReq *)pRead->pCont;
  if (pConsumeMsg == NULL) {
    return -1;
  }

  // extract head
  tmqMsgHead msgHead = pConsumeMsg->head;

  STQ *          pTq = pVnode->pTQ;
  tqGroupHandle *pHandle = tqFindGHandleBycId(pTq, msgHead.clientId);
  if (pHandle == NULL) {
    // NOTE(review): presumably the lookup returns NULL for an unknown client
    // id -- confirm against tqFindGHandleBycId(). Fail instead of crashing.
    return -1;
  }

  // return msg if offset not moved
  if (pConsumeMsg->commitOffset == pHandle->consumeOffset) {
    // TODO: return msg
    return 0;
  }

  // or move offset
  tqMoveOffsetToNext(pHandle);

  // fetch or register context
  tqFetchMsg(pHandle, pRead);

  // TODO: judge mode, tail read or catch up read
  //       (e.g. int64_t lastVer = walLastVer(pVnode->wal);)
  // TODO: launch new query
  return 0;
}

/**
 * Placeholder for processing a TQ query message. Nothing is implemented yet;
 * the function ignores its arguments and returns 0.
 *
 * Planned steps, per the original author's notes:
 *  1. get operator tree from tq data structure
 *  2. execute operator tree
 *  3. put data into ringbuffer
 *  4. unref memory
 *
 * @param pVnode  the vnode object (currently unused)
 * @param pRead   the read message (currently unused)
 * @return always 0
 */
int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
  (void)pVnode;
  (void)pRead;
  return 0;
}
// mq related end

/**
 * Process a fetch (retrieve) message for a vnode.
 *
 * NOTE: the implementation is currently compiled out with `#if 0`; the active
 * build simply returns 0. The disabled body is kept for when the query path is
 * re-enabled.
 *
 * Disabled behaviour, as written:
 *  - byte-swap the `free` and `qId` fields of the message in place;
 *  - acquire the qhandle for the query id; answer with an empty response if it
 *    cannot be acquired or does not match the id;
 *  - if `free` is 1, kill the query and free the qhandle;
 *  - otherwise notify the connection and either return
 *    TSDB_CODE_QRY_NOT_READY (non-blocking model, result not ready) or dump
 *    the result into the response.
 *
 * @param pVnode  the vnode object
 * @param pRead   the read message; pCont is interpreted as an
 *                SRetrieveTableMsg and rspRet receives the response
 * @return 0 in the active build. In the disabled body: the code of the failing
 *         step, TSDB_CODE_TSC_QUERY_CANCELLED after a kill request,
 *         TSDB_CODE_RPC_NETWORK_UNAVAIL when the notification fails,
 *         TSDB_CODE_QRY_NOT_READY, or the code of vnodeDumpQueryResult().
 */
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) {
#if 0
  void *  pCont = pRead->pCont;
  SVnRsp *pRet = &pRead->rspRet;

  SRetrieveTableMsg *pRetrieve = pCont;
  pRetrieve->free = htons(pRetrieve->free);
  pRetrieve->qId = htobe64(pRetrieve->qId);

  vTrace("vgId:%d, qId:0x%" PRIx64 ", retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, pRetrieve->qId,
         pRetrieve->free, pRead->rpcHandle);

  memset(pRet, 0, sizeof(SVnRsp));

  terrno = TSDB_CODE_SUCCESS;
  int32_t code = TSDB_CODE_SUCCESS;
  void ** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qId);
  if (handle == NULL) {
    code = terrno;
    terrno = TSDB_CODE_SUCCESS;
  } else if (!checkQIdEqual(*handle, pRetrieve->qId)) {
    code = TSDB_CODE_QRY_INVALID_QHANDLE;
    // NOTE(review): every other exit path releases the acquired qhandle, so
    // release it here too. It is not freed because it belongs to another
    // query. Confirm that qAcquireQInfo() takes a reference.
    qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
  }

  if (code != TSDB_CODE_SUCCESS) {
    vError("vgId:%d, invalid qId in retrieving result, code:%s, QInfo:%" PRIu64, pVnode->vgId, tstrerror(code),
           pRetrieve->qId);
    vnodeBuildNoResultQueryRsp(pRet);
    return code;
  }

  // kill current query and free corresponding resources.
  if (pRetrieve->free == 1) {
    int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1);
    vWarn("vgId:%d, QInfo:%" PRIx64 "-%p, retrieve msg received to kill query and free qhandle, remain qhandle:%d",
          pVnode->vgId, pRetrieve->qId, *handle, remain);

    qKillQuery(*handle);
    qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);

    vnodeBuildNoResultQueryRsp(pRet);
    code = TSDB_CODE_TSC_QUERY_CANCELLED;
    return code;
  }

  // register the qhandle to connect to quit query immediate if connection is broken
  if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, pRetrieve->qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
    int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1);
    vError("vgId:%d, QInfo:%" PRIu64 "-%p, retrieve discarded since link is broken, conn:%p, remain qhandle:%d",
           pVnode->vgId, pRetrieve->qId, *handle, pRead->rpcHandle, remain);

    code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    qKillQuery(*handle);
    qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
    return code;
  }

  bool freeHandle = true;
  bool buildRes = false;

  code = qRetrieveQueryResultInfo(*handle, &buildRes, pRead->rpcHandle);
  if (code != TSDB_CODE_SUCCESS) {
    pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (pRet->rsp != NULL) {
      pRet->len = sizeof(SRetrieveTableRsp);
      memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
    } else {
      // Allocation failed: send no payload rather than writing through NULL.
      pRet->len = 0;
    }
    freeHandle = true;
  } else {
    // Only affects the non-blocking model: result is not ready, return immediately
    if (!tsRetrieveBlockingModel && !buildRes) {
      assert(pRead->rpcHandle != NULL);
      qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
      return TSDB_CODE_QRY_NOT_READY;
    }

    // ahandle is the sqlObj pointer
    code = vnodeDumpQueryResult(pRet, pVnode, pRetrieve->qId, handle, &freeHandle, pRead->rpcHandle);
  }

  // If qhandle is not added into vread queue, the query should be completed already or paused with error.
  // Here free qhandle immediately
  if (freeHandle) {
    int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1);
    vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *handle, remain);
    qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
  }

  return code;
#else
  // Fetch processing is disabled in this build.
  (void)pVnode;
  (void)pRead;
  return 0;
#endif
}