# fastdeploy_lib.py
# Copyright (c) 2022 VisualDL Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =======================================================================
import copy
import json
import os
import random
import re
import signal
import string
from collections import defaultdict
from subprocess import Popen
from subprocess import STDOUT

import google.protobuf.json_format as json_format
import google.protobuf.text_format as text_format
import psutil
import requests

from .proto.model_config_pb2 import ModelConfig
from visualdl.utils.dir import FASTDEPLOYSERVER_PATH


def pbtxt2json(content: str):
    '''
  Parse a ModelConfig written in protobuf text format and return it
  serialized as a JSON string.
  '''
    parsed_message = text_format.Parse(content, ModelConfig())
    return json_format.MessageToJson(parsed_message)


def json2pbtxt(content: str):
    '''
  Parse a JSON string into a ModelConfig and return it serialized in
  protobuf text format.
  '''
    parsed_message = json_format.Parse(content, ModelConfig())
    return text_format.MessageToString(parsed_message)


def validate_data(model_config):
    '''
    Filter out empty values received from the front end.
    The front end may send keys with empty values; the simplest safe
    handling is to drop them before saving the config file.
    Args:
        model_config: model config to be saved in config file
    Return:
        model config after filtering.
    '''
    return {key: value for key, value in model_config.items() if value}


def analyse_config(cur_dir: str):
    '''
  Analyse the model config in specified directory.

  Scans the depth-1 subdirectories of ``cur_dir`` (one per model). For each
  model directory it collects the .pbtxt config files (creating config.pbtxt
  from the first config found when it is missing), parses the default config
  into json, and lists the resource files in each numeric version directory
  (creating version directory '1' when none exists).

  Args:
      cur_dir: path of the model repository directory.
  Returns:
      (all_model_configs, all_model_versions): both indexed by model name;
      the first maps to the parsed config (json format), the second maps to
      {version_name: [resource filenames]}.
  Raises:
      Exception: if no model config is found (not a valid model repository).
  '''
    all_model_configs = {}
    all_model_versions = {}
    parent_dir, sub_dirs, filenames = os.walk(cur_dir).send(
        None)  # models can only put directory in model repository,
    # so we should only search depth 1 directories.
    for model_dir_name in sub_dirs:
        model_dir, model_sub_dirs, filenames = os.walk(
            os.path.join(parent_dir, model_dir_name)).send(None)
        model_name = os.path.basename(model_dir)
        config_filenames = []
        for filename in filenames:
            if '.pbtxt' in filename:
                config_filenames.append(
                    filename
                )  # filenames with extension .pbtxt are all config files
        if config_filenames:
            default_config_filename = config_filenames[0]
            if 'config.pbtxt' in config_filenames:
                default_config_filename = 'config.pbtxt'
                config_filenames.remove(default_config_filename)
                config_filenames.insert(0, default_config_filename)
            else:
                # if no config.pbtxt, we choose the first file in config_filenames list to create config.pbtxt
                copy_config_file_to_default_config(model_dir,
                                                   default_config_filename)
                default_config_filename = 'config.pbtxt'
                config_filenames.insert(0, default_config_filename)
            json_config = json.loads(
                pbtxt2json(
                    open(os.path.join(model_dir,
                                      default_config_filename)).read()))
            json_config["config_filenames"] = config_filenames[
                0]  # add config_filenames to config data (frontend developer said he only wanted one filename,
            # and to request config_filenames by get_config_filenames_for_one_model later)
            all_model_configs[
                model_name] = json_config  # store original config file content in json format
            json_config[
                'name'] = model_name  # because name in config data may be different from model_name,
            # model_name is model directory name actually, we should conform name with model_name.
        else:
            continue  # directory without any config file is not a model, skip it
        for model_sub_dir in model_sub_dirs:
            if re.match(
                    r'\d+',
                    model_sub_dir):  # version directory consists of numbers
                if model_name not in all_model_versions:
                    all_model_versions[model_name] = {}
                if model_sub_dir not in all_model_versions[model_name]:
                    all_model_versions[model_name][model_sub_dir] = []
                for version_resource_file in os.listdir(
                        os.path.join(model_dir, model_sub_dir)):
                    all_model_versions[model_name][model_sub_dir].append(
                        version_resource_file)
        if model_name not in all_model_versions:  # if a model has config but no version directory,
            # we create version directory '1' for users' convenience
            all_model_versions[model_name] = {}
            os.mkdir(os.path.join(model_dir, '1'))
            all_model_versions[model_name]['1'] = []

    if not all_model_configs:
        raise Exception(
            'The path you choose is not a valid model repository, please choose a valid path.'
        )
    return all_model_configs, all_model_versions


