[JavaScript] Broadcaster + Operator + Listener pattern -- 14. Marking Done Based on a Condition

Asynchronous code often uses a condition to decide when a block of code should finish running. This lesson walks through writing a doneIf operator that checks each value passing through against a custom condition and, when that condition is true, passes a done value to the listener in place of the value.

import { pipe } from "ramda";
import { addListener, done } from "./broadcasters";
import { repeatWhen, filter } from "./operators";

// Listener used by the demo: prints whatever value it is given.
const { log } = console;
// Broadcaster for "click" events on the element matching "#input".
const inputClick = addListener("#input", "click");

// Operator: forwards each value to `listener`, except that a value for which
// `condition(value)` is truthy is replaced by the `done` marker.
// It does not cancel the source on its own; the returned function does that.
let doneIf = (condition) => (broadcaster) => (listener) => {
  const unsubscribe = broadcaster((value) => {
    listener(condition(value) ? done : value);
  });

  return () => {
    unsubscribe();
  };
};

// Operator: ignores the values coming from `broadcaster` and instead emits a
// counter that starts at 3 and is decremented before every emission
// (2, 1, 0, -1, ...). Each subscription gets its own counter.
let state = (broadcaster) => (listener) => {
  let remaining = 3;
  const tick = () => {
    remaining -= 1;
    listener(remaining);
  };
  return broadcaster(tick);
};

// Broadcaster for "keydown" events on "#input", narrowed to the Enter key.
const onlyEnter = filter((event) => event.key === "Enter");
const inputEnter = onlyEnter(addListener("#input", "keydown"));

// Countdown pipeline: every source event decrements the counter kept by
// `state`, reaching 0 is turned into `done`, and after `done` the pipeline
// re-subscribes once Enter is pressed.
const isZero = (v) => v === 0;
const score = pipe(state, doneIf(isZero), repeatWhen(inputEnter));

// Drive the countdown with clicks and print every emitted value.
score(inputClick)(log);

// operators.js

import { curry } from "lodash";
import { done } from "./broadcasters";

// Builds an operator from `operator(broadcaster, listener)`.
// The broadcaster handed to `operator` is a guarded version of the real one:
// a `done` value bypasses the operator's own listener and goes straight to
// the final `listener`; every other value is passed to the operator.
let createOperator = curry((operator, broadcaster, listener) => {
  const guardedBroadcaster = (behaviorListener) =>
    broadcaster((value) => {
      if (value !== done) {
        behaviorListener(value);
        return;
      }

      listener(done);
    });

  return operator(guardedBroadcaster, listener);
});

// Operator: emits `transform(value)` for every value received.
export let map = (transform) =>
  createOperator((broadcaster, listener) => {
    const emitTransformed = (value) => {
      listener(transform(value));
    };
    return broadcaster(emitTransformed);
  });

// Operator: forwards only the values for which `predicate(value)` is truthy.
export let filter = (predicate) =>
  createOperator((broadcaster, listener) => {
    const forwardIfMatching = (value) => {
      if (!predicate(value)) {
        return;
      }
      listener(value);
    };
    return broadcaster(forwardIfMatching);
  });

// Operator: collects incoming values into an array and emits that array
// every time a value equal to `splitter` arrives (the splitter itself is not
// included). When `done` arrives, the pending array is emitted, followed by
// `done`.
export let split = (splitter) =>
  curry((broadcaster, listener) => {
    let buffer = [];

    // Emit the collected chunk and start a new one.
    const flush = () => {
      listener(buffer);
      buffer = [];
    };

    return broadcaster((value) => {
      if (value === done) {
        flush();
        listener(done);
        // Stop here: `done` is a control marker and must not be buffered
        // as if it were data for the next chunk.
        return;
      }

      // Loose equality is kept on purpose so callers that rely on coercion
      // (e.g. splitter "1" matching the number 1) keep working.
      if (value == splitter) {
        flush();
        return;
      }

      buffer.push(value);
    });
  });

// Operator: replaces every incoming value with the fixed `newValue`.
export let hardCode = (newValue) =>
  createOperator((broadcaster, listener) => {
    const emitConstant = (_ignored) => {
      listener(newValue);
    };
    const unsubscribe = broadcaster(emitConstant);

    return () => unsubscribe();
  });

