You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

193 lines
9.7 KiB

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

import os
import time
import logging
import O365
import pathlib
import random
import pause
import readline
import traceback
import datetime as dt
from O365 import drive as d
from O365 import mailbox as mb
from func_timeout import func_timeout, FunctionTimedOut
from datetime import datetime, timedelta
from dialog import Dialog
# Root-logger setup shared by the whole module: every record is rendered as
# "[timestamp][source file][level] message" and INFO is the lowest level shown.
log_format = "[%(asctime)s][%(filename)s][%(levelname)s] %(message)s"
logging.basicConfig(
    format=log_format,
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
def get_modified_time(file: d.DriveItem):
    """Return the ``modified`` attribute of *file*.

    Used as a ``key=`` function when ordering drive items by modification time.
    """
    modified_at = file.modified
    return modified_at
class O365_User:
    """Run a mail-to-OneDrive routine against an authenticated O365 account.

    One routine reads recent inbox messages, writes them to a local text
    file, uploads that file to a "Temp" folder in the default OneDrive drive
    and prunes old files there. ``infinite_routine`` repeats it forever with
    a configurable pause in between.
    """

    # Business hours are Monday-Friday, from 09:00 (inclusive) to 17:00 (exclusive).
    _BUSINESS_OPEN_HOUR = 9
    _BUSINESS_CLOSE_HOUR = 17

    # The annotation is quoted so it is not evaluated when the class is defined.
    def __init__(self, account: "O365.Account" = None):
        """Store *account* and authenticate it if it is not authenticated yet.

        Args:
            account: The O365 account object to operate on.

        Raises:
            ValueError: If *account* is ``None``.
        """
        if account is None:
            raise ValueError("The account cannot be none!")
        self.account = account
        if not self.account.is_authenticated:
            if self.account.authenticate():
                logging.info("Authenticate success.")
            else:
                logging.error("There is a problem during the authentication.")

    @staticmethod
    def _format_digest(messages):
        """Render *messages* as the text that is written to the digest file."""
        parts = ["10 recent messages: \n", "=================== \n"]
        for index, message in enumerate(messages, start=1):
            parts.append("[{:2}]\n".format(str(index)))
            parts.append("Subject: {0}\n".format(message.subject or "None"))
            parts.append("Contents: \n{0}\n\n".format(message.get_body_text()))
        parts.append("Grubbed at {0}.".format(time.asctime()))
        # NOTE(review): the original code called ``output_str.replace(<an
        # invisible character>, "")`` here but discarded the result, so it
        # never had any effect. The statement is dropped instead of being made
        # effective because the exact character cannot be recovered reliably;
        # re-add it as an assignment with an explicit escape (for example
        # "\u200b") if stripping is really wanted.
        return "".join(parts)

    @staticmethod
    def _new_digest_path():
        """Return a fresh ``mailoutput_<UTC timestamp>.txt`` path in ``../temp``.

        The ``temp`` directory (next to the current working directory) is
        created when missing.
        """
        temp_dir = pathlib.Path().resolve().parent.joinpath('temp')
        temp_dir.mkdir(exist_ok=True)
        # One gmtime() snapshot, so the name cannot straddle a second boundary.
        file_name = time.strftime("mailoutput_%Y%m%d%H%M%S.txt", time.gmtime())
        return temp_dir.joinpath(file_name)

    @staticmethod
    def _get_or_create_temp_folder(root_folder):
        """Return the child folder named "Temp" of *root_folder*, creating it if absent."""
        for folder in root_folder.get_child_folders(limit=500):
            if folder.name == "Temp":
                return folder
        return root_folder.create_child_folder("Temp")

    @staticmethod
    def _prune_temp_folder(temp_folder, keep):
        """Delete all but the *keep* most recently modified files in *temp_folder*."""
        files: "list[d.DriveItem]" = [
            item for item in temp_folder.get_items(limit=500) if not item.is_folder
        ]
        if len(files) <= keep:
            return
        files.sort(key=get_modified_time, reverse=True)
        # Honour the caller's threshold (the old code always kept exactly 5).
        for stale in files[max(keep, 0):]:
            stale.delete()
        logging.info("Temp file's amount exceeded, deleted the outdated file.")

    @staticmethod
    def _normalize_interval(interval):
        """Return *interval* as an ordered ``(low, high)`` pair of seconds.

        Args:
            interval: Either a single ``int`` (fixed pause) or a tuple/list of
                two ``int`` values (random pause between them, in any order).

        Raises:
            ValueError: If a tuple/list does not have exactly two items.
            TypeError: If *interval* or one of its items is not an ``int``.
        """
        if isinstance(interval, int):
            return (interval, interval)
        if isinstance(interval, (tuple, list)):
            if len(interval) != 2:
                raise ValueError("The interval has a not good format.")
            if not all(isinstance(bound, int) for bound in interval):
                raise TypeError("The interval bounds must be integers.")
            low, high = sorted(interval)
            return (low, high)
        raise TypeError("The interval must be an int or a pair of ints.")

    @staticmethod
    def _in_business_hours(now):
        """Return True when *now* is Monday-Friday between 09:00 and 17:00."""
        return (now.weekday() <= 4
                and O365_User._BUSINESS_OPEN_HOUR <= now.hour < O365_User._BUSINESS_CLOSE_HOUR)

    @staticmethod
    def _next_business_start(now):
        """Return the first weekday 09:00 that is strictly after *now*."""
        candidate = now.replace(hour=O365_User._BUSINESS_OPEN_HOUR,
                                minute=0, second=0, microsecond=0)
        if now >= candidate:
            # Today's opening time has already passed; start from tomorrow.
            candidate += timedelta(days=1)
        while candidate.weekday() > 4:
            candidate += timedelta(days=1)
        return candidate

    def single_routine(self, temp_file_threshold: int = 5, is_interactive: bool = False):
        """Run one mail-digest/upload cycle.

        Any ``Exception`` raised during the cycle is logged and swallowed so
        the caller can simply wait for the next turn.

        Args:
            temp_file_threshold: Maximum number of files kept in the remote
                "Temp" folder; older files beyond it are deleted.
            is_interactive: When true, progress is shown in a dialog gauge.

        Raises:
            EnvironmentError: If the account is not authenticated.
        """
        if not self.account.is_authenticated:
            raise EnvironmentError("The account is not authenticated! You need to authenticate it first.")
        # Only touch the dialog backend when a UI was actually requested.
        dlg = Dialog() if is_interactive else None

        def report(percent, text):
            if dlg is not None:
                dlg.gauge_update(percent, text, update_text=True)

        gauge_open = False
        file_path = None
        try:
            logging.info("Getting user's mailbox")
            if dlg is not None:
                dlg.gauge_start("Getting user's mailbox...", width=54, percent=0, title="Executing...")
                gauge_open = True
            inbox_folder = self.account.mailbox().inbox_folder()
            # Explicit raise instead of assert: asserts vanish under ``python -O``.
            if not isinstance(inbox_folder, mb.Folder):
                raise TypeError("inbox_folder is not a Mailbox instance.")

            logging.info("Getting recent messages")
            report(20, "Getting recent messages...")
            messages = inbox_folder.get_messages(limit=10, download_attachments=False)
            digest = self._format_digest(messages)

            report(40, "Writing into a file...")
            file_path = self._new_digest_path()
            # Mail bodies may contain any Unicode text, so fix the encoding.
            file_path.write_text(digest, encoding="utf-8")
            logging.info("Successfully written the log into a file.")

            logging.info("Getting user's OneDrive drive")
            report(60, "Getting user's OneDrive drive...")
            drive = self.account.storage().get_default_drive()
            if not isinstance(drive, d.Drive):
                raise TypeError("storage is not a Storage instance!")
            root_folder = drive.get_root_folder()

            report(80, "Uploading file...")
            temp_folder = self._get_or_create_temp_folder(root_folder)
            if temp_folder.upload_file(file_path) is not None:
                logging.info("Upload success!")
                report(95, "Upload success, check if the file exceeds the threshold...")
                self._prune_temp_folder(temp_folder, temp_file_threshold)
            else:
                logging.error("Upload failed.")
                report(80, "Update failed.")

            logging.info("Circulation finished.")
            report(100, "Circulation finished.")
        except Exception as e:
            logging.error(e)
            logging.error(traceback.format_exc())
            logging.error("AN error occurred, the circulation will be jumped "
                          "to the end and wait for the next turn.")
        finally:
            # Runs on success, on error and on timeout, so neither the local
            # temporary file nor the gauge is left behind.
            try:
                if file_path is not None and file_path.exists():
                    os.remove(file_path)  # Remove the temporary file
            except OSError as e:
                logging.error(e)
            if gauge_open:
                dlg.gauge_stop()

    def infinite_routine(self, interval=(600, 1200),
                         business_hours: bool = False,
                         routine_timeout: int = 30,
                         temp_file_threshold: int = 5, **kwargs):
        """Run ``single_routine`` forever, pausing between runs.

        Args:
            interval: Pause in seconds: an ``int`` for a fixed pause, or a pair
                of ``int`` values for a random pause between them.
            business_hours: When true, a run that ends outside Monday-Friday
                09:00-17:00 (local time) is followed by a pause until the next
                weekday 09:00 instead of the regular interval.
            routine_timeout: Seconds after which a single routine is aborted.
            temp_file_threshold: Passed through to ``single_routine``.
            **kwargs: ``interactive`` (bool) enables dialog output; any other
                key is ignored.

        Raises:
            EnvironmentError: If the account is not authenticated.
            ValueError: If *interval* is a pair with the wrong length.
            TypeError: If *interval* has an unsupported type.
        """
        is_interactive = kwargs.get("interactive", False)
        dlg = Dialog() if is_interactive else None
        if not self.account.is_authenticated:
            if dlg is not None:
                dlg.msgbox("The account is not authenticated! You need to authenticate it first.")
            raise EnvironmentError("The account is not authenticated! You need to authenticate it first.")

        # Validate once, up front: a bad interval fails immediately instead of
        # after the first routine, and can never turn into a sleepless loop.
        min_interval, max_interval = self._normalize_interval(interval)

        while True:
            try:
                func_timeout(routine_timeout, self.single_routine,
                             kwargs={'temp_file_threshold': temp_file_threshold,
                                     'is_interactive': is_interactive})
            except FunctionTimedOut:
                logging.error("Timeout exceeded, the process will be jumped to the next routine.")

            # The pause is deliberately NOT in a ``finally`` clause: an
            # interrupt must stop the loop right away, not after a long sleep.
            # Local time is used for both the check and the wake-up time.
            now = datetime.now()
            if business_hours and not self._in_business_hours(now):
                next_routine_until = self._next_business_start(now)
                message = "Out of the business time, the next routine will be executed at {0}".format(
                    next_routine_until.ctime())
                wake_at = next_routine_until.timestamp()
            else:
                delay = random.randint(min_interval, max_interval)
                logging.debug("Next pause: %s seconds", delay)
                wake_at = time.time() + delay
                message = "The next routine will be executed at {0}".format(time.ctime(wake_at))

            logging.info(message)
            if dlg is not None:
                dlg.infobox(message, width=54, title="Info")
            pause.until(wake_at)