A while back I accidentally uploaded a batch of oversized image files. A WordPress plugin had compressed and optimized them, but the original source files were still sitting on the server. So I wrote a short script that shrinks the images with Pillow across multiple threads, and it is fast: most of the time is spent opening and saving images, i.e. disk I/O and C-level compression rather than Python bytecode, so multithreading gives a real speedup. Around 2,000 images take about 4 seconds, and I suspect even a hundred thousand or so would not take much longer.
The folders are traversed with os.walk, and the Pillow resize routine then runs in multiple threads. This beats the plugin approach by a wide margin: the various optimization plugins only slow the site down, and it is simpler to run a script on the server and compress the images directly.
thumbnail is not the same as resize: it can only shrink an image, never enlarge it, and it preserves the original aspect ratio, which is very convenient. Also remember antialiasing; after so many years of seismic data processing and analysis, I know how important anti-aliasing is.
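A minimal sketch of that difference (sample.jpg is just a placeholder file name, and Image.LANCZOS is the resampling filter that replaced the old ANTIALIAS constant in newer Pillow releases); the full script follows right after:

from PIL import Image

img = Image.open("sample.jpg")                          # suppose the original is 3000x2000
img_copy = img.copy()
img.thumbnail((1024, 768), Image.LANCZOS)               # in place, keeps aspect ratio: roughly 1024x683, never upscales
resized = img_copy.resize((1024, 768), Image.LANCZOS)   # new image at exactly 1024x768, aspect ratio not preserved
print(img.size, resized.size)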
# the script was written to loop through a folder and resize all large pics to smaller sizes
# multi-threading is turned on to speed up process
import os
from PIL import Image
import sys
import threading
from sys import argv
import time
years = ['2019', '2020']
file_size = (1024, 768)
def resize(files):
    for file in files:
        if file.endswith('jpg') or file.endswith('png'):
            # the list built in __main__ already holds absolute paths
            file_path = file
            filesize = os.stat(file_path).st_size / 1024  # size in KB
            if filesize > 200:  # only rewrite files larger than 200 KB
                print(file, file_path, filesize)
                img = Image.open(file_path)
                print(img.size)
                # thumbnail shrinks in place and keeps the aspect ratio;
                # ANTIALIAS was removed in newer Pillow, LANCZOS is the same filter
                img.thumbnail(file_size, Image.LANCZOS)
                img = img.convert("RGB")  # PNG (possibly RGBA/P) converted so it can be saved as JPEG
                print(img.size)
                img.save(file_path, "JPEG")  # overwrites the original file, extension unchanged
def filewrite(filepath):
    # append the path of an oversized file to a simple report
    with open('filesize_list.txt', 'a') as f:
        f.write(os.path.abspath(filepath))
        f.write('\n')
def sizeCheck(files):
    thr_name = threading.current_thread().name
    print(f'{thr_name} is processing {len(files)} files')
    for file in files:
        file_path = file  # absolute path already
        filesize = os.stat(file_path).st_size / 1024 / 1024  # size in MB
        if filesize > 2:
            print(f'threading: {file}, {file_path}, {int(filesize)}MB')
            # lock.acquire()  # uncomment if several threads write to the report at once
            filewrite(file_path)
            # lock.release()
def split_files(file_list, split_num):
    thread_list = []
    # number of files each thread has to process (ceiling division)
    list_size = (len(file_list) + split_num - 1) // split_num
    print(f'num of files per thread: {list_size}')
    for i in range(split_num):
        # slice for the current thread; slicing clips at the end of the list automatically
        file_list_split = file_list[i * list_size:(i + 1) * list_size]
        thread = threading.Thread(target=resize, args=(file_list_split,), name="Thread" + str(i))
        thread_list.append(thread)
        thread.start()
    # wait for every worker to finish
    for _item in thread_list:
        _item.join()
if __name__ == "__main__":
    t1 = time.time()
    thread_num = 6
    lock = threading.Lock()
    print("pass the directories to check as command-line arguments, or fall back to the defaults:")
    if len(argv) > 1:
        dirs_to_check = argv[1:]  # every argument after the script name is a directory
        print(f'num of folders to check: {dirs_to_check} {len(dirs_to_check)}')
    else:
        dirs_to_check = ['/www/wwwroot/geoseis.cn/wp-content/uploads/2019',
                         '/www/wwwroot/geoseis.cn/wp-content/uploads/2020']
    file_list_ = []
    for dir_to_check in dirs_to_check:
        print(f'dir_to_check {dir_to_check}')
        for root, dirs, files in os.walk(dir_to_check):
            for i in files:
                file_list_.append(os.path.join(root, i))
    print(f'num of files to scan: {len(file_list_)}')
    split_files(file_list_, thread_num)
    t2 = time.time()
    print(f'time lapsed: {t2 - t1}, num of threads used: {thread_num}')
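For reference, the manual list splitting above could also be delegated to the standard-library concurrent.futures module, which hands files to worker threads one at a time. This is only a minimal sketch, assuming a hypothetical per-file helper resize_one that wraps the body of resize() above:

import os
from concurrent.futures import ThreadPoolExecutor

def resize_one(file_path):
    # hypothetical helper: the per-file body of resize() above would go here
    pass

def run(dirs_to_check, thread_num=6):
    file_list = []
    for dir_to_check in dirs_to_check:
        for root, dirs, files in os.walk(dir_to_check):
            file_list.extend(os.path.join(root, f) for f in files)
    # the pool distributes files across worker threads, so no manual slicing is needed
    with ThreadPoolExecutor(max_workers=thread_num) as pool:
        pool.map(resize_one, file_list)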
0 */5 * * * sudo python /www/wwwroot/geoseis.cn/wp-content/uploads/resize_threading.py
I use crontab to schedule the script so it runs every 5 hours. By the way, there is a website that lets you test in real time whether your crontab expression is correct, highly recommended! https://crontab.guru/#0_/1_
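If you also want a record of each run, the same entry can redirect its output to a log file (the log path here is just an example):

0 */5 * * * sudo python /www/wwwroot/geoseis.cn/wp-content/uploads/resize_threading.py >> /tmp/resize_threading.log 2>&1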


