a b/src/executor.py
1
# -*- coding: utf-8 -*-
2
"""
3
@Time : 2023/12/11 12:49
4
@Auth : Juexiao Zhou
5
@File :executor.py
6
@IDE :PyCharm
7
@Page: www.joshuachou.ink
8
"""
9
10
import subprocess
11
import time
12
13
class CodeExecutor:
    """Run a bash script through ``bash -i -e`` and report the tail of its output.

    The script is not executed directly: a sibling file named
    ``<script>.execute.sh`` is generated that wraps the original content
    between ``code_prefix`` and ``code_postfix`` lines, and that generated
    file is what gets run.
    """

    # Number of trailing stdout / stderr lines kept in the returned report.
    _TAIL_LINES = 10

    def __init__(self):
        # Path of the most recently executed user script.
        self.bash_code_path = None
        # Path of the generated wrapper script (set by ``execute``).
        self.bash_code_path_execute = None
        # Shell lines written before the user script.
        self.code_prefix = [
            'mamba activate abc_runtime',
        ]
        # Shell lines written after the user script.
        self.code_postfix = [
        ]

    def execute(self, bash_code_path):
        """Execute the bash script at *bash_code_path* and return its output tail.

        Stdout is echoed live, each line prefixed with ``[stdout] ``. Stderr is
        echoed once stdout is exhausted, each line prefixed with ``[stderr] ``;
        blank stderr lines and lines containing ``EnvironmentNameNotFound`` are
        dropped.

        Args:
            bash_code_path: Path to the bash script to run. A wrapper script is
                written next to it as ``bash_code_path + '.execute.sh'``.

        Returns:
            A string made of the last 10 stdout lines (stripped, each prefixed
            with ``[stdout] ``), a newline, and the last 10 retained stderr
            lines, each group joined with newlines.

        Raises:
            OSError: If the script cannot be read, the wrapper cannot be
                written, or ``bash`` cannot be started.
        """
        # Local import: only this method needs it.
        import threading

        self.bash_code_path = bash_code_path
        with open(self.bash_code_path, 'r') as input_file:
            bash_content = input_file.read()

        self.bash_code_path_execute = self.bash_code_path + '.execute.sh'

        # Generate the wrapper script: prefix lines, original content, postfix lines.
        with open(self.bash_code_path_execute, 'w') as output_file:
            for code in self.code_prefix:
                output_file.write(code + '\n')
            output_file.write(bash_content)
            output_file.write('\n')  # make sure the postfix starts on a new line
            for code in self.code_postfix:
                output_file.write(code + '\n')

        stdout = []
        raw_stderr = []

        # The context manager closes both pipes and waits for the process.
        with subprocess.Popen(['bash', '-i', '-e', self.bash_code_path_execute],
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              text=True,
                              errors='replace') as process:
            # Drain stderr concurrently. If it were left unread until stdout
            # reached EOF, a script writing more than the OS pipe buffer to
            # stderr would block forever and so would this method.
            stderr_reader = threading.Thread(
                target=lambda: raw_stderr.extend(process.stderr),
                daemon=True,
            )
            stderr_reader.start()

            # Stream stdout line by line; iteration stops at EOF, so no empty
            # "[stdout] " entries are produced while waiting for the exit code.
            for output in process.stdout:
                line = f'[stdout] {output.strip()}'
                print(line)
                stdout.append(line)

            stderr_reader.join()

        # Stderr is reported after stdout so the two streams are not interleaved.
        stderr = []
        for line in raw_stderr:
            if 'EnvironmentNameNotFound' in line or line == '\n':
                continue
            print(f"[stderr] {line}", end='')
            # Lines keep their own trailing newline, so the joined report
            # below has a blank line between stderr entries.
            stderr.append(line)

        stdout = '\n'.join(stdout[-self._TAIL_LINES:])
        stderr = '\n'.join(stderr[-self._TAIL_LINES:])

        executor_info = stdout + '\n' + stderr
        return executor_info