import { Observable, Subject } from 'rxjs';

/**
 * Serializes observable tasks per key: for any given key, at most one task
 * is subscribed at a time, and further tasks for that key wait in FIFO order.
 * Tasks registered under different keys do not block each other.
 */
export default class KeyedSynchronize<K extends keyof any> {
  /** Keys that currently have an owner (a running task or a task being started). */
  private locked = new Set<K>();
  /** Per-key FIFO of tickets; emitting on a ticket hands the lock to that waiter. */
  private queue: Map<K, Subject<void>[]> = new Map<K, Subject<void>[]>();

  /**
   * Wraps `task` so that it is only invoked and subscribed once the lock for
   * `key` has been acquired. Nothing happens until the returned observable is
   * subscribed.
   *
   * The lock is released exactly once, when the task completes, when it
   * errors, when `task()` itself throws, or when the consumer unsubscribes
   * (which also cancels the running task or removes the pending wait).
   *
   * @param key  Key identifying the critical section.
   * @param task Factory producing the observable to run while holding the lock.
   * @returns An observable mirroring the values, error and completion of the task.
   */
  public runSync<T>(key: K, task: () => Observable<T>): Observable<T> {
    return new Observable<T>(subscriber => {
      let holdsLock = false;
      let running: { unsubscribe(): void } | undefined;

      // Idempotent: complete/error handlers and the teardown may all call it.
      const release = (): void => {
        if (!holdsLock) {
          return;
        }
        holdsLock = false;
        this.unlock(key);
      };

      const waiting = this.waitUntilUnlocked(key).subscribe(() => {
        holdsLock = true;

        let source: Observable<T>;
        try {
          source = task();
        } catch (error) {
          // A throwing factory must not leave the key locked forever.
          try {
            subscriber.error(error);
          } finally {
            release();
          }
          return;
        }

        running = source.subscribe({
          next: value => subscriber.next(value),
          error: error => {
            // Notify the consumer first, then let the next waiter start.
            try {
              subscriber.error(error);
            } finally {
              release();
            }
          },
          complete: () => {
            try {
              subscriber.complete();
            } finally {
              release();
            }
          },
        });
      });

      // Runs on unsubscribe as well as after complete/error.
      return () => {
        waiting.unsubscribe();
        running?.unsubscribe();
        release();
      };
    });
  }

  /**
   * Returns an observable that, once subscribed, emits a single value when the
   * subscriber has become the owner of the lock for `key`. If the key is free
   * the lock is taken and the value is emitted synchronously; otherwise the
   * subscriber is queued. Unsubscribing before the lock is granted removes the
   * subscriber from the queue so a later hand-off cannot be lost on it.
   */
  private waitUntilUnlocked(key: K): Observable<void> {
    return new Observable<void>(subscriber => {
      if (!this.locked.has(key)) {
        this.locked.add(key);
        subscriber.next();
        return undefined;
      }

      const ticket = new Subject<void>();
      const waiters = this.queue.get(key);
      if (waiters === undefined) {
        this.queue.set(key, [ticket]);
      } else {
        waiters.push(ticket);
      }
      const granted = ticket.subscribe(() => subscriber.next());

      return () => {
        granted.unsubscribe();
        const pending = this.queue.get(key);
        if (pending === undefined) {
          return;
        }
        // Index is -1 when the ticket was already granted and shifted off.
        const index = pending.indexOf(ticket);
        if (index !== -1) {
          pending.splice(index, 1);
        }
        if (pending.length === 0) {
          this.queue.delete(key);
        }
      };
    });
  }

  /**
   * Gives up ownership of `key`. If a waiter is queued, ownership passes
   * directly to it (the key stays marked as locked, so no newcomer can slip
   * in); otherwise the key is marked free.
   */
  private unlock(key: K): void {
    const waiters = this.queue.get(key);
    const next = waiters?.shift();
    if (waiters !== undefined && waiters.length === 0) {
      // Avoid accumulating empty arrays for keys that are no longer contended.
      this.queue.delete(key);
    }

    if (next === undefined) {
      this.locked.delete(key);
      return;
    }
    next.next();
  }
}
