mirror of
https://github.com/jackeilles/xygt.git
synced 2024-11-29 10:12:39 +01:00
194 lines
6.6 KiB
Python
194 lines
6.6 KiB
Python
from config import disallowedMimeTypes, Errors, Config
|
|
from app.models import User
|
|
from app import bcrypt
|
|
import secrets
|
|
import datetime
|
|
import random
|
|
import magic
|
|
import os
|
|
|
|
def uploadFile(file, ip, userid, filename, id, retention):
|
|
|
|
# Is the MIME and file size good?
|
|
file_content = file.read()
|
|
if magic.from_buffer(file_content, mime=True) not in disallowedMimeTypes:
|
|
if file.content_length <= Config.maxFileSize:
|
|
# We're going to check whether the id variable has been filled
|
|
|
|
while True: # Loop to find an available file ID
|
|
id = randomHex() # Prevent conflicts if 2 of the same get made
|
|
if Config.files.find_one({'id': id}) is None:
|
|
filename=id
|
|
break
|
|
|
|
if userid == None:
|
|
userid = 0
|
|
elif Config.users.find_one({'userid': userid}) == None:
|
|
userid = 0
|
|
|
|
# Calculate retention before the file is written, we'll grab the filesize here as it's needed for the equation.
|
|
file.seek(0, os.SEEK_END)
|
|
fileSize = round(float(file.tell()) / (1024 * 1024), 2)
|
|
|
|
# Set the position back to 0
|
|
file.seek(0)
|
|
|
|
if retention == None:
|
|
retention = (Config.minretention+(-Config.maxretention + Config.minretention)*pow((fileSize / Config.maxFileSize -1), 3))
|
|
elif retention > (Config.minretention+(-Config.maxretention + Config.minretention)*pow((fileSize / Config.maxFileSize -1), 3)):
|
|
retention = (Config.minretention+(-Config.maxretention + Config.minretention)*pow((fileSize / Config.maxFileSize -1), 3))
|
|
else:
|
|
retention = retention
|
|
|
|
|
|
# Create the file
|
|
with open(f"{os.path.abspath(Config.fileDir)}/{filename}", "wb") as f:
|
|
f.write(file.read())
|
|
|
|
timestamp = datetime.datetime.now()
|
|
timestamp = timestamp.timestamp()
|
|
|
|
# Create the dictionary that we'll insert into the db
|
|
data = {
|
|
'id': id,
|
|
'filename': filename,
|
|
'filesize': fileSize,
|
|
'mimetype': magic.from_buffer(file_content, mime=True) if magic.from_buffer(file_content, mime=True) != None else "text/plain",
|
|
'retention': retention,
|
|
'userid': userid,
|
|
'ip': ip,
|
|
'date': timestamp,
|
|
'expiry': timestamp + retention
|
|
}
|
|
|
|
# Add the data and verify its there.
|
|
Config.files.insert_one(data)
|
|
print(Config.files.find_one({"id": id}))
|
|
|
|
return f"https://xygt.cc/{id}", 200
|
|
else:
|
|
return random.choice(Errors.fileTooLarge), 400
|
|
else:
|
|
return random.choice(Errors.fileTypeNotAllowed), 400
|
|
|
|
def shortenURL(url, ip, userid, id, retention):
|
|
# We're going to check whether the id variable has been filled
|
|
# If not then we'll generate one. (The ID variable will be the same as the filename if not rejected earlier.)
|
|
if id == None:
|
|
while True: # Loop to find an available file ID
|
|
id = randomHex() # Prevent conflicts if 2 of the same get made
|
|
if Config.files.find_one({'id': id}) is None:
|
|
break
|
|
|
|
if userid == None:
|
|
userid = 0
|
|
elif Config.users.find_one({'userid': userid}) == None:
|
|
userid = 0
|
|
|
|
if retention == None:
|
|
retention = 604800
|
|
elif retention > 31540000:
|
|
retention = 31540000
|
|
|
|
timestamp = datetime.datetime.now()
|
|
timestamp = timestamp.timestamp()
|
|
|
|
data = {
|
|
'id': id,
|
|
'url': url,
|
|
'retention': retention,
|
|
'userid': userid,
|
|
'ip': ip,
|
|
'date': timestamp,
|
|
'expiry': timestamp + retention
|
|
}
|
|
|
|
Config.url.insert_one(data)
|
|
print(Config.url.find_one({"id": data["id"]}))
|
|
|
|
return f"https://xygt.cc/{id}", 200
|
|
|
|
def idInfo(id):
|
|
# Check files and url for the ID
|
|
if Config.files.find_one({"id": id}) is not None:
|
|
check = Config.files.find_one({"id": id}, {'_id': False, "ip": False})
|
|
# "ip": False removes the IP from the returned data.
|
|
# If it's not there then check url
|
|
elif Config.url.find_one({"id": id}) is not None:
|
|
check = Config.url.find_one({"id": id}, {'_id': False, "ip": False})
|
|
|
|
# Return the mongodb info about the file, removing IP if its present
|
|
return check
|
|
|
|
def userInfo(id):
|
|
# Grab user entry from userID
|
|
user = Config.users.find_one({"userid": id})
|
|
|
|
username = user['user']
|
|
userid = id
|
|
|
|
# Search for all files from that userID
|
|
files = Config.files.find_many({"userid": userid}, {"_id": False, "ip": False})
|
|
list = {}
|
|
|
|
# Create file listing
|
|
for file in files:
|
|
list.update({
|
|
file["id"]: {
|
|
"filename": file["filename"],
|
|
"mimetype": file["mimetype"],
|
|
"filesize": file["filesize"],
|
|
"retention": file["retention"],
|
|
"creation": file["date"],
|
|
"expiry": file["expiry"]
|
|
}
|
|
})
|
|
|
|
# Search for all URL's from that userID
|
|
url = Config.url.find_many({"userid": userid})
|
|
|
|
# Format all into one JSON
|
|
return {
|
|
"user": {
|
|
"username": username,
|
|
"userid": userid
|
|
},
|
|
"files": {
|
|
"count": len(files),
|
|
"list": list
|
|
}
|
|
}
|
|
|
|
def randomHex():
|
|
hexRand = ''.join(secrets.choice('0123456789abcdef') for _ in range(6))
|
|
return hexRand
|
|
|
|
def genIDPass():
|
|
idpass = ''.join(secrets.choice('0123456789abcdef') for _ in range(16))
|
|
return idpass
|
|
|
|
def registerUser(username, password):
|
|
# Initialise some values
|
|
try:
|
|
level = 1
|
|
while True:
|
|
userid = randomHex()
|
|
if Config.users.find_one({"userid": userid}) is None:
|
|
break
|
|
idpass = bcrypt.generate_password_hash(randomHex()).decode("utf-8") # The user will not know this, they'll need to generate a new one.
|
|
password = bcrypt.generate_password_hash(password).decode("utf-8")
|
|
user = User(username, userid, password, idpass, level)
|
|
Config.users.insert_one(user.__dict__)
|
|
|
|
return True
|
|
except:
|
|
return False
|
|
|
|
def resetIDPass(userid):
|
|
try:
|
|
idpass = genIDPass()
|
|
hashedPass = bcrypt.generate_password_hash(idpass).decode("utf-8")
|
|
Config.users.update_one({"userid": userid}, {"$set": {"idpass": hashedPass}})
|
|
return idpass
|
|
except:
|
|
return False
|