def exchange_format_to_original_format(exchange_format):
    '''
  Convert config data from the frontend exchange format back to the
  original (protobuf-compatible) format, indexed by model name.
  '''
    all_models = {}
    candidates = exchange_format.get('ensembles', []) + exchange_format.get(
        'models', [])
    for config in candidates:
        # 1. wrap optimization settings back under 'executionAccelerators'
        if 'optimization' in config:
            accelerators = config.pop('optimization')
            config['optimization'] = {'executionAccelerators': accelerators}
        # 2. strip the frontend-only keys
        config.pop('versions', None)
        config.pop('config_filenames', None)
        if config.get('platform') == 'ensemble':  # ensemble model
            # 3. move 'step' back under 'ensembleScheduling'
            if 'step' in config:
                steps = config.pop('step')
                config.setdefault('ensembleScheduling', {})['step'] = steps
                # 4. drop the two virtual models (feed, fetch) and the
                # relationship keys that only exist for the frontend graph
                kept_steps = []
                for step in steps:
                    if step['modelName'] in ('feed', 'fetch'):
                        continue
                    for extra_key in ('modelType', 'inputModels',
                                      'outputModels', 'inputVars',
                                      'outputVars'):
                        del step[extra_key]
                    kept_steps.append(step)
                steps[:] = kept_steps  # modify the step list in place
        all_models[config['name']] = config
    return all_models


def copy_config_file_to_default_config(model_dir, config_name):
    '''
  Create config.pbtxt in model_dir from an existing config file, forcing
  the 'name' field to match the model directory name.
  '''
    with open(os.path.join(model_dir, config_name)) as source_file:
        json_config = json.loads(pbtxt2json(source_file.read()))
    json_config['name'] = os.path.basename(model_dir)
    with open(os.path.join(model_dir, 'config.pbtxt'), 'w') as target_file:
        target_file.write(json2pbtxt(json.dumps(json_config)))


def original_format_to_exchange_format(original_format, version_info):
    '''
  Change config original format to exchange format used by the frontend.

  Args:
      original_format: model configs (json format) indexed by model name.
      version_info: {model_name: {version_name: [filenames]}}. NOTE: it is
          rewritten in place into the frontend tree-component format.
  Returns:
      dict with 'ensembles' and 'models' lists of transformed configs.
  '''
    exchange_format = {}
    exchange_format['ensembles'] = []
    exchange_format['models'] = []
    # 0. transform version info into component format in frontend
    # (a list of {title, key, children} nodes per model)
    for model_name, version_filenames_dict in version_info.items():
        version_info_for_frontend = []
        for version_name, filenames in version_filenames_dict.items():
            version_filenames_dict_for_frontend = {}
            version_filenames_dict_for_frontend['title'] = version_name
            version_filenames_dict_for_frontend['key'] = version_name
            version_filenames_dict_for_frontend['children'] = []
            for filename in filenames:
                version_filenames_dict_for_frontend['children'].append({
                    'title':
                    filename,
                    'key':
                    filename
                })
            version_info_for_frontend.append(
                version_filenames_dict_for_frontend)
        version_info[model_name] = version_info_for_frontend

    for model_name, model_config in original_format.items():
        # 1. remove 'executionAccelerators' keyword
        transformed_config = copy.deepcopy(model_config)
        if 'optimization' in model_config:
            if 'executionAccelerators' in model_config['optimization']:
                transformed_optimization_config = model_config['optimization'][
                    'executionAccelerators']
                del transformed_config['optimization']
                transformed_config[
                    'optimization'] = transformed_optimization_config
        # 2. add versions information
        if model_name in version_info:
            transformed_config['versions'] = version_info[model_name]
        if 'platform' in model_config and model_config[
                'platform'] == 'ensemble':  # ensemble model
            # 3. remove ensembleScheduling
            if 'ensembleScheduling' in model_config:
                if 'step' in model_config['ensembleScheduling']:
                    del transformed_config['ensembleScheduling']
                    # NOTE(review): this assigns the step list from the
                    # ORIGINAL config (not the deepcopy), so the virtual
                    # feed/fetch entries appended below also mutate
                    # original_format — verify callers do not reuse it.
                    transformed_config['step'] = model_config[
                        'ensembleScheduling']['step']
                    # 4. add two virtual models(feed, fetch), and
                    # "modelType", "inputModels", "outputModels", "inputVars", "outputVars"
                    for model_config_in_step in transformed_config['step']:
                        model_config_in_step['modelType'] = 'normal'
                        model_config_in_step['inputModels'] = []
                        model_config_in_step['outputModels'] = []
                        model_config_in_step['inputVars'] = []
                        model_config_in_step['outputVars'] = []

                    transformed_config['step'].append({
                        "modelName": "feed",
                        "modelType": "virtual",
                        "inputModels": [],
                        "outputModels": [],
                        "inputVars": [],
                        "outputVars": []
                    })
                    transformed_config['step'].append({
                        "modelName": "fetch",
                        "modelType": "virtual",
                        "inputModels": [],
                        "outputModels": [],
                        "inputVars": [],
                        "outputVars": []
                    })
                    # fill in the relationship keys and layout positions
                    analyse_step_relationships(transformed_config['step'],
                                               transformed_config['input'],
                                               transformed_config['output'])
                    exchange_format['ensembles'].append(transformed_config)
        elif 'backend' in model_config:  # single model
            exchange_format['models'].append(transformed_config)
    return exchange_format


