streamExec.c 15.3 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "streamInc.h"
L
Liu Jicong 已提交
17

H
Haojun Liao 已提交
18
// maximum allowed processed block batches. One block may include several submit blocks
19 20
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 8
21
#define MAX_STREAM_RESULT_DUMP_THRESHOLD  1000
5
54liuyao 已提交
22

23 24
static int32_t updateCheckPointInfo (SStreamTask* pTask);

25
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
26
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
27 28 29
  return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
}

L
liuyao 已提交
30
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
dengyihao's avatar
dengyihao 已提交
31
  int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus);
L
liuyao 已提交
32 33 34
  return (status == TASK_STATUS__PAUSE);
}

35 36
static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize,
                            int32_t* totalBlocks) {
37 38
  int32_t code = updateCheckPointInfo(pTask);
  if (code != TSDB_CODE_SUCCESS) {
39
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
40 41 42 43 44
    return code;
  }

  int32_t numOfBlocks = taosArrayGetSize(pRes);
  if (numOfBlocks > 0) {
45
    SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
46
    if (pStreamBlocks == NULL) {
47
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
48 49 50 51 52 53 54
      return -1;
    }

    qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0);

    code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
55
      destroyStreamDataBlock(pStreamBlocks);
56 57
      return -1;
    }
58 59 60

    *totalSize += size;
    *totalBlocks += numOfBlocks;
61
  } else {
62
    taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
63 64 65 66 67
  }

  return TSDB_CODE_SUCCESS;
}

68
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
69 70
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pExecutor = pTask->exec.pExecutor;
L
Liu Jicong 已提交
71

72 73 74 75 76
  *totalBlocks = 0;
  *totalSize = 0;

  int32_t size = 0;
  int32_t numOfBlocks = 0;
H
Haojun Liao 已提交
77
  SArray* pRes = NULL;
L
Liu Jicong 已提交
78 79

  while (1) {
H
Haojun Liao 已提交
80 81 82
    if (pRes == NULL) {
      pRes = taosArrayInit(4, sizeof(SSDataBlock));
    }
H
Haojun Liao 已提交
83

84
    if (streamTaskShouldStop(&pTask->status)) {
85
      taosArrayDestroy(pRes); // memory leak
L
Liu Jicong 已提交
86 87 88
      return 0;
    }

L
Liu Jicong 已提交
89 90
    SSDataBlock* output = NULL;
    uint64_t     ts = 0;
91
    if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
5
54liuyao 已提交
92
      if (code == TSDB_CODE_QRY_IN_EXEC) {
93
        resetTaskInfo(pExecutor);
5
54liuyao 已提交
94
      }
95 96

      qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr());
L
Liu Jicong 已提交
97
      continue;
L
Liu Jicong 已提交
98
    }
99

100
    if (output == NULL) {
5
54liuyao 已提交
101
      if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
102 103
        SSDataBlock block = {0};

104
        const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem;
105
        ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
106

L
Liu Jicong 已提交
107
        assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
L
Liu Jicong 已提交
108
        block.info.type = STREAM_PULL_OVER;
L
Liu Jicong 已提交
109 110
        block.info.childId = pTask->selfChildId;
        taosArrayPush(pRes, &block);
H
Haojun Liao 已提交
111
        numOfBlocks += 1;
H
Haojun Liao 已提交
112
        qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId,
L
Liu Jicong 已提交
113
               pRetrieveBlock->reqId);
114
      }
H
Haojun Liao 已提交
115

116 117
      break;
    }
L
Liu Jicong 已提交
118 119 120 121 122 123 124 125

    if (output->info.type == STREAM_RETRIEVE) {
      if (streamBroadcastToChildren(pTask, output) < 0) {
        // TODO
      }
      continue;
    }

L
Liu Jicong 已提交
126 127 128
    SSDataBlock block = {0};
    assignOneDataBlock(&block, output);
    block.info.childId = pTask->selfChildId;
