import os import time import logging import O365 import pathlib import random import pause import readline import traceback import datetime as dt from O365 import drive as d from O365 import mailbox as mb from func_timeout import func_timeout, FunctionTimedOut from datetime import datetime, timedelta from dialog import Dialog log_format = "[%(asctime)s][%(filename)s][%(levelname)s] %(message)s" logging.basicConfig(level=logging.INFO, format=log_format, datefmt="%Y-%m-%d %H:%M:%S") def get_modified_time(file: d.DriveItem): return file.modified class O365_User: def __init__(self, account: O365.Account = None): # Authenticate the session self.account = account # Pass the account object to the Class if self.account is None: raise ValueError("The account cannot be none!") if not self.account.is_authenticated: if self.account.authenticate(): logging.info("Authenticate success.") else: logging.error("There is a problem during the authentication.") def single_routine(self, temp_file_threshold: int = 5, is_interactive: bool = False): # This operation will be executed within route. dlg = Dialog() if not self.account.is_authenticated: raise EnvironmentError("The account is not authenticated! You need to authenticate it first.") # Infinite circulation try: logging.info("Getting user's mailbox") if is_interactive: dlg.gauge_start("Getting user's mailbox...", width=54, percent=0, title="Executing...") mailbox = self.account.mailbox() inbox_folder: mb.Folder = mailbox.inbox_folder() assert isinstance(inbox_folder, mb.Folder), "inbox_folder is not a Mailbox instance." logging.info("Getting recent messages") if is_interactive: dlg.gauge_update(20, "Getting recent messages...", update_text=True) message_list: list[O365.Message] = inbox_folder.get_messages(limit=10, download_attachments=False) # assert isinstance(message_list, ), "message_list is not a list instance." output_str = "10 recent messages: \n" \ "=================== \n" \ "" count = 0 for message in message_list: count += 1 output_str += "[{:2}]\n".format(str(count)) output_str += "Subject: {0}\n".format(message.subject or "None") output_str += "Contents: \n{0}\n\n".format(message.get_body_text()) output_str += "Grubbed at {0}.".format(time.asctime()) output_str.replace(" ", "") if is_interactive: dlg.gauge_update(40, "Writing into a file...", update_text=True) main_path = pathlib.Path().resolve().parent if not (main_path.joinpath('temp').exists() or main_path.joinpath('temp').exists()): # Create a new path if it is not shown main_path.joinpath('temp').mkdir(exist_ok=False) temp_path = main_path.joinpath('temp') file_path = temp_path.joinpath('mailoutput_{0}{1:02}{2:02}{3:02}{4:02}{5:02}.txt'.format( time.gmtime().tm_year, time.gmtime().tm_mon, time.gmtime().tm_mday, time.gmtime().tm_hour, time.gmtime().tm_min, time.gmtime().tm_sec )) with open(file_path, mode="w") as f: f.writelines(output_str) f.close() logging.info("Successfully written the log into a file.") logging.info("Getting user's OneDrive drive") if is_interactive: dlg.gauge_update(60, "Getting user's OneDrive drive...", update_text=True) drive: d.Drive = self.account.storage().get_default_drive() assert isinstance(drive, d.Drive), "storage is not a Storage instance!" root_folder: d.Folder = drive.get_root_folder() if is_interactive: dlg.gauge_update(80, "Uploading file...", update_text=True) folder_list = root_folder.get_child_folders(limit=500) folder_obj = [x for x in folder_list] temp_folder = root_folder if "Temp" not in [x.name for x in folder_obj]: temp_folder: d.Folder = root_folder.create_child_folder("Temp") else: for x in folder_obj: if x.name == "Temp": temp_folder = x break if not temp_folder.upload_file(file_path) is None: logging.info("Upload success!") if is_interactive: dlg.gauge_update(95, "Upload success, check if the file exceeds the threshold...", update_text=True) # Check if the file amount exceed the threshold temp_file_list: list[dlg.DriveItem] = [x for x in temp_folder.get_items(limit=500) if not x.is_folder] if len(temp_file_list) > temp_file_threshold: temp_file_list.sort(key=get_modified_time, reverse=True) for i in range(5, len(temp_file_list)): temp_file_list[i].delete() logging.info("Temp file's amount exceeded, deleted the outdated file.") else: logging.error("Upload failed.") if is_interactive: dlg.gauge_update(80, "Update failed.", update_text=True) if file_path.exists(): os.remove(file_path) # Remove the temporary file logging.info("Circulation finished.") if is_interactive: dlg.gauge_update(100, "Circulation finished.", update_text=True) dlg.gauge_stop() except Exception as e: if is_interactive: dlg.gauge_stop() logging.error(e) logging.error(traceback.format_exc()) logging.error("AN error occurred, the circulation will be jumped " "to the end and wait for the next turn.") def infinite_routine(self, interval=(600, 1200), business_hours: bool = False, routine_timeout: int = 30, temp_file_threshold: int = 5, **kwargs): is_interactive = False dlg = Dialog() for key, value in kwargs.items(): if key == "interactive": is_interactive = value if not self.account.is_authenticated: if is_interactive: dlg.msgbox("The account is not authenticated! You need to authenticate it first.") raise EnvironmentError("The account is not authenticated! You need to authenticate it first.") while True: try: func_timeout(routine_timeout, self.single_routine, kwargs={'temp_file_threshold': temp_file_threshold, 'is_interactive': is_interactive}) except FunctionTimedOut: logging.error("Timeout exceeded, the process will be jumped to the next routine.") finally: if business_hours and \ not (9 <= time.gmtime().tm_hour < 17 and 0 <= time.gmtime().tm_wday <= 4): # Not in business hours next_routine_until = datetime.combine(dt.date.today(), dt.time.min) + timedelta(days=1, hours=9) while next_routine_until.weekday() not in range(0, 5): next_routine_until += timedelta(days=1) logging.info("Out of the business time, the next routine will be executed at {0}". format(next_routine_until.ctime())) if is_interactive: dlg.infobox("Out of the business time, the next routine will be executed at {0}". format(next_routine_until.ctime()), width=54, title="Info") pause.until(next_routine_until.timestamp()) else: if isinstance(interval, int): logging.info( "The next routine will be executed at {0}".format(time.ctime(time.time() + interval))) if is_interactive: dlg.infobox( "The next routine will be executed at {0}".format(time.ctime(time.time() + interval)), width=54, title="Info") pause.until(time.time() + interval) if isinstance(interval, tuple): if not len(interval) == 2: raise ValueError("The interval has a bad format.") (min_interval, max_interval) = interval random_interval = random.randint(min_interval if min_interval <= max_interval else max_interval, max_interval if min_interval <= max_interval else min_interval) logging.info( "The next routine will be executed at {0}".format( time.ctime(time.time() + random_interval))) pause.until(time.time() + random_interval)