#!/usr/bin/python2 # # Youtube-upload is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Youtube-upload is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Youtube-upload. If not, see . # # Author: Arnau Sanchez # Websites: http://code.google.com/p/youtube-upload # http://code.google.com/p/tokland """ Upload a video to Youtube from the command-line. $ youtube-upload --title="A.S. Mutter playing" \ --description="Anne Sophie Mutter plays Beethoven" \ --category=Music \ --keywords="mutter, beethoven" \ anne_sophie_mutter.flv www.youtube.com/watch?v=pxzZ-fYjeYs """ import os import sys import locale import optparse import collections import youtube_upload.auth import youtube_upload.upload_video import youtube_upload.categories # http://code.google.com/p/python-progressbar (>= 2.3) try: import progressbar except ImportError: progressbar = None class InvalidCategory(Exception): pass class VideoArgumentMissing(Exception): pass class OptionsMissing(Exception): pass class BadAuthentication(Exception): pass class ParseError(Exception): pass class VideoNotFound(Exception): pass class UnsuccessfulHTTPResponseCode(Exception): pass VERSION = "0.8.0" EXIT_CODES = { # Non-retryable BadAuthentication: 1, VideoArgumentMissing: 2, OptionsMissing: 2, InvalidCategory: 3, ParseError: 5, VideoNotFound: 6, # Retryable UnsuccessfulHTTPResponseCode: 100, } WATCH_VIDEO_URL = "https://www.youtube.com/watch?v={id}" ProgressInfo = collections.namedtuple("ProgressInfo", ["callback", "finish"]) def to_utf8(s): """Re-encode string from the default system encoding to UTF-8.""" current = locale.getpreferredencoding() return s.decode(current).encode("UTF-8") if s and current != "UTF-8" else s def debug(obj, fd=sys.stderr): """Write obj to standard error.""" string = str(obj.encode(get_encoding(fd), "backslashreplace") if isinstance(obj, unicode) else obj) fd.write(string + "\n") def catch_exceptions(exit_codes, fun, *args, **kwargs): """ Catch exceptions on fun(*args, **kwargs) and return the exit code specified in the exit_codes dictionary. Return 0 if no exception is raised. """ try: fun(*args, **kwargs) return 0 except tuple(exit_codes.keys()) as exc: debug("[%s] %s" % (exc.__class__.__name__, exc)) return exit_codes[exc.__class__] def get_encoding(fd): """Guess terminal encoding.""" return fd.encoding or locale.getpreferredencoding() def compact(it): """Filter false (in the truth sense) elements in iterator.""" return filter(bool, it) def tosize(seq, size): """Return list of fixed length from sequence.""" return seq[:size] if len(seq) >= size else (seq + [None] * (size-len(seq))) def first(it): """Return first element in iterable.""" return it.next() def get_progress_info(): """Return a function callback to update the progressbar.""" def _callback(total_size, completed): if not hasattr(bar, "next_update"): bar.maxval = total_size bar.start() bar.update(completed) widgets = [ progressbar.Percentage(), ' ', progressbar.Bar(), ' ', progressbar.ETA(), ' ', progressbar.FileTransferSpeed(), ] bar = progressbar.ProgressBar(widgets=widgets) return ProgressInfo(callback=_callback, finish=bar.finish) def string_to_dict(string): """Return dictionary from string "key1=value1, key2=value2".""" pairs = [s.strip() for s in (string or "").split(",")] return dict(pair.split("=") for pair in pairs) def upload_video(youtube, options, video_path, total_videos, index): """Upload video with index (for split videos).""" title = to_utf8(options.title) description = to_utf8(options.description or "").decode("string-escape") ns = dict(title=title, n=index+1, total=total_videos) complete_title = \ (options.title_template.format(**ns) if total_videos > 1 else title) progress = get_progress_info() if options.category: ids = youtube_upload.categories.IDS if options.category in ids: category_id = str(ids[options.category]) else: msg = "{} is not a valid category".format(options.category) raise InvalidCategory(msg) else: category_id = None body = { "snippet": { "title": complete_title, "tags": map(str.strip, (options.keywords or "").split(",")), "description": description, "categoryId": category_id, }, "status": { "privacyStatus": options.privacy }, "recordingDetails": { "location": string_to_dict(options.location), }, } video_id = youtube_upload.upload_video.upload(youtube, video_path, body, progress_callback=progress.callback, chunksize=16*1024) progress.finish() return video_id def run_main(parser, options, args, output=sys.stdout): """Run the main scripts from the parsed options/args.""" required_options = ["title"] missing = [opt for opt in required_options if not getattr(options, opt)] if missing: parser.print_usage() msg = "Some required option are missing: %s" % ", ".join(missing) raise OptionsMissing(msg) default_client_secrets = \ os.path.join(sys.prefix, "share/youtube_upload/client_secrets.json") home = os.path.expanduser("~") default_credentials = os.path.join(home, ".youtube_upload-credentials.json") client_secrets = options.client_secrets or default_client_secrets credentials = options.credentials_file or default_credentials debug("Using client secrets: {}".format(client_secrets)) debug("Using credentials file: {}".format(credentials)) youtube = youtube_upload.auth.get_resource(client_secrets, credentials) for index, video_path in enumerate(args): video_id = upload_video(youtube, options, video_path, len(args), index) video_url = WATCH_VIDEO_URL.format(id=video_id) debug("Watch URL: {}".format(video_url)) output.write(video_id + "\n") def main(arguments): """Upload video to Youtube.""" usage = """Usage: %prog [OPTIONS] VIDEO_PATH ... Upload videos to youtube.""" parser = optparse.OptionParser(usage, version=VERSION) # Required options parser.add_option('-t', '--title', dest='title', type="string", help='Video(s) title') parser.add_option('-c', '--category', dest='category', type="string", help='Video(s) category') # Optional options parser.add_option('-d', '--description', dest='description', type="string", help='Video(s) description') parser.add_option('', '--keywords', dest='keywords', type="string", help='Video(s) keywords (separated by commas: tag1,tag2,...)') parser.add_option('', '--title-template', dest='title_template', type="string", default="$title [$n/$total]", metavar="STRING", help='Template for multiple videos (default: {title} [{n}/{total}])') parser.add_option('', '--privacy', dest='privacy', metavar="STRING", default="public", help='Privacy status (public | unlisted | private)') parser.add_option('', '--location', dest='location', type="string", default=None, metavar="LAT,LON", help='Video(s) location (latitude=VAL, longitude=VAL, altitude=VAL)."') # Authentication parser.add_option('', '--client-secrets', dest='client_secrets', type="string", help='Client secrets JSON file') parser.add_option('', '--credentials-file', dest='credentials_file', type="string", help='Client secrets JSON file') options, args = parser.parse_args(arguments) run_main(parser, options, args) if __name__ == '__main__': sys.exit(catch_exceptions(EXIT_CODES, main, sys.argv[1:]))