task-queue.ts 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. import Piscina from '..';
  2. import { test } from 'tap';
  3. import { resolve } from 'path';
  4. import { Task, TaskQueue } from '../dist/src/common';
  // Submits four tasks to a pool configured with 2-3 threads and checks, right
  // after every submission, how many threads exist and how many tasks sit in
  // the queue. Only the fourth submission is expected to be queued.
  test('will put items into a task queue until they can run', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
      minThreads: 2,
      maxThreads: 3
    });
    t.equal(pool.threads.length, 2);
    t.equal(pool.queueSize, 0);

    // One 4-byte shared buffer per task.
    const buffers = Array.from(
      { length: 4 },
      () => new Int32Array(new SharedArrayBuffer(4))
    );

    // Expected pool state immediately after the i-th submission.
    const expectedAfterSubmit = [
      { threads: 2, queued: 0 },
      { threads: 2, queued: 0 },
      { threads: 3, queued: 0 },
      { threads: 3, queued: 1 }
    ];

    const results = expectedAfterSubmit.map(({ threads, queued }, i) => {
      const pending = pool.runTask(buffers[i]);
      t.equal(pool.threads.length, threads);
      t.equal(pool.queueSize, queued);
      return pending;
    });

    // Write 1 into every buffer's slot and notify one waiter on it.
    buffers.forEach((buffer) => {
      Atomics.store(buffer, 0, 1);
      Atomics.notify(buffer, 0, 1);
    });

    // Once the first task has settled the queue is expected to be empty again.
    const [firstResult] = results;
    await firstResult;
    t.equal(pool.queueSize, 0);

    await Promise.all(results);
  });
  // With at most one thread and maxQueue: 2, the first three submissions are
  // accepted (and later rejected with the termination error when the pool is
  // destroyed), while the fourth is rejected because the queue is at its limit.
  test('will reject items over task queue limit', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.ts'),
      minThreads: 0,
      maxThreads: 1,
      maxQueue: 2
    });
    const busyLoop = 'while (true) {}';

    t.equal(pool.threads.length, 0);
    t.equal(pool.queueSize, 0);

    // Expected queue size after each accepted submission: 0, then 1, then 2.
    for (const queued of [0, 1, 2]) {
      t.rejects(pool.runTask(busyLoop), /Terminating worker thread/);
      t.equal(pool.threads.length, 1);
      t.equal(pool.queueSize, queued);
    }

    t.rejects(pool.runTask(busyLoop), /Task queue is at limit/);

    await pool.destroy();
  });
  // With maxQueue: 0 and at most one thread, the first task is accepted and the
  // second one is rejected because no queue is available.
  test('will reject items when task queue is unavailable', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.ts'),
      minThreads: 0,
      maxThreads: 1,
      maxQueue: 0
    });

    // Asserts the current thread count and queue length of the pool.
    const assertPoolState = (threads : number, queued : number) : void => {
      t.equal(pool.threads.length, threads);
      t.equal(pool.queueSize, queued);
    };

    assertPoolState(0, 0);

    t.rejects(pool.runTask('while (true) {}'), /Terminating worker thread/);
    assertPoolState(1, 0);

    t.rejects(
      pool.runTask('while (true) {}'),
      /No task queue available and all Workers are busy/
    );

    await pool.destroy();
  });
  // Same scenario as the non-fixed variant, but with minThreads === maxThreads
  // === 1, so the single thread is expected to exist before any task is run.
  test('will reject items when task queue is unavailable (fixed thread count)', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.ts'),
      minThreads: 1,
      maxThreads: 1,
      maxQueue: 0
    });

    // Asserts that exactly one thread exists and nothing is queued.
    const assertSingleThreadEmptyQueue = () : void => {
      t.equal(pool.threads.length, 1);
      t.equal(pool.queueSize, 0);
    };

    assertSingleThreadEmptyQueue();

    t.rejects(pool.runTask('while (true) {}'), /Terminating worker thread/);
    assertSingleThreadEmptyQueue();

    t.rejects(
      pool.runTask('while (true) {}'),
      /No task queue available and all Workers are busy/
    );

    await pool.destroy();
  });
  // With concurrentTasksPerWorker: 2 and no queue, two tasks are submitted to
  // a pool limited to one thread; neither is expected to be queued. Both task
  // promises are expected to reject (the pool is destroyed without the buffers
  // ever being notified).
  test('tasks can share a Worker if requested (both tests blocking)', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
      minThreads: 0,
      maxThreads: 1,
      maxQueue: 0,
      concurrentTasksPerWorker: 2
    });

    t.equal(pool.threads.length, 0);
    t.equal(pool.queueSize, 0);

    // Each submission gets its own fresh shared buffer.
    for (let submitted = 0; submitted < 2; submitted++) {
      t.rejects(pool.runTask(new Int32Array(new SharedArrayBuffer(4))));
      t.equal(pool.threads.length, 1);
      t.equal(pool.queueSize, 0);
    }

    await pool.destroy();
  });
  // Two tasks share the single worker: the first waits on a shared buffer and
  // is released by this test; the second is a long timer run through the eval
  // fixture and is expected to reject when the pool is destroyed.
  test('tasks can share a Worker if requested (one test finishes)', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
      minThreads: 0,
      maxThreads: 1,
      maxQueue: 0,
      concurrentTasksPerWorker: 2
    });

    const buffers = Array.from(
      { length: 2 },
      () => new Int32Array(new SharedArrayBuffer(4))
    );
    const [firstBuffer] = buffers;

    // Asserts the current thread count; the queue must always be empty here.
    const assertThreads = (threads : number) : void => {
      t.equal(pool.threads.length, threads);
      t.equal(pool.queueSize, 0);
    };

    assertThreads(0);

    const firstTask = pool.runTask(firstBuffer);
    assertThreads(1);

    // Second task overrides the pool's filename with the eval fixture.
    const sleeper = pool.runTask(
      'new Promise((resolve) => setTimeout(resolve, 1000000))',
      resolve(__dirname, 'fixtures/eval.js')
    );
    t.rejects(sleeper, /Terminating worker thread/);
    assertThreads(1);

    // Release the first task only.
    Atomics.store(firstBuffer, 0, 1);
    Atomics.notify(firstBuffer, 0, 1);

    await firstTask;
    assertThreads(1);

    await pool.destroy();
  });
  // Two tasks share the single worker and both are released by this test. The
  // final assertions expect each buffer's slot to hold -1 once its task has
  // settled.
  test('tasks can share a Worker if requested (both tests finish)', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
      minThreads: 1,
      maxThreads: 1,
      maxQueue: 0,
      concurrentTasksPerWorker: 2
    });

    const buffers = Array.from(
      { length: 2 },
      () => new Int32Array(new SharedArrayBuffer(4))
    );
    const [firstBuffer, secondBuffer] = buffers;

    // Asserts that exactly one thread exists and nothing is queued.
    const assertSingleThreadEmptyQueue = () : void => {
      t.equal(pool.threads.length, 1);
      t.equal(pool.queueSize, 0);
    };

    assertSingleThreadEmptyQueue();

    const firstTask = pool.runTask(firstBuffer);
    assertSingleThreadEmptyQueue();

    const secondTask = pool.runTask(secondBuffer);
    assertSingleThreadEmptyQueue();

    // Set both slots to 1, then notify one waiter on each buffer, then block
    // this thread on each buffer for as long as its slot still holds 1.
    for (const buffer of buffers) {
      Atomics.store(buffer, 0, 1);
    }
    for (const buffer of buffers) {
      Atomics.notify(buffer, 0, 1);
    }
    for (const buffer of buffers) {
      Atomics.wait(buffer, 0, 1);
    }

    await firstTask;
    t.equal(firstBuffer[0], -1);

    await secondTask;
    t.equal(secondBuffer[0], -1);

    assertSingleThreadEmptyQueue();
  });
  // Plugs a user-defined TaskQueue into the pool and verifies that:
  //  - every task pushed to the queue carries Piscina.queueOptionsSymbol,
  //  - the value under that symbol is the options object supplied by the
  //    caller, or null when the caller supplied none,
  //  - the pool reads `size` and calls `push` and `shift` on the queue,
  //  - each task resolves to an object exposing the `a` value that was
  //    submitted.
  test('custom task queue works', async ({ equal, ok }) => {
    // Flags flipped by the queue implementation when the pool touches it.
    let sizeCalled = false;
    let shiftCalled = false;
    let pushCalled = false;

    class CustomTaskPool implements TaskQueue {
      tasks: Task[] = [];

      get size () : number {
        sizeCalled = true;
        return this.tasks.length;
      }

      shift () : Task | null {
        shiftCalled = true;
        return this.tasks.length > 0 ? this.tasks.shift() as Task : null;
      }

      push (task : Task) : void {
        pushCalled = true;
        this.tasks.push(task);

        ok(Piscina.queueOptionsSymbol in task);
        if ((task as any).task.a === 3) {
          // The third task is submitted without queue options.
          equal(task[Piscina.queueOptionsSymbol], null);
        } else {
          equal(task[Piscina.queueOptionsSymbol].option,
            (task as any).task.a);
        }
      }

      remove (task : Task) : void {
        const index = this.tasks.indexOf(task);
        // indexOf yields -1 for a task that is not queued; splice(-1, 1)
        // would then silently drop the LAST queued task, so guard against it.
        if (index !== -1) {
          this.tasks.splice(index, 1);
        }
      }
    }

    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js'),
      taskQueue: new CustomTaskPool(),
      // Setting maxThreads low enough to ensure we queue
      maxThreads: 1,
      minThreads: 1
    });

    // Returns a copy of `task` with `{ option }` attached as its queue options.
    function makeTask (task : object, option : number) {
      return { ...task, [Piscina.queueOptionsSymbol]: { option } };
    }

    const ret = await Promise.all([
      pool.runTask(makeTask({ a: 1 }, 1)),
      pool.runTask(makeTask({ a: 2 }, 2)),
      pool.runTask({ a: 3 }) // No queueOptionsSymbol attached
    ]);

    equal(ret[0].a, 1);
    equal(ret[1].a, 2);
    equal(ret[2].a, 3);

    ok(sizeCalled);
    ok(pushCalled);
    ok(shiftCalled);
  });