All files / src/lib/scan concurrent.ts

98.55% Statements 68/69
94.28% Branches 33/35
90% Functions 9/10
98.55% Lines 68/69

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224                                                                                                                          5x 5x 5x 5x   5x 5x 36x 1x   35x 35x   34x     34x 34x 2x   32x 32x 34x 1x               5x     4x                                       139x 139x             139x 139x     139x 139x   139x 207x 207x       139x 139x 139x   139x 139x 139x 704x 2x   702x 702x   701x 2x   699x 697x 562x   135x 314x   135x       137x   2x   139x 139x       139x 139x 469x     129x 306x 301x   123x   340x 133x   207x 207x         139x       139x               139x   2x                   1038x 3x            
/**
 * Shared concurrency helper for grep + DSN scanner.
 *
 * Implements the "bounded parallelism with cooperative early exit"
 * pattern from `src/lib/dsn/code-scanner.ts::scanFilesForDsns`,
 * extracted so PR 2's grep engine and PR 3's DSN scanner can share
 * one tested implementation.
 *
 * ### Why not `pLimit.clearQueue()`
 *
 * `pLimit` exposes a `clearQueue()` method that cancels still-queued
 * tasks — but it does so by silently dropping the unresolved promises,
 * which means the outer `Promise.all` never settles. The DSN scanner
 * has a multi-line comment about this (`code-scanner.ts:634`). We use
 * a shared `earlyExit` boolean instead: queued tasks peek at it in
 * their first statement and return immediately.
 */
 
import pLimit from "p-limit";
import { CONCURRENCY_LIMIT } from "./constants.js";
 
/** Common options for the two concurrent helpers. */
export type ConcurrentOptions = {
  /** Max in-flight tasks. Default: `CONCURRENCY_LIMIT` (from `./constants.js`). */
  concurrency?: number;
  /**
   * Cooperative cancellation. The signal is polled, not listened to.
   * Both helpers check it before scheduling each item pulled from
   * `source`, and the stream variant also checks it after every
   * `yield`. An aborted signal surfaces as a `DOMException` whose
   * `name` is `"AbortError"`.
   *
   * Tasks that are already running are never cancelled. An abort that
   * happens while a helper is blocked waiting on `source` or on running
   * tasks is only noticed at the next check. `mapFilesConcurrent` waits
   * for the tasks it has already scheduled before rejecting.
   */
  signal?: AbortSignal;
};
 
/**
 * Options accepted by the gather-all `mapFilesConcurrent`: everything
 * in `ConcurrentOptions`, plus an optional early-exit hook.
 *
 * `onResult` is called with each non-null result right after it has
 * been appended to the result array. It runs inside the task that
 * produced that result.
 *
 * Returning `{ done: true }` raises the shared early-exit flag. Queued
 * tasks bail, no further source items are scheduled, and results from
 * tasks that finish afterwards are discarded. `mapFilesConcurrent` then
 * resolves with what it has collected once in-flight tasks settle.
 *
 * Returning `undefined` or `{ done: false }` keeps processing.
 */
export type MapFilesOptions<T> = {
  onResult?: (result: T) => undefined | { done: boolean };
} & ConcurrentOptions;
 
/**
 * Run `fn` on every item from `source` with bounded concurrency,
 * collect all results into an array, and resolve when every task has
 * settled OR an early-exit signal has been raised and all in-flight
 * tasks have completed.
 *
 * Result order is completion order, NOT source order — if you need
 * source-order output, sort after. `null` returns from `fn` are
 * filtered out of the result array.
 *
 * Failure handling: if `fn` (or `onResult`) throws, the early-exit flag
 * is raised. Queued tasks bail and no further source items are
 * scheduled. Once in-flight tasks settle, the promise rejects with the
 * first such error.
 *
 * An error thrown by `source` itself, or an `AbortError` from `signal`,
 * takes precedence. It is rethrown after the in-flight tasks settle.
 *
 * @param source - Items to process; pulled one at a time.
 * @param fn - Per-item worker. Resolve to `null` to contribute nothing.
 * @param opts - Concurrency cap, abort signal and `onResult` early-exit hook.
 * @returns The non-null results of `fn`, in completion order.
 * @throws The first error raised by `source`, the abort check, `fn` or `onResult`.
 */
