163 lines
4.3 KiB
Python
163 lines
4.3 KiB
Python
import os
|
|
import shutil
|
|
import asyncio
|
|
import re
|
|
import json
|
|
import requests
|
|
import io
|
|
|
|
import atoot
|
|
import scrython
|
|
import nest_asyncio
|
|
nest_asyncio.apply()
|
|
|
|
from debug import *
|
|
|
|
async def startup():
|
|
log('Starting up...')
|
|
if not os.path.exists('config.py'):
|
|
log('Config file not found, copying', Severity.WARNING)
|
|
shutil.copyfile('config.sample.py', 'config.py')
|
|
|
|
import config
|
|
async with atoot.client(config.instance, access_token=config.access_token) as c:
|
|
log('Connected to server!')
|
|
me = await c.verify_account_credentials()
|
|
log('Credentials verified!')
|
|
|
|
tasks = []
|
|
|
|
tasks.append(asyncio.create_task(listen(c, me)))
|
|
tasks.append(asyncio.create_task(repeat(5 * 60, update_followers, c, me)))
|
|
|
|
for t in tasks:
|
|
await t
|
|
|
|
def get_cards(card_names):
|
|
cards = []
|
|
found = []
|
|
images = []
|
|
|
|
for name in card_names:
|
|
try:
|
|
c = scrython.cards.Named(fuzzy=name)
|
|
found.append(c)
|
|
cards.append(f'{c.name()} - {c.scryfall_uri()}')
|
|
except scrython.foundation.ScryfallError:
|
|
cards.append(f'No card found "{name}" was found.')
|
|
|
|
if 1 <= len(found) <= 4:
|
|
r = requests.get(c.image_uris(0, 'normal'), stream=True)
|
|
|
|
has_pt = False
|
|
try:
|
|
c.power()
|
|
has_pt = True
|
|
except KeyError:
|
|
pass
|
|
|
|
description = c.name()
|
|
if c.mana_cost():
|
|
description += f' - {c.mana_cost()}'
|
|
description += '\n' + c.type_line()
|
|
if c.oracle_text():
|
|
description += f'\n\n{c.oracle_text()}'
|
|
if has_pt:
|
|
description += f'\n\n{c.power()}/{c.toughness()}'
|
|
|
|
images.append((io.BytesIO(r.content), description))
|
|
|
|
return cards, images
|
|
|
|
async def update_followers(c, me):
|
|
log('Updating followed accounts...')
|
|
accounts_following_me = set(map(lambda a: a['id'], await c.account_followers(me)))
|
|
accounts_i_follow = set(map(lambda a: a['id'], await c.account_following(me)))
|
|
|
|
# accounts that follow me that i don't follow
|
|
to_follow = accounts_following_me - accounts_i_follow
|
|
|
|
# accounts i follow that don't follow me
|
|
to_unfollow = accounts_i_follow - accounts_following_me
|
|
|
|
if to_follow:
|
|
log(f'{len(to_follow)} accounts to follow:')
|
|
for account in to_follow:
|
|
await c.account_follow(account)
|
|
log(f'Followed {account}')
|
|
else:
|
|
log('No accounts to follow.')
|
|
|
|
if to_unfollow:
|
|
log(f'{len(to_unfollow)} accounts to unfollow:')
|
|
for account in to_unfollow:
|
|
await c.account_unfollow(account)
|
|
log(f'Unfollowed {account}')
|
|
else:
|
|
log('No accounts to unfollow.')
|
|
|
|
async def listen(c, me):
|
|
log('Listening...')
|
|
async with c.streaming('user') as stream:
|
|
async for msg in stream:
|
|
status = json.loads(msg.json()['payload'])
|
|
|
|
try:
|
|
# two events come in for the statuses, one of them has the status nested deeper
|
|
# just ignore that one
|
|
if 'status' in status: continue
|
|
except:
|
|
# ignore any events we don't know how to handle
|
|
continue
|
|
|
|
status_id = status['id']
|
|
status_author = '@' + status['account']['acct']
|
|
status_text = status['content']
|
|
status_visibility = status['visibility']
|
|
|
|
reply_visibility = min(('unlisted', status_visibility), key=['direct', 'private', 'unlisted', 'public'].index)
|
|
|
|
media_ids = None
|
|
|
|
try:
|
|
card_names = re.findall(r'\[\[(.+?)\]\]', status_text)
|
|
|
|
# ignore any statuses without cards in them
|
|
if not card_names: continue
|
|
|
|
cards, media = get_cards(card_names)
|
|
|
|
reply_text = status_author
|
|
|
|
if len(cards) == 1:
|
|
reply_text += ' ' + cards[0]
|
|
else:
|
|
reply_text += '\n\n' + '\n'.join(cards)
|
|
|
|
if media:
|
|
media_ids = []
|
|
for image, desc in media:
|
|
media_ids.append((await c.upload_attachment(fileobj=image, params={}, description=desc))['id'])
|
|
except:
|
|
reply_text = f'{status_author} Sorry! You broke me somehow. Please let Holly know what you did!'
|
|
|
|
log('Sending reply...')
|
|
await c.create_status(status=reply_text, media_ids=media_ids, in_reply_to_id=status_id, visibility=reply_visibility)
|
|
|
|
# https://stackoverflow.com/a/55505152/2114129
|
|
async def repeat(interval, func, *args, **kwargs):
|
|
"""Run func every interval seconds.
|
|
|
|
If func has not finished before *interval*, will run again
|
|
immediately when the previous iteration finished.
|
|
|
|
*args and **kwargs are passed as the arguments to func.
|
|
"""
|
|
while True:
|
|
await asyncio.gather(
|
|
func(*args, **kwargs),
|
|
asyncio.sleep(interval),
|
|
)
|
|
|
|
if __name__ == '__main__':
|
|
asyncio.run(startup()) |