从七牛云迁移到 Cloudflare R2,手把手教你搭建个人图床!

哈喽,大家好啊!

在平时大家记录点东西、写笔记、做博客之类,经常需要插入图片。

以前我一直用七牛云做个人图床,个人有免费10G存储空间用着还不错,但最近流量费用是在负担不起,所以就只能从七牛云迁移到 Cloudflare R2。

这出口流量费用还是有点肉疼。

所以在综合对比之后,我最终还是决定将图床迁移到 Cloudflare R2。

并且相比七牛云,Cloudflare R2 具有天然的优势,出入流量不限制:

  • 不收取出口流量费用,图片访问再多也不用担心流量费用,不像国内的云厂商。
  • 免费提供 10GB 存储额度,个人博客基本够用。
  • 自带 Cloudflare CDN 网络,无需额外购买加速服务。
  • 支持绑定自定义域名。
  • 每月提供约 100 万次写入请求 + 1000 万次读取请求,个人博客图片加载基本无压力。

接下来我会把七牛云上的图片逐步迁移到 Cloudflare R2,并配置自定义域名。

使用七牛云作为备份存储,实现双备份,避免单个平台出现问题导致图片数据丢失。

同时教大家如何搭建自定义图床,顺便简单记录一下。

一、准备

在开始之前,需要提前准备以下内容:

  1. 一个 Cloudflare 账号,没有可以直接使用QQ邮箱注册一个

  2. 一个域名,没有的话可以购买一个,域名推荐大家可以去Spaceship 购买一个XYZ后缀的域名,输入优惠码<code>XYZ52</code>,折合人民币几块钱,支持国内支付,比国内任何X云都便宜。

https://spaceship.sjv.io/c/7338998/1794549/21274

  1. 七牛云图床中的图片数据(如果需要迁移)

二、Cloudflare 添加域名

打开 <code>Cloudflare</code>官网 ,如果没有账户可以注册一个。

Cloudflare 注册完成、通过邮箱验证后,会直接跳转到面板页,点击域名—>概览—>加入域。

输入刚刚购买好的域名,然后点击继续。

选择免费计划,继续选择前往激活。

复制<code>cloudflare</code>生成的DNS 填写到刚刚注册的域名对应的DNS地址里面。

回到 Cloudflare,点击底部我已更新名称服务器,接下来,就只需要等待很短的时间,一般为 3-5 分钟添加托管到 Cloudflare 的域名就会生效。

三、创建 Cloudflare R2 存储桶

接下来我们开始创建 Cloudflare R2 存储桶,并绑定自定义域名。

1. 进入 R2 控制台

在 Cloudflare 后台左侧菜单找到存储和数据库→ R2对象存储→将R2订阅添加到我的帐户。

接着绑定支付方式。

点击进入后,选择 Create bucket(创建存储桶)

2. 创建 Bucket

这里需要填写一个 Bucket 名称,例如:<code>blog-img</code>,位置选择亚太地区,然后点击 Create bucket

创建成功后,你会看到一个空的存储桶界面。

3. 绑定自定义域名

接下来绑定自己的域名,点击设置→自定义域名

输入你的域名,例如:

img.yourdomain.com

然后 Cloudflare 会自动帮你完成解析。绑定成功后,你的图片访问地址会变成:

https://img.yourdomain.com/xxx.png

当然,你也可以开启开发域名,然后Cloudflare 会提供临时一个域名提供你测试访问。

4. 设置CORS 访问策略

为了后期不必要的麻烦,这里简单设置一下CORS访问策略,点击CORS 策略添加。

复制如下内容填入后保存。

[
  {
    &quot;AllowedOrigins&quot;: [
      &quot;http://img.yourdomain.com&quot;,
      &quot;https://img.yourdomain.com&quot;
    ],
    &quot;AllowedMethods&quot;: [
      &quot;GET&quot;,
      &quot;POST&quot;,
      &quot;PUT&quot;,
      &quot;DELETE&quot;,
      &quot;HEAD&quot;
    ],
    &quot;AllowedHeaders&quot;: [&quot;*&quot;],
    &quot;ExposeHeaders&quot;: [&quot;ETag&quot;],
    &quot;MaxAgeSeconds&quot;: 3600
  }
]

