initial commit

main
ostrichb 3 years ago
commit 6e5d9d1bd8

149
.gitignore vendored

@ -0,0 +1,149 @@
### Example user template template
### Example user template
# IntelliJ project files
.idea
*.iml
out
gen
### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
o365_token.txt

@ -0,0 +1,20 @@
beautifulsoup4==4.11.1
certifi==2022.9.24
charset-normalizer==2.1.1
func-timeout==4.3.5
idna==3.4
O365==2.0.21
oauthlib==3.2.2
pause==0.3
python-dateutil==2.8.2
pythondialog==3.5.3
pytz==2022.6
pytz-deprecation-shim==0.1.0.post0
requests==2.28.1
requests-oauthlib==1.3.1
six==1.16.0
soupsieve==2.3.2.post1
stringcase==1.2.0
tzdata==2022.6
tzlocal==4.2
urllib3==1.26.12

@ -0,0 +1,161 @@
import logging
import sys
import O365
import traceback
import pause
import dialog
import argparse
from dialog import Dialog
from o365_util import O365_User
if __name__ == '__main__':
# Announce an argument parser
parser = argparse.ArgumentParser(description="Automated API calling program.")
parser.add_argument("-i", "--interactive", action="store_true", help="Config the arguments in interactive mode")
parser.add_argument("--business-hour", action="store_true", help="Only runs the routine in business hours, "
"that is 9AM to 5PM from Monday to Friday.")
parser.add_argument("--interval", type=int, help="Set the interval of routine. "
"(Conflicts with the min and max arguments!)")
parser.add_argument("--min", type=int, help="Set the minimum interval of routine. "
"(Conflicts with the interval arguments!)")
parser.add_argument("--max", type=int, help="Set the maximum interval of routine. "
"(Conflicts with the interval arguments!)")
parser.add_argument("-t", "--temp-file-threshold", type=int, default=5, help="The maximum amount of files "
"stored in your onedrive drive.")
parser.add_argument("--client-id", type=str, help="The client id from graph API.")
parser.add_argument("--secret", type=str, help="The client secret from graph API.")
if not len(sys.argv) == 1:
parsed_arguments = parser.parse_args(sys.argv[1:])
try:
# Check interactive mode first
if parsed_arguments.interactive:
d = Dialog()
d.set_background_title("Office E5 Continuer")
d.infobox("Welcome to use this program!", width=54)
pause.seconds(2)
# Setting the default value
credentials = [("" if not parsed_arguments.client_id else parsed_arguments.client_id),
("" if not parsed_arguments.secret else parsed_arguments.secret)]
routine_mode = ""
interval: int = 0
business_hour: bool = False
threshold: int = 5
while True:
(choice0, credentials) = d.form("Enter the Client ID and secret below:",
width=65,
elements=[
("Client ID", 2, 5,
credentials[0],
2, 16, 41, 0),
("Secret", 4, 5,
credentials[1],
4, 16, 41, 0)
],
title="Setting the API",
form_height=5)
if choice0 == "cancel":
break
(choice1, routine_mode) = d.menu("Choose the routine mode listed below.",
width=54,
choices=[("Fixed", "Fixed interval between each routine."),
("Ranged", "Randomly picks an interval in a range.")],
title="Routine Mode")
if choice1 == "cancel":
break
else:
if routine_mode == "Fixed":
(choice2, interval) = d.inputbox("Enter the interval during each routine, in seconds",
width=54,
init=str(
600 if not parsed_arguments.interval
else parsed_arguments.interval),
title="Setting the interval")
interval = int(interval)
else:
(choice2, interval) = d.form("Enter the minimum and the maximum interval, in seconds",
width=54,
elements=[
("Min", 1, 15,
str(600 if not parsed_arguments.min else
parsed_arguments.min),
1, 23, 10, 0),
("Max", 2, 15,
str(1200 if not parsed_arguments.max else
parsed_arguments.max),
2, 23, 10, 0)
],
title="Setting the interval")
for item in interval:
item = int(item)
if choice2 == "cancel":
break
business_hour = True if d.yesno("Do you wanna enable business hours mode?\n"
"If you enabled this, the API will be only called from 9 to 5, "
"from Monday to Friday.",
width=54) == d.OK else False
(choice4, threshold) = d.inputbox("Enter the maximum file amount stored in the OneDrive.",
width=54,
init=str(5 if not parsed_arguments.temp_file_threshold
else parsed_arguments.temp_file_threshold),
title="Temporary file threshold")
if choice4 == "Cancel":
break
# Conclusion
if d.yesno("You've selected:\n"
"Client ID: {0}\n"
"Secret: {1}***{2}\n"
"Routine Mode: {3}\n"
"Interval: {4} s\n"
"Business Mode: {5}\n"
"Threshold: {6}\n\n"
"Is this OK?".format(credentials[0], credentials[1][0:5],
credentials[1][-5:],
routine_mode,
str(interval) if isinstance(interval, int) else (
"{0} - {1}".format(str(interval[0]),
str(interval[1]))),
"Yes" if business_hour else "No",
str(threshold)
),
width=54,
height=15,
title="Final Check") == d.OK:
break
else:
continue
(client_id, secret) = credentials
account = O365.Account(credentials=(client_id, secret),
scopes=['basic', 'message_all', 'onedrive_all'])
user_obj = O365_User(account)
user_obj.infinite_routine(interval, business_hour, 30, threshold, interactive=True)
else:
if parsed_arguments.client_id is None or parsed_arguments.secret is None:
raise ValueError("Both of the client ID and secret VALUE (not uuid) should be provided!")
if parsed_arguments.interval and (parsed_arguments.min or parsed_arguments.max):
raise ValueError("Intervals and the range cannot be set at the same time!")
if (not (parsed_arguments.min and parsed_arguments.max)) and (not parsed_arguments.interval):
raise ValueError("The minimum and the maximum should be set at the same time. "
"If you wanna use a fixed interval, use --interval instead.")
account = O365.Account(credentials=(parsed_arguments.client_id, parsed_arguments.secret),
scopes=['basic', 'message_all', 'onedrive_all'])
user_obj = O365_User(account)
user_obj.infinite_routine(int(parsed_arguments.interval) if parsed_arguments.interval else
(int(parsed_arguments.min), int(parsed_arguments.max)),
parsed_arguments.business_hour,
30,
int(parsed_arguments.temp_file_threshold))
except dialog.PythonDialogBug:
logging.error("Cannot use dialogs here, try to change to a terminal or disable interactive mode. Make sure "
"that UNIX dialog have been installed in your machine.")
except ValueError as e:
if parsed_arguments.interactive:
d = Dialog()
d.msgbox(str(e), width=54, title="Error")
logging.error("{0}: {1}".format(type(e).__name__, e))
logging.error(traceback.format_exc())
except Exception as e:
logging.error("{0}: {1}".format(type(e).__name__, e))
logging.error(traceback.format_exc())
else:
parser.print_help()

