5秒钟揪出电脑中大文件 – python多线程遍历文件夹

python多线程遍历文件夹, 获取文件大小, list 线程安全

电脑用久了,自然疏于管理大文件,不经意间硬盘就告急了。这里祭出大招,5秒钟遍历13万文件,揪出你想找的大文件!这里用了多线程,测试中发现,线程数超过4就意义不大了,毕竟只有十几万文件。

虽然脚本只有几十行,但是这个多线程的使用很有意义,解决了python list 线程安全这个问题

# the script was written to loop through a folder check sizes of all files
# multi-threading is turned on to speed up process

import os
import threading
from sys import argv
import time

years = ['2019', '2020']

def filewrite(filepath):
    with open('filesize_list.txt', 'a') as f:
        f.write(os.path.abspath(filepath))
        f.write('\n')

def sizeCheck(files):
    thr_name = threading.current_thread().name
    print(f'{thr_name} is processing {len(files)} files')
    for file in files:
        file_path = os.path.join(root,file)
        # file_path = os.path.abspath(file)
        filesize = os.stat(file_path).st_size/1024/1024
        if filesize > 3072:
            print(f'threading: {file}, {file_path}, {int(filesize)}MB')
            # lock.acquire()
            filewrite(file_path)
            # lock.release()
            
def split_files(file_list, split_num):
    thread_list = []
    # list size each thread has to process
    list_size = (len(file_list) // split_num) if (len(file_list) % split_num == 0) else ((len(file_list) // split_num) + 1)
    print(f'num of files to check {list_size}')
    # start thread
    for i in range(split_num):
        # get url that current thread need to process
        file_list_split = file_list[
                         i * list_size:(i + 1) * list_size if len(file_list) > (i + 1) * list_size else len(file_list)]
        thread = threading.Thread(target=sizeCheck, args=(file_list_split,))
        thread.setName("Thread" + str(i))
        thread_list.append(thread)
        # start in thread
        thread.start()
        # print(thread.getName() + "started")
    # combine at the end of the job
    for _item in thread_list:
        _item.join()
        
if __name__ == "__main__":
    t1 = time.time()
    thread_num = 6
    lock = threading.Lock()
    print("add the directory where you want to check filesizes or leave it to default:") 
    if len(argv) > 1:
        dirs_to_check = [','.join(argv[i]) for i in range(len(argv))]        
        print(f'num of folders to check: {dirs_to_check}  {len(dirs_to_check)}')
    else:
        dirs_to_check = ['D:\\', 'E:\\', 'F:\\', 'G:\\']
    file_list_ = []
    for dir_to_check in dirs_to_check:      
        print(f'dir_to_check {dir_to_check}')
        for root, dirs, files in os.walk(dir_to_check):
            # print(root, dirs, files, len(files))
            for i in files:
                file_list_.append(''.join(root + '\\'+ i))
    print(f'num of files to scan: {len(file_list_)}')
    split_files(file_list_, thread_num) # thread_num
    t2 = time.time()
    print(f'time lapsed: {t2-t1}, num of threads used: {thread_num}')
            
       
5秒钟遍历13万文件

pillow压缩图片尺寸并保持图片质量(crontab定时任务最强解析)

很久前不小心上传了一批大尺寸文件,虽然wordpress插件压缩优化了,但是源文件还是存在的。写了段代码,多线程pillow压缩图片尺寸,速度很快,毕竟大部分时间会耗在图片打开保存环节,多线程有提升效果。2000多张图片大概需要4秒钟,实际上十几万图片花费时间估计也差不多。

遍历文件夹用的os.walk,然后多线程去运行pillow压缩程序,比用插件强多了,各种优化插件只会拖慢速度,还不如写个脚本在后台直接压缩图片。

thumbnail相比resize还是有区别的,前者只能压缩不能增加尺寸,而且它能保持图片原来的比例,非常方便。当然,要记得antialias了,做了那么多年地震数据处理分析,自然晓得防假频的重要性了。

# the script was written to loop through a folder and resize all large pics to smaller sizes
# multi-threading is turned on to speed up process

import os
from PIL import Image
import sys
import threading
from sys import argv
import time

years = ['2019', '2020']
file_size = (1024, 768)

def resize(files):
    for file in files:
        if file.endswith('jpg') or file.endswith('png'):
            # print(os.path.abspath(file))
            # print(os.path.join(root,file))
            file_path = os.path.join(root,file)
            filesize = os.stat(file_path).st_size/1024
            if filesize > 200:
                print(file, file_path, filesize)
                img = Image.open(file_path)
                print(img.size)
                img.thumbnail(file_size,Image.ANTIALIAS)
                img = img.convert("RGB") # PNG CONVERT TO JPEG
                print(img.size)
                img.save(file_path, "JPEG")
                
                
def filewrite(filepath):
    with open('filesize_list.txt', 'a') as f:
        f.write(os.path.abspath(filepath))
        f.write('\n')

def sizeCheck(files):
    thr_name = threading.current_thread().name
    print(f'{thr_name} is processing {len(files)} files')
    for file in files:
        file_path = os.path.join(root,file)
        # file_path = os.path.abspath(file)
        filesize = os.stat(file_path).st_size/1024/1024
        if filesize > 2:
            print(f'threading: {file}, {file_path}, {int(filesize)}MB')
            # lock.acquire()
            filewrite(file_path)
            # lock.release()
            
def split_files(file_list, split_num):
    thread_list = []
    # list size each thread has to process
    list_size = (len(file_list) // split_num) if (len(file_list) % split_num == 0) else ((len(file_list) // split_num) + 1)
    print(f'num of files to check {list_size}')
    # start thread
    for i in range(split_num):
        # get url that current thread need to process
        file_list_split = file_list[
                         i * list_size:(i + 1) * list_size if len(file_list) > (i + 1) * list_size else len(file_list)]
        thread = threading.Thread(target=resize, args=(file_list_split,))
        thread.setName("Thread" + str(i))
        thread_list.append(thread)
        # start in thread
        thread.start()
        # print(thread.getName() + "started")
    # combine at the end of the job
    for _item in thread_list:
        _item.join()
        
if __name__ == "__main__":
    t1 = time.time()
    thread_num = 6
    lock = threading.Lock()
    print("add the directory where you want to check filesizes or leave it to default:") 
    if len(argv) > 1:
        dirs_to_check = [','.join(argv[i]) for i in range(len(argv))]        
        print(f'num of folders to check: {dirs_to_check}  {len(dirs_to_check)}')
    else:
        dirs_to_check = ['/www/wwwroot/geoseis.cn/wp-content/uploads/2019', '/www/wwwroot/geoseis.cn/wp-content/uploads/2020']
    file_list_ = []
    for dir_to_check in dirs_to_check:      
        print(f'dir_to_check {dir_to_check}')
        for root, dirs, files in os.walk(dir_to_check):
            # print(root, dirs, files, len(files))
            for i in files:
                file_list_.append(''.join(root + '/'+ i))
    print(f'num of files to scan: {len(file_list_)}')
    split_files(file_list_, thread_num) # thread_num
    t2 = time.time()
    print(f'time lapsed: {t2-t1}, num of threads used: {thread_num}')
            
                        
*/300  * * * *  sudo python /www/wwwroot/geoseis.cn/wp-content/uploads/resize_threading.py

用crontab做定时任务,每5小时运行一次,这里有个网站,实时测试你的crontab任务格式是否正确,强烈推荐! https://crontab.guru/#0_/1_

每天00:05执行
每300分钟执行一次
每周五的05:10执行