We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/jordanburke/kuzudb-mcp-server'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
import { test, expect, type APIRequestContext } from "@playwright/test"
import * as jwt from "jsonwebtoken"
const JWT_SECRET = "test-secret-key"
const ADMIN_USERNAME = "admin"
const ADMIN_PASSWORD = "secret123"
test.describe("OAuth JWT Token Management", () => {
let apiContext: APIRequestContext
test.beforeAll(async ({ playwright }) => {
apiContext = await playwright.request.newContext({
baseURL: "http://localhost:3000",
})
})
test.afterAll(async () => {
await apiContext.dispose()
})
test.describe("JWT Token Generation", () => {
test("should generate JWT token with login endpoint", async () => {
// Start server with JWT mode
const response = await apiContext.post("/auth/login", {
data: {
username: ADMIN_USERNAME,
password: ADMIN_PASSWORD,
},
})
// If login endpoint exists
if (response.ok()) {
expect(response.status()).toBe(200)
const body = await response.json()
expect(body).toHaveProperty("token")
expect(body.token).toBeTruthy()
// Verify token structure
const decoded = jwt.decode(body.token)
expect(decoded).toHaveProperty("sub")
expect(decoded).toHaveProperty("iat")
expect(decoded).toHaveProperty("exp")
}
})
test("should reject login with invalid credentials", async () => {
const response = await apiContext.post("/auth/login", {
data: {
username: "wronguser",
password: "wrongpass",
},
})
if (response.status() !== 404) {
// If endpoint exists
expect(response.status()).toBe(401)
const body = await response.json()
expect(body.error).toContain("Invalid credentials")
}
})
})
test.describe("Token Expiration", () => {
test("should handle expired JWT tokens", async () => {
// Create an expired token
const expiredToken = jwt.sign(
{ sub: "test-user" },
JWT_SECRET,
{ expiresIn: "-1h" }, // Expired 1 hour ago
)
const response = await apiContext.post("/", {
headers: {
Authorization: `Bearer ${expiredToken}`,
},
data: {
jsonrpc: "2.0",
id: 1,
method: "tools/list",
params: {},
},
})
// Server might not implement JWT verification yet
if (response.status() === 401) {
const body = await response.json()
expect(body.error).toContain("expired")
}
})
test("should accept valid JWT tokens within expiration", async () => {
// Create a valid token
const validToken = jwt.sign({ sub: "test-user" }, JWT_SECRET, { expiresIn: "1h" })
const response = await apiContext.post("/", {
headers: {
Authorization: `Bearer ${validToken}`,
},
data: {
jsonrpc: "2.0",
id: 1,
method: "tools/list",
params: {},
},
})
// If JWT is implemented
if (response.ok()) {
expect(response.status()).toBe(200)
}
})
test("should handle tokens with custom expiration times", async () => {
const tokens = [
{ exp: "30s", shouldWork: true },
{ exp: "5m", shouldWork: true },
{ exp: "1h", shouldWork: true },
{ exp: "-1s", shouldWork: false },
]
for (const { exp, shouldWork } of tokens) {
const token = jwt.sign({ sub: "test-user" }, JWT_SECRET, { expiresIn: exp })
const response = await apiContext.post("/", {
headers: {
Authorization: `Bearer ${token}`,
},
data: {
jsonrpc: "2.0",
id: 1,
method: "tools/list",
params: {},
},
})
if (shouldWork) {
// May be 200 or 401 depending on implementation
expect([200, 401]).toContain(response.status())
} else {
// Expired tokens should fail if JWT is implemented
if (response.status() === 401) {
const body = await response.json()
expect(body.error).toBeDefined()
}
}
}
})
})
test.describe("Token Refresh", () => {
test("should refresh token before expiration", async () => {
const response = await apiContext.post("/auth/refresh", {
headers: {
Authorization: `Bearer test-token-123`,
},
})
// If refresh endpoint exists
if (response.status() !== 404) {
if (response.ok()) {
const body = await response.json()
expect(body).toHaveProperty("token")
expect(body.token).toBeTruthy()
}
}
})
test("should not refresh expired tokens", async () => {
const expiredToken = jwt.sign({ sub: "test-user" }, JWT_SECRET, { expiresIn: "-1h" })
const response = await apiContext.post("/auth/refresh", {
headers: {
Authorization: `Bearer ${expiredToken}`,
},
})
// If refresh endpoint exists and validates tokens
if (response.status() !== 404) {
expect(response.status()).toBe(401)
}
})
})
test.describe("Token Claims", () => {
test("should include standard JWT claims", async () => {
const token = jwt.sign(
{
sub: "test-user",
role: "admin",
permissions: ["read", "write"],
},
JWT_SECRET,
{
expiresIn: "1h",
issuer: "kuzu-mcp-server",
audience: "mcp-client",
},
)
const decoded = jwt.decode(token, { complete: true })
expect(decoded.payload).toHaveProperty("sub", "test-user")
expect(decoded.payload).toHaveProperty("iss", "kuzu-mcp-server")
expect(decoded.payload).toHaveProperty("aud", "mcp-client")
expect(decoded.payload).toHaveProperty("iat")
expect(decoded.payload).toHaveProperty("exp")
})
test("should validate token signature", async () => {
// Create token with wrong secret
const invalidToken = jwt.sign({ sub: "test-user" }, "wrong-secret", { expiresIn: "1h" })
const response = await apiContext.post("/", {
headers: {
Authorization: `Bearer ${invalidToken}`,
},
data: {
jsonrpc: "2.0",
id: 1,
method: "tools/list",
params: {},
},
})
// If JWT validation is implemented
if (response.status() === 401) {
const body = await response.json()
expect(body.error).toBeDefined()
}
})
})
test.describe("Token Revocation", () => {
test("should handle logout/revocation", async () => {
const response = await apiContext.post("/auth/logout", {
headers: {
Authorization: `Bearer test-token-123`,
},
})
// If logout endpoint exists
if (response.status() !== 404) {
expect(response.status()).toBe(200)
// Subsequent requests with same token should fail
const followUp = await apiContext.post("/", {
headers: {
Authorization: `Bearer test-token-123`,
},
data: {
jsonrpc: "2.0",
id: 1,
method: "tools/list",
params: {},
},
})
// May still work if revocation list not implemented
expect([200, 401]).toContain(followUp.status())
}
})
})
})