streamExec.c 15.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17

H
Haojun Liao 已提交
18 19
// Upper bound on the number of input queue items that streamExecForAll merges into a single
// batch: merging stops once the batch count exceeds this value. One merged block may include
// several submit blocks.
#define MAX_STREAM_EXEC_BATCH_NUM 128
L
liuyao 已提交
20
// In streamExecForAll, a source-level task that finds its input queue empty keeps polling
// (at most 5 times, sleeping 1ms between attempts) while its batch holds fewer items than this.
#define MIN_STREAM_EXEC_BATCH_NUM 16
21
// streamTaskExecImpl hands the accumulated result blocks to doDumpResult as soon as this many
// blocks have been collected, instead of waiting for the executor to finish.
#define MAX_STREAM_RESULT_DUMP_THRESHOLD  1000
5
54liuyao 已提交
22

23 24
// Forward declaration: the definition appears later in this file, but doDumpResult calls it first.
static int32_t updateCheckPointInfo (SStreamTask* pTask);

25
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
26
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
27 28 29
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

L
liuyao 已提交
30
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
31
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
L
liuyao 已提交
32 33 34
  return (status == TASK_STATUS__PAUSE);
}

35 36
/**
 * Update the checkpoint info and push one batch of result blocks to the task output.
 *
 * @param pTask        task that produced the results
 * @param pItem        input item the results were computed from
 * @param pRes         array of SSDataBlock results; it is released here on every path where it is
 *                     not wrapped into a stream data block
 * @param size         accumulated size in bytes of the blocks in pRes
 * @param totalSize    running byte total, increased by size when the batch is output
 * @param totalBlocks  running block total, increased by the number of blocks in pRes
 * @return TSDB_CODE_SUCCESS, the checkpoint update error code, or -1 when the stream block cannot
 *         be created or the output queue reports TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY
 */
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
  int32_t ckCode = updateCheckPointInfo(pTask);
  if (ckCode != TSDB_CODE_SUCCESS) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return ckCode;
  }

  int32_t blockCnt = taosArrayGetSize(pRes);
  if (blockCnt <= 0) {
    // nothing to send, just drop the (empty) result array
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return TSDB_CODE_SUCCESS;
  }

  SStreamDataBlock* pOutBlock = createStreamBlockFromResults(pItem, pTask, size, pRes);
  if (pOutBlock == NULL) {
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
    return -1;
  }

  qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, blockCnt, size/1048576.0);

  int32_t outCode = streamTaskOutputResultBlock(pTask, pOutBlock);
  if (outCode == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {  // back pressure and record position
    destroyStreamDataBlock(pOutBlock);
    return -1;
  }

  *totalSize += size;
  *totalBlocks += blockCnt;

  return TSDB_CODE_SUCCESS;
}

68
/**
 * Drain the executor for the input that has already been set on it and forward the produced
 * result blocks downstream in batches.
 *
 * Result blocks are collected in a temporary array; the array is handed to doDumpResult whenever
 * MAX_STREAM_RESULT_DUMP_THRESHOLD blocks have been gathered and once more after the executor
 * returns no further output. STREAM_RETRIEVE outputs are broadcast to the children instead of
 * being collected. For a STREAM_INPUT__DATA_RETRIEVE item, a STREAM_PULL_OVER block is appended
 * when the executor is exhausted.
 *
 * @param pTask        task being executed
 * @param pItem        input item currently being processed
 * @param totalSize    out: total size in bytes of all dumped results (reset to 0 on entry)
 * @param totalBlocks  out: total number of dumped result blocks (reset to 0 on entry)
 * @return 0 on success or when the task is asked to stop, -1 with terrno set if the result array
 *         cannot be allocated, otherwise the error code reported by doDumpResult
 */
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;

  *totalBlocks = 0;
  *totalSize = 0;

  int32_t size = 0;
  int32_t numOfBlocks = 0;
  SArray* pRes = NULL;

  while (1) {
    // a fresh array is needed at start and after every dump (doDumpResult consumes the old one)
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
      if (pRes == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
    }

    if (streamTaskShouldStop(&pTask->status)) {
      // the array may already hold copied blocks, so their payloads have to be released as well
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      return 0;
    }

    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
      if (code == TSDB_CODE_QRY_IN_EXEC) {
        resetTaskInfo(pExecutor);
      }

      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
      continue;
    }

    if (output == NULL) {
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        SSDataBlock block = {0};

        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem;
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);

        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
        block.info.type = STREAM_PULL_OVER;
        block.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &block);

        // count the pull-over block so that it is dumped below instead of being discarded, and so
        // that numOfBlocks keeps matching the array size
        numOfBlocks += 1;

        qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId,
               pRetrieveBlock->reqId);
      }

      break;
    }

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;

    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

    taosArrayPush(pRes, &block);

    qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr,
           pTask->selfChildId, numOfBlocks, size / 1048576.0);

    // current output should be dispatched to down stream nodes
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      pRes = NULL;
      size = 0;
      numOfBlocks = 0;
    }
  }

  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  } else {
    // nothing was collected, the array is empty
    taosArrayDestroy(pRes);
  }

  return 0;
}

