#!/usr/bin/env python3.9
import asyncio
import html
import io
import json
import os  # Used exactly once to check for the config file
import re
import shutil  # Used exactly once to copy the sample config file
import traceback

import aiohttp
import atoot  # Asynchronous Mastodon API wrapper
import nest_asyncio  # I cannot even begin to explain why I need this one.
import requests
import scrython  # Scryfall API (similar to Gatherer but I prefer Scryfall)

# It has something to do with Scrython using asyncio, which means I can't
# use it from within my asyncio code. And that's on purpose, I think.
# But this patches asyncio to allow that, somehow?
# I'm sorry, it's completely beyond me. Look it up.
nest_asyncio.apply()

import face
from debug import *
async def startup():
    """Connect to the configured instance and run the bot's background tasks.

    If ``config.py`` does not exist it is first created by copying
    ``config.sample.py``. The function then opens an atoot client, verifies
    the account credentials and runs two tasks concurrently: the streaming
    listener (``listen``) and a follower sync (``update_followers``) repeated
    every five minutes.

    Raises:
        Whatever exception the first failing task raised. Both tasks are
        awaited together, so a failure in either one surfaces immediately
        instead of going unnoticed while the other keeps running.
    """
    log('Starting up...')

    if not os.path.exists('config.py'):
        log('Config file not found, copying', Severity.WARNING)
        shutil.copyfile('config.sample.py', 'config.py')

    # Imported here rather than at module level so that the copy above has a
    # chance to create config.py first.
    import config

    async with atoot.client(config.instance, access_token=config.access_token) as c:
        log('Connected to server!')
        me = await c.verify_account_credentials()
        log('Credentials verified!')

        tasks = [
            asyncio.create_task(listen(c, me)),
            asyncio.create_task(repeat(5 * 60, update_followers, c, me)),
        ]
        try:
            # gather() propagates the first exception from either task.
            # Awaiting the tasks one by one would hide a crash in the second
            # task for as long as the first one is still running.
            await asyncio.gather(*tasks)
        finally:
            # Stop whichever task is still alive before the client is closed.
            # Cancelling an already finished task is a no-op.
            for task in tasks:
                task.cancel()
async def get_cards(card_names):
    """Look up every requested card and build the reply text and attachments.

    Args:
        card_names: Card names to look up, in the order they should be
            answered.

    Returns:
        A ``(responses, images)`` pair.

        ``responses`` has one string per entry in ``card_names``, even when
        the lookup failed: ``'<name> - <Scryfall URI>'`` for a card that was
        found and ``'No card named "<name>" was found.'`` otherwise.

        ``images`` is a tuple of ``(image, description)`` pairs, one per card
        that was found, where ``image`` is an in-memory ``io.BytesIO`` and
        ``description`` is the card's text representation. It is ``None``
        unless between one and four cards were found.
    """

    async def download_card_image(session, c):
        """Download the 'normal' sized image of card ``c`` into memory."""
        log(f'Downloading image for {c.name()}...')
        url = c.image_uris(0, 'normal')
        async with session.get(url) as r:
            # BytesIO stores the data in memory as a file-like object.
            # We can turn around and upload it to fedi without ever
            # touching the disk.
            image = io.BytesIO(await r.read())
        log(f'Done downloading image for {c.name()}!')
        return image

    def get_text_representation(c):
        """Render card ``c`` as plain text: name, cost, type, rules text, P/T."""
        try:
            # Double face cards have to be treated ENTIRELY DIFFERENTLY
            # Instead of card objects we just get a list of two dictionaries.
            # I've decided to try to reuse as much code as possible by
            # jury-rigging my own Face object that can be passed into my card
            # parsing logic
            return '\n\n//\n\n'.join(
                get_text_representation(face.Face(card_face))
                for card_face in c.card_faces()
            )
        except Exception:
            # Anything without usable faces falls through to the single-face
            # rendering below. Catching Exception (not a bare except) keeps
            # this fallback while letting KeyboardInterrupt and SystemExit
            # propagate.
            pass

        # I genuinely think this is the best way to check whether a card has
        # power/toughness, considering how Scrython is implemented.
        # Can't even check for Creature type because of stuff like Vehicles.
        try:
            c.power()
            has_pt = True
        except KeyError:
            has_pt = False

        ret = c.name()  # All cards have a name.

        # Some cards (lands, [[Wheel of Fate]], whatever) don't have mana costs.
        # Add it if it's there.
        if c.mana_cost():
            ret += f' - {c.mana_cost()}'

        # All cards have a type line.
        ret += '\n' + c.type_line()

        # Funnily enough, not all cards have oracle text.
        # It feels like they should, but that's ignoring vanilla creatures.
        if c.oracle_text():
            ret += f'\n\n{c.oracle_text()}'

        # Finally, power/toughness.
        if has_pt:
            ret += f'\n\n{c.power()}/{c.toughness()}'

        return ret

    # Responses list: One entry for each [[card name]] in parent, even if the
    # response is just "No card named 'CARDNAME' was found."
    responses = []
    # Cards list: Only cards that were found successfully
    cards = []

    for name in card_names:
        try:
            # NOTE(review): 141 looks like the length of the card name below,
            # so any longer request is answered with that card - confirm.
            if len(name) > 141:
                c = scrython.cards.Named(fuzzy='Our Market Research Shows That Players Like Really Long Card Names So We Make This Card to Have The Absolute Longest Card Name Ever Elemental')
            else:
                c = scrython.cards.Named(fuzzy=name)
            cards.append(c)
            responses.append(f'{c.name()} - {c.scryfall_uri()}')
        except scrython.foundation.ScryfallError:
            responses.append(f'No card named "{name}" was found.')

    # Download card images.
    # A status can only have four images on it, so we can't necessarily include
    # every card mentioned in the status.
    # The reason I choose to include /no/ images in that case is that someone
    # linking to more than 4 cards is probably talking about enough different
    # things that it would be weird for the first four they happened to mention
    # to have images. Like if someone's posting a decklist it would be weird
    # for the first four cards to be treated as special like that.
    if 1 <= len(cards) <= 4:
        async with aiohttp.ClientSession() as session:
            # All downloads run concurrently; gather() keeps them in the same
            # order as `cards`, so zip() pairs each image with its own text.
            downloaded = await asyncio.gather(
                *(download_card_image(session, c) for c in cards)
            )
            images = tuple(zip(
                downloaded,
                (get_text_representation(c) for c in cards),
            ))
    else:
        images = None

    return responses, images