H
Haojun Liao 已提交
129

130 131 132
    size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
    numOfBlocks += 1;

L
Liu Jicong 已提交
133
    taosArrayPush(pRes, &block);
H
Haojun Liao 已提交
134

135 136 137 138
    qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr,
           pTask->selfChildId, numOfBlocks, size / 1048576.0);

    // current output should be dispatched to down stream nodes
139 140 141
    if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
      ASSERT(numOfBlocks == taosArrayGetSize(pRes));
      code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
142 143 144 145
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

H
Haojun Liao 已提交
146
      pRes = NULL;
147 148
      size = 0;
      numOfBlocks = 0;
149
    }
150
  }
151

152 153 154
  if (numOfBlocks > 0) {
    ASSERT(numOfBlocks == taosArrayGetSize(pRes));
    code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
155 156
  } else {
    taosArrayDestroy(pRes);
L
Liu Jicong 已提交
157
  }
158

H
Haojun Liao 已提交
159
  return code;
L
Liu Jicong 已提交
160 161
}

162
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
dengyihao's avatar
dengyihao 已提交
163
  int32_t code = 0;
164

dengyihao's avatar
dengyihao 已提交
165
  ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
166
  void* exec = pTask->exec.pExecutor;
167

L
Liu Jicong 已提交
168
  qSetStreamOpOpen(exec);
L
Liu Jicong 已提交
169
  bool finished = false;
L
Liu Jicong 已提交
170

171 172 173 174 175 176 177 178 179
  while (1) {
    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    if (pRes == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    int32_t batchCnt = 0;
    while (1) {
L
liuyao 已提交
180
      if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
L
liuyao 已提交
181
        taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
L
Liu Jicong 已提交
182 183 184
        return 0;
      }

185 186 187
      SSDataBlock* output = NULL;
      uint64_t     ts = 0;
      if (qExecTask(exec, &output, &ts) < 0) {
5
54liuyao 已提交
188
        continue;
189
      }
L
Liu Jicong 已提交
190
      if (output == NULL) {
L
Liu Jicong 已提交
191 192 193 194 195
        if (qStreamRecoverScanFinished(exec)) {
          finished = true;
        } else {
          qSetStreamOpOpen(exec);
        }
L
Liu Jicong 已提交
196 197
        break;
      }
198 199 200 201 202 203

      SSDataBlock block = {0};
      assignOneDataBlock(&block, output);
      block.info.childId = pTask->selfChildId;
      taosArrayPush(pRes, &block);

L
Liu Jicong 已提交
204 205
      batchCnt++;

206
      qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, batchCnt, batchSz);
H
Haojun Liao 已提交
207 208 209
      if (batchCnt >= batchSz) {
        break;
      }
210
    }
H
Haojun Liao 已提交
211

212
    if (taosArrayGetSize(pRes) == 0) {
213 214
      if (finished) {
        taosArrayDestroy(pRes);
H
Haojun Liao 已提交
215
        qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
216 217
        break;
      } else {
H
Haojun Liao 已提交
218
        qDebug("s-task:%s continue recover exec task ", pTask->id.idStr);
219 220
        continue;
      }
221
    }
H
Haojun Liao 已提交
222

S
Shengliang Guan 已提交
223
    SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
224 225 226 227 228 229 230 231
    if (qRes == NULL) {
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
232
    code = streamTaskOutputResultBlock(pTask, qRes);
dengyihao's avatar
dengyihao 已提交
233
    if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) {
dengyihao's avatar
dengyihao 已提交
234
      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
dengyihao's avatar
dengyihao 已提交
235
      taosFreeQitem(qRes);
dengyihao's avatar
dengyihao 已提交
236 237
      return code;
    }
L
Liu Jicong 已提交
238 239

    if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
240
      qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
241
      streamDispatchStreamBlock(pTask);
