import asyncio
import html
import json
from pathlib import Path
#import pyfilename as pf
import shutil
import time
from typing import Any
from httpx import AsyncClient
import requests
from data.special_list import WEBTOON_18_BONUS
class Downloader:
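    """Base class for webtoon downloaders.

    Site-specific subclasses are expected to implement the _fetch_* hooks,
    _get_episode_image_urls and _download_image; this class handles saving
    metadata, skipping already-downloaded episodes and orchestrating the
    asynchronous image downloads.
    """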
    def __init__(self, webtoon_id: Any) -> None:
self.webtoon_id = webtoon_id
self.client = AsyncClient()
self.lately_downloaded_episode: list[Path] = []
self.new_webtoon = ""
    def download_webtoon(self, url: str, path: Path) -> None:
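        """Fetch the webtoon's metadata, then download every episode missing under path."""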
self._fetch_information(url)
self.webtoon_path = path / self.title
self.webtoon_path.mkdir(parents=True, exist_ok=True)
self._save_information()
if self.thumbnail_url != "":
self._download_thumbnail()
self._fetch_episode_information()
unobtained_episodes = self._get_unobtained_episodes()
if len(unobtained_episodes) > 0:
self.new_webtoon = self.title
try:
asyncio.run(
self._download_episodes(unobtained_episodes)
)
except Exception as e:
print(f"Error _download_episodes: {e}")
    def _fetch_information(self, url: str) -> None:
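        # Implemented by site-specific subclasses: expected to set self.title,
        # self.author, self.tag, self.description, self.thumbnail_name and
        # self.thumbnail_url.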
pass
def _save_information(self) -> None:
information_path = self.webtoon_path / 'information.json'
save_necessary = True
if information_path.exists():
with open(information_path, "r", encoding='utf-8') as json_file:
existing_information = json.load(json_file)
            # Skip rewriting when the fetched metadata is empty or unchanged.
            if self.author == "":
                save_necessary = False
            if (
                existing_information["title"] == self.title and
                existing_information["author"] == self.author and
                existing_information["description"] == self.description and
                existing_information["thumbnail"] == self.thumbnail_name
            ):
                save_necessary = False
        if save_necessary:
information = {
"title": self.title,
"author": self.author,
"tag": self.tag,
"description": self.description,
"thumbnail": self.thumbnail_name
}
with open(information_path, 'w', encoding='utf-8') as json_file:
json.dump(information, json_file, ensure_ascii=False, indent=2)
print(f"{information_path} is saved.")
def _download_thumbnail(self) -> None:
thumbnail_path = self.webtoon_path / self.thumbnail_name
if not thumbnail_path.exists():
            response = requests.get(self.thumbnail_url, timeout=10)
            if response.status_code == 200:
                thumbnail_path.write_bytes(response.content)
                print(f"{thumbnail_path} is saved.")
            else:
                print(f"Failed to download thumbnail: HTTP {response.status_code}")
def _fetch_episode_information(self) -> None:
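        # Implemented by site-specific subclasses: expected to set
        # self.episode_titles and self.readablities_index_list.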
pass
def _get_unobtained_episodes(self) -> list[int]:
        downloaded_episodes = []
        for entry in self.webtoon_path.glob('*'):
            if entry.is_dir():
                downloaded_episodes.append(int(entry.name.split('.')[0]))
        # These series' episode folders are numbered with an offset relative to
        # the fetched episode index, so shift the folder numbers back before comparing.
        if self.title == "反派角色只有死亡結局":
            downloaded_episodes = [i - 2 for i in downloaded_episodes]
        if self.title in ("成為我筆下男主角的妻子", "領主夫人罷工中"):
            downloaded_episodes = [i - 1 for i in downloaded_episodes]
        if self.title in WEBTOON_18_BONUS:
            count = len(self.readablities_index_list) - len(downloaded_episodes)
            if count > 0:
                episodes = self.readablities_index_list[-count:]
            else:
                # Nothing new to download; avoid referencing an unbound name below.
                episodes = []
        else:
            difference = set(self.readablities_index_list) - set(downloaded_episodes)
            episodes = list(difference)
        print(f"{self.title} unobtained episodes: {episodes}")
        return episodes
async def _download_episodes(self, episode_index_list: list[int]) -> None:
async with self.client:
for episode_index in episode_index_list:
episode_name = self.episode_titles[episode_index]
                episode_title = self._get_safe_file_name(episode_index, episode_name)
                # Folder numbering for these series is offset from the fetched index.
                if self.title == "反派角色只有死亡結局":
                    episode_title = self._get_safe_file_name(episode_index + 2, episode_name)
                if self.title in ("成為我筆下男主角的妻子", "領主夫人罷工中"):
                    episode_title = self._get_safe_file_name(episode_index + 1, episode_name)
print(episode_title)
episode_path = self.webtoon_path / episode_title
episode_path.mkdir(parents=True, exist_ok=True)
                await asyncio.sleep(2)  # Throttle between episodes without blocking the event loop.
                is_download_successful = await self._download_episode(episode_index, episode_path)
                if is_download_successful:
                    self.lately_downloaded_episode.append(episode_path)
                    print(f"Download {self.episode_titles[episode_index]} successful.")
else:
print(f"Error _download_episode: {self.episode_titles[episode_index]}")
break
async def _download_episode(self, episode_index: int, episode_path: Path) -> bool:
episode_images_url = self._get_episode_image_urls(episode_index)
if not episode_images_url:
print(f"Failed get image url for: {episode_path}")
return False
try:
await asyncio.gather(
*(
self._download_image(episode_path, element, i)
for i, element in enumerate(episode_images_url)
)
)
        except Exception:
            # Remove the partially downloaded episode folder before re-raising.
            shutil.rmtree(episode_path)
            raise
return True
def _get_episode_image_urls(self, episode_index: int) -> list[str] | None:
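        # Implemented by site-specific subclasses: returns the image URLs of one
        # episode, or None when they cannot be fetched.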
pass
async def _download_image(self, episode_path: Path, url: str, image_no: int) -> None:
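        # Implemented by site-specific subclasses: downloads a single image into
        # episode_path, typically via self.client.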
pass
def _get_safe_file_name(self, episode_index: int, episode_name: str) -> str:
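        # Builds the "{index}.{name}" folder name, with per-series adjustments
        # for titles whose episode names need cleanup.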
if self.title == '全知讀者視角':
episode_name = f"Ep{episode_name.split('.')[2]}"
episode_name = episode_name.replace("(", " (")
episode_name = episode_name.replace(")", ")")
elif self.title == '怪力亂神':
episode_name = episode_name.replace('話. ', '話 ')
episode_title = f"{episode_index}.{episode_name}"
return html.unescape(episode_title)
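

# A minimal usage sketch, assuming a hypothetical JSON API. "ExampleDownloader",
# the example.com endpoints and the response field names ("title", "episodes",
# "images", ...) are illustrative assumptions, not part of this project; a real
# subclass must parse its own site's pages or API in the four hook methods.
class ExampleDownloader(Downloader):
    def _fetch_information(self, url: str) -> None:
        data = requests.get(url, timeout=10).json()
        self.title = data["title"]
        self.author = data.get("author", "")
        self.tag = data.get("tags", [])
        self.description = data.get("description", "")
        self.thumbnail_url = data.get("thumbnail", "")
        self.thumbnail_name = f"{self.title}.jpg" if self.thumbnail_url else ""

    def _fetch_episode_information(self) -> None:
        data = requests.get(
            f"https://example.com/api/webtoon/{self.webtoon_id}/episodes",
            timeout=10,
        ).json()
        self.episode_titles = [episode["title"] for episode in data["episodes"]]
        self.readablities_index_list = list(range(len(self.episode_titles)))

    def _get_episode_image_urls(self, episode_index: int) -> list[str] | None:
        data = requests.get(
            f"https://example.com/api/webtoon/{self.webtoon_id}/episodes/{episode_index}",
            timeout=10,
        ).json()
        return data.get("images")

    async def _download_image(self, episode_path: Path, url: str, image_no: int) -> None:
        response = await self.client.get(url)
        response.raise_for_status()
        (episode_path / f"{image_no:03d}.jpg").write_bytes(response.content)


if __name__ == "__main__":
    downloader = ExampleDownloader(webtoon_id=12345)
    downloader.download_webtoon(
        "https://example.com/api/webtoon/12345",  # hypothetical series URL
        Path("downloads"),
    )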