abort-task.ts 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. import { AbortController } from 'abort-controller';
  2. import { EventEmitter } from 'events';
  3. import Piscina from '..';
  4. import { test } from 'tap';
  5. import { resolve } from 'path';
  6. test('tasks can be aborted through AbortController while running', async ({ equal, rejects }) => {
  7. const pool = new Piscina({
  8. filename: resolve(__dirname, 'fixtures/notify-then-sleep.ts')
  9. });
  10. const buf = new Int32Array(new SharedArrayBuffer(4));
  11. const abortController = new AbortController();
  12. rejects(pool.runTask(buf, abortController.signal),
  13. /The task has been aborted/);
  14. Atomics.wait(buf, 0, 0);
  15. equal(Atomics.load(buf, 0), 1);
  16. abortController.abort();
  17. });
  18. test('tasks can be aborted through EventEmitter while running', async ({ equal, rejects }) => {
  19. const pool = new Piscina({
  20. filename: resolve(__dirname, 'fixtures/notify-then-sleep.ts')
  21. });
  22. const buf = new Int32Array(new SharedArrayBuffer(4));
  23. const ee = new EventEmitter();
  24. rejects(pool.runTask(buf, ee), /The task has been aborted/);
  25. rejects(pool.run(buf, { signal: ee }), /The task has been aborted/);
  26. Atomics.wait(buf, 0, 0);
  27. equal(Atomics.load(buf, 0), 1);
  28. ee.emit('abort');
  29. });
  30. test('tasks can be aborted through EventEmitter before running', async ({ equal, rejects }) => {
  31. const pool = new Piscina({
  32. filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
  33. maxThreads: 1
  34. });
  35. const bufs = [
  36. new Int32Array(new SharedArrayBuffer(4)),
  37. new Int32Array(new SharedArrayBuffer(4))
  38. ];
  39. const task1 = pool.runTask(bufs[0]);
  40. const ee = new EventEmitter();
  41. rejects(pool.runTask(bufs[1], ee), /The task has been aborted/);
  42. rejects(pool.run(bufs[1], { signal: ee }), /The task has been aborted/);
  43. equal(pool.queueSize, 2);
  44. ee.emit('abort');
  45. // Wake up the thread handling the first task.
  46. Atomics.store(bufs[0], 0, 1);
  47. Atomics.notify(bufs[0], 0, 1);
  48. await task1;
  49. });
  50. test('abortable tasks will not share workers (abortable posted second)', async ({ equal, rejects }) => {
  51. const pool = new Piscina({
  52. filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
  53. maxThreads: 1,
  54. concurrentTasksPerWorker: 2
  55. });
  56. const bufs = [
  57. new Int32Array(new SharedArrayBuffer(4)),
  58. new Int32Array(new SharedArrayBuffer(4))
  59. ];
  60. const task1 = pool.runTask(bufs[0]);
  61. const ee = new EventEmitter();
  62. rejects(pool.runTask(bufs[1], ee), /The task has been aborted/);
  63. equal(pool.queueSize, 1);
  64. ee.emit('abort');
  65. // Wake up the thread handling the first task.
  66. Atomics.store(bufs[0], 0, 1);
  67. Atomics.notify(bufs[0], 0, 1);
  68. await task1;
  69. });
  70. test('abortable tasks will not share workers (abortable posted first)', async ({ equal, rejects }) => {
  71. const pool = new Piscina({
  72. filename: resolve(__dirname, 'fixtures/eval.js'),
  73. maxThreads: 1,
  74. concurrentTasksPerWorker: 2
  75. });
  76. const ee = new EventEmitter();
  77. rejects(pool.runTask('while(true);', ee), /The task has been aborted/);
  78. const task2 = pool.runTask('42');
  79. equal(pool.queueSize, 1);
  80. ee.emit('abort');
  81. // Wake up the thread handling the second task.
  82. equal(await task2, 42);
  83. });
  84. test('abortable tasks will not share workers (on worker available)', async ({ equal }) => {
  85. const pool = new Piscina({
  86. filename: resolve(__dirname, 'fixtures/sleep.js'),
  87. maxThreads: 1,
  88. concurrentTasksPerWorker: 2
  89. });
  90. // Task 1 will sleep 100 ms then complete,
  91. // Task 2 will sleep 300 ms then complete.
  92. // Abortable task 3 should still be in the queue
  93. // when Task 1 completes, but should not be selected
  94. // until after Task 2 completes because it is abortable.
  95. const ret = await Promise.all([
  96. pool.runTask({ time: 100, a: 1 }),
  97. pool.runTask({ time: 300, a: 2 }),
  98. pool.runTask({ time: 100, a: 3 }, new EventEmitter())
  99. ]);
  100. equal(ret[0], 0);
  101. equal(ret[1], 1);
  102. equal(ret[2], 2);
  103. });
  104. test('abortable tasks will not share workers (destroy workers)', async ({ rejects }) => {
  105. const pool = new Piscina({
  106. filename: resolve(__dirname, 'fixtures/sleep.js'),
  107. maxThreads: 1,
  108. concurrentTasksPerWorker: 2
  109. });
  110. // Task 1 will sleep 100 ms then complete,
  111. // Task 2 will sleep 300 ms then complete.
  112. // Abortable task 3 should still be in the queue
  113. // when Task 1 completes, but should not be selected
  114. // until after Task 2 completes because it is abortable.
  115. pool.runTask({ time: 100, a: 1 }).then(() => {
  116. pool.destroy();
  117. });
  118. rejects(pool.runTask({ time: 300, a: 2 }), /Terminating worker thread/);
  119. rejects(pool.runTask({ time: 100, a: 3 }, new EventEmitter()),
  120. /Terminating worker thread/);
  121. });
  122. test('aborted AbortSignal rejects task immediately', async ({ rejects, equal }) => {
  123. const pool = new Piscina({
  124. filename: resolve(__dirname, 'fixtures/move.ts')
  125. });
  126. const controller = new AbortController();
  127. // Abort the controller early
  128. controller.abort();
  129. equal(controller.signal.aborted, true);
  130. // The data won't be moved because the task will abort immediately.
  131. const data = new Uint8Array(new SharedArrayBuffer(4));
  132. rejects(pool.runTask(data, [data.buffer], controller.signal),
  133. /The task has been aborted/);
  134. equal(data.length, 4);
  135. });
  136. test('task with AbortSignal cleans up properly', async ({ equal }) => {
  137. const pool = new Piscina({
  138. filename: resolve(__dirname, 'fixtures/eval.js')
  139. });
  140. const ee = new EventEmitter();
  141. await pool.runTask('1+1', ee);
  142. const { getEventListeners } = EventEmitter as any;
  143. if (typeof getEventListeners === 'function') {
  144. equal(getEventListeners(ee, 'abort').length, 0);
  145. }
  146. const controller = new AbortController();
  147. await pool.runTask('1+1', controller.signal);
  148. });