Post

简单的排产系统的实现

近期公司需要做一个简单的排产算法,梳理下来之后需求点可以分为:

  1. 在给定生产计划,产品生产工艺路线的情况下,拟定一个基本的排产计划。
  2. 支持滚动重排。

产品经理对接过来的需求中,提到不需要考虑人员,物料等的约束,仅考虑提高设备利用率。因此,可以建模为如下方案。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@dataclass 
class Orders:
    order_id : str # 订单id
    start_date: datetime
    end_date : datetime 
    product : str # 产品id
    priority : int # 订单优先级

class Product :
    product_id : str 
    steps : List[StepDependency] 

class Resource :
    res_id : str 

有了基础的业务数据模型之后,我们来考虑问题本身。可以很显然地看到,该问题是一个约束优化问题,需要在以下条件下,求得最短完工时间,或者最大设备利用率。这里为了最简化代码,我们可以使用设备利用率来做。

1
2
# 求解模型

定义好模型之后,就可以直接使用谷歌库来求解。

重新排产问题

笔者在接手时,认为重排产是一个不频繁,且不难实现的需求,初版给出之后才发现,由于算法安排时间过紧,在半夜一点到凌晨六点安排了一大堆地狱的换料清洗等等任务。考虑到生产同事的身心健康,以及产品虽然说不需要考虑但事实上往往会成为限制的物料等内容,动态重排变成了一个优先级很高的需求。

因此,同样地,我们先开始建模。

首先,动态重排可以分为两种,一种是设备受限,一种是设备不受限。第一种可以是由于设备故障等,造成机器不可用。这种情况下,生产同事可以选择切换产品线,换到不需要使用该设备的产品。 设备不受限的情况是指由于种种原因,某一个或者多个工序需要延长其加工时间。

动态重排是在某一种初始状态下的重排,因此,我们首先需要考虑初始条件如何给出。

1
2
3
4
5
@dataclass 
class initialStates 
    resource_availablity : List
    step_finished : List 
    step_ongoing : List 

其中,resource_avalibility 列表记录设备从当前时刻开始的最早可用时间,如设备故障,或需要维修4小时,则记录在列表中分别为inf, 4。 所有未完成的,以及未开始的任务,都需要避开这个列表中记录的资源不可用时间段 。

step_finished : 由于产品工艺是不变的,为了保证工艺路线中靠后的工艺可以正确地识别到对应的依赖已经完成,因此,我们需要记录当前整体工艺中已经完成的工艺。

step_ongoing : 一般来说,在排产后的任意一个时间节点,都会存在正在进行中的工艺,因此中断重排必须要考虑这一部分任务的处理。尽管有时候可能是底层数据如工艺路线不对,比如时间过于紧张,往往不能按时完成,但更一般的情况可能是某一设备偶发性拖延,如开机晚了一点等,这种情况导致的与排产数据对不上,不需要处理底层工艺路线,但因为这种拖延可能会影响到下一个使用该设备的产品,因此也需要进行处理。 step_ongoing 中记录当前进行的工序的剩余时间。

这样,使用这三个属性,我们可以表征到重排产需要的初始状态。

因此,新的排产流程伪代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class Scheduler:
    def __init__(self, products, orders, resources):
        pass

    def run_simulate(initial_state):

        cp_model = cpModel()
        maitain_intervals = []
        finished_tasks = []
        ongoing_tasks = []
        for res_id, res_maitaining_time in initial_state.resource_availablity.items():
            start_time, end_time = res_maitaining_time
            maitain_intervals.append(add_fake_task(start_time, end_time, res_id)) 
        
        for (order_id, step_id) in initial_state.step_finished:

            finished_tasks.append({(order_id, step_id):{"status":"finished" }}) # 已完成的任务,不占用设备,因此不需要记录对应的设备id
        
        
        for (order_id, step_id), (remaining_time,res_id) in initial_state.step_ongoing.items():
            st_time = 0 if res_id not in initial_state.resource_availablity.keys() else initial_state.resource_availablity[res_id].end_time
            maitain_intervals.append(add_fake_task(st_time, remaining_time, res_id)) #对于当前在进行中的任务,设备上的其他任务都需要在该任务完成之后才能排,因此可以等效于一个设备不可用时间

        # 其余内容与无初始状态的排产基本相同,增加nooverlap 约束时,需要增加maitain_intervals。 


This post is licensed under CC BY 4.0 by the author.