python 读取大文件优化示例

发布日期:2025-10-04 06:34:05 分类:365bet是什么公司 浏览:5285

核心方法

逐行读取 - 最常用,内存占用O(1)分块读取 - 适合超大文件,可控制内存使用内存映射 - 高性能,虚拟内存映射缓冲读取 - 平衡性能和内存

特殊场景处理

CSV文件 - 使用pandas的chunksize参数JSON Lines - 逐行解析JSON对象文本分析 - 内存高效的单词计数示例

关键优化技巧

使用生成器 - 避免一次性加载所有数据到内存合理设置块大小 - 平衡内存使用和IO效率进度监控 - 实时显示处理进度错误处理 - 处理编码错误、文件不存在等异常

使用建议

小于100MB: 直接读取到内存100MB-1GB: 使用逐行读取或小块读取大于1GB: 使用内存映射或大块分批处理结构化数据: 使用pandas的chunksize参数

这些方法可以处理几GB甚至几十GB的文件而不会导致内存溢出。根据您的具体需求选择最适合的方法即可。

示例代码:

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

"""

大文件读取示例 - 避免内存溢出的多种方法

"""

import os

import sys

import mmap

import csv

import json

from typing import Generator, Iterator

import pandas as pd

class LargeFileReader:

"""大文件读取工具类"""

def __init__(self, file_path: str, encoding: str = 'utf-8'):

self.file_path = file_path

self.encoding = encoding

def get_file_size(self) -> int:

"""获取文件大小(字节)"""

return os.path.getsize(self.file_path)

def get_file_size_mb(self) -> float:

"""获取文件大小(MB)"""

return self.get_file_size() / (1024 * 1024)

def method1_line_by_line(file_path: str, encoding: str = 'utf-8') -> Generator[str, None, None]:

"""

方法1: 逐行读取 - 最常用的方法

内存使用量: O(1) - 每次只加载一行

适用场景: 文本文件、日志文件

"""

try:

with open(file_path, 'r', encoding=encoding) as file:

for line_num, line in enumerate(file, 1):

# 处理每一行

yield line.strip() # 去除行尾换行符

# 可选:显示进度

if line_num % 10000 == 0:

print(f"已处理 {line_num} 行")

except FileNotFoundError:

print(f"文件未找到: {file_path}")

except UnicodeDecodeError as e:

print(f"编码错误: {e}")

def method2_chunk_reading(file_path: str, chunk_size: int = 1024*1024, encoding: str = 'utf-8') -> Generator[str, None, None]:

"""

方法2: 按块读取 - 适合处理二进制文件或超大文本文件

内存使用量: O(chunk_size) - 每次加载指定大小的块

适用场景: 二进制文件、超大文本文件

"""

try:

with open(file_path, 'r', encoding=encoding) as file:

while True:

chunk = file.read(chunk_size)

if not chunk:

break

yield chunk

except FileNotFoundError:

print(f"文件未找到: {file_path}")

except UnicodeDecodeError as e:

print(f"编码错误: {e}")

def method3_mmap_reading(file_path: str, encoding: str = 'utf-8') -> Iterator[str]:

"""

方法3: 内存映射文件 - 高性能读取

内存使用量: 虚拟内存映射,物理内存按需加载

适用场景: 需要随机访问的大文件、高性能要求

"""

try:

with open(file_path, 'r', encoding=encoding) as file:

with mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ) as mmapped_file:

# 逐行读取

for line in iter(mmapped_file.readline, b""):

yield line.decode(encoding).strip()

except FileNotFoundError:

print(f"文件未找到: {file_path}")

except Exception as e:

print(f"内存映射错误: {e}")

def method4_buffered_reading(file_path: str, buffer_size: int = 8192, encoding: str = 'utf-8') -> Generator[str, None, None]:

"""

方法4: 缓冲区读取 - 平衡性能和内存使用

内存使用量: O(buffer_size)

适用场景: 需要自定义缓冲区大小的场景

"""

try:

with open(file_path, 'r', encoding=encoding, buffering=buffer_size) as file:

for line in file:

