若dyna求解器的路径为https://www.gofarlic.com\Program Files\lsdyna\program\ls-dyna_smp_d_R11_1_0_winx64_ifort160.exe,需要执行的k文件路径为https://www.gofarlic.com\ProgramData\lsdyna\DOEv4.X\DOEv4.3\job.k,则通过cmd可调用 dyna求解器对k文件进行计算,cmd窗口中输入代码为
"https://www.gofarlic.com\Program Files\lsdyna\program\ls-dyna_smp_d_R11_1_0_winx64_ifort160.exe" I=https://www.gofarlic.com\ProgramData\lsdyna\DOEv4.X\DOEv4.3\job.k O=https://www.gofarlic.com\ProgramData\lsdyna\DOEv4.X\DOEv4.3\d3hsp NCPU=8
其中,NCPU=8 含义为8核SMP计算。
python中,可以通过os库 中的os.system(command)来执行cmd命令,如python通过cmd设置定时关机的代码可表示为
import os
os.system('shutdown -s -t 3600')
python的threading 模块 提供了多线程运行方式,具体细节可参考Python threading实现多线程 基础篇 - 知乎 (zhihu.com),总体而言,实现多线程运行有两种方式:
方式一的案例为:
from threading import Thread
from time import sleep, ctime
def func(name, sec):
print('---开始---', name, '时间', ctime())
sleep(sec)
print('***结束***', name, '时间', ctime())
# 创建 Thread 实例
t1 = Thread(target=func, args=('第一个线程', 1))
t2 = Thread(target=func, args=('第二个线程', 2))
# 启动线程运行
t1.start()
t2.start()
# 等待所有线程执行完毕
t1.join() # join() 等待线程终止,要不然一直挂起
t2.join()
方式二的案例为:
from threading import Thread
from time import sleep, ctime
# 创建 Thread 的子类
class MyThread(Thread):
def __init__(self, func, args):
'''
:param func: 可调用的对象
:param args: 可调用对象的参数
'''
Thread.__init__(self) # 不要忘记调用Thread的初始化方法
self.func = func
self.args = args
def run(self):
self.func(*self.args)
def func(name, sec):
print('---开始---', name, '时间', ctime())
sleep(sec)
print('***结束***', name, '时间', ctime())
def main():
# 创建 Thread 实例
t1 = MyThread(func, (1, 1))
t2 = MyThread(func, (2, 2))
# 启动线程运行
t1.start()
t2.start()
# 等待所有线程执行完毕
t1.join()
t2.join()
if __name__ == '__main__':
main()
两种方式的结果为:
---开始--- 一 时间 Fri Nov 29 11:34:31 2019
---开始--- 二 时间 Fri Nov 29 11:34:31 2019
***结束*** 一 时间 Fri Nov 29 11:34:32 2019
***结束*** 二 时间 Fri Nov 29 11:34:33 2019
任务目标为执行多个k文件,每个k文件位于一个独立的文件夹下,所有文件夹存放在同一个总体文件夹之中,文件组织如下。
程序的输入为总体文件夹的路径、进程数量NProcessor、单进程调用核的数量NCPU、输出为执行有限元仿真的cmd代码及在cmd中的并行执行,并辅以日志和实时显示的功能。步骤如下:
总体代码如下:
from threading import Thread
import os
import time
import logging
from utilis.logger import setlogger
from utilis.ParrellelRunLib import watchdog_check,pull_task,check_single_thread
from datetime import datetime
import threading
import time
class Parel_Runer(object):
lsdyna_dir = "https://www.gofarlic.com\\Program Files\\lsdyna\\program\\ls-dyna_smp_d_R11_1_0_winx64_ifort160.exe"
Kfile_name = 'job.k'
def __init__(self,NCPU = 6, NProcessor = 7):
self.NCPU = NCPU
self.thread_register = [None for i in range(NProcessor)]
def run_in_dir(self,dir): #获取需要执行的k文件路径列表,并调用函数执行
self.dir = dir
setlogger(os.path.join(self.dir, 'training-%s.log' % datetime.strftime(datetime.now(), '%m%d-%H%M%S')))
#获取需要执行的k文件路径列表
self.KfileDirs = []
subdirs = next(os.walk(self.dir))[1]
for subdir in subdirs:
if 'condition' in subdir:
self.KfileDirs.append(os.path.join(dir,subdir))
#记录
for KfileDir in self.KfileDirs:
commandline = '"%s" I=%s NCPU=%d' % (self.lsdyna_dir, os.path.join(KfileDir, self.Kfile_name), self.NCPU)
logging.info('dir:\t' + KfileDir)
logging.info('command:\t' + commandline + '\n \n')
logging.info('--' * 20)
#执行
self.run_actually()
def run_actually(self):
start = time.time()
# 循环执行条件 :KfileDirs不为空或 线程不全为空
while len(self.KfileDirs) > 0 or not register_is_free(self.thread_register):
response = watchdog_check(self.thread_register)
if response != None: # 检查是否有空闲,空闲返回
KfileDir= self.KfileDirs.pop(0)
commandline = '"%s" I=%s NCPU=%d' % (self.lsdyna_dir, os.path.join(KfileDir, self.Kfile_name), self.NCPU)
self.thread_register[response] = pull_task(commandline, KfileDir, "thread-%d" % (response + 1))
time.sleep(2)
# 动态刷新
timeinfo = "<{:s}> processtime:{:6.2f}min |||||\n".format(datetime.strftime(datetime.now(), '%H:%M:%S'),
(time.time() - start) / 60)
showinfo = ["<thread-{:<2d} | mode: {:^9s} | running time:{:5.2f}min> | processdir: {:s}\n"
.format(i + 1, check_single_thread(item), 0 if item is None else item.runningtime() / 60,
"None" if item is None else os.path.split(item.cal_dir)[-1])
for i, item in enumerate(self.thread_register)]
print('\r', timeinfo, *showinfo, end='')
if __name__ == '__main__':
runner = Parel_Runer()
runner.run_in_dir(r'https://www.gofarlic.com\ProgramData\lsdyna\Ex1-Model2\1_SimplifiedModel\5-generation\table_v2')
class CustomThread(threading.Thread):
def __init__(self, *args,**kwargs):
super().__init__(*args,**kwargs)
def setdir(self,cal_dir):
self.cal_dir = cal_dir
def run(self):
self.starttime = time.time()
self._target(*self._args)
def runningtime(self):
try:
return time.time() - self.starttime
except:
return 0
def pull_task(commandline,cal_dir=None,name = None):
t = CustomThread(target=execute_cmd_repeat, args=(commandline,cal_dir), name= name)
t.setdir(cal_dir)
t.start()
return t
def execute_cmd_repeat(command,cal_dir='./',repeat=2,d3plotfile_target=100):
runtime = 0
number = cal_d3plot_number(cal_dir)
while (number < d3plotfile_target and runtime < repeat):
clearfile(cal_dir)
execute_cmd(command,cal_dir,runtime)
number = cal_d3plot_number(cal_dir)
runtime += 1
def execute_cmd(command,cal_dir='./',repeattime=0):
log_info=['-----' * 10, "calculation start in %s:"%threading.current_thread().name,
cal_dir,command,'\nrepeattime: %d\n'%repeattime,'-----' * 10]
for item in log_info:
logging.info(item)
#获取当前路径
os.chdir(cal_dir)
os.system(command)
log_info=['-----' * 10, "calculation finished in %s:"%threading.current_thread().name,
cal_dir,command,'\nrepeattime: %d\n'%repeattime,'-----' * 10]
for item in log_info:
logging.info(item)
def cal_d3plot_number(cal_dir):
files = next(os.walk(cal_dir))[2]
count = 0
for file in files:
if 'd3plot' in file:
count += 1
return count
def clearfile(target_dir):
files = next(os.walk(target_dir))[2]
for file in files:
if not file.endswith('.k'):
os.remove(os.path.join(target_dir,file))
def watchdog_check(register):
for i,item in enumerate(register):
if item == None or not item.is_alive():
return i
return None
def register_is_free(register):
for item in register:
if item != None and item.is_alive():
return False
return True
def check_single_thread(td):
if td == None:
return 'undefined'
else:
return "%s"%td.is_alive()