未验证 提交 2328921f 编写于 作者: R Ren Wei (任卫) 提交者: GitHub

Cleanup the unused codes for samplecode testing (#32525)

* update testcases

* remove unused codes

* update the docstring for samcd_processor

* no need to import the six module

* 我也不知道为何有一个前导空格,但现在有单元测试,取消这个空格没啥问题

* add unittests for print_signatures; add the first case for 'required' mechanism when executing sample code testing

* there is no paddle installed in PR-CI-APPROVAL

test=document_fix
上级 5468de09
...@@ -84,6 +84,7 @@ function run_test_sampcd_processor() { ...@@ -84,6 +84,7 @@ function run_test_sampcd_processor() {
CUR_PWD=$(pwd) CUR_PWD=$(pwd)
cd ${PADDLE_ROOT}/tools cd ${PADDLE_ROOT}/tools
python test_sampcd_processor.py python test_sampcd_processor.py
python test_print_signatures.py
cd ${CUR_PWD} cd ${CUR_PWD}
} }
......
...@@ -25,9 +25,8 @@ import collections ...@@ -25,9 +25,8 @@ import collections
import sys import sys
import pydoc import pydoc
import hashlib import hashlib
import six import platform
import functools import functools
import paddle
member_dict = collections.OrderedDict() member_dict = collections.OrderedDict()
...@@ -131,7 +130,7 @@ def visit_member(parent_name, member, member_name=None): ...@@ -131,7 +130,7 @@ def visit_member(parent_name, member, member_name=None):
def is_primitive(instance): def is_primitive(instance):
int_types = (int, long) if six.PY2 else (int, ) int_types = (int, long) if platform.python_version()[0] == "2" else (int, )
pritimitive_types = int_types + (float, str) pritimitive_types = int_types + (float, str)
if isinstance(instance, pritimitive_types): if isinstance(instance, pritimitive_types):
return True return True
...@@ -189,6 +188,7 @@ def visit_all_module(mod): ...@@ -189,6 +188,7 @@ def visit_all_module(mod):
if __name__ == '__main__': if __name__ == '__main__':
import paddle
modules = sys.argv[1].split(",") modules = sys.argv[1].split(",")
for m in modules: for m in modules:
visit_all_module(importlib.import_module(m)) visit_all_module(importlib.import_module(m))
......
...@@ -19,8 +19,6 @@ import multiprocessing ...@@ -19,8 +19,6 @@ import multiprocessing
import math import math
import platform import platform
import inspect import inspect
#import paddle
#import paddle.fluid
import json import json
import argparse import argparse
import shutil import shutil
...@@ -28,8 +26,8 @@ import re ...@@ -28,8 +26,8 @@ import re
import logging import logging
""" """
please make sure to run in the tools path please make sure to run in the tools path
usage: python sample_test.py {arg1} usage: python sample_test.py {cpu or gpu}
arg1: the first arg defined running in gpu version or cpu version {cpu or gpu}: running in cpu version or gpu version
for example, you can run cpu version python2 testing like this: for example, you can run cpu version python2 testing like this:
...@@ -105,44 +103,6 @@ def check_indent(cdline): ...@@ -105,44 +103,6 @@ def check_indent(cdline):
return indent return indent
# srccom: raw comments in the source,including ''' and original indent
def sampcd_extract_and_run(srccom, name, htype="def", hname=""):
"""
Extract and run sample codes from source comment and
the result will be returned.
Args:
srccom(str): the source comment of some API whose
example codes will be extracted and run.
name(str): the name of the API.
htype(str): the type of hint banners, def/class/method.
hname(str): the name of the hint banners , e.t. def hname.
Returns:
result: True or False
name(str): the name of the API.
msg(str): messages
"""
sample_code_filenames = sampcd_extract_to_file(srccom, name, htype, hname)
if not sample_code_filenames:
return False, name, 'No sample code!'
results = []
msgs = []
for tfname in sample_code_filenames:
result, msg = execute_samplecode_test(tfname)
results.append(result)
msgs.append(msg)
if not all(results):
failed_fn = []
for i, result in enumerate(results):
if not result:
failed_fn.append(sample_code_filenames[i])
return False, name, 'failed sample codes: ' + ','.join(failed_fn)
return True, name, 'success!'
def sampcd_extract_to_file(srccom, name, htype="def", hname=""): def sampcd_extract_to_file(srccom, name, htype="def", hname=""):
""" """
Extract sample codes from __doc__, and write them to files. Extract sample codes from __doc__, and write them to files.
...@@ -158,8 +118,9 @@ def sampcd_extract_to_file(srccom, name, htype="def", hname=""): ...@@ -158,8 +118,9 @@ def sampcd_extract_to_file(srccom, name, htype="def", hname=""):
sample_code_filenames(list of str) sample_code_filenames(list of str)
""" """
global GPU_ID, RUN_ON_DEVICE, SAMPLECODE_TEMPDIR global GPU_ID, RUN_ON_DEVICE, SAMPLECODE_TEMPDIR
CODE_BLOCK_INTERDUCTORY = "code-block:: python"
sampcd_begins = find_all(srccom, " code-block:: python") sampcd_begins = find_all(srccom, CODE_BLOCK_INTERDUCTORY)
if len(sampcd_begins) == 0: if len(sampcd_begins) == 0:
# detect sample codes using >>> to format and consider this situation as wrong # detect sample codes using >>> to format and consider this situation as wrong
print(htype, " name:", hname) print(htype, " name:", hname)
...@@ -178,7 +139,7 @@ def sampcd_extract_to_file(srccom, name, htype="def", hname=""): ...@@ -178,7 +139,7 @@ def sampcd_extract_to_file(srccom, name, htype="def", hname=""):
sample_code_filenames = [] sample_code_filenames = []
for y in range(1, len(sampcd_begins) + 1): for y in range(1, len(sampcd_begins) + 1):
sampcd_begin = sampcd_begins[y - 1] sampcd_begin = sampcd_begins[y - 1]
sampcd = srccom[sampcd_begin + len(" code-block:: python") + 1:] sampcd = srccom[sampcd_begin + len(CODE_BLOCK_INTERDUCTORY) + 1:]
sampcd = sampcd.split("\n") sampcd = sampcd.split("\n")
# remove starting empty lines # remove starting empty lines
while sampcd[0].replace(' ', '').replace('\t', '') == '': while sampcd[0].replace(' ', '').replace('\t', '') == '':
...@@ -216,7 +177,18 @@ def sampcd_extract_to_file(srccom, name, htype="def", hname=""): ...@@ -216,7 +177,18 @@ def sampcd_extract_to_file(srccom, name, htype="def", hname=""):
return sample_code_filenames return sample_code_filenames
def execute_samplecode_test(tfname): def execute_samplecode(tfname):
"""
Execute a sample-code test.
Args:
tfname: the filename of the samplecode.
Returns:
result: success or not
tfname: same as the input argument
msg: the stdout output of the samplecode executing.
"""
result = True result = True
msg = None msg = None
if platform.python_version()[0] in ["2", "3"]: if platform.python_version()[0] in ["2", "3"]:
...@@ -226,6 +198,14 @@ def execute_samplecode_test(tfname): ...@@ -226,6 +198,14 @@ def execute_samplecode_test(tfname):
result = False result = False
exit(1) exit(1)
# check required envisonment
with open(tfname, 'r') as f:
for line in f.readlines():
if re.match(r'#\s*required\s*:\s*(distributed|gpu|skip)', line):
result = True
return result, tfname, '{} is skipped. cause: {}'.format(tfname,
line)
logging.info('running %s', tfname) logging.info('running %s', tfname)
print("\n----example code check----") print("\n----example code check----")
print("executing sample code .....", tfname) print("executing sample code .....", tfname)
...@@ -252,311 +232,9 @@ def execute_samplecode_test(tfname): ...@@ -252,311 +232,9 @@ def execute_samplecode_test(tfname):
print("----example code check success----\n") print("----example code check success----\n")
# msg is the returned code execution report # msg is the returned code execution report
return result, tfname, msg return result, tfname, msg
def single_defcom_extract(start_from, srcls, is_class_begin=False):
"""
to extract a def function/class/method comments body
Args:
start_from(int): the line num of "def" header
srcls(list): the source file in lines
is_class_begin(bool): whether the start_from is a beginning a class. \
For a sole class body itself may end up with its method if it has no
docstring. But the body of \
a common def function can only be ended up by a none-indented def/class
Returns:
string : the extracted comment body, inclusive of its quote marks.
"""
i = start_from
fcombody = "" # def comment body
comstart = -1 # the starting line index of comment mark "'''" or """"""
# if it is not -1, it indicates the loop is in the comment body
comstyle = 0 # comment mark style ,comments quoted with ''' is coded as 1
# comments quoted with """ is coded as 2
for x in range(i + 1, len(srcls)):
if is_class_begin:
if srcls[x].replace('\t', ' ').startswith(' def '):
break
if srcls[x].startswith('def ') or srcls[x].startswith('class '):
break
else:
if comstart == -1:
s = srcls[x].replace(" ", '').replace("\t",
'').replace("\n", '')
if s.startswith("\"\"\"") or s.startswith("r\"\"\""):
comstart = x
comstyle = 2
continue
if (comstyle == 2 and comstart != -1 and
srcls[x].replace(" ", '').replace("\t", '').replace(
"\n", '').startswith("\"\"\"")):
break
if comstart == -1:
s = srcls[x].replace(" ", '').replace("\t",
'').replace("\n", '')
if s.startswith("\'\'\'") or s.startswith("r\'\'\'"):
comstart = x
comstyle = 1
continue
if (comstyle == 1 and comstart != -1 and
srcls[x].replace(" ", '').replace("\t", '').replace(
"\n", '').startswith("\'\'\'")):
break
if (comstart !=
-1): # when the comments start, begin to add line to fcombody
fcombody += srcls[x]
return fcombody
def srccoms_extract(srcfile, wlist, methods):
"""
Given a source file ``srcfile``, this function will
extract its API(doc comments) and run sample codes in the
API.
Args:
srcfile(file): the source file
wlist(list): white list
methods(list): only elements of this list considered.
Returns:
result: True or False
error_methods: the methods that failed.
"""
process_result = True
error_methods = []
srcc = srcfile.read()
# 2. get defs and classes header line number
# set file pointer to its beginning
srcfile.seek(0, 0)
srcls = srcfile.readlines() # source lines
# 1. fetch__all__ list
allidx = srcc.find("__all__")
logger.debug('processing %s, methods: %s', srcfile.name, str(methods))
srcfile_new, _ = os.path.splitext(srcfile.name)
srcfile_list = srcfile_new.split('/')
srcfile_str = ''
for i in range(4, len(srcfile_list)):
srcfile_str = srcfile_str + srcfile_list[i] + '.'
if allidx != -1:
alllist = []
# get all list for layers/ops.py
if srcfile.name.find("fluid/layers/ops.py") != -1:
for ai in range(0, len(srcls)):
if srcls[ai].startswith("__all__"):
lb = srcls[ai].find('[')
rb = srcls[ai].find(']')
if lb == -1:
continue
allele = srcls[ai][lb + 1:rb].replace("'", '').replace(
" ", '').replace("\"", '')
alllist.append(allele)
if '' in alllist:
alllist.remove('')
else:
alllist_b = allidx + len("__all__")
allstr = srcc[alllist_b + srcc[alllist_b:].find("[") + 1:alllist_b +
srcc[alllist_b:].find("]")]
allstr = allstr.replace("\n", '').replace(" ", '').replace(
"'", '').replace("\"", '')
alllist = allstr.split(',')
if '' in alllist:
alllist.remove('')
api_alllist_count = len(alllist)
logger.debug('found %d items: %s', api_alllist_count, str(alllist))
api_count = 0
handled = []
# get src contents in layers/ops.py
if srcfile.name.find("fluid/layers/ops.py") != -1:
for i in range(0, len(srcls)):
opname = None
opres = re.match(r"^(\w+)\.__doc__", srcls[i])
if opres is not None:
opname = opres.group(1)
else:
opres = re.match(
r"^add_sample_code\(globals\(\)\[\"(\w+)\"\]", srcls[i])
if opres is not None:
opname = opres.group(1)
if opname is not None:
if opname in wlist:
logger.info('%s is in the whitelist, skip it.', opname)
continue
else:
logger.debug('%s\'s docstring found.', opname)
comstart = i
for j in range(i, len(srcls)):
if srcls[j].find("\"\"\"") != -1:
comstart = i
opcom = ""
for j in range(comstart + 1, len(srcls)):
opcom += srcls[j]
if srcls[j].find("\"\"\"") != -1:
break
result, _, _ = sampcd_extract_and_run(opcom, opname, "def",
opname)
if not result:
error_methods.append(opname)
process_result = False
api_count += 1
handled.append(
opname) # ops.py also has normal formatted functions
# use list 'handled' to mark the functions have been handled here
# which will be ignored in the following step
# handled what?
logger.debug('%s already handled.', str(handled))
for i in range(0, len(srcls)):
if srcls[i].startswith(
'def '): # a function header is detected in line i
f_header = srcls[i].replace(" ", '')
fn = f_header[len('def'):f_header.find('(')] # function name
if "%s%s" % (srcfile_str, fn) not in methods:
logger.info(
'[file:%s, function:%s] not in methods list, skip it.',
srcfile_str, fn)
continue
if fn in handled:
continue
if fn in alllist:
api_count += 1
if fn in wlist or fn + "@" + srcfile.name in wlist:
logger.info('[file:%s, function:%s] skip by wlist.',
srcfile_str, fn)
continue
fcombody = single_defcom_extract(i, srcls)
if fcombody == "": # if no comment
print("def name:", fn)
print("-----------------------")
print("WARNING: no comments in function ", fn,
", but it deserves.")
continue
else:
result, _, _ = sampcd_extract_and_run(fcombody, fn,
"def", fn)
if not result:
error_methods.append(fn)
process_result = False
if srcls[i].startswith('class '):
c_header = srcls[i].replace(" ", '')
cn = c_header[len('class'):c_header.find('(')] # class name
if '%s%s' % (srcfile_str, cn) not in methods:
logger.info(
'[file:%s, class:%s] not in methods list, skip it.',
srcfile_str, cn)
continue
if cn in handled:
continue
if cn in alllist:
api_count += 1
if cn in wlist or cn + "@" + srcfile.name in wlist:
logger.info('[file:%s, class:%s] skip by wlist.',
srcfile_str, cn)
continue
# class comment
classcom = single_defcom_extract(i, srcls, True)
if classcom != "":
result, _, _ = sampcd_extract_and_run(classcom, cn,
"class", cn)
if not result:
error_methods.append(cn)
process_result = False
else:
print("WARNING: no comments in class itself ", cn,
", but it deserves.\n")
# handling methods in class bodies
for x in range(
i + 1,
len(srcls)): # from the next line of class header
if (srcls[x].startswith('def ') or
srcls[x].startswith('class ')):
break
else:
# member method def header
srcls[x] = srcls[x].replace('\t', ' ')
if (srcls[x].startswith(
' def ')): # detect a mehtod header..
thisl = srcls[x]
indent = len(thisl) - len(thisl.lstrip())
mn = thisl[indent + len('def '):thisl.find(
'(')] # method name
name = cn + "." + mn # full name
if '%s%s' % (
srcfile_str, name
) not in methods: # class method not in api.spec
logger.info(
'[file:%s, func:%s] not in methods, skip it.',
srcfile_str, name)
continue
if mn.startswith('_'):
logger.info(
'[file:%s, func:%s] startswith _, it\'s private method, skip it.',
srcfile_str, name)
continue
if name in wlist or name + "@" + srcfile.name in wlist:
logger.info(
'[file:%s, class:%s] skip by wlist.',
srcfile_str, name)
continue
thismethod = [thisl[indent:]
] # method body lines
# get all the lines of a single method body
# into thismethod(list)
# and send it to single_defcom_extract
for y in range(x + 1, len(srcls)):
srcls[y] = srcls[y].replace('\t', ' ')
if (srcls[y].startswith('def ') or
srcls[y].startswith('class ')):
# end of method
break
elif srcls[y].startswith(' def '):
# end of method
break
else:
thismethod.append(srcls[y][indent:])
thismtdcom = single_defcom_extract(0,
thismethod)
if thismtdcom != "":
result, _, _ = sampcd_extract_and_run(
thismtdcom, name, "method", name)
if not result:
error_methods.append(name)
process_result = False
else:
logger.warning('__all__ not found in file:%s', srcfile.name)
return process_result, error_methods
def test(file_list):
global methods # readonly
process_result = True
for file in file_list:
with open(file, 'r') as src:
if not srccoms_extract(src, wlist, methods):
process_result = False
return process_result
def run_a_test(tc_filename):
"""
execute a sample code-block.
"""
global methods # readonly
process_result = True
with open(tc_filename, 'r') as src:
process_result, error_methods = srccoms_extract(src, wlist, methods)
return process_result, tc_filename, error_methods
def get_filenames(): def get_filenames():
''' '''
this function will get the sample code files that pending for check. this function will get the sample code files that pending for check.
...@@ -593,6 +271,15 @@ def get_filenames(): ...@@ -593,6 +271,15 @@ def get_filenames():
def get_api_md5(path): def get_api_md5(path):
"""
read the api spec file, and scratch the md5sum value of every api's docstring.
Args:
path: the api spec file. ATTENTION the path relative
Returns:
api_md5(dict): key is the api's real fullname, value is the md5sum.
"""
api_md5 = {} api_md5 = {}
API_spec = '%s/%s' % (os.path.abspath(os.path.join(os.getcwd(), "..")), API_spec = '%s/%s' % (os.path.abspath(os.path.join(os.getcwd(), "..")),
path) path)
...@@ -737,8 +424,7 @@ if __name__ == '__main__': ...@@ -737,8 +424,7 @@ if __name__ == '__main__':
if args.threads: if args.threads:
threads = args.threads threads = args.threads
po = multiprocessing.Pool(threads) po = multiprocessing.Pool(threads)
# results = po.map_async(test, divided_file_list) results = po.map_async(execute_samplecode, filenames.keys())
results = po.map_async(execute_samplecode_test, filenames.keys())
po.close() po.close()
po.join() po.join()
......
#! /usr/bin/env python
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
TestCases for print_signatures.py
sample lines from API_DEV.spec:
paddle.autograd.backward (ArgSpec(args=['tensors', 'grad_tensors', 'retain_graph'], varargs=None, keywords=None, defaults=(None, False)), ('document', '33a4434c9d123331499334fbe0274870'))
paddle.autograd.PyLayer (paddle.autograd.py_layer.PyLayer, ('document', 'c26adbbf5f1eb43d16d4a399242c979e'))
paddle.autograd.PyLayer.apply (ArgSpec(args=['cls'], varargs=args, keywords=kwargs, defaults=None), ('document', 'cb78696dc032fb8af2cba8504153154d'))
"""
import unittest
import hashlib
import inspect
import functools
from print_signatures import md5
from print_signatures import get_functools_partial_spec
from print_signatures import format_spec
from print_signatures import queue_dict
from print_signatures import member_dict
def func_example(param_a, param_b):
"""
example function
"""
pass
def func_example_2(func=functools.partial(func_example, 1)):
"""
example function 2
"""
pass
class ClassExample():
"""
example Class
"""
def example_method(self):
"""
class method
"""
pass
class Test_all_in_print_signatures(unittest.TestCase):
def test_md5(self):
algo = hashlib.md5()
algo.update(func_example.__doc__.encode('utf-8'))
digest = algo.hexdigest()
self.assertEqual(digest, md5(func_example.__doc__))
def test_get_functools_partial_spec(self):
partailed_func = functools.partial(func_example, 1)
# args = inspect.getargspec(partailed_func)
self.assertEqual('func_example(args=(1,), keywords={})',
get_functools_partial_spec(partailed_func))
class Test_format_spec(unittest.TestCase):
def test_normal_func_spec(self):
args = inspect.getargspec(func_example)
self.assertEqual(
'''ArgSpec(args=['param_a', 'param_b'], varargs=None, keywords=None, defaults=None)''',
format_spec(args))
def test_func_spec_with_partialedfunc_as_param_default(self):
# but there is no function belongs to this type in API_DEV.spec
args = inspect.getargspec(func_example_2)
self.assertEqual(
'''ArgSpec(args=['func'], varargs=None, keywords=None, defaults=('func_example(args=(1,), keywords={})',))''',
format_spec(args))
class Test_queue_dict(unittest.TestCase):
pass
if __name__ == '__main__':
unittest.main()
...@@ -22,12 +22,13 @@ import sys ...@@ -22,12 +22,13 @@ import sys
import importlib import importlib
from sampcd_processor import find_all from sampcd_processor import find_all
from sampcd_processor import check_indent from sampcd_processor import check_indent
from sampcd_processor import sampcd_extract_and_run
from sampcd_processor import single_defcom_extract
from sampcd_processor import srccoms_extract
from sampcd_processor import get_api_md5 from sampcd_processor import get_api_md5
from sampcd_processor import get_incrementapi from sampcd_processor import get_incrementapi
from sampcd_processor import get_wlist from sampcd_processor import get_wlist
from sampcd_processor import sampcd_extract_to_file
from sampcd_processor import execute_samplecode
SAMPLECODE_TEMP_DIR = 'samplecode_temp'
class Test_find_all(unittest.TestCase): class Test_find_all(unittest.TestCase):
...@@ -53,107 +54,95 @@ class Test_check_indent(unittest.TestCase): ...@@ -53,107 +54,95 @@ class Test_check_indent(unittest.TestCase):
self.assertEqual(4, check_indent("\thello paddle")) self.assertEqual(4, check_indent("\thello paddle"))
class Test_sampcd_extract_and_run(unittest.TestCase): class Test_execute_samplecode(unittest.TestCase):
def setUp(self):
if not os.path.exists(SAMPLECODE_TEMP_DIR):
os.mkdir(SAMPLECODE_TEMP_DIR)
self.successSampleCodeFile = os.path.join(SAMPLECODE_TEMP_DIR,
'samplecode_success.py')
with open(self.successSampleCodeFile, 'w') as f:
f.write('print(1+1)')
self.failedSampleCodeFile = os.path.join(SAMPLECODE_TEMP_DIR,
'samplecode_failed.py')
with open(self.failedSampleCodeFile, 'w') as f:
f.write('print(1/0)')
def tearDown(self):
os.remove(self.successSampleCodeFile)
os.remove(self.failedSampleCodeFile)
def test_run_success(self):
result, tfname, msg = execute_samplecode(self.successSampleCodeFile)
self.assertTrue(result)
self.assertEqual(self.successSampleCodeFile, tfname)
self.assertIsNotNone(msg)
self.assertLess(msg.find('skipped'), 0)
def test_run_failed(self):
result, tfname, msg = execute_samplecode(self.failedSampleCodeFile)
self.assertFalse(result)
self.assertEqual(self.failedSampleCodeFile, tfname)
self.assertIsNotNone(msg)
self.assertLess(msg.find('skipped'), 0)
def test_testcases_skipped(self):
...
tfname = os.path.join(SAMPLECODE_TEMP_DIR, 'samplecode_skipped.py')
with open(tfname, 'w') as f:
f.write("# required: distributed\nprint(1/0)")
result, _, msg = execute_samplecode(tfname)
self.assertTrue(result)
self.assertGreaterEqual(msg.find('skipped'), 0)
os.remove(tfname)
class Test_sampcd_extract_to_file(unittest.TestCase):
def setUp(self): def setUp(self):
if not os.path.exists('samplecode_temp/'): if not os.path.exists(SAMPLECODE_TEMP_DIR):
os.mkdir('samplecode_temp/') os.mkdir(SAMPLECODE_TEMP_DIR)
def tearDown(self):
shutil.rmtree(SAMPLECODE_TEMP_DIR)
def test_run_a_defs_samplecode(self): def test_1_samplecode(self):
comments = """ comments = """
Examples: Examples:
.. code-block:: python .. code-block:: python
print(1+1) print(1+1)
""" """
funcname = 'one_plus_one' funcname = 'one_plus_one'
res, name, msg = sampcd_extract_and_run(comments, funcname) sample_code_filenames = sampcd_extract_to_file(comments, funcname)
self.assertTrue(res) self.assertCountEqual(
self.assertEqual(funcname, name) [os.path.join(SAMPLECODE_TEMP_DIR, funcname + '_example.py')],
sample_code_filenames)
def test_run_a_def_no_code(self): def test_no_samplecode(self):
comments = """ comments = """
placeholder placeholder
""" """
funcname = 'one_plus_one' funcname = 'one_plus_one'
res, name, msg = sampcd_extract_and_run(comments, funcname) sample_code_filenames = sampcd_extract_to_file(comments, funcname)
self.assertFalse(res) self.assertCountEqual([], sample_code_filenames)
self.assertEqual(funcname, name)
def test_run_a_def_raise_expection(self): def test_2_samplecodes(self):
comments = """ comments = """
placeholder placeholder
Examples: Examples:
.. code-block:: python .. code-block:: python
print(1/0)
"""
funcname = 'one_plus_one'
res, name, msg = sampcd_extract_and_run(comments, funcname)
self.assertFalse(res)
self.assertEqual(funcname, name)
class Test_single_defcom_extract(unittest.TestCase): print(1/0)
def test_extract_from_func(self):
defstr = '''
import os
def foo():
"""
foo is a function.
"""
pass
def bar():
pass
'''
comm = single_defcom_extract(
2, defstr.splitlines(True), is_class_begin=False)
self.assertEqual(" foo is a function.\n", comm)
pass
def test_extract_from_func_with_no_docstring(self): .. code-block:: python
defstr = '''
import os
def bar():
pass
'''
comm = single_defcom_extract(
2, defstr.splitlines(True), is_class_begin=False)
self.assertEqual('', comm)
pass
def test_extract_from_class(self): print(1+1)
defstr = r'''
import os
class Foo():
"""
Foo is a class.
second line.
""" """
pass funcname = 'one_plus_one'
def bar(): sample_code_filenames = sampcd_extract_to_file(comments, funcname)
pass self.assertCountEqual([
def foo(): os.path.join(SAMPLECODE_TEMP_DIR, funcname + '_example_1.py'),
pass os.path.join(SAMPLECODE_TEMP_DIR, funcname + '_example_2.py')
''' ], sample_code_filenames)
comm = single_defcom_extract(
2, defstr.splitlines(True), is_class_begin=True)
rcomm = """ Foo is a class.
second line.
"""
self.assertEqual(rcomm, comm)
pass
def test_extract_from_class_with_no_docstring(self):
defstr = '''
import os
class Foo():
pass
def bar():
pass
def foo():
pass
'''
comm = single_defcom_extract(
0, defstr.splitlines(True), is_class_begin=True)
self.assertEqual('', comm)
class Test_get_api_md5(unittest.TestCase): class Test_get_api_md5(unittest.TestCase):
...@@ -268,181 +257,6 @@ class Test_get_wlist(unittest.TestCase): ...@@ -268,181 +257,6 @@ class Test_get_wlist(unittest.TestCase):
self.assertCountEqual(["deformable_conv"], gpu_not_white) self.assertCountEqual(["deformable_conv"], gpu_not_white)
class Test_srccoms_extract(unittest.TestCase):
def setUp(self):
self.tmpDir = tempfile.mkdtemp()
print('tmpDir=', self.tmpDir)
self.opsDir = os.path.join(self.tmpDir, 'fluid/layers')
os.makedirs(self.opsDir)
sys.path.append(self.opsDir)
sys.path.append(self.tmpDir)
self.api_pr_spec_filename = os.path.abspath(
os.path.join(os.getcwd(), "..", 'paddle/fluid/API_PR.spec'))
with open(self.api_pr_spec_filename, 'w') as f:
f.write("\n".join([
"""one_plus_one (ArgSpec(args=[], varargs=None, keywords=None, defaults=(,)), ('document', "one_plus_one"))""",
"""two_plus_two (ArgSpec(args=[], varargs=None, keywords=None, defaults=(,)), ('document', "two_plus_two"))""",
"""three_plus_three (ArgSpec(args=[], varargs=None, keywords=None, defaults=(,)), ('document', "three_plus_three"))""",
"""four_plus_four (ArgSpec(args=[], varargs=None, keywords=None, defaults=(,)), ('document', "four_plus_four"))""",
]))
def tearDown(self):
#sys.path.remove(self.tmpDir)
shutil.rmtree(self.tmpDir)
os.remove(self.api_pr_spec_filename)
def test_from_ops_py(self):
filecont = '''
def add_sample_code(obj, docstr):
pass
__unary_func__ = [
'exp',
]
__all__ = []
__all__ += __unary_func__
__all__ += ['one_plus_one']
def exp():
pass
add_sample_code(globals()["exp"], r"""
Examples:
.. code-block:: python
# import paddle
# x = paddle.to_tensor([-0.4, -0.2, 0.1, 0.3])
# out = paddle.exp(x)
out = [0.67032005, 0.81873075, 1.10517092, 1.34985881]
print(out)
# [0.67032005 0.81873075 1.10517092 1.34985881]
""")
def one_plus_one():
return 1+1
one_plus_one.__doc__ = """
placeholder
Examples:
.. code-block:: python
print(1+1)
"""
__all__ += ['two_plus_two']
def two_plus_two():
return 2+2
add_sample_code(globals()["two_plus_two"], """
Examples:
.. code-block:: python
print(2+2)
""")
'''
pyfilename = os.path.join(self.opsDir, 'ops.py')
with open(pyfilename, 'w') as pyfile:
pyfile.write(filecont)
self.assertTrue(os.path.exists(pyfilename))
utsp = importlib.import_module('ops')
print('testing srccoms_extract from ops.py')
methods = ['one_plus_one', 'two_plus_two', 'exp']
# os.remove("samplecode_temp/" "one_plus_one_example.py")
self.assertFalse(
os.path.exists("samplecode_temp/"
"one_plus_one_example.py"))
with open(pyfilename, 'r') as pyfile:
res, error_methods = srccoms_extract(pyfile, [], methods)
self.assertTrue(res)
self.assertTrue(
os.path.exists("samplecode_temp/"
"one_plus_one_example.py"))
os.remove("samplecode_temp/" "one_plus_one_example.py")
self.assertTrue(
os.path.exists("samplecode_temp/"
"two_plus_two_example.py"))
os.remove("samplecode_temp/" "two_plus_two_example.py")
self.assertTrue(os.path.exists("samplecode_temp/" "exp_example.py"))
os.remove("samplecode_temp/" "exp_example.py")
def test_from_not_ops_py(self):
filecont = '''
__all__ = [
'one_plus_one'
]
def one_plus_one():
"""
placeholder
Examples:
.. code-block:: python
print(1+1)
"""
return 1+1
'''
pyfilename = os.path.join(self.tmpDir, 'opo.py') # not ops.py
with open(pyfilename, 'w') as pyfile:
pyfile.write(filecont)
utsp = importlib.import_module('opo')
methods = ['one_plus_one']
with open(pyfilename, 'r') as pyfile:
res, error_methods = srccoms_extract(pyfile, [], methods)
self.assertTrue(res)
expectedFile = os.path.join("samplecode_temp",
"one_plus_one_example.py")
self.assertTrue(os.path.exists(expectedFile))
os.remove(expectedFile)
def test_with_empty_wlist(self):
"""
see test_from_ops_py
"""
pass
def test_with_wlist(self):
filecont = '''
__all__ = [
'four_plus_four',
'three_plus_three'
]
def four_plus_four():
"""
placeholder
Examples:
.. code-block:: python
print(4+4)
"""
return 4+4
def three_plus_three():
"""
placeholder
Examples:
.. code-block:: python
print(3+3)
"""
return 3+3
'''
pyfilename = os.path.join(self.tmpDir, 'three_and_four.py')
with open(pyfilename, 'w') as pyfile:
pyfile.write(filecont)
utsp = importlib.import_module('three_and_four')
methods = ['four_plus_four', 'three_plus_three']
with open(pyfilename, 'r') as pyfile:
res, error_methods = srccoms_extract(pyfile, ['three_plus_three'],
methods)
self.assertTrue(res)
expectedFile = os.path.join("samplecode_temp",
"four_plus_four_example.py")
self.assertTrue(os.path.exists(expectedFile))
os.remove(expectedFile)
self.assertFalse(os.path.exists(expectedFile))
# https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/fluid/layers/ops.py # https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/fluid/layers/ops.py
# why? unabled to use the ast module. emmmmm # why? unabled to use the ast module. emmmmm
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册