This commit is contained in:
Marek Lenczewski
2026-04-07 17:55:30 +02:00
parent 8f15f51bce
commit ca988345e9
19 changed files with 201 additions and 203 deletions

View File

@@ -11,5 +11,5 @@ class Profile(Base):
name = Column(String, nullable=False, unique=True)
@classmethod
def get_all(cls, db: Session) -> list["Profile"]:
def getAll(cls, db: Session) -> list["Profile"]:
return db.query(cls).all()

View File

@@ -2,7 +2,7 @@ from sqlalchemy import Column, ForeignKey, Integer, Table
from database.database import Base
video_profiles = Table(
videoProfiles = Table(
"video_profiles",
Base.metadata,
Column("video_id", Integer, ForeignKey("videos.id", ondelete="CASCADE")),

View File

@@ -3,7 +3,7 @@ from sqlalchemy.orm import Session, relationship
from database.database import Base
from model.profile import Profile
from model.profile_video import video_profiles
from model.profile_video import videoProfiles
class Video(Base):
@@ -12,17 +12,42 @@ class Video(Base):
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String, nullable=False)
youtuber = Column(String, nullable=False)
thumbnail_url = Column(String, nullable=False)
youtube_url = Column(String, nullable=False)
file_path = Column(String, nullable=True)
profiles = relationship("Profile", secondary=video_profiles, backref="videos")
thumbnailUrl = Column("thumbnail_url", String, nullable=False)
youtubeUrl = Column("youtube_url", String, nullable=False)
filePath = Column("file_path", String, nullable=True)
profiles = relationship("Profile", secondary=videoProfiles, backref="videos")
@classmethod
def create_from_dict(cls, db: Session, data: dict, profile_id: int | None) -> "Video":
video = cls(**data)
if not profile_id:
profile_id = 1
profile = db.query(Profile).filter(Profile.id == profile_id).first()
def deleteIfExists(cls, db: Session, youtubeUrl: str, profileId: int | None):
if not profileId:
profileId = 1
videos = db.query(cls).filter(
cls.youtubeUrl == youtubeUrl,
cls.profiles.any(Profile.id == profileId),
).all()
for video in videos:
db.delete(video)
db.commit()
@classmethod
def create(
cls,
db: Session,
title: str,
youtuber: str,
thumbnailUrl: str,
youtubeUrl: str,
profileId: int | None,
) -> "Video":
if not profileId:
profileId = 1
video = cls(
title=title,
youtuber=youtuber,
thumbnailUrl=thumbnailUrl,
youtubeUrl=youtubeUrl,
)
profile = db.query(Profile).filter(Profile.id == profileId).first()
if profile:
video.profiles.append(profile)
db.add(video)
@@ -31,47 +56,42 @@ class Video(Base):
return video
@classmethod
def get_all(cls, db: Session, profile_id: int | None = None) -> list["Video"]:
def getAll(cls, db: Session, profileId: int | None = None) -> list["Video"]:
query = db.query(cls)
if profile_id:
query = query.filter(cls.profiles.any(Profile.id == profile_id))
if profileId:
query = query.filter(cls.profiles.any(Profile.id == profileId))
return query.order_by(cls.id.desc()).all()
@classmethod
def get_downloaded(cls, db: Session, profile_id: int | None = None) -> list["Video"]:
query = db.query(cls).filter(cls.file_path.isnot(None))
if profile_id:
query = query.filter(cls.profiles.any(Profile.id == profile_id))
def getDownloaded(cls, db: Session, profileId: int | None = None) -> list["Video"]:
query = db.query(cls).filter(cls.filePath.isnot(None))
if profileId:
query = query.filter(cls.profiles.any(Profile.id == profileId))
return query.order_by(cls.id.desc()).all()
@classmethod
def get_by_id(cls, db: Session, video_id: int) -> "Video | None":
return db.query(cls).filter(cls.id == video_id).first()
def getById(cls, db: Session, videoId: int) -> "Video | None":
return db.query(cls).filter(cls.id == videoId).first()
@classmethod
def delete_by_youtube_id(cls, db: Session, youtube_id: str):
db.query(cls).filter(cls.youtube_url.contains(youtube_id)).delete(synchronize_session=False)
db.commit()
@classmethod
def update_file_path(cls, db: Session, video_id: int, path: str | None):
video = cls.get_by_id(db, video_id)
def updateFilePath(cls, db: Session, videoId: int, path: str | None):
video = cls.getById(db, videoId)
if video:
video.file_path = path
video.filePath = path
db.commit()
@classmethod
def delete_not_downloaded(cls, db: Session, profile_id: int, exclude_ids: list[int] | None = None) -> int:
def deleteNotDownloaded(cls, db: Session, profileId: int, excludeIds: list[int] | None = None) -> int:
query = db.query(cls).filter(
cls.profiles.any(Profile.id == profile_id),
cls.profiles.any(Profile.id == profileId),
)
if exclude_ids:
query = query.filter(cls.id.notin_(exclude_ids))
if excludeIds:
query = query.filter(cls.id.notin_(excludeIds))
videos = query.all()
video_ids = [v.id for v in videos]
if not video_ids:
videoIds = [v.id for v in videos]
if not videoIds:
return 0
db.execute(video_profiles.delete().where(video_profiles.c.video_id.in_(video_ids)))
db.query(cls).filter(cls.id.in_(video_ids)).delete(synchronize_session=False)
db.execute(videoProfiles.delete().where(videoProfiles.c.video_id.in_(videoIds)))
db.query(cls).filter(cls.id.in_(videoIds)).delete(synchronize_session=False)
db.commit()
return len(video_ids)
return len(videoIds)