use external pleroma.py

This commit is contained in:
io 2023-01-11 06:23:42 +00:00
parent 82320560a1
commit 03305c4a55
4 changed files with 18 additions and 210 deletions

View file

@ -10,19 +10,13 @@ import operator
import aiosqlite
import contextlib
from yarl import URL
from pleroma import Pleroma
from pleroma import Pleroma, HandleRateLimits
from bs4 import BeautifulSoup
from functools import partial
from typing import Iterable, NewType
from utils import shield, HandleRateLimits, suppress
from utils import shield, suppress, http_session_factory
from third_party.utils import extract_post_content
USER_AGENT = (
'pleroma-ebooks; '
f'{aiohttp.__version__}; '
f'{platform.python_implementation()}/{platform.python_version()}'
)
UTC = pendulum.timezone('UTC')
JSON_CONTENT_TYPE = 'application/json'
ACTIVITYPUB_CONTENT_TYPE = 'application/activity+json'
@ -40,11 +34,8 @@ class PostFetcher:
Pleroma(api_base_url=self.config['site'], access_token=self.config['access_token']),
)
self._http = await stack.enter_async_context(
aiohttp.ClientSession(
headers={
'User-Agent': USER_AGENT,
'Accept': ', '.join([JSON_CONTENT_TYPE, ACTIVITYPUB_CONTENT_TYPE]),
},
http_session_factory(
headers={'Accept': ', '.join([JSON_CONTENT_TYPE, ACTIVITYPUB_CONTENT_TYPE])},
trust_env=True,
raise_for_status=True,
),

View file

@ -1,164 +0,0 @@
# SPDX-License-Identifier: AGPL-3.0-only
import sys
import yarl
import json
import hashlib
import aiohttp
from http import HTTPStatus
def http_session_factory(headers={}):
py_version = '.'.join(map(str, sys.version_info))
user_agent = (
'pleroma-ebooks (https://github.com/ioistired/pleroma-ebooks); '
'aiohttp/{aiohttp.__version__}; '
'python/{py_version}'
)
return aiohttp.ClientSession(
headers={'User-Agent': user_agent, **headers},
)
class BadRequest(Exception):
pass
class BadResponse(Exception):
pass
class Pleroma:
def __init__(self, *, api_base_url, access_token):
self.api_base_url = api_base_url.rstrip('/')
self.access_token = access_token
self._session = http_session_factory({'Authorization': 'Bearer ' + access_token})
self._logged_in_id = None
async def __aenter__(self):
self._session = await self._session.__aenter__()
return self
async def __aexit__(self, *excinfo):
return await self._session.__aexit__(*excinfo)
async def request(self, method, path, **kwargs):
# blocklist of some horrible instances
if hashlib.sha256(
yarl.URL(self.api_base_url).host.encode()
+ bytes.fromhex('d590e3c48d599db6776e89dfc8ebaf53c8cd84866a76305049d8d8c5d4126ce1')
).hexdigest() in {
'56704d4d95b882e81c8e7765e9079be0afc4e353925ba9add8fd65976f52db83',
'1932431fa41a0baaccce7815115b01e40e0237035bb155713712075b887f5a19',
'a42191105a9f3514a1d5131969c07a95e06d0fdf0058f18e478823bf299881c9',
}:
raise RuntimeError('stop being a chud')
async with self._session.request(method, self.api_base_url + path, **kwargs) as resp:
if resp.status == HTTPStatus.BAD_REQUEST:
raise BadRequest((await resp.json())['error'])
if resp.status == HTTPStatus.INTERNAL_SERVER_ERROR:
raise BadResponse((await resp.json()))
#resp.raise_for_status()
return await resp.json()
async def verify_credentials(self):
return await self.request('GET', '/api/v1/accounts/verify_credentials')
me = verify_credentials
async def _get_logged_in_id(self):
if self._logged_in_id is None:
self._logged_in_id = (await self.me())['id']
return self._logged_in_id
async def following(self, account_id=None):
account_id = account_id or await self._get_logged_in_id()
return await self.request('GET', f'/api/v1/accounts/{account_id}/following')
@staticmethod
def _unpack_id(obj):
if isinstance(obj, dict) and 'id' in obj:
return obj['id']
return obj
async def status_context(self, id):
id = self._unpack_id(id)
return await self.request('GET', f'/api/v1/statuses/{id}/context')
async def post(self, content, *, in_reply_to_id=None, cw=None, visibility=None):
if visibility not in {None, 'private', 'public', 'unlisted', 'direct'}:
raise ValueError('invalid visibility', visibility)
data = dict(status=content)
if in_reply_to_id := self._unpack_id(in_reply_to_id):
data['in_reply_to_id'] = in_reply_to_id
if visibility is not None:
data['visibility'] = visibility
# normally, this would be a check against None.
# however, apparently Pleroma serializes posts without CWs as posts with an empty string
# as a CW, so per the robustness principle we'll accept that too.
if cw:
data['spoiler_text'] = cw
return await self.request('POST', '/api/v1/statuses', data=data)
async def reply(self, to_status, content, *, cw=None):
user_id = await self._get_logged_in_id()
mentioned_accounts = {}
mentioned_accounts[to_status['account']['id']] = to_status['account']['acct']
for account in to_status['mentions']:
if account['id'] != user_id and account['id'] not in mentioned_accounts:
mentioned_accounts[account['id']] = account['acct']
content = ''.join('@' + x + ' ' for x in mentioned_accounts.values()) + content
visibility = 'unlisted' if to_status['visibility'] == 'public' else to_status['visibility']
if not cw and 'spoiler_text' in to_status and to_status['spoiler_text']:
cw = 're: ' + to_status['spoiler_text']
return await self.post(content, in_reply_to_id=to_status['id'], cw=cw, visibility=visibility)
async def favorite(self, id):
id = self._unpack_id(id)
return await self.request('POST', f'/api/v1/statuses/{id}/favourite')
async def unfavorite(self, id):
id = self._unpack_id(id)
return await self.request('POST', f'/api/v1/statuses/{id}/unfavourite')
async def react(self, id, reaction):
id = self._unpack_id(id)
return await self.request('PUT', f'/api/v1/pleroma/statuses/{id}/reactions/{reaction}')
async def remove_reaction(self, id, reaction):
id = self._unpack_id(id)
return await self.request('DELETE', f'/api/v1/pleroma/statuses/{id}/reactions/{reaction}')
async def pin(self, id):
id = self._unpack_id(id)
return await self.request('POST', f'/api/v1/statuses/{id}/pin')
async def unpin(self, id):
id = self._unpack_id(id)
return await self.request('POST', f'/api/v1/statuses/{id}/unpin')
async def stream(self, stream_name, *, target_event_type=None):
async with self._session.ws_connect(
self.api_base_url + f'/api/v1/streaming?stream={stream_name}&access_token={self.access_token}'
) as ws:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
event = msg.json()
# the only event type that doesn't define `payload` is `filters_changed`
if event['event'] == 'filters_changed':
yield event
elif target_event_type is None or event['event'] == target_event_type:
# don't ask me why the payload is also JSON encoded smh
yield json.loads(event['payload'])
async def stream_notifications(self):
async for notif in self.stream('user:notification', target_event_type='notification'):
yield notif
async def stream_mentions(self):
async for notif in self.stream_notifications():
if notif['type'] == 'mention':
yield notif

View file

@ -1,4 +1,5 @@
beautifulsoup4 ~= 4.9
pleroma.py ~= 0.0.1
aiohttp ~= 3.0
json5 ~= 0.9.5
anyio ~= 3.0

View file

@ -1,10 +1,10 @@
# SPDX-License-Identifier: AGPL-3.0-only
import sys
import anyio
import aiohttp
import contextlib
from functools import wraps
from datetime import datetime, timezone
from dateutil.parser import parse as parsedate
def as_corofunc(f):
@wraps(f)
@ -36,34 +36,14 @@ def removeprefix(s, prefix):
# compatibility for pre-3.9
return s[len(prefix):] if s.startswith(prefix) else s
async def sleep_until(dt):
await anyio.sleep((dt - datetime.now(timezone.utc)).total_seconds())
class HandleRateLimits:
def __init__(self, http):
self.http = http
def request(self, *args, **kwargs):
return _RateLimitContextManager(self.http, args, kwargs)
class _RateLimitContextManager(contextlib.AbstractAsyncContextManager):
def __init__(self, http, args, kwargs):
self.http = http
self.args = args
self.kwargs = kwargs
async def __aenter__(self):
self._request_cm = self.http.request(*self.args, **self.kwargs)
return await self._do_enter()
async def _do_enter(self):
resp = await self._request_cm.__aenter__()
if resp.headers.get('X-RateLimit-Remaining') not in {'0', '1'}:
return resp
await sleep_until(parsedate(resp.headers['X-RateLimit-Reset']))
await self._request_cm.__aexit__(*(None,)*3)
return await self.__aenter__()
async def __aexit__(self, *excinfo):
return await self._request_cm.__aexit__(*excinfo)
def http_session_factory(headers={}, **kwargs):
py_version = '.'.join(map(str, sys.version_info))
user_agent = (
'pleroma-ebooks (https://github.com/ioistired/pleroma-ebooks); '
f'aiohttp/{aiohttp.__version__}; '
f'python/{py_version}'
)
return aiohttp.ClientSession(
headers={'User-Agent': user_agent, **headers},
**kwargs,
)