def analyse_step_relationships(step_config, inputs, outputs):  # noqa: C901
    '''
  Analyse model relationships in ensemble step and fill \
    "inputModels", "outputModels", "inputVars", "outputVars" in step_config.

  Builds a variable-centric index first (which models produce / consume each
  variable), then projects it back onto each model's relationship lists, and
  finally computes the frontend layout positions.

  Args:
      step_config: step data in ensemble model config (modified in place).
      inputs: inputs in ensemble model config.
      outputs: outputs in ensemble model config.
  '''
    models_dict = {}  # model name -> model config in step
    vars_dict = {}  # var name -> {'from_models': set, 'to_models': set}
    for model_config_in_step in step_config:
        models_dict[model_config_in_step['modelName']] = model_config_in_step
        if model_config_in_step['modelType'] == 'virtual':
            # the virtual feed node produces ensemble inputs;
            # the virtual fetch node consumes ensemble outputs
            for var in inputs:
                if var['name'] not in vars_dict:
                    vars_dict[var['name']] = {}
                    vars_dict[var['name']]['from_models'] = set()
                    vars_dict[var['name']]['to_models'] = set()
                vars_dict[var['name']]['from_models'].add('feed')
            for var in outputs:
                if var['name'] not in vars_dict:
                    vars_dict[var['name']] = {}
                    vars_dict[var['name']]['from_models'] = set()
                    vars_dict[var['name']]['to_models'] = set()
                vars_dict[var['name']]['to_models'].add('fetch')
        else:
            # a normal model consumes its inputMap vars and produces its
            # outputMap vars
            for var_placehold_name, var_name in model_config_in_step[
                    'inputMap'].items():
                if var_name not in vars_dict:
                    vars_dict[var_name] = {}
                    vars_dict[var_name]['from_models'] = set()
                    vars_dict[var_name]['to_models'] = set()
                vars_dict[var_name]['to_models'].add(
                    model_config_in_step['modelName'])

            for var_placehold_name, var_name in model_config_in_step[
                    'outputMap'].items():
                if var_name not in vars_dict:
                    vars_dict[var_name] = {}
                    vars_dict[var_name]['from_models'] = set()
                    vars_dict[var_name]['to_models'] = set()
                vars_dict[var_name]['from_models'].add(
                    model_config_in_step['modelName'])
    # project the variable index back onto each model's relationship lists
    for var_name, relationships in vars_dict.items():
        for from_model in relationships['from_models']:
            models_dict[from_model]['outputVars'].append(var_name)
            for var_to_model in relationships['to_models']:
                if var_to_model not in models_dict[from_model]['outputModels']:
                    models_dict[from_model]['outputModels'].append(
                        var_to_model)
        for to_model in relationships['to_models']:
            models_dict[to_model]['inputVars'].append(var_name)
            for var_from_model in relationships['from_models']:
                if var_from_model not in models_dict[to_model]['inputModels']:
                    models_dict[to_model]['inputModels'].append(var_from_model)
    calculate_layout_for_frontend(models_dict)


