mtg-card-lookup/mtgcardlookup.py

492 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3.9
import os # Used exactly once to check for the config file
import shutil # Used exactly once to copy the sample config file
import time
import asyncio
import aiohttp
import re
import json
import io
import argparse
import traceback
import nh3
from PIL import Image
import atoot # Asynchronous Mastodon API wrapper
import scrython # Scryfall API (similar to Gatherer but I prefer Scryfall)
import nest_asyncio # I cannot even begin to explain why I need this one.
nest_asyncio.apply() # It has something to do with Scrython using asyncio, which means I can't
# use it from within my asyncio code. And that's on purpose, I think.
# But this patches asyncio to allow that, somehow?
# I'm sorry, it's completely beyond me. Look it up.
import face
from easter_eggs import eggs
from debug import *
async def startup(args):
"""Start up the entire bot, logging in and executing code"""
log('Starting up...')
if not os.path.exists('config.py'):
log('Config file not found, copying', Severity.WARNING)
shutil.copyfile('config.sample.py', 'config.py')
import config
async with atoot.client(config.instance, access_token=config.access_token) as c:
log('Connected to server!')
me = await c.verify_account_credentials()
log('Credentials verified!')
if args.update_pins:
await update_pins(c, me, config)
else:
tasks = [
asyncio.create_task(listen(c, me)),
asyncio.create_task(repeat(5 * 60, update_followers, c, me)),
]
for t in tasks:
await t
async def update_pins(c, me, config):
"""Resend and repin the thread in pinned_thread.txt"""
async def get_pinned_statuses(c, user_id):
# atoot can't look up a user's pinned toots, so we
# have to do it ourselves
statuses = await c.get(f'/api/v1/accounts/{me["id"]}/statuses?pinned=true')
return (s['id'] for s in statuses)
log('Ready to repost the pinned introduction thread!')
# Get text for each status of the thread
log('Getting thread text...')
try:
with open('pinned_thread.txt') as f:
thread_text = f.read().strip().split('\n-----\n')
except:
log('Error reading text for new pinned thread!', Severity.ERROR)
log('Make sure that the file pinned_thread.txt exists and that it contains the text of each status, separated by lines containing only "-----".', Severity.ERROR)
exit(-1)
thread_statuses = []
# Post the new thread
log('Posting new thread...')
for status_text in thread_text:
thread_statuses.append(
await c.create_status(
status=status_text,
in_reply_to_id=(thread_statuses[-1]['id'] if thread_statuses else None),
visibility='unlisted',
)
)
# Unpin all existing pins
log('Unpinning old thread...')
for status in (await get_pinned_statuses(c, me)):
await c.status_unpin(status)
log('Pinning new thread...')
# Pin the new thread in reverse order, so it reads chronologically top to bottom
for status in thread_statuses[::-1]:
await c.status_pin(status)
log('Done!')
async def get_cards(card_names):
"""
Return information about all cards with names in card_names
params: card_names (iterable with names of cards as strings)
return: [response, images]
where response is a list of strings, each being either:
the card name and scryfall url
"No card named {name} was found"
and where images is either:
a list of up to 4 tuples containing:
io.BytesIO images of cards
Oracle text for the card it's an image of
None
"""
async def get_card_image(session, c, get_oracle=True):
async def download_card_image(session, c):
async with session.get(c.image_uris(0, 'normal')) as r:
log(f'Downloading image for {c.name()}...')
# BytesIO stores the data in memory as a file-like object.
# We can turn around and upload it to fedi without ever
# touching the disk.
b = await r.read()
log(f'Done downloading image for {c.name()}!')
return io.BytesIO(b)
async def download_card_text(session, c):
async with session.get(c.uri() + '?format=text') as r:
log(f'Downloading text representation of {c.name()}...')
text = await r.text()
log(f'Done downloading text representation of {c.name()}!')
return text
try:
# Scrython exposes this method for every card and just manually raises a KeyError if it's not
# a DFC, so this whole section has to be wrapped in a try-catch to deal with it
c.card_faces()
log(f'{c.name()} is a DFC, getting each face separately...')
oracle_text = download_card_text(session, c)
front, back = map(Image.open, await asyncio.gather(
*(get_card_image(session, face.Face(card_face), False) for card_face in c.card_faces())
))
new_image = Image.new('RGB', (front.width*2, front.height))
new_image.paste(front, (0, 0))
new_image.paste(back, (front.width, 0))
output = io.BytesIO()
new_image.save(output, format=front.format)
output.seek(0)
return (output, await oracle_text)
except KeyError:
pass
if get_oracle:
return await asyncio.gather(
download_card_image(session, c),
download_card_text(session, c)
)
else:
return await download_card_image(session, c)
# Responses list: One entry for each [[card name]] in parent, even if the
# response is just "No card named 'CARDNAME' was found."
responses = []
# Cards list: Only cards that were found successfully
cards = []
for name in card_names:
name = re.sub(r'<.*?>', '', name).strip()
# Handle set codes and collector numbers
name, set_code, collector_num, *_ = (name.split('|') + ['']*2)
try:
# Check if any of the easter eggs should happen
for func, replacement in eggs:
if func(name):
c = scrython.cards.Named(fuzzy=replacement)
break
else:
c = scrython.cards.Named(fuzzy=name, set=set_code.lower())
if collector_num:
c_by_num = scrython.cards.Collector(
code=set_code.lower(),
collector_number=collector_num,
)
# Make sure the provided name matches
if c_by_num.name() != c.name():
raise scrython.foundation.ScryfallError
c = c_by_num
cards.append(c)
responses.append(f'{c.name()} - {c.scryfall_uri()}')
except scrython.foundation.ScryfallError as e:
if collector_num:
responses.append(
f'No card named "{name}" from set with code '
f'{set_code.upper()} and collector number {collector_num} '
'was found.')
elif set_code:
responses.append(f'No card named "{name}" from set with code {set_code.upper()} was found.')
else:
responses.append(f'No card named "{name}" was found.')
# Download card images.
# A status can only have four images on it, so we can't necessarily include
# every card mentioned in the status.
# The reason I choose to include /no/ images in that case is that someone
# linking to more than 4 cards is probably talking about enough different things
# that it would be weird for the first four they happened to mention to have images.
# Like if someone's posting a decklist it would be weird for the first four cards to
# be treated as special like that.
if 1 <= len(cards) <= 4:
async with aiohttp.ClientSession() as session:
images = await asyncio.gather(
*(get_card_image(session, c) for c in cards)
)
else:
images = None
return responses, images
async def update_followers(c, me):
"""
Execute follows/unfollows to ensure that following and follower lists are synced
params: c (mastodon client object)
me (id of this bot's user account)
"""
log('Updating followed accounts...')
accounts_following_me = set(map(lambda a: a['id'], await c.get_all(c.account_followers(me))))
accounts_i_follow = set(map(lambda a: a['id'], await c.get_all(c.account_following(me))))
# Accounts that follow me that I don't follow
to_follow = accounts_following_me - accounts_i_follow
# Accounts I follow that don't follow me
to_unfollow = accounts_i_follow - accounts_following_me
if to_follow:
# Note that the bot listens for follows and tries to follow
# back instantly. This is /usually/ dead code but it's a failsafe
# in case someone followed while the bot was down or something.
log(f'{len(to_follow)} accounts to follow:')
for account in to_follow:
log(f'Following {account}...')
account_dict = await c.get(f'/api/v1/accounts/{account}')
moved_to = account_dict.get('moved')
if moved_to:
log('Account has moved, skipping...')
continue
await c.account_follow(account)
time.sleep(1)
else:
log('No accounts to follow.')
if to_unfollow:
log(f'{len(to_unfollow)} accounts to unfollow:')
for account in to_unfollow:
log(f'Unfollowing {account}...')
await c.account_follow(account)
time.sleep(1)
else:
log('No accounts to unfollow.')
async def handle_status(c, status, mentions_me, me):
"""
Determine if a status should be replied to and, if so, construct and post that reply
params: c (mastodon client object)
status (the status in question)
mentions_me (bool, whether or not the status mentions this bot)
me (id of this bot's user account)
"""
def get_status_info(status):
status_id = status['id']
status_author = '@' + status['account']['acct']
status_text = nh3.clean(status['content'], tags=set()) # Remove all HTML from the status text
status_visibility = status['visibility']
# Reply unlisted or at the same visibility as the parent, whichever is
# more restrictive
# This doesn't matter for, say, Mastodon. But apparently some fedi
# fedi displays public replies in public timelines? And even if that's
# wrong, it doesn't hurt to keep it this way, just in case.
reply_visibility = min(('unlisted', status_visibility), key=['direct', 'private', 'unlisted', 'public'].index)
return status_id, status_author, status_text, status_visibility, reply_visibility
async def get_our_non_error_replies(status_id, me):
all_descendants = (await c.get(f'/api/v1/statuses/{status_id}/context'))['descendants']
replies = (s for s in all_descendants if s['in_reply_to_id'] == status_id)
ours = (s for s in replies if s['account']['id'] == me["id"])
return (s for s in ours if 'Sorry! You broke me somehow.' not in s['content'])
async def send_reply(c, reply_text, media_ids, status_id, reply_visibility):
try:
reply = await c.create_status(status=reply_text, media_ids=media_ids, in_reply_to_id=status_id, visibility=reply_visibility)
log(f'Reply sent! {reply["uri"]}')
except atoot.api.UnprocessedError as e:
log(f'Could not send reply!', Severity.ERROR)
log(traceback.format_exc(), Severity.ERROR)
error_msg = 'An error occured sending the reply. This most likely means that it would have been greater than 500 characters. If it was something else, please let Holly know!'
await c.create_status(status=f'{status_author} {error_msg}', in_reply_to_id=status_id, visibility=reply_visibility)
# Ignore all reblogs
if status.get('reblog'): return
status_id, status_author, status_text, status_visibility, reply_visibility = get_status_info(status)
reply_text = status_author
# The bot can be called on the parent status if:
# - The only non-mention text in the current status is "last", "back", "parent" or "up"
# - We are explicitly mentioned in the status
# - There is a parent status
# - We can access the parent status
# - That parent status was written by the same user as the current status
# - This bot hasn't already replied to the parent status, unless that reply was an error
if mentions_me and re.sub(r'\S*@\S*', '', status_text).strip().lower() in ('last', 'back', 'parent', 'up'):
log(f'Trying to handle parent of status {status_id}')
# First two checks passed, the rest is a little complicated
# Ensure that there is a parent
if status['in_reply_to_account_id'] is None:
log(f'There is no parent stauts, bail')
await send_reply(c, f'{status_author} This message isn\'t a reply!', None, status_id, reply_visibility)
return
# Ensure that the parent message is reachable
parent_id = status['in_reply_to_id']
try:
log(f'Try to get the parent status with id {parent_id}')
reply_status = status
status = await c.get(f'/api/v1/statuses/{parent_id}')
except atoot.api.NotFoundError:
log(f'Can\'t get the parent status, bail')
await send_reply(
c,
f'{status_author} I can\'t see the parent status! It\'s probably followers-only or a DM, and I don\'t have permission to view it.',
None,
status_id,
reply_visibility,
)
return
# Enusre that the authors match
if reply_status['account']['id'] != status['account']['id']:
log(f'Author ID {reply_status["account"]["id"]} doesn\'t match parent author id {status["account"]["id"]}, bail')
await send_reply(c, f'{status_author} This operation can only be performed on your own status!', None, status_id, reply_visibility)
return
# Ensure that we haven't already replied to the parent
if (replies := tuple(s for s in await get_our_non_error_replies(status['id'], me))):
existing_reply = replies[0]
log(f'We\'ve already responded to this parent at {existing_reply["id"]}, bail')
await send_reply(c, f'{status_author} I\'ve already replied to the parent status! {existing_reply["url"]}', None, status_id, reply_visibility)
return
log('Checks all passed, running remaining function on parent status')
status_id, status_author, status_text, status_visibility, reply_visibility = get_status_info(status)
reply_text = f'{status_author} [Retroactive scan requested]'
media_ids = None
try:
card_names = re.findall(
r'''
(?:\[\[|\{\{) # A non-capturing group of the characters "[[" or "{{"
(.*?) # The card text being searched for (in a capturing group, so it's returned alone)
(?:\]\]|\}\}) # A non-capturing group of the characters "]]" or "}}"
''',
status_text,
re.VERBOSE)
# ignore any statuses without cards in them
if not card_names: return
log(f'Found a status with cards {card_names}...')
cards, media = await get_cards(card_names)
# Just a personal preference thing. If I ask for one card, put the
# text on the same line as the mention. If I ask for more, start the
# list a couple of lines down. On a retroactive scan, always start a
# couple lines down.
if len(cards) == 1 and '[Retroactive scan requested]' not in reply_text:
reply_text += ' ' + cards[0]
else:
reply_text += '\n\n' + '\n'.join(cards)
if media:
try:
media_ids = []
for image, desc in media:
media_ids.append((await c.upload_attachment(fileobj=image, params={}, description=desc))['id'])
except atoot.api.RatelimitError:
media_ids = None
reply_text += '\n\nMedia attachments are temporarily disabled due to API restrictions, they will return shortly.'
except Exception as e:
# Oops!
log(traceback.print_exc(), Severity.ERROR)
reply_text = f'{status_author} Sorry! You broke me somehow. Please let Holly know what you did!'
log('Sending reply...')
await send_reply(c, reply_text, media_ids, status_id, reply_visibility)
async def handle_follow(c, follow):
"""
Follow back any users who follow
params: c (mastodon client object)
follow (the follow notification)
"""
id = follow['account']['id']
log(f'Received follow from {id}, following back')
await c.account_follow(id)
async def listen(c, me):
"""
Wait for incoming statuses and notifications and handle them appropriately
params: c (mastodon client object)
me (id of this bot's user account)
"""
log('Listening...')
async with c.streaming('user') as stream:
async for msg in stream:
event = msg.json()['event']
payload = json.loads(msg.json()['payload'])
# We only care about 'update' and 'notification' events
if event == 'update':
mentions_me = any((mentioned['id'] == me['id'] for mentioned in payload['mentions']))
# Ignore any incoming status that mentions us
# We're also going to get a notification event, we'll handle it there
if not mentions_me:
await handle_status(c, payload, False, me)
elif event == 'notification':
if payload['type'] == 'follow':
await handle_follow(c, payload)
elif payload['type'] == 'mention':
await handle_status(c, payload['status'], True, me)
# https://stackoverflow.com/a/55505152/2114129
async def repeat(interval, func, *args, **kwargs):
"""Run func every interval seconds.
If func has not finished before *interval*, will run again
immediately when the previous iteration finished.
*args and **kwargs are passed as the arguments to func.
"""
while True:
await asyncio.gather(
func(*args, **kwargs),
asyncio.sleep(interval),
)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--update-pins',
help='repost and repin the introduction thread from pinned_thread.txt, then exit',
action='store_true'
)
args = parser.parse_args()
asyncio.run(startup(args))