mtg-card-lookup/mtgcardlookup.py

204 lines
5.5 KiB
Python
Raw Normal View History

2021-03-09 06:25:13 +00:00
#!/usr/bin/env python3.9
2021-03-09 06:05:42 +00:00
import os
import shutil
import asyncio
2021-03-09 05:08:20 +00:00
import re
import json
2021-03-09 06:01:09 +00:00
import requests
import io
import sys
import traceback
import atoot
2021-03-09 06:01:09 +00:00
import scrython
import nest_asyncio
nest_asyncio.apply()
from debug import *
async def startup():
log('Starting up...')
if not os.path.exists('config.py'):
log('Config file not found, copying', Severity.WARNING)
shutil.copyfile('config.sample.py', 'config.py')
2021-03-09 14:20:30 +00:00
import config
async with atoot.client(config.instance, access_token=config.access_token) as c:
log('Connected to server!')
me = await c.verify_account_credentials()
log('Credentials verified!')
2021-03-09 14:20:30 +00:00
tasks = []
2021-03-09 14:20:30 +00:00
tasks.append(asyncio.create_task(listen(c, me)))
tasks.append(asyncio.create_task(repeat(5 * 60, update_followers, c, me)))
2021-03-09 14:20:30 +00:00
for t in tasks:
await t
async def get_cards(card_names):
2021-03-09 15:08:25 +00:00
async def download_card_image(session, c):
log(f'Downloading image for {c.name()}...')
2021-03-09 15:08:25 +00:00
url = c.image_uris(0, 'normal')
async with session.get(url) as r:
image = io.BytesIO(await r.read())
log(f'Done downloading image for {c.name()}!')
return image
2021-03-09 15:08:25 +00:00
def get_text_representation(c):
# I genuinely think this is the best way to check whether a card has
# power/toughness, considering how Scrython is implemented.
# Can't even check for Creature type because of stuff like Vehicles.
try:
c.power()
has_pt = True
except KeyError:
has_pt = False
ret = c.name() # All cards have a name.
# Some cards (lands, [[Wheel of Fate]], whatever) don't have mana costs.
# Add it if it's there.
if c.mana_cost():
ret += f' - {c.mana_cost()}'
# All cards have a type line.
ret += '\n' + c.type_line()
# Funnily enough, not all cards have oracle text.
# It feels like they should, but that's ignoring vanilla creatures.
if c.oracle_text():
ret += f'\n\n{c.oracle_text()}'
# Finally, power/toughness.
if has_pt:
ret += f'\n\n{c.power()}/{c.toughness()}'
return ret
2021-03-09 06:01:09 +00:00
cards = []
found = []
2021-03-09 14:20:30 +00:00
2021-03-09 06:01:09 +00:00
for name in card_names:
try:
c = scrython.cards.Named(fuzzy=name)
found.append(c)
cards.append(f'{c.name()} - {c.scryfall_uri()}')
except scrython.foundation.ScryfallError:
2021-03-09 06:03:54 +00:00
cards.append(f'No card named "{name}" was found.')
2021-03-09 14:20:30 +00:00
2021-03-09 15:08:25 +00:00
if 1 <= len(found) <= 4:
# download card images
async with aiohttp.ClientSession() as session:
images = list(zip(await asyncio.gather(
2021-03-09 15:08:25 +00:00
*(download_card_image(session, c) for c in found)
), (get_text_representation(c) for c in found)))
2021-03-09 14:20:30 +00:00
2021-03-09 15:08:25 +00:00
else:
images = None
2021-03-09 14:20:30 +00:00
2021-03-09 06:01:09 +00:00
return cards, images
async def update_followers(c, me):
log('Updating followed accounts...')
accounts_following_me = set(map(lambda a: a['id'], await c.account_followers(me)))
accounts_i_follow = set(map(lambda a: a['id'], await c.account_following(me)))
2021-03-09 14:20:30 +00:00
# accounts that follow me that i don't follow
to_follow = accounts_following_me - accounts_i_follow
2021-03-09 14:20:30 +00:00
# accounts i follow that don't follow me
to_unfollow = accounts_i_follow - accounts_following_me
2021-03-09 14:20:30 +00:00
if to_follow:
log(f'{len(to_follow)} accounts to follow:')
for account in to_follow:
await c.account_follow(account)
log(f'Followed {account}')
else:
log('No accounts to follow.')
2021-03-09 14:20:30 +00:00
if to_unfollow:
log(f'{len(to_unfollow)} accounts to unfollow:')
for account in to_unfollow:
await c.account_unfollow(account)
log(f'Unfollowed {account}')
else:
log('No accounts to unfollow.')
async def listen(c, me):
log('Listening...')
async with c.streaming('user') as stream:
async for msg in stream:
2021-03-09 05:08:20 +00:00
status = json.loads(msg.json()['payload'])
try:
# two events come in for the statuses, one of them has the status nested deeper
# just ignore that one
if 'status' in status: continue
2021-03-09 14:20:30 +00:00
status_id = status['id']
status_author = '@' + status['account']['acct']
status_text = status['content']
status_visibility = status['visibility']
2021-03-09 05:08:20 +00:00
except:
try:
if status['type'] == 'follow':
id = status['account']['id']
log(f'Received follow from {id}, following back')
await c.account_follow(id)
except:
log('Event came in that we don\'t know how to handle.', Severity.WARNING)
log(status, Severity.WARNING)
2021-03-09 05:08:20 +00:00
continue
2021-03-09 14:20:30 +00:00
2021-03-09 06:01:09 +00:00
reply_visibility = min(('unlisted', status_visibility), key=['direct', 'private', 'unlisted', 'public'].index)
2021-03-09 14:20:30 +00:00
2021-03-09 06:01:09 +00:00
media_ids = None
2021-03-09 14:20:30 +00:00
2021-03-09 06:01:09 +00:00
try:
card_names = re.findall(r'\[\[(.+?)\]\]', status_text)
2021-03-09 14:20:30 +00:00
2021-03-09 06:01:09 +00:00
# ignore any statuses without cards in them
if not card_names: continue
2021-03-09 14:20:30 +00:00
cards, media = await get_cards(card_names)
2021-03-09 14:20:30 +00:00
2021-03-09 06:01:09 +00:00
reply_text = status_author
2021-03-09 14:20:30 +00:00
2021-03-09 06:01:09 +00:00
if len(cards) == 1:
reply_text += ' ' + cards[0]
else:
reply_text += '\n\n' + '\n'.join(cards)
2021-03-09 14:20:30 +00:00
2021-03-09 06:01:09 +00:00
if media:
media_ids = []
for image, desc in media:
media_ids.append((await c.upload_attachment(fileobj=image, params={}, description=desc))['id'])
except Exception as e:
log(traceback.print_exc(), Severity.ERROR)
2021-03-09 06:01:09 +00:00
reply_text = f'{status_author} Sorry! You broke me somehow. Please let Holly know what you did!'
2021-03-09 14:20:30 +00:00
2021-03-09 05:08:20 +00:00
log('Sending reply...')
2021-03-09 06:01:09 +00:00
await c.create_status(status=reply_text, media_ids=media_ids, in_reply_to_id=status_id, visibility=reply_visibility)
# https://stackoverflow.com/a/55505152/2114129
async def repeat(interval, func, *args, **kwargs):
"""Run func every interval seconds.
If func has not finished before *interval*, will run again
immediately when the previous iteration finished.
*args and **kwargs are passed as the arguments to func.
"""
while True:
await asyncio.gather(
func(*args, **kwargs),
asyncio.sleep(interval),
)
if __name__ == '__main__':
2021-03-09 06:05:42 +00:00
asyncio.run(startup())