export async function mapFilesConcurrent<TIn, TOut>(
  source: AsyncIterable<TIn>,
  fn: (item: TIn) => Promise<TOut | null>,
  opts: MapFilesOptions<TOut> = {}
): Promise<TOut[]> {
  const limit = pLimit(opts.concurrency ?? CONCURRENCY_LIMIT);
  const results: TOut[] = [];
  const state = { earlyExit: false };
  // First task failure. It is a const object rather than a `let`, so TS
  // doesn't narrow it to its initial value across the closures below.
  // It is boxed so that a falsy thrown value still counts as a failure.
  const failure: { raised: boolean; error: unknown } = {
    raised: false,
    error: undefined,
  };
  const tasks: Promise<void>[] = [];

  // Body of one scheduled task. It checks `earlyExit` first so queued
  // tasks bail cheaply once an exit has been requested.
  const runOne = async (item: TIn): Promise<void> => {
    if (state.earlyExit) {
      return;
    }
    const out = await fn(item);
    if (state.earlyExit || out === null) {
      return;
    }
    results.push(out);
    const verdict = opts.onResult?.(out);
    if (verdict?.done) {
      state.earlyExit = true;
    }
  };

  try {
    for await (const item of source) {
      if (state.earlyExit) {
        break;
      }
      throwIfAborted(opts.signal);
      tasks.push(
        // Attach the rejection handler immediately. Otherwise a task that
        // fails while we are still awaiting `source` stays an unhandled
        // rejection until the `Promise.all` below is reached.
        limit(() => runOne(item)).catch((error: unknown) => {
          if (!failure.raised) {
            failure.raised = true;
            failure.error = error;
          }
          state.earlyExit = true;
        })
      );
    }
  } finally {
    // Always wait for in-flight tasks — otherwise a thrown AbortError
    // mid-iteration would leave orphans running against closed state.
    // Tasks never reject (see the `.catch` above), so this await cannot
    // replace a source or abort error with a task error.
    await Promise.all(tasks);
  }

  if (failure.raised) {
    throw failure.error;
  }
  return results;
}
 
/**
 * Streaming counterpart to `mapFilesConcurrent`. Yields each result as
 * soon as its producing task settles. This is useful when the consumer
 * wants to display matches progressively or terminate early via `break`.
 *
 * `fn` returns `TOut[]` (or `null` for no output), which lets per-item
 * work emit multiple results (e.g., multiple grep matches per file).
 * Yield order is completion order, not source order.
 *
 * Consumer `break`, or an abort noticed after a `yield`, raises the
 * early-exit flag, with these effects:
 * - No further source items are scheduled and queued tasks bail.
 * - Results buffered but not yet yielded are discarded.
 * - The generator's `return()` waits for the background producer to
 *   finish. On the producer's normal path, that includes the tasks it
 *   scheduled.
 *
 * If `source`, the abort check in the producer, or any `fn` call throws,
 * the first such error is rethrown from the generator. This happens
 * after the results already buffered have been yielded. A task failure
 * also raises the early-exit flag, so the rest of the source is not
 * processed.
 */
