xygt/app/worker.py

198 lines
6.6 KiB
Python
Raw Normal View History

from config import disallowedMimeTypes, Errors, Config
from app.models import User
from app import bcrypt
import secrets
import datetime
import random
2024-01-28 11:27:26 +01:00
import magic
import os
def uploadFile(file, ip, userid, filename, id, retention):
    """Validate an uploaded file, write it to disk and record it in the database.

    Args:
        file: Uploaded file object. It must support ``read``/``seek``/``tell``
            and expose ``content_length`` (presumably a Werkzeug
            ``FileStorage`` -- TODO confirm against the caller).
        ip: Uploader address; stored with the record.
        userid: ID of the uploading user. ``None``, or an ID with no entry in
            ``Config.users``, is stored as ``0``.
        filename: Ignored -- the stored file is always named after the
            generated ID.
        id: Ignored -- a fresh random ID is always generated.
        retention: Requested lifetime, added to the upload timestamp to get
            the expiry. ``None`` selects the maximum allowed for the file
            size; larger requests are capped to that maximum.

    Returns:
        ``(url, 200)`` on success, or ``(message, 400)`` when the MIME type is
        disallowed or the file is too large.
    """
    file_content = file.read()
    # Sniff the MIME type once and reuse it for validation and for the record.
    mimetype = magic.from_buffer(file_content, mime=True)
    if mimetype in disallowedMimeTypes:
        return random.choice(Errors.fileTypeNotAllowed), 400

    # NOTE(review): this compares content_length against Config.maxFileSize,
    # while the retention formula below divides a size in MiB by the same
    # constant -- confirm both use the same unit and that content_length is
    # actually populated by the caller's framework.
    if file.content_length > Config.maxFileSize:
        return random.choice(Errors.fileTooLarge), 400

    # Always generate a fresh ID. File and URL IDs share one namespace (both
    # are served from the same URL prefix and idInfo looks in both
    # collections), so the candidate must be unused in both.
    while True:
        id = randomHex()
        if (Config.files.find_one({'id': id}) is None
                and Config.url.find_one({'id': id}) is None):
            filename = id
            break

    # Unknown or missing users are recorded as user 0.
    if userid is None or Config.users.find_one({'userid': userid}) is None:
        userid = 0

    # Measure the file size in MiB (needed for the retention formula), then
    # rewind so the content can be read again for writing.
    file.seek(0, os.SEEK_END)
    fileSize = round(float(file.tell()) / (1024 * 1024), 2)
    file.seek(0)

    # Size-dependent retention cap: a cubic curve between Config.maxretention
    # (small files) and Config.minretention (files at the size limit).
    maxAllowed = (
        Config.minretention
        + (-Config.maxretention + Config.minretention)
        * pow((fileSize / Config.maxFileSize - 1), 3)
    )
    if retention is None or retention > maxAllowed:
        retention = maxAllowed

    # Create the file
    with open(os.path.join(os.path.abspath(Config.fileDir), filename), "wb") as f:
        f.write(file.read())

    timestamp = datetime.datetime.now().timestamp()

    # Create the dictionary that we'll insert into the db
    data = {
        'id': id,
        'filename': filename,
        'filesize': fileSize,
        'mimetype': mimetype if mimetype is not None else "text/plain",
        'retention': retention,
        'userid': userid,
        'ip': ip,
        'date': timestamp,
        'expiry': timestamp + retention
    }

    # Add the data and verify its there.
    Config.files.insert_one(data)
    print(Config.files.find_one({"id": id}))

    return f"https://xygt.cc/{id}", 200
def shortenURL(url, ip, userid, id, retention):
    """Store a shortened URL record and return its public link.

    Args:
        url: Target URL to store.
        ip: Requester address; stored with the record.
        userid: ID of the requesting user. ``None``, or an ID with no entry in
            ``Config.users``, is stored as ``0``.
        id: Requested short ID, or ``None`` to generate a random one. A
            supplied ID is used as-is (NOTE(review): it is not checked for
            collisions here -- presumably the caller validates it).
        retention: Lifetime in seconds. ``None`` defaults to 604800 (7 days);
            values above 31540000 are capped to 31540000.

    Returns:
        ``(url, 200)`` where ``url`` is the public short link.
    """
    # Generate an ID when none was supplied. File and URL IDs share one
    # namespace (idInfo looks in both collections), so the candidate must be
    # unused in both -- in particular in Config.url, where it is inserted.
    if id is None:
        while True:
            id = randomHex()
            if (Config.files.find_one({'id': id}) is None
                    and Config.url.find_one({'id': id}) is None):
                break

    # Unknown or missing users are recorded as user 0.
    if userid is None or Config.users.find_one({'userid': userid}) is None:
        userid = 0

    if retention is None:
        retention = 604800
    elif retention > 31540000:
        retention = 31540000

    timestamp = datetime.datetime.now().timestamp()

    data = {
        'id': id,
        'url': url,
        'retention': retention,
        'userid': userid,
        'ip': ip,
        'date': timestamp,
        'expiry': timestamp + retention
    }

    Config.url.insert_one(data)
    print(Config.url.find_one({"id": data["id"]}))

    return f"https://xygt.cc/{id}", 200