165
/**
 * Run the scan executor of a source-level task and output the scanned blocks in batches.
 *
 * Each outer iteration collects up to batchSz blocks from the executor, wraps them into a
 * STREAM_INPUT__DATA_BLOCK queue item, outputs it and, for the fixed/shuffle dispatch output
 * types, triggers a dispatch. The loop ends when qStreamRecoverScanFinished reports completion.
 *
 * @param pTask    source-level task (asserted)
 * @param batchSz  maximum number of blocks gathered into one output item
 * @return 0 on completion or when the task is stopped/paused, -1 with terrno set on allocation
 *         failure, or TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY when the output queue rejects the item
 */
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
  int32_t code = 0;

  ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);

  void* exec = pTask->exec.pExecutor;

  qSetStreamOpOpen(exec);
  bool finished = false;

  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t batchCnt = 0;
    while (1) {
      if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
        return 0;
      }

      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
        continue;
      }
      if (output == NULL) {
        if (qStreamRecoverScanFinished(exec)) {
          finished = true;
        } else {
          qSetStreamOpOpen(exec);
        }
        break;
      }

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->selfChildId;
      taosArrayPush(pRes, &block);

      batchCnt++;

      qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
      if (batchCnt >= batchSz) {
        break;
      }
    }

    if (taosArrayGetSize(pRes) == 0) {
      // the array is empty and is re-created by the next iteration, so it has to be released on
      // both paths; otherwise every empty round of the scan leaks one array
      taosArrayDestroy(pRes);

      if (finished) {
        qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
        break;
      } else {
        qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
        continue;
      }
    }

    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
    code = streamTaskOutputResultBlock(pTask, qRes);
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      taosFreeQitem(qRes);
      return code;
    }

    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
      qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
      streamDispatchStreamBlock(pTask);
    }

    if (finished) {
      break;
    }
  }
  return 0;
}

#if 0
// Disabled draft of a batch execution entry point; kept for reference only.
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
  // fetch all queue item, merge according to batchLimit
  int32_t itemCnt = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
  if (itemCnt == 0) {
    qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
    return 0;
  }

  SStreamQueueItem* pMerged = NULL;
  SStreamQueueItem* pItem = NULL;
  taosGetQitem(pTask->inputQall, (void**)&pItem);

  // no item fetched and nothing merged so far: nothing to do
  if (pItem == NULL && pMerged == NULL) {
    return 0;
  }
  // (pItem == NULL && pMerged != NULL): process merged item

  // if drop
  if (pItem->type == STREAM_INPUT__DESTROY) {
    // set status drop
    return -1;
  }

  if (pTask->taskLevel == TASK_LEVEL__SINK) {
    ASSERT(pItem->type == STREAM_INPUT__DATA_BLOCK);
    streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
  }

  // exec impl

  // output
  // try dispatch
  return 0;
}
#endif
L
Liu Jicong 已提交
292

293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
/**
 * Refresh the task checkpoint info from the executor and persist it when the checkpoint id moved
 * forward.
 *
 * The checkpoint id and data version are read through qGetCheckpointVersion. When the id is
 * greater than the one stored in the task, the task's chkInfo is replaced (currentVer is kept),
 * and the task is saved and committed to the stream meta under the meta write latch.
 *
 * @param pTask  task whose checkpoint info is refreshed
 * @return TSDB_CODE_SUCCESS when nothing had to be saved or the commit succeeded, -1 when
 *         committing the stream meta failed
 */
int32_t updateCheckPointInfo (SStreamTask* pTask) {
  int64_t latestCkId = 0;
  int64_t latestVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &latestVer, &latestCkId);

  SCheckpointInfo* pInfo = &pTask->chkInfo;
  if (latestCkId <= pInfo->id) {
    // checkpoint has not advanced, nothing to persist
    return TSDB_CODE_SUCCESS;
  }

  // save it since the checkpoint is updated
  qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
         ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pInfo->version, latestVer, pInfo->id, latestCkId);

  pTask->chkInfo = (SCheckpointInfo){.version = latestVer, .id = latestCkId, .currentVer = pInfo->currentVer};

  taosWLockLatch(&pTask->pMeta->lock);
  streamMetaSaveTask(pTask->pMeta, pTask);
  const bool commitFailed = (streamMetaCommit(pTask->pMeta) < 0);
  taosWUnLockLatch(&pTask->pMeta->lock);

  if (commitFailed) {
    qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
    return -1;
  }

  qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