async def update_followers(c, me):
    """Make the set of accounts the bot follows match its followers.

    Follows every account that follows ``me`` but is not followed yet, and
    unfollows every account that is followed but does not follow ``me``.

    Args:
        c: The API client; must provide ``account_followers``,
            ``account_following``, ``account_follow`` and ``account_unfollow``.
        me: The bot's own account, as passed to the two list calls.
    """
    log('Updating followed accounts...')

    # NOTE(review): only what a single call to each endpoint returns is
    # considered. If the API pages these lists, accounts beyond the first
    # page would look like non-followers and be unfollowed - confirm.
    follower_accounts = await c.account_followers(me)
    followed_accounts = await c.account_following(me)
    follower_ids = {account['id'] for account in follower_accounts}
    followed_ids = {account['id'] for account in followed_accounts}

    # Accounts that follow me that I don't follow
    missing_follows = follower_ids - followed_ids
    # Accounts I follow that don't follow me
    stale_follows = followed_ids - follower_ids

    if not missing_follows:
        log('No accounts to follow.')
    else:
        # Note that the bot listens for follows and tries to follow
        # back instantly. This is /usually/ dead code but it's a failsafe
        # in case someone followed while the bot was down or something.
        log(f'{len(missing_follows)} accounts to follow:')
        for account_id in missing_follows:
            await c.account_follow(account_id)
            log(f'Followed {account_id}')

    if not stale_follows:
        log('No accounts to unfollow.')
    else:
        log(f'{len(stale_follows)} accounts to unfollow:')
        for account_id in stale_follows:
            await c.account_unfollow(account_id)
            log(f'Unfollowed {account_id}')
# Matches [[Card Name]] and captures the text between the double brackets.
_CARD_REFERENCE = re.compile(r'\[\[(.+?)\]\]')

# Visibility levels ordered from most to least restrictive.
_VISIBILITY_ORDER = ['direct', 'private', 'unlisted', 'public']


def _extract_card_names(status_html):
    """Return the names written as ``[[name]]`` in a status body, in order.

    The status content arrives as HTML, so character references such as
    ``&#39;`` or ``&amp;`` are decoded before the name is used for a lookup.
    """
    return [html.unescape(name) for name in _CARD_REFERENCE.findall(status_html)]


