Skip to content

Use EventHandle; add missing API; uncurried fns #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bower.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"purescript-exceptions": "^6.0.0",
"purescript-node-buffer": "^8.0.0",
"purescript-nullable": "^6.0.0",
"purescript-prelude": "^6.0.0"
"purescript-prelude": "^6.0.0",
"purescript-node-event-emitter": "https://github.com/purescript-node/purescript-node-event-emitter.git#^3.0.0"
}
}
240 changes: 97 additions & 143 deletions src/Node/Stream.js
Original file line number Diff line number Diff line change
@@ -1,143 +1,97 @@
const _undefined = undefined;
export { _undefined as undefined };

export function setEncodingImpl(s) {
return enc => () => {
s.setEncoding(enc);
};
}

export function readChunkImpl(Left) {
return Right => chunk => {
if (chunk instanceof Buffer) {
return Right(chunk);
} else if (typeof chunk === "string") {
return Left(chunk);
} else {
throw new Error(
"Node.Stream.readChunkImpl: Unrecognised " +
"chunk type; expected String or Buffer, got: " +
chunk
);
}
};
}

export function onDataEitherImpl(readChunk) {
return r => f => () => {
r.on("data", data => {
f(readChunk(data))();
});
};
}

export function onEnd(s) {
return f => () => {
s.on("end", f);
};
}

export function onFinish(s) {
return f => () => {
s.on("finish", f);
};
}

export function onReadable(s) {
return f => () => {
s.on("readable", f);
};
}

export function onError(s) {
return f => () => {
s.on("error", e => {
f(e)();
});
};
}

export function onClose(s) {
return f => () => {
s.on("close", f);
};
}

export function resume(s) {
return () => {
s.resume();
};
}

export function pause(s) {
return () => {
s.pause();
};
}

export function isPaused(s) {
return () => s.isPaused();
}

export function pipe(r) {
return w => () => r.pipe(w);
}

export function unpipe(r) {
return w => () => r.unpipe(w);
}

export function unpipeAll(r) {
return () => r.unpipe();
}

export function readImpl(readChunk) {
return Nothing => Just => r => s => () => {
const v = r.read(s);
if (v === null) {
return Nothing;
} else {
return Just(readChunk(v));
}
};
}

export function writeImpl(w) {
return chunk => done => () => w.write(chunk, null, done);
}

export function writeStringImpl(w) {
return enc => s => done => () => w.write(s, enc, done);
}

export function cork(w) {
return () => w.cork();
}

export function uncork(w) {
return () => w.uncork();
}

export function setDefaultEncodingImpl(w) {
return enc => () => {
w.setDefaultEncoding(enc);
};
}

export function endImpl(w) {
return done => () => {
w.end(null, null, done);
};
}

export function destroy(strm) {
return () => {
strm.destroy(null);
};
}

export function destroyWithError(strm) {
return e => () => {
strm.destroy(e);
};
}
import stream from "node:stream";

export const setEncodingImpl = (s, enc) => s.setEncoding(enc);

export const readChunkImpl = (useBuffer, useString, chunk) => {
if (chunk instanceof Buffer) {
return useBuffer(chunk);
} else if (typeof chunk === "string") {
return useString(chunk);
} else {
throw new Error(
"Node.Stream.readChunkImpl: Unrecognised " +
"chunk type; expected String or Buffer, got: " +
chunk
);
}
};

export const readableImpl = (r) => r.readable;

export const readableEndedImpl = (r) => r.readableEnded;

export const readableFlowingImpl = (r) => r.readableFlowing;

export const readableHighWaterMarkImpl = (r) => r.readableHighWaterMark;

export const readableLengthImpl = (r) => r.readableLength;

export const resumeImpl = (r) => r.resume();

export const pauseImpl = (r) => r.pause;

export const isPausedImpl = (r) => r.isPaused;

export const pipeImpl = (r, w) => r.pipe(w);

export const pipeCbImpl = (r, w, cb) => r.pipe(w, cb);

export const unpipeAllImpl = (r) => r.unpipe();

export const unpipeImpl = (r, w) => r.unpipe(w);

export const readImpl = (r) => r.read();

export const readSizeImpl = (r, size) => r.read(size);

export const writeImpl = (w, buf) => w.write(buf);

export const writeCbImpl = (w, buf) => w.write(buf);

export const writeStringImpl = (w, str, enc) => w.write(str, enc);

export const writeStringCbImpl = (w, str, enc, cb) => w.write(str, enc, cb);

export const corkImpl = (w) => w.cork();

export const uncorkImpl = (w) => w.uncork();

export const setDefaultEncodingImpl = (w, enc) => w.setDefaultEncoding(enc);

export const endCbImpl = (w, cb) => w.end(cb);

export const endImpl = (w) => w.end();

export const writeableImpl = (w) => w.writeable;

export const writeableEndedImpl = (w) => w.writeableEnded;

export const writeableCorkedImpl = (w) => w.writeableCorked;

export const erroredImpl = (w) => w.errored;

export const writeableFinishedImpl = (w) => w.writeableFinished;

export const writeableHighWaterMarkImpl = (w) => w.writeableHighWaterMark;

export const writeableLengthImpl = (w) => w.writeableLength;

export const writeableNeedDrainImpl = (w) => w.writeableNeedDrain;

export const destroyImpl = (w) => w.destroy();

export const destroyErrorImpl = (w, e) => w.destroy(e);

export const closedImpl = (w) => w.closed;

export const destroyedImpl = (w) => w.destroyed;

export const allowHalfOpenImpl = (d) => d.allowHalfOpen;

export const pipelineImpl = (src, transforms, dst, cb) => stream.pipeline([src, ...transforms, dst], cb);

export const readableFromStrImpl = (str) => stream.Readable.from(str, { objectMode: false });

export const readableFromBufImpl = (buf) => stream.Readable.from(buf, { objectMode: false });

export const newPassThrough = () => new stream.PassThrough({ objectMode: false });
Loading