|
| 1 | +import logging |
| 2 | +import enlighten |
| 3 | +import requests |
| 4 | +import threading |
| 5 | + |
| 6 | +from requests.adapters import HTTPAdapter |
| 7 | +from requests.packages.urllib3.util.retry import Retry |
| 8 | + |
| 9 | +from .version import ClientVersion |
| 10 | +from .utils import Utils, PaginatedResponse |
| 11 | +from .bandwidth import NetworkBandwidth, DiskBandwidth |
| 12 | + |
| 13 | + |
| 14 | +class HTTPClient(object): |
| 15 | + def __init__(self): |
| 16 | + # Initialize empty thread object |
| 17 | + self.thread_local = None |
| 18 | + self.client_version = ClientVersion.version() |
| 19 | + self.shared_headers = { |
| 20 | + 'x-frameio-client': 'python/{}'.format(self.client_version) |
| 21 | + } |
| 22 | + # Configure retry strategy (very broad right now) |
| 23 | + self.retry_strategy = Retry( |
| 24 | + total=3, |
| 25 | + backoff_factor=1, |
| 26 | + status_forcelist=[400, 429, 500, 503], |
| 27 | + method_whitelist=["GET", "POST", "PUT", "GET", "DELETE"] |
| 28 | + ) |
| 29 | + # Create real thread |
| 30 | + self._initialize_thread() |
| 31 | + |
| 32 | + def _initialize_thread(self): |
| 33 | + self.thread_local = threading.local() |
| 34 | + |
| 35 | + def _get_session(self): |
| 36 | + if not hasattr(self.thread_local, "session"): |
| 37 | + http = requests.Session() |
| 38 | + adapter = HTTPAdapter(max_retries=self.retry_strategy) |
| 39 | + adapter.add_headers(self.shared_headers) # add version header |
| 40 | + http.mount("https", adapter) |
| 41 | + self.thread_local.session = http |
| 42 | + |
| 43 | + return self.thread_local.session |
| 44 | + |
| 45 | + |
| 46 | +class APIClient(HTTPClient, object): |
| 47 | + def __init__(self, token, host): |
| 48 | + super().__init__() |
| 49 | + self.host = host |
| 50 | + self.token = token |
| 51 | + self._initialize_thread() |
| 52 | + self.session = self._get_session() |
| 53 | + self.auth_header = { |
| 54 | + 'Authorization': 'Bearer {}'.format(self.token), |
| 55 | + } |
| 56 | + |
| 57 | + def _format_api_call(self, endpoint): |
| 58 | + return '{}/v2{}'.format(self.host, endpoint) |
| 59 | + |
| 60 | + def _api_call(self, method, endpoint, payload={}, limit=None): |
| 61 | + r = self.session.request( |
| 62 | + method, |
| 63 | + url=self._format_api_call(endpoint), |
| 64 | + headers=self.auth_header, |
| 65 | + json=payload |
| 66 | + ) |
| 67 | + |
| 68 | + if r.ok: |
| 69 | + if r.headers.get('page-number'): |
| 70 | + if int(r.headers.get('total-pages')) > 1: |
| 71 | + return PaginatedResponse( |
| 72 | + results=r.json(), |
| 73 | + limit=limit, |
| 74 | + page_size=r.headers['per-page'], |
| 75 | + total_pages=r.headers['total-pages'], |
| 76 | + total=r.headers['total'], |
| 77 | + endpoint=endpoint, |
| 78 | + method=method, |
| 79 | + payload=payload, |
| 80 | + client=self |
| 81 | + ) |
| 82 | + if isinstance(r.json(), list): |
| 83 | + return r.json()[:limit] |
| 84 | + |
| 85 | + return r.json() |
| 86 | + |
| 87 | + if r.status_code == 422 and "presentation" in endpoint: |
| 88 | + raise PresentationException |
| 89 | + |
| 90 | + return r.raise_for_status() |
| 91 | + |
| 92 | + def get_specific_page(self, method, endpoint, payload, page): |
| 93 | + """ |
| 94 | + Gets a specific page for that endpoint, used by Pagination Class |
| 95 | +
|
| 96 | + :Args: |
| 97 | + method (string): 'get', 'post' |
| 98 | + endpoint (string): endpoint ('/accounts/<ACCOUNT_ID>/teams') |
| 99 | + payload (dict): Request payload |
| 100 | + page (int): What page to get |
| 101 | + """ |
| 102 | + if method == 'get': |
| 103 | + endpoint = '{}?page={}'.format(endpoint, page) |
| 104 | + return self._api_call(method, endpoint) |
| 105 | + |
| 106 | + if method == 'post': |
| 107 | + payload['page'] = page |
| 108 | + return self._api_call(method, endpoint, payload=payload) |
| 109 | + |
| 110 | + |
| 111 | +class AWSClient(HTTPClient, object): |
| 112 | + def __init__(self, concurrency=None, progress=True): |
| 113 | + super().__init__() # Initialize via inheritance |
| 114 | + self.progress = progress |
| 115 | + if concurrency is not None: |
| 116 | + self.concurrency = concurrency |
| 117 | + else: |
| 118 | + self.concurrency = self.optimize_concurrency() |
| 119 | + |
| 120 | + def optimize_concurrency(self): |
| 121 | + """ |
| 122 | + This method looks as the net_stats and disk_stats that we've run on \ |
| 123 | + the current environment in order to suggest the best optimized \ |
| 124 | + number of concurrent TCP connections. |
| 125 | +
|
| 126 | + Example:: |
| 127 | + AWSClient.optimize_concurrency() |
| 128 | + """ |
| 129 | + |
| 130 | + net_stats = NetworkBandwidth |
| 131 | + disk_stats = DiskBandwidth |
| 132 | + |
| 133 | + # Algorithm ensues |
| 134 | + # |
| 135 | + # |
| 136 | + |
| 137 | + return 5 |
| 138 | + |
| 139 | + @staticmethod |
| 140 | + def get_byte_range(url, start_byte=0, end_byte=2048): |
| 141 | + """ |
| 142 | + Get a specific byte range from a given URL. This is **not** optimized \ |
| 143 | + for heavily-threaded operations currently because it doesn't use a shared \ |
| 144 | + HTTP session object / thread |
| 145 | +
|
| 146 | + :Args: |
| 147 | + url (string): The URL you want to fetch a byte-range from |
| 148 | + start_byte (int): The first byte you want to request |
| 149 | + end_byte (int): The last byte you want to extract |
| 150 | +
|
| 151 | + Example:: |
| 152 | + AWSClient.get_byte_range(asset, "~./Downloads") |
| 153 | + """ |
| 154 | + |
| 155 | + headers = {"Range": "bytes=%d-%d" % (start_byte, end_byte)} |
| 156 | + br = requests.get(url, headers=headers).content |
| 157 | + return br |
| 158 | + |
| 159 | + @staticmethod |
| 160 | + def check_cdn(url): |
| 161 | + # TODO improve this algo |
| 162 | + if 'assets.frame.io' in url: |
| 163 | + return 'Cloudfront' |
| 164 | + elif 's3' in url: |
| 165 | + return 'S3' |
| 166 | + else: |
| 167 | + return None |
| 168 | + |
| 169 | + |
| 170 | +class TransferJob(AWSClient): |
| 171 | + # These will be used to track the job and then push telemetry |
| 172 | + def __init__(self, job_info): |
| 173 | + self.job_info = self.check_cdn(job_info) |
| 174 | + self.cdn = 'S3' # or 'CF' - use check_cdn to confirm |
| 175 | + self.progress_manager = None |
| 176 | + |
| 177 | +class DownloadJob(TransferJob): |
| 178 | + def __init__(self): |
| 179 | + self.asset_type = 'review_link' # we should use a dataclass here |
| 180 | + # Need to create a re-usable job schema |
| 181 | + # Think URL -> output_path |
| 182 | + pass |
| 183 | + |
| 184 | +class UploadJob(TransferJob): |
| 185 | + def __init__(self, destination): |
| 186 | + self.destination = destination |
| 187 | + # Need to create a re-usable job schema |
| 188 | + # Think local_file path and remote Frame.io destination |
| 189 | + pass |
0 commit comments