def idInfo(id):
    """Return the stored record for ``id``, looking in files first, then URLs.

    The ``_id`` and ``ip`` fields are excluded from the returned document.

    Args:
        id: The ID to look up.

    Returns:
        The matching document from ``Config.files`` or ``Config.url``, or
        ``{"Error": "File Not Found"}`` when neither collection has it.
    """
    # "ip": False removes the IP from the returned data.
    projection = {'_id': False, "ip": False}

    # One query per collection: fetching directly (instead of checking for
    # existence and then fetching) avoids returning None if the document
    # disappears between the two calls.
    for collection in (Config.files, Config.url):
        check = collection.find_one({"id": id}, projection)
        if check is not None:
            return check

    return {"Error": "File Not Found"}
def userInfo(id):
    """Return a user's name and a listing of the files they uploaded.

    Args:
        id: The user ID to look up in ``Config.users``.

    Returns:
        A dict with a ``"user"`` entry (``username``, ``userid``) and a
        ``"files"`` entry (``count`` plus a ``list`` mapping each file ID to
        its metadata), or ``{"Error": "User Not Found"}`` when no user has
        this ID.
    """
    # Grab user entry from userID
    user = Config.users.find_one({"userid": id})
    if user is None:
        # Previously this crashed on user['username']; report it in the same
        # shape idInfo uses for unknown IDs.
        return {"Error": "User Not Found"}

    # Search for all files from that userID (without the _id and ip fields)
    files = Config.files.find({"userid": id}, {"_id": False, "ip": False})

    # Create file listing
    listing = {
        file["id"]: {
            "filename": file["filename"],
            "mimetype": file["mimetype"],
            "filesize": file["filesize"],
            "retention": file["retention"],
            "creation": file["date"],
            "expiry": file["expiry"]
        }
        for file in files
    }

    # NOTE(review): the user's shortened URLs are not part of the result; the
    # original built a Config.url query here but never used it.

    # Format all into one JSON
    return {
        "user": {
            "username": user['username'],
            "userid": id
        },
        "files": {
            # The query result is a cursor, which does not support len();
            # count the listing built from it instead.
            "count": len(listing),
            "list": listing
        }
    }
def randomHex():
    """Return a random string of 6 lowercase hexadecimal digits."""
    # 3 random bytes encode to exactly 6 hex digits.
    return secrets.token_hex(3)
def genIDPass():
    """Return a random string of 16 lowercase hexadecimal digits."""
    # 8 random bytes encode to exactly 16 hex digits.
    return secrets.token_hex(8)
def registerUser(username, password):
    """Create a user record with a fresh user ID and hashed credentials.

    Args:
        username: Name to store for the new user.
        password: Plain-text password; only its bcrypt hash is stored.

    Returns:
        ``True`` when the user was inserted, ``False`` if anything failed.
    """
    try:
        level = 1

        # Pick a user ID that is not already taken.
        while True:
            userid = randomHex()
            if Config.users.find_one({"userid": userid}) is None:
                break

        # Placeholder secret: the user will not know this, they'll need to
        # generate a new one. Drawn from genIDPass() so it has the same
        # strength as the secrets resetIDPass hands out.
        idpass = bcrypt.generate_password_hash(genIDPass()).decode("utf-8")
        password = bcrypt.generate_password_hash(password).decode("utf-8")

        user = User(username, userid, password, idpass, level)
        Config.users.insert_one(vars(user))
        return True
    except Exception:
        # Callers rely on a boolean result, so failures are reported as False;
        # catching Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        return False
def resetIDPass(userid):
    """Generate a new ID password for a user and store its bcrypt hash.

    Args:
        userid: ID of the user whose ``idpass`` should be replaced.

    Returns:
        The new plain-text ID password, or ``False`` if the update failed or
        no user with this ID exists.
    """
    try:
        idpass = genIDPass()
        hashedPass = bcrypt.generate_password_hash(idpass).decode("utf-8")
        result = Config.users.update_one(
            {"userid": userid}, {"$set": {"idpass": hashedPass}}
        )
    except Exception:
        # Catch Exception rather than everything so KeyboardInterrupt and
        # SystemExit still propagate.
        return False

    # update_one succeeds even when nothing matched; don't hand back a secret
    # that was never stored.
    if result.matched_count == 0:
        return False
    return idpass