143 lines
4.4 KiB
Python
143 lines
4.4 KiB
Python
from concurrent.futures import ThreadPoolExecutor
|
|
import csv
|
|
from datetime import datetime
|
|
|
|
from vcinema_utils.VCinemaFilm import VCinemaFilm
|
|
from imdb_utils import IMDbUtils
|
|
from bookstack import Bookstack
|
|
|
|
|
|
# Base URL of the wiki; passed as the first argument to the Bookstack calls in this file.
JACKNET_WIKI_URL = "https://wiki.jacknet.io"
|
|
|
|
# Page ID of https://wiki.jacknet.io/books/vcinema/page/csv
|
|
# Attachments are matched against this page id through their 'uploaded_to' field.
CSV_PAGE_ID = 11
|
|
|
|
|
|
def get_viewings_csv_attachment_id(token_id, token_secret):
    """Return the id of the ``vcinema.csv`` attachment on the CSV wiki page.

    The attachment list is fetched from the wiki with the given API token
    pair.  The first attachment that was uploaded to ``CSV_PAGE_ID`` and is
    named ``vcinema.csv`` wins; ``None`` is returned when nothing matches.
    """
    viewings_csv_file_name = "vcinema.csv"

    for attachment in Bookstack.get_attachments(JACKNET_WIKI_URL, token_id, token_secret):
        on_csv_page = attachment['uploaded_to'] == CSV_PAGE_ID
        if on_csv_page and attachment['name'] == viewings_csv_file_name:
            return attachment['id']

    return None
|
|
|
|
|
|
def get_vcinema_viewings(token_id, token_secret):
    """Download the viewings CSV from the wiki and parse it into rows.

    Returns a list with one mapping per CSV data row, keyed by the column
    names found in the first (header) line of the file.
    """
    csv_attachment_id = get_viewings_csv_attachment_id(token_id, token_secret)
    raw_csv = Bookstack.get_attachment(JACKNET_WIKI_URL, token_id, token_secret, csv_attachment_id)

    # The attachment arrives as bytes; decode it, drop surrounding
    # whitespace and hand the reader one string per line.
    csv_lines = raw_csv.decode("utf-8").strip().split("\n")
    reader = csv.DictReader(csv_lines, quotechar='"')

    return list(reader)
|
|
|
|
|
|
def get_vcinema_films(token_id, token_secret):
    """Build one ``VCinemaFilm`` per distinct IMDb id found in the viewings CSV.

    Every CSV row contributes a viewing (date watched, season, rating) to the
    film that shares its ``imdb_id``; the film object itself is created the
    first time its id is seen.  Returns the films as a list.
    """
    films_by_imdb_id = {}

    for row in get_vcinema_viewings(token_id, token_secret):
        film_id = row["imdb_id"]
        film_title = row["title"]

        if film_id not in films_by_imdb_id:
            films_by_imdb_id[film_id] = VCinemaFilm(imdb_id=film_id, title=film_title)

        # Dates are stored in the CSV as YYYY-MM-DD.
        watched_on = datetime.strptime(row['date_watched'], "%Y-%m-%d")
        films_by_imdb_id[film_id].add_viewing(watched_on, row['season'], row['rating'])

    return list(films_by_imdb_id.values())
|
|
|
|
|
|
def add_imdb_data(imdb_id, films, data_fields, progressbar=None):
    """Look up one movie on IMDb and copy the requested fields onto matching films.

    Args:
        imdb_id: Id handed to ``IMDbUtils.get_movie``.
        films: Films to scan; only those whose ``get_imdb_id()`` equals the
            fetched movie's ``movieID`` are updated.
        data_fields: Names of the fields to copy.  A field that is absent
            from the fetched movie is skipped.
        progressbar: Optional object whose ``next()`` is called once after
            the films have been processed.
    """
    movie = IMDbUtils.get_movie(imdb_id)

    matching_films = (film for film in films if film.get_imdb_id() == movie.movieID)
    for film in matching_films:
        for field_name in data_fields:
            if field_name in movie:
                film.add_imdb_data(field_name, movie[field_name])

    if progressbar is None:
        return
    progressbar.next()
