Verified Commit d872e23d authored by w-mj's avatar w-mj

Merge branch 'learning_tracker' of https://gitlab.educg.net/zhangmingyuan/PyGrading

parents f75b0d1f 122c7f08
Pipeline #138 failed with stages
in 0 seconds
<h1>
CG Learning Tracker
</h1>
针对PyGrading测评框架的测评数据采集工具,既支持对整个测评流程和每个测试用例的数据自动采集,也支持添加自定义采集数据
## 采集配置
采集配置来源于Job配置和环境变量(同时存在job配置项和环境变量时优先使用job配置项),现支持配置项如下:
| 配置项 | 含义| 是否可缺省| 缺省值|
| ---------- | -----------| -----------| -----------|
| track_grading_enable | 是否自动采集整体测评数据,该项为False时自动禁用测试用例的数据采集| 是| False|
| track_testcase_enable | 是否自动采集每个测试用例的测评数据| 是| True|
| track_lrs_endpoint | 接收采集数据的LRS服务地址| 否| -|
| track_jwt_secret | 用于JWT签名的密钥| 是| -|
| track_jwt | 用于验证的JWT| 是| -|
| track_debug_enable | 是否打印调试信息| 是| False|
| track_http_enable | 是否将采集的数据自动发送至LRS| 是| True|
| track_file_dir | 附件本地保存目录| 是| /var/tmp/cg_learning|
| track_jwt_token | 用于验证的JWT| 是| -|
| track_root_statement | 本次测评的来源Statement| 是| -|
| track_root_activity | 本次测评的Activity| 是| http://learning.educg.net/activity#program_project|
## 可自动采集的数据
整体测评:
1. 测评配置数据(即job.get_config())
2. 测评消耗的时间
3. 测评的结果
4. 测评的错误信息(即在pre_work、run或post_work中抛出的异常)
用例测评:
1. 测评消耗的时间
2. 测评的结果
3. 测评的错误信息
## Q&A
### 1.如何在整体测评或用例测评中添加自定义的采集数据?
采集工具会自动采集整体或用例的测评结果,因此在`run`中直接返回需要采集的数据即可
或者可先通过`get_current_statement()`方法获取当前采集Statement,使用`add_result_extension`方法添加结果数据、使用`add_attachment`方法添加附件、使用`add_context_extensions`方法添加上下文信息
具体样例可参考测试文件中的`test_with_custom_statistics`测试用例
### 2.如何添加自定义的采集事件?
可先通过`new_statement_instance`方法生成新的Statement,然后设置上下文、填写采集数据即可
具体样例可参考测试文件中的`test_with_custom_event`测试用例
This diff is collapsed.
This diff is collapsed.
import types
import requests
import urllib.parse
def get_file_sha256(file_path):
    """Return the SHA-256 hex digest of the file at *file_path*.

    Reads the file in fixed-size chunks so arbitrarily large files can be
    hashed without loading the whole content into memory (the original
    implementation did ``f.read()`` in one shot).

    :param file_path: path of the file to hash
    :return: lowercase hexadecimal SHA-256 digest string
    """
    import hashlib
    sha256obj = hashlib.sha256()
    with open(file_path, "rb") as f:
        # 64 KiB chunks keep memory usage constant regardless of file size.
        for chunk in iter(lambda: f.read(65536), b""):
            sha256obj.update(chunk)
    return sha256obj.hexdigest()
def exec_function_with_timing(func: types.FunctionType, *args, **kargs):
    """Call *func* with the given arguments and measure how long it takes.

    :param func: the callable to invoke
    :param args: positional arguments forwarded to *func*
    :param kargs: keyword arguments forwarded to *func*
    :return: a tuple ``(return_value, elapsed_ms)`` where ``elapsed_ms`` is
        the whole-millisecond duration as an ``int``
    """
    import time
    # perf_counter() is monotonic, so the measured duration cannot go
    # negative or jump if the system wall clock is adjusted mid-call,
    # which could happen with the original time.time().
    begin = time.perf_counter()
    ret = func(*args, **kargs)
    end = time.perf_counter()
    total_time = int((end - begin) * 1000)
    return ret, total_time
