# We provide all the information about MCP servers via our MCP API.
# curl -X GET 'https://glama.ai/api/mcp/v1/servers/jhigh1594/agileplace-mcp-server'
# If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""Define the invoke tasks."""
import sys
import textwrap
from invoke import Collection
from invoke import task
from invoke.tasks import call
from platforma_invocations import (
alembic,
deploy,
docker,
hasura,
linter,
minikube,
poetry,
pytest,
sphinx,
static_analyzer,
templar,
)
from platforma_invocations.utils import common, aws, git
from platforma_invocations.utils import minikube as minikube_utils
# Define AWS parameters for the development environment.
AWS_ACCOUNT = "323341875315"
AWS_REGION = "us-west-2"
# Define Docker parameters.
ORG = "platforma"
SERVICE_NAME = "okrs-api"
TAG = "snapshot"
JOBS_NAME = f"{SERVICE_NAME}-jobs"
# Define Helm parameters.
NAMESPACE = "okr"
CHART_REPOSITORY = ORG
CHART_NAME = SERVICE_NAME
CHART_VERSION = "0.3.3"
# Define general parameters.
PROJECT = "okrs_api"  # Python package directory of the main project.
TOOLS = "tools"  # Directory holding the helper scripts.
CONNEXION_ENV = {"CONNEXION_ENVIRONMENT": "local"}
# NOTE(review): hardcoded admin secret — presumably local-development only;
# confirm it is never used against a shared environment.
HASURA_GRAPHQL_ADMIN_SECRET = "hasurasecret"
@task
def install(c):
    """Install the project."""
    # ORG / AWS_ACCOUNT select the internal package source (see poetry_setup).
    poetry.install(c, ORG, AWS_ACCOUNT)
@task
def update(c):
    """Update the project."""
    # Same ORG / AWS_ACCOUNT wiring as the install task.
    poetry.update(c, ORG, AWS_ACCOUNT)
@task
def check_models(c):
    """Check that the generated models are in sync with the specfile.

    Exits with status 1 when git reports ``models_autogenerated.py`` as
    modified, i.e. the committed models lag behind the OpenAPI spec.
    """
    dirty = c.run("git status --porcelain", hide=True).stdout.strip()
    if "models_autogenerated.py" not in dirty:
        return
    message = textwrap.dedent(
        """
        The models do not match the specification file.
        Verify that the changes were expected and commit the new models."""
    ).strip()
    print(message)
    sys.exit(1)
@task(
    pre=[
        static_analyzer.format_,
        call(check_models),
        sphinx.build,
        # NOTE(review): ci_tools calls linter.all_ with `project=`, this one
        # uses `directory=` — confirm both keywords are valid on linter.all_.
        call(linter.all_, directory=PROJECT),
        call(pytest.unit),
        call(pytest.integration),
    ]
)
def ci_project(_c):
    """Run all the CI tasks for the main project."""
    # Body intentionally empty: the pre-task list above does all the work.
@task(
    pre=[
        static_analyzer.format_,
        # NOTE(review): ci_project calls linter.all_ with `directory=`, this
        # one uses `project=` — confirm both keywords are valid on linter.all_.
        call(linter.all_, project=TOOLS),
    ]
)
def ci_tools(_c):
    """Run all the CI tasks for the scripts."""
    # Body intentionally empty: the pre-task list above does all the work.
@task(ci_project, ci_tools)
def ci(_c):
    """Run all the CI tasks at once."""
    # Body intentionally empty: the positional pre-tasks do all the work.
@task
def start_api(c):
    """Run Connexion locally."""
    # Runs with CONNEXION_ENVIRONMENT=local (see CONNEXION_ENV above).
    c.run(f"poetry run python {PROJECT}/main.py", env=CONNEXION_ENV)
@task
def start_wsgi(c):
    """Run Connexion locally as a WSGI application."""
    # Development-oriented gunicorn settings: auto-reload, 30-minute worker
    # timeout, debug logging, aiohttp/uvloop worker, access log on stdout.
    c.run(
        "poetry run gunicorn "
        "--reload "
        "--timeout 1800 "
        "--log-level debug "
        "-b 0.0.0.0:8000 "
        "--worker-class aiohttp.GunicornUVLoopWebWorker "
        "--access-logfile - "
        '--access-logformat \'%a %t "%r" %s %b %Tf %D "%{Referer}i" "%{User-Agent}i"\' '
        f"{PROJECT}.main",
        env=CONNEXION_ENV,
        pty=True,
    )