yield line.strip()

except FileNotFoundError:

print(f"文件未找到: {file_path}")

except UnicodeDecodeError as e:

print(f"编码错误: {e}")

def process_large_csv(file_path: str, chunk_size: int = 10000) -> None:

"""

处理大型CSV文件的示例

使用pandas的chunksize参数分块读取

"""

print(f"开始处理CSV文件: {file_path}")

try:

# 分块读取CSV文件

chunk_iter = pd.read_csv(file_path, chunksize=chunk_size)

total_rows = 0

for chunk_num, chunk in enumerate(chunk_iter, 1):

# 处理当前块

print(f"处理第 {chunk_num} 块,包含 {len(chunk)} 行")

# 示例处理:统计每列的基本信息

print(f"列名: {list(chunk.columns)}")

print(f"数据类型: {chunk.dtypes.to_dict()}")

# 这里可以添加你的数据处理逻辑

# 例如:数据清洗、计算、转换等

total_rows += len(chunk)

# 可选:限制处理的块数量(用于测试)

if chunk_num >= 5: # 只处理前5块

break

print(f"总共处理了 {total_rows} 行数据")

except FileNotFoundError:

print(f"CSV文件未找到: {file_path}")

except pd.errors.EmptyDataError:

print("CSV文件为空")

except Exception as e:

print(f"处理CSV文件时出错: {e}")

def process_large_json_lines(file_path: str) -> None:

"""

处理大型JSON Lines文件 (.jsonl)

每行是一个独立的JSON对象

"""

print(f"开始处理JSON Lines文件: {file_path}")

try:

with open(file_path, 'r', encoding='utf-8') as file:

for line_num, line in enumerate(file, 1):

line = line.strip()

if not line:

continue

try:

# 解析JSON对象

json_obj = json.loads(line)

# 处理JSON对象

# 这里添加你的处理逻辑

print(f"第 {line_num} 行: {type(json_obj)} - {len(str(json_obj))} 字符")

# 示例:提取特定字段

if isinstance(json_obj, dict):

keys = list(json_obj.keys())[:5] # 只显示前5个键

print(f" 键: {keys}")

except json.JSONDecodeError as e:

print(f"第 {line_num} 行JSON解析错误: {e}")

continue

# 显示进度

if line_num % 1000 == 0:

print(f"已处理 {line_num} 行")

# 可选:限制处理行数(用于测试)

if line_num >= 10000:

break

except FileNotFoundError:

print(f"JSON Lines文件未找到: {file_path}")

def process_with_progress_callback(file_path: str, callback_interval: int = 10000) -> None:

"""

带进度回调的文件处理示例

"""

reader = LargeFileReader(file_path)

file_size_mb = reader.get_file_size_mb()

print(f"文件大小: {file_size_mb:.2f} MB")

print("开始处理文件...")

processed_lines = 0

for line in method1_line_by_line(file_path):

# 处理每一行

# 这里添加你的处理逻辑

line_length = len(line)

processed_lines += 1

# 进度回调

if processed_lines % callback_interval == 0:

print(f"已处理 {processed_lines:,} 行")

# 可选:限制处理行数(用于测试)

if processed_lines >= 50000:

print("达到处理限制,停止处理")

break

print(f"处理完成,总共处理了 {processed_lines:,} 行")

def memory_efficient_word_count(file_path: str) -> dict:

"""

内存高效的单词计数示例

适用于超大文本文件

"""

word_count = {}

print("开始统计单词频率...")

for line_num, line in enumerate(method1_line_by_line(file_path), 1):

# 简单的单词分割(可以根据需要改进)

words = line.lower().split()

for word in words:

# 清理单词(去除标点符号等)

clean_word = ''.join(c for c in word if c.isalnum())

if clean_word:

word_count[clean_word] = word_count.get(clean_word, 0) + 1

# 显示进度

if line_num % 10000 == 0:

print(f"已处理 {line_num} 行,当前词汇量: {len(word_count)}")

print(f"统计完成,总词汇量: {len(word_count)}")

# 返回前10个最常用的单词

sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True)

return dict(sorted_words[:10])

def main():

"""主函数 - 演示各种大文件读取方法"""

# 注意:请替换为你的实际文件路径

test_file = "large_file.txt" # 替换为实际的大文件路径

csv_file = "large_data.csv" # 替换为实际的CSV文件路径

json_file = "large_data.jsonl" # 替换为实际的JSON Lines文件路径

print("=== 大文件读取示例 ===\n")

# 检查文件是否存在

if not os.path.exists(test_file):

print(f"测试文件 {test_file} 不存在")

print("请创建一个测试文件或修改文件路径")

return

# 创建文件读取器

reader = LargeFileReader(test_file)

print(f"文件路径: {test_file}")

print(f"文件大小: {reader.get_file_size_mb():.2f} MB\n")

# 示例1: 逐行读取(推荐用于大多数文本文件)

print("=== 方法1: 逐行读取 ===")

line_count = 0

for line in method1_line_by_line(test_file):

line_count += 1

if line_count <= 5: # 只显示前5行

print(f"第{line_count}行: {line[:50]}...")

if line_count >= 10000: # 限制处理行数

break

print(f"处理了 {line_count} 行\n")

# 示例2: 块读取

print("=== 方法2: 块读取 ===")

chunk_count = 0

for chunk in method2_chunk_reading(test_file, chunk_size=1024):

chunk_count += 1

if chunk_count <= 3: # 只显示前3块

print(f"块{chunk_count}: {len(chunk)} 字符")

if chunk_count >= 10: # 限制处理块数

break

print(f"处理了 {chunk_count} 个块\n")

# 示例3: 内存映射

print("=== 方法3: 内存映射 ===")

mmap_count = 0

try:

for line in method3_mmap_reading(test_file):

mmap_count += 1

if mmap_count >= 10000: # 限制处理行数

break

print(f"使用内存映射处理了 {mmap_count} 行\n")

except Exception as e:

print(f"内存映射失败: {e}\n")

# 示例4: 带进度的处理

print("=== 方法4: 带进度回调的处理 ===")

process_with_progress_callback(test_file, callback_interval=5000)

print()

# 示例5: CSV文件处理

if os.path.exists(csv_file):

print("=== CSV文件处理 ===")

process_large_csv(csv_file, chunk_size=1000)

print()

# 示例6: JSON Lines文件处理

if os.path.exists(json_file):

print("=== JSON Lines文件处理 ===")

process_large_json_lines(json_file)

print()

# 示例7: 单词计数

print("=== 内存高效单词计数 ===")

try:

top_words = memory_efficient_word_count(test_file)

print("前10个最常用单词:")

for word, count in top_words.items():

print(f" {word}: {count}")

except Exception as e:

print(f"单词统计失败: {e}")

if __name__ == "__main__":

main()

# 性能优化建议:

"""

1. 选择合适的方法:

- 逐行读取: 适用于大多数文本文件

- 块读取: 适用于二进制文件或需要自定义处理块的场景

- 内存映射: 适用于需要随机访问或高性能要求的场景

- pandas分块: 适用于结构化数据(CSV)

2. 内存优化:

- 及时释放不需要的变量

- 使用生成器而不是列表

- 避免一次性加载整个文件到内存

3. 性能优化:

- 合理设置缓冲区大小

- 使用适当的编码

- 考虑使用多进程/多线程处理

4. 错误处理:

- 处理文件不存在的情况

- 处理编码错误

- 处理磁盘空间不足等IO错误

"""

Excel大文件读取方法

1. pandas分块读取 (.xlsx, .xls)

适合中等大小的Excel文件可以处理多个工作表支持数据类型自动识别

2. openpyxl逐行读取 (.xlsx) - 推荐

内存效率最高的方法使用read_only=True模式真正的逐行处理,内存占用O(1)适合处理超大Excel文件

3. xlrd处理 (.xls)

专门处理旧版Excel格式分块读取支持适合Legacy Excel文件

4. pyxlsb处理 (.xlsb)

处理Excel二进制格式读取速度快,文件小需要额外安装pyxlsb库