async def _handle_non_status_event(c, event):
    """Follow back on a follow notification; log events that can't be read."""
    try:
        if event['type'] == 'follow':
            follower_id = event['account']['id']
            log(f'Received follow from {follower_id}, following back')
            await c.account_follow(follower_id)
    except Exception:
        # Exception rather than a bare except, so that cancelling the listener
        # while the follow request is in flight is not swallowed here.
        log('Event came in that we don\'t know how to handle.', Severity.WARNING)
        log(event, Severity.WARNING)


async def listen(c, me):
    """Stream the user timeline and reply to every status that names cards.

    For each incoming status containing at least one ``[[card name]]``, the
    cards are looked up with ``get_cards`` and a reply is posted that mentions
    the author, lists one line per requested card and, when ``get_cards``
    returned images, attaches them. Boosts and events carrying a nested
    ``status`` are skipped. Events that cannot be read as a status are checked
    for follow notifications, which are answered by following back.

    Args:
        c: The API client used for streaming, uploads and posting.
        me: The bot's own account. Not used by the listener itself.
    """
    log('Listening...')
    async with c.streaming('user') as stream:
        async for msg in stream:
            status = json.loads(msg.json()['payload'])
            try:
                # Two events come in for each status on the timeline. I don't know why.
                # One of them has the status nested deeper. Just ignore that one I guess.
                if 'status' in status:
                    continue

                # Don't activate on boosts at all
                if 'reblog' in status and status['reblog'] is not None:
                    continue

                status_id = status['id']
                status_author = '@' + status['account']['acct']
                status_text = status['content']
                status_visibility = status['visibility']
            except Exception:
                # Not shaped like a status (e.g. a notification).
                await _handle_non_status_event(c, status)
                continue

            # Reply unlisted or at the same visibility as the parent, whichever is
            # more restrictive
            # I realized after writing this that I don't /think/ it ever matters?
            # I think replies behave the same on public and unlisted. But I'm not 100%
            # sure so it stays.
            reply_visibility = min(('unlisted', status_visibility), key=_VISIBILITY_ORDER.index)

            media_ids = None

            try:
                card_names = _extract_card_names(status_text)

                # ignore any statuses without cards in them
                if not card_names:
                    continue

                cards, media = await get_cards(card_names)

                reply_text = status_author

                # Just a personal preference thing. If I ask for one card, put the
                # text on the same line as the mention. If I ask for more, start the
                # list a couple of lines down.
                if len(cards) == 1:
                    reply_text += ' ' + cards[0]
                else:
                    reply_text += '\n\n' + '\n'.join(cards)

                if media:
                    try:
                        media_ids = []
                        for image, desc in media:
                            attachment = await c.upload_attachment(fileobj=image, params={}, description=desc)
                            media_ids.append(attachment['id'])
                    except atoot.api.RatelimitError:
                        media_ids = None
                        reply_text += '\n\nMedia attachments are temporarily disabled due to API restrictions, they will return shortly.'
            except Exception:
                # Oops!
                # format_exc() returns the traceback as a string; print_exc()
                # returns None, which would make the log entry read "None".
                log(traceback.format_exc(), Severity.ERROR)
                # Don't attach a partial set of uploads to the apology.
                media_ids = None
                reply_text = f'{status_author} Sorry! You broke me somehow. Please let Holly know what you did!'

            log('Sending reply...')
            reply = await c.create_status(status=reply_text, media_ids=media_ids, in_reply_to_id=status_id, visibility=reply_visibility)
            log(f'Reply sent! {reply["uri"]}')
# https://stackoverflow.com/a/55505152/2114129
async def repeat(interval, func, *args, **kwargs):
    """Call the coroutine function ``func`` forever, once per ``interval`` seconds.

    Each round starts ``func(*args, **kwargs)`` together with a sleep of
    ``interval`` seconds and waits for both. A call that takes longer than
    ``interval`` is therefore followed immediately by the next one.

    This coroutine never returns on its own; an exception raised by ``func``
    propagates to the caller and ends the loop.
    """
    while True:
        call = func(*args, **kwargs)
        pause = asyncio.sleep(interval)
        # Waiting for both means a round lasts at least `interval` seconds.
        await asyncio.gather(call, pause)
if __name__ == '__main__':
    # Script entry point: run the bot's startup coroutine on a fresh event
    # loop until it finishes or raises.
    asyncio.run(startup())