imessage-query-fastmcp-mcp-server

by hannesrudolph
Verified
================================================ File: /docs/changelog.md ================================================ ```{include} ../CHANGELOG.md ``` ================================================ File: /docs/contributing.md ================================================ ```{include} ../CONTRIBUTING.md ``` ================================================ File: /docs/make.bat ================================================ @ECHO OFF pushd %~dp0 REM Command file for Sphinx documentation if "%SPHINXBUILD%" == "" ( set SPHINXBUILD=sphinx-build ) set SOURCEDIR=. set BUILDDIR=_build %SPHINXBUILD% >NUL 2>NUL if errorlevel 9009 ( echo. echo.The 'sphinx-build' command was not found. Make sure you have Sphinx echo.installed, then set the SPHINXBUILD environment variable to point echo.to the full path of the 'sphinx-build' executable. Alternatively you echo.may add the Sphinx directory to PATH. echo. echo.If you don't have Sphinx installed, grab it from echo.https://www.sphinx-doc.org/ exit /b 1 ) if "%1" == "" goto help %SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% goto end :help %SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% :end popd ================================================ File: /docs/Makefile ================================================ # Minimal makefile for Sphinx documentation # # You can set these variables from the command line, and also # from the environment for the first two. SPHINXOPTS ?= SPHINXBUILD ?= sphinx-build SOURCEDIR = . BUILDDIR = _build # Put it first so that "make" without argument is like "make help". help: @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) .PHONY: help Makefile # Catch-all target: route all unknown targets to Sphinx using the new # "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). %: Makefile @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) ================================================ File: /docs/requirements.txt ================================================ myst-nb sphinx-autoapi sphinx-rtd-theme ================================================ File: /docs/index.md ================================================ ```{include} ../README.md ``` ```{toctree} :maxdepth: 1 :hidden: JulioHtml.png JulioText.png changelog.md contributing.md autoapi/index ``` ================================================ File: /docs/conf.py ================================================ # Configuration file for the Sphinx documentation builder. # # For the full list of built-in configuration values, see the documentation: # https://www.sphinx-doc.org/en/master/usage/configuration.html # -- Project information ----------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information project = 'iMessageDB' copyright = '2023, Xev Gittler' author = 'Xev Gittler' release = '1.0.3' # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration extensions = ["myst_parser", 'autoapi.extension', "sphinx.ext.napoleon", "sphinx.ext.viewcode"] autoapi_dirs = ['../src'] templates_path = ['_templates'] exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] # -- Options for HTML output ------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output html_theme = 'sphinxdoc' html_static_path = ['_static'] ================================================ File: /main.py ================================================ # This is a sample Python script. import argparse import logging import configparser import os import sys import imessagedb def create_default_config(file_name): config = configparser.ConfigParser() config['DISPLAY'] = { 'me_background_color': 'AliceBlue', 'them_background_color': 'Lavender', 'me_name': 'Blue', 'them_name': 'Purple', 'thread_background': 'HoneyDew', 'me_thread': 'AliceBlue', 'them_thread': 'Lavender', } with open(file_name, 'w') as configfile: config.write(configfile) return config def Usage(): print( f'{sys.argv[0]} --output_directory <dir> --name <name> --verbose --debug') sys.exit() def print_handles(db): print(db.handles) return def get_contacts(configuration): result = {} contact_list = configuration.items('CONTACTS') for contact in contact_list: key = contact[0] value = contact[1] stripped = value.replace('\n', '').replace(' ', '') number_list = stripped.split(',') result[key] = number_list return result if __name__ == '__main__': imessagedb.run() exit(0) config_file = f'{os.environ["HOME"]}/.config/iMessageDB.ini' create = False verbose = True debug = False no_table = False no_copy = False no_attachment = False inline = False output_directory = '/tmp' logging.basicConfig(level=logging.INFO, format='%(asctime)s <%(name)s> %(message)s', datefmt='%Y-%m-%d %H:%M:%S') logger = logging.getLogger('main') logger.info("Processing parameters") argument_parser = argparse.ArgumentParser() argument_parser.add_argument("--name", help="Person to get conversations about", required=True) argument_parser.add_argument("-c", "--configfile", help="Location of the configuration file", default=f'{os.environ["HOME"]}/.config/iMessageDB.ini') argument_parser.add_argument("-o", "--output_directory", help="The output directory where the output and attachments go") argument_parser.add_argument("--database", help="The database file to open", default=f"{os.environ['HOME']}/Library/Messages/chat.db") argument_parser.add_argument("-m", "--me", help="The name to use to refer to you", default="Me") argument_parser.add_argument("-t", "--output_type", help="The type of output", choices=["text", "html"]) argument_parser.add_argument("-i", "--inline", help="Show the attachments inline", action="store_true") mutex_group = argument_parser.add_mutually_exclusive_group() mutex_group.add_argument("-f", "--force", help="Force a copy of the attachments", action="store_true") mutex_group.add_argument("--no_copy", help="Don't copy the attachments", action="store_true") argument_parser.add_argument("--no_attachments", help="Don't process attachments at all", action="store_true") argument_parser.add_argument("-v", "--verbose", help="Turn on additional output", action="store_true") args = argument_parser.parse_args() # First read in the configuration file, creating it if need be, then overwrite the values from the command line if not os.path.exists(args.configfile): imessagedb._create_default_configuration(args.configfile) config = configparser.ConfigParser() config.read(args.configfile) CONTROL = 'CONTROL' DISPLAY = 'DISPLAY' config.set(CONTROL, 'Person', args.name) config.set(CONTROL, 'verbose', str(args.verbose)) if args.output_directory: config.set(CONTROL, 'copy directory', args.output_directory) if args.no_copy: config.set(CONTROL, 'copy', 'False') if args.output_type: config.set(CONTROL, 'output type', args.output_type) if args.force: config.set(CONTROL, 'force copy', 'True') if args.no_attachments: config.set(CONTROL, 'skip attachments', 'True') if args.inline: config.set(DISPLAY, 'inline attachments', 'True') out = sys.stdout contacts = get_contacts(config) Person = config[CONTROL]['Person'] if Person.lower() not in contacts.keys(): logger.error(f"{Person} not known. Please edit contacts list.") Usage() config[CONTROL]['attachment directory'] = f"{config[CONTROL]['copy directory']}/{Person}_attachments" filename = f'{Person}.html' out = open(f"{config[CONTROL]['copy directory']}/{filename}", 'w') try: os.mkdir(config['CONTROL']['attachment directory']) except FileExistsError: pass database = imessagedb.DB(args.database, config=config) message_list = database.Messages(Person, contacts[Person.lower()]) # database.TextOutput(args.me, Person, message_list, output_file=out).print() html_string = database.HTMLOutput(args.me, Person, message_list, output_file=out) print("The end") database.disconnect() # See PyCharm help at https://www.jetbrains.com/help/pycharm/ ================================================ File: /tests/test_handle.py ================================================ import imessagedb import os def test_handles(): database = imessagedb.DB(os.path.join(os.path.dirname(__file__), "chat.db")) assert len(database.handles) == 2, "Unexpected number of handles" def test_handle(): database = imessagedb.DB(os.path.join(os.path.dirname(__file__), "chat.db")) h = database.handles hand = h.handles[2] assert hand.number == "scripting@schore.org", "Unexpected handle" ================================================ File: /tests/test_attachments.py ================================================ import imessagedb import os def test_attachments(): database = imessagedb.DB(os.path.join(os.path.dirname(__file__), "chat.db")) attachments = database.attachment_list assert len(attachments.attachment_list) == 2, "Unexpected number of attachments" def test_attachment(): database = imessagedb.DB(os.path.join(os.path.dirname(__file__), "chat.db")) attachments = database.attachment_list attachment = attachments.attachment_list[98368] expected_path = f"{os.environ['HOME']}/Library/Messages/Attachments/4f/15/D7CEBAED-9844-4B25-B841-F7748EA3BCAD/IMG_4911.heic" assert attachment.original_path == expected_path, "Unexpected value in attachment" ================================================ File: /tests/test_db.py ================================================ import configparser import imessagedb import os def test_connection(): database = imessagedb.DB(os.path.join(os.path.dirname(__file__), "chat.db")) assert database def test_configuration(): database = imessagedb.DB(os.path.join(os.path.dirname(__file__), "chat.db")) config = database.config assert isinstance(config, configparser.ConfigParser), \ "Expected return of database.config to be of class 'configparser.ConfigParser'" assert config['CONTROL'], "Expected the CONTROL section to be in the configuration" ================================================ File: /tests/test_chat.py ================================================ import imessagedb import os def test_chats(): database = imessagedb.DB(os.path.join(os.path.dirname(__file__), "chat.db")) assert len(database.chats) == 2, "Unexpected number of chats" ================================================ File: /tests/test_imessagedb.py ================================================ from imessagedb import db ================================================ File: /tests/imessagedb_test.py ================================================ #!/Users/xev/opt/anaconda3/envs/imessagedb/bin/python # -*- coding: utf-8 -*- import re import sys from imessagedb import run if __name__ == '__main__': sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) sys.exit(run()) ================================================ File: /.github/workflows/ci-cd.yml ================================================ name: ci-cd on: [push, pull_request] jobs: ci: # Set up operating system runs-on: ubuntu-latest # Define job steps steps: - name: Set up Python 3.9 uses: actions/setup-python@v2 with: python-version: 3.9 - name: Check-out repository uses: actions/checkout@v2 - name: Install poetry uses: snok/install-poetry@v1 - name: Install package run: poetry install - name: Test with pytest run: poetry run pytest tests/ --cov=imessagedb --cov-report=xml - name: Use Codecov to track coverage uses: codecov/codecov-action@v2 with: files: ./coverage.xml # coverage report - name: Build documentation run: poetry run make html --directory docs/ cd: # Only run this job if the "ci" job passes needs: ci # Only run this job if new work is pushed to "main" if: github.event_name == 'push' && github.ref == 'refs/heads/main' # Set up operating system runs-on: ubuntu-latest # Define job steps steps: - name: Set up Python 3.9 uses: actions/setup-python@v2 with: python-version: 3.9 - name: Check-out repository uses: actions/checkout@v2 with: fetch-depth: 0 - name: Install poetry uses: snok/install-poetry@v1 - name: Install package run: poetry install - name: Use Python Semantic Release to prepare release env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} run: | git config user.name github-actions git config user.email github-actions@github.com poetry run semantic-release publish - name: Publish to PyPI uses: pypa/gh-action-pypi-publish@release/v1 with: user: __token__ password: ${{ secrets.PYPI_API_TOKEN }} ================================================ File: /.readthedocs.yml ================================================ # Required version: 2 # Image to use build: image: testing # Configuration python: version: 3.9 install: - requirements: docs/requirements.txt - method: pip path: . ================================================ File: /pyproject.toml ================================================ [tool.poetry] name = "imessagedb" version = "1.4.8" description = "Reads and displays the Apple iMessage database" authors = ["xev <git@schore.org>"] license = "MIT" readme = "README.md" [tool.poetry.dependencies] python = "^3.9" alive-progress = "^3.1.2" ffmpeg-python = "^0.2.0" heic2png = "^1.0.0" configparser = "^5.3.0" termcolor = "^2.3.0" python-dateutil = "^2.8.2" [tool.poetry.group.dev.dependencies] pytest = "^7.3.1" pytest-cov = "^4.0.0" myst-nb = {version = "^0.17.2", python = "^3.9"} sphinx-autoapi = "^2.1.0" sphinx-rtd-theme = "^1.2.0" python-semantic-release = "^7.33.4" [tool.poetry.scripts] imessagedb = 'imessagedb:run' [tool.semantic_release] version_variable = "pyproject.toml:version" branch = "main" # branch to make releases of changelog_file = "CHANGELOG.md" # changelog file build_command = "poetry build" # build dists dist_path = "dist/" # where to put dists upload_to_release = true # auto-create GitHub release upload_to_pypi = false # don't auto-upload to PyPI remove_dist = false # don't remove dists patch_without_tag = true # patch release by default [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" ================================================ File: /poetry.lock ================================================ [Content ignored: file too large] ================================================ File: /CONDUCT.md ================================================ # Code of Conduct ## Our Pledge In the interest of fostering an open and welcoming environment, we as contributors and maintainers pledge to making participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, gender identity and expression, level of experience, nationality, personal appearance, race, religion, or sexual identity and orientation. ## Our Standards Examples of behavior that contributes to creating a positive environment include: * Using welcoming and inclusive language * Being respectful of differing viewpoints and experiences * Gracefully accepting constructive criticism * Focusing on what is best for the community * Showing empathy towards other community members Examples of unacceptable behavior by participants include: * The use of sexualized language or imagery and unwelcome sexual attention or advances * Trolling, insulting/derogatory comments, and personal or political attacks * Public or private harassment * Publishing others' private information, such as a physical or electronic address, without explicit permission * Other conduct which could reasonably be considered inappropriate in a professional setting ## Our Responsibilities Project maintainers are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior. Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful. ## Scope This Code of Conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. Examples of representing a project or community include using an official project e-mail address, posting via an official social media account, or acting as an appointed representative at an online or offline event. Representation of a project may be further defined and clarified by project maintainers. ## Enforcement Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team. The project team will review and investigate all complaints, and will respond in a way that it deems appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately. Project maintainers who do not follow or enforce the Code of Conduct in good faith may face temporary or permanent repercussions as determined by other members of the project's leadership. ## Attribution This Code of Conduct is adapted from the [Contributor Covenant homepage](http://contributor-covenant.org/version/1/4), version 1.4. ================================================ File: /README.md ================================================ # imessagedb Reads and displays the Apple iMessage database ## Installation ```bash $ pip install imessagedb ``` ## Usage `imessagedb` can be used as a library for accessing and parsing the iMessasge chat database stored at ~/Library/Messages/chat.db, or it can be used on the command line to display the contents. ### How it works The command line version wraps the functions provided and outputs data based on the configuration available in the configuration file, as well as command line arguments. The way that iMessage identifies conversation particpants is by a `handle`, which can be their phone number or email address. To make it easier, you can map handles in the configuration file. You can also specify the type of output (text or html) and the time frame of the messages. At this point `imesssagedb` will pull all messages from that person, regardless of the chat that it is part of. If you specify html output, `imessagedb` will copy your attachments so that they are accessible to the web browser. In addition, it will convert certain types of attachments so that they can be viewed in the browser. For instance, it will convert HEIC files to PNG so that they browser can display it, and will convert various type of movie files to mp4 and various audio files to mp3. If you have many attachments, this can take a long time and take a lot of space. ### Example Output This is what the default html version looks like. Note that I am hovering over one of the links to show the image. ![](JulioHtml.png) This is the same conversation rendered in text. ![](JulioText.png) ### Command Line ```python imessagedb [-h] [--handle [HANDLE ...] | --name NAME] [-c CONFIGFILE] [-o OUTPUT_DIRECTORY] [--database DATABASE] [-m ME] [-t {text,html}] [-i] [-f | --no_copy] [--no_attachments] [-v] [--start_time START_TIME] [--end_time END_TIME] optional arguments: -h, --help show this help message and exit --handle [HANDLE ...] A list of handles to search against --name NAME Person to get conversations about -c CONFIGFILE, --configfile CONFIGFILE Location of the configuration file -o OUTPUT_DIRECTORY, --output_directory OUTPUT_DIRECTORY The output directory where the output and attachments go --database DATABASE The database file to open -m ME, --me ME The name to use to refer to you -t {text,html}, --output_type {text,html} The type of output -i, --inline Show the attachments inline -f, --force Force a copy of the attachments --no_copy Don't copy the attachments --no_attachments Don't process attachments at all -v, --verbose Turn on additional output --start_time START_TIME The start time of the messages --end_time END_TIME The end time of the messages --split_output SPLIT_OUTPUT Split the html output into files with this many messages per file --version Show the version number and exit --get_handles Display the list of handles in the database and exit --get_chats Display the list of chats in the database and exit ``` #### Command line options **-h** prints a help message | **--handle [HANDLE ...]** A list of handles to search against. For instance, '*--handle +12016781234 john@smith.com*'. **--name NAME** A person to search for. The mapping of person to handle is in the configuration file (see below) *** Note that you can only have one of --handle or --name, not both *** **-c CONFIGFILE, --configfile CONFIGFILE** The location of the configuration file. If the file does not exist, a default configuration file will be created there. If this option is not provided, the default location is `~/.conf/iMessageDB.ini`. **-o OUTPUT_DIRECTORY, --output_directory OUTPUT_DIRECTORY** The output directory where the output and attachments go. If this option is not provided, the default location is your home directory. The files will be `~/NAME.html` and attachments will be in `~/NAME_Attachments`. **--database DATABASE** The database file to open. If this option is not provided it will default to `~/Library/Messages/chat.db`, which is where Apple puts it. **-m ME, --me ME** The name to use to refer to you in the output. If this option is not provided it will default to `Me`. **-t {text,html}, --output_type {text,html}** The type of output, either *text* or *html*. If this option is not provided it will default to `html`. **--start_time START_TIME** <br> **--end_time END_TIME** By default, the program will process all messages. If you want to restrict that to particular timeframe, you can specify the `--start-time` and/or `--end_time` options **--split_output SPLIT_OUTPUT** Split the html output into files with this many messages per file **-i, --inline** Show the attachments inline. By default, the attachments will show as links that you can hover over and it will pop up the attachment. With this option, it will put them inline, which will make your output much busier and take a lot more memory in your browser. **-f, --force** Force a copy of the attachments. By default, if the attachment already exists in the destination directory, it will not re-copy the file, but this will force it to re-copy it. **--no_copy** Don't copy the attachments. This will make them inaccessible for viewing **--no_attachments** Do not show the attachments at all **-v, --verbose** Turn on additional output **--version** Shows the version number and exits **--get_handles** Display the list of handles in the database and exit **--get_chats** Display the list of chats in the database and exit ### Configuration File The configuration file is in configparser format. Here is the template that is created by default: ```python [CONTROL] # Whether or not to copy the attachments into a different directory. This is needed for two reasons: # 1) The web browser does not have access to the directory that the attachments are stored, so it cannot display them # 2) Some of the attachments need to be converted in order to be viewed in the browser copy = True # The directory to put the output. The html file will go in {copy_directory}/{Person}.html, # and the attachments will go in {copy_directory}/{Person}_Attachments. If you specify HOME, then # it will put it in your home directory copy directory = HOME # If the file already exists in the destination directory it is not recopied, but that can be overridden by # specifying 'force copy' as true force copy = False # 'Skip attachments' ignores attachments skip attachments = False # Additional details show some other information on each message, including the chat id and any edits that have # been done additional details = False # Some extra verbosity if true verbose = True [DISPLAY] # Output type, either html or text output_type = html # Number of messages in each html file. If this is 0 or not specified, it will be one large file. output split = 1000 # Inline attachments mean that the images are in the HTML instead of loaded when hovered over inline attachments = False # Popup location is where the attachment popup window shows up, and is either 'upper right', 'upper left' or 'floating' popup location = upper right # 'me' is the name to put for your text messages me = Me # The color for the name in text output. No color is used if 'use text color' is false. # The color can only be one of the following options: # black, red, green, yellow, blue, magenta, cyan, white, light_grey, dark_grey, # light_red, light_green, light_yellow, light_blue, light_magenta, light_cyan use text color = True me text color = blue them text color = magenta reply text color = light_grey # The background color of the text messages for you and for the other person in html output # The options for colors can be found here: https://www.w3schools.com/cssref/css_colors.php me html background color = AliceBlue them html background color = Lavender # The background color of the thread in replies thread background = HoneyDew me thread background = AliceBlue them thread background = Lavender # The color of the name in the html output me html name color = Blue them html name color = Purple [CONTACTS] # A person that you text with can have multiple numbers, and you may not always want to specify the full specific # number as stored in the handle database, so you can do the mapping here, providing the name of a person, # and a comma separated list of numbers Samantha: +18434676040, samanthasmilt@gmail.com, s12ddd2@colt.edu Abe: +16103499696 Marissa: +14029490739 ``` ### Library Usage ```python import imessagedb database = imessagedb.DB() ``` - TODO - More usage here ## Contributing Interested in contributing? Check out the contributing guidelines. Please note that this project is released with a Code of Conduct. By contributing to this project, you agree to abide by its terms. ## License `imessagedb` was created by Xev Gittler. It is licensed under the terms of the MIT license. ## Documentation Full documentation available at https://imessagedb.readthedocs.io/en/latest/ ================================================ File: /cfdhat.db ================================================ ================================================ File: /src/imessagedb/__main__.py ================================================ import imessagedb if __name__ == '__main__': imessagedb.run() ================================================ File: /src/imessagedb/default.ini ================================================ [CONTROL] # Whether or not to copy the attachments into a different directory. This is needed for two reasons: # 1) The web browser does not have access to the directory that the attachments are stored, so it cannot display them # 2) Some of the attachments need to be converted in order to be viewed in the browser copy = True # The directory to put the output. The html file will go in {copy_directory}/{Person}.html, # and the attachments will go in {copy_directory}/{Person}_Attachments copy directory = /tmp # If the file already exists in the destination directory it is not recopied, but that can be overridden by # specifying 'force copy' as true force copy = False # If you specify 'skip attachments' then we won't process attachments at all skip attachments = False # Some extra verbosity if true verbose = True [DISPLAY] # 'me' is the name to put for your text messages me = Me # The color for the name in text output. No color is used if 'use text color' is false. # The color can only be one of the following options: # black, red, green, yellow, blue, magenta, cyan, white, light_grey, dark_grey, # light_red, light_green, light_yellow, light_blue, light_magenta, light_cyan use text color = True me text color = blue then text color = magenta reply text color = light_grey # The background color of the text messages for you and for the other person in html output # The options for colors can be found here: https://www.w3schools.com/cssref/css_colors.php me html background color = AliceBlue them html background color = Lavender # The background color of the thread in replies thread background = HoneyDew me thread background = AliceBlue them thread background = Lavender # The color of the name in the html output me html name color = Blue them html name color = Purple # If you want the attachments inline, rather than as pop-ups, specify 'inline attachments' inline attachments = False # Popup location is where the attachment popup window shows up, and is either 'upper right', 'upper left' or 'floating' popup location = Upper Left [CONTACTS] # A person that you text with can have multiple numbers, and you may not always want to specify the full specific # number as stored in the handle database, so you can do the mapping here, providing the name of a person, # and a comma separated list of numbers Ashley: +18484676050, ashleywilliams_@live.com, sparkette325@yahoo.com, +17325722108, williaa2@kean.edu Abe: +16103499696 Alisha: +15082088327 Emily: +12672803303 Hermosa: +17329103453 Kanani: +17325351530 Randi: +16093394399 Rose: fhecla@gmail.com, +16109602522 Marta: +19172974204 Maxine: mlee99@optonline.net, panda9966@gmail.com, +19084725929, +17323816580 Monika: +19176678993, mnowak2@binghamton.edu, Xplsumnerx@gmail.com Michele: +12153917169 Marissa: +14016490739 Lily: lilyyalexb@gmail.com, njlibrn@yahoo.com, +17327887245 Daniel: daniel@gittler.com, +17326460220, dsg1002@gmail.com, gittlerd@gmail.com, dgittler@berklee.edu Kandela: +17324849684 Tiffany: Wickedblueglow@gmail.com, Tlgreenberg80@gmail.com, +17327105145 Jessica: jessica.marozine@icloud.com, jessica.marozine@gmail.com, +17326923688 Becky: rebecca@schore.org, +17329838356 Lynda: lylasmith559@yahoo.com, lylalombardi@icloud.com, +17323001586 Stefanie: stefanie.thomas2@gmail.com, thostefa@kean.edu, +17326101065 ================================================ File: /src/imessagedb/generate_html.py ================================================ from datetime import datetime import re import string import imessagedb from imessagedb.message import Message from imessagedb.messages import Messages from alive_progress import alive_bar url_pattern = re.compile(r'((https?):((//)|(\\\\))+[\w\d:#@%/;$()~_?+-=\\.&]*)', re.MULTILINE | re.UNICODE) mailto_pattern = re.compile(r'([\w\-.]+@(\w[\w\-]+\.)+[\w\-]+)', re.MULTILINE | re.UNICODE) def _replace_url_to_link(value: str) -> str: """ From https://gist.github.com/guillaumepiot/4539986 """ # Replace url to link value = url_pattern.sub(r'<a href="\1" target="_blank">\1</a>', value) # Replace email to mailto value = mailto_pattern.sub(r'<a href="mailto:\1">\1</a>', value) return value class HTMLOutput: """ Creates an HTML file (or string) from a Messages list ... There are a number of options in the configuration file that affect how the HTML is created. In the CONTROL section, the following impact the output: copy = True : If the attachments are not copied, they are not available on the HTML output. There are two reasons for that. 1) The web browser does not have access to the directory that the attachments are stored, so it cannot display them 2) Some attachments need to be converted in order to be viewed in the browser, and need a place to live. skip attachments = False : If true, attachments will not be available in the HTML output In the DISPLAY section, the following impact the output: inline attachments = False : If inline attachments are true, then the images will be part of the HTML. If it is false, then there will be a popup when you hover over the attachment text that will show the attachment on demand. This will make the load much faster and the output far less cluttered. me = Me : The name to put for your part of the conversation. It defaults to 'Me'. html background color list = AliceBlue, Cyan, Gold, Lavender, LightGreen, PeachPuff, Wheat html name color list = Blue, DarkCyan, DarkGoldenRod, Purple, DarkGreen, Orange, Sienna : The background and name color of the messages in html output The options for colors can be found here: https://www.w3schools.com/cssref/css_colors.php The way that the color selection works is that it will use the first color on the color list for the first person in the conversation, the second for the second, third for the third, etc. If there are more participants than colors, it will wrap around to the first color. thread background = HoneyDew : The background color of the thread in replies """ def __init__(self, database, me: str, messages: Messages, inline: bool = False, output_file: str = None) -> None: """ Parameters ---------- database : imessagedb.DB An instance of a connected database me : str Your display name messages: imessagedb.DB.Messages The messages inline : bool Display attachments inline or not output_file : str The name of the output file""" self._database = database self._me = me self._messages = messages self._attachment_list = self._database.attachment_list self._inline = inline self._name_map = {} self._color_list = self._get_next_color() self._day = 'UNK' self._html_array = [] self._current_messages_processed = 0 self._current_messages_file = 0 self._file_start_date: datetime = None self._file_end_date: datetime = None self._previous_range: str = None self._last_row_had_conversion = False if output_file is not None: self._output_filename = output_file self._split_output = self._database.config.getint('DISPLAY', 'split output', fallback=0) self._previous_output_filename = None self._current_output_filename = f"{self._output_filename}.html" self._output_file_handle = open(self._current_output_filename, "w") else: self._output_filename = None start_time = self._database.control.get('start time', fallback=None) end_time = self._database.control.get('end time', fallback=None) date_string = "" if start_time: date_string = f" from {start_time}" if end_time: date_string = f"{date_string} until {end_time}" self._file_header_string = f' <div id="file_summary">Exchanged {len(self._messages):,} total messages with ' \ f'{self._messages.title}{date_string}.</div><p>\n' self._html_array.append(self._generate_table(self._messages)) self._print_and_save('</body>\n</html>\n', self._html_array, eof=True) def __repr__(self) -> str: return ''.join(self._html_array) def save(self, filename: str) -> None: """ Write the output to the output file""" file_handle = open(filename, "w") print('\n'.join(self._html_array), file=file_handle) file_handle.close() return def print(self) -> None: """ Print the output to stdout """ print('\n'.join(self._html_array)) return def _print_and_save(self, message: str, array: list, new_day: bool = False, eof: bool = False) -> None: """ Save to the output file while it is processing """ if self._current_messages_processed == 0: # If this is a new file, because it is either the first pass through, or if we need to create new file new_file_array = [self._generate_head(), "<body>\n", self._file_header_string, f'{" ":2s}<div class="picboxframe" id="picbox"> <img src="" /> </div>\n', f'{" ":2s}<table style="table-layout: fixed;">\n{" ":4s}<tr>\n'] if self._previous_output_filename: new_file_array.append(f'{" ":6s}<td style="text-align: left; width: 33%;">' f'<a href="file://{self._previous_output_filename}"> &lt </a> </td>\n' f'{" ":6s}<td style="text-align: center; width: 33%;"><div class="next_file">' + f'<a href="file://{self._previous_output_filename}">' + f' Previous Messages {self._previous_range}</a></div></td>\n') else: new_file_array.append(f'{" ":6s}<td style="width: 33%;"> </td>\n' f'{" ":6s}<td style="width: 33%;"> </td>\n') new_file_array.append(f'{" ":6s}<td style="text-align: right; width: 33%;" id="next_page"> </td>\n' f'{" ":4s}</tr>\n{" ":2s}</table>\n\n') new_file_array.append(f'{" ":2s}<table class="main_table">\n{" ":2s}</table>\n') for i in new_file_array: array.append(i) if self._output_filename is not None: print(i, end="", file=self._output_file_handle) array.append(message) self._current_messages_processed += 1 if self._output_filename is None: # We are not writing to a file return if eof: # Normally this gets done on the file split, but the eof indicates its last file, so do it # on the last write description_string = f' This page contains {self._current_messages_processed:,} messages from ' \ f'{self._file_start_date.strftime("%A %Y-%m-%d")} to ' \ f'{self._file_end_date.strftime("%A %Y-%m-%d")}.' print(f' <script>\n' f' el = document.getElementById("file_summary")\n' f' new_text = el.innerHTML.concat("{description_string}")\n' f' el.innerHTML = new_text\n', f' </script>\n', file=self._output_file_handle) print(message, end="", file=self._output_file_handle) if new_day and 0 < self._split_output < self._current_messages_processed: total_processed = self._current_messages_processed self._current_messages_processed = 0 self._current_messages_file += 1 self._previous_output_filename = self._current_output_filename self._current_output_filename = f"{self._output_filename}_{self._current_messages_file:02d}.html" print(f' <p><div class="next_file"><a href="file://{self._current_output_filename}">' f' Next Messages </a></div>\n', end="", file=self._output_file_handle) description_string = f' This page contains {total_processed:,} messages from ' \ f'{self._file_start_date.strftime("%A %Y-%m-%d")} to ' \ f'{self._file_end_date.strftime("%A %Y-%m-%d")}.' self._previous_range = f'<br><div style="font-size: 50%;">' \ f'({self._file_start_date.strftime("%A %Y-%m-%d")} to ' \ f'{self._file_end_date.strftime("%A %Y-%m-%d")})</div>' print(f' <script>\n' f' el = document.getElementById("file_summary")\n' f' new_text = el.innerHTML.concat("{description_string}")\n' f' el.innerHTML = new_text\n' f' document.getElementById("next_page").innerHTML = ' f' "<a href=\'file://{self._current_output_filename}\'> Next Page &gt </a>"\n' f' </script>', file=self._output_file_handle) print('</body>\n</html>\n', end="", file=self._output_file_handle) self._output_file_handle.close() print(f"Creating output file {self._current_output_filename}") self._output_file_handle = open(self._current_output_filename, "w") def _generate_thread_row(self, message: Message) -> str: if message.is_from_me: who_data = self._get_name(0) else: who_data = self._get_name(message.handle_id) style = who_data['style'] text = message.text row_string = f'{" ":16s}<tr>\n' \ f'{" ":18s}<td class="reply_name" style="color: {who_data["name_color"]};"> ' \ f'{who_data["name"]}: </td>\n' \ f'{" ":18s}<td class="reply_text_thread">\n' \ f'{" ":20s}<a href="#{message.rowid}">\n' \ f'{" ":22s}<button class="reply_text_{style}" style="background: ' \ f'{who_data["background_color"]};"> ' \ f'{text}</button>\n' \ f'{" ":20s}</a>\n' \ f'{" ":18s}</td>\n' \ f'{" ":16s}</tr>\n' return row_string def _generate_thread_table(self, message_list: list, style: str) -> str: table_string = f'{" ":14s}<table class="thread_table_{style}">\n' for message in message_list: table_string = f'{table_string}{self._generate_thread_row(message)}' table_string = f'{table_string}' \ f'{" ":14s}</table>\n' \ f'{" ":14s}<p>\n' return table_string def _generate_table(self, message_list: Messages) -> str: table_array = [] self._print_and_save(f'{" ":2s}<table class="main_table">\n', table_array) previous_day = '' message_count = 0 with alive_bar(len(message_list), title="Generating HTML", stats="({rate}, eta: {eta})", comma=True) as bar: for message in message_list: message_count = message_count + 1 today = message.date[:10] if today != previous_day: previous_day = today message_date = datetime(int(message.date[0:4]), int(message.date[5:7]), int(message.date[8:10]), int(message.date[11:13]), int(message.date[14:16]), int(message.date[17:19])) if self._file_start_date is None: self._file_start_date = message_date self._file_end_date = message_date # If it's a new day, end the table, and start a new one self._print_and_save(f'{" ":2s}</table>\n\n', table_array, new_day=True) self._print_and_save(f'{" ":2s}<table class="main_table">\n', table_array) self._day = message_date.strftime('%a') self._last_row_had_conversion = False self._print_and_save(self._generate_row(message), table_array) if self._last_row_had_conversion: bar() else: bar(skipped=True) self._print_and_save(f'{" ":2s}</table>\n', table_array) return ''.join(table_array) def _get_next_color(self): """ A generator function to return the next color""" counter = -1 default_background_color_list = 'AliceBlue, Cyan, Gold, Lavender, LightGreen, PeachPuff, Wheat' default_name_color_list = 'Blue, DarkCyan, DarkGoldenRod, Purple, DarkGreen, Orange, Sienna' background_color_list = self._database.config.get('DISPLAY', 'html background color list', fallback=default_background_color_list) name_color_list = self._database.config.get('DISPLAY', 'html name color list', fallback=default_name_color_list) background_colors = background_color_list.translate({ord(c): None for c in string.whitespace}).split(',') name_colors = name_color_list.translate({ord(c): None for c in string.whitespace}).split(',') while True: counter += 1 next_background_color = counter % len(background_colors) next_name_color = counter % len(name_colors) yield [background_colors[next_background_color], name_colors[next_name_color]] def _get_name(self, handle_id: str) -> dict: if handle_id not in self._name_map: (background_color, name_color) = next(self._color_list) handle_list = self._database.handles.handles if handle_id == 0: # 0 is me self._name_map[handle_id] = {'name': self._me, 'background_color': background_color, 'name_color': name_color, 'style': 'me'} elif handle_id in handle_list: handle = handle_list[handle_id] self._name_map[handle_id] = {'name': handle.name, 'background_color': background_color, 'name_color': name_color, 'style': 'them'} else: self._name_map[handle_id] = {'name': handle_id, 'background_color': background_color, 'name_color': name_color, 'style': 'them'} return self._name_map[handle_id] def _generate_row(self, message: Message) -> str: # Specify if the message is from me, or the other person if message.is_from_me: who_data = self._get_name(0) else: who_data = self._get_name(message.handle_id) who = who_data['name'] style = who_data['style'] # Check to see if we want the media box floating or fixed floating_option = self._database.config['DISPLAY'].get('popup location', fallback='floating') floating = floating_option == 'floating' # If this message is part of a thread, then show the messages in the thread before it thread_table = "" if message.thread_originator_guid: if message.thread_originator_guid in self._messages.guids: original_message = self._messages.guids[message.thread_originator_guid] thread_list = original_message.thread thread_list[original_message.rowid] = original_message print_thread = [] # sort the threads by the date sent for i in sorted(thread_list.values(), key=lambda x: x.date): if i == message: # stop at the current message break print_thread.append(i) thread_table = self._generate_thread_table(print_thread, style) # Generate the attachment string attachments_string = "" if message.attachments: for attachment_key in message.attachments: # If the attachment listed does not exist, then just list is as missing and continue to the next one if attachment_key not in self._attachment_list.attachment_list: attachments_string = f'{attachments_string} <span class="missing"> Attachment missing </span> ' continue attachment = self._attachment_list.attachment_list[attachment_key] # If the attachment exists, but we have it marked to skip, skip it if attachment.skip: continue # If the attachment exists, but is marked as missing, list it as missing and continue if attachment.missing: attachments_string = f'{attachments_string} <span class="missing"> Attachment missing </span> ' continue # If we should copy the attachment, copy it or convert it if attachment.copy: if attachment.conversion_type == 'HEIC': attachment.convert_heic_image(attachment.original_path, attachment.destination_path) self._last_row_had_conversion = True elif attachment.conversion_type == 'Audio' or attachment.conversion_type == 'Video': attachment.convert_audio_video(attachment.original_path, attachment.destination_path) self._last_row_had_conversion = True else: attachment.copy_attachment() self._last_row_had_conversion = True if floating: box_name = f'PopUp{attachment.rowid}' image_box = f'<div class="imageBox" id="PopUp{attachment.rowid}"> <img src="" /> </div>' else: box_name = 'picbox' image_box = '' if attachment.popup_type == 'Picture': if self._inline: attachment_string = f'<p><a href="{attachment.html_path}" target="_blank">' \ f'<img src="{attachment.html_path}" target="_blank"/><p>' \ f' {attachment.html_path} </a>\n' else: attachment_string = f'''<a href="{attachment.html_path}" target="_blank" onMouseOver="ShowPicture('{box_name}',1,'{attachment.html_path}')" onMouseOut="ShowPicture('{box_name}',0)"> {attachment.html_path} </a> ''' elif attachment.popup_type == 'Audio': # Not going to do popups for audio, just inline attachment_string = f'<p><audio controls> <source src="{attachment.html_path}" ' \ f'type="audio/mp3"></audio> <a href="{attachment.html_path}" ' \ f'target="_blank"> {attachment.html_path} </a>\n' elif attachment.popup_type == 'Video': if self._inline: attachment_string = f'<p><video controls> <source src="{attachment.html_path}" ' \ f' type="video/mp4"></video> <p><a href="{attachment.html_path}"' \ f' target="_blank"> {attachment.html_path} </a>\n' else: attachment_string = f'''<a href="{attachment.html_path}" target="_blank" onMouseOver="ShowMovie('{box_name}', 1, '{attachment.html_path}')"> {attachment.html_path} </a> ''' else: attachment_string = f'<a href="{attachment.html_path}" target="_blank"> ' \ f'{attachment.html_path} </a>\n' attachments_string = f'{attachments_string} <p> {attachment_string} {image_box} ' # Structure of the text row. The first three columns are normal, the fourth column is complex # <tr> # <td> date # <td> who # <td> # <table> # <tr hidden> # <td> Edited Row # </tr> # <tr> # <td> Text of message (edited if required) # <td hidden> extra text # <td> info button (if configured) # <tr> text = _replace_url_to_link(f'{message.text} {attachments_string}') # Check for edits on the text. If there are edits, then set up the html to allow for that. The row # doesn't exist if there is no edits edited_string = "" text_cell_edit_row = "" if len(message.edits) > 0: edit_table = f'{" ":14s}<div class="edits_{style}">\n' for i in range(0, len(message.edits)): edit_table = f'{edit_table}{" ":16s}"{message.edits[i]["text"]} <p>\n' edit_table = f'{edit_table}{" ":14s}</div>\n' edited_string = f'<sub><button class="edited_button"' \ f' onclick="ToggleDisplay(\'{message.rowid}editTable\')"> Edited </button></sub>' text_cell_edit_row = f'{" ":10s}<tr id={message.rowid}editTable class="edits">\n' \ f'{" ":12s}<td>\n' \ f'{edit_table} ' \ f'{" ":12s}</td>\n ' \ f'{" ":10s}</tr>\n' # Check for if we want additional data info_text = "" info_button = "" if self._database.control.getboolean('additional details', fallback=False): info_text = f'{" ":12s}<td class="infocell" id={message.rowid}info>\n ' \ f'{" ":14s}<table>\n' \ f'{" ":16s}<tr>\n' \ f'{" ":18s}<td> ChatID: {message.chat_id} </td>\n' \ f'{" ":16s}</tr>\n' \ f'{" ":14s}</table>\n' \ f'{" ":12s}</td>\n' info_button = f'{" ":12s}<td class="button-wrapper"> <button class="text_{style}" ' \ f'onclick="ToggleDisplay(\'{message.rowid}info\')"> ℹ️ </button> </td>\n' # Put together the row cells date_cell = f'{" ":6s}<td class="date"> {self._day} {message.date} </td>\n' name_cell = f'{" ":6s}<td class="name_{style}" style="color: {who_data["name_color"]};"> {who}: </td>\n' text_cell = f'{" ":6s}<td>\n ' \ f'{" ":8s}<table>\n' \ f'{text_cell_edit_row}' \ f'{" ":10s}<tr>\n' \ f'{" ":12s}<td class="text_{style}" style="background: {who_data["background_color"]};">\n' \ f'{thread_table}' \ f'{" ":14s}{text} {edited_string}\n' \ f'{" ":12s}</td>\n' \ f'{info_text}' \ f'{info_button}' \ f'{" ":10s}</tr>\n' \ f'{" ":8s}</table>\n' \ f'{" ":6s}</td>\n' row_string = f'{" ":4s}<tr id={message.rowid}>\n' \ f'{date_cell}' \ f'{name_cell}' \ f'{text_cell}' \ f'{" ":4s}</tr>\n' return row_string def _generate_head(self) -> str: popup = self._database.config['DISPLAY'].get('popup location', fallback='upper right') if popup == 'upper right': popup_location = 'right' else: popup_location = 'left' thread_background_color = self._database.config['DISPLAY'].get('thread background', fallback='HoneyDew') css = ''' <style> table {''' + f''' width: 100%; table-layout: auto; ''' + ''' } table.main_table {''' + f''' border-bottom: 3px solid black; border-spacing: 8px; ''' + ''' } table.thread_table_me {''' + f''' width: 50%; margin-right: 0px; margin-left: auto; background: {thread_background_color}; padding: 5px; border-radius: 30px; ''' + ''' } table.thread_table_them {''' + f''' width: 50%; margin-right: auto; margin-left: 0px; background: {thread_background_color}; padding: 5px; border-radius: 30px; ''' + ''' } td {''' + f''' padding: 0px; margin: 0; line-height: 1; ''' + ''' } td.date {''' + f''' text-align: left; width: 150px; vertical-align: text-middle; font-size: 80%; ''' + ''' } td.name_me {''' + f''' text-align: right; font-weight: bold; width: 50px; padding-right: 5px; vertical-align: text-middle; font-size: 80%; ''' + ''' } td.name_them {''' + f''' text-align: right; font-weight: bold; width: 50px; padding-right: 5px; vertical-align: text-middle; font-size: 80%; ''' + ''' } td.text_me {''' + f''' text-align: right; word-wrap: break-word; border-radius: 30px; padding: 15px; border-spacing: 40px; ''' + ''' } td.text_them {''' + f''' text-align: left; word-wrap: break-word; border-radius: 30px; padding: 15px; border-spacing: 40px; ''' + ''' } .edits_me {''' + f''' display: none; font-size: 70%; font-style: italic; text-align: right; border-radius: 30px; ''' + ''' } .edits_them {''' + f''' display: none; font-size: 70%; font-style: italic; text-align: left; border-radius: 30px; ''' + ''' } .infocell {''' + f''' margin-right: 0px; margin-left: auto; width: 20%; text-align: right; font-size: 70%; display: none; ''' + ''' } button.edited_button {''' + f''' font-size: 50%; border-radius: 30px; font-size: 50%; padding-left: 0px; padding-right: 0px; border-spacing: 0px; border-bottom: 0px; margin-right: 0px; margin-left: 0px; text-align: center; border: 0px; color: blue; font-style: italic; background-color: transparent; ''' + ''' } td.button-wrapper {''' + f''' margin-right: 0px; margin-left: 4px; padding-left: 0px; padding-right: 0px; border-spacing: 0px; text-align: center; width:0.1%; background-color: transparent; ''' + ''' } button.text_me {''' + f''' border-radius: 30px; font-size: 50%; padding-left: 0px; padding-right: 0px; border-spacing: 0px; border-bottom: 0px; margin-right: 0px; margin-left: 0px; text-align: center; border: 0px; ''' + ''' } button.text_them {''' + f''' border-radius: 30px; font-size: 50%; padding-left: 0px; padding-right: 0px; border-spacing: 0px; border-bottom: 0px; margin-right: 0px; margin-left: 0px; text-align: center; border: 0px; ''' + ''' } .reply_name { font-size: 50%; text-align: right; } .reply_text_me {''' + f''' border: 2px solid; border-radius: 6px; border-radius: 50px; font-size: 60% ''' + ''' } .reply_text_them {''' + f''' border: 2px solid; border-radius: 6px; border-radius: 50px; font-size: 60% ''' + ''' } td.blank {''' + f''' border: none; width: 50% ''' + ''' } .missing {''' + f''' color: red; ''' + ''' } .badjoin {''' + f''' color: red; ''' + ''' } .imageBox {''' + f''' position: absolute; visibility: hidden; height: 200; border: solid 1px #CCC; padding: 5px; ''' + ''' } img {''' + f''' height: 250px; width: auto; ''' + ''' } .picboxframe {''' + f''' position: fixed; top: 2%; ''' + f'{popup_location}: 2%;' \ ''' background: Blue; transition: all .5s ease; ''' + ''' } .next_file { text-align: center; font-size: 150%; } #file_summary { font-style: italic; } </style>''' script = ''' <script> function ToggleDisplay(id) { if (document.getElementById(id).style.display == "none") { document.getElementById(id).style.display = "inline"; } else { document.getElementById(id).style.display = "none"; } } function ShowPicture(id,show, img) { if (show=="1") { document.getElementById(id).style.visibility = "visible" document.getElementById(id).childNodes[1].src = img; } else if (show=="0") { document.getElementById(id).style.visibility = "hidden" } } function ShowMovie(id, show, movie) { var elem = document.getElementById(id); ''' \ f' var htmlstring = "<video controls onMouseOut=\'ShowMovie(\\\"\" + id + "\\\",0)\'> ' \ f'<source src=\'" + movie + "\'> </video>";' \ ''' if (show == "1") { { elem.style.visibility = "visible"; elem.innerHTML = htmlstring; } } else if (show == "0") { { elem.style.visibility = "hidden"; elem.innerHTML = " " } } } </script>''' head_string = '''<!DOCTYPE html> <html lang="en-US"> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> ''' \ f' <title> {self._messages.title} </title>\n{css}\n{script}\n</head>\n' return head_string ================================================ File: /src/imessagedb/chat.py ================================================ from datetime import datetime class Chat: """ Class for holding information about a chat """ def __init__(self, database, rowid: str, chat_identifier: str, chat_name: str, last_message_date=None) -> None: """ Parameters ---------- database : imessagedb.DB An instance of a connected database rowid, chat_identifier, chat_name : str The parameters are the fields in the database last_message_date : date The date the last message in this chat was sent""" self._database = database self._rowid = rowid self._chat_identifier = chat_identifier self._chat_name = chat_name self._last_message_date = last_message_date self._participants = [] def __repr__(self) -> str: return f'{self.rowid}: id => {self.chat_identifier} name => "{self.chat_name}" ' \ f'last_message => {self.last_message_date}, participants => "{self.participants}"' @property def rowid(self) -> str: return self._rowid @property def chat_identifier(self) -> str: return self._chat_identifier @property def chat_name(self) -> str: return self._chat_name @property def last_message_date(self) -> datetime: return self._last_message_date @last_message_date.setter def last_message_date(self, date: datetime): self._last_message_date = date @property def participants(self) -> str: """ Returns the participants in the chat """ strings = [] for handle_id in self._participants: if handle_id in self._database.handles.handles: number = self._database.handles.handles[handle_id].number name = self._database.handles.name_for_number(number) if name is None: strings.append(f'{number} ({handle_id})') else: strings.append(f'{name} ({number}):({handle_id})') else: strings.append(f'{handle_id}') return ', '.join(strings) def add_participant(self, participant: str): """ Add a participant to the chat """ if participant not in self._participants: self._participants.append(participant) ================================================ File: /src/imessagedb/__init__.py ================================================ """ Provides access to the iMessage chat.db, including parsing and output """ from importlib.metadata import version # read version from installed package __version__ = version("imessagedb") import os import configparser import logging import argparse import sys import dateutil.parser from imessagedb.db import DB from imessagedb.utils import * DEFAULT_CONFIGURATION = ''' [CONTROL] # Whether or not to copy the attachments into a different directory. This is needed for two reasons: # 1) The web browser does not have access to the directory that the attachments are stored, so it cannot display them # 2) Some of the attachments need to be converted in order to be viewed in the browser copy = True # The directory to put the output. The html file will go in {copy_directory}/{Person}.html, # and the attachments will go in {copy_directory}/{Person}_Attachments. If you specify HOME, then # it will put it in your home directory copy directory = HOME # If the file already exists in the destination directory it is not recopied, but that can be overridden by # specifying 'force copy' as true force copy = False # 'Skip attachments' ignores attachments skip attachments = False # Additional details show some other information on each message, including the chat id and any edits that have # been done additional details = False # Some extra verbosity if true verbose = True [DISPLAY] # Output type, either html or text output type = html # Number of messages in each html file. If this is 0 or not specified, it will be one large file. # There may be more messages than this per file, as it splits at the next date change after that number # of messages. split output = 1000 # Inline attachments mean that the images are in the HTML instead of loaded when hovered over inline attachments = False # Popup location is where the attachment popup window shows up, and is either 'upper right', 'upper left' or 'floating' popup location = upper right # 'me' is the name to put for your text messages me = Me # The color for the name in text output. No color is used if 'use text color' is false. # The color can only be one of the following options: # black, red, green, yellow, blue, magenta, cyan, white, light_grey, dark_grey, # light_red, light_green, light_yellow, light_blue, light_magenta, light_cyan # The way that the color selection works is that it will use the first color on the color list for the first person # in the conversation, the second for the second, third for the third, etc. If there are more participants than colors, # it will wrap around to the first color. use text color = True text color list = red, green, yellow, blue, magenta, cyan reply text color = light_grey # The background and name color of the messages in html output # The options for colors can be found here: https://www.w3schools.com/cssref/css_colors.php # The way that the color selection works is that it will use the first color on the color list for the first person # in the conversation, the second for the second, third for the third, etc. If there are more participants than colors, # it will wrap around to the first color. html background color list = AliceBlue, Cyan, Gold, Lavender, LightGreen, PeachPuff, Wheat html name color list = Blue, DarkCyan, DarkGoldenRod, Purple, DarkGreen, Orange, Sienna # The background color of the thread in replies thread background = HoneyDew [CONTACTS] # A person that you text with can have multiple numbers, and you may not always want to specify the full specific # number as stored in the handle database, so you can do the mapping here, providing the name of a person, # and a comma separated list of numbers Samantha: +18434676040, samanthasmilt@gmail.com, s12ddd2@colt.edu Abe: +16103499696 Marissa: +14029490739 ''' def _create_default_configuration(filename: str) -> None: """Generates a default configuration if one is not passed in""" f = open(filename, "w") f.write(DEFAULT_CONFIGURATION) f.close() return def _get_contacts(configuration: configparser.ConfigParser) -> dict: result = {} contact_list = configuration.items('CONTACTS') for contact in contact_list: key = contact[0] value = contact[1] # Get rid of spaces and newlines stripped = value.replace('\n', '').replace(' ', '') number_list = stripped.split(',') result[key] = number_list return result def run() -> None: """ Run the imessagedb command line""" out = sys.stdout logging.basicConfig(level=logging.INFO, format='%(asctime)s <%(name)s> %(message)s', datefmt='%Y-%m-%d %H:%M:%S') logger = logging.getLogger('main') logger.debug("Processing parameters") argument_parser = argparse.ArgumentParser() argument_parser.add_argument("--name", help="Person to get conversations about") type_mutex_group = argument_parser.add_mutually_exclusive_group() type_mutex_group.add_argument('--handle', help="A list of handles to search against", nargs='*') type_mutex_group.add_argument('--chat', help="A chat to print") argument_parser.add_argument("-c", "--configfile", help="Location of the configuration file", default=f'{os.environ["HOME"]}/.config/iMessageDB.ini') argument_parser.add_argument("-o", "--output_directory", help="The output directory where the output and attachments go") argument_parser.add_argument("--database", help="The database file to open", default=f"{os.environ['HOME']}/Library/Messages/chat.db") argument_parser.add_argument("-m", "--me", help="The name to use to refer to you", default="Me") argument_parser.add_argument("-t", "--output_type", help="The type of output", choices=["text", "html"]) argument_parser.add_argument("-i", "--inline", help="Show the attachments inline", action="store_true") copy_mutex_group = argument_parser.add_mutually_exclusive_group() copy_mutex_group.add_argument("-f", "--force", help="Force a copy of the attachments", action="store_true") copy_mutex_group.add_argument("--no_copy", help="Don't copy the attachments", action="store_true") argument_parser.add_argument("--no_attachments", help="Don't process attachments at all", action="store_true") argument_parser.add_argument("-v", "--verbose", help="Turn on additional output", action="store_true") argument_parser.add_argument('--start_time', '--start-time', help="The start date/time of the messages") argument_parser.add_argument('--end_time', '--end-time', help="The end date/time of the messages") argument_parser.add_argument('--split_output', '--split-output', help="Split the html output into files with this many messages per file") argument_parser.add_argument('--get_handles', '--get-handles', help="Display the list of handles in the database and exit", action="store_true") argument_parser.add_argument('--get_chats', '--get-chats', help="Display the list of chats in the database and exit", action="store_true") argument_parser.add_argument('--version', help="Prints the version number", action="store_true") args = argument_parser.parse_args() if args.version: print(f"imessagedb {__version__}", file=sys.stderr) exit(0) # First read in the configuration file, creating it if need be, then overwrite the values from the command line if not os.path.exists(args.configfile): _create_default_configuration(args.configfile) config = configparser.ConfigParser() config.read(args.configfile) CONTROL = 'CONTROL' DISPLAY = 'DISPLAY' config.set(CONTROL, 'verbose', str(args.verbose)) if args.output_directory: config.set(CONTROL, 'copy directory', args.output_directory) if args.no_copy: config.set(CONTROL, 'copy', 'False') if args.output_type: config.set(CONTROL, 'output type', args.output_type) if args.force: config.set(CONTROL, 'force copy', 'True') if args.no_attachments: config.set(CONTROL, 'skip attachments', 'True') if args.inline: config.set(DISPLAY, 'inline attachments', 'True') if args.split_output: config.set(DISPLAY, 'split output', args.split_output) start_date = None end_date = None if args.start_time: try: start_date = dateutil.parser.parse(args.start_time) except ValueError as exp: argument_parser.print_help(sys.stderr) print(f"\n **Start time not correct: {exp}", file=sys.stderr) exit(1) config.set(CONTROL, 'start time', str(start_date)) if args.end_time: try: end_date = dateutil.parser.parse(args.end_time) except ValueError as exp: argument_parser.print_help(sys.stderr) print(f"\n** End time not correct: {exp}", file=sys.stderr) exit(1) config.set(CONTROL, 'end time', str(end_date)) if start_date and end_date and start_date >= end_date: argument_parser.print_help(sys.stderr) print(f"\n **Start date ({start_date}) must be before end date ({end_date})", file=sys.stderr) exit(1) generic_database_request = False if args.get_handles or args.get_chats: config[CONTROL]['skip attachments'] = 'True' generic_database_request = True person = None numbers = None if not generic_database_request: if args.chat: person = f"chat_{args.chat}" if args.name: person = args.name else: if args.handle: numbers = args.handle if args.name: person = args.name else: person = ', '.join(numbers) elif args.name: person = args.name contacts = _get_contacts(config) if person.lower() not in contacts.keys(): logger.error(f"{person} not known. Please edit your contacts list.") argument_parser.print_help() exit(1) # Get rid of new lines and split it into a list numbers = config['CONTACTS'][person].replace('\n', '').split(',') else: argument_parser.print_help(sys.stderr) print("\n ** You must supply either a name or one or more handles") exit(1) config.set(CONTROL, 'Person', person) copy_directory = config[CONTROL].get('copy directory', fallback=os.environ['HOME']) if copy_directory == "HOME": copy_directory = os.environ['HOME'] attachment_directory = f"{copy_directory}/{safe_filename(person)}_attachments" config[CONTROL]['attachment directory'] = attachment_directory try: os.mkdir(attachment_directory) except FileExistsError: pass # Connect to the database database = DB(args.database, config=config) if args.get_handles: print(f"Available handles in the database:\n{database.handles.get_handles()}") sys.exit(0) if args.get_chats: print(f"Available chats in the database:\n{database.chats.get_chats()}") sys.exit(0) if args.chat: chat_id = args.chat title = args.chat if args.chat in database.chats.chat_names: chats = database.chats.chat_names[args.chat] if len(chats) != 1: error_string = f"You have {len(chats)} chats named {args.chat}, " \ f"but this program can only handle the case where there is one. " \ f"Rename your group and try again." logger.error(error_string) exit(1) chat_id = chats[0].rowid title = args.chat elif int(args.chat) in database.chats.chat_list: chat_id = int(args.chat) if database.chats.chat_list[chat_id].chat_name: title = database.chats.chat_list[chat_id].chat_name elif args.name: title = args.name else: title = chat_id else: logger.error(f"{args.chat} not recognized as a chat. Run 'imessagedb --get_chats' to get the list of chats") argument_parser.print_help() exit(1) message_list = database.Messages('chat', title, chat_id=chat_id) else: message_list = database.Messages('person', person, numbers=numbers) me = config.get('DISPLAY', 'me', fallback='Me') filename = os.path.join(copy_directory, safe_filename(person)) output_type = config[CONTROL].get('output type', fallback='html') if output_type == 'text': database.TextOutput(me, message_list, output_file=out).print() else: database.HTMLOutput(me, message_list, output_file=filename) database.disconnect() if __name__ == '__main__': run() ================================================ File: /src/imessagedb/messages.py ================================================ from imessagedb.utils import * from alive_progress import alive_bar from imessagedb.message import Message class Messages: """ All messages in a conversation or conversations with a particular person """ def __init__(self, database, query_type: str, title: str, numbers: list = None, chat_id: str = None) -> None: """ Parameters ---------- database : imessagedb.DB An instance of a connected database query_type : str The type of messages, either 'person' or 'chat' title : str The name of the conversation numbers : list A list of numbers associated with the person, as represented in the handle data table chat_id : str The id of the chat """ self._database = database self._query_type = query_type self._numbers = numbers self._chat_id = chat_id self._title = title self._guids = {} self._message_list = {} time_rules = [] time_where_clause = "" start_time = self._database.control.get('start time', fallback=None) end_time = self._database.control.get('end time', fallback=None) if start_time: database_start_date = convert_to_database_date(start_time) time_rules.append(f"message.date >= {database_start_date}") if end_time: database_end_date = convert_to_database_date(end_time) time_rules.append(f"message.date <= {database_end_date}") if len(time_rules) > 0: time_where_clause = f" and {' AND '.join(time_rules)}" if self._query_type == "person": numbers_string = "','".join(self._numbers) where_clause = "rowid in (" \ " select message_id from chat_message_join where chat_id in (" \ " select chat_id from chat_handle_join where handle_id in (" \ f" select rowid from handle where id in ('{numbers_string}')" \ " )" \ " )" \ f") {time_where_clause}" select_string = "select message.rowid, guid, " \ "datetime(message.date/1000000000 + " \ "strftime('%s', '2001-01-01'),'unixepoch','localtime'), " \ "message.is_from_me, message.handle_id, " \ " message.attributedBody, message.message_summary_info, message.text, " \ "reply_to_guid, thread_originator_guid, thread_originator_part, cmj.chat_id " \ "from message, chat_message_join cmj " \ f"where message.rowid = cmj.message_id and {where_clause} " \ "order by message.date asc" elif self._query_type == "chat": where_clause = f"rowid in (select message_id from chat_message_join where chat_id = {self._chat_id}) " \ f" {time_where_clause}" select_string = "select message.rowid, guid, " \ "datetime(message.date/1000000000 + " \ "strftime('%s', '2001-01-01'),'unixepoch','localtime'), " \ "message.is_from_me, message.handle_id, " \ " message.attributedBody, message.message_summary_info, message.text, " \ "reply_to_guid, thread_originator_guid, thread_originator_part, cmj.chat_id " \ "from message, chat_message_join cmj " \ f"where message.rowid = cmj.message_id and {where_clause} " \ "order by message.date asc" else: raise KeyError row_count_string = f"select count (*) from message where {where_clause}" self._database.connection.execute(row_count_string) (row_count_total) = self._database.connection.fetchone() row_count_total = row_count_total[0] self._database.connection.execute(select_string) i = self._database.connection.fetchone() skip_attachment = self._database.control.getboolean('skip attachments', fallback=False) with alive_bar(row_count_total, title="Getting Messages", stats="({rate}, eta: {eta})") as bar: message_count = 0 while i: (rowid, guid, date, is_from_me, handle_id, attributed_body, message_summary_info, text, reply_to_guid, thread_originator_guid, thread_originator_part, chat_id) = i message_count = message_count + 1 attachment_list = None if not skip_attachment: if rowid in self._database.attachment_list.message_join: attachment_list = self._database.attachment_list.message_join[rowid] new_message = Message(self._database, rowid, guid, date, is_from_me, handle_id, attributed_body, message_summary_info, text, reply_to_guid, thread_originator_guid, thread_originator_part, chat_id, attachment_list) self._guids[guid] = new_message self._message_list[rowid] = new_message # Manage the thread if thread_originator_guid and thread_originator_guid in self._guids: self._guids[thread_originator_guid].thread[rowid] = new_message bar() i = self._database.connection.fetchone() self._sorted_message_list = sorted(self._message_list.values(), key=lambda x: x.date) @property def message_list(self) -> list: """ Returns a list of messages sorted by the date of the message""" return self._sorted_message_list def stats(self) -> list: """ Returns a list of stats about the message list suitable for importing into a spreadsheet. The fields returned are: name: The name of the person sending the message date: The date of the message character_count: The number of characters in the message word_count: The number of words in the message text: The text of the message """ result = [] for i in self._sorted_message_list: if i.text is not None: # Take out the carriage returns so this can be parsed by a spreadsheet text = i.text.replace('\n', ' ') text_len = len(i.text) word_count = len(i.text.split(' ')) else: text = "" text_len = 0 word_count = 0 date = i.date stats = {'name': i.name, 'date': date, 'character_count': text_len, 'word_count': word_count, 'text': text} result.append(stats) return result def print_stats(self) -> str: """ Returns a string of stats about the message list suitable for importing into a spreadsheet """ result = ["Name\tDate\tCharacter Count\tWord Count\tText"] for i in self.stats(): result.append(f"{i['name']}\t{i['date']}\t{i['character_count']}\t{i['word_count']}\t{i['text']}") return '\n'.join(result) @property def guids(self) -> dict: return self._guids @property def title(self) -> str: return self._title def __iter__(self): return self._sorted_message_list.__iter__() def __len__(self) -> int: return len(self._sorted_message_list) ================================================ File: /src/imessagedb/handle.py ================================================ class Handle: """ Class for holding information about a Handle """ def __init__(self, database, rowid: str, name: str, number: str, service: str) -> None: """ Parameters ---------- database : imessagedb.DB An instance of a connected database rowid, number, service : str The parameters are the fields in the database name : str A name to associate with it, from the contacts list""" self._database = database self._rowid = rowid self._number = number self._service = service self._name = name.title() def __repr__(self) -> str: return f'{self._rowid}: Name => {self._name}, ID => {self._number}, Service => {self._service} ' @property def rowid(self) -> str: return self._rowid @property def number(self) -> str: return self._number @property def service(self) -> str: return self._service @property def name(self) -> str: return self._name ================================================ File: /src/imessagedb/attachment.py ================================================ import os import urllib.parse import re import shutil import ffmpeg import heic2png class Attachment: """ Class for holding information about an attachment """ def __init__(self, database, rowid: str, filename: str, mime_type: str, copy=False, copy_directory=None, home_directory=os.environ['HOME']) -> None: """ Parameters ---------- database : imessagedb.DB An instance of a connected database rowid, filename, mime_type : str Fields from the database copy : bool Whether or not to copy the attachment, default is False copy_directory : str The directory to copy the attachment to, default is none home_directory : str Needed to know where to get the attachments from. The attachments in the database are listed under ~/Library/Messages/Attachments, so need to be able to translate that """ self._database = database self._rowid = rowid self._filename = filename self._mime_type = mime_type self._copy = copy self._copy_directory = copy_directory self._home_directory = home_directory self._destination_path = None self._popup_type = None self._conversion_type = None self._skip = False self._missing = False self._needs_conversion = False self._force = self._database.control.getboolean('force copy', False) # The path is set to use ~, so replace it with the home directory self._original_path = self._filename.replace('~', self._home_directory) if not os.path.exists(self._original_path): self._missing = True return if self._copy_directory is None or not os.path.isdir(self._copy_directory): self._copy = False if self._copy: parts = self._original_path.split('/') last = parts.pop() penultimate = parts.pop() self._destination_filename = f'{penultimate}-{last}' self._process_mime_type() # We may need to do a conversion and therefore change the filename if self._copy: self._destination_path = f'{self._copy_directory}/{self._destination_filename}' if len(self._destination_path) > 200: # Some filenames are too long self._destination_filename = f'{self._destination_filename[:50]}---{self._destination_filename[-50:]}' self._destination_path = f'{self._copy_directory}/{self._destination_filename}' else: self._destination_path = self._original_path return @property def rowid(self) -> str: """ Return the rowid """ return self._rowid @property def filename(self) -> str: """ Return the filename of the attachment """ return self._filename @property def mime_type(self) -> str: """ Return the mime_type of the attachment""" return self._mime_type @property def copy(self) -> bool: """ Return if the attachment should be copied """ return self._copy @property def destination_path(self) -> str: """ Return the destination path the attachment will be copied to """ return self._destination_path @property def popup_type(self) -> str: """ Return the popup type, one of [Audio, Video, Picture] """ return self._popup_type @property def conversion_type(self) -> str: """ Return the conversion type, one of [Audio, HEIC, Video] """ return self._conversion_type @property def skip(self) -> bool: """ Return if we need to skip this attachment """ return self._skip @property def missing(self) -> bool: """ Return if the attachment is missing """ return self._missing @property def original_path(self) -> str: """ Return the original path of the attachment """ return self._original_path @property def destination_filename(self) -> str: """ Return the filename at the destination (not the whole path)""" return self._destination_filename @property def html_path(self) -> str: """ Return the html escaped path """ return urllib.parse.quote(self.destination_path) @property def link_path(self) -> str: """ Return a link to the attachment """ return f"file://{urllib.parse.quote(self.destination_path)}" def _process_mime_type(self) -> None: """ Based on the mime_type, decide how to convert the file if necessary and how to rename it """ if self._mime_type is None: # The only type of file that doesn't have a mime type that I am currently interested in are audio files search_name = re.search(".caf$", self._filename) if search_name: self._popup_type = "Audio" self._conversion_type = "Audio" if self.copy: self._destination_filename = f'{self._destination_filename}.mp3' else: self._skip = True # We don't care about this attachment elif self._mime_type[0:5] == "image": self._popup_type = "Picture" if self.mime_type == 'image/heic': self._conversion_type = "HEIC" if self._copy: self._destination_filename = f'{self._destination_filename}.png' # Process audio elif self._mime_type[0:5] == "audio": self._popup_type = "Audio" self._conversion_type = "Audio" if self._copy: self._destination_filename = f'{self._destination_filename}.mp3' # Process Video elif self._mime_type[0:5] == "video": self._popup_type = "Video" if self.mime_type != "video/mp4": self._conversion_type = "Video" if self.copy: self._destination_filename = f'{self._destination_filename}.mp4' def copy_attachment(self) -> None: """ Copy the attachment """ # Skip the file copy if the copy already exists if self._force or not os.path.exists(self._destination_path): print(f"Copying {self._destination_filename}") try: shutil.copyfile(self._original_path, self._destination_path) return except Exception as exp: print(f"Failed to copy {self._destination_filename}: {exp}") return else: # If the file already exists, do nothing return def convert_heic_image(self, heic_location: str, png_location: str) -> None: """ Convert a HEIC image to a PNG so it can be viewed in the browser """ # Don't do the expensive conversion if we've already converted it if self._force or not os.path.exists(png_location): try: print(f"Converting {os.path.basename(png_location)}") heic_image = heic2png.HEIC2PNG(heic_location) heic_image.save(png_location) return except Exception as exp: print(f'Failed to convert {heic_location} to {png_location}: {exp}') return else: # If the file exists already, don't convert it return def convert_audio_video(self, original: str, converted: str) -> None: """ Convert an audio or video file from an undisplayable source to something the browser can display""" if self._force or not os.path.exists(converted): try: print(f"Converting {os.path.basename(converted)}") stream = ffmpeg.input(original) stream = ffmpeg.output(stream, converted) stream = ffmpeg.overwrite_output(stream) ffmpeg.run(stream, quiet=True) return except Exception as exp: print(f'Failed to convert {original} to {converted}: {exp}') return else: # If the file exists already, don't convert it return ================================================ File: /src/imessagedb/handles.py ================================================ from imessagedb.handle import Handle class Handles: """ All handles in the database """ def __init__(self, database) -> None: """ Parameters ---------- database : imessagedb.DB An instance of a connected database """ self._database = database self._handle_list = {} # Handles by rowid self._numbers = {} # Handles by phone number / email self._names = {} # Handles by name (from contact list) self._contacts_by_name = {} self._contacts_by_number = {} # Process the contacts first if self._database.config.has_section('CONTACTS'): for (key, value) in self._database.config.items('CONTACTS'): # Capitalize the first letter of every word, since configparser loses case key = key.title() value = value.replace('\n', '') values = value.split(',') self._contacts_by_name[key] = values for item in values: self._contacts_by_number[item] = key self._get_handles_from_database() return def _get_handles_from_database(self): self._database.connection.execute('select rowid, id, service from handle') rows = self._database.connection.fetchall() for row in rows: rowid = row[0] number = row[1] service = row[2] if number in self._contacts_by_number: name = self._contacts_by_number[number] else: name = number new_handle = Handle(self._database, rowid, name, number, service) # Add the handle to the rowid dictionary self._handle_list[new_handle.rowid] = new_handle # Add the handle to the numbers dictionary if new_handle.number in self._numbers: self._numbers[new_handle.number].append(new_handle) else: self._numbers[new_handle.number] = [new_handle] # Add the handle to the names dictionary if new_handle.number in self._contacts_by_number: name = self._contacts_by_number[new_handle.number] if name in self._names: self.names[name].append(new_handle) else: self._names[name] = [new_handle] def get_handles(self) -> str: """ Return a string with the list of handles""" number_list = list(self._numbers.keys()) return_string = '\n'.join(number_list) return return_string @property def handles(self) -> dict: """ Return the list of handles """ return self._handle_list @property def numbers(self) -> dict: """ Return the list of handles indexed by the number """ return self._numbers @property def names(self) -> dict: """ Return the list of handles indexed by the number """ return self._names def name_for_number(self, number: str) -> str: if number in self._contacts_by_number: return self._contacts_by_number[number] return None def __iter__(self): return self._handle_list def __len__(self) -> int: return len(self._handle_list) def __repr__(self) -> str: handle_array = [] for i in sorted(self._handle_list.keys()): handle_array.append(self._handle_list[i]) return '\n'.join(map(str, handle_array)) def __getitem__(self, item: str) -> Handle: if item in self._names: return self._names[item] if item in self._numbers: return self._numbers[item] raise KeyError ================================================ File: /src/imessagedb/generate_text.py ================================================ from termcolor import colored from datetime import datetime import string class TextOutput: """ Creates a text file (or string) from a Messages list ... There are a number of options in the configuration file that affect how the HTML is created. In the DISPLAY section, the following impact the output: me = Me : The name to put for your part of the conversation. It defaults to 'Me'. use text color = True : If False, don't color the text at all text color list = red, green, yellow, blue, magenta, cyan The color for the name in text output. No color is used if 'use text color' is false. The color can only be one of the following options: black, red, green, yellow, blue, magenta, cyan, white, light_grey, dark_grey, light_red, light_green, light_yellow, light_blue, light_magenta, light_cyan The way that the color selection works is that it will use the first color on the color list for the first person in the conversation, the second for the second, third for the third, etc. If there are more participants than colors, it will wrap around to the first color. reply text color = light_grey : The color for the reply text """ def __init__(self, database, me: str, messages, output_file=None) -> None: self._database = database self._me = me self._messages = messages self._attachment_list = self._database.attachment_list self._output_file = output_file self._color_list = self._get_next_color() self._name_map = {} self._use_color = self._database.config['DISPLAY'].getboolean('use text color', fallback=True) self._me_color = self._database.config['DISPLAY'].get('me text color', fallback="blue") self._them_color = self._database.config['DISPLAY'].get('them text color', fallback="magenta") self._reply_color = self._database.config['DISPLAY'].get('reply text color', fallback="light_grey") start_time = self._database.control.get('start time', fallback=None) end_time = self._database.control.get('end time', fallback=None) date_string = "" if start_time: date_string = f"from {start_time} " if end_time: date_string = f"{date_string} until {end_time}" header_string = f"Exchanged {len(self._messages.message_list):,} messages with " \ f"{self._messages.title} {date_string}" self._string_array = [header_string] self._get_messages() return def _get_name(self, handle_id: str) -> dict: if handle_id not in self._name_map: color = next(self._color_list) handle_list = self._database.handles.handles if handle_id == 0: # 0 is me self._name_map[handle_id] = {'name': self._me, 'color': color} elif handle_id in handle_list: handle = handle_list[handle_id] self._name_map[handle_id] = {'name': handle.name, 'color': color} else: self._name_map[handle_id] = {'name': handle_id, 'color': color} return self._name_map[handle_id] def _print_thread(self, thread_header, current_message: str) -> str: thread_string = "" thread_list = thread_header.thread thread_list[thread_header.rowid] = thread_header for i in sorted(thread_list.values(), key=lambda x: x.date): if i == current_message: break attachment_string = "" if i.attachments is not None: attachment_string = f" Attachments: {i.attachments}" thread_string = f'{thread_string}[{self._name_map[i.handle_id]["name"]}: {i.text}{attachment_string}] ' return thread_string def _get_messages(self) -> None: for message in self._messages.message_list: date = message.date day = datetime(int(date[0:4]), int(date[5:7]), int(date[8:10]), int(date[11:13]), int(date[14:16]), int(date[17:19])).strftime('%a') if message.is_from_me: who_data = self._get_name(0) else: who_data = self._get_name(message.handle_id) who = self._color(who_data['name'], who_data['color'], attrs=['bold']) reply_to = "" attachment_string = "" if message.attachments: attachments_array = [] attachment_list = self._database.attachment_list.attachment_list for i in message.attachments: if i in attachment_list: attachments_array.append(attachment_list[i].original_path) else: attachments_array.append(f"Missing attachment ({i})") attachment_string = f'Attachments: {",".join(attachments_array)}' if message.thread_originator_guid: if message.thread_originator_guid in self._messages.guids: original_message = self._messages.guids[message.thread_originator_guid] reply_to = self._color(f'Reply to: {self._print_thread(original_message, message)}', self._reply_color) self._string_array.append(f'<{day} {date}> {who}: {message.text} {reply_to} {attachment_string}') def _get_next_color(self): """ A generator function to return the next color""" counter = -1 color_list = self._database.config.get('DISPLAY', 'text color list', fallback='red, green, yellow, blue, magenta, cyan') colors = color_list.translate({ord(c): None for c in string.whitespace}).split(',') while True: counter += 1 yield colors[counter % len(colors)] def _color(self, text: str, color: str, attrs=None) -> str: if self._use_color: if attrs: return colored(text, color, attrs=attrs) else: return colored(text, color) else: return text def save(self) -> None: """ Save the text output to the file """ print('\n'.join(self._string_array), file=self._output_file) return def print(self) -> None: """ Print the text output to stdout """ print('\n'.join(self._string_array)) return def __repr__(self) -> str: return '\n'.join(self._string_array) ================================================ File: /src/imessagedb/message.py ================================================ import plistlib from datetime import datetime from imessagedb.utils import * import imessagedb def _convert_attributed_body(encoded: bytes) -> str: # This general logic was copied from # https://github.com/my-other-github-account/imessage_tools/blob/master/imessage_tools.py, however # I needed to make some improvements text = encoded.split(b'NSNumber')[0] text = text.split(b'NSString')[1] text = text.split(b'NSDictionary')[0] text = text[6:-12] if b'\x01' in text: text = text.split(b'\x01')[1] if b'\x02' in text: text = text.split(b'\x02')[1] if b'\x00' in text: text = text.split(b'\x00')[1] if b'\x86' in text: text = text.split(b'\x86')[0] return text.decode('utf-8', errors='replace') class Message: """ Class for holding information about a message """ def __init__(self, database, rowid: int, guid: str, date: str, is_from_me: bool, handle_id: str, attributed_body: bytes, message_summary_info: bytes, text: str, reply_to_guid: str, thread_originator_guid: str, thread_originator_part: str, chat_id: str, message_attachments: list): """ Parameters ---------- database : imessagedb.DB An instance of a connected database rowid, guid, date, is_from_me, handle_id, attributed_body, message_summary_info, text, reply_to_guid, thread_originator_guid, thread_originator_part, chat_id : str The parameters are the fields in the database message_attachments : list The attachments for this message""" self._rowid = rowid self._guid = guid self._date = date self._is_from_me = is_from_me self._handle_id = handle_id self._attributed_body = attributed_body self._message_summary_info = message_summary_info self._text = text self._attributed_body = attributed_body self._reply_to_guid = reply_to_guid self._thread_originator_guid = thread_originator_guid self._thread_originator_part = thread_originator_part self._chat_id = chat_id self._attachments = message_attachments self._thread = {} self._edits = [] # There are a lot of messages that are saved into attributed_body instead of the text field. # There isn't a good way to convert this in Python that I've found, so I have to run a # program to do it. I need to fix this. if (self._text is None or text == '' or text == ' ') and self._attributed_body is not None: self._text = _convert_attributed_body(self._attributed_body) # Edits are stored in message_summary_info try: plist = plistlib.loads(self._message_summary_info) if 'ec' in plist: for row in plist['ec']['0']: self._edits.append({'text': _convert_attributed_body(row['t']), 'date': convert_from_database_date(row['d'])}) except plistlib.InvalidFileException as exp: pass def __repr__(self) -> str: return_string = f'RowID: {self._rowid}' \ f' GUID: {self._guid}' \ f' Date: {self._date}' \ f' From me: {self._is_from_me}' \ f' HandleID: {self._handle_id}' \ f' Message: {self._text}' \ f' Originator Thread: {self._thread_originator_guid}' \ f' Reply Message: {self._reply_to_guid}' \ f' Thread Part: {self._thread_originator_part}' \ f' Chat ID: {self._chat_id}' \ f' Attachments: {self._attachments}' return str(return_string) @property def rowid(self) -> int: return self._rowid @property def guid(self) -> str: return self._guid @property def date(self) -> str: return self._date @property def is_from_me(self) -> bool: return self._is_from_me @property def handle_id(self) -> str: return self._handle_id @property def attributed_body(self) -> bytes: return self._attributed_body @property def message_summary_info(self) -> bytes: return self._message_summary_info @property def text(self) -> str: return self._text @property def edits(self) -> list: return self._edits @property def reply_to_guid(self) -> str: return self._reply_to_guid @property def thread_originator_guid(self) -> str: return self._thread_originator_guid @property def chat_id(self) -> str: return self._chat_id @property def attachments(self) -> list: return self._attachments @property def thread(self) -> dict: return self._thread ================================================ File: /src/imessagedb/db.py ================================================ import configparser import os import sqlite3 import imessagedb from imessagedb.attachments import Attachments from imessagedb.chats import Chats from imessagedb.handles import Handles from imessagedb.generate_html import HTMLOutput from imessagedb.messages import Messages from imessagedb.generate_text import TextOutput class DB: """ A class to connect to an iMessage database """ def __init__(self, database_name=None, config=None): """ Parameters ---------- database_name : str The database that it connects to (the default is to use the default database in the caller's home directory) config : ConfigParser Configuration information. If none is provided, we will create a default. """ if database_name is None: database_name = f"{os.environ['HOME']}/Library/Messages/chat.db" self._database_name = database_name self._configuration = config if self._configuration is None: self._configuration = configparser.ConfigParser() self._configuration.read_string(imessagedb.DEFAULT_CONFIGURATION) self._control = self._configuration['CONTROL'] self._chat_connection = sqlite3.connect(database_name) self._cursor = self._chat_connection.cursor() # Preload some of the data self._handles = Handles(self) self._chats = Chats(self) self._attachment_list = Attachments(self) return def Messages(self, query_type: str, title: str, numbers: list = None, chat_id: str = None) -> Messages: """A wrapper to create a Messages class """ return Messages(self, query_type, title, numbers=numbers, chat_id=chat_id) def HTMLOutput(self, me: str, message_list: Messages, inline=False, output_file=None) -> HTMLOutput: """A wrapper to create an HTMLOutput class """ return HTMLOutput(self, me, message_list, inline, output_file) def TextOutput(self, me: str, message_list: Messages, output_file=None) -> TextOutput: """A wrapper to create a TextOutput class """ return TextOutput(self, me, message_list, output_file) def disconnect(self) -> None: """Disconnects from the database """ self._chat_connection.close() return @property def connection(self) -> sqlite3.Cursor: """Returns a connection to query the database """ return self._cursor @property def handles(self) -> Handles: """Returns an imessagedb.Handles class with all the handles """ return self._handles @property def chats(self) -> Chats: """Returns an imessagedb.Chats class with all the chats """ return self._chats @property def attachment_list(self) -> Attachments: """Returns an imessagedb.Attachments class with all the attachments """ return self._attachment_list @property def config(self) -> configparser.ConfigParser: """Returns the configuration object """ return self._configuration @property def control(self): """Returns a shortcut to the CONTROL section of the configuration """ return self._control ================================================ File: /src/imessagedb/attachments.py ================================================ from imessagedb.attachment import Attachment from alive_progress import alive_bar class Attachments: """ All attachments """ def __init__(self, database, copy=None, copy_directory=None) -> None: """ Parameters ---------- database : imessagedb.DB An instance of a connected database copy : bool Whether or not to copy attachments, default is to use the configuration parameter copy_directory : str The directory to copy attachments into """ self._database = database if copy is None: self._copy = self._database.control.getboolean('copy', fallback=False) else: self._copy = copy if copy_directory is None: self._copy_directory = self._database.control.get('attachment directory', fallback="/tmp/attachments") else: self._copy_directory = copy_directory self._attachment_list = {} self._message_join = {} # Get the list of all the attachments, unless we are skipping them if self._database.control.getboolean('skip attachments', fallback=False): return self._database.connection.execute('select count(rowid)from attachment') (row_count_total) = self._database.connection.fetchone() row_count_total = row_count_total[0] self._database.connection.execute('select rowid, filename, mime_type from attachment') i = self._database.connection.fetchone() with alive_bar(row_count_total, title="Getting Attachments", stats="({rate}, eta: {eta})") as bar: while i: rowid = i[0] filename = i[1] mime_type = i[2] if filename is not None: self.attachment_list[rowid] = Attachment(self._database, rowid, filename, mime_type, copy=self._copy, copy_directory=self._copy_directory) bar() i = self._database.connection.fetchone() # Get the join of attachments and messages self._database.connection.execute('select message_id, attachment_id from message_attachment_join') i = self._database.connection.fetchone() while i: message_id = i[0] attachment_id = i[1] if message_id in self.message_join: self.message_join[message_id].append(attachment_id) else: self.message_join[message_id] = [attachment_id] i = self._database.connection.fetchone() return @property def attachment_list(self) -> dict: """ Return the dictionary of all attachments """ return self._attachment_list @property def message_join(self) -> dict: """ Return the mapping of messages to attachments """ return self._message_join def __iter__(self): return self.attachment_list.__iter__() def __len__(self) -> int: return len(self.attachment_list.keys()) ================================================ File: /src/imessagedb/utils.py ================================================ """ Utility functions for the class """ from datetime import datetime mac_epoch_start = int(datetime(2001, 1, 1, 0, 0, 0).strftime('%s')) def convert_to_database_date(date_string: str) -> float: date_ = datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S') epoch_date = int(date_.strftime('%s')) diff = epoch_date - mac_epoch_start return diff * 1000000000 def convert_from_database_date(date_value: float) -> datetime: # date_ = date_value / 1000000000 epoch_date = date_value + mac_epoch_start return datetime.fromtimestamp(epoch_date) def safe_filename(filename: str) -> str: safe_name = filename.replace(' ', '_') return safe_name ================================================ File: /src/imessagedb/chats.py ================================================ from imessagedb.chat import Chat class Chats: """ All Chats in the database """ def __init__(self, database) -> None: """ Parameters ---------- database : imessagedb.DB An instance of a connected database""" self._database = database self._chat_list = {} # Chat list by rowid self._chat_identifiers = {} # Chat list by identifier self._chat_names = {} # Chat list by name self._get_chats_from_database() return def __repr__(self) -> str: string_array = [] for i in sorted(self.chat_list): string_array.append(str(self.chat_list[i])) return '\n'.join(string_array) def __len__(self) -> int: return len(self._chat_list) def _get_chats_from_database(self) -> None: self._database.connection.execute('select rowid, chat_identifier, display_name from chat') rows = self._database.connection.fetchall() for row in rows: rowid = row[0] chat_identifier = row[1] display_name = row[2] new_chat = Chat(self._database, rowid, chat_identifier, display_name) self.chat_list[new_chat.rowid] = new_chat # Add the chat to the chat_identifiers if new_chat.chat_identifier in self.chat_identifiers: self._chat_identifiers[new_chat.chat_identifier].append(new_chat) else: self._chat_identifiers[new_chat.chat_identifier] = [new_chat] # Add the chat to the chat_names if new_chat.chat_name != "": if new_chat.chat_name in self._chat_names: self._chat_names[new_chat.chat_name].append(new_chat) else: self._chat_names[new_chat.chat_name] = [new_chat] # Add the last chat date to all the chats for i in self.chat_list.values(): select_string = "select " \ "datetime(max(message_date)/1000000000 + strftime('%s', '2001-01-01'),'unixepoch','localtime') " \ f"from chat_message_join cmj where chat_id = {i.rowid}" self._database.connection.execute(select_string) rows = self._database.connection.fetchall() i.last_message_date = rows[0][0] # Add the participants for all the chats self._database.connection.execute('select chat_id, handle_id from chat_handle_join') rows = self._database.connection.fetchall() for row in rows: chat_id = row[0] handle_id = row[1] self.chat_list[row[0]].add_participant(row[1]) return def get_chats(self) -> str: """ Return a string with the list of chats in the database""" return_array = [] for chat_id in sorted(self._chat_list): chat = self._chat_list[chat_id] chat_name = "" if chat.chat_name and chat.chat_name != '': chat_name = f"{chat.rowid} ({chat.chat_name}):" else: chat_name = f"{chat.rowid}:" chat_string = f"{chat_name} Participants: {chat.participants}, Last Message Sent: {chat.last_message_date}" return_array.append(chat_string) return '\n'.join(return_array) @property def chat_list(self) -> dict: """ Return the list of chats by rowid in a dict""" return self._chat_list @property def chat_names(self) -> dict: """ Return the list of chats by rowid in a dict""" return self._chat_names @property def chat_identifiers(self) -> dict: """ Return the list of chats by chat identifier in a dict""" return self._chat_identifiers def __getitem__(self, item) -> Chat: if item in self._chat_names: return self._chat_names[item] if item in self._chat_list: return self._chat_list[item] if item in self._chat_identifiers: return self._chat_identifiers[item] raise KeyError