新增功能特点

✅ 智能文件信息获取 - 不加载全部数据就能获取文件结构信息

✅ 内存使用对比 - 实时监控不同方法的内存消耗

✅ 批量数据处理 - 支持批次处理和进度监控

✅ 多格式支持 - 支持.xlsx、.xls、.xlsb三种格式

✅ 错误处理 - 完善的异常处理机制

安装依赖

bash

pip install pandas openpyxl xlrd pyxlsb psutil

使用建议

小于50MB: 可以使用pandas直接读取50MB-500MB: 使用pandas分块读取大于500MB: 推荐使用openpyxl逐行读取超大文件: 考虑转换为CSV或Parquet格式

这套方案可以处理几GB甚至更大的Excel文件而不会内存溢出,特别是openpyxl的逐行读取方法,是处理超大Excel文件的最佳选择!

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

"""

大文件读取示例 - 避免内存溢出的多种方法

"""

import os

import sys

import mmap

import csv

import json

from typing import Generator, Iterator

import pandas as pd

class LargeFileReader:

"""大文件读取工具类"""

def __init__(self, file_path: str, encoding: str = 'utf-8'):

self.file_path = file_path

self.encoding = encoding

def get_file_size(self) -> int:

"""获取文件大小(字节)"""

return os.path.getsize(self.file_path)

def get_file_size_mb(self) -> float:

"""获取文件大小(MB)"""

return self.get_file_size() / (1024 * 1024)

def method1_line_by_line(file_path: str, encoding: str = 'utf-8') -> Generator[str, None, None]:

"""

方法1: 逐行读取 - 最常用的方法

内存使用量: O(1) - 每次只加载一行

适用场景: 文本文件、日志文件

"""

try:

with open(file_path, 'r', encoding=encoding) as file:

for line_num, line in enumerate(file, 1):

# 处理每一行

yield line.strip() # 去除行尾换行符

# 可选:显示进度

if line_num % 10000 == 0:

print(f"已处理 {line_num} 行")

except FileNotFoundError:

print(f"文件未找到: {file_path}")

except UnicodeDecodeError as e:

print(f"编码错误: {e}")

def method2_chunk_reading(file_path: str, chunk_size: int = 1024*1024, encoding: str = 'utf-8') -> Generator[str, None, None]:

"""

方法2: 按块读取 - 适合处理二进制文件或超大文本文件

内存使用量: O(chunk_size) - 每次加载指定大小的块

适用场景: 二进制文件、超大文本文件

"""

try:

with open(file_path, 'r', encoding=encoding) as file:

while True:

chunk = file.read(chunk_size)

if not chunk:

break

yield chunk

except FileNotFoundError:

print(f"文件未找到: {file_path}")

except UnicodeDecodeError as e:

print(f"编码错误: {e}")

def method3_mmap_reading(file_path: str, encoding: str = 'utf-8') -> Iterator[str]:

"""

方法3: 内存映射文件 - 高性能读取

内存使用量: 虚拟内存映射,物理内存按需加载

适用场景: 需要随机访问的大文件、高性能要求

"""

try:

with open(file_path, 'r', encoding=encoding) as file:

with mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ) as mmapped_file:

# 逐行读取

for line in iter(mmapped_file.readline, b""):

yield line.decode(encoding).strip()

except FileNotFoundError:

print(f"文件未找到: {file_path}")

except Exception as e:

print(f"内存映射错误: {e}")

def method4_buffered_reading(file_path: str, buffer_size: int = 8192, encoding: str = 'utf-8') -> Generator[str, None, None]:

"""

方法4: 缓冲区读取 - 平衡性能和内存使用

内存使用量: O(buffer_size)

适用场景: 需要自定义缓冲区大小的场景

"""

try:

with open(file_path, 'r', encoding=encoding, buffering=buffer_size) as file:

for line in file:

yield line.strip()

except FileNotFoundError:

print(f"文件未找到: {file_path}")

except UnicodeDecodeError as e:

print(f"编码错误: {e}")

