Files
my-webtoon/downloaders/downloader.py
2025-05-11 18:58:43 +02:00

179 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import html
import json
import shutil
import time
from pathlib import Path
from typing import Any

import requests
from httpx import AsyncClient
#import pyfilename as pf

from data.special_list import WEBTOON_18_BONUS
class Downloader:
def __init__(self, webtoon_id: any) -> None:
self.webtoon_id = webtoon_id
self.client = AsyncClient()
self.lately_downloaded_episode: list[Path] = []
self.new_webtoon = ""
def download_webtoon(self, url, path:Path) -> None:
self._fetch_information(url)
self.webtoon_path = path / self.title
self.webtoon_path.mkdir(parents=True, exist_ok=True)
self._save_information()
if self.thumbnail_url != "":
self._download_thumbnail()
self._fetch_episode_information()
unobtained_episodes = self._get_unobtained_episodes()
if len(unobtained_episodes) > 0:
self.new_webtoon = self.title
try:
asyncio.run(
self._download_episodes(unobtained_episodes)
)
except Exception as e:
print(f"Error _download_episodes: {e}")
def _fetch_information(self, url) -> None:
pass
def _save_information(self) -> None:
information_path = self.webtoon_path / 'information.json'
save_necessary = True
if information_path.exists():
with open(information_path, "r", encoding='utf-8') as json_file:
existing_information = json.load(json_file)
if (self.author == ""):
save_necessary = False
if (
existing_information["title"] == self.title and
existing_information["author"] == self.author and
existing_information["description"] == self.description and
existing_information["thumbnail"] == self.thumbnail_name
):
save_necessary = False
if (save_necessary):
information = {
"title": self.title,
"author": self.author,
"tag": self.tag,
"description": self.description,
"thumbnail": self.thumbnail_name
}
with open(information_path, 'w', encoding='utf-8') as json_file:
json.dump(information, json_file, ensure_ascii=False, indent=2)
print(f"{information_path} is saved.")
def _download_thumbnail(self) -> None:
thumbnail_path = self.webtoon_path / self.thumbnail_name
if not thumbnail_path.exists():
response = requests.get(self.thumbnail_url)
if response.status_code == 200:
image_raw = response.content
thumbnail_path.write_bytes(image_raw)
print(f"{thumbnail_path} is saved.")
else:
print(response.status_code)
def _fetch_episode_information(self) -> None:
pass
def _get_unobtained_episodes(self) -> list[int]:
downloaded_episodes = []
for dir in self.webtoon_path.glob('*'):
if dir.is_dir():
downloaded_episodes.append(int(dir.name.split('.')[0]))
if self.title == "反派角色只有死亡結局":
downloaded_episodes = [i - 2 for i in downloaded_episodes]
if self.title == "成為我筆下男主角的妻子" or self.title == "領主夫人罷工中":
downloaded_episodes = [i - 1 for i in downloaded_episodes]
if self.title in WEBTOON_18_BONUS:
count = len(self.readablities_index_list) - len(downloaded_episodes)
if count > 0:
episodes = self.readablities_index_list[-count:]
else :
diffrence = set(self.readablities_index_list) - set(downloaded_episodes)
episodes = list(diffrence)
print(f"{self.title} unobtained episodes: {episodes}")
return episodes
async def _download_episodes(self, episode_index_list: list[int]) -> None:
async with self.client:
for episode_index in episode_index_list:
episode_name = self.episode_titles[episode_index]
episode_title = self._get_safe_file_name(episode_index, episode_name)
if self.title == "反派角色只有死亡結局":
episode_title = self._get_safe_file_name(episode_index + 2, episode_name)
if self.title == "成為我筆下男主角的妻子" or self.title == "領主夫人罷工中":
episode_title = self._get_safe_file_name(episode_index + 1, episode_name)
# episode_title = self._get_safe_file_name(f"{episode_index}.{self.episode_titles[episode_index]}")
print(episode_title)
episode_path = self.webtoon_path / episode_title
episode_path.mkdir(parents=True, exist_ok=True)
time.sleep(2)
is_download_sucessful = await self._download_episode(episode_index, episode_path)
if is_download_sucessful:
self.lately_downloaded_episode.append(episode_path)
print(f"Download {self.episode_titles[episode_index]} sucessful.")
else:
print(f"Error _download_episode: {self.episode_titles[episode_index]}")
break
async def _download_episode(self, episode_index: int, episode_path: Path) -> bool:
episode_images_url = self._get_episode_image_urls(episode_index)
if not episode_images_url:
print(f"Failed get image url for: {episode_path}")
return False
try:
await asyncio.gather(
*(
self._download_image(episode_path, element, i)
for i, element in enumerate(episode_images_url)
)
)
except Exception as e:
shutil.rmtree(episode_path)
raise
return True
def _get_episode_image_urls(self, episode_index: int) -> list[str] | None:
pass
async def _download_image(self, episode_path: Path, url: str, image_no: int) -> None:
pass
def _get_safe_file_name(self, episode_index: int, episode_name: str) -> str:
if self.title == '全知讀者視角':
episode_name = f"Ep{episode_name.split('.')[2]}"
episode_name = episode_name.replace("", " (")
episode_name = episode_name.replace("", ")")
elif self.title == '怪力亂神':
episode_name = episode_name.replace('話. ', '')
episode_title = f"{episode_index}.{episode_name}"
return html.unescape(episode_title)