from sqlalchemy import Column, Integer, String from sqlalchemy.orm import Session, relationship from database.database import Base from model.profile import Profile from model.profile_video import video_profiles class Video(Base): __tablename__ = "videos" id = Column(Integer, primary_key=True, autoincrement=True) title = Column(String, nullable=False) youtuber = Column(String, nullable=False) thumbnail_url = Column(String, nullable=False) youtube_url = Column(String, nullable=False) file_path = Column(String, nullable=True) profiles = relationship("Profile", secondary=video_profiles, backref="videos") @classmethod def create_from_dict(cls, db: Session, data: dict, profile_id: int | None) -> "Video": video = cls(**data) if not profile_id: profile_id = 1 profile = db.query(Profile).filter(Profile.id == profile_id).first() if profile: video.profiles.append(profile) db.add(video) db.commit() db.refresh(video) return video @classmethod def get_all(cls, db: Session, profile_id: int | None = None) -> list["Video"]: query = db.query(cls) if profile_id: query = query.filter(cls.profiles.any(Profile.id == profile_id)) return query.order_by(cls.id.desc()).all() @classmethod def get_downloaded(cls, db: Session, profile_id: int | None = None) -> list["Video"]: query = db.query(cls).filter(cls.file_path.isnot(None)) if profile_id: query = query.filter(cls.profiles.any(Profile.id == profile_id)) return query.order_by(cls.id.desc()).all() @classmethod def get_by_id(cls, db: Session, video_id: int) -> "Video | None": return db.query(cls).filter(cls.id == video_id).first() @classmethod def delete_by_youtube_id(cls, db: Session, youtube_id: str): db.query(cls).filter(cls.youtube_url.contains(youtube_id)).delete(synchronize_session=False) db.commit() @classmethod def update_file_path(cls, db: Session, video_id: int, path: str | None): video = cls.get_by_id(db, video_id) if video: video.file_path = path db.commit() @classmethod def delete_not_downloaded(cls, db: Session, profile_id: int, exclude_ids: list[int] | None = None) -> int: query = db.query(cls).filter( cls.profiles.any(Profile.id == profile_id), ) if exclude_ids: query = query.filter(cls.id.notin_(exclude_ids)) videos = query.all() video_ids = [v.id for v in videos] if not video_ids: return 0 db.execute(video_profiles.delete().where(video_profiles.c.video_id.in_(video_ids))) db.query(cls).filter(cls.id.in_(video_ids)).delete(synchronize_session=False) db.commit() return len(video_ids)