通过python并行运算cmd命令(以dyna为例)
一、基础知识
1.1 dyna的cmd命令方式
若dyna求解器的路径为D:\Program Files\lsdyna\program\ls-dyna_smp_d_R11_1_0_winx64_ifort160.exe
,需要执行的k文件路径为D:\ProgramData\lsdyna\DOEv4.X\DOEv4.3\job.k
,则通过cmd可调用 dyna求解器对k文件进行计算,cmd窗口中输入代码为
"D:\Program Files\lsdyna\program\ls-dyna_smp_d_R11_1_0_winx64_ifort160.exe" I=D:\ProgramData\lsdyna\DOEv4.X\DOEv4.3\job.k O=D:\ProgramData\lsdyna\DOEv4.X\DOEv4.3\d3hsp NCPU=8
其中,NCPU=8
含义为8核SMP计算。
1.2 python执行cmd命令
python中,可以通过os库中的os.system(command)来执行cmd命令,如python通过cmd设置定时关机的代码可表示为
import os
os.system('shutdown -s -t 3600')
1.3 python的并行机制
python的threading 模块提供了多线程运行方式,具体细节可参考 Python threading实现多线程 基础篇 - 知乎 (zhihu.com),总体而言,实现多线程运行有两种方式:
- 创建Thread 的实例,传给它一个可调用对象(函数或者类的实例方法)。
- 派生Thread 的子类,并创建子类的实例。
方式一的案例为:
from threading import Thread
from time import sleep, ctime
def func(name, sec):
print('---开始---', name, '时间', ctime())
sleep(sec)
print('***结束***', name, '时间', ctime())
# 创建 Thread 实例
t1 = Thread(target=func, args=('第一个线程', 1))
t2 = Thread(target=func, args=('第二个线程', 2))
# 启动线程运行
t1.start()
t2.start()
# 等待所有线程执行完毕
t1.join() # join() 等待线程终止,要不然一直挂起
t2.join()
方式二的案例为:
from threading import Thread
from time import sleep, ctime
# 创建 Thread 的子类
class MyThread(Thread):
def __init__(self, func, args):
'''
:param func: 可调用的对象
:param args: 可调用对象的参数
'''
Thread.__init__(self) # 不要忘记调用Thread的初始化方法
self.func = func
self.args = args
def run(self):
self.func(*self.args)
def func(name, sec):
print('---开始---', name, '时间', ctime())
sleep(sec)
print('***结束***', name, '时间', ctime())
def main():
# 创建 Thread 实例
t1 = MyThread(func, (1, 1))
t2 = MyThread(func, (2, 2))
# 启动线程运行
t1.start()
t2.start()
# 等待所有线程执行完毕
t1.join()
t2.join()
if __name__ == '__main__':
main()
两种方式的结果为:
---开始--- 一 时间 Fri Nov 29 11:34:31 2019
---开始--- 二 时间 Fri Nov 29 11:34:31 2019
***结束*** 一 时间 Fri Nov 29 11:34:32 2019
***结束*** 二 时间 Fri Nov 29 11:34:33 2019
二、 python并行运算
2.1 文件说明
任务目标为执行多个k文件,每个k文件位于一个独立的文件夹下,所有文件夹存放在同一个总体文件夹之中,文件组织如下。
2.2 整体思路
程序的输入为总体文件夹的路径、进程数量NProcessor、单进程调用核的数量NCPU、输出为执行有限元仿真的cmd代码及在cmd中的并行执行,并辅以日志和实时显示的功能。步骤如下:
- 定义类的主体,确定dyna路径,输入NCPU和NProcessor来初始化类;
- 搜索总体文件夹下的子文件夹,记录在类变量KfileDirs中;
- 将KfileDirs视为队列,实施并行计算;
- 每次从KfileDirs中取出一个KfileDir,将它作为参数调用pulltask函数来启用进程(根据k文件路径调用cmd执行,并返回进程);
- 动态展示当前各个进程的状态和信息;
- 只要KfileDirs未完全遍历 或 全部进程均空闲,就继续执行;
总体代码如下:
from threading import Thread
import os
import time
import logging
from utilis.logger import setlogger
from utilis.ParrellelRunLib import watchdog_check,pull_task,check_single_thread
from datetime import datetime
import threading
import time
class Parel_Runer(object):
lsdyna_dir = "D:\\Program Files\\lsdyna\\program\\ls-dyna_smp_d_R11_1_0_winx64_ifort160.exe"
Kfile_name = 'job.k'
def __init__(self,NCPU = 6, NProcessor = 7):
self.NCPU = NCPU
self.thread_register = [None for i in range(NProcessor)]
def run_in_dir(self,dir): #获取需要执行的k文件路径列表,并调用函数执行
self.dir = dir
setlogger(os.path.join(self.dir, 'training-%s.log' % datetime.strftime(datetime.now(), '%m%d-%H%M%S')))
#获取需要执行的k文件路径列表
self.KfileDirs = []
subdirs = next(os.walk(self.dir))[1]
for subdir in subdirs:
if 'condition' in subdir:
self.KfileDirs.append(os.path.join(dir,subdir))
#记录
for KfileDir in self.KfileDirs:
commandline = '"%s" I=%s NCPU=%d' % (self.lsdyna_dir, os.path.join(KfileDir, self.Kfile_name), self.NCPU)
logging.info('dir:\t' + KfileDir)
logging.info('command:\t' + commandline + '\n \n')
logging.info('--' * 20)
#执行
self.run_actually()
def run_actually(self):
start = time.time()
# 循环执行条件 :KfileDirs不为空或 线程不全为空
while len(self.KfileDirs) > 0 or not register_is_free(self.thread_register):
response = watchdog_check(self.thread_register)
if response != None: # 检查是否有空闲,空闲返回
KfileDir= self.KfileDirs.pop(0)
commandline = '"%s" I=%s NCPU=%d' % (self.lsdyna_dir, os.path.join(KfileDir, self.Kfile_name), self.NCPU)
self.thread_register[response] = pull_task(commandline, KfileDir, "thread-%d" % (response + 1))
time.sleep(2)
# 动态刷新
timeinfo = "<{:s}> processtime:{:6.2f}min |||||\n".format(datetime.strftime(datetime.now(), '%H:%M:%S'),
(time.time() - start) / 60)
showinfo = ["<thread-{:<2d} | mode: {:^9s} | running time:{:5.2f}min> | processdir: {:s}\n"
.format(i + 1, check_single_thread(item), 0 if item is None else item.runningtime() / 60,
"None" if item is None else os.path.split(item.cal_dir)[-1])
for i, item in enumerate(self.thread_register)]
print('\r', timeinfo, *showinfo, end='')
if __name__ == '__main__':
runner = Parel_Runer()
runner.run_in_dir(r'D:\ProgramData\lsdyna\Ex1-Model2\1_SimplifiedModel\5-generation\table_v2')
2.3 详解pull task函数
- CustomThread类,是threading.Thread的子类;
- pull_task函数,接受了K文件路径和计算结果放置路径,然后通过CustomThread唤起了进程执行execute_cmd_repeat函数并返回该进程变量;
- execute_cmd_repeat函数,目的为循环执行有限元。为防止有限元不收敛提前终止,因此通过该函数对生成文件d3plot的数量进行检查,如果生成d3plot的数量不达标且未达到重复上限,则清除生成的文件并再次执行有限元仿真;
- execute_cmd函数,单次执行有限元仿真。跟据生成文件夹更改执行路径,然后根据K文件路径执行有限元仿真;
- cal_d3plot_number函数:统计文件夹下d3plot文件的数量;
- clearfile函数:清除文件夹下k文件外的所有文件。
class CustomThread(threading.Thread):
def __init__(self, *args,**kwargs):
super().__init__(*args,**kwargs)
def setdir(self,cal_dir):
self.cal_dir = cal_dir
def run(self):
self.starttime = time.time()
self._target(*self._args)
def runningtime(self):
try:
return time.time() - self.starttime
except:
return 0
def pull_task(commandline,cal_dir=None,name = None):
t = CustomThread(target=execute_cmd_repeat, args=(commandline,cal_dir), name= name)
t.setdir(cal_dir)
t.start()
return t
def execute_cmd_repeat(command,cal_dir='./',repeat=2,d3plotfile_target=100):
runtime = 0
number = cal_d3plot_number(cal_dir)
while (number < d3plotfile_target and runtime < repeat):
clearfile(cal_dir)
execute_cmd(command,cal_dir,runtime)
number = cal_d3plot_number(cal_dir)
runtime += 1
def execute_cmd(command,cal_dir='./',repeattime=0):
log_info=['-----' * 10, "calculation start in %s:"%threading.current_thread().name,
cal_dir,command,'\nrepeattime: %d\n'%repeattime,'-----' * 10]
for item in log_info:
logging.info(item)
#获取当前路径
os.chdir(cal_dir)
os.system(command)
log_info=['-----' * 10, "calculation finished in %s:"%threading.current_thread().name,
cal_dir,command,'\nrepeattime: %d\n'%repeattime,'-----' * 10]
for item in log_info:
logging.info(item)
def cal_d3plot_number(cal_dir):
files = next(os.walk(cal_dir))[2]
count = 0
for file in files:
if 'd3plot' in file:
count += 1
return count
def clearfile(target_dir):
files = next(os.walk(target_dir))[2]
for file in files:
if not file.endswith('.k'):
os.remove(os.path.join(target_dir,file))
2.4 其他函数
- watchdog函数:检查线程列表中的线程是否有空闲,若有空闲,返回空闲的线程编号,否则返回None;
- register_is_free函数:检查线程列表是否全部空闲;
- check_single_thread函数:检查单个线程是否在运行;
def watchdog_check(register):
for i,item in enumerate(register):
if item == None or not item.is_alive():
return i
return None
def register_is_free(register):
for item in register:
if item != None and item.is_alive():
return False
return True
def check_single_thread(td):
if td == None:
return 'undefined'
else:
return "%s"%td.is_alive()