L
Liu Jicong 已提交
242
    }
243 244 245 246

    if (finished) {
      break;
    }
247 248 249 250 251
  }
  return 0;
}

#if 0
252 253 254 255
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
  // fetch all queue item, merge according to batchLimit
  int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
  if (numOfItems == 0) {
256
    qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId);
257 258 259 260 261 262 263 264 265 266
    return 0;
  }
  SStreamQueueItem* pMerged = NULL;
  SStreamQueueItem* pItem = NULL;
  taosGetQitem(pTask->inputQall, (void**)&pItem);
  if (pItem == NULL) {
    if (pMerged != NULL) {
      // process merged item
    } else {
      return 0;
267
    }
268
  }
269

270 271 272 273 274
  // if drop
  if (pItem->type == STREAM_INPUT__DESTROY) {
    // set status drop
    return -1;
  }
275

276
  if (pTask->taskLevel == TASK_LEVEL__SINK) {
277
    ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
278
    streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem);
279 280
  }

281 282 283 284
  // exec impl

  // output
  // try dispatch
285 286
  return 0;
}
287
#endif
L
Liu Jicong 已提交
288

289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
int32_t updateCheckPointInfo (SStreamTask* pTask) {
  int64_t ckId = 0;
  int64_t dataVer = 0;
  qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);

  SCheckpointInfo* pCkInfo = &pTask->chkInfo;
  if (ckId > pCkInfo->id) {  // save it since the checkpoint is updated
    qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64
           ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId);

    pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer};

    taosWLockLatch(&pTask->pMeta->lock);

    streamMetaSaveTask(pTask->pMeta, pTask);
    if (streamMetaCommit(pTask->pMeta) < 0) {
      taosWUnLockLatch(&pTask->pMeta->lock);
      qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr());
      return -1;
    } else {
      taosWUnLockLatch(&pTask->pMeta->lock);
      qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr);
    }
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
317
int32_t streamExecForAll(SStreamTask* pTask) {
dengyihao's avatar
dengyihao 已提交
318
  int32_t code = 0;
L
Liu Jicong 已提交
319
  while (1) {
320
    int32_t batchSize = 1;
L
liuyao 已提交
321
    int16_t times = 0;
322

323 324
    SStreamQueueItem* pInput = NULL;

325
    // merge multiple input data if possible in the input queue.
H
Haojun Liao 已提交
326 327
    qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);

L
Liu Jicong 已提交
328
    while (1) {
L
liuyao 已提交
329
      if (streamTaskShouldPause(&pTask->status)) {
L
liuyao 已提交
330 331 332 333 334
        if (batchSize > 1) {
          break;
        } else {
          return 0;
        }
L
liuyao 已提交
335
      }
336

L
Liu Jicong 已提交
337 338
      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
L
liuyao 已提交
339 340 341
        if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
          times++;
          taosMsleep(1);
342
          qDebug("===stream===try again batchSize:%d", batchSize);
L
liuyao 已提交
343 344
          continue;
        }
345

L
liuyao 已提交
346
        qDebug("===stream===break batchSize:%d", batchSize);
L
Liu Jicong 已提交
347
        break;
L
Liu Jicong 已提交
348
      }
349 350 351

      if (pInput == NULL) {
        pInput = qItem;
352
        streamQueueProcessSuccess(pTask->inputQueue);
353
        if (pTask->taskLevel == TASK_LEVEL__SINK) {
L
Liu Jicong 已提交
354
          break;
L
Liu Jicong 已提交
355
        }
L
Liu Jicong 已提交
356
      } else {
357
        // todo we need to sort the data block, instead of just appending into the array list.
358 359
        void* newRet = NULL;
        if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
L
Liu Jicong 已提交
360 361 362
          streamQueueProcessFail(pTask->inputQueue);
          break;
        } else {
363 364
          batchSize++;
          pInput = newRet;
L
Liu Jicong 已提交
365
          streamQueueProcessSuccess(pTask->inputQueue);
L
liuyao 已提交
366
          if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
H
Haojun Liao 已提交
367
            qDebug("maximum batch limit:%d reached, processing, %s", MAX_STREAM_EXEC_BATCH_NUM, pTask->id.idStr);
5
54liuyao 已提交
368 369
            break;
          }
