Moore-Hodgson's Algorithm

此演算法將一個 O( N2 ) 的 Scheduling 問題縮減到了 O( NlogN )


問題描述

有 N 個 Jobs,每個 Job 耗時 p 且期限 (due) 為 d,求最少逾期工作數量 ( Minimum number of late jobs )的排程方式
也就是說,要找出一個排程方式是能夠讓在期限之前完成的工作數量最大化


Moore-Hodgson 演算法

  1. 設 J = ∅ ( J 是一個 subschedule 且為空集合 )
  2. 將 Jobs 按照期限遞增排序 ( d1 < d2 < …… < dn ),也就是依照 Earliest Due Date ( EDD ) 的方式,並放入 J 中
  3. 令 k = subschedule J 中的第一個逾期工作的 index,如果沒有找到,代表已完成排程,跳到步驟 7
  4. 找出 pm = max{ pi, for i = 1 ~ k } ( 找出 index <= k 中最耗時的工作)
  5. 將 Job m 移出 subschedule J,放入集合 S 中
  6. 重複步驟 3
  7. 最後,將集合 S 以任意順序加到 J 的後面,此 Schedule J 即為最少逾期工作數量的排程


例子一

Job 1 2 3 4 5 6
pi ( 所需時間 ) 10 3 4 8 10 6
di ( 期限 ) 15 6 9 23 20 30

第一步,先根據 Due Time 去做排序
再新增加一個 si 去紀錄剛好執行完 Job[i] 的時間

Job 2 3 1 5 4 6
pi ( 所需時間 ) 3 4 10 10 8 6
di ( 期限 ) 6 9 15 20 23 30
si ( 完成工作的時間 ) 3 7 17 27 35 41

我們可以輕易的從 si 與 di 去找到第一個逾期的工作是 Job[1]
然後再找一下在 Job[1] 之前的工作中,最耗時的工作為 Job[1]
所以我們把 Job[1] 刪除,並更新一下在 Job[1] 之後的 si

Job 2 3 5 4 6
pi 3 4 10 8 6
di 6 9 20 23 30
si 3 7 17 25 31

找到第一個逾期的工作是 Job[4]
在 Job[4] 之前最耗時的工作為 Job[5]
剔除他並更新

Job 2 3 4 6
pi 3 4 8 6
di 6 9 23 30
si 3 7 15 21

到這邊就沒有逾期的工作了
接下來就是把剛剛剔除的工作以任意順序加到後面去

所以可以為 : 2 -> 3 -> 4 -> 6 -> 1 -> 5
也可以是為 : 2 -> 3 -> 4 -> 6 -> 5 -> 1

例子二

Job 1 2 3 4 5
pi 4 3 2 5 6
di 6 7 8 9 11

由於剛好順序已經是按照 Due Time 排序過後的了
所以直接加上 si 來輔助過程

Job 1 2 3 4 5
pi 4 3 2 5 6
di 6 7 8 9 11
si 4 7 9 14 20

找到第一個逾期的工作是 Job[3]
在 Job[3] 之前最耗時的工作為 Job[1]
剔除他並更新

Job 2 3 4 5
pi 3 2 5 6
di 7 8 9 11
si 3 5 10 16

找到第一個逾期的工作是 Job[4]
在 Job[4] 之前最耗時的工作為 Job[4]
剔除他並更新

Job 2 3 5
pi 3 2 6
di 7 8 11
si 3 5 11

到這邊就沒有逾期的工作了
接下來就是把剛剛剔除的工作以任意順序加到後面去

所以可以為 : 2 -> 3 -> 5 -> 1 -> 4
也可以是為 : 2 -> 3 -> 5 -> 4 -> 1


暴力程式碼 O( N2 )

此版本程式碼耗時 O( N2 )
他只是演示上述演算法的過程
並非真正的最佳 present algorithm 的方式
真正優化的程式碼請往下看或按側邊攔跳到 優化程式碼

首先我們先定義一下工作的結構與比較大小的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct Job
{
int index;
int process;
int due;
int end_time;

Job(int i = 0, int p = 0, int d = 0, int e = 0) : index{ i }, process { p }, due{ d }, end_time{ e } {}
};

inline bool operator<(const Job &lhs, const Job &rhs)
{
return lhs.due < rhs.due;
}