def process_large_csv(file_path: str, chunk_size: int = 10000) -> None:

"""

处理大型CSV文件的示例

使用pandas的chunksize参数分块读取

"""

print(f"开始处理CSV文件: {file_path}")

try:

# 分块读取CSV文件

chunk_iter = pd.read_csv(file_path, chunksize=chunk_size)

total_rows = 0

for chunk_num, chunk in enumerate(chunk_iter, 1):

# 处理当前块

print(f"处理第 {chunk_num} 块,包含 {len(chunk)} 行")

# 示例处理:统计每列的基本信息

print(f"列名: {list(chunk.columns)}")

print(f"数据类型: {chunk.dtypes.to_dict()}")

# 这里可以添加你的数据处理逻辑

# 例如:数据清洗、计算、转换等

total_rows += len(chunk)

# 可选:限制处理的块数量(用于测试)

if chunk_num >= 5: # 只处理前5块

break

print(f"总共处理了 {total_rows} 行数据")

except FileNotFoundError:

print(f"CSV文件未找到: {file_path}")

except pd.errors.EmptyDataError:

print("CSV文件为空")

except Exception as e:

print(f"处理CSV文件时出错: {e}")

def process_large_json_lines(file_path: str) -> None:

"""

处理大型JSON Lines文件 (.jsonl)

每行是一个独立的JSON对象

"""

print(f"开始处理JSON Lines文件: {file_path}")

try:

with open(file_path, 'r', encoding='utf-8') as file:

for line_num, line in enumerate(file, 1):

line = line.strip()

if not line:

continue

try:

# 解析JSON对象

json_obj = json.loads(line)

# 处理JSON对象

# 这里添加你的处理逻辑

print(f"第 {line_num} 行: {type(json_obj)} - {len(str(json_obj))} 字符")

# 示例:提取特定字段

if isinstance(json_obj, dict):

keys = list(json_obj.keys())[:5] # 只显示前5个键

print(f" 键: {keys}")

except json.JSONDecodeError as e:

print(f"第 {line_num} 行JSON解析错误: {e}")

continue

# 显示进度

if line_num % 1000 == 0:

print(f"已处理 {line_num} 行")

# 可选:限制处理行数(用于测试)

if line_num >= 10000:

break

except FileNotFoundError:

print(f"JSON Lines文件未找到: {file_path}")

def process_with_progress_callback(file_path: str, callback_interval: int = 10000) -> None:

"""

带进度回调的文件处理示例

"""

reader = LargeFileReader(file_path)

file_size_mb = reader.get_file_size_mb()

print(f"文件大小: {file_size_mb:.2f} MB")

print("开始处理文件...")

processed_lines = 0

for line in method1_line_by_line(file_path):

# 处理每一行

# 这里添加你的处理逻辑

line_length = len(line)

processed_lines += 1

# 进度回调

if processed_lines % callback_interval == 0:

print(f"已处理 {processed_lines:,} 行")

# 可选:限制处理行数(用于测试)

if processed_lines >= 50000:

print("达到处理限制,停止处理")

break

print(f"处理完成,总共处理了 {processed_lines:,} 行")

def memory_efficient_word_count(file_path: str) -> dict:

"""

内存高效的单词计数示例

适用于超大文本文件

"""

word_count = {}

print("开始统计单词频率...")

for line_num, line in enumerate(method1_line_by_line(file_path), 1):

# 简单的单词分割(可以根据需要改进)

words = line.lower().split()

for word in words:

# 清理单词(去除标点符号等)

clean_word = ''.join(c for c in word if c.isalnum())

if clean_word:

word_count[clean_word] = word_count.get(clean_word, 0) + 1

# 显示进度

if line_num % 10000 == 0:

print(f"已处理 {line_num} 行,当前词汇量: {len(word_count)}")

print(f"统计完成,总词汇量: {len(word_count)}")

# 返回前10个最常用的单词

sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True)

return dict(sorted_words[:10])

def main():

"""主函数 - 演示各种大文件读取方法"""

# 注意:请替换为你的实际文件路径