L
Liu Jicong 已提交
370
        }
L
Liu Jicong 已提交
371 372
      }
    }
373

374
    if (streamTaskShouldStop(&pTask->status)) {
375 376 377
      if (pInput) {
        streamFreeQitem(pInput);
      }
378

L
Liu Jicong 已提交
379
      return 0;
L
Liu Jicong 已提交
380
    }
L
Liu Jicong 已提交
381

382
    if (pInput == NULL) {
L
Liu Jicong 已提交
383 384 385
      break;
    }

386
    if (pTask->taskLevel == TASK_LEVEL__SINK) {
387
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
388
      qDebug("s-task:%s sink task start to sink %d blocks", pTask->id.idStr, batchSize);
389
      streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
L
Liu Jicong 已提交
390
      continue;
L
Liu Jicong 已提交
391
    }
L
Liu Jicong 已提交
392

393 394 395 396 397 398 399
    // wait for the task to be ready to go
    while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
      int8_t status = atomic_load_8(&pTask->status.taskStatus);
      if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
        qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
               atomic_load_8(&pTask->status.taskStatus));
        taosMsleep(2);
400
      } else {
401
        break;
402 403
      }
    }
404

405 406
    int64_t st = taosGetTimestampMs();
    qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize);
H
Haojun Liao 已提交
407

408 409
    {
      // set input
410
      void* pExecutor = pTask->exec.pExecutor;
411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440

      const SStreamQueueItem* pItem = pInput;
      if (pItem->type == STREAM_INPUT__GET_RES) {
        const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput;
        qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
        ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
        const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
        qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
        qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit,
               pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
      } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
        const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;

        SArray* pBlockList = pBlock->blocks;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
      } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
        const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput;

        SArray* pBlockList = pMerged->submits;
        int32_t numOfBlocks = taosArrayGetSize(pBlockList);
        qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks);
        qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
      } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
        const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
        qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
      } else {
        ASSERT(0);
L
Liu Jicong 已提交
441
      }
442
    }
443

444 445 446
    int64_t resSize = 0;
    int32_t totalBlocks = 0;
    streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
L
Liu Jicong 已提交
447

448 449
    double  el = (taosGetTimestampMs() - st) / 1000.0;
    qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks);
450
    streamFreeQitem(pInput);
L
Liu Jicong 已提交
451
  }
452

L
Liu Jicong 已提交
453
  return 0;
L
Liu Jicong 已提交
454 455
}

L
Liu Jicong 已提交
456
int32_t streamTryExec(SStreamTask* pTask) {
457
  // this function may be executed by multi-threads, so status check is required.
L
Liu Jicong 已提交
458
  int8_t schedStatus =
459
      atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
460

L
Liu Jicong 已提交
461 462 463
  if (schedStatus == TASK_SCHED_STATUS__WAITING) {
    int32_t code = streamExecForAll(pTask);
    if (code < 0) {
464
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
L
Liu Jicong 已提交
465 466
      return -1;
    }
467

468
    // todo the task should be commit here
469
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
470
    qDebug("s-task:%s exec completed", pTask->id.idStr);
L
Liu Jicong 已提交
471

dengyihao's avatar
dengyihao 已提交
472 473
    if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
        (!streamTaskShouldPause(&pTask->status))) {
L
Liu Jicong 已提交
474
      streamSchedExec(pTask);
L
Liu Jicong 已提交
475 476
    }
  }
477

L
Liu Jicong 已提交
478
  return 0;
L
Liu Jicong 已提交
479
}