@ -0,0 +1,181 @@
import os
import time
import logging
import O365
import pathlib
import random
import pause
import readline
import datetime as dt
from O365 import drive as d
from O365 import mailbox as mb
from func_timeout import func_timeout, FunctionTimedOut
from datetime import datetime, timedelta
from dialog import Dialog
log_format = "[%(asctime)s][%(filename)s][%(levelname)s] %(message)s"
logging.basicConfig(level=logging.INFO, format=log_format, datefmt="%Y-%m-%d %H:%M:%S")
def get_modified_time(file: d.DriveItem):
return file.modified
class O365_User:
def __init__(self, account: O365.Account = None):
# Authenticate the session
self.account = account # Pass the account object to the Class
if self.account is None:
raise ValueError("The account cannot be none!")
if not self.account.is_authenticated:
if self.account.authenticate():
logging.info("Authenticate success.")
else:
logging.error("There is a problem during the authentication.")
def single_routine(self, temp_file_threshold: int = 5, is_interactive: bool = False):
# This operation will be executed within route.
dlg = Dialog()
if not self.account.is_authenticated:
raise EnvironmentError("The account is not authenticated! You need to authenticate it first.")
# Infinite circulation
try:
logging.info("Getting user's mailbox")
if is_interactive:
dlg.gauge_start("Getting user's mailbox...", width=54, percent=0, title="Executing...")
mailbox = self.account.mailbox()
inbox_folder: mb.Folder = mailbox.inbox_folder()
assert isinstance(inbox_folder, mb.Folder), "inbox_folder is not a Mailbox instance."
logging.info("Getting recent messages")
if is_interactive:
dlg.gauge_update(20, "Getting recent messages...", update_text=True)
message_list: list[O365.Message] = inbox_folder.get_messages(limit=10, download_attachments=False)
# assert isinstance(message_list, ), "message_list is not a list instance."
output_str = "10 recent messages: \n" \
"=================== \n" \
""
count = 0
for message in message_list:
count += 1
output_str += "[{:2}]\n".format(str(count))
output_str += "Subject: {0}\n".format(message.subject or "None")
output_str += "Contents: \n{0}\n\n".format(message.get_body_text())
output_str += "Grubbed at {0}.".format(time.asctime())
output_str.replace(" ", "")
if is_interactive:
dlg.gauge_update(40, "Writing into a file...", update_text=True)
main_path = pathlib.Path().resolve().parent
if not (main_path.joinpath('temp').exists() or main_path.joinpath('temp').exists()):
# Create a new path if it is not shown
main_path.joinpath('temp').mkdir(exist_ok=False)
temp_path = main_path.joinpath('temp')
file_path = temp_path.joinpath('mailoutput_{0}{1:02}{2:02}{3:02}{4:02}{5:02}.txt'.format(
time.gmtime().tm_year, time.gmtime().tm_mon, time.gmtime().tm_mday,
time.gmtime().tm_hour, time.gmtime().tm_min, time.gmtime().tm_sec
))
with open(file_path, mode="w") as f:
f.writelines(output_str)
f.close()
logging.info("Successfully written the log into a file.")
logging.info("Getting user's OneDrive drive")
if is_interactive:
dlg.gauge_update(60, "Getting user's OneDrive drive...", update_text=True)
drive: d.Drive = self.account.storage().get_default_drive()
assert isinstance(drive, d.Drive), "storage is not a Storage instance!"
root_folder: d.Folder = drive.get_root_folder()
if is_interactive:
dlg.gauge_update(80, "Uploading file...", update_text=True)
folder_list = root_folder.get_child_folders(limit=500)
folder_obj = [x for x in folder_list]
temp_folder = root_folder
if "Temp" not in [x.name for x in folder_obj]:
temp_folder: d.Folder = root_folder.create_child_folder("Temp")
else:
for x in folder_obj:
if x.name == "Temp":
temp_folder = x
break
if not temp_folder.upload_file(file_path) is None:
logging.info("Upload success!")
if is_interactive:
dlg.gauge_update(95, "Upload success, check if the file exceeds the threshold...", update_text=True)
# Check if the file amount exceed the threshold
temp_file_list: list[dlg.DriveItem] = [x for x in temp_folder.get_items(limit=500) if not x.is_folder]
if len(temp_file_list) > temp_file_threshold:
temp_file_list.sort(key=get_modified_time, reverse=True)
for i in range(5, len(temp_file_list)):
temp_file_list[i].delete()
logging.info("Temp file's amount exceeded, deleted the outdated file.")
else:
logging.error("Upload failed.")
if is_interactive:
dlg.gauge_update(80, "Update failed.", update_text=True)
if file_path.exists():
os.remove(file_path) # Remove the temporary file
logging.info("Circulation finished.")
if is_interactive:
dlg.gauge_update(100, "Circulation finished.", update_text=True)
dlg.gauge_stop()
except Exception as e:
if is_interactive:
dlg.gauge_stop()
logging.error(e)
logging.error("AN error occurred, the circulation will be jumped "
"to the end and wait for the next turn.")
def infinite_routine(self, interval=(600, 1200),
business_hours: bool = False,
routine_timeout: int = 30,
temp_file_threshold: int = 5, **kwargs):
is_interactive = False
dlg = Dialog()
for key, value in kwargs.items():
if key == "interactive":
is_interactive = value
if not self.account.is_authenticated:
if is_interactive:
dlg.msgbox("The account is not authenticated! You need to authenticate it first.")
raise EnvironmentError("The account is not authenticated! You need to authenticate it first.")
while True:
try:
func_timeout(routine_timeout, self.single_routine, kwargs={'temp_file_threshold': temp_file_threshold,
'is_interactive': is_interactive})
except FunctionTimedOut:
logging.error("Timeout exceeded, the process will be jumped to the next routine.")
finally:
if business_hours and \
not (9 <= time.gmtime().tm_hour < 17 and 0 <= time.gmtime().tm_wday <= 4):
# Not in business hours
next_routine_until = datetime.combine(dt.date.today(), dt.time.min) + timedelta(days=1, hours=9)
while next_routine_until.weekday() not in range(0, 5):
next_routine_until += timedelta(days=1)
logging.info("Out of the business time, the next routine will be executed at {0}".
format(next_routine_until.ctime()))
if is_interactive:
dlg.infobox("Out of the business time, the next routine will be executed at {0}".
format(next_routine_until.ctime()), width=54, title="Info")
pause.until(next_routine_until.timestamp())
else:
if isinstance(interval, int):
logging.info(
"The next routine will be executed at {0}".format(time.ctime(time.time() + interval)))
if is_interactive:
dlg.infobox(
"The next routine will be executed at {0}".format(time.ctime(time.time() + interval)),
width=54, title="Info")
pause.until(time.time() + interval)
if isinstance(interval, tuple):
if not len(interval) == 2:
raise ValueError("The interval has a bad format.")
(min_interval, max_interval) = interval
random_interval = random.randint(min_interval if min_interval <= max_interval else max_interval,
max_interval if min_interval <= max_interval else min_interval)
logging.info(
"The next routine will be executed at {0}".format(
time.ctime(time.time() + random_interval)))
pause.until(time.time() + random_interval)
Loading…
Cancel
Save