def get_config_filenames_for_one_model(cur_dir, name):
    '''
  List the .pbtxt config filenames for one model: normal configs first
  (sorted), then backup configs whose names contain 'vdlbackup' (sorted).
  '''
    _, _, filenames = next(os.walk(os.path.join(cur_dir, name)))
    normal_configs = []
    backup_configs = []
    for filename in filenames:
        if '.pbtxt' not in filename:
            continue  # not a config file
        if 'vdlbackup' in filename:
            backup_configs.append(filename)  # backup config file
        else:
            normal_configs.append(filename)  # normal config file
    return sorted(normal_configs) + sorted(backup_configs)


def get_config_for_one_model(cur_dir, name, config_filename):
    '''
  Read one config file for the given model and return it in exchange
  format, including version information gathered from the model directory.
  '''
    model_dir = os.path.join(cur_dir, name)
    with open(os.path.join(model_dir, config_filename)) as f:
        json_config = json.loads(pbtxt2json(f.read()))
    # name in config data may differ from the directory name; the directory
    # name is authoritative, so overwrite it.
    json_config['name'] = name
    json_config["config_filenames"] = config_filename
    versions = {}
    for sub_dir in os.listdir(model_dir):
        if re.match(r'\d+', sub_dir):  # version directories are numeric
            if sub_dir not in versions:
                versions[sub_dir] = []
            for resource_file in os.listdir(os.path.join(model_dir, sub_dir)):
                versions[sub_dir].append(resource_file)
    model_config = original_format_to_exchange_format({name: json_config},
                                                      {name: versions})
    if model_config['ensembles']:
        return model_config['ensembles'][0]
    elif model_config['models']:
        return model_config['models'][0]


def calculate_layout_for_frontend(model_config_in_step):
    '''
    Analyse model topology connections and prepare the positions for each
    model in the frontend layout.
    Dynamic programming:
        depth(cur_node) = max(depth(prev_node) for prev_node in inputModels) + 1
    Args:
        model_config_in_step(dict): model config in ensemble models' step,
            indexed by model name. 'pos_x'/'pos_y' are written in place.
    Returns:
        None.
    '''
    path_depth = defaultdict(int)

    def depth_recursive(model):
        name = model['modelName']
        if name == 'feed':  # the virtual feed node is the single root
            path_depth[name] = 0
            return 0
        if path_depth[name] != 0:  # memoized (only feed may have depth 0)
            return path_depth[name]
        parent_depths = [
            depth_recursive(model_config_in_step[parent_name]) for parent_name
            in model_config_in_step[name]['inputModels']
        ]
        path_depth[name] = max(parent_depths) + 1
        return path_depth[name]

    depth_recursive(model_config_in_step['fetch'])
    # lay out models row by row: pos_y is the depth, pos_x counts models
    # within the same depth (stable sort keeps discovery order per row)
    cur_x = 0
    last_depth = -1
    for model_name, depth in sorted(
            path_depth.items(), key=lambda item: item[1]):
        if depth != last_depth:
            cur_x = 0  # new row: restart the column counter
        node = model_config_in_step[model_name]
        node['pos_y'] = depth
        node['pos_x'] = cur_x
        cur_x += 1
        last_depth = depth


