imessage-query-fastmcp-mcp-server
by hannesrudolph
Verified
================================================
File: /docs/changelog.md
================================================
```{include} ../CHANGELOG.md
```
================================================
File: /docs/contributing.md
================================================
```{include} ../CONTRIBUTING.md
```
================================================
File: /docs/make.bat
================================================
@ECHO OFF
REM Run from the directory that contains this script; the matching popd is at :end.
pushd %~dp0
REM Command file for Sphinx documentation
REM Fall back to "sphinx-build" when SPHINXBUILD is not set in the environment.
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
REM Probe the command with all output discarded, then check errorlevel 9009
REM and print installation guidance if the probe reported it.
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.https://www.sphinx-doc.org/
exit /b 1
)
REM With no argument show the Sphinx help; otherwise pass the first argument to -M.
if "%1" == "" goto help
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
:end
popd
================================================
File: /docs/Makefile
================================================
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
================================================
File: /docs/requirements.txt
================================================
myst-nb
sphinx-autoapi
sphinx-rtd-theme
================================================
File: /docs/index.md
================================================
```{include} ../README.md
```
```{toctree}
:maxdepth: 1
:hidden:
JulioHtml.png
JulioText.png
changelog.md
contributing.md
autoapi/index
```
================================================
File: /docs/conf.py
================================================
# Configuration file for the Sphinx documentation builder.
#
# For the full list of built-in configuration values, see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
# -- Project information -----------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information
project = 'iMessageDB'
copyright = '2023, Xev Gittler'
author = 'Xev Gittler'
# NOTE(review): pyproject.toml in this repository declares version 1.4.8 —
# confirm whether this release string is meant to track it.
release = '1.0.3'
# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
extensions = ["myst_parser",
              'autoapi.extension',
              "sphinx.ext.napoleon",
              "sphinx.ext.viewcode"]
# Directory list handed to the autoapi extension, relative to this docs directory.
autoapi_dirs = ['../src']
templates_path = ['_templates']
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# -- Options for HTML output -------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
# NOTE(review): docs/requirements.txt lists sphinx-rtd-theme, but the theme
# selected here is 'sphinxdoc' — confirm which one is intended.
html_theme = 'sphinxdoc'
html_static_path = ['_static']
================================================
File: /main.py
================================================
# This is a sample Python script.
import argparse
import logging
import configparser
import os
import sys
import imessagedb
def create_default_config(file_name):
    """Write a configuration holding the default DISPLAY colours and return it.

    Parameters
    ----------
    file_name : str
        Path of the configuration file to create (overwritten if present).

    Returns
    -------
    configparser.ConfigParser
        The parser that was written to ``file_name``.
    """
    display_defaults = {
        'me_background_color': 'AliceBlue',
        'them_background_color': 'Lavender',
        'me_name': 'Blue',
        'them_name': 'Purple',
        'thread_background': 'HoneyDew',
        'me_thread': 'AliceBlue',
        'them_thread': 'Lavender',
    }
    parser = configparser.ConfigParser()
    parser.read_dict({'DISPLAY': display_defaults})
    with open(file_name, 'w') as handle:
        parser.write(handle)
    return parser
def Usage():
    """Print a one-line usage summary for this script, then exit."""
    usage_line = f'{sys.argv[0]} --output_directory <dir> --name <name> --verbose --debug'
    print(usage_line)
    sys.exit()
def print_handles(db):
    """Print the ``handles`` attribute of ``db`` to standard output."""
    handles = db.handles
    print(handles)
def get_contacts(configuration):
    """Build a mapping of contact name to that contact's list of handles.

    Every option in the ``CONTACTS`` section is treated as a comma separated
    list. Newlines (from continuation lines) and spaces are removed before
    splitting.

    Parameters
    ----------
    configuration : configparser.ConfigParser
        A parser that contains a ``CONTACTS`` section.

    Returns
    -------
    dict
        Option name (as returned by the parser) mapped to a list of handles.
    """
    result = {}
    for name, raw_value in configuration.items('CONTACTS'):
        compact = raw_value.replace('\n', '').replace(' ', '')
        # A trailing or doubled comma would otherwise produce '' entries,
        # which are not usable handles, so drop them.
        result[name] = [handle for handle in compact.split(',') if handle]
    return result