test_file = "large_file.txt" # 替换为实际的大文件路径

csv_file = "large_data.csv" # 替换为实际的CSV文件路径

json_file = "large_data.jsonl" # 替换为实际的JSON Lines文件路径

print("=== 大文件读取示例 ===\n")

# 检查文件是否存在

if not os.path.exists(test_file):

print(f"测试文件 {test_file} 不存在")

print("请创建一个测试文件或修改文件路径")

return

# 创建文件读取器

reader = LargeFileReader(test_file)

print(f"文件路径: {test_file}")

print(f"文件大小: {reader.get_file_size_mb():.2f} MB\n")

# 示例1: 逐行读取(推荐用于大多数文本文件)

print("=== 方法1: 逐行读取 ===")

line_count = 0

for line in method1_line_by_line(test_file):

line_count += 1

if line_count <= 5: # 只显示前5行

print(f"第{line_count}行: {line[:50]}...")

if line_count >= 10000: # 限制处理行数

break

print(f"处理了 {line_count} 行\n")

# 示例2: 块读取

print("=== 方法2: 块读取 ===")

chunk_count = 0

for chunk in method2_chunk_reading(test_file, chunk_size=1024):

chunk_count += 1

if chunk_count <= 3: # 只显示前3块

print(f"块{chunk_count}: {len(chunk)} 字符")

if chunk_count >= 10: # 限制处理块数

break

print(f"处理了 {chunk_count} 个块\n")

# 示例3: 内存映射

print("=== 方法3: 内存映射 ===")

mmap_count = 0

try:

for line in method3_mmap_reading(test_file):

mmap_count += 1

if mmap_count >= 10000: # 限制处理行数

break

print(f"使用内存映射处理了 {mmap_count} 行\n")

except Exception as e:

print(f"内存映射失败: {e}\n")

# 示例4: 带进度的处理

print("=== 方法4: 带进度回调的处理 ===")

process_with_progress_callback(test_file, callback_interval=5000)

print()

# 示例5: CSV文件处理

if os.path.exists(csv_file):

print("=== CSV文件处理 ===")

process_large_csv(csv_file, chunk_size=1000)

print()

# 示例6: JSON Lines文件处理

if os.path.exists(json_file):

print("=== JSON Lines文件处理 ===")

process_large_json_lines(json_file)

print()

# 示例7: 单词计数

print("=== 内存高效单词计数 ===")

try:

top_words = memory_efficient_word_count(test_file)

print("前10个最常用单词:")

for word, count in top_words.items():

print(f" {word}: {count}")

except Exception as e:

print(f"单词统计失败: {e}")

if __name__ == "__main__":

main()

# 性能优化建议:

"""

Excel文件大文件读取最佳实践:

1. 选择合适的库和方法:

- openpyxl (read_only=True): 最佳内存效率,支持.xlsx

- pandas + chunksize: 易用但内存使用较高

- xlrd: 适用于.xls文件

- pyxlsb: 适用于.xlsb文件

2. Excel特有优化:

- 使用read_only=True模式

- 设置data_only=True跳过公式计算

- 逐行读取而不是一次性加载整个工作表

- 按批次处理数据

3. 内存优化策略:

- 使用生成器和迭代器

- 及时释放DataFrame和工作簿对象

- 避免同时打开多个工作簿

- 考虑将数据转换为更高效的格式(如Parquet)

4. 性能建议:

- 对于超大Excel文件,考虑先转换为CSV格式

- 使用多进程处理多个工作表

- 设置合适的批次大小(1000-10000行)

- 在SSD上处理文件以提高IO速度

5. 文件格式选择:

- .xlsx: 现代格式,压缩率高,但读取较慢

- .xls: 旧格式,读取快但文件大

- .xlsb: 二进制格式,读取快,文件小

- .csv: 最快的读取速度,但丢失Excel特性

6. 错误处理:

- 处理损坏的Excel文件

- 处理不同的数据类型和格式

- 处理合并单元格

- 处理隐藏的行和列

安装所需的依赖:

pip install pandas openpyxl xlrd pyxlsb psutil

"""