再把 Moore-Hodgson’s Algorithm 封裝成 class

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class Moore_Hodgson
{
public:
Moore_Hodgson(initializer_list<Job> int_list) : jobs{ int_list }, last_valid_idx{ jobs.size() - 1 }
{
// 初始化 Jobs
initial();

// 運算
operate();
}

friend ostream& operator<<(ostream &out, const Moore_Hodgson &schedule)
{
for (auto &x : schedule.jobs)
out << x.index << " ";

return out;
}

private:
vector<Job> jobs;
unsigned last_valid_idx; // 最後一個可以在期限內完成的工作的 Index

private:
void operate(void)
{
int idx;
int tem = 0;

// 若此次迴圈中沒有改變到 last_valid_idx 的值,代表已完成演算法
while (tem != last_valid_idx)
{
tem = last_valid_idx;

for (idx = 0; idx <= last_valid_idx; ++idx)
{
// 找到逾期的工作
if (jobs[idx].end_time > jobs[idx].due)
{
// 找出最大工作量的工作 J_max
auto itr = find_max_processing_time(jobs, 0, idx);

// 先記錄起來
auto rec = *itr;

// 移除最大工作量的工作 J_max
auto new_itr = jobs.erase(itr);

// 將 last_valid_idx 往前移一位
--last_valid_idx;

// 把在被刪掉的 J_max 工作之後的工作的結束時間更新 ( 不包括已經確定逾期的工作 )
// 且只需要將之後的每個工作都減掉 J_max 的工作量就可以
update_end_time(jobs, rec.process);

// 將逾期的工作加到最後面放著
jobs.push_back(rec);

// 跳出 for 迴圈
break;
}
}
}
}

void initial(void)
{
sort(jobs.begin(), jobs.end());

jobs[0].end_time = jobs[0].process;

for (auto idx = 1; idx < jobs.size(); ++idx)
jobs[idx].end_time = jobs[idx - 1].end_time + jobs[idx].process;
}

vector<Job>::const_iterator find_max_processing_time(const vector<Job> &jobs, int start, int end) const
{
int max = -1;
int idx = -1;

for (int i = start; i <= end; ++i)
if (jobs[i].process > max)
{
max = jobs[i].process;
idx = i;
}

return jobs.begin() + idx;
}

void update_end_time(vector<Job> &jobs, int proc)
{
int idx = 0;

while (idx <= last_valid_idx)
jobs[idx++].end_time -= proc;
}
};

然後在主程式這樣呼叫

1
2
3
4
5
6
7
8
9
10
int main(void)
{
Moore_Hodgson schedule1{ {1,10,15},{2,3,6},{3,4,9 },{4,8,23},{5,10,20},{6,6,30} };
Moore_Hodgson schedule2{ {1,4,6},{2,3,7},{3,2,8 },{4,5,9},{5,6,1} };

cout << "Schedule 1 : " << schedule1 << endl;
cout << "Schedule 2 : " << schedule2 << endl;

return 0;
}

得到輸出

1
2
Schedule 1 : 2 3 4 6 1 5
Schedule 2 : 2 3 5 1 4


優化程式碼 O( NlogN )

上述的暴力法中有許多搜尋與計算 job end time 是重複的
我們可以使用 Priority Queue 與依序加入工作的方式來加快演算法

演算法

  1. 一樣先依照 EDD 次序將 Jobs 排序
  2. 遍歷 Jobs,將 Job[i] 加入到 priority queue 中
  3. 將目前 end time 加上 Job[i] 的耗時
  4. 檢查此工作是否有超時 ( end time 是否大於 the due of Job[i] )
  5. 若超時,則從 priority queue 中去除最大者,將 end time 扣掉他。
    若有需要輸出最後結果的話,也在 overdue 上記錄該工作為超時
  6. 若沒有,則重複步驟 2,直到遍歷完 Jobs
  7. 輸出時,先依照排序過後的 Jobs 次序將未逾期的工作印出
    再以任意順序輸出已逾期工作

例子

用上面的例子一來做說明

Job 1 2 3 4 5 6
pi ( 所需時間 ) 10 3 4 8 10 6
di ( 期限 ) 15 6 9 23 20 30

先將 Jobs 依照期限排序

Job 2 3 1 5 4 6
pi ( 所需時間 ) 3 4 10 10 8 6
di ( 期限 ) 6 9 15 20 23 30

現在就開始來排程

首先依序遍歷 Jobs

(一) 加入 Job[1]

Job 2
pi 3
di 6

end time = 3,沒有超時

(二) 加入 Job[2]

Job 2 3
pi 3 4
di 6 9

end time = 7,沒有超時

(三) 加入 Job[3]

Job 2 3 1
pi 3 4 10
di 6 9 15

