Replace mysql-connector-python with the PyMySQL driver for better MariaDB compatibility.

- PyMySQL handles the utf8mb4_0900_ai_ci collation properly, without errors.
- Update Dockerfile.sql-executor to install PyMySQL and psycopg2-binary.
- Refactor sql-query-executor.py to use the PyMySQL API (pymysql.connect, DictCursor).
- Verified the sql-executor service with SELECT, INSERT, and UPDATE operations on the Freescout DB.
- Add n8n workflow definitions workflow-a-http.json and workflow-b-http.json:
  * Workflow A: polls unprocessed conversations, analyzes them with LiteLLM, and saves suggestions.
  * Workflow B: polls approved suggestions and executes Baramundi jobs or email replies.
- Update compose.yaml with the sql-executor service configuration and dependencies.

All SQL operations now execute successfully against MariaDB 11.3.2.
198 lines
6.1 KiB
Python
198 lines
6.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Simple HTTP Server for executing SQL queries
|
|
Used by n8n workflows to avoid needing specialized database nodes
|
|
"""
|
|
|
|
from flask import Flask, request, jsonify
|
|
import pymysql
|
|
import psycopg2
|
|
import logging
|
|
import os
|
|
|
|
# Module-wide logging: INFO and above, via the root handler.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Flask application object that the route decorators below register against.
app = Flask(__name__)
|
|
|
|
# Connection settings, each overridable through an environment variable.
#
# NOTE(review): the fallback values below embed an internal IP address and
# what look like real credentials. They are kept so existing deployments keep
# working, but they should be supplied via the environment / a secret store
# and the committed password rotated.

# Keyword arguments passed straight to pymysql.connect().
FREESCOUT_DB_CONFIG = dict(
    host=os.environ.get('FREESCOUT_DB_HOST', '10.136.40.104'),
    port=int(os.environ.get('FREESCOUT_DB_PORT', 3306)),
    user=os.environ.get('FREESCOUT_DB_USER', 'freescout'),
    password=os.environ.get('FREESCOUT_DB_PASSWORD', '5N6fv4wIgsI6BZV'),
    database=os.environ.get('FREESCOUT_DB_NAME', 'freescout'),
    charset='utf8mb4',
    autocommit=True,
)

# Settings for the PostgreSQL "audit" database used by psycopg2.connect().
POSTGRES_AUDIT_CONFIG = dict(
    host=os.environ.get('POSTGRES_HOST', 'postgres'),
    port=int(os.environ.get('POSTGRES_PORT', 5432)),
    user=os.environ.get('POSTGRES_USER', 'kb_user'),
    password=os.environ.get('POSTGRES_PASSWORD', 'change_me_securely'),
    database=os.environ.get('POSTGRES_DB', 'n8n_kb'),
)
|
|
|
|
|
|
def _close_quietly(resource, label):
    """Close a cursor or connection, logging (not raising) if close fails."""
    if resource is None:
        return
    try:
        resource.close()
    except Exception as exc:
        # Cleanup must never replace the result/error already being returned.
        logger.warning("Failed to close %s: %s", label, exc)


def execute_query(db_type, query):
    """
    Execute a single SQL statement and return ``(result, error)``.

    SECURITY NOTE: ``query`` is sent to the database verbatim. This function
    performs no sanitising, so it must only be reachable by trusted callers.

    Args:
        db_type: 'freescout' (MariaDB/MySQL via PyMySQL) or 'audit'
            (PostgreSQL via psycopg2).
        query: the SQL text to run; must be a non-empty string.

    Returns:
        A 2-tuple. On success the second item is ``None`` and the first is
        either a list of row dicts (statement produced a result set) or
        ``{'affected_rows': n}`` (statement produced none). On failure the
        first item is ``None`` and the second is a non-empty error message.
    """
    if db_type not in ('freescout', 'audit'):
        return None, "Invalid database type"
    if not isinstance(query, str) or not query.strip():
        return None, "Query must be a non-empty string"

    connection = None
    cursor = None

    try:
        if db_type == 'freescout':
            connection = pymysql.connect(**FREESCOUT_DB_CONFIG)
            cursor = connection.cursor(pymysql.cursors.DictCursor)
        else:
            connection = psycopg2.connect(**POSTGRES_AUDIT_CONFIG)
            cursor = connection.cursor()

        logger.info("Executing %s query: %s...", db_type, query[:100])
        cursor.execute(query)

        # DB-API: description is None when the statement returned no rows.
        # Checking it (rather than a 'SELECT' prefix) also covers WITH, SHOW,
        # DESCRIBE, INSERT ... RETURNING and queries with leading comments.
        if cursor.description is None:
            connection.commit()
            return {'affected_rows': cursor.rowcount}, None

        rows = cursor.fetchall()
        if db_type == 'freescout':
            # DictCursor already yields dicts; PyMySQL can hand back an empty
            # tuple when nothing matched, so normalise to a list.
            results = list(rows)
        else:
            # PostgreSQL: convert positional rows to a list of dicts.
            columns = [column[0] for column in cursor.description]
            results = [dict(zip(columns, row)) for row in rows]

        # Row-returning statements can still write (e.g. RETURNING clauses),
        # so commit here as well; it is harmless for plain reads.
        connection.commit()
        return results, None

    except (pymysql.Error, psycopg2.Error) as e:
        logger.error(f"Database error: {e}")
        # Fall back to the class name so the error is never an empty string.
        return None, str(e) or type(e).__name__
    except Exception as e:
        logger.error(f"Error: {e}")
        return None, str(e) or type(e).__name__
    finally:
        _close_quietly(cursor, "cursor")
        _close_quietly(connection, "connection")
|
|
|
|
|
|
@app.route('/health', methods=['GET'])
def health():
    """Liveness probe: always answers 200 with a fixed JSON payload."""
    payload = jsonify(status='ok', service='sql-executor')
    return payload, 200
|
|
|
|
|
|
# Database identifiers accepted by the query endpoints.
_VALID_DB_TYPES = ('freescout', 'audit')


def _handle_query_request(db_type=None):
    """
    Shared implementation of the three query endpoints.

    Args:
        db_type: database to target. When ``None`` it is read from the
            request body's "db_type" field, defaulting to 'freescout'.

    Returns:
        A ``(response, status)`` pair: 400 for a malformed request, 500 when
        the query fails, 200 with ``data`` and ``count`` on success.
    """
    try:
        # silent=True returns None for a wrong content type or invalid JSON
        # instead of raising, so those client errors are reported as 400
        # rather than being caught below and turned into a 500.
        data = request.get_json(silent=True)

        if not isinstance(data, dict) or 'query' not in data:
            return jsonify({'error': 'Missing query parameter', 'success': False}), 400

        if db_type is None:
            db_type = data.get('db_type', 'freescout')
        if db_type not in _VALID_DB_TYPES:
            return jsonify({'error': 'Invalid database type', 'success': False}), 400

        results, error = execute_query(db_type, data.get('query'))

        # Compare against None: an empty error string is still a failure.
        if error is not None:
            logger.error(f"Query failed: {error}")
            return jsonify({'error': error, 'success': False}), 500

        # Row sets report their length; a write summary dict counts as one.
        count = len(results) if isinstance(results, (list, tuple)) else 1
        return jsonify({'success': True, 'data': results, 'count': count}), 200

    except Exception as e:
        logger.exception(f"Error: {e}")
        return jsonify({'error': str(e), 'success': False}), 500


@app.route('/query', methods=['POST'])
def query():
    """
    Execute a SQL query

    Request body:
    {
        "db_type": "freescout" or "audit",
        "query": "SELECT * FROM conversations LIMIT 10"
    }

    "db_type" is optional and defaults to "freescout".
    """
    return _handle_query_request()


@app.route('/query/freescout', methods=['POST'])
def query_freescout():
    """Execute query on Freescout database"""
    return _handle_query_request('freescout')


@app.route('/query/audit', methods=['POST'])
def query_audit():
    """Execute query on Audit (PostgreSQL) database"""
    return _handle_query_request('audit')
|
|
|
|
|
|
if __name__ == '__main__':
    # Probe the Freescout database once before serving. A failed probe is
    # only logged; the server still starts.
    logger.info("Testing Freescout database connection...")
    _, startup_error = execute_query('freescout', 'SELECT 1')
    if not startup_error:
        logger.info("✓ Connected to Freescout DB")
    else:
        logger.warning(f"Freescout DB connection test failed: {startup_error} (will retry during runtime)")

    # Listen on all interfaces, port 4000, threaded, with debug mode off.
    logger.info("Starting SQL Query Executor on 0.0.0.0:4000")
    app.run(host='0.0.0.0', port=4000, debug=False, threaded=True)
|