if __name__ == '__main__':
    # Hand control to the packaged command line entry point and stop.
    imessagedb.run()
    exit(0)
    # NOTE(review): everything below follows exit(0) and therefore never runs
    # when this file is executed as a script. The nesting shown here is
    # assumed — confirm against the original file before relying on it.
    config_file = f'{os.environ["HOME"]}/.config/iMessageDB.ini'
    create = False
    verbose = True
    debug = False
    no_table = False
    no_copy = False
    no_attachment = False
    inline = False
    output_directory = '/tmp'
    logging.basicConfig(level=logging.INFO, format='%(asctime)s <%(name)s> %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
    logger = logging.getLogger('main')
    logger.info("Processing parameters")
    argument_parser = argparse.ArgumentParser()
    argument_parser.add_argument("--name", help="Person to get conversations about", required=True)
    argument_parser.add_argument("-c", "--configfile", help="Location of the configuration file",
                                 default=f'{os.environ["HOME"]}/.config/iMessageDB.ini')
    argument_parser.add_argument("-o", "--output_directory",
                                 help="The output directory where the output and attachments go")
    argument_parser.add_argument("--database", help="The database file to open",
                                 default=f"{os.environ['HOME']}/Library/Messages/chat.db")
    argument_parser.add_argument("-m", "--me", help="The name to use to refer to you", default="Me")
    argument_parser.add_argument("-t", "--output_type", help="The type of output", choices=["text", "html"])
    argument_parser.add_argument("-i", "--inline", help="Show the attachments inline", action="store_true")
    # --force and --no_copy cannot be given together.
    mutex_group = argument_parser.add_mutually_exclusive_group()
    mutex_group.add_argument("-f", "--force", help="Force a copy of the attachments", action="store_true")
    mutex_group.add_argument("--no_copy", help="Don't copy the attachments", action="store_true")
    argument_parser.add_argument("--no_attachments", help="Don't process attachments at all", action="store_true")
    argument_parser.add_argument("-v", "--verbose", help="Turn on additional output", action="store_true")
    args = argument_parser.parse_args()
    # First read in the configuration file, creating it if need be, then overwrite the values from the command line
    if not os.path.exists(args.configfile):
        imessagedb._create_default_configuration(args.configfile)
    config = configparser.ConfigParser()
    config.read(args.configfile)
    CONTROL = 'CONTROL'
    DISPLAY = 'DISPLAY'
    config.set(CONTROL, 'Person', args.name)
    config.set(CONTROL, 'verbose', str(args.verbose))
    # Options that were not supplied leave the configuration file's value in place.
    if args.output_directory:
        config.set(CONTROL, 'copy directory', args.output_directory)
    if args.no_copy:
        config.set(CONTROL, 'copy', 'False')
    if args.output_type:
        config.set(CONTROL, 'output type', args.output_type)
    if args.force:
        config.set(CONTROL, 'force copy', 'True')
    if args.no_attachments:
        config.set(CONTROL, 'skip attachments', 'True')
    if args.inline:
        config.set(DISPLAY, 'inline attachments', 'True')
    out = sys.stdout
    contacts = get_contacts(config)
    Person = config[CONTROL]['Person']
    # The lookup uses the lower-cased name as the key into the contacts mapping.
    if Person.lower() not in contacts.keys():
        logger.error(f"{Person} not known. Please edit contacts list.")
        Usage()
    config[CONTROL]['attachment directory'] = f"{config[CONTROL]['copy directory']}/{Person}_attachments"
    filename = f'{Person}.html'
    # NOTE(review): this handle replaces sys.stdout in `out` and is not closed
    # anywhere in this script.
    out = open(f"{config[CONTROL]['copy directory']}/{filename}", 'w')
    try:
        os.mkdir(config['CONTROL']['attachment directory'])
    except FileExistsError:
        # An attachment directory left over from an earlier run is reused.
        pass
    database = imessagedb.DB(args.database, config=config)
    message_list = database.Messages(Person, contacts[Person.lower()])
    # database.TextOutput(args.me, Person, message_list, output_file=out).print()
    html_string = database.HTMLOutput(args.me, Person, message_list, output_file=out)
    print("The end")
    database.disconnect()
# See PyCharm help at https://www.jetbrains.com/help/pycharm/
================================================
File: /tests/test_handle.py
================================================
import imessagedb
import os
def test_handles():
    """The chat.db stored beside this test should contain two handles."""
    db_path = os.path.join(os.path.dirname(__file__), "chat.db")
    database = imessagedb.DB(db_path)
    handle_count = len(database.handles)
    assert handle_count == 2, "Unexpected number of handles"
def test_handle():
    """The handle stored under key 2 should carry the expected address."""
    db_path = os.path.join(os.path.dirname(__file__), "chat.db")
    database = imessagedb.DB(db_path)
    handle_table = database.handles
    assert handle_table.handles[2].number == "scripting@schore.org", "Unexpected handle"
================================================
File: /tests/test_attachments.py
================================================
import imessagedb
import os
def test_attachments():
    """The chat.db stored beside this test should contain two attachments."""
    db_path = os.path.join(os.path.dirname(__file__), "chat.db")
    database = imessagedb.DB(db_path)
    attachment_count = len(database.attachment_list.attachment_list)
    assert attachment_count == 2, "Unexpected number of attachments"
def test_attachment():
    """Attachment 98368 should report the expected original path under $HOME."""
    expected_path = f"{os.environ['HOME']}/Library/Messages/Attachments/4f/15/D7CEBAED-9844-4B25-B841-F7748EA3BCAD/IMG_4911.heic"
    db_path = os.path.join(os.path.dirname(__file__), "chat.db")
    database = imessagedb.DB(db_path)
    attachment = database.attachment_list.attachment_list[98368]
    assert attachment.original_path == expected_path, "Unexpected value in attachment"
================================================
File: /tests/test_db.py
================================================
import configparser
import imessagedb
import os
def test_connection():
    """Opening the chat.db stored beside this test should give a truthy DB object."""
    db_path = os.path.join(os.path.dirname(__file__), "chat.db")
    database = imessagedb.DB(db_path)
    assert database
def test_configuration():
    """The DB object should expose a ConfigParser that has a CONTROL section."""
    db_path = os.path.join(os.path.dirname(__file__), "chat.db")
    database = imessagedb.DB(db_path)
    config = database.config
    is_parser = isinstance(config, configparser.ConfigParser)
    assert is_parser, \
        "Expected return of database.config to be of class 'configparser.ConfigParser'"
    assert config['CONTROL'], "Expected the CONTROL section to be in the configuration"
================================================
File: /tests/test_chat.py
================================================
import imessagedb
import os
def test_chats():
    """The chat.db stored beside this test should contain two chats."""
    db_path = os.path.join(os.path.dirname(__file__), "chat.db")
    database = imessagedb.DB(db_path)
    chat_count = len(database.chats)
    assert chat_count == 2, "Unexpected number of chats"
================================================
File: /tests/test_imessagedb.py
================================================
from imessagedb import db
================================================
File: /tests/imessagedb_test.py
================================================
#!/Users/xev/opt/anaconda3/envs/imessagedb/bin/python
# -*- coding: utf-8 -*-
import re
import sys
from imessagedb import run
if __name__ == '__main__':
    # Strip a trailing "-script.pyw" or ".exe" from the program name before
    # handing control to run().
    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
    # Whatever run() returns is passed to sys.exit().
    sys.exit(run())
================================================
File: /.github/workflows/ci-cd.yml
================================================
name: ci-cd
on: [push, pull_request]
jobs:
ci:
# Set up operating system
runs-on: ubuntu-latest
# Define job steps
steps:
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Check-out repository
uses: actions/checkout@v2
- name: Install poetry
uses: snok/install-poetry@v1
- name: Install package
run: poetry install
- name: Test with pytest
run: poetry run pytest tests/ --cov=imessagedb --cov-report=xml
- name: Use Codecov to track coverage
uses: codecov/codecov-action@v2
with:
files: ./coverage.xml # coverage report
- name: Build documentation
run: poetry run make html --directory docs/
cd:
# Only run this job if the "ci" job passes
needs: ci
# Only run this job if new work is pushed to "main"
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
# Set up operating system
runs-on: ubuntu-latest
# Define job steps
steps:
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Check-out repository
uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Install poetry
uses: snok/install-poetry@v1
- name: Install package
run: poetry install
- name: Use Python Semantic Release to prepare release
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
git config user.name github-actions
git config user.email github-actions@github.com
poetry run semantic-release publish
- name: Publish to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
user: __token__
password: ${{ secrets.PYPI_API_TOKEN }}
================================================
File: /.readthedocs.yml
================================================
# Required
version: 2
# Image to use
build:
image: testing
# Configuration
python:
version: 3.9
install:
- requirements: docs/requirements.txt
- method: pip
path: .
================================================
File: /pyproject.toml
================================================
[tool.poetry]
name = "imessagedb"
version = "1.4.8"
description = "Reads and displays the Apple iMessage database"
authors = ["xev <git@schore.org>"]
license = "MIT"
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.9"
alive-progress = "^3.1.2"
ffmpeg-python = "^0.2.0"
heic2png = "^1.0.0"
configparser = "^5.3.0"
termcolor = "^2.3.0"
python-dateutil = "^2.8.2"
[tool.poetry.group.dev.dependencies]
pytest = "^7.3.1"
pytest-cov = "^4.0.0"
myst-nb = {version = "^0.17.2", python = "^3.9"}
sphinx-autoapi = "^2.1.0"
sphinx-rtd-theme = "^1.2.0"
python-semantic-release = "^7.33.4"
[tool.poetry.scripts]
imessagedb = 'imessagedb:run'
[tool.semantic_release]
version_variable = "pyproject.toml:version"
branch = "main" # branch to make releases of
changelog_file = "CHANGELOG.md" # changelog file
build_command = "poetry build" # build dists
dist_path = "dist/" # where to put dists
upload_to_release = true # auto-create GitHub release
upload_to_pypi = false # don't auto-upload to PyPI
remove_dist = false # don't remove dists
patch_without_tag = true # patch release by default
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
================================================
File: /poetry.lock
================================================
[Content ignored: file too large]
================================================
File: /CONDUCT.md
================================================
# Code of Conduct
## Our Pledge
In the interest of fostering an open and welcoming environment, we as contributors and maintainers pledge to making participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, gender identity and expression, level of experience, nationality, personal appearance, race, religion, or sexual identity and orientation.
## Our Standards
Examples of behavior that contributes to creating a positive environment include:
* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery and unwelcome sexual attention or advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a professional setting
## Our Responsibilities
Project maintainers are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful.
## Scope
This Code of Conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. Examples of representing a project or community include using an official project e-mail address, posting via an official social media account, or acting as an appointed representative at an online or offline event. Representation of a project may be further defined and clarified by project maintainers.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team. The project team will review and investigate all complaints, and will respond in a way that it deems appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately.
Project maintainers who do not follow or enforce the Code of Conduct in good faith may face temporary or permanent repercussions as determined by other members of the project's leadership.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant homepage](http://contributor-covenant.org/version/1/4), version 1.4.
================================================
File: /README.md
================================================
# imessagedb
Reads and displays the Apple iMessage database
## Installation
```bash
$ pip install imessagedb
```
## Usage
`imessagedb` can be used as a library for accessing and parsing the iMessage chat database
stored at ~/Library/Messages/chat.db, or it can be used on the command line to display the contents.
### How it works
The command line version wraps the functions provided and outputs data based on the
configuration available in the configuration file, as well as command line arguments.
The way that iMessage identifies conversation participants is by a `handle`, which can be their
phone number or email address. To make it easier, you can map handles in the configuration file.
You can also specify the type of output (text or html) and the time frame of the messages. At
this point `imessagedb` will pull all messages from that person, regardless of the chat that it
is part of.
If you specify html output, `imessagedb` will copy your attachments so that they are accessible
to the web browser. In addition, it will convert certain types of attachments so that they
can be viewed in the browser. For instance, it will convert HEIC files to PNG so that the
browser can display them, and will convert various types of movie files to mp4 and various audio
files to mp3. If you have many attachments, this can take a long time and take a lot of space.
### Example Output
This is what the default html version looks like. Note that I am hovering over one of the
links to show the image.

This is the same conversation rendered in text.

### Command Line
```python
imessagedb [-h] [--handle [HANDLE ...] | --name NAME] [-c CONFIGFILE]
[-o OUTPUT_DIRECTORY] [--database DATABASE] [-m ME]
[-t {text,html}] [-i] [-f | --no_copy] [--no_attachments] [-v]
[--start_time START_TIME] [--end_time END_TIME]
optional arguments:
-h, --help show this help message and exit
--handle [HANDLE ...]
A list of handles to search against
--name NAME Person to get conversations about
-c CONFIGFILE, --configfile CONFIGFILE
Location of the configuration file
-o OUTPUT_DIRECTORY, --output_directory OUTPUT_DIRECTORY
The output directory where the output and attachments
go
--database DATABASE The database file to open
-m ME, --me ME The name to use to refer to you
-t {text,html}, --output_type {text,html}
The type of output
-i, --inline Show the attachments inline
-f, --force Force a copy of the attachments
--no_copy Don't copy the attachments
--no_attachments Don't process attachments at all
-v, --verbose Turn on additional output
--start_time START_TIME
The start time of the messages
--end_time END_TIME The end time of the messages
--split_output SPLIT_OUTPUT
Split the html output into files with this many messages per file
--version Show the version number and exit
--get_handles Display the list of handles in the database and exit
--get_chats Display the list of chats in the database and exit
```
#### Command line options
**-h** prints a help message
**--handle [HANDLE ...]** A list of handles to search against.
For instance, '*--handle +12016781234 john@smith.com*'.
**--name NAME** A person to search for. The mapping of person to handle is in the configuration
file (see below)
*** Note that you can only have one of --handle or --name, not both ***
**-c CONFIGFILE, --configfile CONFIGFILE** The location of the configuration file. If the
file does not exist, a default configuration file will be created there. If this option is
not provided, the default location is `~/.config/iMessageDB.ini`.
**-o OUTPUT_DIRECTORY, --output_directory OUTPUT_DIRECTORY** The output directory where the
output and attachments go. If this option is not provided, the default location is your
home directory. The files will be `~/NAME.html` and attachments will be in
`~/NAME_Attachments`.
**--database DATABASE** The database file to open. If this option is not provided it will
default to `~/Library/Messages/chat.db`, which is where Apple puts it.
**-m ME, --me ME** The name to use to refer to you in the output. If this option is not
provided it will default to `Me`.
**-t {text,html}, --output_type {text,html}** The type of output, either *text* or *html*.
If this option is not provided it will default to `html`.
**--start_time START_TIME** <br>
**--end_time END_TIME** By default, the program will process all messages. If you want
to restrict that to a particular timeframe, you can specify the `--start_time` and/or `--end_time`
options.
**--split_output SPLIT_OUTPUT** Split the html output into files with this many messages
per file
**-i, --inline** Show the attachments inline. By default, the attachments will show as links
that you can hover over and it will pop up the attachment. With this option, it will put them
inline, which will make your output much busier and take a lot more memory in your browser.
**-f, --force** Force a copy of the attachments. By default, if the attachment already exists
in the destination directory, it will not re-copy the file, but this will force it to re-copy it.
**--no_copy** Don't copy the attachments. This will make them inaccessible for
viewing
**--no_attachments** Do not show the attachments at all
**-v, --verbose** Turn on additional output
**--version** Shows the version number and exits
**--get_handles** Display the list of handles in the database and exit
**--get_chats** Display the list of chats in the database and exit
### Configuration File
The configuration file is in configparser format. Here is the template that is created
by default:
```python
[CONTROL]
# Whether or not to copy the attachments into a different directory. This is needed for two reasons:
# 1) The web browser does not have access to the directory that the attachments are stored, so it cannot display them
# 2) Some of the attachments need to be converted in order to be viewed in the browser
copy = True
# The directory to put the output. The html file will go in {copy_directory}/{Person}.html,
# and the attachments will go in {copy_directory}/{Person}_Attachments. If you specify HOME, then
# it will put it in your home directory
copy directory = HOME
# If the file already exists in the destination directory it is not recopied, but that can be overridden by
# specifying 'force copy' as true
force copy = False
# 'Skip attachments' ignores attachments
skip attachments = False
# Additional details show some other information on each message, including the chat id and any edits that have
# been done
additional details = False
# Some extra verbosity if true
verbose = True
[DISPLAY]
# Output type, either html or text
output_type = html
# Number of messages in each html file. If this is 0 or not specified, it will be one large file.
output split = 1000
# Inline attachments mean that the images are in the HTML instead of loaded when hovered over
inline attachments = False
# Popup location is where the attachment popup window shows up, and is either 'upper right', 'upper left' or 'floating'
popup location = upper right
# 'me' is the name to put for your text messages
me = Me
# The color for the name in text output. No color is used if 'use text color' is false.
# The color can only be one of the following options:
# black, red, green, yellow, blue, magenta, cyan, white, light_grey, dark_grey,
# light_red, light_green, light_yellow, light_blue, light_magenta, light_cyan
use text color = True
me text color = blue
them text color = magenta
reply text color = light_grey
# The background color of the text messages for you and for the other person in html output
# The options for colors can be found here: https://www.w3schools.com/cssref/css_colors.php
me html background color = AliceBlue
them html background color = Lavender
# The background color of the thread in replies
thread background = HoneyDew
me thread background = AliceBlue
them thread background = Lavender
# The color of the name in the html output
me html name color = Blue
them html name color = Purple
[CONTACTS]
# A person that you text with can have multiple numbers, and you may not always want to specify the full specific
# number as stored in the handle database, so you can do the mapping here, providing the name of a person,
# and a comma separated list of numbers
Samantha: +18434676040, samanthasmilt@gmail.com,
s12ddd2@colt.edu
Abe: +16103499696
Marissa: +14029490739
```
### Library Usage
```python
import imessagedb
database = imessagedb.DB()
```
- TODO - More usage here
## Contributing
Interested in contributing? Check out the contributing guidelines. Please note that this project is released with a Code of Conduct. By contributing to this project, you agree to abide by its terms.
## License
`imessagedb` was created by Xev Gittler. It is licensed under the terms of the MIT license.
## Documentation
Full documentation available at https://imessagedb.readthedocs.io/en/latest/
================================================
File: /cfdhat.db
================================================
================================================
File: /src/imessagedb/__main__.py
================================================
import imessagedb
if __name__ == '__main__':
    # Executing the package as a script delegates to imessagedb.run().
    imessagedb.run()
================================================
File: /src/imessagedb/default.ini
================================================
[CONTROL]
# Whether or not to copy the attachments into a different directory. This is needed for two reasons:
# 1) The web browser does not have access to the directory that the attachments are stored, so it cannot display them
# 2) Some of the attachments need to be converted in order to be viewed in the browser
copy = True
# The directory to put the output. The html file will go in {copy_directory}/{Person}.html,
# and the attachments will go in {copy_directory}/{Person}_Attachments
copy directory = /tmp
# If the file already exists in the destination directory it is not recopied, but that can be overridden by
# specifying 'force copy' as true
force copy = False
# If you specify 'skip attachments' then we won't process attachments at all
skip attachments = False
# Some extra verbosity if true
verbose = True
[DISPLAY]
# 'me' is the name to put for your text messages
me = Me
# The color for the name in text output. No color is used if 'use text color' is false.
# The color can only be one of the following options:
# black, red, green, yellow, blue, magenta, cyan, white, light_grey, dark_grey,
# light_red, light_green, light_yellow, light_blue, light_magenta, light_cyan
use text color = True
me text color = blue
them text color = magenta
reply text color = light_grey
# The background color of the text messages for you and for the other person in html output
# The options for colors can be found here: https://www.w3schools.com/cssref/css_colors.php
me html background color = AliceBlue
them html background color = Lavender
# The background color of the thread in replies
thread background = HoneyDew
me thread background = AliceBlue
them thread background = Lavender
# The color of the name in the html output
me html name color = Blue
them html name color = Purple
# If you want the attachments inline, rather than as pop-ups, specify 'inline attachments'
inline attachments = False
# Popup location is where the attachment popup window shows up, and is either 'upper right', 'upper left' or 'floating'
popup location = Upper Left
[CONTACTS]
# A person that you text with can have multiple numbers, and you may not always want to specify the full specific
# number as stored in the handle database, so you can do the mapping here, providing the name of a person,
# and a comma separated list of numbers
Ashley: +18484676050, ashleywilliams_@live.com, sparkette325@yahoo.com, +17325722108,
williaa2@kean.edu
Abe: +16103499696
Alisha: +15082088327
Emily: +12672803303
Hermosa: +17329103453
Kanani: +17325351530
Randi: +16093394399
Rose: fhecla@gmail.com, +16109602522
Marta: +19172974204
Maxine: mlee99@optonline.net, panda9966@gmail.com, +19084725929, +17323816580
Monika: +19176678993, mnowak2@binghamton.edu, Xplsumnerx@gmail.com
Michele: +12153917169
Marissa: +14016490739
Lily: lilyyalexb@gmail.com, njlibrn@yahoo.com, +17327887245
Daniel: daniel@gittler.com, +17326460220, dsg1002@gmail.com, gittlerd@gmail.com, dgittler@berklee.edu
Kandela: +17324849684
Tiffany: Wickedblueglow@gmail.com, Tlgreenberg80@gmail.com, +17327105145
Jessica: jessica.marozine@icloud.com, jessica.marozine@gmail.com, +17326923688
Becky: rebecca@schore.org, +17329838356
Lynda: lylasmith559@yahoo.com, lylalombardi@icloud.com, +17323001586
Stefanie: stefanie.thomas2@gmail.com, thostefa@kean.edu, +17326101065
================================================
File: /src/imessagedb/generate_html.py
================================================
from datetime import datetime
import re
import string
import imessagedb
from imessagedb.message import Message
from imessagedb.messages import Messages
from alive_progress import alive_bar
# The hyphen in the character class is escaped so that it is a literal '-'.
# Unescaped, '+-=' is a range that also matches ',' and '<', which let a URL
# swallow the start of a following HTML tag. The comma is kept explicitly
# because it can legitimately appear inside a URL.
url_pattern = re.compile(r'((https?):((//)|(\\\\))+[\w\d:#@%/;$()~_?+\-=\\.&,]*)', re.MULTILINE | re.UNICODE)
mailto_pattern = re.compile(r'([\w\-.]+@(\w[\w\-]+\.)+[\w\-]+)', re.MULTILINE | re.UNICODE)


def _replace_url_to_link(value: str) -> str:
    """Wrap URLs and e-mail addresses found in ``value`` in HTML anchors.

    Adapted from https://gist.github.com/guillaumepiot/4539986

    Parameters
    ----------
    value : str
        The text to scan.

    Returns
    -------
    str
        ``value`` with each http/https URL turned into a link that opens in a
        new tab, and each e-mail address turned into a ``mailto:`` link.
    """
    # URLs are substituted first; the e-mail pass then runs over that result.
    value = url_pattern.sub(r'<a href="\1" target="_blank">\1</a>', value)
    value = mailto_pattern.sub(r'<a href="mailto:\1">\1</a>', value)
    return value
class HTMLOutput:
""" Creates an HTML file (or string) from a Messages list
...
There are a number of options in the configuration file that affect how the HTML is created.
In the CONTROL section, the following impact the output:
copy = True :
If the attachments are not copied, they are not available on the HTML output.
There are two reasons for that. 1) The web browser does not have access to the directory
that the attachments are stored, so it cannot display them 2) Some attachments need
to be converted in order to be viewed in the browser, and need a place to live.
skip attachments = False :
If true, attachments will not be available in the HTML output
In the DISPLAY section, the following impact the output:
inline attachments = False :
If inline attachments are true, then the images will be part of the HTML. If
it is false, then there will be a popup when you hover over the attachment text that will show
the attachment on demand. This will make the load much faster and the output far less cluttered.
me = Me :
The name to put for your part of the conversation. It defaults to 'Me'.
html background color list = AliceBlue, Cyan, Gold, Lavender, LightGreen, PeachPuff, Wheat
html name color list = Blue, DarkCyan, DarkGoldenRod, Purple, DarkGreen, Orange, Sienna :
The background and name color of the messages in html output
The options for colors can be found here: https://www.w3schools.com/cssref/css_colors.php
The way that the color selection works is that it will use the first color on the color list for the
first person in the conversation, the second for the second, third for the third, etc. If there are more
participants than colors, it will wrap around to the first color.
thread background = HoneyDew :
The background color of the thread in replies
"""
def __init__(self, database, me: str, messages: Messages, inline: bool = False, output_file: str = None) -> None:
    """
    Generate the HTML for ``messages``. When ``output_file`` is given the HTML is streamed to
    ``<output_file>.html`` (and to numbered follow-up files when the output is split) while it
    is generated; otherwise it is only kept in memory for ``print()``, ``save()`` and ``repr()``.

    Parameters
    ----------
    database : imessagedb.DB
        An instance of a connected database
    me : str
        Your display name
    messages: imessagedb.DB.Messages
        The messages
    inline : bool
        Display attachments inline or not
    output_file : str
        The name of the output file, without the ``.html`` extension"""
    self._database = database
    self._me = me
    self._messages = messages
    self._attachment_list = self._database.attachment_list
    self._inline = inline
    self._name_map = {}
    self._color_list = self._get_next_color()
    self._day = 'UNK'
    self._html_array = []
    self._current_messages_processed = 0
    self._current_messages_file = 0
    self._file_start_date: datetime = None
    self._file_end_date: datetime = None
    self._previous_range: str = None
    self._last_row_had_conversion = False

    # The file bookkeeping is initialised in both modes: _print_and_save reads
    # _previous_output_filename while building the page header, before it checks whether
    # a file is being written, so leaving it unset broke the in-memory mode.
    self._output_filename = output_file
    self._split_output = 0
    self._previous_output_filename = None
    self._current_output_filename = None
    self._output_file_handle = None

    start_time = self._database.control.get('start time', fallback=None)
    end_time = self._database.control.get('end time', fallback=None)
    date_string = ""
    if start_time:
        date_string = f" from {start_time}"
    if end_time:
        date_string = f"{date_string} until {end_time}"
    self._file_header_string = f' <div id="file_summary">Exchanged {len(self._messages):,} total messages with ' \
                               f'{self._messages.title}{date_string}.</div><p>\n'

    if output_file is not None:
        self._split_output = self._database.config.getint('DISPLAY', 'split output', fallback=0)
        self._current_output_filename = f"{self._output_filename}.html"
        # The generated page declares charset=utf-8, so write it as UTF-8 regardless of locale
        self._output_file_handle = open(self._current_output_filename, "w", encoding="utf-8")

    try:
        self._html_array.append(self._generate_table(self._messages))
        self._print_and_save('</body>\n</html>\n', self._html_array, eof=True)
    finally:
        # Whatever file is current at this point (the first one or the last split) is closed here,
        # also when generation fails part way through.
        if self._output_file_handle is not None:
            self._output_file_handle.close()
def __repr__(self) -> str:
    """ The generated HTML fragments concatenated without any separator """
    fragments = self._html_array
    return "".join(fragments)
def save(self, filename: str) -> None:
    """ Write the generated HTML to ``filename``, one fragment per line.

    The file is written as UTF-8 to match the charset declared in the page head, and it is
    closed even when writing fails.
    """
    with open(filename, "w", encoding="utf-8") as file_handle:
        print('\n'.join(self._html_array), file=file_handle)
def print(self) -> None:
    """ Send the generated HTML to stdout, one fragment per line """
    rendered = '\n'.join(self._html_array)
    print(rendered)
def _print_and_save(self, message: str, array: list, new_day: bool = False, eof: bool = False) -> None:
    """ Append ``message`` to ``array`` and, when an output file is in use, stream it to that file.

    Parameters
    ----------
    message : str
        The HTML fragment to add
    array : list
        The list collecting the fragments; the page header is added to it first whenever a new
        page starts (nothing has been written to the current page yet)
    new_day : bool
        True when ``message`` closes the table of the previous day. This is the only point at which
        the output may be split into a new file.
    eof : bool
        True for the very last fragment, so the page summary of the last file gets written
    """
    # Only assigned by __init__ when an output file was requested; default it so that the
    # in-memory mode can build the page header as well.
    previous_filename = getattr(self, '_previous_output_filename', None)
    writing_to_file = self._output_filename is not None

    if self._current_messages_processed == 0:
        # If this is a new file, because it is either the first pass through, or if we need to create new file
        new_file_array = [self._generate_head(), "<body>\n", self._file_header_string,
                          f'{" ":2s}<div class="picboxframe" id="picbox"> <img src="" /> </div>\n',
                          f'{" ":2s}<table style="table-layout: fixed;">\n{" ":4s}<tr>\n']
        if previous_filename:
            new_file_array.append(f'{" ":6s}<td style="text-align: left; width: 33%;">'
                                  f'<a href="file://{previous_filename}"> < </a> </td>\n'
                                  f'{" ":6s}<td style="text-align: center; width: 33%;"><div class="next_file">'
                                  f'<a href="file://{previous_filename}">'
                                  f' Previous Messages {self._previous_range}</a></div></td>\n')
        else:
            new_file_array.append(f'{" ":6s}<td style="width: 33%;"> </td>\n'
                                  f'{" ":6s}<td style="width: 33%;"> </td>\n')
        new_file_array.append(f'{" ":6s}<td style="text-align: right; width: 33%;" id="next_page"> </td>\n'
                              f'{" ":4s}</tr>\n{" ":2s}</table>\n\n')
        new_file_array.append(f'{" ":2s}<table class="main_table">\n{" ":2s}</table>\n')
        array.extend(new_file_array)
        if writing_to_file:
            for fragment in new_file_array:
                print(fragment, end="", file=self._output_file_handle)

    array.append(message)
    self._current_messages_processed += 1

    if not writing_to_file:  # We are not writing to a file
        return

    if eof and self._file_start_date is not None and self._file_end_date is not None:
        # Normally this gets done on the file split, but the eof indicates its last file, so do it
        # on the last write. Without any dated message there is no range to report, so the summary
        # script is left out instead of failing on the missing dates.
        description_string = f' This page contains {self._current_messages_processed:,} messages from ' \
                             f'{self._file_start_date.strftime("%A %Y-%m-%d")} to ' \
                             f'{self._file_end_date.strftime("%A %Y-%m-%d")}.'
        print(f' <script>\n'
              f' el = document.getElementById("file_summary")\n'
              f' new_text = el.innerHTML.concat("{description_string}")\n'
              f' el.innerHTML = new_text\n'
              f' </script>\n', file=self._output_file_handle)

    print(message, end="", file=self._output_file_handle)

    if new_day and 0 < self._split_output < self._current_messages_processed:
        # Enough has been written and a day just ended: finish this file and start the next one
        total_processed = self._current_messages_processed
        self._current_messages_processed = 0
        self._current_messages_file += 1
        self._previous_output_filename = self._current_output_filename
        self._current_output_filename = f"{self._output_filename}_{self._current_messages_file:02d}.html"
        print(f' <p><div class="next_file"><a href="file://{self._current_output_filename}">'
              f' Next Messages </a></div>\n', end="", file=self._output_file_handle)
        description_string = f' This page contains {total_processed:,} messages from ' \
                             f'{self._file_start_date.strftime("%A %Y-%m-%d")} to ' \
                             f'{self._file_end_date.strftime("%A %Y-%m-%d")}.'
        # Remembered so the next file can label its link back to this one
        self._previous_range = f'<br><div style="font-size: 50%;">' \
                               f'({self._file_start_date.strftime("%A %Y-%m-%d")} to ' \
                               f'{self._file_end_date.strftime("%A %Y-%m-%d")})</div>'
        print(f' <script>\n'
              f' el = document.getElementById("file_summary")\n'
              f' new_text = el.innerHTML.concat("{description_string}")\n'
              f' el.innerHTML = new_text\n'
              f' document.getElementById("next_page").innerHTML = '
              f' "<a href=\'file://{self._current_output_filename}\'> Next Page > </a>"\n'
              f' </script>', file=self._output_file_handle)
        print('</body>\n</html>\n', end="", file=self._output_file_handle)
        self._output_file_handle.close()
        print(f"Creating output file {self._current_output_filename}")
        # The page declares charset=utf-8, so do not depend on the locale's default encoding
        self._output_file_handle = open(self._current_output_filename, "w", encoding="utf-8")
def _generate_thread_row(self, message: Message) -> str:
    """ Build one row of the thread (reply context) table for ``message``.

    The row shows who wrote the message and a button with its text that links to the
    anchor of the full message (its rowid) further up the page.
    """
    # Handle id 0 stands for the owner of the database
    sender = 0 if message.is_from_me else message.handle_id
    who_data = self._get_name(sender)
    style = who_data['style']
    text = message.text
    # NOTE(review): the text is placed in the HTML as-is; confirm whether it is escaped upstream
    pieces = [
        f'{" ":16s}<tr>\n',
        f'{" ":18s}<td class="reply_name" style="color: {who_data["name_color"]};"> ',
        f'{who_data["name"]}: </td>\n',
        f'{" ":18s}<td class="reply_text_thread">\n',
        f'{" ":20s}<a href="#{message.rowid}">\n',
        f'{" ":22s}<button class="reply_text_{style}" style="background: ',
        f'{who_data["background_color"]};"> ',
        f'{text}</button>\n',
        f'{" ":20s}</a>\n',
        f'{" ":18s}</td>\n',
        f'{" ":16s}</tr>\n',
    ]
    return ''.join(pieces)
def _generate_thread_table(self, message_list: list, style: str) -> str:
    """ Wrap one thread row per message of ``message_list`` in a table styled for ``style`` ('me' or 'them') """
    opening = f'{" ":14s}<table class="thread_table_{style}">\n'
    rows = [self._generate_thread_row(entry) for entry in message_list]
    closing = f'{" ":14s}</table>\n' \
              f'{" ":14s}<p>\n'
    return opening + ''.join(rows) + closing
def _generate_table(self, message_list: Messages) -> str:
    """ Generate the HTML for every message, one table per day, and return it as a single string.

    Every fragment goes through ``_print_and_save`` so that it is also streamed to the output
    file. The end of a day is the only point where the output may be split into a new file,
    so the date range of the current file is maintained here:

    * ``_file_end_date`` still holds the last day of the file while the day is being closed,
      and only moves to the new day afterwards;
    * ``_file_start_date`` restarts at the new day whenever closing the day started a new file.

    Previously the start date was never reset and the end date was advanced before the split,
    so every page after the first reported a range starting at the very first message and
    ending on the first day of the following page.
    """
    table_array = []
    open_table = f'{" ":2s}<table class="main_table">\n'
    self._print_and_save(open_table, table_array)
    previous_day = ''
    with alive_bar(len(message_list), title="Generating HTML", stats="({rate}, eta: {eta})", comma=True) as bar:
        for message in message_list:
            today = message.date[:10]
            if today != previous_day:
                previous_day = today
                message_date = datetime(int(message.date[0:4]), int(message.date[5:7]), int(message.date[8:10]),
                                        int(message.date[11:13]), int(message.date[14:16]),
                                        int(message.date[17:19]))
                # Very first message: both ends of the range start out on its day
                if self._file_start_date is None:
                    self._file_start_date = message_date
                if self._file_end_date is None:
                    self._file_end_date = message_date
                # If it's a new day, end the table, and start a new one
                files_so_far = self._current_messages_file
                self._print_and_save(f'{" ":2s}</table>\n\n', table_array, new_day=True)
                if self._current_messages_file != files_so_far:
                    # Closing the day rolled over to a new file, which begins with this day
                    self._file_start_date = message_date
                self._file_end_date = message_date
                self._print_and_save(open_table, table_array)
                self._day = message_date.strftime('%a')
            # _generate_row sets this flag when it had to copy or convert an attachment
            self._last_row_had_conversion = False
            self._print_and_save(self._generate_row(message), table_array)
            if self._last_row_had_conversion:
                bar()
            else:
                # Rows without attachment work are reported as skipped to the progress bar
                bar(skipped=True)
    self._print_and_save(f'{" ":2s}</table>\n', table_array)
    return ''.join(table_array)
def _get_next_color(self):
    """ Endless generator of ``[background color, name color]`` pairs.

    Both lists come from the DISPLAY section of the configuration (with built-in fallbacks) and
    are walked in step; each one wraps around independently when it runs out.
    """
    settings = self._database.config
    raw_backgrounds = settings.get('DISPLAY', 'html background color list',
                                   fallback='AliceBlue, Cyan, Gold, Lavender, LightGreen, PeachPuff, Wheat')
    raw_names = settings.get('DISPLAY', 'html name color list',
                             fallback='Blue, DarkCyan, DarkGoldenRod, Purple, DarkGreen, Orange, Sienna')
    # Drop every whitespace character before splitting on the commas
    drop_whitespace = {ord(character): None for character in string.whitespace}
    background_colors = raw_backgrounds.translate(drop_whitespace).split(',')
    name_colors = raw_names.translate(drop_whitespace).split(',')
    position = 0
    while True:
        yield [background_colors[position % len(background_colors)],
               name_colors[position % len(name_colors)]]
        position += 1
def _get_name(self, handle_id: str) -> dict:
    """ Return the display data (name, colors and style) for ``handle_id``.

    The first request for a handle takes the next color pair and stores the result, so
    every later request for the same handle gets the same entry back.
    """
    if handle_id in self._name_map:
        return self._name_map[handle_id]

    background_color, name_color = next(self._color_list)
    known_handles = self._database.handles.handles
    if handle_id == 0:  # 0 is me
        display_name, style = self._me, 'me'
    elif handle_id in known_handles:
        display_name, style = known_handles[handle_id].name, 'them'
    else:
        # Unknown handle: show the raw id
        display_name, style = handle_id, 'them'

    entry = {'name': display_name, 'background_color': background_color,
             'name_color': name_color, 'style': style}
    self._name_map[handle_id] = entry
    return entry
def _generate_row(self, message: Message) -> str:
""" Build the HTML table row for one message.

The row holds the date, the sender and a nested table with the message text. Around the text it
adds, when applicable: the earlier messages of the thread it replies to, links or inline media for
its attachments, a hidden row with earlier versions of an edited message, and a hidden details
cell with the chat id. Attachments marked for copying are copied or converted as a side effect, and
``_last_row_had_conversion`` is set when that happened.
"""
# Specify if the message is from me, or the other person
if message.is_from_me:
who_data = self._get_name(0)
else:
who_data = self._get_name(message.handle_id)
who = who_data['name']
style = who_data['style']
# Check to see if we want the media box floating or fixed
# NOTE(review): the fallback here is 'floating', while _generate_head falls back to 'upper right'
# for the same option - confirm which default is intended
floating_option = self._database.config['DISPLAY'].get('popup location', fallback='floating')
floating = floating_option == 'floating'
# If this message is part of a thread, then show the messages in the thread before it
thread_table = ""
if message.thread_originator_guid:
if message.thread_originator_guid in self._messages.guids:
original_message = self._messages.guids[message.thread_originator_guid]
thread_list = original_message.thread
# The originating message is added to its own thread mapping so that it is listed together with
# the replies; thread_list is the same object as original_message.thread, so this mutates it
thread_list[original_message.rowid] = original_message
print_thread = []
# sort the threads by the date sent
for i in sorted(thread_list.values(), key=lambda x: x.date):
if i == message: # stop at the current message
break
print_thread.append(i)
thread_table = self._generate_thread_table(print_thread, style)
# Generate the attachment string
attachments_string = ""
if message.attachments:
for attachment_key in message.attachments:
# If the attachment listed does not exist, then just list is as missing and continue to the next one
if attachment_key not in self._attachment_list.attachment_list:
attachments_string = f'{attachments_string} <span class="missing"> Attachment missing </span> '
continue
attachment = self._attachment_list.attachment_list[attachment_key]
# If the attachment exists, but we have it marked to skip, skip it
if attachment.skip:
continue
# If the attachment exists, but is marked as missing, list it as missing and continue
if attachment.missing:
attachments_string = f'{attachments_string} <span class="missing"> Attachment missing </span> '
continue
# If we should copy the attachment, copy it or convert it
if attachment.copy:
if attachment.conversion_type == 'HEIC':
attachment.convert_heic_image(attachment.original_path, attachment.destination_path)
self._last_row_had_conversion = True
elif attachment.conversion_type == 'Audio' or attachment.conversion_type == 'Video':
attachment.convert_audio_video(attachment.original_path, attachment.destination_path)
self._last_row_had_conversion = True
else:
attachment.copy_attachment()
self._last_row_had_conversion = True
# A floating popup gets its own box next to the attachment link; otherwise every
# attachment shares the single 'picbox' element that is part of the page header
if floating:
box_name = f'PopUp{attachment.rowid}'
image_box = f'<div class="imageBox" id="PopUp{attachment.rowid}"> <img src="" /> </div>'
else:
box_name = 'picbox'
image_box = ''
if attachment.popup_type == 'Picture':
if self._inline:
attachment_string = f'<p><a href="{attachment.html_path}" target="_blank">' \
f'<img src="{attachment.html_path}" target="_blank"/><p>' \
f' {attachment.html_path} </a>\n'
else:
attachment_string = f'''<a href="{attachment.html_path}" target="_blank"
onMouseOver="ShowPicture('{box_name}',1,'{attachment.html_path}')"
onMouseOut="ShowPicture('{box_name}',0)"> {attachment.html_path} </a>
'''
elif attachment.popup_type == 'Audio':
# Not going to do popups for audio, just inline
attachment_string = f'<p><audio controls> <source src="{attachment.html_path}" ' \
f'type="audio/mp3"></audio> <a href="{attachment.html_path}" ' \
f'target="_blank"> {attachment.html_path} </a>\n'
elif attachment.popup_type == 'Video':
if self._inline:
attachment_string = f'<p><video controls> <source src="{attachment.html_path}" ' \
f' type="video/mp4"></video> <p><a href="{attachment.html_path}"' \
f' target="_blank"> {attachment.html_path} </a>\n'
else:
attachment_string = f'''<a href="{attachment.html_path}" target="_blank"
onMouseOver="ShowMovie('{box_name}', 1, '{attachment.html_path}')"> {attachment.html_path} </a>
'''
else:
# Any other attachment type is only offered as a plain link
attachment_string = f'<a href="{attachment.html_path}" target="_blank"> ' \
f'{attachment.html_path} </a>\n'
attachments_string = f'{attachments_string} <p> {attachment_string} {image_box} '
# Structure of the text row. The first three columns are normal, the fourth column is complex
# <tr>
# <td> date
# <td> who
# <td>
# <table>
# <tr hidden>
# <td> Edited Row
# </tr>
# <tr>
# <td> Text of message (edited if required)
# <td hidden> extra text
# <td> info button (if configured)
# <tr>
# URLs and e-mail addresses in the combined message text and attachment markup are turned into links
text = _replace_url_to_link(f'{message.text} {attachments_string}')
# Check for edits on the text. If there are edits, then set up the html to allow for that. The row
# doesn't exist if there is no edits
edited_string = ""
text_cell_edit_row = ""
if len(message.edits) > 0:
edit_table = f'{" ":14s}<div class="edits_{style}">\n'
for i in range(0, len(message.edits)):
# NOTE(review): every edit line starts with a double quote that is never closed - confirm
# whether that is intentional
edit_table = f'{edit_table}{" ":16s}"{message.edits[i]["text"]} <p>\n'
edit_table = f'{edit_table}{" ":14s}</div>\n'
edited_string = f'<sub><button class="edited_button"' \
f' onclick="ToggleDisplay(\'{message.rowid}editTable\')"> Edited </button></sub>'
text_cell_edit_row = f'{" ":10s}<tr id={message.rowid}editTable class="edits">\n' \
f'{" ":12s}<td>\n' \
f'{edit_table} ' \
f'{" ":12s}</td>\n ' \
f'{" ":10s}</tr>\n'
# Check for if we want additional data
info_text = ""
info_button = ""
if self._database.control.getboolean('additional details', fallback=False):
info_text = f'{" ":12s}<td class="infocell" id={message.rowid}info>\n ' \
f'{" ":14s}<table>\n' \
f'{" ":16s}<tr>\n' \
f'{" ":18s}<td> ChatID: {message.chat_id} </td>\n' \
f'{" ":16s}</tr>\n' \
f'{" ":14s}</table>\n' \
f'{" ":12s}</td>\n'
info_button = f'{" ":12s}<td class="button-wrapper"> <button class="text_{style}" ' \
f'onclick="ToggleDisplay(\'{message.rowid}info\')"> ℹ️ </button> </td>\n'
# Put together the row cells
date_cell = f'{" ":6s}<td class="date"> {self._day} {message.date} </td>\n'
name_cell = f'{" ":6s}<td class="name_{style}" style="color: {who_data["name_color"]};"> {who}: </td>\n'
text_cell = f'{" ":6s}<td>\n ' \
f'{" ":8s}<table>\n' \
f'{text_cell_edit_row}' \
f'{" ":10s}<tr>\n' \
f'{" ":12s}<td class="text_{style}" style="background: {who_data["background_color"]};">\n' \
f'{thread_table}' \
f'{" ":14s}{text} {edited_string}\n' \
f'{" ":12s}</td>\n' \
f'{info_text}' \
f'{info_button}' \
f'{" ":10s}</tr>\n' \
f'{" ":8s}</table>\n' \
f'{" ":6s}</td>\n'
# The row id is the message rowid, which is the anchor the thread rows link to
row_string = f'{" ":4s}<tr id={message.rowid}>\n' \
f'{date_cell}' \
f'{name_cell}' \
f'{text_cell}' \
f'{" ":4s}</tr>\n'
return row_string
def _generate_head(self) -> str:
""" Return the start of an HTML page: the doctype, the opening <html> tag and the complete <head>.

The head contains the page title (the title of the messages), the style sheet and the JavaScript
helpers (ToggleDisplay, ShowPicture, ShowMovie) that the generated rows refer to.
"""
# Only 'upper right' pins the fixed picture box to the right edge of the page; every other
# value of the option puts it on the left
popup = self._database.config['DISPLAY'].get('popup location', fallback='upper right')
if popup == 'upper right':
popup_location = 'right'
else:
popup_location = 'left'
thread_background_color = self._database.config['DISPLAY'].get('thread background',
fallback='HoneyDew')
# The style sheet. The literal braces of the CSS rules are kept in plain strings, while the rule
# bodies are f-strings; only the thread background and the popup side are actually substituted
css = ''' <style>
table {''' + f'''
width: 100%;
table-layout: auto;
''' + ''' }
table.main_table {''' + f'''
border-bottom: 3px solid black;
border-spacing: 8px;
''' + ''' }
table.thread_table_me {''' + f'''
width: 50%;
margin-right: 0px;
margin-left: auto;
background: {thread_background_color};
padding: 5px;
border-radius: 30px;
''' + ''' }
table.thread_table_them {''' + f'''
width: 50%;
margin-right: auto;
margin-left: 0px;
background: {thread_background_color};
padding: 5px;
border-radius: 30px;
''' + ''' }
td {''' + f'''
padding: 0px;
margin: 0;
line-height: 1;
''' + ''' }
td.date {''' + f'''
text-align: left;
width: 150px;
vertical-align: text-middle;
font-size: 80%;
''' + ''' }
td.name_me {''' + f'''
text-align: right;
font-weight: bold;
width: 50px;
padding-right: 5px;
vertical-align: text-middle;
font-size: 80%;
''' + ''' }
td.name_them {''' + f'''
text-align: right;
font-weight: bold;
width: 50px;
padding-right: 5px;
vertical-align: text-middle;
font-size: 80%;
''' + ''' }
td.text_me {''' + f'''
text-align: right;
word-wrap: break-word;
border-radius: 30px;
padding: 15px;
border-spacing: 40px;
''' + ''' }
td.text_them {''' + f'''
text-align: left;
word-wrap: break-word;
border-radius: 30px;
padding: 15px;
border-spacing: 40px;
''' + ''' }
.edits_me {''' + f'''
display: none;
font-size: 70%;
font-style: italic;
text-align: right;
border-radius: 30px;
''' + ''' }
.edits_them {''' + f'''
display: none;
font-size: 70%;
font-style: italic;
text-align: left;
border-radius: 30px;
''' + ''' }
.infocell {''' + f'''
margin-right: 0px;
margin-left: auto;
width: 20%;
text-align: right;
font-size: 70%;
display: none;
''' + ''' }
button.edited_button {''' + f'''
font-size: 50%;
border-radius: 30px;
font-size: 50%;
padding-left: 0px;
padding-right: 0px;
border-spacing: 0px;
border-bottom: 0px;
margin-right: 0px;
margin-left: 0px;
text-align: center;
border: 0px;
color: blue;
font-style: italic;
background-color: transparent;
''' + ''' }
td.button-wrapper {''' + f'''
margin-right: 0px;
margin-left: 4px;
padding-left: 0px;
padding-right: 0px;
border-spacing: 0px;
text-align: center;
width:0.1%;
background-color: transparent;
''' + ''' }
button.text_me {''' + f'''
border-radius: 30px;
font-size: 50%;
padding-left: 0px;
padding-right: 0px;
border-spacing: 0px;
border-bottom: 0px;
margin-right: 0px;
margin-left: 0px;
text-align: center;
border: 0px;
''' + ''' }
button.text_them {''' + f'''
border-radius: 30px;
font-size: 50%;
padding-left: 0px;
padding-right: 0px;
border-spacing: 0px;
border-bottom: 0px;
margin-right: 0px;
margin-left: 0px;
text-align: center;
border: 0px;
''' + ''' }
.reply_name {
font-size: 50%;
text-align: right;
}
.reply_text_me {''' + f'''
border: 2px solid;
border-radius: 6px;
border-radius: 50px;
font-size: 60%
''' + ''' }
.reply_text_them {''' + f'''
border: 2px solid;
border-radius: 6px;
border-radius: 50px;
font-size: 60%
''' + ''' }
td.blank {''' + f'''
border: none;
width: 50%
''' + ''' }
.missing {''' + f'''
color: red;
''' + ''' }
.badjoin {''' + f'''
color: red;
''' + ''' }
.imageBox {''' + f'''
position: absolute;
visibility: hidden;
height: 200;
border: solid 1px #CCC;
padding: 5px;
''' + ''' }
img {''' + f'''
height: 250px;
width: auto;
''' + ''' }
.picboxframe {''' + f'''
position: fixed;
top: 2%;
''' + f'{popup_location}: 2%;' \
'''
background: Blue;
transition: all .5s ease;
''' + ''' }
.next_file {
text-align: center;
font-size: 150%;
}
#file_summary {
font-style: italic;
}
</style>'''
# JavaScript used by the rows: ToggleDisplay hides or shows an element by id (edit history, details
# cell), ShowPicture and ShowMovie fill and show or hide the popup box of an attachment
script = ''' <script>
function ToggleDisplay(id) {
if (document.getElementById(id).style.display == "none") {
document.getElementById(id).style.display = "inline";
}
else {
document.getElementById(id).style.display = "none";
}
}
function ShowPicture(id,show, img) {
if (show=="1") {
document.getElementById(id).style.visibility = "visible"
document.getElementById(id).childNodes[1].src = img;
}
else if (show=="0") {
document.getElementById(id).style.visibility = "hidden"
}
}
function ShowMovie(id, show, movie) {
var elem = document.getElementById(id);
''' \
f' var htmlstring = "<video controls onMouseOut=\'ShowMovie(\\\"\" + id + "\\\",0)\'> ' \
f'<source src=\'" + movie + "\'> </video>";' \
'''
if (show == "1") {
{
elem.style.visibility = "visible";
elem.innerHTML = htmlstring;
}
} else if (show == "0") {
{
elem.style.visibility = "hidden";
elem.innerHTML = " "
}
}
}
</script>'''
# Assemble the head; the page declares UTF-8 as its character set
head_string = '''<!DOCTYPE html>
<html lang="en-US">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
''' \
f' <title> {self._messages.title} </title>\n{css}\n{script}\n</head>\n'
return head_string
================================================
File: /src/imessagedb/chat.py
================================================
from datetime import datetime
class Chat:
    """ Everything known about one chat: its database fields, its last message date and its participants """

    def __init__(self, database, rowid: str, chat_identifier: str, chat_name: str,
                 last_message_date=None) -> None:
        """
        Parameters
        ----------
        database : imessagedb.DB
            An instance of a connected database
        rowid, chat_identifier, chat_name : str
            The fields of the chat as stored in the database
        last_message_date : date
            The date the last message in this chat was sent"""
        self._participants = []
        self._last_message_date = last_message_date
        self._chat_name = chat_name
        self._chat_identifier = chat_identifier
        self._rowid = rowid
        self._database = database

    def __repr__(self) -> str:
        identity = f'{self.rowid}: id => {self.chat_identifier} name => "{self.chat_name}" '
        activity = f'last_message => {self.last_message_date}, participants => "{self.participants}"'
        return identity + activity

    @property
    def rowid(self) -> str:
        return self._rowid

    @property
    def chat_identifier(self) -> str:
        return self._chat_identifier

    @property
    def chat_name(self) -> str:
        return self._chat_name

    @property
    def last_message_date(self) -> datetime:
        return self._last_message_date

    @last_message_date.setter
    def last_message_date(self, date: datetime):
        self._last_message_date = date

    @property
    def participants(self) -> str:
        """ The participants of the chat as one comma separated string """
        return ', '.join(self._describe_participant(handle_id) for handle_id in self._participants)

    def _describe_participant(self, handle_id) -> str:
        """ Describe one participant: name and number when the handle is known, otherwise the bare handle id """
        handles = self._database.handles
        if handle_id not in handles.handles:
            return f'{handle_id}'
        number = handles.handles[handle_id].number
        name = handles.name_for_number(number)
        if name is None:
            return f'{number} ({handle_id})'
        return f'{name} ({number}):({handle_id})'

    def add_participant(self, participant: str):
        """ Register a participant of the chat; one that is already registered is left alone """
        if participant in self._participants:
            return
        self._participants.append(participant)
================================================
File: /src/imessagedb/__init__.py
================================================
"""
Provides access to the iMessage chat.db, including parsing and output
"""
from importlib.metadata import version
# read version from installed package
__version__ = version("imessagedb")
import os
import configparser
import logging
import argparse
import sys
import dateutil.parser
from imessagedb.db import DB
from imessagedb.utils import *
# Template for the configuration file. _create_default_configuration() writes this text unchanged,
# so the '#' lines inside the string are comments for the end user and part of the generated file.
# The [CONTACTS] entries are examples showing the 'name: comma separated numbers' format.
DEFAULT_CONFIGURATION = '''
[CONTROL]
# Whether or not to copy the attachments into a different directory. This is needed for two reasons:
# 1) The web browser does not have access to the directory that the attachments are stored, so it cannot display them
# 2) Some of the attachments need to be converted in order to be viewed in the browser
copy = True
# The directory to put the output. The html file will go in {copy_directory}/{Person}.html,
# and the attachments will go in {copy_directory}/{Person}_Attachments. If you specify HOME, then
# it will put it in your home directory
copy directory = HOME
# If the file already exists in the destination directory it is not recopied, but that can be overridden by
# specifying 'force copy' as true
force copy = False
# 'Skip attachments' ignores attachments
skip attachments = False
# Additional details show some other information on each message, including the chat id and any edits that have
# been done
additional details = False
# Some extra verbosity if true
verbose = True
[DISPLAY]
# Output type, either html or text
output type = html
# Number of messages in each html file. If this is 0 or not specified, it will be one large file.
# There may be more messages than this per file, as it splits at the next date change after that number
# of messages.
split output = 1000
# Inline attachments mean that the images are in the HTML instead of loaded when hovered over
inline attachments = False
# Popup location is where the attachment popup window shows up, and is either 'upper right', 'upper left' or 'floating'
popup location = upper right
# 'me' is the name to put for your text messages
me = Me
# The color for the name in text output. No color is used if 'use text color' is false.
# The color can only be one of the following options:
# black, red, green, yellow, blue, magenta, cyan, white, light_grey, dark_grey,
# light_red, light_green, light_yellow, light_blue, light_magenta, light_cyan
# The way that the color selection works is that it will use the first color on the color list for the first person
# in the conversation, the second for the second, third for the third, etc. If there are more participants than colors,
# it will wrap around to the first color.
use text color = True
text color list = red, green, yellow, blue, magenta, cyan
reply text color = light_grey
# The background and name color of the messages in html output
# The options for colors can be found here: https://www.w3schools.com/cssref/css_colors.php
# The way that the color selection works is that it will use the first color on the color list for the first person
# in the conversation, the second for the second, third for the third, etc. If there are more participants than colors,
# it will wrap around to the first color.
html background color list = AliceBlue, Cyan, Gold, Lavender, LightGreen, PeachPuff, Wheat
html name color list = Blue, DarkCyan, DarkGoldenRod, Purple, DarkGreen, Orange, Sienna
# The background color of the thread in replies
thread background = HoneyDew
[CONTACTS]
# A person that you text with can have multiple numbers, and you may not always want to specify the full specific
# number as stored in the handle database, so you can do the mapping here, providing the name of a person,
# and a comma separated list of numbers
Samantha: +18434676040, samanthasmilt@gmail.com,
s12ddd2@colt.edu
Abe: +16103499696
Marissa: +14029490739
'''
def _create_default_configuration(filename: str) -> None:
    """Write the default configuration to ``filename``.

    The directory the file lives in is created first when it does not exist yet (for example
    ``~/.config`` on a fresh account); previously that case failed with FileNotFoundError.
    The file is closed even when writing fails.
    """
    directory = os.path.dirname(filename)
    if directory:  # a bare file name has no directory part to create
        os.makedirs(directory, exist_ok=True)
    with open(filename, "w") as f:
        f.write(DEFAULT_CONFIGURATION)
def _get_contacts(configuration: configparser.ConfigParser) -> dict:
    """Map every entry of the CONTACTS section to the list of its numbers.

    Spaces and newlines are removed from the value before it is split on the commas.
    """
    return {
        name: raw_numbers.replace('\n', '').replace(' ', '').split(',')
        for name, raw_numbers in configuration.items('CONTACTS')
    }
def _build_argument_parser() -> argparse.ArgumentParser:
    """ Build the command line parser of the imessagedb command """
    # expanduser honours HOME when it is set and still works when it is not
    home = os.path.expanduser('~')
    argument_parser = argparse.ArgumentParser()
    argument_parser.add_argument("--name", help="Person to get conversations about")
    type_mutex_group = argument_parser.add_mutually_exclusive_group()
    type_mutex_group.add_argument('--handle', help="A list of handles to search against", nargs='*')
    type_mutex_group.add_argument('--chat', help="A chat to print")
    argument_parser.add_argument("-c", "--configfile", help="Location of the configuration file",
                                 default=f'{home}/.config/iMessageDB.ini')
    argument_parser.add_argument("-o", "--output_directory",
                                 help="The output directory where the output and attachments go")
    argument_parser.add_argument("--database", help="The database file to open",
                                 default=f"{home}/Library/Messages/chat.db")
    # No default here: only an explicitly given name may override the 'me' option of the configuration
    argument_parser.add_argument("-m", "--me", help="The name to use to refer to you")
    argument_parser.add_argument("-t", "--output_type", help="The type of output", choices=["text", "html"])
    argument_parser.add_argument("-i", "--inline", help="Show the attachments inline", action="store_true")
    copy_mutex_group = argument_parser.add_mutually_exclusive_group()
    copy_mutex_group.add_argument("-f", "--force", help="Force a copy of the attachments", action="store_true")
    copy_mutex_group.add_argument("--no_copy", help="Don't copy the attachments", action="store_true")
    argument_parser.add_argument("--no_attachments", help="Don't process attachments at all", action="store_true")
    argument_parser.add_argument("-v", "--verbose", help="Turn on additional output", action="store_true")
    argument_parser.add_argument('--start_time', '--start-time',
                                 help="The start date/time of the messages")
    argument_parser.add_argument('--end_time', '--end-time',
                                 help="The end date/time of the messages")
    # Validated as an integer up front, because the value is later read back with getint()
    argument_parser.add_argument('--split_output', '--split-output', type=int,
                                 help="Split the html output into files with this many messages per file")
    argument_parser.add_argument('--get_handles', '--get-handles',
                                 help="Display the list of handles in the database and exit", action="store_true")
    argument_parser.add_argument('--get_chats', '--get-chats',
                                 help="Display the list of chats in the database and exit", action="store_true")
    argument_parser.add_argument('--version', help="Prints the version number", action="store_true")
    return argument_parser


def run() -> None:
    """ Run the imessagedb command line

    Reads the configuration file (creating a default one when it is missing), lets the command line
    override it, works out which person, handles or chat are wanted, and then prints the conversation
    as text or writes it as HTML. Invalid input is reported on stderr and ends the program with exit
    status 1.
    """
    out = sys.stdout
    logging.basicConfig(level=logging.INFO, format='%(asctime)s <%(name)s> %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
    logger = logging.getLogger('main')
    logger.debug("Processing parameters")

    argument_parser = _build_argument_parser()
    args = argument_parser.parse_args()
    if args.version:
        print(f"imessagedb {__version__}", file=sys.stderr)
        sys.exit(0)

    # First read in the configuration file, creating it if need be, then overwrite the values from the command line
    if not os.path.exists(args.configfile):
        _create_default_configuration(args.configfile)
    config = configparser.ConfigParser()
    config.read(args.configfile)
    CONTROL = 'CONTROL'
    DISPLAY = 'DISPLAY'
    # A hand written configuration may lack a section; setting an option in a missing section raises
    for section in (CONTROL, DISPLAY, 'CONTACTS'):
        if not config.has_section(section):
            config.add_section(section)

    # NOTE(review): this always replaces the 'verbose' option of the configuration file, also when
    # -v was not given - confirm that the option is meant to be command line only
    config.set(CONTROL, 'verbose', str(args.verbose))
    if args.output_directory:
        config.set(CONTROL, 'copy directory', args.output_directory)
    if args.no_copy:
        config.set(CONTROL, 'copy', 'False')
    if args.output_type:
        config.set(CONTROL, 'output type', args.output_type)
    if args.force:
        config.set(CONTROL, 'force copy', 'True')
    if args.no_attachments:
        config.set(CONTROL, 'skip attachments', 'True')
    if args.inline:
        config.set(DISPLAY, 'inline attachments', 'True')
    if args.split_output is not None:
        config.set(DISPLAY, 'split output', str(args.split_output))
    if args.me:
        # --me used to be parsed and then ignored
        config.set(DISPLAY, 'me', args.me)

    start_date = None
    end_date = None
    if args.start_time:
        try:
            start_date = dateutil.parser.parse(args.start_time)
        except (ValueError, OverflowError) as exp:
            argument_parser.print_help(sys.stderr)
            print(f"\n **Start time not correct: {exp}", file=sys.stderr)
            sys.exit(1)
        config.set(CONTROL, 'start time', str(start_date))
    if args.end_time:
        try:
            end_date = dateutil.parser.parse(args.end_time)
        except (ValueError, OverflowError) as exp:
            argument_parser.print_help(sys.stderr)
            print(f"\n** End time not correct: {exp}", file=sys.stderr)
            sys.exit(1)
        config.set(CONTROL, 'end time', str(end_date))
    if start_date and end_date and start_date >= end_date:
        argument_parser.print_help(sys.stderr)
        print(f"\n **Start date ({start_date}) must be before end date ({end_date})", file=sys.stderr)
        sys.exit(1)

    generic_database_request = False
    if args.get_handles or args.get_chats:
        config[CONTROL]['skip attachments'] = 'True'
        generic_database_request = True

    person = None
    numbers = None
    copy_directory = None
    if not generic_database_request:
        if args.chat:
            person = f"chat_{args.chat}"
            if args.name:
                person = args.name
        elif args.handle:
            numbers = args.handle
            if args.name:
                person = args.name
            else:
                person = ', '.join(numbers)
        elif args.name:
            person = args.name
            contacts = _get_contacts(config)
            contact_key = person.lower()  # configparser stores option names in lower case
            if contact_key not in contacts:
                logger.error(f"{person} not known. Please edit your contacts list.")
                argument_parser.print_help()
                sys.exit(1)
            # Use the cleaned list: re-reading the raw option only removed the newlines, which left a
            # leading space on every number after the first, so those never matched a handle.
            # Empty entries (from a trailing comma) are dropped.
            numbers = [number for number in contacts[contact_key] if number]
        else:
            argument_parser.print_help(sys.stderr)
            print("\n ** You must supply either a name or one or more handles", file=sys.stderr)
            sys.exit(1)

        config.set(CONTROL, 'Person', person)
        home = os.path.expanduser('~')
        copy_directory = config[CONTROL].get('copy directory', fallback=home)
        if copy_directory == "HOME":
            copy_directory = home
        attachment_directory = f"{copy_directory}/{safe_filename(person)}_attachments"
        config[CONTROL]['attachment directory'] = attachment_directory
        # Also creates a missing output directory and accepts an existing attachment directory
        os.makedirs(attachment_directory, exist_ok=True)

    # Connect to the database
    database = DB(args.database, config=config)
    try:
        if args.get_handles:
            print(f"Available handles in the database:\n{database.handles.get_handles()}")
            sys.exit(0)
        if args.get_chats:
            print(f"Available chats in the database:\n{database.chats.get_chats()}")
            sys.exit(0)

        if args.chat:
            chat_id = args.chat
            title = args.chat
            # The chat may be given by name or by its numeric id
            try:
                numeric_chat = int(args.chat)
            except ValueError:
                numeric_chat = None
            if args.chat in database.chats.chat_names:
                chats = database.chats.chat_names[args.chat]
                if len(chats) != 1:
                    error_string = f"You have {len(chats)} chats named {args.chat}, " \
                                   f"but this program can only handle the case where there is one. " \
                                   f"Rename your group and try again."
                    logger.error(error_string)
                    sys.exit(1)
                chat_id = chats[0].rowid
                title = args.chat
            elif numeric_chat is not None and numeric_chat in database.chats.chat_list:
                chat_id = numeric_chat
                if database.chats.chat_list[chat_id].chat_name:
                    title = database.chats.chat_list[chat_id].chat_name
                elif args.name:
                    title = args.name
                else:
                    title = chat_id
            else:
                logger.error(f"{args.chat} not recognized as a chat. "
                             f"Run 'imessagedb --get_chats' to get the list of chats")
                argument_parser.print_help()
                sys.exit(1)
            message_list = database.Messages('chat', title, chat_id=chat_id)
        else:
            message_list = database.Messages('person', person, numbers=numbers)

        me = config.get(DISPLAY, 'me', fallback='Me')
        filename = os.path.join(copy_directory, safe_filename(person))
        # The command line stores the type in CONTROL, while the configuration template lists it
        # under DISPLAY; CONTROL wins, then DISPLAY, then html
        output_type = config.get(CONTROL, 'output type', fallback=None) \
            or config.get(DISPLAY, 'output type', fallback='html')
        if output_type == 'text':
            database.TextOutput(me, message_list, output_file=out).print()
        else:
            database.HTMLOutput(me, message_list, output_file=filename)
    finally:
        # Also runs on the early exits above, which previously left the connection open
        database.disconnect()
# Run the command line interface when this file is executed directly
if __name__ == "__main__":
    run()
================================================
File: /src/imessagedb/messages.py
================================================
from imessagedb.utils import *
from alive_progress import alive_bar
from imessagedb.message import Message
class Messages:
    """ All messages in a conversation or conversations with a particular person """

    # Columns, tables and join condition shared by every query type; only the filter differs.
    _SELECT_PREFIX = ("select message.rowid, guid, "
                      "datetime(message.date/1000000000 + "
                      "strftime('%s', '2001-01-01'),'unixepoch','localtime'), "
                      "message.is_from_me, message.handle_id, "
                      " message.attributedBody, message.message_summary_info, message.text, "
                      "reply_to_guid, thread_originator_guid, thread_originator_part, cmj.chat_id "
                      "from message, chat_message_join cmj "
                      "where message.rowid = cmj.message_id and ")

    def __init__(self, database, query_type: str, title: str, numbers: list = None, chat_id: str = None) -> None:
        """
        Parameters
        ----------
        database : imessagedb.DB
            An instance of a connected database
        query_type : str
            The type of messages, either 'person' or 'chat'
        title : str
            The name of the conversation
        numbers : list
            A list of numbers associated with the person, as represented in the handle data table
        chat_id : str
            The id of the chat

        Raises
        ------
        KeyError
            If query_type is neither 'person' nor 'chat'
        """
        self._database = database
        self._query_type = query_type
        self._numbers = numbers
        self._chat_id = chat_id
        self._title = title
        self._guids = {}
        self._message_list = {}

        where_clause, parameters = self._build_where_clause()
        select_string = f"{self._SELECT_PREFIX}{where_clause} order by message.date asc"
        row_count_string = f"select count (*) from message where {where_clause}"

        cursor = self._database.connection
        # The count is only used to size the progress bar
        cursor.execute(row_count_string, parameters)
        row_count_total = cursor.fetchone()[0]

        cursor.execute(select_string, parameters)
        skip_attachment = self._database.control.getboolean('skip attachments', fallback=False)
        with alive_bar(row_count_total, title="Getting Messages", stats="({rate}, eta: {eta})") as bar:
            # fetchone() returns None once the result set is exhausted
            for row in iter(cursor.fetchone, None):
                (rowid, guid, date, is_from_me, handle_id, attributed_body, message_summary_info, text,
                 reply_to_guid, thread_originator_guid, thread_originator_part, chat_id) = row
                attachment_list = None
                if not skip_attachment:
                    attachment_list = self._database.attachment_list.message_join.get(rowid)
                new_message = Message(self._database, rowid, guid, date, is_from_me, handle_id, attributed_body,
                                      message_summary_info, text, reply_to_guid, thread_originator_guid,
                                      thread_originator_part, chat_id, attachment_list)
                self._guids[guid] = new_message
                self._message_list[rowid] = new_message
                # Manage the thread: attach this reply to its originator if we have already seen it
                if thread_originator_guid and thread_originator_guid in self._guids:
                    self._guids[thread_originator_guid].thread[rowid] = new_message
                bar()
        self._sorted_message_list = sorted(self._message_list.values(), key=lambda x: x.date)

    def _build_where_clause(self) -> tuple:
        """ Returns the SQL filter for this query and the list of values to bind to its placeholders.

        Every user supplied value (handle ids, chat id, date limits) is passed as a bound
        parameter rather than being pasted into the SQL text, so values containing quotes
        cannot break the statement.
        """
        time_rules = []
        time_parameters = []
        start_time = self._database.control.get('start time', fallback=None)
        end_time = self._database.control.get('end time', fallback=None)
        if start_time:
            time_rules.append("message.date >= ?")
            time_parameters.append(convert_to_database_date(start_time))
        if end_time:
            time_rules.append("message.date <= ?")
            time_parameters.append(convert_to_database_date(end_time))
        time_where_clause = f" and {' AND '.join(time_rules)}" if time_rules else ""

        if self._query_type == "person":
            numbers = list(self._numbers)
            placeholders = ",".join("?" * len(numbers))
            where_clause = "rowid in (" \
                           " select message_id from chat_message_join where chat_id in (" \
                           " select chat_id from chat_handle_join where handle_id in (" \
                           f" select rowid from handle where id in ({placeholders})" \
                           " )" \
                           " )" \
                           f") {time_where_clause}"
            return where_clause, numbers + time_parameters
        if self._query_type == "chat":
            where_clause = "rowid in (select message_id from chat_message_join where chat_id = ?) " \
                           f" {time_where_clause}"
            return where_clause, [self._chat_id] + time_parameters
        raise KeyError(f"Unknown query type {self._query_type!r}; expected 'person' or 'chat'")

    @property
    def message_list(self) -> list:
        """ Returns a list of messages sorted by the date of the message"""
        return self._sorted_message_list

    def stats(self) -> list:
        """ Returns a list of stats about the message list suitable for importing into a spreadsheet.
        The fields returned are:
        name: The name of the person sending the message
        date: The date of the message
        character_count: The number of characters in the message
        word_count: The number of words in the message
        text: The text of the message
        """
        result = []
        for i in self._sorted_message_list:
            if i.text is not None:
                # Take out the carriage returns so this can be parsed by a spreadsheet
                text = i.text.replace('\n', ' ')
                text_len = len(i.text)
                # Split on any run of whitespace so that newlines and repeated spaces
                # do not inflate the count and an empty message has zero words
                word_count = len(i.text.split())
            else:
                text = ""
                text_len = 0
                word_count = 0
            result.append({'name': i.name, 'date': i.date, 'character_count': text_len,
                           'word_count': word_count, 'text': text})
        return result

    def print_stats(self) -> str:
        """ Returns a string of stats about the message list suitable for importing into a spreadsheet """
        result = ["Name\tDate\tCharacter Count\tWord Count\tText"]
        result.extend(f"{i['name']}\t{i['date']}\t{i['character_count']}\t{i['word_count']}\t{i['text']}"
                      for i in self.stats())
        return '\n'.join(result)

    @property
    def guids(self) -> dict:
        """ Returns the messages indexed by their guid """
        return self._guids

    @property
    def title(self) -> str:
        """ Returns the name of the conversation """
        return self._title

    def __iter__(self):
        return iter(self._sorted_message_list)

    def __len__(self) -> int:
        return len(self._sorted_message_list)
================================================
File: /src/imessagedb/handle.py
================================================
class Handle:
    """ A single row of the handle table, paired with a display name """

    def __init__(self, database, rowid: str, name: str, number: str, service: str) -> None:
        """
        Parameters
        ----------
        database : imessagedb.DB
            An instance of a connected database
        rowid, number, service : str
            Values of the corresponding fields in the database
        name : str
            The display name for this handle; it is stored title-cased"""
        self._database = database
        self._rowid, self._number, self._service = rowid, number, service
        # Names are normalised so that every word starts with a capital letter
        self._name = name.title()

    @property
    def rowid(self) -> str:
        """ Return the rowid of the handle """
        return self._rowid

    @property
    def name(self) -> str:
        """ Return the title-cased display name """
        return self._name

    @property
    def number(self) -> str:
        """ Return the number (or other id) of the handle """
        return self._number

    @property
    def service(self) -> str:
        """ Return the service of the handle """
        return self._service

    def __repr__(self) -> str:
        return '{}: Name => {}, ID => {}, Service => {} '.format(
            self._rowid, self._name, self._number, self._service)
================================================
File: /src/imessagedb/attachment.py
================================================
import os
import urllib.parse
import re
import shutil
import ffmpeg
import heic2png
class Attachment:
    """ Class for holding information about an attachment """

    def __init__(self, database, rowid: str, filename: str, mime_type: str,
                 copy=False, copy_directory=None, home_directory=None) -> None:
        """
        Parameters
        ----------
        database : imessagedb.DB
            An instance of a connected database
        rowid, filename, mime_type : str
            Fields from the database
        copy : bool
            Whether or not to copy the attachment, default is False
        copy_directory : str
            The directory to copy the attachment to, default is none
        home_directory : str
            Needed to know where to get the attachments from. The attachments in the database
            are listed under ~/Library/Messages/Attachments, so need to be able to translate that.
            Defaults to the HOME environment variable, read when the attachment is created.
        """
        self._database = database
        self._rowid = rowid
        self._filename = filename
        self._mime_type = mime_type
        self._copy = copy
        self._copy_directory = copy_directory
        # Resolve the default lazily so that defining the class does not require HOME to be set
        self._home_directory = os.environ['HOME'] if home_directory is None else home_directory
        self._destination_path = None
        # Stays None unless the attachment is going to be copied
        self._destination_filename = None
        self._popup_type = None
        self._conversion_type = None
        self._skip = False
        self._missing = False
        self._needs_conversion = False
        self._force = self._database.control.getboolean('force copy', False)

        # The path is set to use ~, so replace it with the home directory. Only the leading ~
        # stands for the home directory; a ~ inside a file name (IMG~1.jpg) must be left alone.
        if self._filename.startswith('~'):
            self._original_path = self._home_directory + self._filename[1:]
        else:
            self._original_path = self._filename
        if not os.path.exists(self._original_path):
            self._missing = True
            return

        if self._copy_directory is None or not os.path.isdir(self._copy_directory):
            self._copy = False

        if self._copy:
            # Prefix the file name with its parent directory to keep copied names distinct
            parts = self._original_path.split('/')
            last = parts.pop()
            penultimate = parts.pop()
            self._destination_filename = f'{penultimate}-{last}'

        self._process_mime_type()  # We may need to do a conversion and therefore change the filename

        if self._copy:
            self._destination_path = f'{self._copy_directory}/{self._destination_filename}'
            if len(self._destination_path) > 200:  # Some filenames are too long
                self._destination_filename = f'{self._destination_filename[:50]}---{self._destination_filename[-50:]}'
                self._destination_path = f'{self._copy_directory}/{self._destination_filename}'
        else:
            self._destination_path = self._original_path

    @property
    def rowid(self) -> str:
        """ Return the rowid """
        return self._rowid

    @property
    def filename(self) -> str:
        """ Return the filename of the attachment """
        return self._filename

    @property
    def mime_type(self) -> str:
        """ Return the mime_type of the attachment"""
        return self._mime_type

    @property
    def copy(self) -> bool:
        """ Return if the attachment should be copied """
        return self._copy

    @property
    def destination_path(self) -> str:
        """ Return the destination path the attachment will be copied to (None if it is missing) """
        return self._destination_path

    @property
    def popup_type(self) -> str:
        """ Return the popup type, one of [Audio, Video, Picture] """
        return self._popup_type

    @property
    def conversion_type(self) -> str:
        """ Return the conversion type, one of [Audio, HEIC, Video] """
        return self._conversion_type

    @property
    def skip(self) -> bool:
        """ Return if we need to skip this attachment """
        return self._skip

    @property
    def missing(self) -> bool:
        """ Return if the attachment is missing """
        return self._missing

    @property
    def original_path(self) -> str:
        """ Return the original path of the attachment """
        return self._original_path

    @property
    def destination_filename(self) -> str:
        """ Return the filename at the destination (not the whole path), None if it is not copied """
        return self._destination_filename

    @property
    def html_path(self) -> str:
        """ Return the html escaped path """
        return urllib.parse.quote(self.destination_path)

    @property
    def link_path(self) -> str:
        """ Return a link to the attachment """
        return f"file://{urllib.parse.quote(self.destination_path)}"

    def _process_mime_type(self) -> None:
        """ Based on the mime_type, decide how to convert the file if necessary and how to rename it """
        if self._mime_type is None:
            # The only type of file that doesn't have a mime type that I am currently interested in are audio files
            if self._filename.endswith('.caf'):
                self._popup_type = "Audio"
                self._conversion_type = "Audio"
                if self._copy:
                    self._destination_filename = f'{self._destination_filename}.mp3'
            else:
                self._skip = True  # We don't care about this attachment
            return

        category = self._mime_type[0:5]
        if category == "image":
            self._popup_type = "Picture"
            if self._mime_type == 'image/heic':
                self._conversion_type = "HEIC"
                if self._copy:
                    self._destination_filename = f'{self._destination_filename}.png'
        elif category == "audio":
            self._popup_type = "Audio"
            self._conversion_type = "Audio"
            if self._copy:
                self._destination_filename = f'{self._destination_filename}.mp3'
        elif category == "video":
            self._popup_type = "Video"
            if self._mime_type != "video/mp4":
                self._conversion_type = "Video"
                if self._copy:
                    self._destination_filename = f'{self._destination_filename}.mp4'

    def copy_attachment(self) -> None:
        """ Copy the attachment """
        # A missing attachment has no destination, and an attachment that is not being
        # copied has its destination set to the original file: nothing to do in either case
        if self._destination_path is None or self._destination_path == self._original_path:
            return
        # Skip the file copy if the copy already exists
        if not self._force and os.path.exists(self._destination_path):
            return
        print(f"Copying {self._destination_filename}")
        try:
            shutil.copyfile(self._original_path, self._destination_path)
        except Exception as exp:
            # Best effort: report the failure and carry on with the other attachments
            print(f"Failed to copy {self._destination_filename}: {exp}")

    def convert_heic_image(self, heic_location: str, png_location: str) -> None:
        """ Convert a HEIC image to a PNG so it can be viewed in the browser """
        # Don't do the expensive conversion if we've already converted it
        if not self._force and os.path.exists(png_location):
            return
        try:
            print(f"Converting {os.path.basename(png_location)}")
            heic_image = heic2png.HEIC2PNG(heic_location)
            heic_image.save(png_location)
        except Exception as exp:
            # Best effort: report the failure and carry on with the other attachments
            print(f'Failed to convert {heic_location} to {png_location}: {exp}')

    def convert_audio_video(self, original: str, converted: str) -> None:
        """ Convert an audio or video file from an undisplayable source to something the browser can display"""
        # If the file exists already, don't convert it
        if not self._force and os.path.exists(converted):
            return
        try:
            print(f"Converting {os.path.basename(converted)}")
            stream = ffmpeg.input(original)
            stream = ffmpeg.output(stream, converted)
            stream = ffmpeg.overwrite_output(stream)
            ffmpeg.run(stream, quiet=True)
        except Exception as exp:
            # Best effort: report the failure and carry on with the other attachments
            print(f'Failed to convert {original} to {converted}: {exp}')
================================================
File: /src/imessagedb/handles.py
================================================
from imessagedb.handle import Handle
class Handles:
    """ All handles in the database """

    def __init__(self, database) -> None:
        """
        Parameters
        ----------
        database : imessagedb.DB
            An instance of a connected database
        """
        self._database = database
        self._handle_list = {}  # Handles by rowid
        self._numbers = {}  # Handles by phone number / email
        self._names = {}  # Handles by name (from contact list)
        self._contacts_by_name = {}
        self._contacts_by_number = {}

        # Process the contacts first
        if self._database.config.has_section('CONTACTS'):
            for (key, value) in self._database.config.items('CONTACTS'):
                # Capitalize the first letter of every word, since configparser loses case
                name = key.title()
                # Entries are comma separated and may be wrapped over several lines. Strip the
                # whitespace around each entry ("a, b") so it can match the id stored in the
                # database, and ignore empty entries such as the one left by a trailing comma.
                entries = (item.strip() for item in value.replace('\n', '').split(','))
                values = [item for item in entries if item]
                self._contacts_by_name[name] = values
                for item in values:
                    self._contacts_by_number[item] = name

        self._get_handles_from_database()

    def _get_handles_from_database(self):
        """ Load every row of the handle table and index it by rowid, number and contact name """
        self._database.connection.execute('select rowid, id, service from handle')
        rows = self._database.connection.fetchall()
        for rowid, number, service in rows:
            # Handles that are not in the contact list are named after their number
            contact_name = self._contacts_by_number.get(number)
            display_name = number if contact_name is None else contact_name
            new_handle = Handle(self._database, rowid, display_name, number, service)

            # Add the handle to the rowid dictionary
            self._handle_list[new_handle.rowid] = new_handle
            # Add the handle to the numbers dictionary
            self._numbers.setdefault(new_handle.number, []).append(new_handle)
            # Add the handle to the names dictionary
            if contact_name is not None:
                self._names.setdefault(contact_name, []).append(new_handle)

    def get_handles(self) -> str:
        """ Return a string with the list of handles"""
        return '\n'.join(self._numbers)

    @property
    def handles(self) -> dict:
        """ Return the list of handles """
        return self._handle_list

    @property
    def numbers(self) -> dict:
        """ Return the list of handles indexed by the number """
        return self._numbers

    @property
    def names(self) -> dict:
        """ Return the list of handles indexed by the contact name """
        return self._names

    def name_for_number(self, number: str) -> str:
        """ Return the contact name configured for the number, or None if there is none """
        return self._contacts_by_number.get(number)

    def __iter__(self):
        # Iterates over the rowids, like iterating over the underlying dictionary
        return iter(self._handle_list)

    def __len__(self) -> int:
        return len(self._handle_list)

    def __repr__(self) -> str:
        return '\n'.join(str(self._handle_list[i]) for i in sorted(self._handle_list))

    def __getitem__(self, item: str) -> list:
        """ Return the list of handles for a contact name or, failing that, for a number

        Raises
        ------
        KeyError
            If the item is neither a known contact name nor a known number
        """
        if item in self._names:
            return self._names[item]
        if item in self._numbers:
            return self._numbers[item]
        raise KeyError(item)
================================================
File: /src/imessagedb/generate_text.py
================================================
from termcolor import colored
from datetime import datetime
import string
class TextOutput:
    """ Creates a text file (or string) from a Messages list

    ...

    There are a number of options in the configuration file that affect how the text is created.
    In the DISPLAY section, the following impact the output:

    me = Me :
        The name to put for your part of the conversation. It defaults to 'Me'.
    use text color = True :
        If False, don't color the text at all
    text color list = red, green, yellow, blue, magenta, cyan
        The color for the name in text output. No color is used if 'use text color' is false.
        The color can only be one of the following options:
          black, red, green, yellow, blue, magenta, cyan, white, light_grey, dark_grey,
          light_red, light_green, light_yellow, light_blue, light_magenta, light_cyan
        The way that the color selection works is that it will use the first color on the color list
        for the first person in the conversation, the second for the second, third for the third, etc.
        If there are more participants than colors, it will wrap around to the first color.
    reply text color = light_grey :
        The color for the reply text """

    def __init__(self, database, me: str, messages, output_file=None) -> None:
        """
        Parameters
        ----------
        database : imessagedb.DB
            An instance of a connected database
        me : str
            The name to show for messages sent by the owner of the database
        messages : imessagedb.Messages
            The messages to render
        output_file :
            Passed as the ``file`` argument of print() by save(); None means standard output
        """
        self._database = database
        self._me = me
        self._messages = messages
        self._attachment_list = self._database.attachment_list
        self._output_file = output_file
        self._color_list = self._get_next_color()
        self._name_map = {}
        display = self._database.config['DISPLAY']
        self._use_color = display.getboolean('use text color', fallback=True)
        self._me_color = display.get('me text color', fallback="blue")
        self._them_color = display.get('them text color', fallback="magenta")
        self._reply_color = display.get('reply text color', fallback="light_grey")

        start_time = self._database.control.get('start time', fallback=None)
        end_time = self._database.control.get('end time', fallback=None)
        date_string = ""
        if start_time:
            date_string = f"from {start_time} "
        if end_time:
            date_string = f"{date_string} until {end_time}"
        header_string = f"Exchanged {len(self._messages.message_list):,} messages with " \
                        f"{self._messages.title} {date_string}"
        self._string_array = [header_string]
        self._get_messages()

    def _get_name(self, handle_id: str) -> dict:
        """ Return the {'name', 'color'} entry for a handle, creating it on first use.

        Handle id 0 stands for the owner of the database. A handle that is not in the
        database is shown under its raw id.
        """
        if handle_id not in self._name_map:
            color = next(self._color_list)
            handle_list = self._database.handles.handles
            if handle_id == 0:  # 0 is me
                name = self._me
            elif handle_id in handle_list:
                name = handle_list[handle_id].name
            else:
                name = handle_id
            self._name_map[handle_id] = {'name': name, 'color': color}
        return self._name_map[handle_id]

    def _print_thread(self, thread_header, current_message) -> str:
        """ Return the messages of a thread that come before current_message, oldest first

        Parameters
        ----------
        thread_header : imessagedb.Message
            The message that started the thread
        current_message : imessagedb.Message
            The reply being printed; the thread is cut off when it is reached
        """
        # Work on a copy so that printing does not add the originator to its own list of replies
        thread_list = dict(thread_header.thread)
        thread_list[thread_header.rowid] = thread_header
        parts = []
        for i in sorted(thread_list.values(), key=lambda x: x.date):
            if i == current_message:
                break
            attachment_string = ""
            if i.attachments is not None:
                attachment_string = f" Attachments: {i.attachments}"
            # Resolve the sender the same way _get_messages does: my own messages are filed
            # under id 0, and a handle that has not been named yet gets an entry on demand
            # instead of raising KeyError.
            sender = self._get_name(0 if i.is_from_me else i.handle_id)
            parts.append(f'[{sender["name"]}: {i.text}{attachment_string}] ')
        return ''.join(parts)

    def _get_messages(self) -> None:
        """ Render every message into one line of the output """
        for message in self._messages.message_list:
            date = message.date
            # The date is a 'YYYY-MM-DD HH:MM:SS' string; work out the abbreviated day of the week
            day = datetime(int(date[0:4]), int(date[5:7]), int(date[8:10]),
                           int(date[11:13]), int(date[14:16]),
                           int(date[17:19])).strftime('%a')
            who_data = self._get_name(0 if message.is_from_me else message.handle_id)
            who = self._color(who_data['name'], who_data['color'], attrs=['bold'])

            attachment_string = ""
            if message.attachments:
                attachment_list = self._database.attachment_list.attachment_list
                attachments_array = [attachment_list[i].original_path if i in attachment_list
                                     else f"Missing attachment ({i})"
                                     for i in message.attachments]
                attachment_string = f'Attachments: {",".join(attachments_array)}'

            reply_to = ""
            if message.thread_originator_guid and message.thread_originator_guid in self._messages.guids:
                original_message = self._messages.guids[message.thread_originator_guid]
                reply_to = self._color(f'Reply to: {self._print_thread(original_message, message)}',
                                       self._reply_color)
            self._string_array.append(f'<{day} {date}> {who}: {message.text} {reply_to} {attachment_string}')

    def _get_next_color(self):
        """ A generator function to return the next color"""
        color_list = self._database.config.get('DISPLAY', 'text color list',
                                               fallback='red, green, yellow, blue, magenta, cyan')
        # Drop all whitespace so that 'red, green' and 'red,green' are equivalent
        colors = color_list.translate({ord(c): None for c in string.whitespace}).split(',')
        counter = 0
        while True:
            # Wrap around to the first color when there are more participants than colors
            yield colors[counter % len(colors)]
            counter += 1

    def _color(self, text: str, color: str, attrs=None) -> str:
        """ Return the text colored for the terminal, or unchanged if color is turned off """
        if not self._use_color:
            return text
        if attrs:
            return colored(text, color, attrs=attrs)
        return colored(text, color)

    def save(self) -> None:
        """ Save the text output to the file """
        print('\n'.join(self._string_array), file=self._output_file)

    def print(self) -> None:
        """ Print the text output to stdout """
        print('\n'.join(self._string_array))

    def __repr__(self) -> str:
        return '\n'.join(self._string_array)
================================================
File: /src/imessagedb/message.py
================================================
import plistlib
from datetime import datetime
from imessagedb.utils import *
import imessagedb
def _convert_attributed_body(encoded: bytes) -> str:
    """ Pull the message text out of an attributedBody blob

    This is a heuristic that slices the blob around well known marker strings rather than
    a real decoder. The general logic was copied from
    https://github.com/my-other-github-account/imessage_tools/blob/master/imessage_tools.py
    and then improved.
    """
    # Keep what lies between the first 'NSString' marker and whichever of the next
    # 'NSString', 'NSDictionary' or 'NSNumber' markers comes first
    payload = encoded.split(b'NSNumber')[0].split(b'NSString')[1].split(b'NSDictionary')[0]
    # Drop the fixed-size framing around the text
    payload = payload[6:-12]
    # When one of these control bytes is present, the text is the piece that follows its first occurrence
    for marker in (b'\x01', b'\x02', b'\x00'):
        if marker in payload:
            payload = payload.split(marker)[1]
    # Anything from a 0x86 byte onwards is not part of the text
    payload = payload.split(b'\x86')[0]
    return payload.decode('utf-8', errors='replace')
class Message:
    """ A single message together with its replies, attachments and edit history """

    def __init__(self, database, rowid: int, guid: str, date: str, is_from_me: bool, handle_id: str,
                 attributed_body: bytes, message_summary_info: bytes, text: str, reply_to_guid: str,
                 thread_originator_guid: str, thread_originator_part: str, chat_id: str, message_attachments: list):
        """
        Parameters
        ----------
        database : imessagedb.DB
            An instance of a connected database
        rowid, guid, date, is_from_me, handle_id, attributed_body, message_summary_info, text,
        reply_to_guid, thread_originator_guid, thread_originator_part, chat_id : str
            Values of the corresponding fields in the database
        message_attachments : list
            The attachments that belong to this message"""
        # Plain copies of the database fields
        self._rowid, self._guid, self._date = rowid, guid, date
        self._is_from_me, self._handle_id = is_from_me, handle_id
        self._attributed_body = attributed_body
        self._message_summary_info = message_summary_info
        self._text = text
        self._reply_to_guid = reply_to_guid
        self._thread_originator_guid = thread_originator_guid
        self._thread_originator_part = thread_originator_part
        self._chat_id = chat_id
        self._attachments = message_attachments
        # Filled in by the owner of the message once replies to it are found
        self._thread = {}
        self._edits = []

        # Many messages keep their content in attributed_body and leave the text field empty,
        # so fall back to decoding the body when there is no usable text.
        if attributed_body is not None and text in (None, '', ' '):
            self._text = _convert_attributed_body(attributed_body)

        # The edit history, when there is one, lives in message_summary_info as a plist
        try:
            summary = plistlib.loads(self._message_summary_info)
            if 'ec' in summary:
                self._edits.extend({'text': _convert_attributed_body(edit['t']),
                                    'date': convert_from_database_date(edit['d'])}
                                   for edit in summary['ec']['0'])
        except plistlib.InvalidFileException:
            # Not a plist: this message carries no edit history
            pass

    def __repr__(self) -> str:
        fields = [f'RowID: {self._rowid}',
                  f'GUID: {self._guid}',
                  f'Date: {self._date}',
                  f'From me: {self._is_from_me}',
                  f'HandleID: {self._handle_id}',
                  f'Message: {self._text}',
                  f'Originator Thread: {self._thread_originator_guid}',
                  f'Reply Message: {self._reply_to_guid}',
                  f'Thread Part: {self._thread_originator_part}',
                  f'Chat ID: {self._chat_id}',
                  f'Attachments: {self._attachments}']
        return ' '.join(fields)

    @property
    def rowid(self) -> int:
        """ Return the rowid of the message """
        return self._rowid

    @property
    def guid(self) -> str:
        """ Return the guid of the message """
        return self._guid

    @property
    def date(self) -> str:
        """ Return the date of the message """
        return self._date

    @property
    def is_from_me(self) -> bool:
        """ Return the is_from_me field of the message """
        return self._is_from_me

    @property
    def handle_id(self) -> str:
        """ Return the handle_id field of the message """
        return self._handle_id

    @property
    def attributed_body(self) -> bytes:
        """ Return the raw attributed body of the message """
        return self._attributed_body

    @property
    def message_summary_info(self) -> bytes:
        """ Return the raw message_summary_info of the message """
        return self._message_summary_info

    @property
    def text(self) -> str:
        """ Return the text of the message, decoded from the attributed body when the text field was empty """
        return self._text

    @property
    def edits(self) -> list:
        """ Return the edits found in message_summary_info, each a dict with 'text' and 'date' """
        return self._edits

    @property
    def reply_to_guid(self) -> str:
        """ Return the reply_to_guid field of the message """
        return self._reply_to_guid

    @property
    def thread_originator_guid(self) -> str:
        """ Return the thread_originator_guid field of the message """
        return self._thread_originator_guid

    @property
    def chat_id(self) -> str:
        """ Return the id of the chat the message belongs to """
        return self._chat_id

    @property
    def attachments(self) -> list:
        """ Return the attachments of the message """
        return self._attachments

    @property
    def thread(self) -> dict:
        """ Return the thread dictionary of the message (empty when the message is created) """
        return self._thread
================================================
File: /src/imessagedb/db.py
================================================
import configparser
import os
import sqlite3
import imessagedb
from imessagedb.attachments import Attachments
from imessagedb.chats import Chats
from imessagedb.handles import Handles
from imessagedb.generate_html import HTMLOutput
from imessagedb.messages import Messages
from imessagedb.generate_text import TextOutput
class DB:
    """
    A connection to an iMessage database, together with the data that is preloaded from it
    """

    def __init__(self, database_name=None, config=None):
        """
        Parameters
        ----------
        database_name : str
            The database to connect to (the default is Library/Messages/chat.db under the caller's home directory)
        config : ConfigParser
            Configuration information. If none is provided, the default configuration is used.
        """
        if database_name is None:
            database_name = f"{os.environ['HOME']}/Library/Messages/chat.db"
        if config is None:
            config = configparser.ConfigParser()
            config.read_string(imessagedb.DEFAULT_CONFIGURATION)

        self._database_name = database_name
        self._configuration = config
        self._control = config['CONTROL']

        self._chat_connection = sqlite3.connect(database_name)
        self._cursor = self._chat_connection.cursor()

        # Handles, chats and attachments are loaded up front
        self._handles = Handles(self)
        self._chats = Chats(self)
        self._attachment_list = Attachments(self)

    def Messages(self, query_type: str, title: str, numbers: list = None, chat_id: str = None) -> Messages:
        """Creates a Messages object bound to this database
        """
        return Messages(self, query_type, title, numbers=numbers, chat_id=chat_id)

    def HTMLOutput(self, me: str, message_list: Messages, inline=False, output_file=None) -> HTMLOutput:
        """Creates an HTMLOutput object bound to this database
        """
        return HTMLOutput(self, me, message_list, inline, output_file)

    def TextOutput(self, me: str, message_list: Messages, output_file=None) -> TextOutput:
        """Creates a TextOutput object bound to this database
        """
        return TextOutput(self, me, message_list, output_file)

    def disconnect(self) -> None:
        """Closes the connection to the database
        """
        self._chat_connection.close()

    @property
    def connection(self) -> sqlite3.Cursor:
        """Returns the cursor used to query the database
        """
        return self._cursor

    @property
    def handles(self) -> Handles:
        """Returns the imessagedb.Handles object created when the database was opened
        """
        return self._handles

    @property
    def chats(self) -> Chats:
        """Returns the imessagedb.Chats object created when the database was opened
        """
        return self._chats

    @property
    def attachment_list(self) -> Attachments:
        """Returns the imessagedb.Attachments object created when the database was opened
        """
        return self._attachment_list

    @property
    def config(self) -> configparser.ConfigParser:
        """Returns the configuration object
        """
        return self._configuration

    @property
    def control(self):
        """Returns the CONTROL section of the configuration
        """
        return self._control
================================================
File: /src/imessagedb/attachments.py
================================================
from imessagedb.attachment import Attachment
from alive_progress import alive_bar
class Attachments:
    """ Every attachment in the database, plus the mapping from messages to their attachments """

    def __init__(self, database, copy=None, copy_directory=None) -> None:
        """
        Parameters
        ----------
        database : imessagedb.DB
            An instance of a connected database
        copy : bool
            Whether or not to copy attachments; when not given, the 'copy' configuration parameter is used
        copy_directory : str
            The directory to copy attachments into; when not given, the 'attachment directory'
            configuration parameter is used
        """
        self._database = database
        control = self._database.control
        self._copy = control.getboolean('copy', fallback=False) if copy is None else copy
        self._copy_directory = (control.get('attachment directory', fallback="/tmp/attachments")
                                if copy_directory is None else copy_directory)
        self._attachment_list = {}
        self._message_join = {}

        # Nothing is loaded at all when attachments are being skipped
        if control.getboolean('skip attachments', fallback=False):
            return

        cursor = self._database.connection
        # The count is only needed to size the progress bar
        cursor.execute('select count(rowid)from attachment')
        total = cursor.fetchone()[0]

        cursor.execute('select rowid, filename, mime_type from attachment')
        with alive_bar(total, title="Getting Attachments", stats="({rate}, eta: {eta})") as bar:
            for rowid, filename, mime_type in iter(cursor.fetchone, None):
                # Rows without a filename are counted but not kept
                if filename is not None:
                    self._attachment_list[rowid] = Attachment(self._database, rowid, filename, mime_type,
                                                              copy=self._copy,
                                                              copy_directory=self._copy_directory)
                bar()

        # Record which attachments belong to which message
        cursor.execute('select message_id, attachment_id from message_attachment_join')
        for message_id, attachment_id in iter(cursor.fetchone, None):
            self._message_join.setdefault(message_id, []).append(attachment_id)

    @property
    def attachment_list(self) -> dict:
        """ Return the attachments, indexed by rowid """
        return self._attachment_list

    @property
    def message_join(self) -> dict:
        """ Return the list of attachment ids for each message id """
        return self._message_join

    def __iter__(self):
        return iter(self._attachment_list)

    def __len__(self) -> int:
        return len(self._attachment_list)
================================================
File: /src/imessagedb/utils.py
================================================
""" Utility functions for the class """
from datetime import datetime
# Unix timestamp of 2001-01-01 00:00:00 UTC, the point that database dates are counted from.
# This is the same instant that the SQL in this package gets from strftime('%s', '2001-01-01').
# It has to be a fixed instant: deriving it from local midnight shifts every conversion
# by the local UTC offset.
mac_epoch_start = 978307200


def convert_to_database_date(date_string: str) -> int:
    """ Convert a local 'YYYY-MM-DD HH:MM:SS' string to a database date

    The result is the number of nanoseconds since 2001-01-01 00:00:00 UTC, which is how
    message.date is compared in the queries.

    Raises
    ------
    ValueError
        If date_string does not match the expected format
    """
    date_ = datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S')
    # timestamp() treats a naive datetime as local time and works on every platform
    epoch_date = int(date_.timestamp())
    diff = epoch_date - mac_epoch_start
    return diff * 1000000000


def convert_from_database_date(date_value: float) -> datetime:
    """ Convert a number of seconds since 2001-01-01 00:00:00 UTC to a local datetime

    Note that, unlike convert_to_database_date, the value is in seconds, not nanoseconds.
    """
    epoch_date = date_value + mac_epoch_start
    return datetime.fromtimestamp(epoch_date)
def safe_filename(filename: str) -> str:
    """ Return the filename with every space turned into an underscore """
    return '_'.join(filename.split(' '))
================================================
File: /src/imessagedb/chats.py
================================================
from imessagedb.chat import Chat
class Chats:
    """ All Chats in the database

    Every row of the ``chat`` table is loaded when the object is created and
    indexed three ways: by rowid, by chat identifier and by chat name.
    """

    def __init__(self, database) -> None:
        """
        Parameters
        ----------
        database : imessagedb.DB
            An instance of a connected database"""
        self._database = database
        self._chat_list = {}  # Chat list by rowid
        self._chat_identifiers = {}  # Chat list by identifier
        self._chat_names = {}  # Chat list by name
        self._get_chats_from_database()

    def __repr__(self) -> str:
        """ Return str() of every chat, ordered by rowid, one chat per line """
        return '\n'.join(str(self._chat_list[rowid]) for rowid in sorted(self._chat_list))

    def __len__(self) -> int:
        """ Return the number of chats """
        return len(self._chat_list)

    def _get_chats_from_database(self) -> None:
        """ Load the chats, their last message dates and their participants """
        connection = self._database.connection

        connection.execute('select rowid, chat_identifier, display_name from chat')
        for rowid, chat_identifier, display_name in connection.fetchall():
            new_chat = Chat(self._database, rowid, chat_identifier, display_name)
            self._chat_list[new_chat.rowid] = new_chat
            # More than one chat can share an identifier or a name, so both
            # of these indexes hold a list of chats per key
            self._chat_identifiers.setdefault(new_chat.chat_identifier, []).append(new_chat)
            if new_chat.chat_name != "":
                self._chat_names.setdefault(new_chat.chat_name, []).append(new_chat)

        # Add the last chat date to all the chats. One grouped query replaces
        # a separate query per chat.
        connection.execute(
            "select chat_id, "
            "datetime(max(message_date)/1000000000 + strftime('%s', '2001-01-01'),'unixepoch','localtime') "
            "from chat_message_join group by chat_id")
        last_message_dates = {row[0]: row[1] for row in connection.fetchall()}
        for chat in self._chat_list.values():
            # A chat with no messages has no row in the result and gets None
            chat.last_message_date = last_message_dates.get(chat.rowid)

        # Add the participants for all the chats
        connection.execute('select chat_id, handle_id from chat_handle_join')
        for chat_id, handle_id in connection.fetchall():
            chat = self._chat_list.get(chat_id)
            # Skip join rows that point at a chat missing from the chat table
            # instead of failing the whole load with a KeyError
            if chat is not None:
                chat.add_participant(handle_id)

    def get_chats(self) -> str:
        """ Return a string with the list of chats in the database

        There is one line per chat, ordered by rowid. The chat name is shown in
        parentheses after the rowid when the chat has one."""
        lines = []
        for rowid in sorted(self._chat_list):
            chat = self._chat_list[rowid]
            label = f"{chat.rowid} ({chat.chat_name}):" if chat.chat_name else f"{chat.rowid}:"
            lines.append(f"{label} Participants: {chat.participants}, Last Message Sent: {chat.last_message_date}")
        return '\n'.join(lines)

    @property
    def chat_list(self) -> dict:
        """ Return the list of chats by rowid in a dict"""
        return self._chat_list

    @property
    def chat_names(self) -> dict:
        """ Return the chats by chat name in a dict; each value is a list of chats"""
        return self._chat_names

    @property
    def chat_identifiers(self) -> dict:
        """ Return the chats by chat identifier in a dict; each value is a list of chats"""
        return self._chat_identifiers

    def __getitem__(self, item) -> 'Chat | list[Chat]':
        """ Look up chats by chat name, rowid or chat identifier, in that order

        A rowid returns a single Chat. A chat name or a chat identifier returns
        the list of every Chat that carries it.

        Raises
        ------
        KeyError
            If item matches none of the three indexes."""
        for index in (self._chat_names, self._chat_list, self._chat_identifiers):
            if item in index:
                return index[item]
        # Include the key so the failure says what was looked up
        raise KeyError(item)