@task(help={"dry-run": "Only output the migration changes. Do not make a migration."})
def apply_triggers(c, dry_run=False):
    """Apply the custom trigger migrations via the custom_migration script.

    :param dry_run: when true, only print the changes instead of migrating.
    """
    suffix = " --dry-run" if dry_run else ""
    c.run(f"poetry run python {TOOLS}/custom_migration.py triggers{suffix}")
@task(
    iterable=["tablename"],
    help={
        "tablename": "Table to add create timestamp fields to. Multiple allowed.",
    },
)
def timestamp_migration(c, tablename):
    """Generate migration to add created_at, updated_at fields w/ trigger to a table."""
    joined = " ".join(map(str, tablename))
    revision_id = common.custom_now()
    # Truncated to 37 characters — presumably to keep the migration filename
    # within a length limit; confirm before relying on the exact cut-off.
    label = f"timestamps_for_{joined.replace(' ', '_')}"[0:37]
    c.run(f'poetry run alembic revision --rev-id={revision_id} -m "{label}"')
    c.run(
        f"poetry run python {TOOLS}/timestamp-migration.py "
        f"{revision_id}_{label} "
        f"{joined}"
    )
@task
def deleted_event_trigger_migration(c):
    """
    Generate migration to cleanup old or renamed event triggers.
    This cleans up the trigger logs and also deletes any unused postgres
    triggers whose names start with `notify_hasura_`.
    Use this sql query in order to find event_triggers that can be removed:
    SELECT trigger_name, archived, COUNT(*) FROM hdb_catalog.event_log GROUP BY trigger_name, archived;
    https://github.com/hasura/graphql-engine/issues/5461#issuecomment-664463460
    """
    revision_id = common.custom_now()
    label = "cleanup_event_triggers"
    c.run(f'poetry run alembic revision --rev-id={revision_id} -m "{label}"')
    c.run(
        f"poetry run python {TOOLS}/deleted-event-trigger-migration.py "
        f"{revision_id}_{label} "
    )
@task(
    help={
        "app": "Application to imitate sending JWT from",
        "user_id": "Application user id",
        "env_id": "Application environment id",
        "domain": "Application domain",
        "org_id": "Application org id",
    },
)
def generate_pts_jwt(c, app, user_id, env_id, domain, org_id):
    """
    Generate a JWT with the planview token service.

    Required environment variables:
    - `PLANVIEW_TOKEN_SERVICE_CLIENT_ID`: the client id for PTS
    - `PLANVIEW_TOKEN_SERVICE_CLIENT_SECRET`: the client secret for PTS
    - `PLANVIEW_TOKEN_SERVICE_REGION`: the region for the service. Default ('us-west-2')
    """
    # The generator script takes the identity fields as positional arguments.
    c.run(
        f"poetry run python {TOOLS}/pts-jwt-generator.py "
        f"{app} {user_id} {env_id} {domain} {org_id}"
    )
@task
def generate_controllers(c):
    """Generate default controllers.

    Generates Connexion controller stubs from the OpenAPI specification
    into the project's controller package.
    """
    # Use the TOOLS constant like every other task instead of the
    # hard-coded "tools" literal (same resulting path today).
    c.run(
        "poetry run python "
        f"{TOOLS}/connexion-controller.py "
        "--openapi openapi/openapi.yml "
        f"--output {PROJECT}/api/controller/"
    )
@task
def seed_data(c):
    """Seed the database."""
    # seeder.py is executed from inside the tools/ directory.
    with c.cd(TOOLS):
        c.run("poetry run python seeder.py")
@task(
    help={
        "port": "Supply the tunnel port number. Default: 8000",
        "subdomain": "Supply a custom subdomain name.",
    }
)
def create_local_tunnel(c, port=8000, subdomain=None):
    """Create a local tunnel for the Hasura instance to call as a webhook.

    Installs the localtunnel npm package on first use; the subdomain
    defaults to ``pv-<username>-<namespace>``.
    """
    if not _check_local_tunnel_package_installed(c):
        _install_local_tunnel_package(c)
    if not subdomain:
        username = c.run("whoami", hide=True).stdout.strip()
        subdomain = f"pv-{username}-{NAMESPACE}"
    print("Running the local tunnel... Press ^c to stop")
    c.run(f"lt --port {port} --subdomain {subdomain}")
@task
def generate_models(c):
    """Generate OpenAlchemy models."""
    # Regenerates okrs_api/api/models_autogenerated.py from the OpenAPI spec;
    # check_models verifies the committed file matches.
    c.run(
        "poetry run openalchemy generate "
        "openapi/openapi.yml "
        f"{PROJECT}/api/models_autogenerated.py "
    )
@task
def hasura_export(c):
    """Export Hasura metadata."""
    with c.cd("hasura"):
        c.run("hasura md export")
