from pathlib import Path from sqlalchemy import Column, Integer, String from sqlalchemy.orm import Session, relationship from database.database import Base from model.profile import Profile from model.profile_video import videoProfiles class Video(Base): __tablename__ = "videos" id = Column(Integer, primary_key=True, autoincrement=True) title = Column(String, nullable=False) youtuber = Column(String, nullable=False) thumbnailUrl = Column("thumbnail_url", String, nullable=False) youtubeUrl = Column("youtube_url", String, nullable=False) filePath = Column("file_path", String, nullable=True) profiles = relationship("Profile", secondary=videoProfiles, backref="videos") @property def isDownloaded(self) -> bool: return self.filePath is not None @property def profileIds(self) -> list[int]: return [p.id for p in self.profiles] @classmethod def deleteIfExists(cls, db: Session, youtubeUrl: str, profileId: int | None): if not profileId: profileId = 1 videos = db.query(cls).filter( cls.youtubeUrl == youtubeUrl, cls.profiles.any(Profile.id == profileId), ).all() for video in videos: db.delete(video) db.commit() @classmethod def create( cls, db: Session, title: str, youtuber: str, thumbnailUrl: str, youtubeUrl: str, profileId: int | None, ) -> "Video": if not profileId: profileId = 1 video = cls( title=title, youtuber=youtuber, thumbnailUrl=thumbnailUrl, youtubeUrl=youtubeUrl, ) profile = db.query(Profile).filter(Profile.id == profileId).first() if profile: video.profiles.append(profile) db.add(video) db.commit() db.refresh(video) return video @classmethod def getAll(cls, db: Session, profileId: int | None = None) -> list["Video"]: query = db.query(cls) if profileId: query = query.filter(cls.profiles.any(Profile.id == profileId)) return query.order_by(cls.id.desc()).all() @classmethod def getDownloaded(cls, db: Session, profileId: int | None = None) -> list["Video"]: query = db.query(cls).filter(cls.filePath.isnot(None)) if profileId: query = query.filter(cls.profiles.any(Profile.id == profileId)) return query.order_by(cls.id.desc()).all() @classmethod def getById(cls, db: Session, videoId: int) -> "Video | None": return db.query(cls).filter(cls.id == videoId).first() @classmethod def updateFilePath(cls, db: Session, videoId: int, path: str | None): video = cls.getById(db, videoId) if video: video.filePath = path db.commit() @classmethod def getValidPath(cls, db: Session, videoId: int): video = cls.getById(db, videoId) if not video or not video.filePath: return None, None path = Path(video.filePath) if not path.exists(): cls.updateFilePath(db, videoId, None) return None, None return path, video @classmethod def deleteServerFile(cls, db: Session, videoId: int) -> bool: video = cls.getById(db, videoId) if not video: return False if video.filePath: path = Path(video.filePath) if path.exists(): path.unlink() cls.updateFilePath(db, videoId, None) return True @classmethod def deleteNotDownloaded(cls, db: Session, profileId: int, excludeIds: list[int] | None = None) -> int: query = db.query(cls).filter( cls.profiles.any(Profile.id == profileId), ) if excludeIds: query = query.filter(cls.id.notin_(excludeIds)) videos = query.all() videoIds = [v.id for v in videos] if not videoIds: return 0 db.execute(videoProfiles.delete().where(videoProfiles.c.video_id.in_(videoIds))) db.query(cls).filter(cls.id.in_(videoIds)).delete(synchronize_session=False) db.commit() return len(videoIds)