mirror of
https://github.com/mafintosh/tar-stream.git
synced 2024-11-13 13:39:28 +00:00
407 lines
9.0 KiB
JavaScript
407 lines
9.0 KiB
JavaScript
const { Writable, Readable, getStreamError } = require('streamx')
|
|
const FIFO = require('fast-fifo')
|
|
const b4a = require('b4a')
|
|
const headers = require('./headers')
|
|
|
|
const EMPTY = b4a.alloc(0)
|
|
|
|
class BufferList {
|
|
constructor () {
|
|
this.buffered = 0
|
|
this.shifted = 0
|
|
this.queue = new FIFO()
|
|
|
|
this._offset = 0
|
|
}
|
|
|
|
push (buffer) {
|
|
this.buffered += buffer.byteLength
|
|
this.queue.push(buffer)
|
|
}
|
|
|
|
shiftFirst (size) {
|
|
return this._buffered === 0 ? null : this._next(size)
|
|
}
|
|
|
|
shift (size) {
|
|
if (size > this.buffered) return null
|
|
if (size === 0) return EMPTY
|
|
|
|
let chunk = this._next(size)
|
|
|
|
if (size === chunk.byteLength) return chunk // likely case
|
|
|
|
const chunks = [chunk]
|
|
|
|
while ((size -= chunk.byteLength) > 0) {
|
|
chunk = this._next(size)
|
|
chunks.push(chunk)
|
|
}
|
|
|
|
return b4a.concat(chunks)
|
|
}
|
|
|
|
_next (size) {
|
|
const buf = this.queue.peek()
|
|
const rem = buf.byteLength - this._offset
|
|
|
|
if (size >= rem) {
|
|
const sub = this._offset ? buf.subarray(this._offset, buf.byteLength) : buf
|
|
this.queue.shift()
|
|
this._offset = 0
|
|
this.buffered -= rem
|
|
this.shifted += rem
|
|
return sub
|
|
}
|
|
|
|
this.buffered -= size
|
|
this.shifted += size
|
|
|
|
return buf.subarray(this._offset, (this._offset += size))
|
|
}
|
|
}
|
|
|
|
class Source extends Readable {
|
|
constructor (self, header, offset) {
|
|
super()
|
|
|
|
this.header = header
|
|
this.offset = offset
|
|
|
|
this._parent = self
|
|
}
|
|
|
|
_read (cb) {
|
|
if (this.header.size === 0) {
|
|
this.push(null)
|
|
}
|
|
if (this._parent._stream === this) {
|
|
this._parent._update()
|
|
}
|
|
cb(null)
|
|
}
|
|
|
|
_predestroy () {
|
|
this._parent.destroy(getStreamError(this))
|
|
}
|
|
|
|
_detach () {
|
|
if (this._parent._stream === this) {
|
|
this._parent._stream = null
|
|
this._parent._missing = overflow(this.header.size)
|
|
this._parent._update()
|
|
}
|
|
}
|
|
|
|
_destroy (cb) {
|
|
this._detach()
|
|
cb(null)
|
|
}
|
|
}
|
|
|
|
class Extract extends Writable {
|
|
constructor (opts) {
|
|
super(opts)
|
|
|
|
if (!opts) opts = {}
|
|
|
|
this._buffer = new BufferList()
|
|
this._offset = 0
|
|
this._header = null
|
|
this._stream = null
|
|
this._missing = 0
|
|
this._longHeader = false
|
|
this._callback = noop
|
|
this._locked = false
|
|
this._finished = false
|
|
this._pax = null
|
|
this._paxGlobal = null
|
|
this._gnuLongPath = null
|
|
this._gnuLongLinkPath = null
|
|
this._filenameEncoding = opts.filenameEncoding || 'utf-8'
|
|
this._allowUnknownFormat = !!opts.allowUnknownFormat
|
|
this._unlockBound = this._unlock.bind(this)
|
|
}
|
|
|
|
_unlock (err) {
|
|
this._locked = false
|
|
|
|
if (err) {
|
|
this.destroy(err)
|
|
this._continueWrite(err)
|
|
return
|
|
}
|
|
|
|
this._update()
|
|
}
|
|
|
|
_consumeHeader () {
|
|
if (this._locked) return false
|
|
|
|
this._offset = this._buffer.shifted
|
|
|
|
try {
|
|
this._header = headers.decode(this._buffer.shift(512), this._filenameEncoding, this._allowUnknownFormat)
|
|
} catch (err) {
|
|
this._continueWrite(err)
|
|
return false
|
|
}
|
|
|
|
if (!this._header) return true
|
|
|
|
switch (this._header.type) {
|
|
case 'gnu-long-path':
|
|
case 'gnu-long-link-path':
|
|
case 'pax-global-header':
|
|
case 'pax-header':
|
|
this._longHeader = true
|
|
this._missing = this._header.size
|
|
return true
|
|
}
|
|
|
|
this._locked = true
|
|
this._applyLongHeaders()
|
|
|
|
if (this._header.size === 0 || this._header.type === 'directory') {
|
|
this.emit('entry', this._header, this._createStream(), this._unlockBound)
|
|
return true
|
|
}
|
|
|
|
this._stream = this._createStream()
|
|
this._missing = this._header.size
|
|
|
|
this.emit('entry', this._header, this._stream, this._unlockBound)
|
|
return true
|
|
}
|
|
|
|
_applyLongHeaders () {
|
|
if (this._gnuLongPath) {
|
|
this._header.name = this._gnuLongPath
|
|
this._gnuLongPath = null
|
|
}
|
|
|
|
if (this._gnuLongLinkPath) {
|
|
this._header.linkname = this._gnuLongLinkPath
|
|
this._gnuLongLinkPath = null
|
|
}
|
|
|
|
if (this._pax) {
|
|
if (this._pax.path) this._header.name = this._pax.path
|
|
if (this._pax.linkpath) this._header.linkname = this._pax.linkpath
|
|
if (this._pax.size) this._header.size = parseInt(this._pax.size, 10)
|
|
this._header.pax = this._pax
|
|
this._pax = null
|
|
}
|
|
}
|
|
|
|
_decodeLongHeader (buf) {
|
|
switch (this._header.type) {
|
|
case 'gnu-long-path':
|
|
this._gnuLongPath = headers.decodeLongPath(buf, this._filenameEncoding)
|
|
break
|
|
case 'gnu-long-link-path':
|
|
this._gnuLongLinkPath = headers.decodeLongPath(buf, this._filenameEncoding)
|
|
break
|
|
case 'pax-global-header':
|
|
this._paxGlobal = headers.decodePax(buf)
|
|
break
|
|
case 'pax-header':
|
|
this._pax = this._paxGlobal === null
|
|
? headers.decodePax(buf)
|
|
: Object.assign({}, this._paxGlobal, headers.decodePax(buf))
|
|
break
|
|
}
|
|
}
|
|
|
|
_consumeLongHeader () {
|
|
this._longHeader = false
|
|
this._missing = overflow(this._header.size)
|
|
|
|
const buf = this._buffer.shift(this._header.size)
|
|
|
|
try {
|
|
this._decodeLongHeader(buf)
|
|
} catch (err) {
|
|
this._continueWrite(err)
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
_consumeStream () {
|
|
const buf = this._buffer.shiftFirst(this._missing)
|
|
if (buf === null) return false
|
|
|
|
this._missing -= buf.byteLength
|
|
const drained = this._stream.push(buf)
|
|
|
|
if (this._missing === 0) {
|
|
this._stream.push(null)
|
|
if (drained) this._stream._detach()
|
|
return drained && this._locked === false
|
|
}
|
|
|
|
return drained
|
|
}
|
|
|
|
_createStream () {
|
|
return new Source(this, this._header, this._offset)
|
|
}
|
|
|
|
_update () {
|
|
while (this._buffer.buffered > 0 && !this.destroying) {
|
|
if (this._missing > 0) {
|
|
if (this._stream !== null) {
|
|
if (this._consumeStream() === false) return
|
|
continue
|
|
}
|
|
|
|
if (this._longHeader === true) {
|
|
if (this._missing > this._buffer.buffered) break
|
|
if (this._consumeLongHeader() === false) return false
|
|
continue
|
|
}
|
|
|
|
const ignore = this._buffer.shiftFirst(this._missing)
|
|
if (ignore !== null) this._missing -= ignore.byteLength
|
|
continue
|
|
}
|
|
|
|
if (this._buffer.buffered < 512) break
|
|
if (this._stream !== null || this._consumeHeader() === false) return
|
|
}
|
|
|
|
this._continueWrite(null)
|
|
}
|
|
|
|
_continueWrite (err) {
|
|
const cb = this._callback
|
|
this._callback = noop
|
|
cb(err)
|
|
}
|
|
|
|
_write (data, cb) {
|
|
this._callback = cb
|
|
this._buffer.push(data)
|
|
this._update()
|
|
}
|
|
|
|
_final (cb) {
|
|
this._finished = this._missing === 0 && this._buffer.buffered === 0
|
|
cb(this._finished ? null : new Error('Unexpected end of data'))
|
|
}
|
|
|
|
_predestroy () {
|
|
this._continueWrite(null)
|
|
}
|
|
|
|
_destroy (cb) {
|
|
if (this._stream) this._stream.destroy(getStreamError(this))
|
|
cb(null)
|
|
}
|
|
|
|
[Symbol.asyncIterator] () {
|
|
let error = null
|
|
|
|
let promiseResolve = null
|
|
let promiseReject = null
|
|
|
|
let entryStream = null
|
|
let entryCallback = null
|
|
|
|
const extract = this
|
|
|
|
this.on('entry', onentry)
|
|
this.on('error', (err) => { error = err })
|
|
this.on('close', onclose)
|
|
|
|
return {
|
|
[Symbol.asyncIterator] () {
|
|
return this
|
|
},
|
|
next () {
|
|
return new Promise(onnext)
|
|
},
|
|
return () {
|
|
return destroy(null)
|
|
},
|
|
throw (err) {
|
|
return destroy(err)
|
|
}
|
|
}
|
|
|
|
function consumeCallback (err) {
|
|
if (!entryCallback) return
|
|
const cb = entryCallback
|
|
entryCallback = null
|
|
cb(err)
|
|
}
|
|
|
|
function onnext (resolve, reject) {
|
|
if (error) {
|
|
return reject(error)
|
|
}
|
|
|
|
if (entryStream) {
|
|
resolve({ value: entryStream, done: false })
|
|
entryStream = null
|
|
return
|
|
}
|
|
|
|
promiseResolve = resolve
|
|
promiseReject = reject
|
|
|
|
consumeCallback(null)
|
|
|
|
if (extract._finished && promiseResolve) {
|
|
promiseResolve({ value: undefined, done: true })
|
|
promiseResolve = promiseReject = null
|
|
}
|
|
}
|
|
|
|
function onentry (header, stream, callback) {
|
|
entryCallback = callback
|
|
stream.on('error', noop) // no way around this due to tick sillyness
|
|
|
|
if (promiseResolve) {
|
|
promiseResolve({ value: stream, done: false })
|
|
promiseResolve = promiseReject = null
|
|
} else {
|
|
entryStream = stream
|
|
}
|
|
}
|
|
|
|
function onclose () {
|
|
consumeCallback(error)
|
|
if (!promiseResolve) return
|
|
if (error) promiseReject(error)
|
|
else promiseResolve({ value: undefined, done: true })
|
|
promiseResolve = promiseReject = null
|
|
}
|
|
|
|
function destroy (err) {
|
|
extract.destroy(err)
|
|
consumeCallback(err)
|
|
return new Promise((resolve, reject) => {
|
|
if (extract.destroyed) return resolve({ value: undefined, done: true })
|
|
extract.once('close', function () {
|
|
if (err) reject(err)
|
|
else resolve({ value: undefined, done: true })
|
|
})
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
module.exports = function extract (opts) {
|
|
return new Extract(opts)
|
|
}
|
|
|
|
function noop () {}
|
|
|
|
function overflow (size) {
|
|
size &= 511
|
|
return size && 512 - size
|
|
}
|