def launch_process(kwargs: dict):
    '''
  Launch a fastdeployserver process according to specified arguments.

  Args:
      kwargs(dict): launch arguments from the frontend. 'default_model_name',
          'server-name' and 'ensemble-img' are recorded but not passed to the
          server; 'gpus' is exported as CUDA_VISIBLE_DEVICES; every other key
          is forwarded as '--key value'.
  Returns:
      subprocess.Popen: the launched server process.
  Raises:
      RuntimeError: if the server name is already in use or fastdeployserver
          cannot be launched.
  '''
    cmd = ['fastdeployserver']
    launch_env = os.environ.copy()
    start_args = {}
    for key, value in kwargs.items():
        if key == 'default_model_name':  # used to fill client model_name automatically
            start_args[key] = value
            continue
        if key == 'server-name' or key == 'ensemble-img':  # extra information, not a server flag
            start_args[key] = value
            continue
        if key == 'gpus':
            if value:
                launch_env['CUDA_VISIBLE_DEVICES'] = value
                start_args[key] = value
            continue
        cmd.append('--{}'.format(key))
        cmd.append('{}'.format(value))
        start_args[key] = value
    # use .get() so a request without 'server-name' does not raise KeyError
    if start_args.get('server-name') and start_args[
            'server-name'] in os.listdir(FASTDEPLOYSERVER_PATH):
        raise RuntimeError(
            "Failed to launch server,server name {} has been used,please write a different server name."
            .format(start_args['server-name']))
    all_model_configs, all_model_versions = analyse_config(
        start_args['model-repository'])
    model_repo_config = original_format_to_exchange_format(
        all_model_configs, all_model_versions)
    model_repo_config['ensemble-img'] = start_args.get('ensemble-img')
    # pick a random logfile name that is not already in use
    logfilename = 'logfile-{}'.format(get_random_string(8))
    while os.path.exists(os.path.join(FASTDEPLOYSERVER_PATH, logfilename)):
        logfilename = 'logfile-{}'.format(get_random_string(8))
    try:
        p = Popen(
            cmd,
            stdout=open(
                os.path.join(FASTDEPLOYSERVER_PATH, logfilename),
                'w',
                buffering=1),  # line-buffered so the log can be tailed live
            stderr=STDOUT,
            universal_newlines=True,
            env=launch_env)
    except Exception:
        raise RuntimeError(
            "Failed to launch fastdeployserver,please check fastdeployserver is installed in environment."
        )
    server_name = start_args.get('server-name') or p.pid
    with open(
            os.path.join(FASTDEPLOYSERVER_PATH, '{}'.format(server_name)),
            'w') as f:
        # filename ${server_name} contains 4 lines:
        # line1 : the real log filename ${logfilename}
        # line2 : pid
        # line3 : launch arguments
        # line4 : model-repository configuration
        f.write(logfilename + '\n' + str(p.pid) + '\n' +
                json.dumps(start_args) + '\n' + json.dumps(model_repo_config))
    return p


def get_random_string(length):
    '''Return a random string of the given length, lowercase letters only.'''
    return ''.join(
        random.choice(string.ascii_lowercase) for _ in range(length))


def get_start_arguments(server_id):
    '''
    Get the start arguments for fastdeployserver process.
    Args:
        server_id(str): fastdeployserver process name
    Returns:
        args(dict): launch arguments when start fastdeployserver process,
            empty dict when no record file exists.
    '''
    record_path = os.path.join(FASTDEPLOYSERVER_PATH, '{}'.format(server_id))
    if not os.path.exists(record_path):
        return {}
    with open(record_path, 'r') as f:
        # line3 of the record file stores the launch arguments as json
        return json.loads(f.read().split('\n')[2])


def get_process_pid(server_id):
    '''
    Get the process id for fastdeployserver process.
    Args:
        server_id(str): fastdeployserver process name
    Returns:
        pid(int): process id, or None when no record file exists.
    '''
    record_path = os.path.join(FASTDEPLOYSERVER_PATH, '{}'.format(server_id))
    if not os.path.exists(record_path):
        return None
    with open(record_path, 'r') as f:
        # line2 of the record file stores the pid
        return int(f.read().split('\n')[1])


def get_process_logfile_name(server_id):
    '''
    Get the logfile name recorded for a fastdeployserver process.
    Args:
        server_id(str): fastdeployserver process name
    Returns:
        logfile(str): logfile name, or None when no record file exists.
    '''
    record_path = os.path.join(FASTDEPLOYSERVER_PATH, '{}'.format(server_id))
    if not os.path.exists(record_path):
        return None
    with open(record_path, 'r') as f:
        # line1 of the record file stores the logfile name
        return f.read().split('\n')[0]