export async function* mapFilesConcurrentStream<TIn, TOut>(
  source: AsyncIterable<TIn>,
  fn: (item: TIn) => Promise<TOut[] | null>,
  opts: ConcurrentOptions = {}
): AsyncGenerator<TOut> {
  const limit = pLimit(opts.concurrency ?? CONCURRENCY_LIMIT);
  const state = { earlyExit: false };

  // Producer-consumer buffer. Workers push into `queue`; the generator
  // drains it between awaits on `awake`. When `awake` resolves, it's
  // replaced in one atomic step so subsequent notifications don't
  // deadlock. We build the first promise inline so the TS flow
  // analyzer sees both variables as assigned before any use.
  const queue: TOut[] = [];
  let wakeUp: () => void = () => {
    /* reassigned by resetAwake below */
  };
  let awake: Promise<void> = new Promise<void>((r) => {
    wakeUp = r;
  });
  const resetAwake = () => {
    awake = new Promise<void>((r) => {
      wakeUp = r;
    });
  };

  let producerDone = false;
  // First error from the source, the producer's abort check, or any
  // task. It is a const object rather than a `let`, so TS doesn't narrow
  // it to its initial value across closures. It is boxed so that a falsy
  // thrown value (e.g. `throw undefined`) is still recognised.
  const failure: { raised: boolean; error: unknown } = {
    raised: false,
    error: undefined,
  };
  const recordFailure = (error: unknown): void => {
    if (!failure.raised) {
      failure.raised = true;
      failure.error = error;
    }
    // Stop pumping: queued tasks bail on their first statement.
    state.earlyExit = true;
  };
  const tasks: Promise<void>[] = [];

  const producer = (async () => {
    try {
      for await (const item of source) {
        if (state.earlyExit) {
          break;
        }
        throwIfAborted(opts.signal);
        tasks.push(
          limit(async () => {
            if (state.earlyExit) {
              return;
            }
            const out = await fn(item);
            if (state.earlyExit || out === null || out.length === 0) {
              return;
            }
            for (const entry of out) {
              queue.push(entry);
            }
            wakeUp();
          })
            // Handle the rejection right away. Otherwise a task failing
            // while we are still awaiting `source` is an unhandled
            // rejection until the `Promise.all` below is reached.
            .catch(recordFailure)
        );
      }
      // Tasks never reject (see the `.catch` above); this only waits.
      await Promise.all(tasks);
    } catch (error) {
      recordFailure(error);
    } finally {
      producerDone = true;
      wakeUp();
    }
  })();

  try {
    while (true) {
      if (queue.length > 0) {
        // Drain everything already queued before we yield control,
        // so fast consumers don't spin one microtask per result.
        while (queue.length > 0) {
          yield queue.shift() as TOut;
          throwIfAborted(opts.signal);
        }
        continue;
      }
      if (producerDone) {
        break;
      }
      await awake;
      resetAwake();
    }
  } finally {
    // Consumer-initiated break, abort, or normal completion: stop
    // pumping new tasks. Running tasks are not cancelled because we
    // don't clearQueue().
    state.earlyExit = true;
    // Make sure the producer has exited before returning; it might
    // still be iterating `source` or waiting on its tasks. Its `finally`
    // flips `producerDone` and calls `wakeUp()`.
    await producer;
    // Propagate producer and task errors from inside the `finally` so
    // they surface on both paths: normal drain-to-completion AND
    // consumer-initiated `break`. Code *after* a generator's try/finally
    // is unreachable when the consumer breaks (the runtime's `return()`
    // resolves the iterator without executing the post-try body), so an
    // error thrown outside this block would be silently lost on the
    // break path.
    if (failure.raised) {
      // biome-ignore lint/correctness/noUnsafeFinally: intentional — this is the only path to surface producer errors on the break path
      throw failure.error;
    }
  }
}
 
/**
 * Throw if `signal` has been aborted; otherwise return. This is a
 * stand-in for `AbortSignal.prototype.throwIfAborted` on runtimes
 * that lack it.
 *
 * Unlike the native method, this always throws a fresh `DOMException`
 * with `name === "AbortError"`. Its message is the abort reason's
 * message when the reason is an `Error`, and `"Aborted"` otherwise.
 * A missing signal is treated as "not aborted".
 */
function throwIfAborted(signal: AbortSignal | undefined): void {
  if (!signal?.aborted) {
    return;
  }
  const { reason } = signal;
  const message = reason instanceof Error ? reason.message : "Aborted";
  throw new DOMException(message, "AbortError");
}