import asyncio
import html
import json
import shutil
import time
from pathlib import Path
from typing import Any

import pyfilename as pf
import requests
from httpx import AsyncClient

from data.special_list import WEBTOON_18_BONUS


class Downloader:
    """Base class that downloads a webtoon's metadata, thumbnail and episodes.

    The site-specific steps are empty hooks in this class
    (``_fetch_information``, ``_fetch_episode_information``,
    ``_get_episode_image_urls``, ``_download_image``); the code here reads
    the attributes those hooks are expected to populate:

    * ``title``, ``author``, ``tag``, ``description``, ``thumbnail_name``,
      ``thumbnail_url`` -- read by ``download_webtoon``,
      ``_save_information`` and ``_download_thumbnail``.
    * ``episode_titles`` (indexed by episode index) and
      ``readablities_index_list`` -- read by ``_download_episodes`` and
      ``_get_unobtained_episodes``.

    After ``download_webtoon`` returns, ``lately_downloaded_episode`` holds
    the directories of the episodes downloaded during that call and
    ``new_webtoon`` holds the title if at least one episode was missing.
    """

    # Pause inserted before each episode download, in seconds.
    EPISODE_DELAY_SECONDS = 2
    # Upper bound for the thumbnail HTTP request, in seconds.
    THUMBNAIL_TIMEOUT_SECONDS = 30

    def __init__(self, webtoon_id: Any) -> None:
        """Store the webtoon identifier and create the shared HTTP client."""
        self.webtoon_id = webtoon_id
        self.client = AsyncClient()
        self.lately_downloaded_episode: list[Path] = []
        self.new_webtoon = ""

    def download_webtoon(self, url, path: Path) -> None:
        """Download everything that is still missing for one webtoon.

        Args:
            url: Passed unchanged to ``_fetch_information``.
            path: Parent directory; the webtoon is stored in
                ``path / self.title``.

        Errors raised while downloading episodes are printed, not
        propagated.
        """
        self._fetch_information(url)
        self.webtoon_path = path / self.title
        self.webtoon_path.mkdir(parents=True, exist_ok=True)
        self._save_information()
        if self.thumbnail_url != "":
            self._download_thumbnail()

        self._fetch_episode_information()
        unobtained_episodes = self._get_unobtained_episodes()
        if not unobtained_episodes:
            return

        self.new_webtoon = self.title
        try:
            asyncio.run(self._download_episodes(unobtained_episodes))
        except Exception as e:
            # Top-level boundary: report and let the caller carry on.
            print(f"Error _download_episodes: {e}")

    def _fetch_information(self, url) -> None:
        """Hook: populate the webtoon metadata attributes. No-op here."""
        pass

    def _save_information(self) -> None:
        """Write ``information.json`` unless an up-to-date copy exists.

        The existing file counts as up to date when its ``title``,
        ``author``, ``description`` and ``thumbnail_name`` all match the
        current values. A file that is unreadable as JSON, is not a JSON
        object, or lacks one of those keys is rewritten.
        """
        information_path = self.webtoon_path / 'information.json'

        # NOTE(review): "tag" is written below but is not part of this
        # comparison, so a tag-only change does not refresh the file --
        # confirm whether that is intended.
        compared_fields = {
            "title": self.title,
            "author": self.author,
            "description": self.description,
            "thumbnail_name": self.thumbnail_name,
        }

        if information_path.exists():
            try:
                with open(information_path, "r", encoding='utf-8') as json_file:
                    existing_information = json.load(json_file)
            except (json.JSONDecodeError, UnicodeDecodeError):
                # A corrupt file is simply replaced.
                existing_information = None

            if isinstance(existing_information, dict) and all(
                key in existing_information and existing_information[key] == value
                for key, value in compared_fields.items()
            ):
                return

        information = {
            "title": self.title,
            "author": self.author,
            "tag": self.tag,
            "description": self.description,
            "thumbnail_name": self.thumbnail_name,
        }
        with open(information_path, 'w', encoding='utf-8') as json_file:
            json.dump(information, json_file, ensure_ascii=False, indent=2)
        print(f"{information_path} is saved.")

    def _download_thumbnail(self) -> None:
        """Fetch the thumbnail into the webtoon directory if it is missing.

        A non-200 response is reported by printing the status code; nothing
        is written in that case.
        """
        thumbnail_path = self.webtoon_path / self.thumbnail_name
        if thumbnail_path.exists():
            return

        # Without a timeout a stalled server would block this call forever.
        response = requests.get(
            self.thumbnail_url, timeout=self.THUMBNAIL_TIMEOUT_SECONDS
        )
        if response.status_code == 200:
            thumbnail_path.write_bytes(response.content)
            print(f"{thumbnail_path} is saved.")
        else:
            print(response.status_code)

    def _fetch_episode_information(self) -> None:
        """Hook: populate the episode attributes. No-op here."""
        pass

    def _get_unobtained_episodes(self) -> list[int]:
        """Return the episode indexes that have no directory on disk yet.

        Episode directories are recognised by the integer before the first
        ``.`` in their name; directories without such a prefix are ignored.

        For titles listed in ``WEBTOON_18_BONUS`` only the *number* of
        existing directories is compared against the number of readable
        episodes, and the trailing indexes are returned. For every other
        title the result is the sorted set difference between the readable
        indexes and the indexes found on disk.
        """
        downloaded_episodes = []
        for entry in self.webtoon_path.glob('*'):
            if not entry.is_dir():
                continue
            try:
                downloaded_episodes.append(int(entry.name.split('.')[0]))
            except ValueError:
                # Not an episode directory (no numeric prefix).
                continue

        # Default covers the bonus branch when nothing is missing.
        episodes: list[int] = []
        if self.title in WEBTOON_18_BONUS:
            count = len(self.readablities_index_list) - len(downloaded_episodes)
            if count > 0:
                episodes = self.readablities_index_list[-count:]
        else:
            difference = set(self.readablities_index_list) - set(downloaded_episodes)
            # Sorted so episodes are downloaded in a deterministic order.
            episodes = sorted(difference)

        print(f"{self.title} unobtained episodes: {episodes}")
        return episodes

    async def _download_episodes(self, episode_index_list: list[int]) -> None:
        """Download the given episodes one after another.

        Stops at the first episode whose download reports failure.
        Successfully downloaded episode directories are appended to
        ``lately_downloaded_episode``.
        """
        # NOTE(review): leaving this ``async with`` closes ``self.client``;
        # a second call on the same instance would presumably fail --
        # confirm against httpx before reusing a Downloader.
        async with self.client:
            for episode_index in episode_index_list:
                episode_name = self.episode_titles[episode_index]
                episode_title = self._get_safe_file_name(episode_index, episode_name)
                print(episode_title)

                episode_path = self.webtoon_path / episode_title
                episode_path.mkdir(parents=True, exist_ok=True)

                # Non-blocking pause; time.sleep would stall the event loop.
                await asyncio.sleep(self.EPISODE_DELAY_SECONDS)

                is_download_successful = await self._download_episode(
                    episode_index, episode_path
                )
                if not is_download_successful:
                    print(f"Error _download_episode: {self.episode_titles[episode_index]}")
                    break

                self.lately_downloaded_episode.append(episode_path)
                print(f"Download {self.episode_titles[episode_index]} sucessful.")

    async def _download_episode(self, episode_index: int, episode_path: Path) -> bool:
        """Download all images of one episode concurrently.

        Returns:
            ``True`` when every image was downloaded, ``False`` when no
            image URLs could be obtained.

        Raises:
            Exception: whatever an image download raised. In both failure
            cases ``episode_path`` is removed first, so that a partial or
            empty directory is not mistaken for a downloaded episode later.
        """
        episode_images_url = self._get_episode_image_urls(episode_index)
        if not episode_images_url:
            print(f"Failed get image url for: {episode_path}")
            # The caller already created the directory; drop it again.
            shutil.rmtree(episode_path, ignore_errors=True)
            return False

        tasks = [
            asyncio.ensure_future(self._download_image(episode_path, image_url, image_no))
            for image_no, image_url in enumerate(episode_images_url)
        ]
        try:
            await asyncio.gather(*tasks)
        except Exception:
            # Stop the remaining downloads before deleting their target
            # directory, then wait until they have actually finished.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            # ignore_errors keeps a cleanup failure from masking the
            # original exception.
            shutil.rmtree(episode_path, ignore_errors=True)
            raise
        return True

    def _get_episode_image_urls(self, episode_index: int) -> list[str] | None:
        """Hook: return the image URLs of an episode. Returns None here."""
        pass

    async def _download_image(self, episode_path: Path, url: str, image_no: int) -> None:
        """Hook: download one image into ``episode_path``. No-op here."""
        pass

    def _get_safe_file_name(self, episode_index: int, episode_name: str) -> str:
        """Build the directory name ``"<index>.<name>"`` for an episode.

        Two titles get special-case clean-up of the episode name first.
        HTML entities are unescaped and the result is passed through
        ``pf.convert``.
        """
        if self.title == '全知讀者視角':
            parts = episode_name.split('.')
            # Keep the name unchanged when it lacks the expected third
            # dot-separated part instead of raising IndexError.
            if len(parts) > 2:
                episode_name = f"Ep{parts[2]}"
            episode_name = episode_name.replace("(", " (")
            # NOTE(review): as written this replacement is a no-op; it may
            # have been meant for a full-width parenthesis -- confirm.
            episode_name = episode_name.replace(")", ")")
        elif self.title == '怪力亂神':
            episode_name = episode_name.replace('話. ', '話 ')

        episode_title = f"{episode_index}.{episode_name}"
        return pf.convert(html.unescape(episode_title))