def url_join_args(endpoint, api, params: dict=None, **kwargs):
    """Assemble a full request URL from a base address, API path and query args.

    :param endpoint: service base address
    :param api: API path, with or without a trailing ``?``
    :param params: explicit query parameters
    :param kwargs: any further keyword arguments, appended as query parameters
    :return: the assembled URL
    """
    url = endpoint + api
    # Encode the two parameter sources independently so duplicate keys
    # appearing in both dicts are preserved, then glue the pieces with '&'.
    encoded = [urllib.parse.urlencode(group) for group in (params, kwargs) if group]
    if encoded and not url.endswith('?'):
        url += '?'
    return url + '&'.join(encoded)
def post_request(endpoint, api, params: dict = None, upload_files: dict=None,
                 content_key: str="statements", content_data=None, headers: dict = None):
    """Send a multipart POST request carrying JSON content plus optional files.

    :param endpoint: service base address
    :param api: API path, with or without a trailing ``?``
    :param params: query parameters appended to the URL
    :param upload_files: files to upload, ``{"key1": {"path": str}, ...}``
    :param content_key: form field name for the JSON payload
    :param content_data: the non-file object to serialize and upload
    :param headers: extra request headers
    :return: the decoded JSON response body
    """
    import json
    from contextlib import ExitStack
    url = url_join_args(endpoint, api, params)
    files = {content_key: json.dumps(content_data)}
    # ExitStack guarantees every attachment handle is closed even when the
    # request raises — the original leaked all opened file objects.
    with ExitStack() as stack:
        if upload_files is not None:
            for key, file_info in upload_files.items():
                files[key] = stack.enter_context(open(file_info["path"], 'rb'))
        res = requests.post(url, files=files, headers=headers)
    from cg_learning.tracker import debug
    debug(str(res.request.body))
    debug(str(res.content))
    # BUG FIX: the original returned the bound method ``res.json`` instead of
    # calling it; callers expect the parsed JSON payload.
    return res.json()
