# File-viewer listing metadata: Python source, 179 lines, 6.7 KiB.
import asyncio
import html
import json
import shutil
import time
from pathlib import Path
from typing import Any

import requests
from httpx import AsyncClient
#import pyfilename as pf

from data.special_list import WEBTOON_18_BONUS


class Downloader:
|
||
def __init__(self, webtoon_id: any) -> None:
|
||
self.webtoon_id = webtoon_id
|
||
self.client = AsyncClient()
|
||
self.lately_downloaded_episode: list[Path] = []
|
||
self.new_webtoon = ""
|
||
|
||
|
||
def download_webtoon(self, url, path:Path) -> None:
|
||
self._fetch_information(url)
|
||
self.webtoon_path = path / self.title
|
||
self.webtoon_path.mkdir(parents=True, exist_ok=True)
|
||
|
||
self._save_information()
|
||
if self.thumbnail_url != "":
|
||
self._download_thumbnail()
|
||
|
||
self._fetch_episode_information()
|
||
unobtained_episodes = self._get_unobtained_episodes()
|
||
|
||
if len(unobtained_episodes) > 0:
|
||
self.new_webtoon = self.title
|
||
|
||
try:
|
||
asyncio.run(
|
||
self._download_episodes(unobtained_episodes)
|
||
)
|
||
except Exception as e:
|
||
print(f"Error _download_episodes: {e}")
|
||
|
||
|
||
|
||
def _fetch_information(self, url) -> None:
|
||
pass
|
||
|
||
def _save_information(self) -> None:
|
||
information_path = self.webtoon_path / 'information.json'
|
||
save_necessary = True
|
||
|
||
if information_path.exists():
|
||
with open(information_path, "r", encoding='utf-8') as json_file:
|
||
existing_information = json.load(json_file)
|
||
if (self.author == ""):
|
||
save_necessary = False
|
||
if (
|
||
existing_information["title"] == self.title and
|
||
existing_information["author"] == self.author and
|
||
existing_information["description"] == self.description and
|
||
existing_information["thumbnail"] == self.thumbnail_name
|
||
):
|
||
save_necessary = False
|
||
if (save_necessary):
|
||
information = {
|
||
"title": self.title,
|
||
"author": self.author,
|
||
"tag": self.tag,
|
||
"description": self.description,
|
||
"thumbnail": self.thumbnail_name
|
||
}
|
||
|
||
with open(information_path, 'w', encoding='utf-8') as json_file:
|
||
json.dump(information, json_file, ensure_ascii=False, indent=2)
|
||
print(f"{information_path} is saved.")
|
||
|
||
|
||
def _download_thumbnail(self) -> None:
|
||
thumbnail_path = self.webtoon_path / self.thumbnail_name
|
||
if not thumbnail_path.exists():
|
||
response = requests.get(self.thumbnail_url)
|
||
if response.status_code == 200:
|
||
image_raw = response.content
|
||
thumbnail_path.write_bytes(image_raw)
|
||
print(f"{thumbnail_path} is saved.")
|
||
else:
|
||
print(response.status_code)
|
||
|
||
|
||
def _fetch_episode_information(self) -> None:
|
||
pass
|
||
|
||
def _get_unobtained_episodes(self) -> list[int]:
|
||
downloaded_episodes = []
|
||
|
||
for dir in self.webtoon_path.glob('*'):
|
||
if dir.is_dir():
|
||
downloaded_episodes.append(int(dir.name.split('.')[0]))
|
||
|
||
if self.title == "反派角色只有死亡結局":
|
||
downloaded_episodes = [i - 2 for i in downloaded_episodes]
|
||
if self.title == "成為我筆下男主角的妻子" or self.title == "領主夫人罷工中":
|
||
downloaded_episodes = [i - 1 for i in downloaded_episodes]
|
||
|
||
if self.title in WEBTOON_18_BONUS:
|
||
count = len(self.readablities_index_list) - len(downloaded_episodes)
|
||
if count > 0:
|
||
episodes = self.readablities_index_list[-count:]
|
||
|
||
else :
|
||
diffrence = set(self.readablities_index_list) - set(downloaded_episodes)
|
||
episodes = list(diffrence)
|
||
|
||
print(f"{self.title} unobtained episodes: {episodes}")
|
||
|
||
return episodes
|
||
|
||
async def _download_episodes(self, episode_index_list: list[int]) -> None:
|
||
async with self.client:
|
||
for episode_index in episode_index_list:
|
||
episode_name = self.episode_titles[episode_index]
|
||
episode_title = self._get_safe_file_name(episode_index, episode_name)
|
||
if self.title == "反派角色只有死亡結局":
|
||
episode_title = self._get_safe_file_name(episode_index + 2, episode_name)
|
||
if self.title == "成為我筆下男主角的妻子" or self.title == "領主夫人罷工中":
|
||
episode_title = self._get_safe_file_name(episode_index + 1, episode_name)
|
||
# episode_title = self._get_safe_file_name(f"{episode_index}.{self.episode_titles[episode_index]}")
|
||
print(episode_title)
|
||
episode_path = self.webtoon_path / episode_title
|
||
episode_path.mkdir(parents=True, exist_ok=True)
|
||
time.sleep(2)
|
||
is_download_sucessful = await self._download_episode(episode_index, episode_path)
|
||
if is_download_sucessful:
|
||
self.lately_downloaded_episode.append(episode_path)
|
||
print(f"Download {self.episode_titles[episode_index]} sucessful.")
|
||
else:
|
||
print(f"Error _download_episode: {self.episode_titles[episode_index]}")
|
||
break
|
||
|
||
|
||
async def _download_episode(self, episode_index: int, episode_path: Path) -> bool:
|
||
episode_images_url = self._get_episode_image_urls(episode_index)
|
||
|
||
if not episode_images_url:
|
||
print(f"Failed get image url for: {episode_path}")
|
||
return False
|
||
|
||
try:
|
||
await asyncio.gather(
|
||
*(
|
||
self._download_image(episode_path, element, i)
|
||
for i, element in enumerate(episode_images_url)
|
||
)
|
||
)
|
||
except Exception as e:
|
||
shutil.rmtree(episode_path)
|
||
raise
|
||
|
||
return True
|
||
|
||
|
||
def _get_episode_image_urls(self, episode_index: int) -> list[str] | None:
|
||
pass
|
||
|
||
async def _download_image(self, episode_path: Path, url: str, image_no: int) -> None:
|
||
pass
|
||
|
||
def _get_safe_file_name(self, episode_index: int, episode_name: str) -> str:
|
||
if self.title == '全知讀者視角':
|
||
episode_name = f"Ep{episode_name.split('.')[2]}"
|
||
episode_name = episode_name.replace("(", " (")
|
||
episode_name = episode_name.replace(")", ")")
|
||
elif self.title == '怪力亂神':
|
||
episode_name = episode_name.replace('話. ', '話 ')
|
||
|
||
episode_title = f"{episode_index}.{episode_name}"
|
||
|
||
return html.unescape(episode_title) |