metaforecast/node_modules/json2csv/lib/JSON2CSVTransform.js

204 lines
5.4 KiB
JavaScript
Raw Normal View History

2021-01-12 12:43:41 +00:00
'use strict';
const { Transform } = require('stream');
const Parser = require('jsonparse');
const JSON2CSVBase = require('./JSON2CSVBase');
/**
 * Transform stream that turns incoming JSON into CSV text.
 *
 * Depending on the stream mode and options, input is handled by one of three
 * parsers: plain objects (object mode), newline-delimited JSON (`opts.ndjson`)
 * or a streaming JSON parser (default). Every parsed record ends up in
 * `pushLine`, which emits a `line` event and pushes the CSV row downstream.
 */
class JSON2CSVTransform extends Transform {
  /**
   * @param {Object} opts Conversion options; normalised through `preprocessOpts`.
   * @param {Object} transformOpts Options forwarded to the `Transform` constructor.
   */
  constructor(opts, transformOpts) {
    super(transformOpts);

    // Inherit methods from JSON2CSVBase since extends doesn't
    // allow multiple inheritance and manually preprocess opts
    Object.getOwnPropertyNames(JSON2CSVBase.prototype)
      .forEach(key => (this[key] = JSON2CSVBase.prototype[key]));
    this.opts = this.preprocessOpts(opts);

    this._data = '';
    this._hasWritten = false;

    if (this._readableState.objectMode) {
      this.initObjectModeParse();
    } else if (this.opts.ndjson) {
      this.initNDJSONParse();
    } else {
      this.initJSONParser();
    }

    if (this.opts.withBOM) {
      this.push('\ufeff');
    }

    // With explicit fields the header can be pushed right away; otherwise it
    // is derived from the first record in `pushLine`.
    if (this.opts.fields) {
      this.opts.fields = this.preprocessFieldsInfo(this.opts.fields);
      this.pushHeader();
    }
  }

  /**
   * Init the transform with a parser to process data in object mode.
   * It receives JSON objects one by one and sends them to `pushLine` for processing.
   */
  initObjectModeParse() {
    const transform = this;

    this.parser = {
      write(line) {
        transform.pushLine(line);
      },
      // Objects arrive complete, so there is never anything buffered.
      getPendingData() {
        return undefined;
      }
    };
  }

  /**
   * Init the transform with a parser to process NDJSON data.
   * It maintains a buffer of received data, parses each line
   * as JSON and sends it to `pushLine` for processing.
   */
  initNDJSONParse() {
    const transform = this;

    this.parser = {
      _data: '',
      write(chunk) {
        this._data += chunk.toString();
        const lines = this._data
          .split('\n')
          .map(line => line.trim())
          .filter(line => line !== '');

        let pendingData = false;
        lines.forEach((line, i) => {
          try {
            transform.pushLine(JSON.parse(line));
          } catch (e) {
            if (i === lines.length - 1) {
              // The last line may simply be incomplete; wait for more data.
              pendingData = true;
            } else {
              e.message = `Invalid JSON (${line})`;
              transform.emit('error', e);
            }
          }
        });

        if (!pendingData) {
          this._data = '';
          return;
        }

        // Keep only the unfinished tail. When the buffer holds no newline at
        // all, lastIndexOf returns -1 and slice(-1) would keep just the final
        // character, so the whole buffer must be retained in that case.
        const lastNewline = this._data.lastIndexOf('\n');
        if (lastNewline !== -1) {
          this._data = this._data.slice(lastNewline);
        }
      },
      getPendingData() {
        return this._data;
      }
    };
  }

  /**
   * Init the transform with a parser to process JSON data.
   * It maintains a buffer of received data, parses each item as JSON
   * if the data is an array, or the data itself otherwise,
   * and sends it to `pushLine` for processing.
   */
  initJSONParser() {
    const transform = this;
    this.parser = new Parser();

    this.parser.onValue = function (value) {
      // Only emit values that sit at the chosen depth (root or root array items).
      if (this.stack.length !== this.depthToEmit) return;
      transform.pushLine(value);
    };

    // Wrap the parser's own token handler so extra checks run after it.
    this.parser._onToken = this.parser.onToken;

    this.parser.onToken = function (token, value) {
      transform.parser._onToken(token, value);

      if (this.stack.length === 0
        && !transform.opts.fields
        && this.mode !== Parser.C.ARRAY
        && this.mode !== Parser.C.OBJECT) {
        this.onError(new Error('Data should not be empty or the "fields" option should be included'));
      }

      if (this.stack.length === 1) {
        if (this.depthToEmit === undefined) {
          // If Array emit its content, else emit itself
          this.depthToEmit = (this.mode === Parser.C.ARRAY) ? 1 : 0;
        }

        if (this.depthToEmit !== 0) {
          // No need to store the whole root array in memory
          this.value = undefined;
        }
      }
    };

    this.parser.getPendingData = function () {
      return this.value;
    };

    this.parser.onError = function (err) {
      if (err.message.includes('Unexpected')) {
        err.message = `Invalid JSON (${err.message})`;
      }
      transform.emit('error', err);
    };
  }

  /**
   * Main function that sends data to the parser to be processed.
   *
   * @param {Buffer} chunk Incoming data
   * @param {String} encoding Encoding of the incoming data. Defaults to 'utf8'
   * @param {Function} done Called when the processing of the supplied chunk is done
   */
  _transform(chunk, encoding, done) {
    this.parser.write(chunk);
    done();
  }

  /**
   * Called once all input has been written. Reports an error when the parser
   * still holds data that never formed a complete record.
   *
   * @param {Function} done Invoked exactly once, with an Error if data is pending
   */
  _flush(done) {
    if (this.parser.getPendingData()) {
      // Return here: the callback must not be invoked a second time.
      done(new Error('Invalid data received from stdin'));
      return;
    }

    done();
  }

  /**
   * Generates the csv header and pushes it downstream.
   * Does nothing unless `opts.header` is truthy.
   */
  pushHeader() {
    if (this.opts.header) {
      const header = this.getHeader();
      this.emit('header', header);
      this.push(header);
      this._hasWritten = true;
    }
  }

  /**
   * Transforms an incoming json data to csv and pushes it downstream.
   *
   * @param {Object} data JSON object to be converted in a CSV row
   */
  pushLine(data) {
    const processedData = this.preprocessRow(data);

    // Defensive: preprocessRow is defined outside this class; if it ever
    // yields no rows there is nothing to infer fields from and nothing to emit.
    if (processedData.length === 0) return;

    if (!this._hasWritten) {
      // Without explicit fields, infer them from the keys of the first row.
      this.opts.fields = this.opts.fields || this.preprocessFieldsInfo(Object.keys(processedData[0]));
      this.pushHeader();
    }

    processedData.forEach(row => {
      const line = this.processRow(row, this.opts);
      if (line === undefined) return;
      this.emit('line', line);
      // Every chunk after the first is prefixed with the end-of-line marker.
      this.push(this._hasWritten ? this.opts.eol + line : line);
      this._hasWritten = true;
    });
  }
}
// Expose the JSON2CSVTransform stream class as this module's export.
module.exports = JSON2CSVTransform;