5.获取 API令牌

在R2 控制台右下角点击管理

创建账户 API 令牌

配置权限支持读写操作,并且设置

最后保存访问密钥ID和机密访问密钥

四、迁移七牛云数据

1. 下载迁移工具

先去官网下载 <code>qshell</code> 工具:

https://developer.qiniu.com/kodo/1302/qshell

根据自己的操作系统下载对应版本,Linux安装示例如下:

wget https://github.com/qiniu/qshell/releases/download/v2.19.8/qshell-v2.19.8-linux-amd64.tar.gz
tar -xzf qshell-v2.19.8-linux-amd64.tar.gz
chmod +x qshell
mv qshell /usr/local/bin/qshell
qshell -v

安装好<code>qshell</code>以后安装 <code>rclone</code>:

curl https://rclone.org/install.sh | sudo bash
rclone version

创建工作目录:

mkdir -p ~/qiniu-r2-migration/data
cd ~/qiniu-r2-migration

2. 环境变量配置

把下面占位符替换成你的真实值:

export QINIU_AK=&quot;你的七牛AK&quot;
export QINIU_SK=&quot;你的七牛SK&quot;
export QINIU_BUCKET=&quot;你的七牛空间名&quot;
export LOCAL_DIR=&quot;$HOME/qiniu-r2-migration/data&quot;

export R2_ACCOUNT_ID=&quot;你的Cloudflare账户ID&quot;
export R2_ACCESS_KEY_ID=&quot;你的R2 Access Key ID&quot;
export R2_SECRET_ACCESS_KEY=&quot;你的R2 Secret Access Key&quot;
export R2_BUCKET=&quot;你的R2桶名&quot;

3. 七牛认证

配置七牛认证:

qshell account &quot;${QINIU_AK}&quot; &quot;${QINIU_SK}&quot; default

4. 七牛批量下载

生成七牛下载配置文件,全量迁移整个空间时用:

cat &gt; qdownload.json &lt;&lt;EOF
{
  &quot;dest_dir&quot;: &quot;${LOCAL_DIR}&quot;,
  &quot;bucket&quot;: &quot;${QINIU_BUCKET}&quot;,
  &quot;prefix&quot;: &quot;&quot;,
  &quot;check_exists&quot;: true
}
EOF

开始从七牛下载到本地:

qshell qdownload qdownload.json

5. 统计

下载完成后先看本地统计:

find &quot;${LOCAL_DIR}&quot; -type f | wc -l
du -sh &quot;${LOCAL_DIR}&quot;

6. 创建rclone 配置文件。

mkdir -p ~/.config/rclone
cat &gt; ~/.config/rclone/rclone.conf &lt;&lt;EOF
[qiniu]
type = s3
provider = Other
access_key_id = ${QINIU_AK}
secret_access_key = ${QINIU_SK}
endpoint = https://s3-cn-east-1.qiniucs.com
acl = private
no_check_bucket = true

[r2]
type = s3
provider = Cloudflare
access_key_id = ${R2_ACCESS_KEY_ID}
secret_access_key = ${R2_SECRET_ACCESS_KEY}
endpoint = https://${R2_ACCOUNT_ID}.r2.cloudflarestorage.com
acl = private
no_check_bucket = true
EOF

7. 创建rclone 配置文件

然后创建配置文件

mkdir -p ~/.config/rclone
cat &gt; ~/.config/rclone/rclone.conf &lt;&lt;EOF
[qiniu]
type = s3
provider = Other
access_key_id = ${QINIU_AK}
secret_access_key = ${QINIU_SK}
endpoint = https://s3-cn-east-1.qiniucs.com
acl = private
no_check_bucket = true

[r2]
type = s3
provider = Cloudflare
access_key_id = ${R2_ACCESS_KEY_ID}
secret_access_key = ${R2_SECRET_ACCESS_KEY}
endpoint = https://${R2_ACCOUNT_ID}.r2.cloudflarestorage.com
acl = private
no_check_bucket = true
EOF

8. 演练测试

把七牛迁到 R2,先演练测试:

rclone sync &quot;qiniu:${QINIU_BUCKET}&quot; &quot;r2:${R2_BUCKET}&quot; --dry-run -P

9. 执行上传任务

