safePython.py 3.6 KB
Newer Older
H
hjdhnx 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File  : safePython.py
# Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
# Date  : 2022/8/28

import io
import tokenize

from func_timeout import func_set_timeout
from func_timeout.exceptions import FunctionTimedOut
from urllib.parse import urljoin,quote,unquote
import requests
import time
import json
import re
from lxml import etree
import datetime
import base64
from utils.log import logger

H
hjdhnx 已提交
22
time_out_sec = 8  # Timeout, in seconds, applied when executing sandboxed Python code
H
hjdhnx 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
class my_exception(Exception):
    """Error used to report that a sandboxed function ran past its time limit.

    The text passed to the constructor is kept on ``message`` and is embedded
    in the string form of the exception.
    """

    def __init__(self, message):
        """Remember *message* so ``__str__`` can render it later."""
        self.message = message

    def __str__(self):
        """Return the stored message wrapped in the fixed timeout prefix."""
        return f'函数执行超时: "{self.message}"'

@func_set_timeout(time_out_sec)
def excute(*args):
    """Forward all positional arguments to the built-in ``exec``.

    The call is wrapped by ``func_set_timeout(time_out_sec)``; the caller in
    this file catches ``FunctionTimedOut`` around it, so presumably that is
    what the decorator raises once the limit is exceeded (confirm against the
    func_timeout documentation).

    :param args: passed unchanged to ``exec`` (the caller in this file passes
        source code, a globals dict and a locals dict).
    :return: None
    """
    # SECURITY NOTE(review): this executes caller-supplied source. The only
    # protection visible in this file is the token check and the trimmed
    # builtins prepared by the caller; treat the input as untrusted.
    exec(*args)

def check_unsafe_attributes(string):
    """
    Reject source code that accesses an underscore-prefixed attribute.

    The code is tokenized and every NAME token that directly follows a ``.``
    operator is inspected; if it starts with ``_`` (for example
    ``obj.__class__``) the code is refused.

    Only literal ``.name`` access is detected. Attribute names supplied as
    strings (e.g. through ``getattr``) are not inspected by this check.

    :param string: Python source code that is about to be executed.
    :return: None when no unsafe attribute access is found.
    :raises AttributeError: if an attribute starting with ``_`` is accessed.
    """
    # Comments and non-logical newlines may legally sit between the '.' and
    # the attribute name inside brackets, so they must not clear the state.
    transparent = (tokenize.COMMENT, tokenize.NL)
    readline = io.BytesIO(string.encode('utf-8')).readline
    pre_op = ''
    for toktype, tokval, _, _, _ in tokenize.tokenize(readline):
        if toktype in transparent:
            continue
        if toktype == tokenize.NAME and pre_op == '.' and tokval.startswith('_'):
            msg = "access to attribute '{0}' is unsafe.".format(tokval)
            raise AttributeError(msg)
        # Remember only the token immediately before the next one. Keeping a
        # stale '.' across other tokens wrongly rejected valid code such as
        # "x = a.b" followed by "_y = 1" on the next line.
        pre_op = tokval if toktype == tokenize.OP else ''

# Sample code snippet kept as a module-level string; it is not referenced in
# the visible part of this file. NOTE(review): the snippet uses an ``env``
# name that safePython.action_task_exec does not inject into the execution
# globals, so it presumably targets a different host environment - confirm.
DEFAULT_PYTHON_CODE = """# 可用内置环境变量:
#  - log: log(message): 打印日志功能
#  - error: 弹出用户错误的弹窗
# 返回变量值: result = {...}\n\n
zyw_lists = env['hikerule.zyw.list'].with_context(active_test=True).sudo().search(
                    [('option', '=', 'zy'), ('cate_id.name', '!=', '18+'),('cate_id.is_bad', '!=', True)])
result = env['hikerule.zyw.list2data.wizard'].sudo().get_publish_value(zyw_lists)
"""

class safePython:
    """Run a piece of user-supplied Python source in a restricted namespace.

    The source is first screened by ``check_unsafe_attributes`` and then
    executed through ``excute`` with a reduced set of builtins and a fixed
    set of helper modules.
    """

    def __init__(self, name, code):
        """
        :param name: label used in the timeout message; falls back to
            '未定义' when empty.
        :param code: Python source code to execute.
        """
        self.name = name or '未定义'
        self.code = code

    def action_task_exec(self, call=None, params=None):
        """
        Execute the stored code and optionally call a function it defines.

        :param call: name of a callable defined by the code; when it is found
            in the execution locals it is invoked and its return value
            becomes the result.
        :param params: positional arguments for ``call`` (defaults to none).
        :return: the ``result`` variable left in the execution locals (or the
            return value of ``call``); on any error, a string starting with
            '执行报错:' describing the failure.
        """
        # Imported here so the fix does not depend on the file's import block.
        import builtins as builtins_module

        if not params:
            params = []
        # ``__builtins__`` is a dict in imported modules but a module object
        # when this file runs as ``__main__``, where ``dict(__builtins__)``
        # raises TypeError. The builtins module namespace works in both cases.
        safe_builtins = dict(vars(builtins_module))
        for key in ['__import__', 'eval', 'exec', 'globals', 'dir', 'copyright', 'open', 'quit']:
            # Drop unsafe names. ``pop`` tolerates interpreters where the
            # ``site`` module did not inject 'quit' / 'copyright'.
            safe_builtins.pop(key, None)
        # Builtins are restricted and importing packages is not possible;
        # only the helpers listed here are exposed to the executed code.
        global_dict = {'__builtins__': safe_builtins,
                       'requests': requests, 'urljoin': urljoin, 'quote': quote, 'unquote': unquote,
                       'log': logger.info, 'json': json, 'print': print,
                       're': re, 'etree': etree, 'time': time, 'datetime': datetime, 'base64': base64
                       }
        try:
            check_unsafe_attributes(self.code)
            localdict = {'result': None}
            # TODO: the execution timeout is reported as unresolved on Windows.
            base_code = self.code.strip()
            if call:
                logger.info(f'开始执行:{call}')
            try:
                excute(base_code, global_dict, localdict)
                # NOTE(review): functions defined by the code use global_dict
                # as their globals, so they cannot see other top-level names
                # the code stored in localdict.
                run = localdict.get(call)
                if run:
                    # NOTE(review): this call happens outside ``excute`` and
                    # is therefore not covered by the timeout decorator.
                    localdict['result'] = run(*params)
            except FunctionTimedOut:
                raise my_exception(f'函数[{self.name}]运行时间超过{time_out_sec}秒,疑似死循环,已被系统切断')
        except Exception as e:
            # Errors are reported to the caller as text instead of raising.
            ret = f'执行报错:{e}'
            logger.info(ret)
            return ret
        else:
            return localdict['result']