Compressing image dimensions with Pillow while keeping image quality (plus a crontab scheduling walkthrough)

A while back I accidentally uploaded a batch of oversized images. A WordPress plugin compresses the versions it serves, but the original files were still sitting on disk. So I wrote a short script that shrinks them with Pillow across multiple threads. It is fast: most of the time goes into opening and saving the images, so multi-threading pays off. Around 2,000 images take about 4 seconds, and my guess is that even a hundred-odd thousand images would not take dramatically longer.

The folder traversal uses os.walk, and the Pillow compression runs in multiple threads. This beats any plugin: the various optimization plugins only slow the site down, and it is simpler to run a script on the server and compress the images directly.

thumbnail is different from resize: it can only shrink, never enlarge, and it preserves the original aspect ratio, which is very convenient. And remember the anti-aliasing filter; after all these years of seismic data processing I know how important preventing aliasing is.
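A quick illustration of the difference (the file name and sizes are just an example):

from PIL import Image

img = Image.open('example.jpg')            # say, a 3000x2000 photo
img.thumbnail((1024, 768), Image.LANCZOS)  # shrinks in place, keeps the ratio -> about 1024x683
print(img.size)
# resize, by contrast, returns a new image at exactly the requested size,
# distorting the ratio and happily upscaling small images:
# stretched = img.resize((1024, 768), Image.LANCZOS)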

# the script was written to loop through a folder and resize all large pics to smaller sizes
# multi-threading is turned on to speed up process

import os
import threading
import time
from sys import argv

from PIL import Image

# maximum dimensions after resizing (width, height)
file_size = (1024, 768)

def resize(files):
    # shrink every jpg/png in `files` that is larger than 200 KB
    for file in files:
        if file.lower().endswith(('.jpg', '.png')):
            file_path = file  # entries are already absolute paths built in __main__
            filesize = os.stat(file_path).st_size / 1024  # KB
            if filesize > 200:
                print(file_path, f'{filesize:.0f}KB')
                img = Image.open(file_path)
                print(img.size)
                # thumbnail keeps the aspect ratio and never enlarges;
                # LANCZOS is the anti-aliasing filter (ANTIALIAS was removed in Pillow 10)
                img.thumbnail(file_size, Image.LANCZOS)
                img = img.convert("RGB")  # PNG needs RGB mode before saving as JPEG
                print(img.size)
                img.save(file_path, "JPEG")  # overwrite the original in place
                
                
def filewrite(filepath):
    # append the oversized file's absolute path to a report file
    with open('filesize_list.txt', 'a') as f:
        f.write(os.path.abspath(filepath))
        f.write('\n')

def sizeCheck(files):
    # alternative worker: report files larger than 2 MB instead of resizing them
    thr_name = threading.current_thread().name
    print(f'{thr_name} is processing {len(files)} files')
    for file in files:
        file_path = file  # entries are already absolute paths
        filesize = os.stat(file_path).st_size / 1024 / 1024  # MB
        if filesize > 2:
            print(f'threading: {file_path}, {int(filesize)}MB')
            # filewrite appends to a shared file; wrap it in the lock if needed
            # lock.acquire()
            filewrite(file_path)
            # lock.release()
            
def split_files(file_list, split_num):
    # split file_list into split_num chunks and resize each chunk in its own thread
    thread_list = []
    # number of files each thread has to process
    list_size = (len(file_list) // split_num) if (len(file_list) % split_num == 0) else ((len(file_list) // split_num) + 1)
    print(f'files per thread: {list_size}')
    for i in range(split_num):
        # slice out the chunk for the current thread (slicing clamps at the end of the list)
        file_list_split = file_list[i * list_size:(i + 1) * list_size]
        thread = threading.Thread(target=resize, args=(file_list_split,), name="Thread" + str(i))
        thread_list.append(thread)
        thread.start()
    # wait for all threads to finish
    for _item in thread_list:
        _item.join()
        
if __name__ == "__main__":
    t1 = time.time()
    thread_num = 6
    lock = threading.Lock()
    print("pass the directories to check as arguments, or leave empty to use the defaults")
    if len(argv) > 1:
        dirs_to_check = argv[1:]
        print(f'folders to check: {dirs_to_check} ({len(dirs_to_check)})')
    else:
        dirs_to_check = ['/www/wwwroot/geoseis.cn/wp-content/uploads/2019', '/www/wwwroot/geoseis.cn/wp-content/uploads/2020']
    file_list_ = []
    for dir_to_check in dirs_to_check:
        print(f'dir_to_check {dir_to_check}')
        for root, dirs, files in os.walk(dir_to_check):
            for i in files:
                file_list_.append(os.path.join(root, i))
    print(f'num of files to scan: {len(file_list_)}')
    split_files(file_list_, thread_num)
    t2 = time.time()
    print(f'time lapsed: {t2 - t1:.2f}s, num of threads used: {thread_num}')
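To run it by hand, pass the upload folders you want to scan as arguments (these are the same paths the script falls back to by default):

python resize_threading.py /www/wwwroot/geoseis.cn/wp-content/uploads/2019 /www/wwwroot/geoseis.cn/wp-content/uploads/2020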
            
                        
0 */5 * * *  sudo python /www/wwwroot/geoseis.cn/wp-content/uploads/resize_threading.py

I schedule it with crontab to run every 5 hours. Here is a site that lets you test your crontab expression interactively, highly recommended: https://crontab.guru/#0_/1_

A few example schedules:
run at 00:05 every day
run every 300 minutes
run at 05:10 every Friday
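In standard cron syntax those look like this (the minute field only goes up to 59, so "every 300 minutes" has to be written as every 5 hours); replace command with whatever you want to run:

5 0 * * *    command   # at 00:05 every day
0 */5 * * *  command   # every 5 hours (300 minutes)
10 5 * * 5   command   # at 05:10 every Friday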

Scrapy-Redis incremental crawling

Most technical posts out there reek of copy and paste and rarely touch the problems you actually hit, such as the very common incremental crawl. scrapy-redis's built-in dupefilter is fine for the first crawl, but it does not help with incremental crawling. Below is a downloader middleware I wrote that makes incremental crawling work, provided the target URLs change between crawls. If the URL stays the same but the page content changes, you need a different solution.

In theory scrapy can pause and resume when paired with redis, but sometimes the program misbehaves and a single Ctrl-C will not stop it, while forcing a second Ctrl-C breaks scrapy-redis's resume mechanism. The URL fingerprints in the dupefilter often end up out of sync with the URLs stored in MySQL, and in some cases you simply have to wipe the dupefilter and rerun the spider (when the spider is force-stopped, many requests that never completed still get their fingerprints written to the dupefilter, so they would be skipped on the next run). This middleware solves that.
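If you do have to wipe it, the fingerprints all sit in one Redis key; a minimal sketch, assuming scrapy-redis's default key layout of '<spidername>:dupefilter' and a placeholder spider name:

import redis

client = redis.Redis(host='127.0.0.1', port=6379, db=0)  # connection details are placeholders
client.delete('myspider:dupefilter')  # default scrapy-redis dupefilter key for a spider named "myspider"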

The scheme is simple: load every URL already stored in MySQL into Redis, and check each request's URL against that set before it is downloaded. Redis is far faster than MySQL, which makes it a good fit for this per-request lookup. If you have a huge number of URLs you can store MD5 digests instead to save Redis memory; I did not bother here, but it is easy to add.
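The MySQL-to-Redis loading step is not shown in the middleware below; here is a minimal sketch of it, assuming a PyMySQL connection and a crawled table with a url column (connection details and names are placeholders), writing into the same existing_url hash the middleware checks:

import pymysql
import redis

conn = pymysql.connect(host='localhost', user='root', password='***', database='spider_db')
client = redis.Redis(host='127.0.0.1', port=6379, db=0)

with conn.cursor() as cursor:
    cursor.execute('SELECT url FROM crawled')
    for (url,) in cursor.fetchall():
        # each crawled URL becomes a field in the hash the middleware checks;
        # for very large URL sets, store hashlib.md5(url.encode()).hexdigest() instead to save memory
        client.hset('existing_url', url, 1)
conn.close()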

import logging

import redis
from scrapy.exceptions import IgnoreRequest

# `settings` below is the crawler project's own settings module, where
# REDIS_HOST and REDIS_PWD are defined (import it from your project).


class DedupeMiddleware(object):  # custom dedupe downloader middleware

    logger = logging.getLogger(__name__)  # plugs into scrapy's normal logging
    client = redis.Redis(host=settings.REDIS_HOST, port=6379, db=0, password=settings.REDIS_PWD)
    try:
        client.ping()
        logger.info('redis connected for dedupe')
    except redis.exceptions.ConnectionError:
        logger.error('redis connect failed for dedupe')

    def process_request(self, request, spider):
        # Called for each request that goes through the downloader middleware.
        # Must either:
        # - return None: continue processing this request
        # - or return a Response or Request object
        # - or raise IgnoreRequest: process_exception() methods of
        #   installed downloader middleware will be called

        # check the requested URL against the hash of URLs already stored in MySQL;
        # drop the request if it is there, otherwise return None to let it through
        if self.client.hexists('existing_url', request.url):
            self.logger.info(f'{request.url} already in mysql')
            raise IgnoreRequest(f'{request.url} already in mysql')
        return None
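To turn it on, register the class in the project's DOWNLOADER_MIDDLEWARES setting; the dotted path below is a placeholder for wherever the class actually lives:

# settings.py
DOWNLOADER_MIDDLEWARES = {
    'myproject.middlewares.DedupeMiddleware': 543,  # adjust the path to your own module
}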