确认无误后正式执行上传同步任务:

rclone copy &quot;qiniu:${QINIU_BUCKET}&quot; &quot;r2:${R2_BUCKET}&quot; -P --transfers 16 --checkers 32 --fast-list

10. 确认检验

迁移完成后,确认七牛和 R2 的文件完全一致先跑 <code>check</code>检查一般。

rclone check &quot;qiniu:${QINIU_BUCKET}&quot; &quot;r2:${R2_BUCKET}&quot; --size-only -P

如果结果是 <code>0 differences found</code>,就说明两边文件数量和大小一致。

五、实现 R2 和七牛云双写备份

虽然 Cloudflare R2 很香,但我个人并不习惯把所有数据放在单一平台。

万一哪天账号出现异常或者平台出现故障,图床中的所有图片都有可能受到影响。

因此我又保留了七牛云作为备份存储,实现:

上传图片
    │
    ├── Cloudflare R2(主存储)
    │
    └── 七牛云(备份存储)

这样即使其中一个存储出现问题,也可以快速切换到另一个存储服务。

下面是我自己配合Typora 上传图片的脚本,你改改自己的配置直接安装好环境直接用就行,或者让你的AI 帮你改也可以。

#!/usr/bin/python
# -*- coding: UTF-8 -*-
&quot;&quot;&quot;
PicGo 风格的命令行图片上传工具:R2 主写入,七牛云备份双写。

