Source code for greatday.session

"""Contains the GreatSession class."""

from __future__ import annotations

import datetime as dt
import os
from pathlib import Path
import string
import tempfile
from types import TracebackType
from typing import Type, cast

from logrus import Logger
import magodo
from magodo.types import Priority
from potoroo import Repo, UnitOfWork

from .common import NULL_ID
from .dates import get_relative_date
from .repo import FileRepo, SQLRepo
from .tag import GreatTag
from .todo import GreatTodo


logger = Logger(__name__)


class GreatSession(UnitOfWork[FileRepo]):
    """Each time todos are opened in an editor, a new session is created.

    A session copies every todo matching a tag out of the master SQL repo
    into a temporary text file (exposed through `self.repo`).  Edits made to
    that file are pushed back to the master repo by `commit()`.  The
    temporary file is deleted when the session's with-block exits.
    """

    def __init__(
        self,
        db_url: str,
        tag: GreatTag,
        *,
        name: str | None = None,
        verbose: int = 0,
    ) -> None:
        """Create the temporary todo file and fill it with matching todos.

        Args:
            db_url: URL of the master (SQL) todo database.
            tag: Selects which todos of the master repo this session edits.
            name: Optional session name; used as the prefix of the temporary
                file's name.
            verbose: Verbosity level forwarded to the SQL repo.
        """
        prefix = None if name is None else f"{name}."
        fd, temp_path = tempfile.mkstemp(prefix=prefix, suffix=".txt")
        # mkstemp() hands back an *open* OS-level descriptor.  We only need
        # the path, so close it right away; otherwise one descriptor leaks
        # per session (and, on Windows, the file could not be unlinked in
        # __exit__ while the handle is still open).
        os.close(fd)

        # --- public attributes
        self.db_url = db_url
        self.path = Path(temp_path)

        # --- private attributes
        self._tag = tag
        self._name = name
        self._verbose = verbose
        self._master_repo = SQLRepo(self.db_url, verbose=verbose)
        # maps todo ID -> the todo as it looked when last synced with the
        # master repo
        self._key_to_old_todo: dict[str, GreatTodo] = {}

        # will be accessed via `self.repo` from this point forward
        self._repo = FileRepo(self.path)

        # init SQL repo and file repo + populate file repo's text file with
        # todos which match `tag`
        #
        # we also populate the `self._key_to_old_todo` dict here
        self.path.parent.mkdir(parents=True, exist_ok=True)
        with self.path.open("a+") as f:
            for todo in sorted(self._master_repo.get_by_tag(tag).unwrap()):
                f.write(todo.to_line() + "\n")
                self._key_to_old_todo[todo.ident] = todo

    def __enter__(self) -> GreatSession:
        """Called before entering a GreatSession with-block."""
        return self

    def __exit__(
        self,
        exc_type: Type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Called before exiting a GreatSession with-block.

        Deletes the session's temporary file.  Exceptions raised inside the
        with-block are not suppressed.
        """
        del exc_type
        del exc_value
        del traceback

        os.unlink(self.path)

    def commit(self) -> None:
        """Commit our changes.

        Compares the todos currently in the session file against the
        snapshot taken at the last sync and applies the difference to the
        master repo:

        * todos whose ID is `NULL_ID` are added to the master repo and then
          re-added to the session file under their newly assigned key;
        * todos that differ from their snapshot are passed to
          `_commit_todo_changes()`;
        * todos that are in the snapshot but no longer in the file are
          removed from the master repo.
        """
        # Insertion-ordered set of keys not (yet) seen in the session file.
        # A dict gives O(1) membership / removal while preserving the order
        # in which leftovers are removed from the master repo below.
        removed_todo_keys = dict.fromkeys(self._key_to_old_todo)

        new_todos = {}
        for todo in self.repo.all().unwrap():
            key = todo.ident
            removed_todo_keys.pop(key, None)

            old_todo = self._key_to_old_todo.get(key)
            if key == NULL_ID:
                logger.info("New todo was added while editing?", todo=todo)
                key = self._master_repo.add(todo).unwrap()
                new_todos[key] = todo
            elif todo != old_todo:
                _commit_todo_changes(self._master_repo, todo, old_todo)

        if new_todos:
            # HACK: Removes all new todos by assuming that new todos will not
            # have been assigned an ID yet.
            old_lines = self.path.read_text().split("\n")
            self.path.write_text(
                "\n".join(line for line in old_lines if " id:" in line)
            )

            # ...then write them back, this time under their real keys.
            for key, todo in new_todos.items():
                self._key_to_old_todo[key] = todo
                self.repo.add(todo, key=key)

        for key in removed_todo_keys:
            removed_todo = self._master_repo.remove(key).unwrap()
            if removed_todo is not None:
                logger.info("Todo has been deleted.", todo=removed_todo)
                del self._key_to_old_todo[removed_todo.ident]

    def rollback(self) -> None:
        """Revert any changes made while in this GreatSession's with-block.

        Currently a no-op: this method has no body beyond its docstring.
        """

    @property
    def repo(self) -> FileRepo:
        """Returns the FileRepo object associated with this GreatSession."""
        return self._repo
def _commit_todo_changes(
    repo: Repo[str, GreatTodo], todo: GreatTodo, old_todo: GreatTodo | None
) -> None:
    """Save `todo` to `repo`, spawning a follow-up todo when it recurs.

    A follow-up is created only when all of the following hold: `old_todo`
    is given and was not done, `todo` is now done, `todo` carries a 'recur'
    metatag, and its 'until' metatag (if any) is still later than the done
    date.  In that case the follow-up is added to `repo` and `todo` gains a
    'next' metatag holding the follow-up's key.

    In every case, `todo` is finally written to `repo` under `todo.ident`.
    """
    recur = todo.metadata.get("recur")
    until = todo.metadata.get("until")

    # a recurrence is over once the 'until' date is on or before the done date
    expired = bool(
        todo.done_date and until and magodo.dates.to_date(until) <= todo.done_date
    )
    # True only on the transition "not done" -> "done"
    newly_done = bool(old_todo and todo.done_date and not old_todo.done_date)

    if newly_done and recur and not expired:
        successor_meta = {name: value for name, value in todo.metadata.items()}

        # the follow-up links back via 'prev' and gets no 'id' of its own;
        # a truthy 'p' metatag is not carried over either
        successor_meta["prev"] = successor_meta.pop("id")
        if successor_meta.get("p"):
            del successor_meta["p"]

        # the next due date is counted from the done date when 'recur' is
        # lowercase or there is no due date, else from the current due date
        due = todo.metadata.get("due")
        if not recur.islower() and due is not None:
            anchor_date = magodo.dates.to_date(due)
        else:
            anchor_date = todo.done_date
        successor_meta["due"] = magodo.dates.from_date(
            get_relative_date(recur, start_date=anchor_date)
        )

        # creation/done times belong to the finished todo only
        for stale_key in ("ctime", "dtime"):
            successor_meta.pop(stale_key, None)

        # the 'priority' metatag (a single ASCII letter) decides the
        # follow-up's priority; anything else falls back to the default
        raw_priority = todo.metadata.get("priority")
        successor_priority = magodo.DEFAULT_PRIORITY
        if raw_priority:
            if (
                len(raw_priority) == 1
                and raw_priority.upper() in string.ascii_uppercase
            ):
                successor_priority = cast(Priority, raw_priority.upper())
            else:
                logger.warning(
                    "Bad 'priority' metatag value?", priority=raw_priority
                )

        successor = todo.new(
            # the "D" context is not rolled over to the follow-up
            contexts=[ctx for ctx in todo.contexts if ctx != "D"],
            create_date=dt.date.today(),
            done=False,
            done_date=None,
            metadata=successor_meta,
            priority=successor_priority,
        )
        successor_key = repo.add(successor).unwrap()

        # point the finished todo at its follow-up
        todo = todo.new(metadata=dict(todo.metadata.items(), next=successor_key))

    repo.update(todo.ident, todo).unwrap()