@task
def hasura_import(c):
    """Import (apply) Hasura metadata."""
    with c.cd("hasura"):
        c.run("hasura md apply")
@task
def hasura_export_odata(c):
    """Export OData Hasura metadata."""
    with c.cd("hasura-reporting/reporting-metadata"):
        c.run("hasura md export")
@task
def hasura_import_odata(c):
    """Import (apply) OData Hasura metadata."""
    with c.cd("hasura-reporting/reporting-metadata"):
        c.run("hasura md apply")
@task(
    help={
        "old_role": "The role name to clone from",
        "new_role": "The role name to clone to",
        "permissions": "(optional) Permissions to clone.",
    },
)
def hasura_clone_role(c, old_role, new_role, permissions=None):
    """Clone a Hasura role into a new role via the role-cloner script.

    :param permissions: optional subset of permissions to clone.
    """
    parts = ["poetry run python hasura-role-cloner.py", old_role, new_role]
    if permissions:
        parts.append(permissions)
    with c.cd(TOOLS):
        c.run(" ".join(parts))
@task(pre=[alembic.migrate, hasura_import, seed_data])
def prepare(_c):
    """Alias combining all the tasks required to prepare the environment."""
    # Body intentionally empty: migrate -> hasura import -> seed via pre-tasks.
def _check_local_tunnel_package_installed(c):
"""Checks if the localtunnel npm package is installed globally"""
cmd = "npm list --depth 0 -g localtunnel | grep localtunnel"
output = c.run(cmd, warn=True, hide=True).stdout.strip()
return bool(output)
def _install_local_tunnel_package(c):
    """Install the localtunnel npm package globally (best effort)."""
    # warn=True: a failed npm install is reported but does not abort the task.
    c.run("npm install -g localtunnel", warn=True)
@task
def delete_minikube(c):
    """Delete the helm release from minikube."""
    # Removes the okrs-api release from the okr namespace.
    minikube.delete(c, SERVICE_NAME, NAMESPACE)
@task
def poetry_setup(c):
    """Configure Poetry environment."""
    # ORG / AWS_ACCOUNT select the internal package source.
    poetry.setup(c, ORG, AWS_ACCOUNT)
@task
def docker_build(c):
    """Build the Docker image."""
    # use_codeartifact=True passes CodeArtifact auth into the build
    # (cf. custom_docker_build below for how the token is injected).
    docker.build(
        c,
        ORG,
        SERVICE_NAME,
        use_codeartifact=True,
        domain=ORG,
        domain_owner=AWS_ACCOUNT,
    )
def custom_docker_build(
    c,
    organization,
    service_name,
    docker_file="Dockerfile",
    use_codeartifact=False,
    domain=None,
    domain_owner=None,
    force=False,
    build_args=None,
):
    """Build the Docker image tagged with the current git tag.

    Skips the build when AWS already reports an image for this tag,
    unless ``force`` is true. ``build_args`` entries are passed through
    as ``--build-arg`` options.
    """
    tag = git.get_current_tag(c)
    # Nothing to do when the image for this tag already exists.
    if not force and aws.docker_image_exists(c, service_name, tag):
        return
    parts = [f"docker build -f {docker_file} -t {organization}/{service_name}:{tag}"]
    if use_codeartifact:
        token = aws.get_codeartifact_token(c, domain, domain_owner)
        parts.append(f"--build-arg CODEARTIFACT_AUTH_TOKEN={token}")
    for build_arg in build_args or ():
        parts.append(f"--build-arg {build_arg}")
    parts.append(".")
    # Run the build with BuildKit enabled.
    c.run(" ".join(parts), env={"DOCKER_BUILDKIT": "1"})
@task
def docker_batch_build(c):
    """Build the batch-jobs Docker image (okrs-batch, Dockerfile.batch)."""
    custom_docker_build(
        c,
        ORG,
        "okrs-batch",
        docker_file="Dockerfile.batch",
        use_codeartifact=True,
        domain=ORG,
        domain_owner=AWS_ACCOUNT,
    )
@task
def docker_run(c):
    """Run the Docker container."""
    # Publishes container port 8000 on host port 8000.
    docker.run(c, ORG, SERVICE_NAME, publish_ports="8000:8000")
@task
def docker_publish(c, latest=False):
    """Publish the Docker image.

    :param latest: presumably also tags the image as ``latest`` —
        confirm in platforma_invocations.docker.publish.
    """
    docker.publish(c, AWS_ACCOUNT, AWS_REGION, ORG, SERVICE_NAME, latest)