功能特色:
- 支持从命令行上传本地图片文件或 HTTP/HTTPS 图片地址。
- 先写入 Cloudflare R2,再把同一份对象写入七牛云作为备份。
- 输出 Markdown 图片链接,格式为:![](https://example.com/path/image.jpg)
- 如果输入地址已经属于 PUBLIC_DOMAIN,会直接复用原链接,避免重复上传。
- JPG/JPEG/JFIF、PNG、WebP、静态 GIF 会先尝试无损优化。
- 如果图片仍然超过 5 MB,会在不改变宽高尺寸的前提下继续压缩,
  方便满足微信公众号等平台的图片大小限制。

安装依赖:
    uv pip install boto3 qiniu loguru Pillow

可选系统依赖,用于更好地无损优化 JPEG:
    brew install jpeg-turbo
    # Linux 可安装发行版中提供 jpegtran 的软件包

必填环境变量:
    R2_ACCOUNT_ID
    R2_ACCESS_KEY_ID
    R2_SECRET_ACCESS_KEY
    R2_BUCKET
    QINIU_ACCESS_KEY
    QINIU_SECRET_KEY
    QINIU_BUCKET_NAME
    PUBLIC_DOMAIN

可选环境变量:
    R2_ENDPOINT_URL       # 默认:https://&lt;R2_ACCOUNT_ID&gt;.r2.cloudflarestorage.com
    BUSINESS_MEDIA_PATH   # 默认:md
    MAX_IMAGE_SIZE_BYTES  # 默认:5000000

使用示例:
    python picgo_public.py /path/to/image.png
    python picgo_public.py https://example.com/image.jpg
&quot;&quot;&quot;
import datetime
import hashlib
import io
import os
import shutil
import subprocess
import sys
import time
import urllib.parse
import urllib.request
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

from loguru import logger
from qiniu import Auth, BucketManager, put_data
import boto3
from botocore.client import Config

try:
    from PIL import Image
except ImportError:
    Image = None

BASE_DIR = Path(__file__).resolve().parent
LOG_DIR = BASE_DIR / &quot;logs&quot;
LOG_DIR.mkdir(exist_ok=True)
LOG_FILE = LOG_DIR / f&quot;{datetime.datetime.now().strftime(&#039;%Y%m&#039;)}.log&quot;

logger.add(str(LOG_FILE), retention=&quot;10 days&quot;)

QINIU_ACCESS_KEY = os.getenv(
    &quot;QINIU_ACCESS_KEY&quot;,
    &quot;&quot;,
)
QINIU_SECRET_KEY = os.getenv(
    &quot;QINIU_SECRET_KEY&quot;,
    &quot;&quot;,
)
QINIU_BUCKET_NAME = os.getenv(&quot;QINIU_BUCKET_NAME&quot;, &quot;&quot;)

R2_ACCOUNT_ID = os.getenv(&quot;R2_ACCOUNT_ID&quot;, &quot;&quot;)
R2_ACCESS_KEY_ID = os.getenv(&quot;R2_ACCESS_KEY_ID&quot;, &quot;&quot;)
R2_SECRET_ACCESS_KEY = os.getenv(
    &quot;R2_SECRET_ACCESS_KEY&quot;,
    &quot;&quot;,
)
R2_BUCKET = os.getenv(&quot;R2_BUCKET&quot;, &quot;&quot;)
R2_ENDPOINT_URL = os.getenv(
    &quot;R2_ENDPOINT_URL&quot;,
    f&quot;https://{R2_ACCOUNT_ID}.r2.cloudflarestorage.com&quot; if R2_ACCOUNT_ID else &quot;&quot;,
)
PUBLIC_DOMAIN = os.getenv(&quot;PUBLIC_DOMAIN&quot;, &quot;&quot;)
BUSINESS_MEDIA_PATH = os.getenv(&quot;BUSINESS_MEDIA_PATH&quot;, &quot;md&quot;)
MAX_IMAGE_SIZE_BYTES = int(os.getenv(&quot;MAX_IMAGE_SIZE_BYTES&quot;, &quot;5000000&quot;))

@dataclass
class FilePayload:
    content: bytes
    original_filename: str
    size: int
    source_path: str

@dataclass
class UploadResult:
    provider: str
    key: str
    url: str

class StorageBackend(ABC):
    def __init__(self, base_path: str):
        self.base_path = (base_path or &quot;&quot;).strip(&quot;/&quot;)

    def build_object_key(self, file_name: str) -&gt; str:
        file_name = os.path.basename(file_name)
        date_folder = datetime.datetime.now().strftime(&quot;%Y/%m&quot;)
        if self.base_path:
            return f&quot;{self.base_path}/{date_folder}/{file_name}&quot;
        return f&quot;{date_folder}/{file_name}&quot;

    def generate_md5_filename(
            self,
            file_data: bytes,
            original_filename: Optional[str] = None,
    ) -&gt; str:
        md5_hash = hashlib.md5(file_data).hexdigest()
        extension = self.get_file_extension(original_filename or &quot;&quot;)
        return f&quot;{md5_hash}{extension}&quot; if extension else md5_hash

    @staticmethod
    def get_file_extension(file_path: str) -&gt; str:
        if file_path.startswith((&quot;http://&quot;, &quot;https://&quot;)):
            parsed_url = urllib.parse.urlparse(file_path)
            file_path = parsed_url.path
        return os.path.splitext(file_path)[1].lower()

    @abstractmethod
    def upload(self, payload: FilePayload, object_key: str) -&gt; Optional[UploadResult]:
        raise NotImplementedError

class QiniuStorage(StorageBackend):
    def __init__(
            self,
            access_key: str = QINIU_ACCESS_KEY,
            secret_key: str = QINIU_SECRET_KEY,
            bucket_name: str = QINIU_BUCKET_NAME,
            domain: str = PUBLIC_DOMAIN,
            base_path: str = BUSINESS_MEDIA_PATH,
    ):
        super().__init__(base_path)
        self.access_key = access_key
        self.secret_key = secret_key
        self.bucket_name = bucket_name
        self.domain = domain.rstrip(&quot;/&quot;)
        self.enabled = all(
            [self.access_key, self.secret_key, self.bucket_name, self.domain]
        )
        self.auth = Auth(self.access_key, self.secret_key) if self.enabled else None
        self.bucket_manager = BucketManager(self.auth) if self.enabled else None

        if self.enabled:
            logger.info(f&quot;QiniuStorage initialized for bucket: {bucket_name}&quot;)
        else:
            logger.warning(
                &quot;Qiniu backup is disabled because required env vars are missing&quot;
            )

    def get_file_url(self, key: str, expires: Optional[int] = None) -&gt; str:
        base_url = f&quot;http://{self.domain}/{key}&quot;
        if expires:
            return self.auth.private_download_url(base_url, expires=expires)
        return base_url

    def upload(self, payload: FilePayload, object_key: str) -&gt; Optional[UploadResult]:
        if not self.enabled:
            return None

        try:
            token = self.auth.upload_token(self.bucket_name, object_key, 3600)
            _, info = put_data(token, object_key, payload.content)

            if info.status_code != 200:
                logger.error(f&quot;Qiniu upload failed: {info.error}&quot;)
                return None

            logger.info(f&quot;Qiniu backup uploaded successfully: {object_key}&quot;)
            return UploadResult(
                provider=&quot;qiniu&quot;,
                key=object_key,
                url=self.get_file_url(object_key),
            )
        except Exception as exc:
            logger.error(f&quot;Error uploading file to Qiniu: {exc}&quot;)
            return None

class R2Storage(StorageBackend):
    def __init__(
            self,
            account_id: str = R2_ACCOUNT_ID,
            access_key_id: str = R2_ACCESS_KEY_ID,
            secret_access_key: str = R2_SECRET_ACCESS_KEY,
            bucket_name: str = R2_BUCKET,
            domain: str = PUBLIC_DOMAIN,
            base_path: str = BUSINESS_MEDIA_PATH,
            endpoint_url: str = R2_ENDPOINT_URL,
    ):
        super().__init__(base_path)
        self.account_id = account_id
        self.access_key_id = access_key_id
        self.secret_access_key = secret_access_key
        self.bucket_name = bucket_name
        self.domain = domain.rstrip(&quot;/&quot;)
        self.endpoint_url = endpoint_url

        if not all([
            self.account_id,
            self.access_key_id,
            self.secret_access_key,
            self.bucket_name,
        ]):
            raise ValueError(&quot;R2 is required but related env vars are missing&quot;)

        if boto3 is None or Config is None:
            raise ImportError(
                &quot;boto3 is required for R2 uploads. Please install it with pip install boto3&quot;
            )

        self.client = boto3.client(
            &quot;s3&quot;,
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key_id,
            aws_secret_access_key=self.secret_access_key,
            region_name=&quot;auto&quot;,
            config=Config(signature_version=&quot;s3v4&quot;),
        )
        logger.info(f&quot;R2Storage initialized for bucket: {bucket_name}&quot;)

    def get_file_url(self, key: str) -&gt; str:
        if self.domain:
            return f&quot;https://{self.domain}/{key}&quot;
        return f&quot;{self.endpoint_url}/{self.bucket_name}/{key}&quot;

    def upload(self, payload: FilePayload, object_key: str) -&gt; Optional[UploadResult]:
        self.client.put_object(
            Bucket=self.bucket_name,
            Key=object_key,
            Body=payload.content,
            ContentLength=payload.size,
        )
        logger.info(f&quot;R2 primary upload successful: {object_key}&quot;)
        return UploadResult(
            provider=&quot;r2&quot;,
            key=object_key,
            url=self.get_file_url(object_key),
        )

class FileLoader:
    @staticmethod
    def load(path_or_url: str) -&gt; FilePayload:
        try:
            if path_or_url.startswith((&quot;http://&quot;, &quot;https://&quot;)):
                return FileLoader._load_from_url(path_or_url)
            return FileLoader._load_from_file(path_or_url)
        except Exception as exc:
            raise ValueError(f&quot;Error reading file content: {exc}&quot;) from exc

    @staticmethod
    def _load_from_url(url: str) -&gt; FilePayload:
        logger.info(f&quot;Downloading file from URL: {url}&quot;)
        filename = os.path.basename(urllib.parse.urlparse(url).path)
        if not filename:
            filename = f&quot;download_{int(time.time())}&quot;

        response = urllib.request.urlopen(url)
        if response.status != 200:
            raise ValueError(
                f&quot;Failed to download file from URL: HTTP {response.status}&quot;
            )

        content = response.read()
        return FilePayload(
            content=content,
            original_filename=filename,
            size=len(content),
            source_path=url,
        )

    @staticmethod
    def _load_from_file(file_path: str) -&gt; FilePayload:
        if not os.path.exists(file_path):
            raise FileNotFoundError(f&quot;File not found: {file_path}&quot;)

        with open(file_path, &quot;rb&quot;) as file:
            content = file.read()

        return FilePayload(
            content=content,
            original_filename=os.path.basename(file_path),
            size=len(content),
            source_path=file_path,
        )

class ImageOptimizer:
    PILLOW_FORMATS = {&quot;.png&quot;, &quot;.webp&quot;, &quot;.gif&quot;}
    JPEG_FORMATS = {&quot;.jpg&quot;, &quot;.jpeg&quot;, &quot;.jfif&quot;}

    @classmethod
    def optimize(cls, payload: FilePayload) -&gt; FilePayload:
        extension = StorageBackend.get_file_extension(payload.original_filename)
        original_size = payload.size
        optimized_content = None

        if extension in cls.JPEG_FORMATS:
            optimized_content = cls._optimize_jpeg(payload.content)
        elif extension in cls.PILLOW_FORMATS:
            optimized_content = cls._optimize_with_pillow(payload.content, extension)

        if optimized_content and len(optimized_content) &lt; payload.size:
            payload = FilePayload(
                content=optimized_content,
                original_filename=payload.original_filename,
                size=len(optimized_content),
                source_path=payload.source_path,
            )
            logger.info(f&quot;Image optimized: {original_size} -&gt; {payload.size} bytes&quot;)

        if payload.size &lt;= MAX_IMAGE_SIZE_BYTES:
            return payload

        compressed_content = cls._compress_to_limit(
            payload.content,
            extension,
            MAX_IMAGE_SIZE_BYTES,
        )
        if not compressed_content or len(compressed_content) &gt;= payload.size:
            logger.warning(
                f&quot;Image still exceeds {MAX_IMAGE_SIZE_BYTES} bytes after &quot;
                &quot;compression attempts&quot;
            )
            return payload

        logger.info(
            f&quot;Image compressed for size limit: {payload.size} -&gt; &quot;
            f&quot;{len(compressed_content)} bytes&quot;
        )
        return FilePayload(
            content=compressed_content,
            original_filename=payload.original_filename,
            size=len(compressed_content),
            source_path=payload.source_path,
        )

    @staticmethod
    def _optimize_jpeg(content: bytes) -&gt; Optional[bytes]:
        jpegtran = shutil.which(&quot;jpegtran&quot;)
        if not jpegtran:
            logger.info(&quot;Skip JPEG lossless optimization because jpegtran is missing&quot;)
            return None

        result = subprocess.run(
            [jpegtran, &quot;-copy&quot;, &quot;none&quot;, &quot;-optimize&quot;],
            input=content,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
        if result.returncode != 0:
            logger.warning(f&quot;jpegtran optimization failed: {result.stderr.decode()}&quot;)
            return None

        if ImageOptimizer._has_same_pixels(content, result.stdout):
            return result.stdout
        logger.warning(&quot;jpegtran output skipped because pixel verification failed&quot;)
        return None

    @classmethod
    def _optimize_with_pillow(cls, content: bytes, extension: str) -&gt; Optional[bytes]:
        if Image is None:
            logger.info(&quot;Skip image optimization because Pillow is not installed&quot;)
            return None

        try:
            with Image.open(io.BytesIO(content)) as image:
                if getattr(image, &quot;is_animated&quot;, False):
                    logger.info(&quot;Skip animated GIF optimization&quot;)
                    return None

                output = io.BytesIO()
                if extension == &quot;.png&quot;:
                    image.save(output, format=&quot;PNG&quot;, optimize=True, compress_level=9)
                elif extension == &quot;.webp&quot;:
                    image.save(
                        output,
                        format=&quot;WEBP&quot;,
                        lossless=True,
                        quality=100,
                        method=6,
                    )
                elif extension == &quot;.gif&quot;:
                    image.save(output, format=&quot;GIF&quot;, optimize=True)
                else:
                    return None

            optimized = output.getvalue()
            if cls._has_same_pixels(content, optimized):
                return optimized
        except Exception as exc:
            logger.warning(f&quot;Lossless image optimization skipped: {exc}&quot;)
        return None

    @classmethod
    def _compress_to_limit(
        cls,
        content: bytes,
        extension: str,
        max_bytes: int,
    ) -&gt; Optional[bytes]:
        if Image is None:
            logger.info(&quot;Skip size limit compression because Pillow is not installed&quot;)
            return None

        if extension in cls.JPEG_FORMATS:
            return cls._compress_jpeg_to_limit(content, max_bytes)
        if extension == &quot;.webp&quot;:
            return cls._compress_webp_to_limit(content, max_bytes)
        if extension == &quot;.png&quot;:
            return cls._compress_png_to_limit(content, max_bytes)
        if extension == &quot;.gif&quot;:
            return cls._compress_static_gif_to_limit(content, max_bytes)
        return None

    @staticmethod
    def _compress_jpeg_to_limit(content: bytes, max_bytes: int) -&gt; Optional[bytes]:
        with Image.open(io.BytesIO(content)) as image:
            image = image.convert(&quot;RGB&quot;)
            return ImageOptimizer._quality_search(
                image=image,
                image_format=&quot;JPEG&quot;,
                max_bytes=max_bytes,
                min_quality=35,
                save_options={&quot;optimize&quot;: True, &quot;progressive&quot;: True},
            )

    @staticmethod
    def _compress_webp_to_limit(content: bytes, max_bytes: int) -&gt; Optional[bytes]:
        with Image.open(io.BytesIO(content)) as image:
            image = image.convert(&quot;RGBA&quot; if image.mode == &quot;RGBA&quot; else &quot;RGB&quot;)
            return ImageOptimizer._quality_search(
                image=image,
                image_format=&quot;WEBP&quot;,
                max_bytes=max_bytes,
                min_quality=45,
                save_options={&quot;method&quot;: 6},
            )

    @staticmethod
    def _compress_png_to_limit(content: bytes, max_bytes: int) -&gt; Optional[bytes]:
        candidates = []
        with Image.open(io.BytesIO(content)) as image:
            if getattr(image, &quot;is_animated&quot;, False):
                return None

            for colors in (256, 192, 128, 96, 64, 32):
                output = io.BytesIO()
                palette_image = image.convert(&quot;RGBA&quot;).quantize(
                    colors=colors,
                    method=Image.Quantize.FASTOCTREE,
                )
                palette_image.save(
                    output,
                    format=&quot;PNG&quot;,
                    optimize=True,
                    compress_level=9,
                )
                candidate = output.getvalue()
                if len(candidate) &lt;= max_bytes:
                    candidates.append(candidate)

        return max(candidates, key=len) if candidates else None

    @staticmethod
    def _compress_static_gif_to_limit(content: bytes, max_bytes: int) -&gt; Optional[bytes]:
        candidates = []
        with Image.open(io.BytesIO(content)) as image:
            if getattr(image, &quot;is_animated&quot;, False):
                logger.info(&quot;Skip animated GIF size limit compression&quot;)
                return None

            for colors in (256, 192, 128, 96, 64, 32):
                output = io.BytesIO()
                palette_image = image.convert(&quot;RGB&quot;).quantize(colors=colors)
                palette_image.save(output, format=&quot;GIF&quot;, optimize=True)
                candidate = output.getvalue()
                if len(candidate) &lt;= max_bytes:
                    candidates.append(candidate)

        return max(candidates, key=len) if candidates else None

    @staticmethod
    def _quality_search(
        image: &quot;Image.Image&quot;,
        image_format: str,
        max_bytes: int,
        min_quality: int,
        save_options: dict,
    ) -&gt; Optional[bytes]:
        best = None
        low = min_quality
        high = 95

        while low &lt;= high:
            quality = (low + high) // 2
            output = io.BytesIO()
            image.save(output, format=image_format, quality=quality, **save_options)
            candidate = output.getvalue()

            if len(candidate) &lt;= max_bytes:
                best = candidate
                low = quality + 1
            else:
                high = quality - 1

        return best

    @staticmethod
    def _has_same_pixels(original: bytes, optimized: bytes) -&gt; bool:
        if Image is None:
            return False

        with Image.open(io.BytesIO(original)) as original_image:
            with Image.open(io.BytesIO(optimized)) as optimized_image:
                if original_image.size != optimized_image.size:
                    return False
                return (
                    original_image.convert(&quot;RGBA&quot;).tobytes()
                    == optimized_image.convert(&quot;RGBA&quot;).tobytes()
                )

class DualWriterUploader:
    def __init__(
            self,
            primary_storage: Optional[StorageBackend] = None,
            backup_storage: Optional[StorageBackend] = None,
    ):
        self.primary_storage = primary_storage or R2Storage()
        self.backup_storage = backup_storage or QiniuStorage()

    @staticmethod
    def is_public_domain_url(path_or_url: str) -&gt; bool:
        if not path_or_url.startswith((&quot;http://&quot;, &quot;https://&quot;)):
            return False

        parsed_url = urllib.parse.urlparse(path_or_url)
        return (parsed_url.hostname or &quot;&quot;).lower() == PUBLIC_DOMAIN.lower()

    def upload_payload(self, payload: FilePayload) -&gt; Optional[UploadResult]:
        object_key = self.primary_storage.build_object_key(
            self.primary_storage.generate_md5_filename(
                payload.content,
                payload.original_filename,
            )
        )

        try:
            primary_result = self.primary_storage.upload(payload, object_key)
        except Exception as exc:
            logger.error(f&quot;R2 primary upload failed: {exc}&quot;)
            return None

        if primary_result is None:
            return None

        backup_result = self.backup_storage.upload(payload, object_key)
        if backup_result is None:
            logger.warning(&quot;Qiniu backup upload failed, but primary R2 upload succeeded&quot;)

        return primary_result

    def upload_with_retry(
            self,
            path_or_url: str,
            max_retries: int = 20,
            retry_interval: int = 2,
    ) -&gt; Optional[str]:
        if self.is_public_domain_url(path_or_url):
            logger.info(f&quot;Skip upload for existing public URL: {path_or_url}&quot;)
            return path_or_url

        retries = 0
        while retries &lt;= max_retries:
            try:
                payload = FileLoader.load(path_or_url)
                payload = ImageOptimizer.optimize(payload)
                result = self.upload_payload(payload)
                if result is not None:
                    logger.info(f&quot;File uploaded successfully after {retries} retries&quot;)
                    return result.url
            except Exception as exc:
                logger.error(
                    f&quot;Error uploading file (attempt {retries + 1}/{max_retries}): {exc}&quot;
                )

            retries += 1
            if retries &lt;= max_retries:
                logger.warning(f&quot;Upload failed, retrying ({retries}/{max_retries})...&quot;)
                time.sleep(retry_interval)

        logger.error(f&quot;Failed to upload file after {max_retries} attempts&quot;)
        return None

def main() -&gt; None:
    if len(sys.argv) &lt; 2:
        print(&quot;调用错误, 图片格式不对&quot;)
        sys.exit(1)

    uploader = DualWriterUploader()
    uploaded_urls = []
    for file_path in sys.argv[1:]:
        file_url = uploader.upload_with_retry(file_path)
        if file_url:
            uploaded_urls.append(file_url)

    if uploaded_urls:
        print(&quot;Upload Success:&quot;)
        for file_url in uploaded_urls:
            print(file_url)

if __name__ == &quot;__main__&quot;:
    main()

六、配置Typora 上传图片

打开Typora 设置图像那里,按照如图设置即可,输入脚本命令:

(你的env绝对环境路径)/python picgo_public.py 

最后点击一下验证是否上传成功。

七、总结

好了,今天就先写到这里。

整个迁移过程其实并不复杂,如果过程中有问题,你可以把这篇文章直接复制给AI 帮你来做

迁移完后,我所有博客图片已经全部切换到了 Cloudflare R2。

当然,我个人还是建议不要把所有数据放在单一平台,所以文章里也介绍了双写备份方案。

即使某个平台出现问题,也能快速切换,不至于影响自己的图片数据丢了,同时还能保证图片正常访问。

如果你目前也在使用七牛云、腾讯云 COS、阿里云 OSS 等对象存储,并且同样被流量费用困扰,不妨抽点时间体验一下 Cloudflare R2。

最后,如果大家在迁移过程中遇到问题,欢迎在评论区留言交流。

如果本文对你有所帮助,欢迎 点赞、在看、转发 支持一下。

我是小欣,咱们下篇文章见。

扫码访问小程序中的本文

微信小程序二维码
上一篇 别再被“IP纯净度”忽悠了!这份 IP 检测工具大全请务必收藏
下一篇 实现Token 自由,将你的 ChatGPT 变成 Codex!
淘小欣

淘小欣管理员

做自己擅长的事情,并尽可能做到极致!

本月创作热力图

目录