-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmod.test.ts
137 lines (124 loc) · 4.6 KB
/
mod.test.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import { assertEquals } from "https://deno.land/[email protected]/assert/assert_equals.ts";
import { assertObjectMatch } from "https://deno.land/[email protected]/assert/assert_object_match.ts";
import { delay } from "https://deno.land/[email protected]/async/delay.ts";
import { handlers, setup } from "https://deno.land/[email protected]/log/mod.ts";
import { Queue } from "./mod.ts";
// Logging: a single console handler at DEBUG, attached to a logger named
// "kvmq" (presumably the logger the queue module writes to — confirm in ./mod.ts).
const debugConsole = new handlers.ConsoleHandler("DEBUG");
setup({
  handlers: { console: debugConsole },
  loggers: {
    kvmq: { level: "DEBUG", handlers: ["console"] },
  },
});

// KV store opened without an explicit path; the test below builds its queue on it.
const db = await Deno.openKv();
/**
 * End-to-end, timing-based test of a `Queue` worker.
 *
 * The job handler sleeps 1000 ms per job, and the first sample is taken 500 ms
 * after the worker starts, so every later 1000 ms step lands in the middle of
 * the next job. Each sample checks the successfully processed states
 * (`results`) and the number of `"error"` events seen so far (`errors`).
 *
 * The timed section runs inside `try/finally` so that a failing assertion
 * still aborts the worker loops instead of leaving them polling after the
 * test has ended.
 */
Deno.test("queue", async () => {
  const queue = new Queue<string>(db, "test");
  await queue.deleteWaitingJobs();
  await queue.pushJob("a");
  await queue.pushJob("b", { priority: 1 });
  await queue.pushJob("error", { retryCount: 1 });
  await queue.pushJob("c", { repeatCount: 1 });
  await queue.pushJob("error");
  await queue.pushJob("d");

  const jobs = await queue.getAllJobs();
  assertEquals(jobs.length, 6);
  assertObjectMatch(jobs[0], { state: "b", place: 1, status: "waiting" });
  assertObjectMatch(jobs[1], { state: "a", place: 2, status: "waiting" });
  assertObjectMatch(jobs[2], { state: "error", place: 3, status: "waiting" });

  const results: string[] = [];
  const errors: string[] = [];

  // Asserts the processed states and that exactly `expectedErrorCount`
  // "error" messages have been collected (every failure in this test throws
  // `new Error("error")`, so the expected list is that string repeated).
  const expectProgress = (
    expectedResults: string[],
    expectedErrorCount: number,
  ): void => {
    assertEquals(results, expectedResults);
    assertEquals(
      errors,
      Array.from({ length: expectedErrorCount }, () => "error"),
    );
  };

  const worker = queue.createWorker(async (job) => {
    console.log("processing", job.state);
    await delay(1000);
    if (job.state === "error") {
      throw new Error("error");
    }
    results.push(job.state);
  });
  worker.addEventListener("error", (e) => {
    errors.push(e.detail.error.message);
  });

  const controller = new AbortController();
  const controller2 = new AbortController();
  try {
    worker.processJobs({ signal: controller.signal });
    await delay(500);
    // initial state
    expectProgress([], 0);
    await delay(1000);
    // b is processed first because it has the highest priority
    expectProgress(["b"], 0);
    await delay(1000);
    // a is processed next
    expectProgress(["b", "a"], 0);
    await delay(1000);
    // error is next, should be attempted and fail 2 times
    expectProgress(["b", "a"], 1);
    await delay(1000);
    expectProgress(["b", "a"], 2);
    await delay(1000);
    // c is next, first repeat
    expectProgress(["b", "a", "c"], 2);
    await delay(1000);
    // second error
    expectProgress(["b", "a", "c"], 3);
    await delay(1000);
    // d
    expectProgress(["b", "a", "c", "d"], 3);
    await delay(1000);
    // second repeat of c (repeats re-add at the end of the queue)
    expectProgress(["b", "a", "c", "d", "c"], 3);
    await delay(1000);
    // end, nothing changed
    expectProgress(["b", "a", "c", "d", "c"], 3);

    // now waiting 3 seconds before polling again
    await queue.pushJob("error", { retryCount: 1, retryDelayMs: 500 });
    await queue.pushJob("e");
    await queue.pushJob("f");
    await delay(3000);
    // error
    expectProgress(["b", "a", "c", "d", "c"], 4);
    await delay(1000);
    // e
    expectProgress(["b", "a", "c", "d", "c", "e"], 4);

    controller.abort();
    await delay(1000);
    // error retry
    expectProgress(["b", "a", "c", "d", "c", "e"], 5);
    await delay(1000);
    // nothing because aborted
    expectProgress(["b", "a", "c", "d", "c", "e"], 5);
    await delay(1000);

    // Resume with a fresh signal; "f" should still be waiting to be picked up.
    const processPromise = worker.processJobs({ signal: controller2.signal });
    await delay(500);
    // initial after resume
    expectProgress(["b", "a", "c", "d", "c", "e"], 5);
    const jobs2 = await queue.getAllJobs();
    assertEquals(jobs2.length, 1);
    assertObjectMatch(jobs2[0], { state: "f", place: 0, status: "processing" });
    await delay(1000);
    // f
    expectProgress(["b", "a", "c", "d", "c", "e", "f"], 5);
    await delay(1000);
    // nothing
    expectProgress(["b", "a", "c", "d", "c", "e", "f"], 5);

    controller2.abort();
    await processPromise;
  } finally {
    // Stop both worker loops even when an assertion above threw. Calling
    // abort() on an already-aborted controller is a no-op, so the success
    // path behaves exactly as before.
    controller.abort();
    controller2.abort();
  }
});