from sqlalchemy.orm import Session from models import Profile, Video, video_profiles from schemas import VideoCreate def create_video(db: Session, video_data: VideoCreate) -> Video: profile_id = video_data.profile_id data = video_data.model_dump(exclude={"profile_id"}) video = Video(**data) if profile_id: profile = db.query(Profile).filter(Profile.id == profile_id).first() if profile: video.profiles.append(profile) db.add(video) db.commit() db.refresh(video) return video def get_all_videos(db: Session, profile_id: int | None = None) -> list[Video]: query = db.query(Video) if profile_id: query = query.filter(Video.profiles.any(Profile.id == profile_id)) return query.order_by(Video.id.desc()).all() def get_downloaded_videos(db: Session, profile_id: int | None = None) -> list[Video]: query = db.query(Video).filter(Video.file_path.isnot(None)) if profile_id: query = query.filter(Video.profiles.any(Profile.id == profile_id)) return query.order_by(Video.id.desc()).all() def get_video(db: Session, video_id: int) -> Video | None: return db.query(Video).filter(Video.id == video_id).first() def delete_by_youtube_id(db: Session, youtube_id: str): db.query(Video).filter(Video.youtube_url.contains(youtube_id)).delete(synchronize_session=False) db.commit() def update_file_path(db: Session, video_id: int, path: str): video = get_video(db, video_id) if video: video.file_path = path db.commit() def get_all_profiles(db: Session) -> list[Profile]: return db.query(Profile).all()