end time = 17,超時
利用 priority queue 去除最大的,也就是將 J1 刪除
並減去他的工作時間,end time = 17 - 10 = 7

(四) 加入 Job[4]

Job 2 3 5
pi 3 4 10
di 6 9 20

end time = 17,沒有超時

(五) 加入 Job[5]

Job 2 3 5 4
pi 3 4 10 8
di 6 9 20 23

end time = 25,超時
利用 priority queue 去除最大的,也就是將 Job5 刪除
並減去他的工作時間,end time = 25 - 10 = 15

(六) 加入 Job[6]

Job 2 3 4 6
pi 3 4 8 6
di 6 9 23 30

end time = 21,沒有超時

最後輸出答案為 : 2 -> 3 -> 4 -> 6 -> 1 -> 4

程式碼

首先定義 Job 結構和比較大小的方法
由於我們不需要再記錄個別點的結束時間,因此就不需要 end_time 欄位了

1
2
3
4
5
6
7
8
9
10
11
12
13
struct Job2
{
int index;
int process;
int due;

Job2(int i = 0, int p = 0, int d = 0) : index{ i }, process { p }, due{ d } {}
};

inline bool operator<(const Job2 &lhs, const Job2 &rhs)
{
return lhs.due < rhs.due;
}

再來我們還需要定義 Priority Queue 中紀錄資訊的結構

1
2
3
4
5
6
7
8
9
10
11
12
struct Record
{
int index;
int process;

Record(int i = 0, int p = 0) :index{ i }, process{ p } {}
};

inline bool operator<(const Record &lhs, const Record &rhs)
{
return lhs.process < rhs.process;
}

主要演算法部分
要注意因為 Job 的 Index 是從 1 開始,所以要把 overdue 多開一個空間才可以用 Index 直接存取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class Moore_Hodgson_optimal
{
public:
Moore_Hodgson_optimal(initializer_list<Job2> int_list) : jobs{ int_list }, overdue(jobs.size() + 1, false)
{
// 運算
operate();
}

friend ostream& operator<<(ostream &out, const Moore_Hodgson_optimal &schedule)
{
// 先輸出沒有超時的部分
for (auto i = 0; i < schedule.jobs.size(); ++i)
if (!schedule.overdue[schedule.jobs[i].index])
out << schedule.jobs[i].index << " ";

// 任意順序輸出超時的部分
for (auto i = 1; i < schedule.overdue.size(); ++i)
if (schedule.overdue[i])
out << i << " ";

return out;
}

private:
vector<Job2> jobs;
vector<bool> overdue; // 紀錄是否超時

private:
void initial(void)
{
sort(jobs.begin(), jobs.end());
}

void operate(void)
{
priority_queue<Record> pque;

// 初始化 Jobs
initial();

int sum_time = 0;

for (auto &job : jobs)
{
// 加入 priority queue
pque.push({ job.index,job.process });

// 加上此工作的耗時
sum_time += job.process;

// 檢查是否會超時
if (sum_time > job.due)
{
// 刪掉最大耗時並記錄
sum_time -= pque.top().process;
overdue[pque.top().index] = true;

pque.pop();
}
}
}
};

主程式

1
2
3
4
5
6
7
8
9
10
int main(void)
{
Moore_Hodgson_optimal oschedule1{ {1,10,15},{2,3,6},{3,4,9 },{4,8,23},{5,10,20},{6,6,30} };
Moore_Hodgson_optimal oschedule2{ {1,4,6},{2,3,7},{3,2,8 },{4,5,9},{5,6,11} };

cout << "Schedule 1 : " << oschedule1 << endl;
cout << "Schedule 2 : " << oschedule2 << endl;

return 0;
}

輸出

1
2
Schedule 1 : 2 3 4 6 1 5
Schedule 2 : 2 3 5 1 4


思考

此演算法的目的是要讓未超時的工作數量最大化,也就是要讓逾期的工作量最小
整體的核心就是 Due
所以先用 Due 去由小到大排序,再依序把一個一個工作加到排程中
若發現加入當前工作後此工作會超時,就從以加入的工作中選一個最耗費時間的把他刪除掉
可以想成是讓整體的彈性時間更大 ( 也就是完成目前排程工作的時間到排程最後一個工作期限的這段緩衝 )
重複此動作到所以工作皆加入,就會是一個 Minimum number of late jobs 的 Schedule


證明演算法

我這邊就沒有證明了,因為我也沒有搞很懂QQ
若是有大大有興趣,可以看一下參考
我有放一些 Reference
記得搞懂了要來留言教我呀



參考網站

0%