// Operator: emits a running total. The total starts at `initial` and every
// incoming value is added to it before it is emitted.
// The total belongs to the `add(initial)` call, so it is shared by every
// subscription made through the operator that call returns.
// A `done` value is forwarded untouched: adding the `done` symbol to a number
// would throw a TypeError.
export let add = (initial) => {
  let total = initial;

  return (broadcaster) => (listener) => {
    return broadcaster((value) => {
      if (value === done) {
        listener(done);
        return;
      }

      total += value;
      listener(total);
    });
  };
};

// Operator: subscribes to `mainBroadcaster` only once `whenBroadcaster`
// emits. Every further emission of `whenBroadcaster` cancels the current main
// subscription and starts a new one.
// Returns a function that cancels both subscriptions.
export let startWhen = (whenBroadcaster) => (mainBroadcaster) => (listener) => {
  let cancelMain;

  const cancelWhen = whenBroadcaster(() => {
    if (cancelMain) cancelMain();
    cancelMain = mainBroadcaster((value) => {
      listener(value);
    });
  });

  return () => {
    // The main subscription only exists after the trigger has fired at least
    // once, so cancelling earlier must not try to call it.
    if (cancelMain) cancelMain();
    cancelWhen();
  };
};

// Operator: subscribes `listener` to `mainBroadcaster` right away and cancels
// that subscription every time `whenBroadcaster` emits.
// Returns a function that cancels both subscriptions.
export let stopWhen = (whenBroadcaster) => (mainBroadcaster) => (listener) => {
  const stopMain = mainBroadcaster(listener);
  const stopTrigger = whenBroadcaster((_signal) => {
    stopMain();
  });

  const cancelAll = () => {
    stopMain();
    stopTrigger();
  };
  return cancelAll;
};

// Operator: maps each event to the `value` of its `target`.
export let targetValue = map(({ target }) => target.value);

// Operator: for every incoming value, builds a broadcaster with
// `createBroadcaster(value)` and subscribes `listener` to it.
// Only the outer subscription's result is returned; the results of the inner
// subscriptions are discarded.
export let mapBroadcaster = (createBroadcaster) => (broadcaster) => (
  listener
) => {
  const subscribeInner = (value) => {
    const innerBroadcaster = createBroadcaster(value);
    innerBroadcaster(listener);
  };
  return broadcaster(subscribeInner);
};

// Operator over a broadcaster of operators: each operator that arrives is
// applied to `broadcaster`, and the listener is subscribed to the result.
export let applyOperator = (broadcaster) => {
  const applyToSource = (operator) => operator(broadcaster);
  return mapBroadcaster(applyToSource);
};

// Operator: appends every incoming value to a string and emits that string
// when `done` arrives, then starts over with an empty string.
// `done` itself is not forwarded.
export let stringConcat = (broadcaster) => (listener) => {
  let text = "";
  return broadcaster((value) => {
    if (value !== done) {
      text += value;
      return;
    }

    listener(text);
    text = "";
  });
};

// Operator: restarts the source whenever it reports `done`.
// A `done` value is not forwarded; instead the current subscription is
// cancelled and `broadcaster` is subscribed to again. All other values are
// passed to `listener` unchanged.
export let repeat = (broadcaster) => (listener) => {
  let cancel;

  const subscribe = () => {
    cancel = broadcaster(onValue);
  };

  const onValue = (value) => {
    if (value !== done) {
      // Regular value: keep emitting.
      listener(value);
      return;
    }

    // `done`: drop the finished subscription, if one is recorded, and start
    // a fresh one.
    if (cancel) {
      cancel();
    }
    subscribe();
  };

  subscribe();

  return () => cancel();
};

// Operator: like a repeat, but the restart waits for `whenBroadcaster`.
// When the main source reports `done`, the main subscription is cancelled and
// `whenBroadcaster` is subscribed to; its next emission cancels that trigger
// subscription and re-subscribes to `mainBroadcaster`.
// Every value, including `done`, is forwarded to `listener`.
// Returns a function that cancels whichever subscriptions are active.
export let repeatWhen = (whenBroadcaster) => (mainBroadcaster) => (listener) => {
  let mainCancel;
  let whenCancel;

  // Cancel the trigger subscription if there is one, and forget it so it is
  // never cancelled twice.
  const cancelTrigger = () => {
    if (whenCancel) {
      whenCancel();
      whenCancel = undefined;
    }
  };

  let repeatListener = (value) => {
    if (value === done) {
      if (mainCancel) {
        mainCancel();
      }
      // A trigger left over from an earlier `done` would otherwise be
      // overwritten and leak.
      cancelTrigger();
      whenCancel = whenBroadcaster(() => {
        // One trigger emission is enough: stop listening to it, then restart
        // the main source.
        cancelTrigger();
        mainCancel = mainBroadcaster(repeatListener);
      });
    }
    listener(value);
  };
  mainCancel = mainBroadcaster(repeatListener);

  return () => {
    if (mainCancel) {
      mainCancel();
    }
    // No trigger subscription exists until the first `done`, so cancelling
    // earlier must not try to call it.
    cancelTrigger();
  };
};

