Allow users to request a rescan of a message that was missed

This commit is contained in:
Holly 2024-04-15 20:23:28 +00:00
parent 9688645864
commit 08b001e38f
3 changed files with 104 additions and 38 deletions

View File

@ -2,7 +2,7 @@
A fediverse bot to look up Magic: The Gathering cards.
To use, mention the bot with a message that contains at least one Magic card name wrapped in either [[square brackets]] or {{curly braces}}. It will search Scryfall for the card and reply with the result. Alternatively, follow the bot, and it will follow you back. Any posts it sees with that syntax in its home timeline will also be processed.
To use, mention the bot with a message that contains at least one Magic card name wrapped in either [[square brackets]] or {{curly braces}}. It will search Scryfall for the card and reply with the result. Alternatively, follow the bot, and it will follow you back. Any posts it sees with that syntax in its home timeline will also be processed. Finally, if you made a post that you intended to be scanned, but wasn't because you forgot to mention the bot, reply to that post while mentioning the bot and the text "last", "back", "parent" or "up".
See it in action [here](https://botsin.space/@mtgcardlookup)!

View File

@ -3,12 +3,12 @@
import os # Used exactly once to check for the config file
import shutil # Used exactly once to copy the sample config file
import asyncio
import aiohttp
import re
import json
import io
import argparse
import traceback
import nh3
from PIL import Image
import atoot # Asynchronous Mastodon API wrapper
@ -53,13 +53,11 @@ async def startup(args):
async def update_pins(c, me, config):
"""Resend and repin the thread in pinned_thread.txt"""
async def get_pinned_statuses(user_id):
# atoot can't look up a user's pinned toots for some reason, so we
async def get_pinned_statuses(c, user_id):
# atoot can't look up a user's pinned toots, so we
# have to do it ourselves
async with aiohttp.ClientSession() as session:
url = f'https://{config.instance}/api/v1/accounts/{me["id"]}/statuses?pinned=true'
async with session.get(url) as r:
return [e['id'] for e in await r.json()]
statuses = await c.get(f'/api/v1/accounts/{me["id"]}/statuses?pinned=true')
return (s['id'] for s in statuses)
log('Ready to repost the pinned introduction thread!')
@ -88,7 +86,7 @@ async def update_pins(c, me, config):
# Unpin all existing pins
log('Unpinning old thread...')
for status in (await get_pinned_statuses(me)):
for status in (await get_pinned_statuses(c, me)):
await c.status_unpin(status)
log('Pinning new thread...')
@ -274,28 +272,103 @@ async def update_followers(c, me):
else:
log('No accounts to unfollow.')
async def handle_status(c, status):
async def handle_status(c, status, mentions_me, me):
"""
Determine if a status should be replied to and, if so, construct and post that reply
params: c (mastodon client object)
status (the status in question)
mentions_me (bool, whether or not the status mentions this bot)
me (id of this bot's user account)
"""
def get_status_info(status):
status_id = status['id']
status_author = '@' + status['account']['acct']
status_text = nh3.clean(status['content'], tags=set()) # Remove all HTML from the status text
status_visibility = status['visibility']
# Reply unlisted or at the same visibility as the parent, whichever is
# more restrictive
# This doesn't matter for, say, Mastodon. But apparently some fedi
# fedi displays public replies in public timelines? And even if that's
# wrong, it doesn't hurt to keep it this way, just in case.
reply_visibility = min(('unlisted', status_visibility), key=['direct', 'private', 'unlisted', 'public'].index)
return status_id, status_author, status_text, status_visibility, reply_visibility
async def get_our_non_error_replies(status_id, me):
all_descendants = (await c.get(f'/api/v1/statuses/{status_id}/context'))['descendants']
replies = (s for s in all_descendants if s['in_reply_to_id'] == status_id)
ours = (s for s in replies if s['account']['id'] == me["id"])
return (s for s in ours if 'Sorry! You broke me somehow.' not in s['content'])
async def send_reply(c, reply_text, media_ids, status_id, reply_visibility):
try:
reply = await c.create_status(status=reply_text, media_ids=media_ids, in_reply_to_id=status_id, visibility=reply_visibility)
log(f'Reply sent! {reply["uri"]}')
except atoot.api.UnprocessedError as e:
log(f'Could not send reply!', Severity.ERROR)
log(traceback.format_exc(), Severity.ERROR)
error_msg = 'An error occured sending the reply. This most likely means that it would have been greater than 500 characters. If it was something else, please let Holly know!'
await c.create_status(status=f'{status_author} {error_msg}', in_reply_to_id=status_id, visibility=reply_visibility)
# Ignore all reblogs
if status.get('reblog'): return
status_id = status['id']
status_author = '@' + status['account']['acct']
status_text = status['content']
status_visibility = status['visibility']
# Reply unlisted or at the same visibility as the parent, whichever is
# more restrictive
# This doesn't matter for, say, Mastodon. But apparently some fedi
# fedi displays public replies in public timelines? And even if that's
# wrong, it doesn't hurt to keep it this way, just in case.
reply_visibility = min(('unlisted', status_visibility), key=['direct', 'private', 'unlisted', 'public'].index)
status_id, status_author, status_text, status_visibility, reply_visibility = get_status_info(status)
reply_text = status_author
# The bot can be called on the parent status if:
# - The only non-mention text in the current status is "last", "back", "parent" or "up"
# - We are explicitly mentioned in the status
# - There is a parent status
# - We can access the parent status
# - That parent status was written by the same user as the current status
# - This bot hasn't already replied to the parent status, unless that reply was an error
if mentions_me and re.sub(r'\S*@\S*', '', status_text).strip().lower() in ('last', 'back', 'parent', 'up'):
log(f'Trying to handle parent of status {status_id}')
# First two checks passed, the rest is a little complicated
# Ensure that there is a parent
if status['in_reply_to_account_id'] is None:
log(f'There is no parent stauts, bail')
await send_reply(c, f'{status_author} This message isn\'t a reply!', None, status_id, reply_visibility)
return
# Ensure that the parent message is reachable
parent_id = status['in_reply_to_id']
try:
log(f'Try to get the parent status with id {parent_id}')
reply_status = status
status = await c.get(f'/api/v1/statuses/{parent_id}')
except atoot.api.NotFoundError:
log(f'Can\'t get the parent status, bail')
await send_reply(
c,
f'{status_author} I can\'t see the parent status! It\'s probably followers-only or a DM, and I don\'t have permission to view it.',
None,
status_id,
reply_visibility,
)
return
# Enusre that the authors match
if reply_status['account']['id'] != status['account']['id']:
log(f'Author ID {reply_status["account"]["id"]} doesn\'t match parent author id {status["account"]["id"]}, bail')
await send_reply(c, f'{status_author} This operation can only be performed on your own status!', None, status_id, reply_visibility)
return
# Ensure that we haven't already replied to the parent
if (replies := tuple(s for s in await get_our_non_error_replies(status['id'], me))):
existing_reply = replies[0]
log(f'We\'ve already responded to this parent at {existing_reply["id"]}, bail')
await send_reply(c, f'{status_author} I\'ve already replied to the parent status! {existing_reply["url"]}', None, status_id, reply_visibility)
return
log('Checks all passed, running remaining function on parent status')
status_id, status_author, status_text, status_visibility, reply_visibility = get_status_info(status)
reply_text = f'{status_author} [Retroactive scan requested]'
media_ids = None
@ -316,12 +389,11 @@ async def handle_status(c, status):
cards, media = await get_cards(card_names)
reply_text = status_author
# Just a personal preference thing. If I ask for one card, put the
# text on the same line as the mention. If I ask for more, start the
# list a couple of lines down.
if len(cards) == 1:
# list a couple of lines down. On a retroactive scan, always start a
# couple lines down.
if len(cards) == 1 and '[Retroactive scan requested]' not in reply_text:
reply_text += ' ' + cards[0]
else:
reply_text += '\n\n' + '\n'.join(cards)
@ -340,14 +412,7 @@ async def handle_status(c, status):
reply_text = f'{status_author} Sorry! You broke me somehow. Please let Holly know what you did!'
log('Sending reply...')
try:
reply = await c.create_status(status=reply_text, media_ids=media_ids, in_reply_to_id=status_id, visibility=reply_visibility)
log(f'Reply sent! {reply["uri"]}')
except atoot.api.UnprocessedError as e:
log(f'Could not send reply!', Severity.ERROR)
log(traceback.format_exc(), Severity.ERROR)
error_msg = 'An error occured sending the reply. This most likely means that it would have been greater than 500 characters. If it was something else, please let Holly know!'
await c.create_status(status=f'{status_author} {error_msg}', in_reply_to_id=status_id, visibility=reply_visibility)
await send_reply(c, reply_text, media_ids, status_id, reply_visibility)
async def handle_follow(c, follow):
"""
@ -380,14 +445,15 @@ async def listen(c, me):
mentions_me = any((mentioned['id'] == me['id'] for mentioned in payload['mentions']))
# Ignore any incoming status that mentions us
# We're also going to get a ntification event, we'll handle it there
# We're also going to get a notification event, we'll handle it there
if not mentions_me:
await handle_status(c, payload)
await handle_status(c, payload, False, me)
elif event == 'notification':
if payload['type'] == 'follow':
await handle_follow(c, payload)
elif payload['type'] == 'mention':
await handle_status(c, payload['status'])
await handle_status(c, payload['status'], True, me)
# https://stackoverflow.com/a/55505152/2114129
async def repeat(interval, func, *args, **kwargs):

View File

@ -1,5 +1,5 @@
# For making asynchronous http requests
aiohttp
# For stripping HTML from status content
nh3
# Image processing, for combining the faces of DFCs
Pillow