@task
def deploy_minikube(c):
    """Deploy the project to minikube."""
    deploy.deploy(
        c,
        SERVICE_NAME,
        CHART_REPOSITORY,
        CHART_NAME,
        CHART_VERSION,
        "minikube",  # kube_context (positions match deploy_any's signature)
        "minikube",  # environment
        "local",  # region
        None,  # NOTE(review): unnamed positional — confirm meaning in deploy.deploy
        NAMESPACE,
        values=[
            # Expose the service at okrs-api.okr.<minikube-ip>.nip.io.
            f"--set ingress.hosts[0].host={SERVICE_NAME}.{NAMESPACE}.{minikube_utils.ip(c)}.nip.io",
            "--set ingress.hosts[0].paths[0]=/",
        ],
    )
@task
def deploy_any(c, kube_context, environment, region):
    """Deploy the project to any environment.

    :param kube_context: kubectl context to deploy into.
    :param environment: target environment name.
    :param region: target region.
    """
    deploy.deploy(
        c,
        SERVICE_NAME,
        CHART_REPOSITORY,
        CHART_NAME,
        CHART_VERSION,
        kube_context,
        environment,
        region,
        None,  # NOTE(review): unnamed positional — confirm meaning in deploy.deploy
        NAMESPACE,
        [],  # No extra Helm --set values.
    )
@task
def publish(c):
    """Publish Python packages on our internal PyPI."""
    # ORG / AWS_ACCOUNT select the internal package repository.
    poetry.publish(c, ORG, AWS_ACCOUNT)
# ##############################################################################
# Namespace wiring: group the tasks above into invoke collections.
# Create a CI collection.
ci_collection = Collection("ci")
ci_collection.add_task(ci_project, name="project", default=True)
ci_collection.add_task(ci_tools, name="tools")
ci_collection.add_task(ci, name="all")
# Create a deploy collection.
deployment = Collection("deploy")
deployment.add_task(deploy_minikube, "minikube")
deployment.add_task(deploy_any, "any")
# Create a docker collection.
docker_collection = Collection("docker")
docker_collection.add_task(docker_build, "build")
docker_collection.add_task(docker_batch_build, "build_batch")
docker_collection.add_task(docker_publish, "publish")
docker_collection.add_task(docker_run, "run")
# Create a generate collection.
generate = Collection("generate")
generate.add_task(generate_controllers, "controllers")
generate.add_task(generate_pts_jwt, "pts_jwt")
# Create a hasura collection.
hasura_collection = Collection("hasura")
hasura_collection.add_task(hasura_export, "export")
hasura_collection.add_task(hasura_import, "import")
hasura_collection.add_task(hasura_clone_role, "clone_role")
hasura_collection.add_task(hasura_export_odata, "export_odata")
hasura_collection.add_task(hasura_import_odata, "import_odata")
# Create a migration collection.
migration = Collection("migration")
migration.add_task(alembic.autogenerate_migration, "generate")
migration.add_task(alembic.migrate)
migration.add_task(alembic.new_migration, "new")
migration.add_task(alembic.merge_migration, "merge")
migration.add_task(timestamp_migration, "timestamp")
migration.add_task(apply_triggers, "apply_triggers")
migration.add_task(deleted_event_trigger_migration, "deleted_event_trigger")
# Create a models collection.
models = Collection("models")
models.add_task(check_models, "check")
models.add_task(generate_models, "generate")
# Create a setup collection.
setup = Collection("setup")
setup.add_task(poetry_setup, "poetry")
setup.add_task(seed_data, "seed")
setup.add_task(prepare, "prepare")
# Create a start collection.
start = Collection("start")
start.add_task(start_api, "api")
start.add_task(start_wsgi, "wsgi", default=True)
start.add_task(create_local_tunnel, "local-tunnel")
# Create the default collection (`ns` is the root namespace invoke discovers).
ns = Collection()
ns.add_task(delete_minikube, "delete")
ns.add_task(static_analyzer.format_)
ns.add_task(install, default=True)
ns.add_task(publish)
ns.add_task(update)
# Add the sub-collections.
ns.add_collection(ci_collection)
ns.add_collection(deployment)
ns.add_collection(docker_collection)
ns.add_collection(generate)
ns.add_collection(hasura_collection)
ns.add_collection(linter, name="lint")
ns.add_collection(migration)
ns.add_collection(models)
ns.add_collection(setup)
ns.add_collection(start)
ns.add_collection(pytest, name="test")
ns.add_collection(sphinx, name="docs")
ns.add_collection(templar)
# Set default tasks for the sub-collections.
ns.collections["docs"].default = "build"
ns.collections["docker"].default = "build"
ns.collections["lint"].default = "all"
ns.collections["test"].default = "unit"