321
/**
 * Process everything currently available in the task's input queue.
 *
 * Each outer iteration:
 *   1. extracts items from the input queue and merges them into one input (a sink-level task
 *      takes a single item; other levels merge until the merge fails, the queue is empty or
 *      MAX_STREAM_EXEC_BATCH_NUM is exceeded),
 *   2. for a sink-level task, passes the input straight to the task output,
 *   3. otherwise sets the input on the executor, runs streamTaskExecImpl and frees the input.
 *
 * The function returns when the queue yields no input, or when the task is stopped or paused.
 *
 * @param pTask  task to execute
 * @return always 0
 */
int32_t streamExecForAll(SStreamTask* pTask) {
  while (1) {
    int32_t batchSize = 1;
    int16_t times = 0;

    SStreamQueueItem* pInput = NULL;

    // merge multiple input data if possible in the input queue.
    qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);

    while (1) {
      if (streamTaskShouldPause(&pTask->status)) {
        if (pInput == NULL) {
          return 0;
        }

        // items already taken from the queue have been acknowledged via streamQueueProcessSuccess;
        // returning here would leak them and lose their data, so process what has been collected.
        break;
      }

      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
        if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
          times++;
          taosMsleep(1);
          qDebug("===stream===try again batchSize:%d", batchSize);
          continue;
        }

        qDebug("===stream===break batchSize:%d", batchSize);
        break;
      }

      if (pInput == NULL) {
        pInput = qItem;
        streamQueueProcessSuccess(pTask->inputQueue);
        if (pTask->taskLevel == TASK_LEVEL__SINK) {
          break;
        }
      } else {
        // todo we need to sort the data block, instead of just appending into the array list.
        void* newRet = NULL;
        if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
          streamQueueProcessFail(pTask->inputQueue);
          break;
        } else {
          batchSize++;
          pInput = newRet;
          streamQueueProcessSuccess(pTask->inputQueue);
          if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
            qDebug("maximum batch limit:%d reached, processing, %s", MAX_STREAM_EXEC_BATCH_NUM, pTask->id.idStr);
            break;
          }
        }
      }
    }

    if (streamTaskShouldStop(&pTask->status)) {
      if (pInput) {
        streamFreeQitem(pInput);
      }

      return 0;
    }

    if (pInput == NULL) {
      break;
    }

    if (pTask->taskLevel == TASK_LEVEL__SINK) {
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
      qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize);
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
      continue;
    }

    // wait for the task to be ready to go
    while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
      int8_t status = atomic_load_8(&pTask->status.taskStatus);
      if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__PAUSE) {
        break;
      }

      // a stopped/dropped task never becomes ready; leave instead of waiting forever
      if (streamTaskShouldStop(&pTask->status)) {
        streamFreeQitem(pInput);
        return 0;
      }

      qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, status);
      taosMsleep(2);
    }

    int64_t st = taosGetTimestampMs();
    qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize);

    {
      // set input
      void*   pExecutor = pTask->exec.pExecutor;

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
        ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
      }
    }

    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);

    double  el = (taosGetTimestampMs() - st) / 1000.0;
    qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks);
    streamFreeQitem(pInput);
  }

  return 0;
}

L
Liu Jicong 已提交
456
/**
 * Try to run the task if it is waiting to be scheduled.
 *
 * The sched status is switched from WAITING to ACTIVE with a compare-and-exchange; only the
 * caller that observes WAITING runs streamExecForAll. Afterwards the sched status is set to
 * INACTIVE (or FAILED on error), and the task is scheduled again when the input queue is not
 * empty and the task is neither stopped nor paused.
 *
 * @param pTask  task to run
 * @return 0 normally (including when the task was not in WAITING state), -1 when
 *         streamExecForAll reports a failure
 */
int32_t streamTryExec(SStreamTask* pTask) {
  // this function may be executed by multi-threads, so status check is required.
  int8_t prevSched =
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
  if (prevSched != TASK_SCHED_STATUS__WAITING) {
    return 0;
  }

  int32_t execCode = streamExecForAll(pTask);
  if (execCode < 0) {
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
    return -1;
  }

  // todo the task should be commit here
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
  qDebug("s-task:%s exec completed", pTask->id.idStr);

  // no re-schedule when there is nothing left to process or the task is stopped/paused
  if (taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) ||
      streamTaskShouldPause(&pTask->status)) {
    return 0;
  }

  streamSchedExec(pTask);
  return 0;
}