Przeglądaj źródła

Add a rate limiter to delay requests after errors 429 or of connection

jherve 1 rok temu
rodzic
commit
86667c97a8
2 zmienionych plików z 70 dodań i 4 usunięć
  1. 2 0
      settings.toml
  2. 68 4
      src/de_quoi_parle_le_monde/internet_archive.py

+ 2 - 0
settings.toml

@@ -3,3 +3,5 @@ database_url="sqlite:///test.db"
 [internet_archive]
 [internet_archive]
 limiter_max_rate=1.0
 limiter_max_rate=1.0
 limiter_time_period=1.0
 limiter_time_period=1.0
+relaxation_time_after_error_429=60
+relaxation_time_after_error_connect=60

+ 68 - 4
src/de_quoi_parle_le_monde/internet_archive.py

@@ -1,8 +1,16 @@
+import pickle
+from pathlib import Path
 from attrs import frozen, field
 from attrs import frozen, field
 from typing import Optional, ClassVar, NewType
 from typing import Optional, ClassVar, NewType
 from datetime import date, datetime, timedelta
 from datetime import date, datetime, timedelta
+from loguru import logger
 import cattrs
 import cattrs
-from aiohttp.client import ClientSession, TCPConnector
+from aiohttp.client import (
+    ClientSession,
+    TCPConnector,
+    ClientResponseError,
+    ClientConnectorError,
+)
 from aiolimiter import AsyncLimiter
 from aiolimiter import AsyncLimiter
 
 
 from config import settings
 from config import settings
@@ -108,10 +116,52 @@ class RateLimitedConnector(TCPConnector):
             return await super().connect(req, *args, **kwargs)
             return await super().connect(req, *args, **kwargs)
 
 
 
 
+@frozen
+class ErrorRateLimiter:
+    name: str
+    file_path: Path
+    relaxation_duration: timedelta
+
+    def notify(self):
+        with open(self.file_path, "wb") as f:
+            pickle.dump(datetime.now(), f)
+
+    def raise_if_not_relaxed(self):
+        time_to_relaxation_end = self._delay_since_last_error - self.relaxation_duration
+        if time_to_relaxation_end < timedelta(0):
+            raise RuntimeError(
+                "Relaxation duration not yet elapsed after last "
+                f"error '{self.name}' that occurred {self._delay_since_last_error} ago"
+                f"\nPlease wait another {-time_to_relaxation_end}"
+            )
+
+    @property
+    def _delay_since_last_error(self):
+        try:
+            with open(self.file_path, "rb") as f:
+                last_error_dt = pickle.load(f)
+        except FileNotFoundError:
+            last_error_dt = datetime.min
+
+        return datetime.now() - last_error_dt
+
+
 @frozen
 @frozen
 class InternetArchiveClient:
 class InternetArchiveClient:
     # https://github.com/internetarchive/wayback/tree/master/wayback-cdx-server
     # https://github.com/internetarchive/wayback/tree/master/wayback-cdx-server
     session: ClientSession
     session: ClientSession
+    error_429_rate_limiter: ClassVar[ErrorRateLimiter] = ErrorRateLimiter(
+        "429 HTTP",
+        Path("./error_file_429.pickle"),
+        timedelta(seconds=settings.internet_archive.relaxation_time_after_error_429),
+    )
+    error_connect_rate_limiter: ClassVar[ErrorRateLimiter] = ErrorRateLimiter(
+        "Connection",
+        Path("./error_file_connect.pickle"),
+        timedelta(
+            seconds=settings.internet_archive.relaxation_time_after_error_connect
+        ),
+    )
     search_url: ClassVar[str] = "http://web.archive.org/cdx/search/cdx"
     search_url: ClassVar[str] = "http://web.archive.org/cdx/search/cdx"
 
 
     async def search_snapshots(
     async def search_snapshots(
@@ -158,9 +208,23 @@ class InternetArchiveClient:
         return await self.session.__aexit__(exc_type, exc, tb)
         return await self.session.__aexit__(exc_type, exc, tb)
 
 
     async def _get(self, url, params=None):
     async def _get(self, url, params=None):
-        async with self.session.get(url, allow_redirects=True, params=params) as resp:
-            resp.raise_for_status()
-            return await resp.text()
+        self.error_429_rate_limiter.raise_if_not_relaxed()
+        self.error_connect_rate_limiter.raise_if_not_relaxed()
+
+        try:
+            async with self.session.get(
+                url, allow_redirects=True, params=params
+            ) as resp:
+                try:
+                    resp.raise_for_status()
+                    return await resp.text()
+                except ClientResponseError as e:
+                    if e.code == 429:
+                        self.error_429_rate_limiter.notify()
+                    raise e
+        except ClientConnectorError as e:
+            self.error_connect_rate_limiter.notify()
+            raise e
 
 
     @staticmethod
     @staticmethod
     def create():
     def create():