@ -0,0 +1,456 @@ | |||
import config | |||
import discord | |||
from discord.ext import commands | |||
from discord.ext.commands import Cog | |||
from helpers.checks import check_if_staff, check_if_target_is_staff | |||
import io | |||
import os | |||
import urllib.parse | |||
class Lists(Cog): | |||
""" | |||
Manages channels that are dedicated to lists. | |||
""" | |||
def __init__(self, bot): | |||
self.bot = bot | |||
# Helpers | |||
def is_edit(self, emoji): | |||
return str(emoji)[0] == "โ" or str(emoji)[0] == "๐" | |||
def is_delete(self, emoji): | |||
return str(emoji)[0] == "โ" or str(emoji)[0] == "โ" | |||
def is_recycle(self, emoji): | |||
return str(emoji)[0] == "โป" | |||
def is_insert_above(self, emoji): | |||
return str(emoji)[0] == "โคด๏ธ" or str(emoji)[0] == "โฌ" | |||
def is_insert_below(self, emoji): | |||
return str(emoji)[0] == "โคต๏ธ" or str(emoji)[0] == "โฌ" | |||
def is_reaction_valid(self, reaction): | |||
allowed_reactions = [ | |||
"โ", | |||
"๐", | |||
"โ", | |||
"โ", | |||
"โป", | |||
"โคด๏ธ", | |||
"โฌ", | |||
"โฌ", | |||
"โคต๏ธ", | |||
] | |||
return str(reaction.emoji)[0] in allowed_reactions | |||
async def find_reactions(self, user_id, channel_id, limit=None): | |||
reactions = [] | |||
channel = self.bot.get_channel(channel_id) | |||
async for message in channel.history(limit=limit): | |||
if len(message.reactions) == 0: | |||
continue | |||
for reaction in message.reactions: | |||
users = await reaction.users().flatten() | |||
user_ids = map(lambda user: user.id, users) | |||
if user_id in user_ids: | |||
reactions.append(reaction) | |||
return reactions | |||
def create_log_message(self, emoji, action, user, channel, reason=""): | |||
msg = ( | |||
f"{emoji} **{action}** \n" | |||
f"from {self.bot.escape_message(user.name)} ({user.id}), in {channel.mention}" | |||
) | |||
if reason != "": | |||
msg += f":\n`{reason}`" | |||
return msg | |||
async def clean_up_raw_text_file_message(self, guild_id, message): | |||
embeds = message.embeds | |||
if len(embeds) == 0: | |||
return | |||
fields = embeds[0].fields | |||
for field in fields: | |||
if field.name == "Message ID": | |||
files_channel = self.bot.get_channel( | |||
config.list_files_channel[guild_id] | |||
) | |||
file_message = await files_channel.fetch_message(int(field.value)) | |||
await file_message.delete() | |||
await message.edit(embed=None) | |||
async def link_list_item(self, ctx, channel: discord.TextChannel, number: int): | |||
if number <= 0: | |||
await ctx.send(f"Number must be greater than 0.") | |||
return False | |||
if channel.id not in config.list_channels[ctx.guild.id]: | |||
await ctx.send(f"{channel.mention} is not a list channel.") | |||
return False | |||
counter = 0 | |||
async for message in channel.history(limit=None, oldest_first=True): | |||
if message.content.strip(): | |||
counter += 1 | |||
if counter == number: | |||
embed = discord.Embed( | |||
title=f"Item #{number} in #{channel.name}", | |||
description=message.content.replace("โ", "").strip(), | |||
) | |||
embed.add_field(name="Jump URL", value=f"[Jump!]({message.jump_url})") | |||
await ctx.send(content="", embed=embed) | |||
return True | |||
await ctx.send(f"Unable to find item #{number} in {channel.mention}.") | |||
return False | |||
async def cache_message(self, message): | |||
msg = { | |||
"has_attachment": False, | |||
"attachment_filename": "", | |||
"attachment_data": b"", | |||
"content": message.content, | |||
} | |||
if len(message.attachments) != 0: | |||
attachment = next( | |||
( | |||
a | |||
for a in message.attachments | |||
if os.path.splitext(a.filename)[1] in [".png", ".jpg", ".jpeg"] | |||
), | |||
None, | |||
) | |||
if attachment is not None: | |||
msg["has_attachment"] = True | |||
msg["attachment_filename"] = attachment.filename | |||
msg["attachment_data"] = await attachment.read() | |||
return msg | |||
async def send_cached_message(self, channel, message): | |||
if message["has_attachment"] == True: | |||
file = discord.File( | |||
io.BytesIO(message["attachment_data"]), | |||
filename=message["attachment_filename"], | |||
) | |||
await channel.send(content=message["content"], file=file) | |||
else: | |||
await channel.send(content=message["content"]) | |||
# Commands | |||
@commands.guild_only() | |||
@commands.command(aliases=["list"]) | |||
async def listitem(self, ctx, channel: discord.TextChannel, number: int): | |||
"""Link to a specific list item.""" | |||
await self.link_list_item(ctx, channel, number) | |||
@commands.guild_only() | |||
@commands.command(aliases=["rule"]) | |||
async def rules(self, ctx, number: int): | |||
"""Link to a specific list item in #rules""" | |||
channel = ctx.guild.get_channel(config.rules_channel[ctx.guild.id]) | |||
await self.link_list_item(ctx, channel, number) | |||
@commands.guild_only() | |||
@commands.check(check_if_staff) | |||
@commands.command(aliases=["warnrule", "ruleswarn", "warnrules"]) | |||
async def rulewarn( | |||
self, ctx, target: discord.Member, number: int, *, reason: str = "" | |||
): | |||
if "Mod" not in self.bot.cogs: | |||
await ctx.send("Mod cog must be loaded to run this command.") | |||
return | |||
mod_cog = self.bot.cogs["Mod"] | |||
warn_command = None | |||
for command in mod_cog.get_commands(): | |||
if command.name == "warn": | |||
warn_command = command | |||
break | |||
if warn_command is None: | |||
await ctx.send("Unable to find the warn command from the Mod cog.") | |||
return | |||
if not await warn_command.can_run(ctx): | |||
await ctx.send("Unable to run the warn command from the Mod cog.") | |||
return | |||
if ( | |||
len(warn_command.params) != 4 | |||
or "self" not in warn_command.params | |||
or "ctx" not in warn_command.params | |||
or "target" not in warn_command.params | |||
or "reason" not in warn_command.params | |||
): | |||
await ctx.send("Warn's signature has changed please update the Lists cog.") | |||
return | |||
channel = ctx.guild.get_channel(config.rules_channel[ctx.guild.id]) | |||
if await self.link_list_item(ctx, channel, number): | |||
await warn_command.callback( | |||
mod_cog, ctx=ctx, target=target, reason=f"Rule {number} - {reason}" | |||
) | |||
# Listeners | |||
@Cog.listener() | |||
async def on_raw_reaction_add(self, payload): | |||
await self.bot.wait_until_ready() | |||
# We only care about reactions in Rules, and Support FAQ | |||
if payload.channel_id not in config.list_channels[payload.guild_id]: | |||
return | |||
channel = self.bot.get_channel(payload.channel_id) | |||
message = await channel.fetch_message(payload.message_id) | |||
member = channel.guild.get_member(payload.user_id) | |||
user = self.bot.get_user(payload.user_id) | |||
reaction = next( | |||
( | |||
reaction | |||
for reaction in message.reactions | |||
if str(reaction.emoji) == str(payload.emoji) | |||
), | |||
None, | |||
) | |||
if reaction is None: | |||
return | |||
# Only staff can add reactions in these channels. | |||
if not check_if_target_is_staff(payload.guild_id, member): | |||
await reaction.remove(user) | |||
return | |||
# Reactions are only allowed on messages from the bot. | |||
if not message.author.bot: | |||
await reaction.remove(user) | |||
return | |||
# Only certain reactions are allowed. | |||
if not self.is_reaction_valid(reaction): | |||
await reaction.remove(user) | |||
return | |||
# Remove all other reactions from user in this channel. | |||
for r in await self.find_reactions(payload.user_id, payload.channel_id): | |||
if r.message.id != message.id or ( | |||
r.message.id == message.id and str(r.emoji) != str(reaction.emoji) | |||
): | |||
await r.remove(user) | |||
# When editing we want to provide the user a copy of the raw text. | |||
if ( | |||
self.is_edit(reaction.emoji) | |||
and config.list_files_channel[payload.guild_id] != 0 | |||
): | |||
files_channel = self.bot.get_channel( | |||
config.list_files_channel[payload.guild_id] | |||
) | |||
file = discord.File( | |||
io.BytesIO(message.content.encode("utf-8")), | |||
filename=f"{message.id}.txt", | |||
) | |||
file_message = await files_channel.send(file=file) | |||
embed = discord.Embed( | |||
title="Click here to get the raw text to modify.", | |||
url=f"{file_message.attachments[0].url}?", | |||
) | |||
embed.add_field(name="Message ID", value=file_message.id, inline=False) | |||
await message.edit(embed=embed) | |||
@Cog.listener() | |||
async def on_raw_reaction_remove(self, payload): | |||
await self.bot.wait_until_ready() | |||
# We only care about reactions in Rules, and Support FAQ | |||
if payload.channel_id not in config.list_channels[payload.guild_id]: | |||
return | |||
channel = self.bot.get_channel(payload.channel_id) | |||
message = await channel.fetch_message(payload.message_id) | |||
# Reaction was removed from a message we don't care about. | |||
if not message.author.bot: | |||
return | |||
# We want to remove the embed we added. | |||
if ( | |||
self.is_edit(payload.emoji) | |||
and config.list_files_channel[payload.guild_id] != 0 | |||
): | |||
await self.clean_up_raw_text_file_message(payload.guild_id, message) | |||
@Cog.listener() | |||
async def on_message(self, message): | |||
await self.bot.wait_until_ready() | |||
# We only care about messages in Rules, and Support FAQ | |||
if message.channel.id not in config.list_channels[message.guild.id]: | |||
return | |||
# We don't care about messages from bots. | |||
if message.author.bot: | |||
return | |||
# Only staff can modify lists. | |||
if not check_if_target_is_staff(message.guild.id, message.author): | |||
await message.delete() | |||
return | |||
channel = message.channel | |||
content = message.content | |||
user = message.author | |||
attachment_filename = None | |||
attachment_data = None | |||
if len(message.attachments) != 0: | |||
# Lists will only reupload the first image. | |||
attachment = next( | |||
( | |||
a | |||
for a in message.attachments | |||
if os.path.splitext(a.filename)[1] in [".png", ".jpg", ".jpeg"] | |||
), | |||
None, | |||
) | |||
if attachment is not None: | |||
attachment_filename = attachment.filename | |||
attachment_data = await attachment.read() | |||
await message.delete() | |||
reactions = await self.find_reactions(user.id, channel.id) | |||
# Add to the end of the list if there is no reactions or somehow more | |||
# than one. | |||
if len(reactions) != 1: | |||
if attachment_filename is not None and attachment_data is not None: | |||
file = discord.File( | |||
io.BytesIO(attachment_data), filename=attachment_filename | |||
) | |||
await channel.send(content=content, file=file) | |||
else: | |||
await channel.send(content) | |||
for reaction in reactions: | |||
await reaction.remove(user) | |||
await self.bot.botlog_channels[message.guild.id].send( | |||
self.create_log_message("๐ฌ", "List item added:", user, channel) | |||
) | |||
return | |||
targeted_reaction = reactions[0] | |||
targeted_message = targeted_reaction.message | |||
if self.is_edit(targeted_reaction): | |||
if config.list_files_channel[message.guild.id] != 0: | |||
await self.clean_up_raw_text_file_message( | |||
message.guild.id, targeted_message | |||
) | |||
await targeted_message.edit(content=content) | |||
await targeted_reaction.remove(user) | |||
await self.bot.botlog_channels[message.guild.id].send( | |||
self.create_log_message("๐", "List item edited:", user, channel) | |||
) | |||
elif self.is_delete(targeted_reaction): | |||
await targeted_message.delete() | |||
await self.bot.botlog_channels[message.guild.id].send( | |||
self.create_log_message( | |||
"โ", "List item deleted:", user, channel, content | |||
) | |||
) | |||
elif self.is_recycle(targeted_reaction): | |||
messages = [await self.cache_message(targeted_message)] | |||
for message in await channel.history( | |||
limit=None, after=targeted_message, oldest_first=True | |||
).flatten(): | |||
messages.append(await self.cache_message(message)) | |||
await channel.purge(limit=len(messages) + 1, bulk=True) | |||
for message in messages: | |||
await self.send_cached_message(channel, message) | |||
await self.bot.botlog_channels[message.guild.id].send( | |||
self.create_log_message( | |||
"โป", "List item recycled:", user, channel, content | |||
) | |||
) | |||
elif self.is_insert_above(targeted_reaction): | |||
messages = [await self.cache_message(targeted_message)] | |||
for message in await channel.history( | |||
limit=None, after=targeted_message, oldest_first=True | |||
).flatten(): | |||
messages.append(await self.cache_message(message)) | |||
await channel.purge(limit=len(messages) + 1, bulk=True) | |||
if attachment_filename is not None and attachment_data is not None: | |||
file = discord.File( | |||
io.BytesIO(attachment_data), filename=attachment_filename | |||
) | |||
await channel.send(content=content, file=file) | |||
else: | |||
await channel.send(content) | |||
for message in messages: | |||
await self.send_cached_message(channel, message) | |||
await self.bot.botlog_channels[message.guild.id].send( | |||
self.create_log_message("๐ฌ", "List item added:", user, channel) | |||
) | |||
elif self.is_insert_below(targeted_reaction): | |||
await targeted_reaction.remove(user) | |||
messages = [] | |||
for message in await channel.history( | |||
limit=None, after=targeted_message, oldest_first=True | |||
).flatten(): | |||
messages.append(await self.cache_message(message)) | |||
await channel.purge(limit=len(messages), bulk=True) | |||
if attachment_filename is not None and attachment_data is not None: | |||
file = discord.File( | |||
io.BytesIO(attachment_data), filename=attachment_filename | |||
) | |||
await channel.send(content=content, file=file) | |||
else: | |||
await channel.send(content) | |||
for message in messages: | |||
await self.send_cached_message(channel, message) | |||
await self.bot.botlog_channels[message.guild.id].send( | |||
self.create_log_message("๐ฌ", "List item added:", user, channel) | |||
) | |||
def setup(bot): | |||
bot.add_cog(Lists(bot)) |
@ -0,0 +1,345 @@ | |||
import discord | |||
from discord.ext.commands import Cog | |||
import json | |||
import re | |||
import config | |||
from helpers.restrictions import get_user_restrictions | |||
from helpers.checks import check_if_staff | |||
class Logs(Cog): | |||
""" | |||
Logs join and leave messages, bans and unbans, and member changes. | |||
""" | |||
def __init__(self, bot): | |||
self.bot = bot | |||
self.invite_re = re.compile( | |||
r"((discord\.gg|discordapp\.com/" r"+invite)/+[a-zA-Z0-9-]+)", re.IGNORECASE | |||
) | |||
self.name_re = re.compile(r"[a-zA-Z0-9].*") | |||
self.clean_re = re.compile(r"[^a-zA-Z0-9_ ]+", re.UNICODE) | |||
# All lower case, no spaces, nothing non-alphanumeric | |||
self.susp_hellgex = {} | |||
for guild_id in config.guild_whitelist: | |||
susp_hellgex = "|".join( | |||
[r"\W*".join(list(word)) for word in config.suspect_words[guild_id]] | |||
) | |||
self.susp_hellgex[guild_id] = re.compile(susp_hellgex, re.IGNORECASE) | |||
@Cog.listener() | |||
async def on_member_join(self, member): | |||
await self.bot.wait_until_ready() | |||
if member.guild.id not in config.guild_whitelist: | |||
return | |||
log_channel = self.bot.get_channel(config.botlog_channel[member.guild.id]) | |||
# We use this a lot, might as well get it once | |||
escaped_name = self.bot.escape_message(member) | |||
# Attempt to correlate the user joining with an invite | |||
with open("data/invites.json", "r") as f: | |||
invites = json.load(f) | |||
real_invites = await member.guild.invites() | |||
# Add unknown active invites. Can happen if invite was manually created | |||
for invite in real_invites: | |||
if invite.id not in invites: | |||
invites[invite.id] = { | |||
"uses": 0, | |||
"url": invite.url, | |||
"max_uses": invite.max_uses, | |||
"code": invite.code, | |||
} | |||
probable_invites_used = [] | |||
items_to_delete = [] | |||
# Look for invites whose usage increased since last lookup | |||
for id, invite in invites.items(): | |||
real_invite = next((x for x in real_invites if x.id == id), None) | |||
if real_invite is None: | |||
# Invite does not exist anymore. Was either revoked manually | |||
# or the final use was used up | |||
probable_invites_used.append(invite) | |||
items_to_delete.append(id) | |||
elif invite["uses"] < real_invite.uses: | |||
probable_invites_used.append(invite) | |||
invite["uses"] = real_invite.uses | |||
# Delete used up invites | |||
for id in items_to_delete: | |||
del invites[id] | |||
# Save invites data. | |||
with open("data/invites.json", "w") as f: | |||
f.write(json.dumps(invites)) | |||
# Prepare the invite correlation message | |||
if len(probable_invites_used) == 1: | |||
invite_used = probable_invites_used[0]["code"] | |||
elif len(probable_invites_used) == 0: | |||
invite_used = "Unknown" | |||
else: | |||
invite_used = "One of: " | |||
invite_used += ", ".join([x["code"] for x in probable_invites_used]) | |||
msg = ( | |||
f"โ **Join**: {member.mention} | " | |||
f"{escaped_name}\n" | |||
f"๐ __Creation__: {member.created_at}\n" | |||
f"๐ Account age: {age}\n" | |||
f"โ Joined with: {invite_used}\n" | |||
f"๐ท __User ID__: {member.id}" | |||
) | |||
# Handles user restrictions | |||
# Basically, gives back muted role to users that leave with it. | |||
rsts = get_user_restrictions(member.id) | |||
roles = [discord.utils.get(member.guild.roles, id=rst) for rst in rsts] | |||
await member.add_roles(*roles) | |||
# Real hell zone. | |||
with open("data/userlog.json", "r") as f: | |||
warns = json.load(f) | |||
try: | |||
if len(warns[str(member.id)]["warns"]) == 0: | |||
await log_channel.send(msg) | |||
else: | |||
embed = discord.Embed( | |||
color=discord.Color.dark_red(), title=f"Warns for {escaped_name}" | |||
) | |||
embed.set_thumbnail(url=member.avatar_url) | |||
for idx, warn in enumerate(warns[str(member.id)]["warns"]): | |||
embed.add_field( | |||
name=f"{idx + 1}: {warn['timestamp']}", | |||
value=f"Issuer: {warn['issuer_name']}" | |||
f"\nReason: {warn['reason']}", | |||
) | |||
await log_channel.send(msg, embed=embed) | |||
except KeyError: # if the user is not in the file | |||
await log_channel.send(msg) | |||
async def do_spy(self, message): | |||
if message.author.bot: | |||
return | |||
if check_if_staff(message): | |||
return | |||
alert = False | |||
cleancont = self.clean_re.sub("", message.content).lower() | |||
msg = ( | |||
f"๐จ Suspicious message by {message.author.mention} " | |||
f"({message.author.id}):" | |||
) | |||
invites = self.invite_re.findall(message.content) | |||
for invite in invites: | |||
msg += f"\n- Has invite: https://{invite[0]}" | |||
alert = True | |||
for susp_word in config.suspect_words[message.guild.id]: | |||
if susp_word in cleancont and not any( | |||
ok_word in cleancont | |||
for ok_word in config.suspect_ignored_words[message.guild.id] | |||
): | |||
msg += f"\n- Contains suspicious word: `{susp_word}`" | |||
alert = True | |||
if alert: | |||
msg += f"\n\nJump: <{message.jump_url}>" | |||
spy_channel = self.bot.get_channel(config.spylog_channel[message.guild.id]) | |||
# Bad Code :tm:, blame retr0id | |||
message_clean = message.content.replace("*", "").replace("_", "") | |||
regd = self.susp_hellgex[message.guild.id].sub( | |||
lambda w: "**{}**".format(w.group(0)), message_clean | |||
) | |||
# Show a message embed | |||
embed = discord.Embed(description=regd) | |||
embed.set_author( | |||
name=message.author.display_name, icon_url=message.author.avatar_url | |||
) | |||
await spy_channel.send(msg, embed=embed) | |||
@Cog.listener() | |||
async def on_message(self, message): | |||
await self.bot.wait_until_ready() | |||
if message.channel.id not in config.spy_channels[message.guild.id]: | |||
return | |||
await self.do_spy(message) | |||
@Cog.listener() | |||
async def on_message_edit(self, before, after): | |||
await self.bot.wait_until_ready() | |||
if ( | |||
after.channel.id not in config.spy_channels[after.guild.id] | |||
or after.author.bot | |||
): | |||
return | |||
# If content is the same, just skip over it | |||
# This usually means that something embedded. | |||
if before.clean_content == after.clean_content: | |||
return | |||
await self.do_spy(after) | |||
# U+200D is a Zero Width Joiner stopping backticks from breaking the formatting | |||
before_content = before.clean_content.replace("`", "`\u200d") | |||
after_content = after.clean_content.replace("`", "`\u200d") | |||
log_channel = self.bot.get_channel(config.botlog_channel[after.guild.id]) | |||
msg = ( | |||
"๐ **Message edit**: \n" | |||
f"from {self.bot.escape_message(after.author.name)} " | |||
f"({after.author.id}), in {after.channel.mention}:\n" | |||
f"```{before_content}``` โ ```{after_content}```" | |||
) | |||
# If resulting message is too long, upload to hastebin | |||
if len(msg) > 2000: | |||
haste_url = await self.bot.haste(msg) | |||
msg = f"๐ **Message edit**: \nToo long: <{haste_url}>" | |||
await log_channel.send(msg) | |||
@Cog.listener() | |||
async def on_message_delete(self, message): | |||
await self.bot.wait_until_ready() | |||
if ( | |||
message.channel.id not in config.spy_channels[message.guild.id] | |||
or message.author.bot | |||
): | |||
return | |||
log_channel = self.bot.get_channel(config.botlog_channel[message.guild.id]) | |||
msg = ( | |||
"๐๏ธ **Message delete**: \n" | |||
f"from {self.bot.escape_message(message.author.name)} " | |||
f"({message.author.id}), in {message.channel.mention}:\n" | |||
f"`{message.clean_content}`" | |||
) | |||
# If resulting message is too long, upload to hastebin | |||
if len(msg) > 2000: | |||
haste_url = await self.bot.haste(msg) | |||
msg = f"๐๏ธ **Message delete**: \nToo long: <{haste_url}>" | |||
await log_channel.send(msg) | |||
@Cog.listener() | |||
async def on_member_remove(self, member): | |||
await self.bot.wait_until_ready() | |||
if member.guild.id not in config.guild_whitelist: | |||
return | |||
log_channel = self.bot.get_channel(config.botlog_channel[member.guild.id]) | |||
msg = ( | |||
f"โฌ ๏ธ **Leave**: {member.mention} | " | |||
f"{self.bot.escape_message(member)}\n" | |||
f"๐ท __User ID__: {member.id}" | |||
) | |||
await log_channel.send(msg) | |||
@Cog.listener() | |||
async def on_member_ban(self, guild, member): | |||
await self.bot.wait_until_ready() | |||
if guild.id not in config.guild_whitelist: | |||
return | |||
log_channel = self.bot.get_channel(config.botlog_channel[guild.id]) | |||
msg = ( | |||
f"โ **Ban**: {member.mention} | " | |||
f"{self.bot.escape_message(member)}\n" | |||
f"๐ท __User ID__: {member.id}" | |||
) | |||
await log_channel.send(msg) | |||
@Cog.listener() | |||
async def on_member_unban(self, guild, user): | |||
await self.bot.wait_until_ready() | |||
if guild.id not in config.guild_whitelist: | |||
return | |||
log_channel = self.bot.get_channel(config.botlog_channel[guild.id]) | |||
msg = ( | |||
f"โ ๏ธ **Unban**: {user.mention} | " | |||
f"{self.bot.escape_message(user)}\n" | |||
f"๐ท __User ID__: {user.id}" | |||
) | |||
await log_channel.send(msg) | |||
@Cog.listener() | |||
async def on_member_update(self, member_before, member_after): | |||
await self.bot.wait_until_ready() | |||
if member_after.guild.id not in config.guild_whitelist: | |||
return | |||
msg = "" | |||
log_channel = self.bot.get_channel(config.botlog_channel[member_after.guild.id]) | |||
if member_before.roles != member_after.roles: | |||
# role removal | |||
role_removal = [] | |||
for index, role in enumerate(member_before.roles): | |||
if role not in member_after.roles: | |||
role_removal.append(role) | |||
# role addition | |||
role_addition = [] | |||
for index, role in enumerate(member_after.roles): | |||
if role not in member_before.roles: | |||
role_addition.append(role) | |||
if len(role_addition) != 0 or len(role_removal) != 0: | |||
msg += "\n๐ __Role change__: " | |||
roles = [] | |||
for role in role_removal: | |||
roles.append("_~~" + role.name + "~~_") | |||
for role in role_addition: | |||
roles.append("__**" + role.name + "**__") | |||
for index, role in enumerate(member_after.roles): | |||
if role.name == "@everyone": | |||
continue | |||
if role not in role_removal and role not in role_addition: | |||
roles.append(role.name) | |||
msg += ", ".join(roles) | |||
if member_before.name != member_after.name: | |||
msg += ( | |||
"\n๐ __Username change__: " | |||
f"{self.bot.escape_message(member_before)} โ " | |||
f"{self.bot.escape_message(member_after)}" | |||
) | |||
if member_before.nick != member_after.nick: | |||
if not member_before.nick: | |||
msg += "\n๐ท __Nickname addition__" | |||
elif not member_after.nick: | |||
msg += "\n๐ท __Nickname removal__" | |||
else: | |||
msg += "\n๐ท __Nickname change__" | |||
msg += ( | |||
f": {self.bot.escape_message(member_before.nick)} โ " | |||
f"{self.bot.escape_message(member_after.nick)}" | |||
) | |||
if msg: | |||
msg = ( | |||
f"โน๏ธ **Member update**: {member_after.mention} | " | |||
f"{self.bot.escape_message(member_after)}{msg}" | |||
) | |||
await log_channel.send(msg) | |||
def setup(bot): | |||
bot.add_cog(Logs(bot)) |
@ -0,0 +1,514 @@ | |||
import discord | |||
from discord.ext import commands | |||
from discord.ext.commands import Cog | |||
import config | |||
from helpers.checks import ( | |||
check_if_bot_manager, | |||
check_if_staff, | |||
check_if_target_is_staff, | |||
) | |||
from helpers.userlogs import userlog | |||
from helpers.restrictions import add_restriction, remove_restriction | |||
import io | |||
class Mod(Cog): | |||
def __init__(self, bot): | |||
self.bot = bot | |||
@commands.guild_only() | |||
@commands.check(check_if_staff) | |||
@commands.command() | |||
async def mute(self, ctx, target: discord.Member, *, reason: str = ""): | |||
"""Mutes a user, staff only.""" | |||
# Hedge-proofing the code | |||
if target == ctx.author: | |||
return await ctx.send("You can't do mod actions on yourself.") | |||
elif target == self.bot.user: | |||
return await ctx.send( | |||
f"I'm sorry {ctx.author.mention}, I'm afraid I can't do that." | |||
) | |||
elif self.check_if_target_is_staff(ctx.guild.id, target): | |||
return await ctx.send( | |||
"I can't mute this user as they're a member of staff." | |||
) | |||
userlog(target.id, ctx.author, reason, "mutes", target.name) | |||
safe_name = await commands.clean_content(escape_markdown=True).convert( | |||
ctx, str(target) | |||
) | |||
dm_message = f"You were muted!" | |||
if reason: | |||
dm_message += f' The given reason is: "{reason}".' | |||
try: | |||
await target.send(dm_message) | |||
except discord.errors.Forbidden: | |||
# Prevents kick issues in cases where user blocked bot | |||
# or has DMs disabled | |||
pass | |||
mute_role = ctx.guild.get_role(config.mute_role[ctx.guild.id]) | |||
await target.add_roles(mute_role, reason=str(ctx.author)) | |||
chan_message = ( | |||
f"๐ **Muted**: {ctx.author.mention} muted " | |||
f"{target.mention} | {safe_name}\n" | |||
f"๐ท __User ID__: {target.id}\n" | |||
) | |||
if reason: | |||
chan_message += f'โ๏ธ __Reason__: "{reason}"' | |||
else: | |||
chan_message += ( | |||
"Please add an explanation below. In the future, " | |||
"it is recommended to use `.mute <user> [reason]`" | |||
" as the reason is automatically sent to the user." | |||
) | |||
chan_message += f"\n๐ __Jump__: <{ctx.message.jump_url}>" | |||
log_channel = self.bot.get_channel(config.botlog_channel[ctx.guild.id]) | |||
await log_channel.send(chan_message) | |||
await ctx.send(f"{target.mention} can no longer speak.") | |||
add_restriction(target.id, config.mute_role[ctx.guild.id]) | |||
@commands.guild_only() | |||
@commands.check(check_if_staff) | |||
@commands.command() | |||
async def unmute(self, ctx, target: discord.Member): | |||
"""Unmutes a user, staff only.""" | |||
safe_name = await commands.clean_content(escape_markdown=True).convert( | |||
ctx, str(target) | |||
) | |||
mute_role = ctx.guild.get_role(config.mute_role[ctx.guild.id]) | |||
await target.remove_roles(mute_role, reason=str(ctx.author)) | |||
chan_message = ( | |||
f"๐ **Unmuted**: {ctx.author.mention} unmuted " | |||
f"{target.mention} | {safe_name}\n" | |||
f"๐ท __User ID__: {target.id}\n" | |||
) | |||
chan_message += f"\n๐ __Jump__: <{ctx.message.jump_url}>" | |||
log_channel = self.bot.get_channel(config.botlog_channel[ctx.guild.id]) | |||
await log_channel.send(chan_message) | |||
await ctx.send(f"{target.mention} can now speak again.") | |||
remove_restriction(target.id, config.mute_role[ctx.guild.id]) | |||
@commands.guild_only() | |||
@commands.bot_has_permissions(kick_members=True) | |||
@commands.check(check_if_staff) | |||
@commands.command() | |||
async def kick(self, ctx, target: discord.Member, *, reason: str = ""): | |||
"""Kicks a user, staff only.""" | |||
# Hedge-proofing the code | |||
if target == ctx.author: | |||
return await ctx.send("You can't do mod actions on yourself.") | |||
elif target == self.bot.user: | |||
return await ctx.send( | |||
f"I'm sorry {ctx.author.mention}, I'm afraid I can't do that." | |||
) | |||
elif self.check_if_target_is_staff(ctx.guild.id, target): | |||
return await ctx.send( | |||
"I can't kick this user as they're a member of staff." | |||
) | |||
userlog(target.id, ctx.author, reason, "kicks", target.name) | |||
safe_name = await commands.clean_content(escape_markdown=True).convert( | |||
ctx, str(target) | |||
) | |||
dm_message = f"You were kicked from {ctx.guild.name}." | |||
if reason: | |||
dm_message += f' The given reason is: "{reason}".' | |||
dm_message += ( | |||
"\n\nYou are able to rejoin the server," | |||
" but please be sure to behave when participating again." | |||
) | |||
try: | |||
await target.send(dm_message) | |||
except discord.errors.Forbidden: | |||
# Prevents kick issues in cases where user blocked bot | |||
# or has DMs disabled | |||
pass | |||
await target.kick(reason=f"{ctx.author}, reason: {reason}") | |||
chan_message = ( | |||
f"๐ข **Kick**: {ctx.author.mention} kicked " | |||
f"{target.mention} | {safe_name}\n" | |||
f"๐ท __User ID__: {target.id}\n" | |||
) | |||
if reason: | |||
chan_message += f'โ๏ธ __Reason__: "{reason}"' | |||
else: | |||
chan_message += ( | |||
"Please add an explanation below. In the future" | |||
", it is recommended to use " | |||
"`.kick <user> [reason]`" | |||
" as the reason is automatically sent to the user." | |||
) | |||
chan_message += f"\n๐ __Jump__: <{ctx.message.jump_url}>" | |||
log_channel = self.bot.get_channel(config.botlog_channel[ctx.guild.id]) | |||
await log_channel.send(chan_message) | |||
await ctx.send(f"๐ข {safe_name}, ๐.") | |||
@commands.guild_only() | |||
@commands.bot_has_permissions(ban_members=True) | |||
@commands.check(check_if_staff) | |||
@commands.command(aliases=["yeet"]) | |||
async def ban(self, ctx, target: discord.Member, *, reason: str = ""): | |||
"""Bans a user, staff only.""" | |||
# Hedge-proofing the code | |||
if target == ctx.author: | |||
if target.id == 181627658520625152: | |||
return await ctx.send( | |||
"https://cdn.discordapp.com/attachments/286612533757083648/403080855402315796/rehedge.PNG" | |||
) | |||
return await ctx.send("hedgeberg#7337 is now b&. ๐") | |||
elif target == self.bot.user: | |||
return await ctx.send( | |||
f"I'm sorry {ctx.author.mention}, I'm afraid I can't do that." | |||
) | |||
elif self.check_if_target_is_staff(ctx.guild.id, target): | |||
return await ctx.send("I can't ban this user as they're a member of staff.") | |||
userlog(target.id, ctx.author, reason, "bans", target.name) | |||
safe_name = await commands.clean_content(escape_markdown=True).convert( | |||
ctx, str(target) | |||
) | |||
dm_message = f"You were banned from {ctx.guild.name}." | |||
if reason: | |||
dm_message += f' The given reason is: "{reason}".' | |||
dm_message += "\n\nThis ban does not expire." | |||
try: | |||
await target.send(dm_message) | |||
except discord.errors.Forbidden: | |||
# Prevents ban issues in cases where user blocked bot | |||
# or has DMs disabled | |||
pass | |||
await target.ban( | |||
reason=f"{ctx.author}, reason: {reason}", delete_message_days=0 | |||
) | |||
chan_message = ( | |||
f"โ **Ban**: {ctx.author.mention} banned " | |||
f"{target.mention} | {safe_name}\n" | |||
f"๐ท __User ID__: {target.id}\n" | |||
) | |||
if reason: | |||
chan_message += f'โ๏ธ __Reason__: "{reason}"' | |||
else: | |||
chan_message += ( | |||
"Please add an explanation below. In the future" | |||
", it is recommended to use `.ban <user> [reason]`" | |||
" as the reason is automatically sent to the user." | |||
) | |||
chan_message += f"\n๐ __Jump__: <{ctx.message.jump_url}>" | |||
log_channel = self.bot.get_channel(config.botlog_channel[ctx.guild.id]) | |||
await log_channel.send(chan_message) | |||
await ctx.send(f"{safe_name} is now b&. ๐") | |||
@commands.guild_only() | |||
@commands.bot_has_permissions(ban_members=True) | |||
@commands.check(check_if_staff) | |||
@commands.command(aliases=["softban"]) | |||
async def hackban(self, ctx, target: int, *, reason: str = ""): | |||
"""Bans a user with their ID, doesn't message them, staff only.""" | |||
target_user = await self.bot.fetch_user(target) | |||
target_member = ctx.guild.get_member(target) | |||
# Hedge-proofing the code | |||
if target == ctx.author.id: | |||
return await ctx.send("You can't do mod actions on yourself.") | |||
elif target == self.bot.user: | |||
return await ctx.send( | |||
f"I'm sorry {ctx.author.mention}, I'm afraid I can't do that." | |||
) | |||
elif target_member and self.check_if_target_is_staff( | |||
ctx.guild.id, target_member | |||
): | |||
return await ctx.send("I can't ban this user as they're a member of staff.") | |||
userlog(target, ctx.author, reason, "bans", target_user.name) | |||
safe_name = await commands.clean_content(escape_markdown=True).convert( | |||
ctx, str(target) | |||
) | |||
await ctx.guild.ban( | |||
target_user, reason=f"{ctx.author}, reason: {reason}", delete_message_days=0 | |||
) | |||
chan_message = ( | |||
f"โ **Hackban**: {ctx.author.mention} banned " | |||
f"{target_user.mention} | {safe_name}\n" | |||
f"๐ท __User ID__: {target}\n" | |||
) | |||
if reason: | |||
chan_message += f'โ๏ธ __Reason__: "{reason}"' | |||
else: | |||
chan_message += ( | |||
"Please add an explanation below. In the future" | |||
", it is recommended to use " | |||
"`.hackban <user> [reason]`." | |||
) | |||
chan_message += f"\n๐ __Jump__: <{ctx.message.jump_url}>" | |||
log_channel = self.bot.get_channel(config.botlog_channel[ctx.guild.id]) | |||
await log_channel.send(chan_message) | |||
await ctx.send(f"{safe_name} is now b&. ๐") | |||
@commands.guild_only() | |||
@commands.bot_has_permissions(ban_members=True) | |||
@commands.check(check_if_staff) | |||
@commands.command() | |||
async def unban(self, ctx, target: int, *, reason: str = ""): | |||
"""Unbans a user with their ID, doesn't message them, staff only.""" | |||
target_user = await self.bot.fetch_user(target) | |||
safe_name = await commands.clean_content(escape_markdown=True).convert( | |||
ctx, str(target) | |||
) | |||
await ctx.guild.unban(target_user, reason=f"{ctx.author}, reason: {reason}") | |||
chan_message = ( | |||
f"โ ๏ธ **Unban**: {ctx.author.mention} unbanned " | |||
f"{target_user.mention} | {safe_name}\n" | |||
f"๐ท __User ID__: {target}\n" | |||
) | |||
if reason: | |||
chan_message += f'โ๏ธ __Reason__: "{reason}"' | |||
else: | |||
chan_message += ( | |||
"Please add an explanation below. In the future" | |||
", it is recommended to use " | |||
"`.unban <user id> [reason]`." | |||
) | |||
chan_message += f"\n๐ __Jump__: <{ctx.message.jump_url}>" | |||
log_channel = self.bot.get_channel(config.botlog_channel[ctx.guild.id]) | |||
await log_channel.send(chan_message) | |||
await ctx.send(f"{safe_name} is now unb&.") | |||
@commands.guild_only() | |||
@commands.bot_has_permissions(ban_members=True) | |||
@commands.check(check_if_staff) | |||
@commands.command() | |||
async def silentban(self, ctx, target: discord.Member, *, reason: str = ""): | |||
"""Bans a user, staff only.""" | |||
# Hedge-proofing the code | |||
if target == ctx.author: | |||
return await ctx.send("You can't do mod actions on yourself.") | |||
elif target == self.bot.user: | |||
return await ctx.send( | |||
f"I'm sorry {ctx.author.mention}, I'm afraid I can't do that." | |||
) | |||
elif self.check_if_target_is_staff(ctx.guild.id, target): | |||
return await ctx.send("I can't ban this user as they're a member of staff.") | |||
userlog(target.id, ctx.author, reason, "bans", target.name) | |||
safe_name = await commands.clean_content(escape_markdown=True).convert( | |||
ctx, str(target) | |||
) | |||
await target.ban( | |||
reason=f"{ctx.author}, reason: {reason}", delete_message_days=0 | |||
) | |||
chan_message = ( | |||
f"โ **Silent ban**: {ctx.author.mention} banned " | |||
f"{target.mention} | {safe_name}\n" | |||
f"๐ท __User ID__: {target.id}\n" | |||
) | |||
if reason: | |||
chan_message += f'โ๏ธ __Reason__: "{reason}"' | |||
else: | |||
chan_message += ( | |||
"Please add an explanation below. In the future" | |||
", it is recommended to use `.ban <user> [reason]`" | |||
" as the reason is automatically sent to the user." | |||
) | |||
chan_message += f"\n๐ __Jump__: <{ctx.message.jump_url}>" | |||
log_channel = self.bot.get_channel(config.botlog_channel[ctx.guild.id]) | |||
await log_channel.send(chan_message) | |||
@commands.guild_only() | |||
@commands.check(check_if_staff) | |||
@commands.command(aliases=["clear"]) | |||
async def purge(self, ctx, limit: int, channel: discord.TextChannel = None): | |||
"""Clears a given number of messages, staff only.""" | |||
log_channel = self.bot.get_channel(config.botlog_channel[ctx.guild.id]) | |||
if not channel: | |||
channel = ctx.channel | |||
await channel.purge(limit=limit) | |||
msg = ( | |||
f"๐ **Purged**: {ctx.author.mention} purged {limit} " | |||
f"messages in {channel.mention}." | |||
) | |||
await log_channel.send(msg) | |||
@commands.guild_only() | |||
@commands.check(check_if_staff) | |||
@commands.command() | |||
async def warn(self, ctx, target: discord.Member, *, reason: str = ""): | |||
"""Warns a user, staff only.""" | |||
# Hedge-proofing the code | |||
if target == ctx.author: | |||
return await ctx.send("You can't do mod actions on yourself.") | |||
elif target == self.bot.user: | |||
return await ctx.send( | |||
f"I'm sorry {ctx.author.mention}, I'm afraid I can't do that." | |||
) | |||
elif self.check_if_target_is_staff(ctx.guild.id, target): | |||
return await ctx.send( | |||
"I can't warn this user as they're a member of staff." | |||
) | |||
log_channel = self.bot.get_channel(config.botlog_channel[ctx.guild.id]) | |||
warn_count = userlog(target.id, ctx.author, reason, "warns", target.name) | |||
safe_name = await commands.clean_content(escape_markdown=True).convert( | |||
ctx, str(target) | |||
) | |||
chan_msg = ( | |||
f"โ ๏ธ **Warned**: {ctx.author.mention} warned " | |||
f"{target.mention} (warn #{warn_count}) " | |||
f"| {safe_name}\n" | |||
) | |||
msg = f"You were warned on {ctx.guild.name}." | |||
if reason: | |||
msg += " The given reason is: " + reason | |||
msg += ( | |||
f"\n\nPlease read the rules in the rules channel. " | |||
f"This is warn #{warn_count}." | |||
) | |||
if warn_count == 2: | |||
msg += " __The next warn will automatically kick.__" | |||
if warn_count == 3: | |||
msg += ( | |||
"\n\nYou were kicked because of this warning. " | |||
"You can join again right away. " | |||
"Two more warnings will result in an automatic ban." | |||
) | |||
if warn_count == 4: | |||
msg += ( | |||
"\n\nYou were kicked because of this warning. " | |||
"This is your final warning. " | |||
"You can join again, but " | |||
"**one more warn will result in a ban**." | |||
) | |||
chan_msg += "**This resulted in an auto-kick.**\n" | |||
if warn_count == 5: | |||
msg += "\n\nYou were automatically banned due to five warnings." | |||
chan_msg += "**This resulted in an auto-ban.**\n" | |||
try: | |||
await target.send(msg) | |||
except discord.errors.Forbidden: | |||
# Prevents log issues in cases where user blocked bot | |||
# or has DMs disabled | |||
pass | |||
if warn_count == 3 or warn_count == 4: | |||
await target.kick() | |||
if warn_count >= 5: # just in case | |||
await target.ban(reason="exceeded warn limit", delete_message_days=0) | |||
await ctx.send( | |||
f"{target.mention} warned. " f"User has {warn_count} warning(s)." | |||
) | |||
if reason: | |||
chan_msg += f'โ๏ธ __Reason__: "{reason}"' | |||
else: | |||
chan_msg += ( | |||
"Please add an explanation below. In the future" | |||
", it is recommended to use `.warn <user> [reason]`" | |||
" as the reason is automatically sent to the user." | |||
) | |||
chan_msg += f"\n๐ __Jump__: <{ctx.message.jump_url}>" | |||
await log_channel.send(chan_msg) | |||
@commands.guild_only() | |||
@commands.check(check_if_staff) | |||
@commands.command(aliases=["setnick", "nick"]) | |||
async def nickname(self, ctx, target: discord.Member, *, nick: str = ""): | |||
"""Sets a user's nickname, staff only. | |||
Just send .nickname <user> to wipe the nickname.""" | |||
try: | |||
if nick: | |||
await target.edit(nick=nick, reason=str(ctx.author)) | |||
else: | |||
await target.edit(nick=None, reason=str(ctx.author)) | |||
await ctx.send("Successfully set nickname.") | |||
except discord.errors.Forbidden: | |||
await ctx.send( | |||
"I don't have the permission to set that user's nickname.\n" | |||
"User's top role may be above mine, or I may lack Manage Nicknames permission." | |||
) | |||
@commands.guild_only() | |||
@commands.check(check_if_staff) | |||
@commands.command(aliases=["echo"]) | |||
async def say(self, ctx, *, the_text: str): | |||
"""Repeats a given text, staff only.""" | |||
await ctx.send(the_text) | |||
@commands.guild_only() | |||
@commands.check(check_if_staff) | |||
@commands.command() | |||
async def speak(self, ctx, channel: discord.TextChannel, *, the_text: str): | |||
"""Repeats a given text in a given channel, staff only.""" | |||
await channel.send(the_text) | |||
@commands.guild_only() | |||
@commands.check(check_if_staff) | |||
@commands.command(aliases=["setplaying", "setgame"]) | |||
async def playing(self, ctx, *, game: str = ""): | |||
"""Sets the bot's currently played game name, staff only. | |||
Just send .playing to wipe the playing state.""" | |||
if game: | |||
await self.bot.change_presence(activity=discord.Game(name=game)) | |||
else: | |||
await self.bot.change_presence(activity=None) | |||
await ctx.send("Successfully set game.") | |||
@commands.guild_only() | |||
@commands.check(check_if_staff) | |||
@commands.command(aliases=["setbotnick", "botnick", "robotnick"]) | |||
async def botnickname(self, ctx, *, nick: str = ""): | |||
"""Sets the bot's nickname, staff only. | |||
Just send .botnickname to wipe the nickname.""" | |||
if nick: | |||
await ctx.guild.me.edit(nick=nick, reason=str(ctx.author)) | |||
else: | |||
await ctx.guild.me.edit(nick=None, reason=str(ctx.author)) | |||
await ctx.send("Successfully set bot nickname.") | |||
def setup(bot): | |||
bot.add_cog(Mod(bot)) |
@ -0,0 +1,248 @@ | |||
import discord | |||
from discord.ext import commands | |||
from discord.ext.commands import Cog | |||
import config | |||
import json | |||
from helpers.checks import check_if_staff | |||
from helpers.userlogs import get_userlog, set_userlog, userlog_event_types | |||
class ModUserlog(Cog): | |||
def __init__(self, bot): | |||
self.bot = bot | |||
def get_userlog_embed_for_id( | |||
self, uid: str, name: str, own: bool = False, event="" | |||
): | |||
own_note = " Good for you!" if own else "" | |||
wanted_events = ["warns", "bans", "kicks", "mutes"] | |||
if event and not isinstance(event, list): | |||
wanted_events = [event] | |||
embed = discord.Embed(color=discord.Color.dark_red()) | |||
embed.set_author(name=f"Userlog for {name}") | |||
userlog = get_userlog() | |||
if uid not in userlog: | |||
embed.description = f"There are none!{own_note} (no entry)" | |||
embed.color = discord.Color.green() | |||
return embed | |||
for event_type in wanted_events: | |||
if event_type in userlog[uid] and userlog[uid][event_type]: | |||
event_name = userlog_event_types[event_type] | |||
for idx, event in enumerate(userlog[uid][event_type]): | |||
issuer = ( | |||
"" | |||
if own | |||
else f"Issuer: {event['issuer_name']} " | |||