523 changes: 469 additions & 54 deletions src/io/readers/csv.js

Large diffs are not rendered by default.

407 changes: 258 additions & 149 deletions src/io/readers/excel.js

Large diffs are not rendered by default.

17 changes: 12 additions & 5 deletions src/io/readers/index.js
@@ -1,9 +1,16 @@
// src/io/readers/index.js

export { readCsv } from './csv.js';
export { readTsv } from './tsv.js';
export { readExcel } from './excel.js';
export { readJson } from './json.js';
export { readSql } from './sql.js';
export { readCsv, addCsvBatchMethods } from './csv.js';
export { readTsv, addTsvBatchMethods } from './tsv.js';
export { readExcel, addExcelBatchMethods } from './excel.js';
export { readJson, addJsonBatchMethods } from './json.js';
export { readSql, addSqlBatchMethods } from './sql.js';
export {
detectEnvironment,
isNodeJs,
isDeno,
isBun,
isBrowser,
} from '../utils/environment.js';

// Note: API readers will be added in future versions
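
For context, a minimal consumer-side sketch (not part of the diff) of how the batch-method installers exported above might be wired up. The import paths are assumptions; only addJsonBatchMethods is shown in this PR to return the class it extends, so the other installers are assumed to behave the same way.

// Illustrative wiring only; paths and return-value behavior are assumptions.
import { DataFrame } from './src/core/DataFrame.js';
import {
  addCsvBatchMethods,
  addJsonBatchMethods,
  detectEnvironment,
} from './src/io/readers/index.js';

// Each installer attaches its reader(s) as static methods on the class.
addCsvBatchMethods(DataFrame);
addJsonBatchMethods(DataFrame);

// Environment helpers are re-exported alongside the readers; the exact return
// format of detectEnvironment() is an assumption.
console.log(detectEnvironment());
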
262 changes: 229 additions & 33 deletions src/io/readers/json.js
@@ -1,9 +1,11 @@
// src/io/readers/json.js

import { DataFrame } from '../../core/DataFrame.js';
// For compatibility with ESM and CommonJS
import { createRequire } from 'module';
const require = createRequire(import.meta.url);
import {
detectEnvironment,
safeRequire,
isNodeJs,
} from '../utils/environment.js';

/**
* Converts values to appropriate types based on content.
@@ -12,7 +14,7 @@
* emptyValue parameter.
*
* @param {any} value - The value to convert
* @param {any} [emptyValue=undefined] - Value to use for empty/null values (undefined, 0, null, or NaN)

[GitHub Actions / build-test-lint] warning on line 17 of src/io/readers/json.js: This line has a length of 104. Maximum allowed is 100
* @returns {any} The converted value with appropriate type
*/
function convertType(value, emptyValue = undefined) {
@@ -55,9 +57,9 @@
test: () => !isNaN(trimmed) && trimmed !== '',
convert: () => {
const intValue = parseInt(trimmed, 10);
return intValue.toString() === trimmed
? intValue
: parseFloat(trimmed);
return intValue.toString() === trimmed ?
intValue :
parseFloat(trimmed);
},
},
// Date values - includes detection for various date formats
@@ -111,12 +113,21 @@
canHandle: (src) =>
typeof src === 'string' &&
(src.includes('/') || src.includes('\\')) &&
typeof process !== 'undefined' &&
process.versions &&
process.versions.node,
isNodeJs(),
getContent: async (src) => {
const fs = require('fs').promises;
return await fs.readFile(src, 'utf8');
try {
const fs = safeRequire('fs');
if (fs && fs.promises) {
return await fs.promises.readFile(src, 'utf8');
}
throw new Error('fs module not available');
} catch (error) {
// In the test environment we can mock fs using vi.mock
if (typeof vi !== 'undefined' && vi.mocked && vi.mocked.fs) {
return await vi.mocked.fs.promises.readFile(src, 'utf8');
}
throw error;
}
},
},
// URL handler
@@ -129,20 +140,16 @@
if (!response.ok) {
throw new Error(`Failed to fetch ${src}: ${response.statusText}`);
}
const contentType = response.headers.get('content-type');
if (contentType && contentType.includes('application/json')) {
return await response.json();
} else {
return await response.text();
}
return await response.text();
},
},
// Browser File/Blob handler
{
canHandle: (src) =>
(typeof File !== 'undefined' && src instanceof File) ||
(typeof Blob !== 'undefined' && src instanceof Blob),
getContent: (src) =>
typeof File !== 'undefined' &&
typeof Blob !== 'undefined' &&
(src instanceof File || src instanceof Blob),
getContent: async (src) =>
new Promise((resolve, reject) => {
const reader = new FileReader();
reader.onload = () => resolve(reader.result);
@@ -162,9 +169,141 @@
},
];
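
// Illustrative dispatch sketch (not part of the diff). The array's declaration
// sits above the visible hunk, so `sourceHandlers` is a placeholder name; the
// reader presumably picks the first handler whose canHandle() accepts the
// source and awaits its getContent():
//
//   const handler = sourceHandlers.find((h) => h.canHandle(source));
//   if (!handler) throw new Error('Unsupported JSON source');
//   const content = await handler.getContent(source);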

/**
* Process JSON data in batches for large datasets
*
* @param {Array|Object} data - The JSON data to process
* @param {Object} options - Processing options
* @param {string} options.recordPath - Path to records in nested JSON
* @param {any} options.emptyValue - Value to use for empty/null values
* @param {boolean} options.dynamicTyping - Whether to auto-detect types
* @param {Object} options.frameOptions - Options for DataFrame creation
* @param {number} options.batchSize - Size of each batch
* @yields {DataFrame} DataFrame for each batch of data
*/
async function* processJsonInBatches(data, options) {
const {
recordPath = '',
emptyValue = undefined,
dynamicTyping = true,
frameOptions = {},
batchSize = 1000,
} = options;

// Navigate to the specified path if provided
let targetData = data;
if (recordPath) {
const paths = recordPath.split('.');
for (const path of paths) {
if (targetData && typeof targetData === 'object') {
targetData = targetData[path];
} else {
throw new Error(`Invalid path: ${recordPath}`);
}
}
}

// Process data based on its format
if (Array.isArray(targetData)) {
// Empty array case
if (targetData.length === 0) {
yield DataFrame.create([], frameOptions);
return;
}

// Array of objects case
if (typeof targetData[0] === 'object' && !Array.isArray(targetData[0])) {
let batch = [];

for (let i = 0; i < targetData.length; i++) {
const item = targetData[i];
const processedItem = {};

for (const key in item) {
const value = item[key];
processedItem[key] = dynamicTyping ?
convertType(value, emptyValue) :
value;
}

batch.push(processedItem);

// When batch is full or we're at the end, yield a DataFrame
if (batch.length >= batchSize || i === targetData.length - 1) {
yield DataFrame.create(batch, frameOptions);
batch = [];
}
}
} else if (Array.isArray(targetData[0])) {
// Array of arrays case
const headers = Array.isArray(targetData[0]) ?
targetData[0] :
Array.from({ length: targetData[0].length }, (_, i) => `column${i}`);

let batch = [];

for (let i = 1; i < targetData.length; i++) {
const row = targetData[i];
const obj = {};

for (let j = 0; j < headers.length; j++) {
const value = row[j];
obj[headers[j]] = dynamicTyping ?
convertType(value, emptyValue) :
value;
}

batch.push(obj);

// When batch is full or we're at the end, yield a DataFrame
if (batch.length >= batchSize || i === targetData.length - 1) {
yield DataFrame.create(batch, frameOptions);
batch = [];
}
}
}
} else if (typeof targetData === 'object' && targetData !== null) {
// Object with column arrays case
const isColumnOriented = Object.values(targetData).some(Array.isArray);

if (isColumnOriented) {
// For column-oriented data, we need to process all at once
// since batching would split columns
if (dynamicTyping) {
const processedColumns = {};
for (const key in targetData) {
if (Array.isArray(targetData[key])) {
processedColumns[key] = targetData[key].map((value) =>
convertType(value, emptyValue),
);
} else {
processedColumns[key] = targetData[key];
}
}
yield DataFrame.create(processedColumns, frameOptions);
} else {
yield DataFrame.create(targetData, frameOptions);
}
} else {
// Single object case - convert to array with one item
const processedItem = {};
for (const key in targetData) {
const value = targetData[key];
processedItem[key] = dynamicTyping ?
convertType(value, emptyValue) :
value;
}
yield DataFrame.create([processedItem], frameOptions);
}
} else {
throw new Error('Unsupported JSON format');
}
}
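
// Illustrative sketch (not part of the diff): consuming the generator above.
// processJsonInBatches is module-private, so external callers would go through
// readJson(source, { batchSize }) instead; `rowCount` is an assumed DataFrame
// property.
//
//   const rows = Array.from({ length: 2500 }, (_, i) => ({ id: i, value: i * 2 }));
//   for await (const batchDf of processJsonInBatches(rows, { batchSize: 1000 })) {
//     console.log(batchDf.rowCount); // 1000, 1000, 500
//   }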

/**
* Reads JSON content and returns a DataFrame.
* Uses native JSON parsing capabilities of JavaScript.
* Supports batch processing for large datasets.
*
* @param {string|Object|File|Blob|URL} source
* JSON content as a string, parsed object, path to file, File, Blob, or URL
@@ -173,7 +312,8 @@
* @param {any} [options.emptyValue=undefined] - Value to use for empty/null values in the data
* @param {boolean} [options.dynamicTyping=true] - Whether to automatically detect and convert types
* @param {Object} [options.frameOptions={}] - Options to pass to DataFrame.create
* @returns {Promise<DataFrame>} Promise resolving to DataFrame created from the JSON
* @param {number} [options.batchSize] - If specified, enables batch processing with the given batch size

[GitHub Actions / build-test-lint] warning on line 315 of src/io/readers/json.js: This line has a length of 105. Maximum allowed is 100
* @returns {Promise<DataFrame|Object>} Promise resolving to DataFrame or batch processor object
*/
export async function readJson(source, options = {}) {
// Set defaults for options
@@ -182,6 +322,7 @@
emptyValue = undefined,
dynamicTyping = true,
frameOptions = {},
batchSize,
} = options;

try {
@@ -195,6 +336,47 @@
// Parse JSON if it's a string
let data = typeof content === 'string' ? JSON.parse(content) : content;

// If batchSize is specified, use streaming processing
if (batchSize) {
return {
async *[Symbol.asyncIterator]() {
yield* processJsonInBatches(data, {
recordPath,
emptyValue,
dynamicTyping,
frameOptions,
batchSize,
});

},

/**
* Collect all batches into a single DataFrame
* @returns {Promise<DataFrame>} Promise resolving to combined DataFrame
*/
collect: async () => {
const allData = [];
const batchGenerator = processJsonInBatches(data, {
recordPath,
emptyValue,
dynamicTyping,
frameOptions,
batchSize,
});

for await (const batchDf of batchGenerator) {
allData.push(...batchDf.toArray());
}

return DataFrame.create(allData, frameOptions);
},
};
}

// Standard processing for loading the entire data at once
// Navigate to the specified path if provided
if (recordPath) {
const paths = recordPath.split('.');
@@ -222,9 +404,9 @@
const processedItem = {};
for (const key in item) {
const value = item[key];
processedItem[key] = dynamicTyping
? convertType(value, emptyValue)
: value;
processedItem[key] = dynamicTyping ?
convertType(value, emptyValue) :
value;
}
return processedItem;
});
@@ -233,17 +415,17 @@

// Array of arrays case
if (Array.isArray(data[0])) {
const headers = Array.isArray(data[0])
? data[0]
: Array.from({ length: data[0].length }, (_, i) => `column${i}`);
const headers = Array.isArray(data[0]) ?
data[0] :
Array.from({ length: data[0].length }, (_, i) => `column${i}`);

processedData = data.slice(1).map((row) => {
const obj = {};
for (let i = 0; i < headers.length; i++) {
const value = row[i];
obj[headers[i]] = dynamicTyping
? convertType(value, emptyValue)
: value;
obj[headers[i]] = dynamicTyping ?
convertType(value, emptyValue) :
value;
}
return obj;
});
@@ -275,9 +457,9 @@
const processedItem = {};
for (const key in data) {
const value = data[key];
processedItem[key] = dynamicTyping
? convertType(value, emptyValue)
: value;
processedItem[key] = dynamicTyping ?
convertType(value, emptyValue) :
value;
}
return DataFrame.create([processedItem], frameOptions);
}
@@ -288,3 +470,17 @@
throw new Error(`Error reading JSON: ${error.message}`);
}
}
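
// Illustrative usage sketch (not part of the diff): with batchSize set,
// readJson resolves to a batch processor instead of a DataFrame. The file path
// below is hypothetical.
//
//   const batches = await readJson('./data/large.json', { batchSize: 5000 });
//
//   // Either stream batch DataFrames as they are produced...
//   for await (const batchDf of batches) {
//     // ...process each batch incrementally
//   }
//
//   // ...or materialize everything into a single DataFrame.
//   const fullDf = await batches.collect();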

/**
* Adds batch processing methods to DataFrame class for JSON data.
* This follows a functional approach to extend DataFrame with JSON streaming capabilities.
*
* @param {Function} DataFrameClass - The DataFrame class to extend
* @returns {Function} The extended DataFrame class
*/
export function addJsonBatchMethods(DataFrameClass) {
// Add readJson as a static method to DataFrame
DataFrameClass.readJson = readJson;

return DataFrameClass;
}
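
// Illustrative usage sketch (not part of the diff): installing the static
// reader and calling it with an inline JSON string.
//
//   import { DataFrame } from '../../core/DataFrame.js';
//   import { addJsonBatchMethods } from './json.js';
//
//   addJsonBatchMethods(DataFrame);
//   const df = await DataFrame.readJson('{"a": [1, 2], "b": ["x", "y"]}');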