var Stream = require('stream') module.exports = function (write, end) { var stream = new Stream() var buffer = [], ended = false, destroyed = false, emitEnd stream.writable = stream.readable = true stream.paused = false stream._paused = false stream.buffer = buffer stream .on('pause', function () { stream._paused = true }) .on('drain', function () { stream._paused = false }) function destroySoon () { process.nextTick(stream.destroy.bind(stream)) } if(write) stream.on('_data', write) if(end) stream.on('_end', end) //destroy the stream once both ends are over //but do it in nextTick, so that other listeners //on end have time to respond stream.once('end', function () { stream.readable = false if(!stream.writable) { process.nextTick(function () { stream.destroy() }) } }) stream.once('_end', function () { stream.writable = false if(!stream.readable) stream.destroy() }) // this is the default write method, // if you overide it, you are resposible // for pause state. stream._data = function (data) { if(!stream.paused && !buffer.length) stream.emit('data', data) else buffer.push(data) return !(stream.paused || buffer.length) } stream._end = function (data) { if(data) stream._data(data) if(emitEnd) return emitEnd = true //destroy is handled above. stream.drain() } stream.write = function (data) { stream.emit('_data', data) return !stream._paused } stream.end = function () { stream.writable = false if(stream.ended) return stream.ended = true stream.emit('_end') } stream.drain = function () { if(!buffer.length && !emitEnd) return //if the stream is paused after just before emitEnd() //end should be buffered. while(!stream.paused) { if(buffer.length) { stream.emit('data', buffer.shift()) if(buffer.length == 0) { stream.emit('_drain') } } else if(emitEnd && stream.readable) { stream.readable = false stream.emit('end') return } else { //if the buffer has emptied. emit drain. return true } } } var started = false stream.resume = function () { //this is where I need pauseRead, and pauseWrite. //here the reading side is unpaused, //but the writing side may still be paused. //the whole buffer might not empity at once. //it might pause again. //the stream should never emit data inbetween pause()...resume() //and write should return !buffer.length started = true stream.paused = false stream.drain() //will emit drain if buffer empties. return stream } stream.destroy = function () { if(destroyed) return destroyed = ended = true buffer.length = 0 stream.emit('close') } var pauseCalled = false stream.pause = function () { started = true stream.paused = true stream.emit('_pause') return stream } stream._pause = function () { if(!stream._paused) { stream._paused = true stream.emit('pause') } return this } stream.paused = true process.nextTick(function () { //unless the user manually paused if(started) return stream.resume() }) return stream }