......@@ -8,6 +8,8 @@
import inspect
from concurrent.futures import ThreadPoolExecutor
from cg_learning.tracker import learning_tracker_grading, learning_tracker_test_case
from pygrading.job_base import JobBase
from pygrading.exception import FunctionArgsError
......@@ -15,6 +17,7 @@ from pygrading.exception import FunctionArgsError
class Job(JobBase):
""" Running a job and support multi-threading """
@learning_tracker_grading
def start(self, max_workers: int = 1):
""" start a job """
......@@ -36,6 +39,7 @@ class Job(JobBase):
testcases = [(self, case) for case in self.get_testcases()]
# create a thread pool
@learning_tracker_test_case
def outer_run(args):
"""
support these four situations:
......
......@@ -41,6 +41,8 @@ setuptools.setup(
python_requires='>=3.5',
install_requires=[
'Jinja2>=2.11.2',
'fire>=0.3.1'
'fire>=0.3.1',
'PyJWT>=2.0.1',
'requests>=2.21.0'
],
)
import unittest
import pygrading as gg
from cg_learning.models import Verb, Statement, Activity
from cg_learning.tracker import track_grading_enable_config_key, \
new_statement_instance, get_current_statement, track_debug_config_key, \
get_statements_queue, clear_statements_queue, \
track_auth_jwt_config_key, default_concept_domain, build_the_verb, submit_statement, track_lrs_endpoint_config_key
from pygrading import Job, TestCases
# Tracker configuration shared by every test below: enables grading-level
# data collection, supplies a JWT used to authenticate against the LRS, and
# turns on debug output.
test_track_config = {track_grading_enable_config_key: True,
                     track_auth_jwt_config_key: "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9."
                                                "eyJpc3MiOiJodHRwOi8vbGVhcm5pbmcuZWR1Y2"
                                                "cubmV0IiwibmFtZSI6IlB5dGhvbiBHcmFkaW5n"
                                                "IEtlcm5lbCJ9.pf0vDkYc4TTgCUhxpCIobv7ST7NcMPovDWihidxzYBg",
                     track_debug_config_key: True}
def prework(job: Job):
    """Populate *job* with four fixed test cases before grading starts."""
    cases = gg.create_testcase()
    for case_name, case_score in (("TestCase1", 20), ("TestCase2", 30),
                                  ("TestCase3", 25), ("TestCase4", 25)):
        cases.append(case_name, case_score)
    job.set_testcases(cases)
def run_with_only_case_param(case: TestCases.SingleTestCase):
    """Grade a single case with a uniformly random score in [0, case.score]."""
    import random
    awarded = random.uniform(0, case.score)
    print(f"{case.name} get score : {awarded}")
    return awarded
def run_with_job_and_case_params(job: Job, case: TestCases.SingleTestCase):
    # Signature variant used to verify the tracker accepts a (job, case)
    # run function; delegates the actual scoring to run_with_only_case_param.
    return run_with_only_case_param(case)
def run_with_only_job_param(job: Job):
    # Signature variant taking only the job; always reports success.
    return True
def run_with_no_params():
    # Signature variant taking no arguments at all; always reports success.
    return True
def basic_check_track_result(job: Job):
    """Assert the statement queue holds exactly one unique statement per test
    case plus one root statement for the grading run as a whole."""
    testcases = job.get_testcases()
    statements = get_statements_queue()
    # NOTE(review): `statements` is iterated twice below — this assumes
    # get_statements_queue() returns a re-iterable container, not a one-shot
    # generator; confirm against cg_learning.tracker.
    statements_len = len(list(statements))
    # all statement id should be unique
    assert len(set(statement.statement_id for statement in statements)) == statements_len
    # a root statement and all case statement
    assert statements_len == len(testcases) + 1
class TestTracker(unittest.TestCase):
    """End-to-end tests for the learning tracker: runs real pygrading jobs
    under various run-function signatures, threading levels, custom data and
    failure modes, then checks the generated statement queue."""
    def tearDown(self):
        # Drop any statements a test left behind so tests stay independent.
        clear_statements_queue()
    def test_multi_form_run_def(self):
        """The tracker must work with all four supported run() signatures."""
        job = gg.Job(prework=prework, run=run_with_no_params, config=test_track_config)
        job.start()
        job.print()
        basic_check_track_result(job)
        clear_statements_queue()
        job = gg.Job(prework=prework, run=run_with_only_case_param, config=test_track_config)
        job.start()
        job.print()
        basic_check_track_result(job)
        clear_statements_queue()
        job = gg.Job(prework=prework, run=run_with_only_job_param, config=test_track_config)
        job.start()
        job.print()
        basic_check_track_result(job)
        clear_statements_queue()
        job = gg.Job(prework=prework, run=run_with_job_and_case_params, config=test_track_config)
        job.start()
        job.print()
        basic_check_track_result(job)
    def test_run_in_multi_busy_thread(self):
        """Tracking stays consistent when cases outnumber worker threads."""
        job = gg.Job(prework=prework, run=run_with_only_case_param, config=test_track_config)
        job.start(2)
        job.print()
        basic_check_track_result(job)
    def test_run_in_multi_idle_thread(self):
        """Tracking stays consistent when worker threads outnumber cases."""
        job = gg.Job(prework=prework, run=run_with_only_case_param, config=test_track_config)
        job.start(10)
        job.print()
        basic_check_track_result(job)
    def test_with_custom_statistics(self):
        """Custom data can be attached to the current statement during a run."""
        def prework_with_custom_statistics(job: Job):
            prework(job)
            # collect ip of Raspberry Pi
            statement = get_current_statement()
            statement.add_context_extensions("RPI_ip", "192.168.123.123")
        def run_with_custom_statistics(case: TestCases.SingleTestCase):
            statement = get_current_statement()
            result = {"score": run_with_only_case_param(case)}
            if case.name == "TestCase4":
                # mark TestCase4 as performance test
                statement.add_context_extensions("case_type", "performance")
                # collect elapsed time in performance test
                result["time"] = 10.23
            return result
        job = gg.Job(prework=prework_with_custom_statistics, run=run_with_custom_statistics, config=test_track_config)
        job.start()
        basic_check_track_result(job)
    def test_with_custom_event(self):
        """A brand-new statement can be created, linked to the current one and
        filled with result extensions from within a run function."""
        def run_with_custom_event(case: TestCases.SingleTestCase):
            current_statement = get_current_statement()
            command = "sleep 2"
            result = gg.exec(command)
            # create new statement
            new_statement = new_statement_instance()
            # set the context: parent activity + originating statement id
            new_statement.add_context_activity(
                Statement.ContextActivityType.Parent, current_statement.object_activity.activity_id)
            new_statement.context_statement_id = current_statement.statement_id
            # set the verb
            new_statement.verb = build_the_verb("执行")
            # set the activity (URL-quote the command so it forms a valid IRI)
            from urllib.parse import quote
            new_statement.object_activity = Activity(quote(f'{default_concept_domain}/activity#{command}'),
                                                     activity_type=f'{default_concept_domain}/activity/type#command',
                                                     description={
                                                         'en': 'execute this command to sleep two seconds',
                                                         'zh': '执行该命令使得当前线程睡眠2秒'}
                                                     )
            new_statement.add_result_extension('cmd', command)
            new_statement.add_result_extension('stdout', result.stdout)
            new_statement.add_result_extension('stderr', result.stderr)
            new_statement.add_result_extension('exec_time', result.exec_time)
            new_statement.add_result_extension('returncode', result.returncode)
            return {"score": run_with_only_case_param(case)}
        job = gg.Job(prework=prework, run=run_with_custom_event, config=test_track_config)
        job.start()
        basic_check_track_result(job)
    def test_exception_outside_case(self):
        """An exception in pre_work must be recorded on the root statement."""
        info = "Grading Exception!!!!"
        def prework_exception(job: Job):
            prework(job)
            raise Exception(info)
        # test_track_config[track_grading_enable_config_key] = False
        job = gg.Job(prework=prework_exception, run=run_with_no_params, config=test_track_config)
        # NOTE(review): if start() does not re-raise, the except body never
        # runs and only the length check below guards this test — confirm
        # start() propagates pre_work exceptions.
        try:
            job.start()
        except Exception as ex:
            assert str(ex) == info
        statements = get_statements_queue()
        statements_len = len(list(statements))
        # since an exception was raised in pre_work, only one statement was generated
        assert statements_len == 1
        for statement in statements:
            assert statement.result_extensions["exception"] == info
        clear_statements_queue()
    def test_exception_inside_case(self):
        """An exception in a single case must be recorded on that case's
        statement while the rest of the run is still tracked."""
        info = "Case Exception!!!!"
        def run_exception(case: TestCases.SingleTestCase):
            if case.name == "TestCase3":
                raise Exception(info)
            return run_with_only_case_param(case)
        # test_track_config[track_grading_enable_config_key] = False
        job = gg.Job(prework=prework, run=run_exception, config=test_track_config)
        try:
            job.start()
        except Exception as ex:
            assert str(ex) == info
        statements = get_statements_queue()
        for statement in statements:
            if statement.object_activity.activity_id == f"{default_concept_domain}/testcase/TestCase3":
                assert statement.result_extensions["exception"] == info
        basic_check_track_result(job)
    def test_models(self):
        """Verb serializes to the expected minimal JSON form."""
        verb = Verb("http://verb.educg.net//测试")
        assert str(verb) == '{"id": "http://verb.educg.net//测试"}'
    def test_new_instance(self):
        """Every freshly created statement gets a unique statement_id."""
        statements = []
        nums = 10000
        for i in range(nums):
            statements.append(new_statement_instance())
        ans = set(statement.statement_id for statement in statements)
        # all statement id should be unique
        assert len(ans) == nums
        clear_statements_queue()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment