Files
n8n-compose/scripts/sql-query-executor.py
Claude Code 96d70d9edf fix: resolve MariaDB collation error by switching from mysql-connector to PyMySQL
- Replace mysql-connector-python with PyMySQL driver for better MariaDB compatibility
- PyMySQL handles utf8mb4_0900_ai_ci collation properly without errors
- Update Dockerfile.sql-executor to install PyMySQL and psycopg2-binary
- Refactor sql-query-executor.py to use PyMySQL API (pymysql.connect, DictCursor)
- Verified sql-executor service with SELECT, INSERT, UPDATE operations on Freescout DB
- Add n8n workflow definitions: workflow-a-http.json and workflow-b-http.json
  * Workflow A: Polls unprocessed conversations, analyzes with LiteLLM, saves suggestions
  * Workflow B: Polls approved suggestions, executes Baramundi jobs or email replies
- Update compose.yaml with sql-executor service configuration and dependencies

All SQL operations now execute successfully against MariaDB 11.3.2
2026-03-17 09:31:03 +01:00

198 lines
6.1 KiB
Python

#!/usr/bin/env python3
"""
Simple HTTP Server for executing SQL queries
Used by n8n workflows to avoid needing specialized database nodes
"""
from flask import Flask, request, jsonify
import pymysql
import psycopg2
import logging
import os
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Database configuration
FREESCOUT_DB_CONFIG = {
'host': os.getenv('FREESCOUT_DB_HOST', '10.136.40.104'),
'port': int(os.getenv('FREESCOUT_DB_PORT', 3306)),
'user': os.getenv('FREESCOUT_DB_USER', 'freescout'),
'password': os.getenv('FREESCOUT_DB_PASSWORD', '5N6fv4wIgsI6BZV'),
'database': os.getenv('FREESCOUT_DB_NAME', 'freescout'),
'charset': 'utf8mb4',
'autocommit': True,
}
POSTGRES_AUDIT_CONFIG = {
'host': os.getenv('POSTGRES_HOST', 'postgres'),
'port': int(os.getenv('POSTGRES_PORT', 5432)),
'user': os.getenv('POSTGRES_USER', 'kb_user'),
'password': os.getenv('POSTGRES_PASSWORD', 'change_me_securely'),
'database': os.getenv('POSTGRES_DB', 'n8n_kb'),
}
def execute_query(db_type, query):
"""
Execute a SQL query and return results
db_type: 'freescout' or 'audit'
"""
connection = None
cursor = None
try:
if db_type == 'freescout':
connection = pymysql.connect(**FREESCOUT_DB_CONFIG)
cursor = connection.cursor(pymysql.cursors.DictCursor)
elif db_type == 'audit':
connection = psycopg2.connect(
host=POSTGRES_AUDIT_CONFIG['host'],
port=POSTGRES_AUDIT_CONFIG['port'],
user=POSTGRES_AUDIT_CONFIG['user'],
password=POSTGRES_AUDIT_CONFIG['password'],
database=POSTGRES_AUDIT_CONFIG['database']
)
cursor = connection.cursor()
else:
return None, "Invalid database type"
logger.info(f"Executing {db_type} query: {query[:100]}...")
cursor.execute(query)
if query.strip().upper().startswith('SELECT'):
# Fetch results for SELECT queries
if db_type == 'freescout':
results = cursor.fetchall()
else:
# PostgreSQL: convert to list of dicts
columns = [desc[0] for desc in cursor.description]
results = [dict(zip(columns, row)) for row in cursor.fetchall()]
return results, None
else:
# For INSERT/UPDATE/DELETE
connection.commit()
return {'affected_rows': cursor.rowcount}, None
except pymysql.Error as e:
logger.error(f"Database error: {e}")
return None, str(e)
except Exception as e:
logger.error(f"Error: {e}")
return None, str(e)
finally:
if cursor:
cursor.close()
if connection:
try:
connection.close()
except:
pass
@app.route('/health', methods=['GET'])
def health():
"""Health check endpoint"""
return jsonify({'status': 'ok', 'service': 'sql-executor'}), 200
@app.route('/query', methods=['POST'])
def query():
"""
Execute a SQL query
Request body:
{
"db_type": "freescout" or "audit",
"query": "SELECT * FROM conversations LIMIT 10"
}
"""
try:
data = request.get_json()
if not data or 'query' not in data:
return jsonify({'error': 'Missing query parameter'}), 400
db_type = data.get('db_type', 'freescout')
query_str = data.get('query')
results, error = execute_query(db_type, query_str)
if error:
logger.error(f"Query failed: {error}")
return jsonify({'error': error, 'success': False}), 500
return jsonify({
'success': True,
'data': results,
'count': len(results) if isinstance(results, list) else 1
}), 200
except Exception as e:
logger.error(f"Error: {e}")
return jsonify({'error': str(e), 'success': False}), 500
@app.route('/query/freescout', methods=['POST'])
def query_freescout():
"""Execute query on Freescout database"""
try:
data = request.get_json()
if not data or 'query' not in data:
return jsonify({'error': 'Missing query parameter', 'success': False}), 400
query_str = data.get('query')
results, error = execute_query('freescout', query_str)
if error:
logger.error(f"Query failed: {error}")
return jsonify({'error': error, 'success': False}), 500
return jsonify({
'success': True,
'data': results,
'count': len(results) if isinstance(results, list) else 1
}), 200
except Exception as e:
logger.error(f"Error: {e}")
return jsonify({'error': str(e), 'success': False}), 500
@app.route('/query/audit', methods=['POST'])
def query_audit():
"""Execute query on Audit (PostgreSQL) database"""
try:
data = request.get_json()
if not data or 'query' not in data:
return jsonify({'error': 'Missing query parameter', 'success': False}), 400
query_str = data.get('query')
results, error = execute_query('audit', query_str)
if error:
logger.error(f"Query failed: {error}")
return jsonify({'error': error, 'success': False}), 500
return jsonify({
'success': True,
'data': results,
'count': len(results) if isinstance(results, list) else 1
}), 200
except Exception as e:
logger.error(f"Error: {e}")
return jsonify({'error': str(e), 'success': False}), 500
if __name__ == '__main__':
# Test connection on startup
logger.info("Testing Freescout database connection...")
results, error = execute_query('freescout', 'SELECT 1')
if error:
logger.warning(f"Freescout DB connection test failed: {error} (will retry during runtime)")
else:
logger.info(f"✓ Connected to Freescout DB")
logger.info("Starting SQL Query Executor on 0.0.0.0:4000")
app.run(host='0.0.0.0', port=4000, debug=False, threaded=True)