// Package load provides APOC data loading functions.
//
// This package implements all apoc.load.* functions for loading
// data from external sources (JSON, CSV, XML, JDBC, etc.).
package load
import (
"encoding/csv"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
)
// Json loads JSON data from a URL or file.
//
// Example:
//
// apoc.load.json('http://example.com/data.json') => parsed data
// apoc.load.json('/path/to/file.json') => parsed data
func Json(urlOrFile string) (interface{}, error) {
var data interface{}
var err error
// Check if it's a URL or file path
if strings.HasPrefix(urlOrFile, "http://") || strings.HasPrefix(urlOrFile, "https://") {
data, err = loadJsonFromURL(urlOrFile)
} else {
data, err = loadJsonFromFile(urlOrFile)
}
if err != nil {
return nil, fmt.Errorf("failed to load JSON: %w", err)
}
return data, nil
}
// loadJsonFromURL loads JSON from an HTTP(S) URL.
func loadJsonFromURL(url string) (interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("HTTP error: %s", resp.Status)
}
var data interface{}
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return nil, err
}
return data, nil
}
// loadJsonFromFile loads JSON from a file path.
func loadJsonFromFile(path string) (interface{}, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
defer file.Close()
var data interface{}
if err := json.NewDecoder(file).Decode(&data); err != nil {
return nil, err
}
return data, nil
}
// JsonStream loads JSON data as a stream.
//
// Example:
//
// apoc.load.jsonStream('http://example.com/data.json') => stream of objects
func JsonStream(url string) ([]interface{}, error) {
// Placeholder - would stream JSON objects
data, err := Json(url)
if err != nil {
return nil, err
}
if arr, ok := data.([]interface{}); ok {
return arr, nil
}
return []interface{}{data}, nil
}
// JsonArray loads a JSON array from a URL.
//
// Example:
//
// apoc.load.jsonArray('http://example.com/data.json') => array
func JsonArray(url string) ([]interface{}, error) {
data, err := Json(url)
if err != nil {
return nil, err
}
if arr, ok := data.([]interface{}); ok {
return arr, nil
}
return nil, fmt.Errorf("data is not an array")
}
// JsonParams loads JSON with parameters.
//
// Example:
//
// apoc.load.jsonParams('http://api.com/data', {headers: {...}}, 'POST') => data
func JsonParams(url string, headers map[string]string, method string) (interface{}, error) {
client := &http.Client{}
req, err := http.NewRequest(method, url, nil)
if err != nil {
return nil, err
}
for key, value := range headers {
req.Header.Set(key, value)
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var data interface{}
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return nil, err
}
return data, nil
}
// Csv loads CSV data from a URL or file.
//
// Example:
//
// apoc.load.csv('http://example.com/data.csv', {delimiter: ',', header: true}) => rows
// apoc.load.csv('/path/to/file.csv', {delimiter: ',', header: true}) => rows
func Csv(urlOrFile string, config map[string]interface{}) ([]map[string]interface{}, error) {
// Parse configuration
delimiter := ','
if d, ok := config["delimiter"].(string); ok && len(d) > 0 {
delimiter = rune(d[0])
}
if d, ok := config["sep"].(string); ok && len(d) > 0 {
delimiter = rune(d[0])
}
hasHeader := true
if h, ok := config["header"].(bool); ok {
hasHeader = h
}
var reader *csv.Reader
var closer io.Closer
// Load from URL or file
if strings.HasPrefix(urlOrFile, "http://") || strings.HasPrefix(urlOrFile, "https://") {
resp, err := http.Get(urlOrFile)
if err != nil {
return nil, fmt.Errorf("failed to fetch CSV: %w", err)
}
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
return nil, fmt.Errorf("HTTP error: %s", resp.Status)
}
closer = resp.Body
reader = csv.NewReader(resp.Body)
} else {
file, err := os.Open(urlOrFile)
if err != nil {
return nil, fmt.Errorf("failed to open CSV: %w", err)
}
closer = file
reader = csv.NewReader(file)
}
defer closer.Close()
reader.Comma = delimiter
reader.LazyQuotes = true
// Read all records
records, err := reader.ReadAll()
if err != nil {
return nil, fmt.Errorf("failed to parse CSV: %w", err)
}
if len(records) == 0 {
return []map[string]interface{}{}, nil
}
// Build result
rows := make([]map[string]interface{}, 0)
var headers []string
startRow := 0
if hasHeader && len(records) > 0 {
headers = records[0]
startRow = 1
}
for i := startRow; i < len(records); i++ {
record := records[i]
// Build map if we have headers
row := make(map[string]interface{})
if hasHeader && headers != nil {
for j, header := range headers {
if j < len(record) {
row[header] = record[j]
}
}
} else {
// No headers - use column indices
for j, value := range record {
row[fmt.Sprintf("col%d", j)] = value
}
}
rows = append(rows, row)
}
return rows, nil
}
// CsvStream loads CSV data as a stream.
//
// Example:
//
// apoc.load.csvStream('http://example.com/data.csv', {}) => stream of rows
func CsvStream(url string, config map[string]interface{}) ([]map[string]interface{}, error) {
return Csv(url, config)
}
// Xml loads XML data from a URL or file.
//
// Example:
//
// apoc.load.xml('http://example.com/data.xml') => parsed XML
func Xml(url string) (map[string]interface{}, error) {
// Placeholder - would fetch and parse XML
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// Simple XML parsing placeholder
return map[string]interface{}{
"root": "xml data",
}, nil
}
// XmlSimple loads XML data in simplified format.
//
// Example:
//
// apoc.load.xmlSimple('http://example.com/data.xml') => simplified XML
func XmlSimple(url string) (map[string]interface{}, error) {
return Xml(url)
}
// Html loads HTML data from a URL.
//
// Example:
//
// apoc.load.html('http://example.com', {}) => parsed HTML
func Html(url string, config map[string]interface{}) (map[string]interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// Placeholder - would parse HTML
return map[string]interface{}{
"title": "Page Title",
"body": "Page Content",
}, nil
}
// Jdbc loads data from a JDBC database connection.
//
// Example:
//
// apoc.load.jdbc('jdbc:mysql://localhost/db', 'SELECT * FROM users') => rows
func Jdbc(jdbcUrl string, query string) ([]map[string]interface{}, error) {
// Placeholder - would connect to database and execute query
return []map[string]interface{}{}, nil
}
// JdbcUpdate executes an update query via JDBC.
//
// Example:
//
// apoc.load.jdbcUpdate('jdbc:mysql://localhost/db', 'UPDATE users SET active=1') => count
func JdbcUpdate(jdbcUrl string, query string) (int, error) {
// Placeholder - would execute update query
return 0, nil
}
// Driver loads data using a custom driver.
//
// Example:
//
// apoc.load.driver('mongodb', 'mongodb://localhost/db', 'users.find()') => data
func Driver(driverName string, url string, query string) (interface{}, error) {
// Placeholder - would use custom driver
return nil, fmt.Errorf("driver not implemented: %s", driverName)
}
// Directory loads all files from a directory.
//
// Example:
//
// apoc.load.directory('/path/to/dir', '*.json') => file list
func Directory(path string, pattern string) ([]string, error) {
// Placeholder - would list directory files
return []string{}, nil
}
// DirectoryTree loads directory structure as a tree.
//
// Example:
//
// apoc.load.directoryTree('/path/to/dir') => tree structure
func DirectoryTree(path string) (map[string]interface{}, error) {
// Placeholder - would build directory tree
return map[string]interface{}{
"path": path,
"children": []interface{}{},
}, nil
}
// Ldap loads data from an LDAP server.
//
// Example:
//
// apoc.load.ldap('ldap://server', 'dc=example,dc=com', '(uid=*)') => entries
func Ldap(url string, baseDN string, filter string) ([]map[string]interface{}, error) {
// Placeholder - would query LDAP
return []map[string]interface{}{}, nil
}
// JsonSchema validates JSON against a schema.
//
// Example:
//
// apoc.load.jsonSchema(data, schema) => {valid: true}
func JsonSchema(data interface{}, schema map[string]interface{}) (map[string]interface{}, error) {
// Placeholder - would validate against JSON schema
return map[string]interface{}{
"valid": true,
"errors": []string{},
}, nil
}
// Arrow loads Apache Arrow data.
//
// Example:
//
// apoc.load.arrow('http://example.com/data.arrow') => data
func Arrow(url string) ([]map[string]interface{}, error) {
// Placeholder - would load Arrow format
return []map[string]interface{}{}, nil
}
// Parquet loads Apache Parquet data.
//
// Example:
//
// apoc.load.parquet('http://example.com/data.parquet') => data
func Parquet(url string) ([]map[string]interface{}, error) {
// Placeholder - would load Parquet format
return []map[string]interface{}{}, nil
}
// Avro loads Apache Avro data.
//
// Example:
//
// apoc.load.avro('http://example.com/data.avro') => data
func Avro(url string) ([]map[string]interface{}, error) {
// Placeholder - would load Avro format
return []map[string]interface{}{}, nil
}
// S3 loads data from Amazon S3.
//
// Example:
//
// apoc.load.s3('s3://bucket/key', {credentials: {...}}) => data
func S3(url string, config map[string]interface{}) ([]byte, error) {
// Placeholder - would load from S3
return []byte{}, nil
}
// Gcs loads data from Google Cloud Storage.
//
// Example:
//
// apoc.load.gcs('gs://bucket/key', {credentials: {...}}) => data
func Gcs(url string, config map[string]interface{}) ([]byte, error) {
// Placeholder - would load from GCS
return []byte{}, nil
}
// Azure loads data from Azure Blob Storage.
//
// Example:
//
// apoc.load.azure('https://account.blob.core.windows.net/container/blob', {}) => data
func Azure(url string, config map[string]interface{}) ([]byte, error) {
// Placeholder - would load from Azure
return []byte{}, nil
}
// Kafka loads messages from Apache Kafka.
//
// Example:
//
// apoc.load.kafka('localhost:9092', 'topic', {}) => messages
func Kafka(brokers string, topic string, config map[string]interface{}) ([]map[string]interface{}, error) {
// Placeholder - would consume from Kafka
return []map[string]interface{}{}, nil
}
// Redis loads data from Redis.
//
// Example:
//
// apoc.load.redis('localhost:6379', 'GET', 'key') => value
func Redis(addr string, command string, args ...string) (interface{}, error) {
// Placeholder - would execute Redis command
return nil, nil
}
// Elasticsearch loads data from Elasticsearch.
//
// Example:
//
// apoc.load.elasticsearch('http://localhost:9200', 'index', 'query') => hits
func Elasticsearch(url string, index string, query string) ([]map[string]interface{}, error) {
// Placeholder - would query Elasticsearch
return []map[string]interface{}{}, nil
}
// GraphQL loads data from a GraphQL endpoint.
//
// Example:
//
// apoc.load.graphql('http://api.com/graphql', 'query { users { id name } }', {}) => data
func GraphQL(url string, query string, variables map[string]interface{}) (interface{}, error) {
// Placeholder - would execute GraphQL query
payload := map[string]interface{}{
"query": query,
"variables": variables,
}
jsonData, err := json.Marshal(payload)
if err != nil {
return nil, err
}
resp, err := http.Post(url, "application/json", strings.NewReader(string(jsonData)))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var result map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, err
}
return result["data"], nil
}
// Rest loads data from a REST API.
//
// Example:
//
// apoc.load.rest('http://api.com/users', {method: 'GET'}) => data
func Rest(url string, config map[string]interface{}) (interface{}, error) {
method := "GET"
if m, ok := config["method"].(string); ok {
method = m
}
client := &http.Client{}
req, err := http.NewRequest(method, url, nil)
if err != nil {
return nil, err
}
if headers, ok := config["headers"].(map[string]string); ok {
for key, value := range headers {
req.Header.Set(key, value)
}
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var data interface{}
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return nil, err
}
return data, nil
}
// Binary loads binary data from a URL.
//
// Example:
//
// apoc.load.binary('http://example.com/file.bin') => bytes
func Binary(url string) ([]byte, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}
// Stream loads data as a stream.
//
// Example:
//
// apoc.load.stream('http://example.com/data', {format: 'json'}) => stream
func Stream(url string, config map[string]interface{}) (io.ReadCloser, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
return resp.Body, nil
}