Files
n8n-compose/scripts/sql-query-executor.py

198 lines
6.1 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Simple HTTP Server for executing SQL queries
Used by n8n workflows to avoid needing specialized database nodes
"""
from flask import Flask, request, jsonify
import pymysql
import psycopg2
import logging
import os
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Database configuration
FREESCOUT_DB_CONFIG = {
'host': os.getenv('FREESCOUT_DB_HOST', '10.136.40.104'),
'port': int(os.getenv('FREESCOUT_DB_PORT', 3306)),
'user': os.getenv('FREESCOUT_DB_USER', 'freescout'),
'password': os.getenv('FREESCOUT_DB_PASSWORD', '5N6fv4wIgsI6BZV'),
'database': os.getenv('FREESCOUT_DB_NAME', 'freescout'),
'charset': 'utf8mb4',
'autocommit': True,
}
POSTGRES_AUDIT_CONFIG = {
'host': os.getenv('POSTGRES_HOST', 'postgres'),
'port': int(os.getenv('POSTGRES_PORT', 5432)),
'user': os.getenv('POSTGRES_USER', 'kb_user'),
'password': os.getenv('POSTGRES_PASSWORD', 'change_me_securely'),
'database': os.getenv('POSTGRES_DB', 'n8n_kb'),
}
def execute_query(db_type, query):
"""
Execute a SQL query and return results
db_type: 'freescout' or 'audit'
"""
connection = None
cursor = None
try:
if db_type == 'freescout':
connection = pymysql.connect(**FREESCOUT_DB_CONFIG)
cursor = connection.cursor(pymysql.cursors.DictCursor)
elif db_type == 'audit':
connection = psycopg2.connect(
host=POSTGRES_AUDIT_CONFIG['host'],
port=POSTGRES_AUDIT_CONFIG['port'],
user=POSTGRES_AUDIT_CONFIG['user'],
password=POSTGRES_AUDIT_CONFIG['password'],
database=POSTGRES_AUDIT_CONFIG['database']
)
cursor = connection.cursor()
else:
return None, "Invalid database type"
logger.info(f"Executing {db_type} query: {query[:100]}...")
cursor.execute(query)
if query.strip().upper().startswith('SELECT'):
# Fetch results for SELECT queries
if db_type == 'freescout':
results = cursor.fetchall()
else:
# PostgreSQL: convert to list of dicts
columns = [desc[0] for desc in cursor.description]
results = [dict(zip(columns, row)) for row in cursor.fetchall()]
return results, None
else:
# For INSERT/UPDATE/DELETE
connection.commit()
return {'affected_rows': cursor.rowcount}, None
except pymysql.Error as e:
logger.error(f"Database error: {e}")
return None, str(e)
except Exception as e:
logger.error(f"Error: {e}")
return None, str(e)
finally:
if cursor:
cursor.close()
if connection:
try:
connection.close()
except:
pass
@app.route('/health', methods=['GET'])
def health():
"""Health check endpoint"""
return jsonify({'status': 'ok', 'service': 'sql-executor'}), 200
@app.route('/query', methods=['POST'])
def query():
"""
Execute a SQL query
Request body:
{
"db_type": "freescout" or "audit",
"query": "SELECT * FROM conversations LIMIT 10"
}
"""
try:
data = request.get_json()
if not data or 'query' not in data:
return jsonify({'error': 'Missing query parameter'}), 400
db_type = data.get('db_type', 'freescout')
query_str = data.get('query')
results, error = execute_query(db_type, query_str)
if error:
logger.error(f"Query failed: {error}")
return jsonify({'error': error, 'success': False}), 500
return jsonify({
'success': True,
'data': results,
'count': len(results) if isinstance(results, list) else 1
}), 200
except Exception as e:
logger.error(f"Error: {e}")
return jsonify({'error': str(e), 'success': False}), 500
@app.route('/query/freescout', methods=['POST'])
def query_freescout():
"""Execute query on Freescout database"""
try:
data = request.get_json()
if not data or 'query' not in data:
return jsonify({'error': 'Missing query parameter', 'success': False}), 400
query_str = data.get('query')
results, error = execute_query('freescout', query_str)
if error:
logger.error(f"Query failed: {error}")
return jsonify({'error': error, 'success': False}), 500
return jsonify({
'success': True,
'data': results,
'count': len(results) if isinstance(results, list) else 1
}), 200
except Exception as e:
logger.error(f"Error: {e}")
return jsonify({'error': str(e), 'success': False}), 500
@app.route('/query/audit', methods=['POST'])
def query_audit():
"""Execute query on Audit (PostgreSQL) database"""
try:
data = request.get_json()
if not data or 'query' not in data:
return jsonify({'error': 'Missing query parameter', 'success': False}), 400
query_str = data.get('query')
results, error = execute_query('audit', query_str)
if error:
logger.error(f"Query failed: {error}")
return jsonify({'error': error, 'success': False}), 500
return jsonify({
'success': True,
'data': results,
'count': len(results) if isinstance(results, list) else 1
}), 200
except Exception as e:
logger.error(f"Error: {e}")
return jsonify({'error': str(e), 'success': False}), 500
if __name__ == '__main__':
# Test connection on startup
logger.info("Testing Freescout database connection...")
results, error = execute_query('freescout', 'SELECT 1')
if error:
logger.warning(f"Freescout DB connection test failed: {error} (will retry during runtime)")
else:
logger.info(f"✓ Connected to Freescout DB")
logger.info("Starting SQL Query Executor on 0.0.0.0:4000")
app.run(host='0.0.0.0', port=4000, debug=False, threaded=True)