|
|
|
|
|
|
def add_imdb_keywords(imdb_id, films, progressbar=None):
    """Look up one movie's keywords on IMDb and store them on matching films.

    Args:
        imdb_id: Id handed to ``IMDbUtils.get_movie_keywords``.
        films: Films to scan; only those whose ``get_imdb_id()`` equals the
            fetched movie's ``movieID`` are updated.
        progressbar: Optional object whose ``next()`` is called once after
            the films have been processed.
    """
    movie = IMDbUtils.get_movie_keywords(imdb_id)

    matching_films = (film for film in films if film.get_imdb_id() == movie.movieID)
    for film in matching_films:
        # Nothing to store when the lookup carried no keywords.
        if 'keywords' in movie:
            film.add_imdb_data('keywords', movie['keywords'])

    if progressbar is None:
        return
    progressbar.next()
|
|
|
|
|
|
def add_imdb_data_to_films(films, field_names, progress_bar=None):
    """Fetch IMDb data for every film on a thread pool and attach it to the films.

    One ``add_imdb_data`` task per film is scheduled when ``field_names``
    holds at least one entry besides ``'keywords'``; one ``add_imdb_keywords``
    task per film is scheduled when ``'keywords'`` is requested.  Tasks run
    on four worker threads and this call returns once all have finished.

    Args:
        films: Film objects exposing ``get_imdb_id()``.  The whole collection
            is also handed to every task.
        field_names: Names of the IMDb fields to fetch.
        progress_bar: Optional progress reporter.  When given, its ``max`` is
            set to the number of scheduled tasks, it is passed to every task,
            and ``finish()`` is called at the end.  When ``None`` (the
            default) no progress reporting happens.

    NOTE: the futures' results are never retrieved, so an exception raised
    inside a task is not re-raised by this function.
    """
    wants_keywords = 'keywords' in field_names
    # True when at least one requested field is something other than 'keywords'.
    wants_other_fields = len(field_names) > (1 if wants_keywords else 0)

    with ThreadPoolExecutor(4) as executor:
        future_imdb_tasks = set()

        if wants_other_fields:
            future_imdb_tasks.update(
                executor.submit(add_imdb_data, film.get_imdb_id(), films, field_names, progress_bar)
                for film in films
            )

        if wants_keywords:
            future_imdb_tasks.update(
                executor.submit(add_imdb_keywords, film.get_imdb_id(), films, progress_bar)
                for film in films
            )

        # Guarded: progress_bar defaults to None, which has no 'max' attribute.
        if progress_bar is not None:
            progress_bar.max = len(future_imdb_tasks)

    if progress_bar is not None:
        progress_bar.finish()
|
|
|
|
|
|
def filter_films(films: "list[VCinemaFilm]", field: str) -> "dict[object, list[VCinemaFilm]]":
    """Group films by the value(s) they hold for one IMDb data field.

    Args:
        films: Films exposing ``get_imdb_data(field)``.
        field: Name of the IMDb data field to group by.

    Returns:
        A dict mapping each field value to the list of films carrying it, in
        the order the films were given.  A film whose field value is a list
        is filed under every element of that list; a film whose field value
        is ``None`` is left out.

    The annotations are quoted so they are not evaluated when the function
    is defined.
    """
    films_by_value = {}

    for film in films:
        field_value = film.get_imdb_data(field)
        if field_value is None:
            continue

        # Treat a scalar as a one-element list so both cases share one path.
        values = field_value if isinstance(field_value, list) else [field_value]
        for value in values:
            films_by_value.setdefault(value, []).append(film)

    return films_by_value
|
|
|
|
|
|
def get_film_list(films: "list[VCinemaFilm]") -> str:
    """Return the films' IMDb links joined with ``<br>``.

    Each film contributes the string returned by its ``get_imdb_link()``.
    An empty ``films`` yields the empty string.

    The parameter annotation is quoted so it is not evaluated when the
    function is defined.
    """
    # str.join on an empty iterable already returns "", so no special case is needed.
    return "<br>".join(film.get_imdb_link() for film in films)
|
|
|
|
|
|
def generate_markdown_link(text, url) -> str:
    """Return a Markdown inline link: ``text`` in brackets, ``url`` in parentheses."""
    link_template = "[{}]({})"
    return link_template.format(text, url)
|
|
|
|
|
|
def generate_wikipedia_page_link(page_title):
    """Return a Markdown link labelled ``page_title`` that points at its English Wikipedia URL."""
    page_url = generate_wikipedia_url(page_title)
    return generate_markdown_link(page_title, page_url)
|
|
|
|
|
|
def generate_wikipedia_url(page_title):
    """Return the English Wikipedia URL for ``page_title``.

    Spaces in the title are replaced by underscores; no other character is
    altered or escaped.
    """
    page_slug = page_title.replace(" ", "_")
    return "https://en.wikipedia.org/wiki/{}".format(page_slug)
|