def get_process_model_configuration(server_id):
    '''
    Get the model repository configuration for fastdeployserver process.
    Args:
        server_id(str): fastdeployserver process name
    Returns:
        configuration(dict): model repository configuration, empty dict when
            no record file exists.
    '''
    record_path = os.path.join(FASTDEPLOYSERVER_PATH, '{}'.format(server_id))
    if not os.path.exists(record_path):
        return {}
    with open(record_path, 'r') as f:
        # line4 of the record file stores the model-repository configuration
        return json.loads(f.read().split('\n')[3])


def get_process_output(server_id, length):
    '''
  Return the standard output of an opened subprocess starting at byte
  offset ``length``, or None if the server record or logfile is missing.
  '''
    record_path = os.path.join(FASTDEPLOYSERVER_PATH, '{}'.format(server_id))
    if not os.path.exists(record_path):
        return None
    logfilename = get_process_logfile_name(server_id)
    logfile_path = os.path.join(FASTDEPLOYSERVER_PATH,
                                '{}'.format(logfilename))
    if not os.path.exists(logfile_path):
        return None
    with open(logfile_path, 'r') as f:
        f.seek(length)  # skip the part the frontend already received
        return f.read()


def mark_pid_for_dead_process(server_id):
    '''
    Resource files for a dead server are only deleted when the user closes
    the server in the frontend. When the user closes the server, the pid
    recorded in the record file will be killed. In case a dead process id is
    reassigned to a new process, the recorded pid must be marked as outdated.
    We replace the pid with -1 to denote a zombie process that has been
    polled and is dead.
    Args:
        server_id(str): fastdeployserver process name
    '''
    record_path = os.path.join(FASTDEPLOYSERVER_PATH, '{}'.format(server_id))
    if not os.path.exists(record_path):
        return
    with open(record_path, 'r') as f:
        record_lines = f.read().split('\n')
    record_lines[1] = '-1'  # line2 holds the pid; -1 marks it as dead
    with open(record_path, 'w') as f:
        f.write('\n'.join(record_lines))


def delete_files_for_process(server_id):
    '''
    Delete the record file and logfile for a fastdeployserver process.
    Args:
        server_id(str): fastdeployserver process name
    '''
    record_path = os.path.join(FASTDEPLOYSERVER_PATH, '{}'.format(server_id))
    if not os.path.exists(record_path):
        return
    logfilename = get_process_logfile_name(server_id)
    logfile_path = os.path.join(FASTDEPLOYSERVER_PATH,
                                '{}'.format(logfilename))
    if os.path.exists(logfile_path):  # remove the logfile first, if any
        os.remove(logfile_path)
    os.remove(record_path)


def kill_process(process):
    '''
  Stop an opened fastdeployserver process.

  Args:
      process: either a server id (str) whose recorded pid is killed via
          os.kill, or a subprocess.Popen object that is killed directly.
  '''
    if isinstance(process, str):  # server_id: terminate by recorded pid
        pid = get_process_pid(process)
        if pid is None or pid == -1:  # no record, or -1 marks a dead process
            return
        try:
            os.kill(pid, signal.SIGKILL)
        except Exception:  # best-effort: the process may already be gone
            pass
    else:
        pid = process.pid
        process.kill()
        try:
            # reap the child so it does not linger as a zombie
            process.wait(10)
        except Exception:
            pass


def get_alive_fastdeploy_servers():
    '''
    Search server names in `FASTDEPLOYSERVER_PATH`; if a process is dead and
    its files still exist due to some unexpected reason, delete the files.
    '''
    alive_servers = []
    for name in os.listdir(FASTDEPLOYSERVER_PATH):
        if 'logfile' in name:
            continue  # logfiles are not server records
        if check_process_alive(name):
            alive_servers.append(name)
        else:
            delete_files_for_process(name)  # clean up after dead process
    return alive_servers


def check_process_zombie(server_id):
    '''
    Given a server id, check whether the process became a zombie
    (its recorded pid was marked as -1).
    Args:
        server_id(str): fastdeployserver process name
    Return:
        status(bool): True if process became a zombie.
    '''
    return get_process_pid(server_id) == -1


