Browse Source

Initial commit

main
Ave O 2 years ago
commit
05d83f2553
No known key found for this signature in database GPG Key ID: 9356ABAA42C842B
9 changed files with 672 additions and 0 deletions
  1. +100
    -0
      .gitignore
  2. +21
    -0
      LICENSE
  3. +3
    -0
      README.md
  4. +4
    -0
      botbase.ini.example
  5. +160
    -0
      botbase.py
  6. +170
    -0
      cogs/admin.py
  7. +42
    -0
      cogs/basic.py
  8. +166
    -0
      cogs/common.py
  9. +6
    -0
      requirements.txt

+ 100
- 0
.gitignore View File

@@ -0,0 +1,100 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# IPython Notebook
.ipynb_checkpoints

# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# dotenv
.env

# virtualenv
venv/
ENV/

# Spyder project settings
.spyderproject

# Rope project settings
.ropeproject

# botbase stuff
# *.log # mentioned above on django.
*.ini
files/*

# pycharm
.idea
*.ttf

priv-*

+ 21
- 0
LICENSE View File

@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2018 Arda "Ave" Ozkal

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

+ 3
- 0
README.md View File

@@ -0,0 +1,3 @@
# BotBase

A crappy discord.py@rewrite bot base.

+ 4
- 0
botbase.ini.example View File

@@ -0,0 +1,4 @@
[base]
prefix = bb!
token = token_goes_here
description = Your bot description goes here.

+ 160
- 0
botbase.py View File

@@ -0,0 +1,160 @@
import os
import sys
import logging
import logging.handlers
import traceback
import configparser
from pathlib import Path
import aiohttp

import discord
from discord.ext import commands

script_name = os.path.basename(__file__).split('.')[0]

log_file_name = f"{script_name}.log"

# Limit of discord (non-nitro) is 8MB (not MiB)
max_file_size = 1000 * 1000 * 8
backup_count = 10000 # random big number
file_handler = logging.handlers.RotatingFileHandler(
filename=log_file_name, maxBytes=max_file_size, backupCount=backup_count)
stdout_handler = logging.StreamHandler(sys.stdout)

log_format = logging.Formatter(
'[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
file_handler.setFormatter(log_format)
stdout_handler.setFormatter(log_format)

log = logging.getLogger('discord')
log.setLevel(logging.INFO)
log.addHandler(file_handler)
log.addHandler(stdout_handler)

config = configparser.ConfigParser()
config.read(f"{script_name}.ini")


def get_prefix(bot, message):
prefixes = [config['base']['prefix']]

return commands.when_mentioned_or(*prefixes)(bot, message)


initial_extensions = ['cogs.common',
'cogs.admin',
'cogs.basic']

bot = commands.Bot(command_prefix=get_prefix,
description=config['base']['description'], pm_help=None)

bot.log = log
bot.config = config
bot.script_name = script_name

if __name__ == '__main__':
for extension in initial_extensions:
try:
bot.load_extension(extension)
except Exception as e:
log.error(f'Failed to load extension {extension}.', file=sys.stderr)
log.error(traceback.print_exc())


@bot.event
async def on_ready():
aioh = {"User-Agent": f"{script_name}/1.0'"}
bot.aiosession = aiohttp.ClientSession(headers=aioh)
bot.app_info = await bot.application_info()

log.info(f'\nLogged in as: {bot.user.name} - '
f'{bot.user.id}\ndpy version: {discord.__version__}\n')
game_name = f"{config['base']['prefix']}help"
await bot.change_presence(game=discord.Game(name=game_name))


@bot.event
async def on_command(ctx):
log_text = f"{ctx.message.author} ({ctx.message.author.id}): "\
f"\"{ctx.message.content}\" "
if ctx.guild: # was too long for tertiary if
log_text += f"on \"{ctx.channel.name}\" ({ctx.channel.id}) "\
f"at \"{ctx.guild.name}\" ({ctx.guild.id})"
else:
log_text += f"on DMs ({ctx.channel.id})"
log.info(log_text)


@bot.event
async def on_error(event_method, *args, **kwargs):
log.error(f"Error on {event_method}: {sys.exc_info()}")


@bot.event
async def on_command_error(ctx, error):
log.error(f"Error with \"{ctx.message.content}\" from "
f"\"{ctx.message.author}\ ({ctx.message.author.id}) "
f"of type {type(error)}: {error}")

if isinstance(error, commands.NoPrivateMessage):
return await ctx.send("This command doesn't work on DMs.")
elif isinstance(error, commands.MissingPermissions):
roles_needed = '\n- '.join(error.missing_perms)
return await ctx.send(f"{ctx.author.mention}: You don't have the right"
" permissions to run this command. You need: "
f"```- {roles_needed}```")
elif isinstance(error, commands.BotMissingPermissions):
roles_needed = '\n-'.join(error.missing_perms)
return await ctx.send(f"{ctx.author.mention}: Bot doesn't have "
"the right permissions to run this command. "
"Please add the following roles: "
f"```- {roles_needed}```")
elif isinstance(error, commands.CommandOnCooldown):
return await ctx.send(f"{ctx.author.mention}: You're being "
"ratelimited. Try in "
f"{error.retry_after:.1f} seconds.")
elif isinstance(error, commands.CheckFailure):
return await ctx.send(f"{ctx.author.mention}: Check failed. "
"You might not have the right permissions "
"to run this command.")

help_text = f"Usage of this command is: ```{ctx.prefix}"\
f"{ctx.command.signature}```\nPlease see `{ctx.prefix}help "\
f"{ctx.command.name}` for more info about this command."
if isinstance(error, commands.BadArgument):
return await ctx.send(f"{ctx.author.mention}: You gave incorrect "
f"arguments. {help_text}")
elif isinstance(error, commands.MissingRequiredArgument):
return await ctx.send(f"{ctx.author.mention}: You gave incomplete "
f"arguments. {help_text}")


@bot.event
async def on_guild_join(guild):
bot.log.info(f"Joined guild \"{guild.name}\" ({guild.id}).")
await guild.owner.send(f"Hello and welcome to {script_name}!\n"
"If you don't know why you're getting this message"
f", it's because someone added {script_name} to your"
" server\nDue to Discord API ToS, I am required to "
"inform you that **I log command usages and "
"errors**.\n**I don't log *anything* else**."
"\n\nIf you do not agree to be logged, stop"
f" using {script_name} and remove it from your "
"server as soon as possible.")


@bot.event
async def on_message(message):
if message.author.bot:
return

ctx = await bot.get_context(message)
await bot.invoke(ctx)

if not Path(f"{script_name}.ini").is_file():
log.warning(
f"No config file ({script_name}.ini) found, "
f"please create one from {script_name}.ini.example file.")
exit(3)

bot.run(config['base']['token'], bot=True, reconnect=True)

+ 170
- 0
cogs/admin.py View File

@@ -0,0 +1,170 @@
import discord
from discord.ext import commands
import traceback
import inspect
import re


class AdminCog:
def __init__(self, bot):
self.bot = bot
self.last_eval_result = None
self.previous_eval_code = None

@commands.is_owner()
@commands.command(aliases=['echo'], hidden=True)
async def say(self, ctx, *, the_text: str):
"""Repeats a given text."""
await ctx.send(the_text)

@commands.is_owner()
@commands.command(name='exit', hidden=True)
async def _exit(self, ctx):
"""Shuts down the bot, owner only."""
await ctx.send(":wave: Exiting bot, goodbye!")
await self.bot.logout()

@commands.is_owner()
@commands.command(hidden=True)
async def load(self, ctx, ext: str):
"""Loads a cog, owner only."""
try:
self.bot.load_extension("cogs." + ext)
except:
await ctx.send(f':x: Cog loading failed, traceback: '
f'```\n{traceback.format_exc()}\n```')
return
self.bot.log.info(f'Loaded ext {ext}')
await ctx.send(f':white_check_mark: `{ext}` successfully loaded.')

@commands.is_owner()
@commands.command(hidden=True)
async def fetchlog(self, ctx):
"""Returns log"""
await ctx.send(file=discord.File(f"{self.bot.script_name}.log"),
content="Here's the current log file:")

@commands.is_owner()
@commands.command(name='eval', hidden=True)
async def _eval(self, ctx, *, code: str):
"""Evaluates some code (Owner only)"""
try:
code = code.strip('` ')

env = {
'bot': self.bot,
'ctx': ctx,
'message': ctx.message,
'server': ctx.guild,
'guild': ctx.guild,
'channel': ctx.message.channel,
'author': ctx.message.author,

# modules
'discord': discord,
'commands': commands,

# utilities
'_get': discord.utils.get,
'_find': discord.utils.find,

# last result
'_': self.last_eval_result,
'_p': self.previous_eval_code,
}
env.update(globals())

self.bot.log.info(f"Evaling {repr(code)}:")
result = eval(code, env)
if inspect.isawaitable(result):
result = await result

if result is not None:
self.last_eval_result = result

self.previous_eval_code = code

sliced_message = await self.bot.slice_message(repr(result),
prefix="```",
suffix="```")
for msg in sliced_message:
await ctx.send(msg)
except:
sliced_message = \
await self.bot.slice_message(traceback.format_exc(),
prefix="```",
suffix="```")
for msg in sliced_message:
await ctx.send(msg)

@commands.is_owner()
@commands.command(hidden=True)
async def pull(self, ctx, auto=False):
"""Does a git pull (Owner only)."""
tmp = await ctx.send('Pulling...')
git_output = await self.bot.async_call_shell("git pull")
await tmp.edit(content=f"Pull complete. Output: ```{git_output}```")
if auto:
cogs_to_reload = re.findall(r'cogs/([a-z]*).py[ ]*\|', git_output)
for cog in cogs_to_reload:
try:
self.bot.unload_extension("cogs." + cog)
self.bot.load_extension("cogs." + cog)
self.bot.log.info(f'Reloaded ext {cog}')
await ctx.send(f':white_check_mark: `{cog}` '
'successfully reloaded.')
except:
await ctx.send(f':x: Cog reloading failed, traceback: '
'```\n{traceback.format_exc()}\n```')
return

@commands.is_owner()
@commands.command(hidden=True)
async def sh(self, ctx, *, command: str):
"""Runs a command on shell."""
command = command.strip('`')
tmp = await ctx.send(f'Running `{command}`...')
self.bot.log.info(f"Running {command}")
shell_output = await self.bot.async_call_shell(command)
shell_output = f"\"{command}\" output:\n\n{shell_output}"
self.bot.log.info(shell_output)
sliced_message = await self.bot.slice_message(shell_output,
prefix="```",
suffix="```")
if len(sliced_message) == 1:
await tmp.edit(content=sliced_message[0])
return
await tmp.delete()
for msg in sliced_message:
await ctx.send(msg)

@commands.is_owner()
@commands.command(hidden=True)
async def unload(self, ctx, ext: str):
"""Unloads a cog, owner only."""
self.bot.unload_extension("cogs." + ext)
self.bot.log.info(f'Unloaded ext {ext}')
await ctx.send(f':white_check_mark: `{ext}` successfully unloaded.')

@commands.is_owner()
@commands.command(hidden=True)
async def reload(self, ctx, ext="_"):
"""Reloads a cog, owner only."""
if ext == "_":
ext = self.lastreload
else:
self.lastreload = ext

try:
self.bot.unload_extension("cogs." + ext)
self.bot.load_extension("cogs." + ext)
except:
await ctx.send(f':x: Cog reloading failed, traceback: '
f'```\n{traceback.format_exc()}\n```')
return
self.bot.log.info(f'Reloaded ext {ext}')
await ctx.send(f':white_check_mark: `{ext}` successfully reloaded.')


def setup(bot):
bot.add_cog(AdminCog(bot))

+ 42
- 0
cogs/basic.py View File

@@ -0,0 +1,42 @@
import time

from discord.ext import commands


class Basic:
def __init__(self, bot):
self.bot = bot

@commands.command()
async def invite(self, ctx):
"""Sends an invite to add the bot"""
await ctx.send(f"{ctx.author.mention}: You can use "
"<https://discordapp.com/api/oauth2/authorize?"
f"client_id={self.bot.user.id}"
"&permissions=268435456&scope=bot> "
"to add RoleBot to your guild.")

@commands.command()
async def hello(self, ctx):
"""Says hello. Duh."""
await ctx.send(f"Hello {ctx.author.mention}!")

@commands.command(aliases=['p'])
async def ping(self, ctx):
"""Shows ping values to discord.

RTT = Round-trip time, time taken to send a message to discord
GW = Gateway Ping"""
before = time.monotonic()
tmp = await ctx.send('Calculating ping...')
after = time.monotonic()
rtt_ms = (after - before) * 1000
gw_ms = self.bot.latency * 1000

message_text = f":ping_pong: rtt: `{rtt_ms:.1f}ms`, `gw: {gw_ms:.1f}ms`"
self.bot.log.info(message_text)
await tmp.edit(content=message_text)


def setup(bot):
bot.add_cog(Basic(bot))

+ 166
- 0
cogs/common.py View File

@@ -0,0 +1,166 @@
import asyncio
import traceback
import datetime
import humanize


class Common:
def __init__(self, bot):
self.bot = bot

self.bot.async_call_shell = self.async_call_shell
self.bot.slice_message = self.slice_message
self.max_split_length = 3
self.bot.hex_to_int = self.hex_to_int
self.bot.download_file = self.download_file
self.bot.aiojson = self.aiojson
self.bot.aioget = self.aioget
self.bot.aiogetbytes = self.aiogetbytes
self.bot.get_relative_timestamp = self.get_relative_timestamp


def get_relative_timestamp(self, time_from=None, time_to=None,
humanized=False, include_from=False,
include_to=False):
# Setting default value to utcnow() makes it show time from cog load
# which is not what we want
if not time_from:
time_from = datetime.datetime.utcnow()
if not time_to:
time_to = datetime.datetime.utcnow()
if humanized:
humanized_string = humanize.naturaltime(time_to - time_from)
if include_from and include_to:
str_with_from_and_to = f"{humanized_string} "\
f"({str(time_from).split('.')[0]} "\
f"- {str(time_to).split('.')[0]})"
return str_with_from_and_to
elif include_from:
str_with_from = f"{humanized_string} "\
f"({str(time_from).split('.')[0]})"
return str_with_from
elif include_to:
str_with_to = f"{humanized_string} ({str(time_to).split('.')[0]})"
return str_with_to
return humanized_string
else:
epoch = datetime.datetime.utcfromtimestamp(0)
epoch_from = (time_from - epoch).total_seconds()
epoch_to = (time_to - epoch).total_seconds()
second_diff = epoch_to - epoch_from
result_string = str(datetime.timedelta(
seconds=second_diff)).split('.')[0]
return result_string

async def aioget(self, url):
try:
data = await self.bot.aiosession.get(url)
if data.status == 200:
text_data = await data.text()
self.bot.log.info(f"Data from {url}: {text_data}")
return text_data
else:
self.bot.log.error(f"HTTP Error {data.status} "
"while getting {url}")
except:
self.bot.log.error(f"Error while getting {url} "
f"on aiogetbytes: {traceback.format_exc()}")

async def aiogetbytes(self, url):
try:
data = await self.bot.aiosession.get(url)
if data.status == 200:
byte_data = await data.read()
self.bot.log.debug(f"Data from {url}: {byte_data}")
return byte_data
else:
self.bot.log.error(f"HTTP Error {data.status} "
"while getting {url}")
except:
self.bot.log.error(f"Error while getting {url} "
f"on aiogetbytes: {traceback.format_exc()}")

async def aiojson(self, url):
try:
data = await self.bot.aiosession.get(url)
if data.status == 200:
text_data = await data.text()
self.bot.log.info(f"Data from {url}: {text_data}")
content_type = data.headers['Content-Type']
return await data.json(content_type=content_type)
else:
self.bot.log.error(f"HTTP Error {data.status} "
"while getting {url}")
except:
self.bot.log.error(f"Error while getting {url} "
f"on aiogetbytes: {traceback.format_exc()}")

def hex_to_int(self, color_hex: str):
"""Turns a given hex color into an integer"""
return int("0x" + color_hex.strip('#'), 16)

# This function is based on https://stackoverflow.com/a/35435419/3286892
# by link2110 (https://stackoverflow.com/users/5890923/link2110)
# modified by Ave (https://github.com/aveao), licensed CC-BY-SA 3.0
async def download_file(self, url, local_filename):
file_resp = await self.bot.aiosession.get(url)
file = await file_resp.read()
with open(local_filename, "wb") as f:
f.write(file)

# 2000 is maximum limit of discord
async def slice_message(self, text, size=2000, prefix="", suffix=""):
"""Slices a message into multiple messages"""
if len(text) > size * self.max_split_length:
haste_url = await self.haste(text)
return [f"Message is too long ({len(text)} > "
f"{size * self.max_split_length} "
f"({size} * {self.max_split_length}))"
f", go to haste: <{haste_url}>"]
reply_list = []
size_wo_fix = size - len(prefix) - len(suffix)
while len(text) > size_wo_fix:
reply_list.append(f"{prefix}{text[:size_wo_fix]}{suffix}")
text = text[size_wo_fix:]
reply_list.append(f"{prefix}{text}{suffix}")
return reply_list

async def haste(self, text, instance='https://hastebin.com/'):
response = await self.bot.aiosession.post(f"{instance}documents",
data=text)
if response.status == 200:
result_json = await response.json()
return f"{instance}{result_json['key']}"

async def async_call_shell(self, shell_command: str,
inc_stdout=True, inc_stderr=True):
pipe = asyncio.subprocess.PIPE
proc = await asyncio.create_subprocess_shell(str(shell_command),
stdout=pipe,
stderr=pipe)

if not (inc_stdout or inc_stderr):
return "??? you set both stdout and stderr to False????"

proc_result = await proc.communicate()
stdout_str = proc_result[0].decode('utf-8').strip()
stderr_str = proc_result[1].decode('utf-8').strip()

if inc_stdout and not inc_stderr:
return stdout_str
elif inc_stderr and not inc_stdout:
return stderr_str

if stdout_str and stderr_str:
return f"stdout:\n\n{stdout_str}\n\n"\
f"======\n\nstderr:\n\n{stderr_str}"
elif stdout_str:
return f"stdout:\n\n{stdout_str}"
elif stderr_str:
return f"stderr:\n\n{stderr_str}"

return "No output."


def setup(bot):
bot.add_cog(Common(bot))

+ 6
- 0
requirements.txt View File

@@ -0,0 +1,6 @@
git+https://github.com/Rapptz/discord.py@rewrite

asyncio==3.4.3
python-dateutil==2.6.1
humanize==0.5.1
aiohttp==3.0.7

Loading…
Cancel
Save