// broadcasters.js

import { curry } from "ramda";
export let done = Symbol("done");

// Broadcaster: after `time` milliseconds emits `null` followed by `done`.
// Returns a function that clears the pending timer.
export let createTimeout = curry((time, listener) => {
  const fire = () => {
    listener(null);
    listener(done);
  };
  const timerId = setTimeout(fire, time);

  const cancel = () => {
    clearTimeout(timerId);
  };
  return cancel;
});

// Broadcaster: registers `listener` for `eventType` events on the first
// element matching `selector`.
// Returns a function that removes that same event listener again.
export let addListener = curry((selector, eventType, listener) => {
  const target = document.querySelector(selector);
  target.addEventListener(eventType, listener);

  const detach = () => {
    target.removeEventListener(eventType, listener);
  };
  return detach;
});

// Broadcaster: every `time` milliseconds emits the next integer of a counter
// that starts at 0. Returns a function that clears the interval.
export let createInterval = curry((time, listener) => {
  let next = 0;
  const emitNext = () => {
    const current = next;
    next += 1;
    listener(current);
  };
  const intervalId = setInterval(emitNext, time);

  return () => {
    clearInterval(intervalId);
  };
});

//broadcaster = function that accepts a listener
// Broadcaster: subscribes the same `listener` to both broadcasters, first
// `broadcaster1`, then `broadcaster2`.
// Returns a function that cancels the two subscriptions in that same order.
export let merge = curry((broadcaster1, broadcaster2, listener) => {
  const cancels = [broadcaster1(listener), broadcaster2(listener)];

  return () => {
    cancels.forEach((cancel) => cancel());
  };
});

// Broadcaster: pairs up the values of two broadcasters by position and emits
// each pair as `[valueFrom1, valueFrom2]`. Values that have no partner yet
// wait in a per-source buffer.
// The zip is finished as soon as the next item of either buffer is `done`:
// that source has nothing left to pair, so `done` is emitted once and both
// sources are cancelled. `done` itself is never emitted as part of a pair.
// Returns a function that cancels both subscriptions.
export let zip = curry((broadcaster1, broadcaster2, listener) => {
  // All state is declared before subscribing so that a source which emits
  // synchronously during subscription finds it initialised.
  let buffer1 = [];
  let buffer2 = [];
  let cancel1;
  let cancel2;
  let finished = false;

  let cancelBoth = () => {
    if (cancel1) cancel1();
    if (cancel2) cancel2();
  };

  let isDoneNext = () => buffer1[0] === done || buffer2[0] === done;

  let flush = () => {
    if (!isDoneNext() && buffer1.length && buffer2.length) {
      listener([buffer1.shift(), buffer2.shift()]);
    }

    // Checked both before and after pairing: `done` may have just arrived, or
    // may have become the next item once the pair in front of it was emitted.
    if (!finished && isDoneNext()) {
      finished = true;
      listener(done);
      cancelBoth();
    }
  };

  let collectInto = (buffer) => (value) => {
    if (finished) return;
    buffer.push(value);
    flush();
  };

  cancel1 = broadcaster1(collectInto(buffer1));
  cancel2 = broadcaster2(collectInto(buffer2));

  // If a source finished while it was still being subscribed to, its cancel
  // function did not exist yet when `cancelBoth` ran; cancel again now.
  if (finished) cancelBoth();

  return cancelBoth;
});

// Broadcaster: on a zero-delay timer, emits every item of `iterable` in order
// and then `done`. Returns a function that clears the timer.
export let forOf = curry((iterable, listener) => {
  const emitAll = () => {
    for (const item of iterable) {
      listener(item);
    }
    listener(done);
  };
  const timerId = setTimeout(emitAll, 0);

  return () => {
    clearTimeout(timerId);
  };
});

  

原文地址:https://www.cnblogs.com/Answer1215/p/13923649.html