diff --git a/Dockerfile.sql-executor b/Dockerfile.sql-executor new file mode 100644 index 0000000..5c64cdb --- /dev/null +++ b/Dockerfile.sql-executor @@ -0,0 +1,19 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install dependencies +RUN pip install --no-cache-dir flask PyMySQL psycopg2-binary + +# Copy the SQL executor script +COPY scripts/sql-query-executor.py /app/app.py + +# Expose port +EXPOSE 4000 + +# Health check +HEALTHCHECK --interval=10s --timeout=5s --retries=5 \ + CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:4000/health').read()" + +# Run the app +CMD ["python", "app.py"] diff --git a/compose.yaml b/compose.yaml index da268a4..957d4ca 100644 --- a/compose.yaml +++ b/compose.yaml @@ -134,6 +134,33 @@ services: retries: 5 start_period: 10s + sql-executor: + build: + context: . + dockerfile: Dockerfile.sql-executor + restart: always + ports: + - "4000:4000" + environment: + - FREESCOUT_DB_HOST=10.136.40.104 + - FREESCOUT_DB_PORT=3306 + - FREESCOUT_DB_USER=freescout + - FREESCOUT_DB_PASSWORD=5N6fv4wIgsI6BZV + - FREESCOUT_DB_NAME=freescout + - POSTGRES_HOST=postgres + - POSTGRES_PORT=5432 + - POSTGRES_USER=kb_user + - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} + - POSTGRES_DB=n8n_kb + depends_on: + postgres: + condition: service_healthy + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:4000/health"] + interval: 10s + timeout: 5s + retries: 5 + volumes: n8n_data: traefik_data: diff --git a/n8n-workflows/workflow-a-http.json b/n8n-workflows/workflow-a-http.json new file mode 100644 index 0000000..d7a577f --- /dev/null +++ b/n8n-workflows/workflow-a-http.json @@ -0,0 +1,274 @@ +{ + "name": "Workflow A - Mail Processing (HTTP)", + "description": "Fetch unprocessed conversations from Freescout, analyze with AI, save suggestions", + "nodes": [ + { + "id": "uuid-trigger-1", + "name": "Trigger", + "type": "n8n-nodes-base.cron", + "typeVersion": 1, + "position": [250, 200], + "parameters": { + "cronExpression": "*/5 * * * *" + } + }, + { + "id": "uuid-get-conversations", + "name": "Get Unprocessed Conversations", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4, + "position": [450, 200], + "parameters": { + "url": "http://host.docker.internal:4000/query/freescout", + "method": "POST", + "headers": { + "Content-Type": "application/json" + }, + "sendBody": true, + "specifyBody": "json", + "jsonBody": "{\"query\":\"SELECT c.id, c.number, c.subject, c.customer_email, c.status, GROUP_CONCAT(t.body SEPARATOR '\\\\n') as threads_text FROM conversations c LEFT JOIN threads t ON c.id = t.conversation_id WHERE c.status = 1 AND c.id NOT IN (SELECT DISTINCT conversation_id FROM conversation_custom_field WHERE custom_field_id = 8) GROUP BY c.id LIMIT 20\"}" + } + }, + { + "id": "uuid-split-results", + "name": "Split Results", + "type": "n8n-nodes-base.splitInBatches", + "typeVersion": 3, + "position": [650, 200], + "parameters": { + "batchSize": 1, + "options": {} + } + }, + { + "id": "uuid-extract-data", + "name": "Extract Conversation Data", + "type": "n8n-nodes-base.set", + "typeVersion": 3, + "position": [850, 200], + "parameters": { + "options": {}, + "assignments": { + "assignments": [ + { + "name": "ticket_id", + "value": "={{ $json.id }}", + "type": "number" + }, + { + "name": "ticket_number", + "value": "={{ $json.number }}", + "type": "number" + }, + { + "name": "subject", + "value": "={{ $json.subject }}", + "type": "string" + }, + { + "name": "problem_text", + "value": "={{ ($json.threads_text || 'No description provided').substring(0, 2000) }}", + "type": "string" + }, + { + "name": "customer_email", + "value": "={{ $json.customer_email }}", + "type": "string" + } + ] + } + } + }, + { + "id": "uuid-llm-analyze", + "name": "LiteLLM AI Analysis", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4, + "position": [1050, 200], + "parameters": { + "url": "http://llm.eks-ai.apps.asgard.eks-lnx.fft-it.de/v1/chat/completions", + "method": "POST", + "headers": { + "Content-Type": "application/json" + }, + "sendBody": true, + "specifyBody": "json", + "jsonBody": "{\"model\":\"gpt-3.5-turbo\",\"messages\":[{\"role\":\"system\",\"content\":\"Du bist ein IT-Support-Assistent. Analysiere das folgende IT-Support-Ticket und gib eine strukturierte JSON-Antwort mit folgenden Feldern: kategorie (z.B. Hardware, Software, Netzwerk, Zugriff), lösung_typ (BARAMUNDI_JOB, AUTOMATISCHE_ANTWORT, oder ESKALATION), vertrauen (Dezimal zwischen 0.0 und 1.0 - wie sicher bist du bei dieser Lösung), baramundi_job (Name des Jobs falls BARAMUNDI_JOB), antwort_text (Die Antwort an den Nutzer), begründung (Kurze Erklärung deiner Analyse)\"},{\"role\":\"user\",\"content\":\"Ticket-Nummer: {{$json.ticket_number}}\\nBetreff: {{$json.subject}}\\nProblembeschreibung:\\n{{$json.problem_text}}\\n\\nBitte antworte NUR mit gültiger JSON in dieser Struktur: {\\\"kategorie\\\": \\\"...\\\", \\\"lösung_typ\\\": \\\"...\\\", \\\"vertrauen\\\": 0.75, \\\"baramundi_job\\\": \\\"...\\\", \\\"antwort_text\\\": \\\"...\\\", \\\"begründung\\\": \\\"...\\\"}\"}],\"temperature\":0.7,\"max_tokens\":1000}" + } + }, + { + "id": "uuid-parse-response", + "name": "Parse AI Response", + "type": "n8n-nodes-base.set", + "typeVersion": 3, + "position": [1250, 200], + "parameters": { + "options": {}, + "assignments": { + "assignments": [ + { + "name": "response_text", + "value": "={{ $json.choices[0].message.content }}", + "type": "string" + }, + { + "name": "ai_response", + "value": "={{ JSON.parse($json.response_text) }}", + "type": "object" + } + ] + } + } + }, + { + "id": "uuid-check-confidence", + "name": "Check Confidence >= 0.6", + "type": "n8n-nodes-base.if", + "typeVersion": 2, + "position": [1450, 200], + "parameters": { + "conditions": { + "options": { + "caseSensitive": true, + "extractValue": false + }, + "combinator": "and", + "conditions": [ + { + "id": "condition_1", + "leftValue": "={{ $json.ai_response.vertrauen }}", + "rightValue": 0.6, + "operator": { + "name": "filter.operator.gte", + "value": ">=" + } + } + ] + } + } + }, + { + "id": "uuid-save-to-db", + "name": "Save Suggestion to Freescout DB", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4, + "position": [1650, 100], + "parameters": { + "url": "http://host.docker.internal:4000/query/freescout", + "method": "POST", + "headers": { + "Content-Type": "application/json" + }, + "sendBody": true, + "specifyBody": "json", + "jsonBody": "{\"query\":\"INSERT INTO conversation_custom_field (conversation_id, custom_field_id, value) VALUES ({{$json.ticket_id}}, 6, '{{$json.ai_response | json.stringify}}') ON DUPLICATE KEY UPDATE value = VALUES(value); INSERT INTO conversation_custom_field (conversation_id, custom_field_id, value) VALUES ({{$json.ticket_id}}, 7, 'PENDING') ON DUPLICATE KEY UPDATE value = VALUES(value); INSERT INTO conversation_custom_field (conversation_id, custom_field_id, value) VALUES ({{$json.ticket_id}}, 8, '1') ON DUPLICATE KEY UPDATE value = VALUES(value);\"}" + } + }, + { + "id": "uuid-no-action", + "name": "Skip - Low Confidence", + "type": "n8n-nodes-base.set", + "typeVersion": 3, + "position": [1650, 350], + "parameters": { + "options": {}, + "assignments": { + "assignments": [ + { + "name": "skipped", + "value": true, + "type": "boolean" + }, + { + "name": "reason", + "value": "Confidence {{$json.ai_response.vertrauen}} < 0.6", + "type": "string" + } + ] + } + } + } + ], + "connections": { + "Trigger": { + "main": [ + [ + { + "node": "Get Unprocessed Conversations", + "index": 0 + } + ] + ] + }, + "Get Unprocessed Conversations": { + "main": [ + [ + { + "node": "Split Results", + "index": 0 + } + ] + ] + }, + "Split Results": { + "main": [ + [ + { + "node": "Extract Conversation Data", + "index": 0 + } + ] + ] + }, + "Extract Conversation Data": { + "main": [ + [ + { + "node": "LiteLLM AI Analysis", + "index": 0 + } + ] + ] + }, + "LiteLLM AI Analysis": { + "main": [ + [ + { + "node": "Parse AI Response", + "index": 0 + } + ] + ] + }, + "Parse AI Response": { + "main": [ + [ + { + "node": "Check Confidence >= 0.6", + "index": 0 + } + ] + ] + }, + "Check Confidence >= 0.6": { + "main": [ + [ + { + "node": "Save Suggestion to Freescout DB", + "index": 0 + } + ], + [ + { + "node": "Skip - Low Confidence", + "index": 0 + } + ] + ] + } + }, + "active": false, + "settings": { + "errorHandler": "continueOnError" + } +} diff --git a/n8n-workflows/workflow-b-http.json b/n8n-workflows/workflow-b-http.json new file mode 100644 index 0000000..dbdb4c3 --- /dev/null +++ b/n8n-workflows/workflow-b-http.json @@ -0,0 +1,337 @@ +{ + "name": "Workflow B - Approval & Execution (HTTP)", + "description": "Poll for approved AI suggestions and execute them (Baramundi jobs or email replies)", + "nodes": [ + { + "id": "uuid-trigger-b", + "name": "Trigger", + "type": "n8n-nodes-base.cron", + "typeVersion": 1, + "position": [250, 200], + "parameters": { + "cronExpression": "*/2 * * * *" + } + }, + { + "id": "uuid-get-approved", + "name": "Get Approved Conversations", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4, + "position": [450, 200], + "parameters": { + "url": "http://host.docker.internal:4000/query/freescout", + "method": "POST", + "headers": { + "Content-Type": "application/json" + }, + "sendBody": true, + "specifyBody": "json", + "jsonBody": "{\"query\":\"SELECT c.id, c.number, c.subject, c.customer_email, ccf.value as ai_suggestion FROM conversations c JOIN conversation_custom_field ccf ON c.id = ccf.conversation_id WHERE ccf.custom_field_id = 7 AND ccf.value = 'APPROVED' LIMIT 10\"}" + } + }, + { + "id": "uuid-split-approved", + "name": "Split Results", + "type": "n8n-nodes-base.splitInBatches", + "typeVersion": 3, + "position": [650, 200], + "parameters": { + "batchSize": 1, + "options": {} + } + }, + { + "id": "uuid-extract-approved", + "name": "Extract & Parse Suggestion", + "type": "n8n-nodes-base.set", + "typeVersion": 3, + "position": [850, 200], + "parameters": { + "options": {}, + "assignments": { + "assignments": [ + { + "name": "ticket_id", + "value": "={{ $json.id }}", + "type": "number" + }, + { + "name": "ticket_number", + "value": "={{ $json.number }}", + "type": "number" + }, + { + "name": "subject", + "value": "={{ $json.subject }}", + "type": "string" + }, + { + "name": "customer_email", + "value": "={{ $json.customer_email }}", + "type": "string" + }, + { + "name": "ai_suggestion_raw", + "value": "={{ typeof $json.ai_suggestion === 'string' ? $json.ai_suggestion : JSON.stringify($json.ai_suggestion) }}", + "type": "string" + }, + { + "name": "ai_suggestion", + "value": "={{ typeof $json.ai_suggestion === 'string' ? JSON.parse($json.ai_suggestion) : $json.ai_suggestion }}", + "type": "object" + }, + { + "name": "solution_type", + "value": "={{ $json.ai_suggestion.lösung_typ || 'UNKNOWN' }}", + "type": "string" + } + ] + } + } + }, + { + "id": "uuid-decide-solution", + "name": "Decide Solution Type", + "type": "n8n-nodes-base.switch", + "typeVersion": 1, + "position": [1050, 200], + "parameters": { + "options": [ + { + "condition": "equal", + "value1": "={{ $json.solution_type }}", + "value2": "BARAMUNDI_JOB" + }, + { + "condition": "equal", + "value1": "={{ $json.solution_type }}", + "value2": "AUTOMATISCHE_ANTWORT" + } + ] + } + }, + { + "id": "uuid-execute-baramundi", + "name": "Execute Baramundi Job", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4, + "position": [1250, 50], + "parameters": { + "url": "https://baramundi-api.example.com/api/jobs", + "method": "POST", + "headers": { + "Content-Type": "application/json", + "Authorization": "Bearer YOUR_BARAMUNDI_TOKEN" + }, + "sendBody": true, + "specifyBody": "json", + "jsonBody": "{\"job_name\":\"{{$json.ai_suggestion.baramundi_job}}\",\"ticket_id\":{{$json.ticket_id}},\"target_system\":\"IT\",\"description\":\"{{$json.subject}}\"}" + } + }, + { + "id": "uuid-send-email", + "name": "Send Email Reply", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4, + "position": [1250, 150], + "parameters": { + "url": "http://host.docker.internal:4000/query/freescout", + "method": "POST", + "headers": { + "Content-Type": "application/json" + }, + "sendBody": true, + "specifyBody": "json", + "jsonBody": "{\"query\":\"INSERT INTO threads (conversation_id, customer_id, user_id, type, status, body, created_at, updated_at) VALUES ({{$json.ticket_id}}, (SELECT customer_id FROM conversations WHERE id = {{$json.ticket_id}} LIMIT 1), NULL, 'customer', 'active', '{{$json.ai_suggestion.antwort_text | replace(\\\"'\\\", \\\"''\\\")}}', NOW(), NOW())\"}" + } + }, + { + "id": "uuid-mark-escalation", + "name": "Mark for Manual Review", + "type": "n8n-nodes-base.set", + "typeVersion": 3, + "position": [1250, 270], + "parameters": { + "options": {}, + "assignments": { + "assignments": [ + { + "name": "action", + "value": "Manual escalation required", + "type": "string" + } + ] + } + } + }, + { + "id": "uuid-update-status", + "name": "Update Status to EXECUTED", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4, + "position": [1450, 200], + "parameters": { + "url": "http://host.docker.internal:4000/query/freescout", + "method": "POST", + "headers": { + "Content-Type": "application/json" + }, + "sendBody": true, + "specifyBody": "json", + "jsonBody": "{\"query\":\"UPDATE conversation_custom_field SET value = 'EXECUTED' WHERE conversation_id = {{$json.ticket_id}} AND custom_field_id = 7\"}" + } + }, + { + "id": "uuid-trigger-workflow-c", + "name": "Trigger Workflow C (KB Update)", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4, + "position": [1650, 200], + "parameters": { + "url": "https://n8n.fft-it.de/webhook/workflow-c", + "method": "POST", + "headers": { + "Content-Type": "application/json" + }, + "sendBody": true, + "specifyBody": "json", + "jsonBody": "{\"ticket_id\":{{$json.ticket_id}},\"subject\":\"{{$json.subject}}\",\"problem\":\"{{$json.subject}}\",\"solution\":\"{{$json.ai_suggestion.antwort_text}}\",\"category\":\"{{$json.ai_suggestion.kategorie}}\",\"solution_type\":\"{{$json.solution_type}}\"}" + } + }, + { + "id": "uuid-log-audit", + "name": "Log to PostgreSQL", + "type": "n8n-nodes-base.httpRequest", + "typeVersion": 4, + "position": [1850, 200], + "parameters": { + "url": "http://host.docker.internal:4000/query/audit", + "method": "POST", + "headers": { + "Content-Type": "application/json" + }, + "sendBody": true, + "specifyBody": "json", + "jsonBody": "{\"query\":\"INSERT INTO workflow_executions (workflow_name, ticket_id, status, execution_time_ms, created_at) VALUES ('Workflow B - Approval Execution', {{$json.ticket_id}}, 'SUCCESS', 0, NOW())\"}" + } + } + ], + "connections": { + "Trigger": { + "main": [ + [ + { + "node": "Get Approved Conversations", + "index": 0 + } + ] + ] + }, + "Get Approved Conversations": { + "main": [ + [ + { + "node": "Split Results", + "index": 0 + } + ] + ] + }, + "Split Results": { + "main": [ + [ + { + "node": "Extract & Parse Suggestion", + "index": 0 + } + ] + ] + }, + "Extract & Parse Suggestion": { + "main": [ + [ + { + "node": "Decide Solution Type", + "index": 0 + } + ] + ] + }, + "Decide Solution Type": { + "main": [ + [ + { + "node": "Execute Baramundi Job", + "index": 0 + } + ], + [ + { + "node": "Send Email Reply", + "index": 0 + } + ], + [ + { + "node": "Mark for Manual Review", + "index": 0 + } + ] + ] + }, + "Execute Baramundi Job": { + "main": [ + [ + { + "node": "Update Status to EXECUTED", + "index": 0 + } + ] + ] + }, + "Send Email Reply": { + "main": [ + [ + { + "node": "Update Status to EXECUTED", + "index": 0 + } + ] + ] + }, + "Mark for Manual Review": { + "main": [ + [ + { + "node": "Update Status to EXECUTED", + "index": 0 + } + ] + ] + }, + "Update Status to EXECUTED": { + "main": [ + [ + { + "node": "Trigger Workflow C (KB Update)", + "index": 0 + } + ] + ] + }, + "Trigger Workflow C (KB Update)": { + "main": [ + [ + { + "node": "Log to PostgreSQL", + "index": 0 + } + ] + ] + } + }, + "active": false, + "settings": { + "errorHandler": "continueOnError" + } +} diff --git a/scripts/sql-query-executor.py b/scripts/sql-query-executor.py new file mode 100644 index 0000000..5dc997f --- /dev/null +++ b/scripts/sql-query-executor.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python3 +""" +Simple HTTP Server for executing SQL queries +Used by n8n workflows to avoid needing specialized database nodes +""" + +from flask import Flask, request, jsonify +import pymysql +import psycopg2 +import logging +import os + +app = Flask(__name__) +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Database configuration +FREESCOUT_DB_CONFIG = { + 'host': os.getenv('FREESCOUT_DB_HOST', '10.136.40.104'), + 'port': int(os.getenv('FREESCOUT_DB_PORT', 3306)), + 'user': os.getenv('FREESCOUT_DB_USER', 'freescout'), + 'password': os.getenv('FREESCOUT_DB_PASSWORD', '5N6fv4wIgsI6BZV'), + 'database': os.getenv('FREESCOUT_DB_NAME', 'freescout'), + 'charset': 'utf8mb4', + 'autocommit': True, +} + +POSTGRES_AUDIT_CONFIG = { + 'host': os.getenv('POSTGRES_HOST', 'postgres'), + 'port': int(os.getenv('POSTGRES_PORT', 5432)), + 'user': os.getenv('POSTGRES_USER', 'kb_user'), + 'password': os.getenv('POSTGRES_PASSWORD', 'change_me_securely'), + 'database': os.getenv('POSTGRES_DB', 'n8n_kb'), +} + + +def execute_query(db_type, query): + """ + Execute a SQL query and return results + db_type: 'freescout' or 'audit' + """ + connection = None + cursor = None + + try: + if db_type == 'freescout': + connection = pymysql.connect(**FREESCOUT_DB_CONFIG) + cursor = connection.cursor(pymysql.cursors.DictCursor) + elif db_type == 'audit': + connection = psycopg2.connect( + host=POSTGRES_AUDIT_CONFIG['host'], + port=POSTGRES_AUDIT_CONFIG['port'], + user=POSTGRES_AUDIT_CONFIG['user'], + password=POSTGRES_AUDIT_CONFIG['password'], + database=POSTGRES_AUDIT_CONFIG['database'] + ) + cursor = connection.cursor() + else: + return None, "Invalid database type" + + logger.info(f"Executing {db_type} query: {query[:100]}...") + cursor.execute(query) + + if query.strip().upper().startswith('SELECT'): + # Fetch results for SELECT queries + if db_type == 'freescout': + results = cursor.fetchall() + else: + # PostgreSQL: convert to list of dicts + columns = [desc[0] for desc in cursor.description] + results = [dict(zip(columns, row)) for row in cursor.fetchall()] + return results, None + else: + # For INSERT/UPDATE/DELETE + connection.commit() + return {'affected_rows': cursor.rowcount}, None + + except pymysql.Error as e: + logger.error(f"Database error: {e}") + return None, str(e) + except Exception as e: + logger.error(f"Error: {e}") + return None, str(e) + finally: + if cursor: + cursor.close() + if connection: + try: + connection.close() + except: + pass + + +@app.route('/health', methods=['GET']) +def health(): + """Health check endpoint""" + return jsonify({'status': 'ok', 'service': 'sql-executor'}), 200 + + +@app.route('/query', methods=['POST']) +def query(): + """ + Execute a SQL query + + Request body: + { + "db_type": "freescout" or "audit", + "query": "SELECT * FROM conversations LIMIT 10" + } + """ + try: + data = request.get_json() + + if not data or 'query' not in data: + return jsonify({'error': 'Missing query parameter'}), 400 + + db_type = data.get('db_type', 'freescout') + query_str = data.get('query') + + results, error = execute_query(db_type, query_str) + + if error: + logger.error(f"Query failed: {error}") + return jsonify({'error': error, 'success': False}), 500 + + return jsonify({ + 'success': True, + 'data': results, + 'count': len(results) if isinstance(results, list) else 1 + }), 200 + + except Exception as e: + logger.error(f"Error: {e}") + return jsonify({'error': str(e), 'success': False}), 500 + + +@app.route('/query/freescout', methods=['POST']) +def query_freescout(): + """Execute query on Freescout database""" + try: + data = request.get_json() + if not data or 'query' not in data: + return jsonify({'error': 'Missing query parameter', 'success': False}), 400 + + query_str = data.get('query') + results, error = execute_query('freescout', query_str) + + if error: + logger.error(f"Query failed: {error}") + return jsonify({'error': error, 'success': False}), 500 + + return jsonify({ + 'success': True, + 'data': results, + 'count': len(results) if isinstance(results, list) else 1 + }), 200 + except Exception as e: + logger.error(f"Error: {e}") + return jsonify({'error': str(e), 'success': False}), 500 + + +@app.route('/query/audit', methods=['POST']) +def query_audit(): + """Execute query on Audit (PostgreSQL) database""" + try: + data = request.get_json() + if not data or 'query' not in data: + return jsonify({'error': 'Missing query parameter', 'success': False}), 400 + + query_str = data.get('query') + results, error = execute_query('audit', query_str) + + if error: + logger.error(f"Query failed: {error}") + return jsonify({'error': error, 'success': False}), 500 + + return jsonify({ + 'success': True, + 'data': results, + 'count': len(results) if isinstance(results, list) else 1 + }), 200 + except Exception as e: + logger.error(f"Error: {e}") + return jsonify({'error': str(e), 'success': False}), 500 + + +if __name__ == '__main__': + # Test connection on startup + logger.info("Testing Freescout database connection...") + results, error = execute_query('freescout', 'SELECT 1') + if error: + logger.warning(f"Freescout DB connection test failed: {error} (will retry during runtime)") + else: + logger.info(f"✓ Connected to Freescout DB") + + logger.info("Starting SQL Query Executor on 0.0.0.0:4000") + app.run(host='0.0.0.0', port=4000, debug=False, threaded=True)