def check_process_alive(server_id):
    '''
    Given a server id, check whether the process is alive or not.
    Args:
        server_id(str): fastdeployserver process name
    Return:
        status(bool): True if process is still alive.
    '''
    pid = get_process_pid(server_id)
    if pid is None:
        return False
    if pid == -1:
        # -1 marks a zombie process which is already dead. The user may want
        # to read the log of the dead process from the frontend, so report
        # it as alive here.
        return True
    try:
        os.kill(pid, 0)  # signal 0 only tests whether the pid exists
    except OSError:
        return False
    # Guard against pid reuse: make sure the pid really belongs to a
    # fastdeployserver process. The process name observed is
    # 'fastdeployserve' (apparently truncated), hence the substring check.
    return 'fastdeployserve' in psutil.Process(pid).name()


# Mapping from metric table category ('Model' / 'GPU' / 'CPU') to the set of
# metric names (as exposed by the fastdeployserver metrics endpoint) that
# belong in that category. Used by generate_metric_table to route values.
_metric_column_name = {
    "Model": {
        "nv_inference_request_success", "nv_inference_request_failure",
        "nv_inference_count", "nv_inference_exec_count",
        "nv_inference_request_duration_us", "nv_inference_queue_duration_us",
        "nv_inference_compute_input_duration_us",
        "nv_inference_compute_infer_duration_us",
        "nv_inference_compute_output_duration_us"
    },
    "GPU": {
        "nv_gpu_power_usage", "nv_gpu_power_limit", "nv_energy_consumption",
        "nv_gpu_utilization", "nv_gpu_memory_total_bytes",
        "nv_gpu_memory_used_bytes"
    },
    "CPU": {
        "nv_cpu_utilization", "nv_cpu_memory_total_bytes",
        "nv_cpu_memory_used_bytes"
    }
}


def generate_metric_table(server_addr, server_port):  # noqa:C901
    '''
    Fetch and parse the prometheus-style metrics exposed by a
    fastdeployserver instance.

    Args:
        server_addr(str): metrics service address.
        server_port: metrics service port.
    Returns:
        dict with 'Model' and 'GPU' tables ({name: {metric_name: value}}),
        or None if the metrics endpoint cannot be reached.
    '''
    model_table = {}
    gpu_table = {}
    try:
        res = requests.get("http://{}:{}/metrics".format(
            server_addr, server_port))
    except Exception:
        return None
    metric_content = res.text
    for content in metric_content.split('\n'):
        if content.startswith('#'):  # HELP/TYPE comment lines
            continue
        # match '<metric_name>{<labels>} <value>' emitted by the server
        # metrics interface. The value can be a float or use exponent
        # notation (e.g. '0.5', '1.8e+09'), so match any non-space run
        # (\S+) — the previous \w+ silently dropped such samples.
        matched = re.match(r'(\w+){(.*)} (\S+)', content)
        if not matched:
            continue
        metric_name = matched.group(1)
        labels = matched.group(2)
        value = matched.group(3)
        infos = {}
        for info in labels.split(','):
            if '=' not in info:
                continue  # skip malformed/empty label fragments
            k, v = info.split('=', 1)  # values may themselves contain '='
            infos[k] = v.strip('"')
        if metric_name in [
                "nv_inference_request_duration_us",
                "nv_inference_queue_duration_us",
                "nv_inference_compute_input_duration_us",
                "nv_inference_compute_infer_duration_us",
                "nv_inference_compute_output_duration_us"
        ]:
            value = float(value) / 1000  # microseconds -> milliseconds
        elif metric_name in [
                "nv_gpu_memory_total_bytes", "nv_gpu_memory_used_bytes"
        ]:
            value = float(value) / 1024 / 1024 / 1024  # bytes -> GiB
        for key, metric_names in _metric_column_name.items():
            if metric_name in metric_names:
                if key == 'Model':
                    model_name = infos['model']
                    if model_name not in model_table:
                        model_table[model_name] = {}
                    model_table[model_name][metric_name] = value
                elif key == 'GPU':
                    gpu_name = infos['gpu_uuid']
                    if gpu_name not in gpu_table:
                        gpu_table[gpu_name] = {}
                    gpu_table[gpu_name][metric_name] = value
                elif key == 'CPU':
                    pass  # CPU metrics are not collected yet
    results = {}
    results['Model'] = model_table
    results['GPU'] = gpu_table
    return results