diff --git a/.github/.keep b/.github/.keep
new file mode 100644
index 000000000..e69de29bb
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 000000000..2a1b221eb
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+venv1
+__pycache__
\ No newline at end of file
diff --git a/AppscriptCode/GoogleScript.gs b/AppscriptCode/GoogleScript.gs
new file mode 100644
index 000000000..ca8932e8e
--- /dev/null
+++ b/AppscriptCode/GoogleScript.gs
@@ -0,0 +1,75 @@
+function onEditTrigger(e) {
+  // Bail out if the trigger fired without an edit event or range
+  if (!e || !e.range) return;
+
+  // Get the sheet and range information
+  var sheet = e.range.getSheet();
+  var range = e.range;
+  var sheetName = sheet.getName();
+  var rowStart = range.getRow();
+  var colStart = range.getColumn();
+  var numRows = range.getNumRows();
+  var numCols = range.getNumColumns();
+
+  var modifiedCells = [];
+
+  // Iterate over all modified cells
+  for (var i = 0; i < numRows; i++) {
+    for (var j = 0; j < numCols; j++) {
+      var cellRow = rowStart + i;
+      var cellCol = colStart + j;
+      var editedValue = sheet.getRange(cellRow, cellCol).getValue();
+      // Caveat: e.oldValue is only populated for single-cell edits; for
+      // multi-cell changes (paste, fill, range delete) it is undefined.
+      var oldValue = e.oldValue;
+
+      // Determine the type of modification (created, updated, deleted).
+      // e.oldValue is always a string, so compare string forms.
+      var action = '';
+      if (!oldValue && editedValue) {
+        action = 'created';
+      } else if (oldValue && !editedValue) {
+        action = 'deleted';
+      } else if (oldValue && editedValue && String(editedValue) !== String(oldValue)) {
+        action = 'updated';
+      }
+
+      if (action) {
+        // Collect information about the modified cell
+        modifiedCells.push({
+          sheetName: sheetName,
+          row: cellRow,
+          column: cellCol,
+          oldValue: oldValue || null,
+          newValue: editedValue || null,
+          action: action
+        });
+      }
+    }
+  }
+
+  // If any cells were modified, send the data
+  if (modifiedCells.length > 0) {
+    Logger.log(modifiedCells);
+    sendPostRequest(modifiedCells);
+  }
+}
+
+function sendPostRequest(modifiedCells) {
+  var url = 'https://example.com/your-backend-endpoint'; // Replace with your backend endpoint
+
+  var data = {
+    timestamp: new Date().toISOString(),
+    modifiedCells: modifiedCells
+  };
+
+  var options = {
+    'method': 'post',
+    'contentType': 'application/json',
+    'payload': JSON.stringify(data)
+  };
+
+  try {
+    var response = UrlFetchApp.fetch(url, options);
+    Logger.log('POST request sent successfully: ' + response.getContentText());
+  } catch (error) {
+    Logger.log('Error sending POST request: ' + error.toString());
+  }
+}
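Note that the `{timestamp, modifiedCells}` payload built by `GoogleScript.gs` is not the shape `app.py` consumes; the server in this repo expects the `{sheet_name, cells}` format produced by `sheetToDb.gs` below. A minimal sketch of a receiver for this earlier format, assuming Flask is installed (the endpoint name is a placeholder, mirroring the one in the script):

```python
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/your-backend-endpoint', methods=['POST'])
def receive_modified_cells():
    # Expects {"timestamp": ..., "modifiedCells": [{sheetName, row, column,
    # oldValue, newValue, action}, ...]} as built in GoogleScript.gs.
    data = request.json
    if not data or 'modifiedCells' not in data:
        return jsonify({"status": "error", "message": "Invalid payload"}), 400
    for cell in data['modifiedCells']:
        print(cell['action'], cell['sheetName'], cell['row'], cell['column'])
    return jsonify({"status": "ok", "received": len(data['modifiedCells'])})

if __name__ == '__main__':
    app.run(port=5000)
```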
diff --git a/AppscriptCode/sheetToDb.gs b/AppscriptCode/sheetToDb.gs
new file mode 100644
index 000000000..6c48c32a6
--- /dev/null
+++ b/AppscriptCode/sheetToDb.gs
@@ -0,0 +1,101 @@
+function onEditTrigger(e) {
+  Logger.log("Triggered onEdit");
+
+  // Check that the trigger fired with an edit event and range
+  if (!e || !e.range) {
+    Logger.log("No range or event detected");
+    return;
+  }
+
+  // Get the sheet and range information
+  var sheet = e.range.getSheet();
+  var range = e.range;
+  var sheetName = sheet.getName();
+  var rowStart = range.getRow();
+  var colStart = range.getColumn();
+  var numRows = range.getNumRows();
+  var numCols = range.getNumColumns();
+
+  Logger.log("Sheet Name: " + sheetName);
+  Logger.log("Row Start: " + rowStart + ", Column Start: " + colStart);
+  Logger.log("Number of Rows: " + numRows + ", Number of Columns: " + numCols);
+
+  var modifiedCells = [];
+
+  // Iterate over all modified cells
+  for (var i = 0; i < numRows; i++) {
+    for (var j = 0; j < numCols; j++) {
+      var cellRow = rowStart + i;
+      var cellCol = colStart + j;
+      var editedValue = sheet.getRange(cellRow, cellCol).getValue();
+      // Caveat: e.oldValue is only populated for single-cell edits; for
+      // multi-cell changes (paste, fill, range delete) it is undefined.
+      var oldValue = e.oldValue;
+
+      Logger.log("Processing cell: (" + cellRow + ", " + cellCol + ")");
+      Logger.log("Old Value: " + oldValue + ", New Value: " + editedValue);
+
+      // Determine the type of modification (insert, update, delete).
+      // e.oldValue is always a string, so compare string forms.
+      var action = '';
+      if (!oldValue && editedValue) {
+        action = 'insert';
+        Logger.log("Action: insert");
+      } else if (oldValue && !editedValue) {
+        action = 'delete'; // Set action to delete if the cell is cleared
+        Logger.log("Action: delete");
+      } else if (oldValue && editedValue && String(editedValue) !== String(oldValue)) {
+        action = 'update';
+        Logger.log("Action: update");
+      }
+
+      if (action) {
+        // Collect information about the modified cell
+        var cellModification = {
+          row: cellRow,
+          column: cellCol,
+          value: action === 'delete' ? null : editedValue, // If delete, set value to null
+          operation: action
+        };
+
+        Logger.log("Cell Modification: " + JSON.stringify(cellModification));
+
+        modifiedCells.push(cellModification);
+      }
+    }
+  }
+
+  // Build the final JSON object
+  if (modifiedCells.length > 0) {
+    var jsonPayload = {
+      sheet_name: sheetName,
+      cells: modifiedCells
+    };
+
+    Logger.log("JSON Payload: " + JSON.stringify(jsonPayload)); // Log the JSON object
+
+    sendPostRequest(jsonPayload); // Send the JSON payload to the server
+  } else {
+    Logger.log("No modifications detected");
+  }
+}
+
+// Send the JSON data via POST request to the Kafka publishing endpoint
+function sendPostRequest(payload) {
+  Logger.log("Publishing to Kafka");
+
+  var url = "https://unique-powerful-husky.ngrok-free.app/kafka-publish-endpoint"; // Kafka publishing endpoint
+
+  var options = {
+    method: "POST",
+    contentType: "application/json",
+    payload: JSON.stringify(payload)
+  };
+
+  try {
+    var response = UrlFetchApp.fetch(url, options);
+    Logger.log("Kafka Publish Response: " + response.getContentText());
+  } catch (error) {
+    Logger.log("Error in Kafka publish request: " + error.toString());
+  }
+}
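For reference, the payload contract that `sheetToDb.gs` produces (and that `sync_sheet_from_json` in `dbCode/mysqlScript.py` ultimately consumes) can be exercised without Google Sheets. A minimal sketch, assuming the Flask server from `app.py` is running locally on port 5000:

```python
import requests

# Same shape as the jsonPayload built in sheetToDb.gs:
# one sheet name plus a list of cell-level operations.
payload = {
    "sheet_name": "Sheet1",
    "cells": [
        {"row": 1, "column": 1, "value": "hello", "operation": "insert"},
        {"row": 1, "column": 1, "value": "world", "operation": "update"},
        {"row": 2, "column": 1, "value": None, "operation": "delete"},
    ],
}

# Hits the same endpoint the Apps Script posts to (locally, not via ngrok).
resp = requests.post("http://localhost:5000/kafka-publish-endpoint", json=payload, timeout=15)
print(resp.status_code, resp.json())
```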
diff --git a/README.md b/README.md
index f0aad2ebc..f22ce1961 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,15 @@
+[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/AHFn7Vbn)
 # Superjoin Hiring Assignment
+### Planned Architecture
+
+[![](https://mermaid.ink/img/pako:eNqVVV1r2zAU_StCUEhpQt79MGiTrrAta1Zn7MUQFPnGFrElV5I3Qsl_n2QpjmyUkfkl4X6ce-7V0dUHpiIHnGBaEaWWjBSS1BlH5uss6EWIooK0BNAKfTiP_WbKmpZEE5SgL-nr94vrQfDnnOmNZEUBcnIfeBTwfC2UfoP3FpQ--04ZD2s-Nk1KJWt0WPAhFzZzAvcD2wuMTE27q5gqfzY50bARX8n-QAYcqOCqraFzuCh1hUcXMuDgwVegFCkgBhtzlYTnFSwE31eM6mvVfhGmpclOQf4GOZj18skkc6CaCR7gzuujeq-2bddEaG_MoAqDFXGpI6fbojvUbXeEoZN6iluTK6p2VM4eXmygEkj-WYp64Bk1tzqmP74tn0ZdxeXjSHcZgzp70LRcmGEWV09s7Rr_31o-Td1e7u7O3wzkr4Y5Yqtb5IW7EHXdckbJZYaDmzSbfQpkniB3ZWyZHj9As-BOjBHcC4wFdWEJWjulItcg0sJ7NqJhFE288H1bzjXmtHCK9il7IZHPmvdSDuie5Yu8fv_JeaT1GHFXqy816mDMewQYI98jvfXinvuGgjYC9lYxO6LgxgbOEk88dWdAxjJZv6YbNLitfvDnnGgLj_TAxZ8K8gI85NXaA933BM6yDjiMNoOnMUi_mct4Zs88bwTjRt123CklFdmxiuljlHYS3V-RoOgyi8Rd2WyRyOiaw1Ncg6wJy81r2K2ODOsSashwYv7mRB4ynPGTiSOtFqmphhMtW5hiR8w_njjZk0oZK5gLLeTKP6_2Z4qlaIvSR5z-AgPBZ3Q?type=png)](https://mermaid.live/edit#pako:eNqVVV1r2zAU_StCUEhpQt79MGiTrrAta1Zn7MUQFPnGFrElV5I3Qsl_n2QpjmyUkfkl4X6ce-7V0dUHpiIHnGBaEaWWjBSS1BlH5uss6EWIooK0BNAKfTiP_WbKmpZEE5SgL-nr94vrQfDnnOmNZEUBcnIfeBTwfC2UfoP3FpQ--04ZD2s-Nk1KJWt0WPAhFzZzAvcD2wuMTE27q5gqfzY50bARX8n-QAYcqOCqraFzuCh1hUcXMuDgwVegFCkgBhtzlYTnFSwE31eM6mvVfhGmpclOQf4GOZj18skkc6CaCR7gzuujeq-2bddEaG_MoAqDFXGpI6fbojvUbXeEoZN6iluTK6p2VM4eXmygEkj-WYp64Bk1tzqmP74tn0ZdxeXjSHcZgzp70LRcmGEWV09s7Rr_31o-Td1e7u7O3wzkr4Y5Yqtb5IW7EHXdckbJZYaDmzSbfQpkniB3ZWyZHj9As-BOjBHcC4wFdWEJWjulItcg0sJ7NqJhFE288H1bzjXmtHCK9il7IZHPmvdSDuie5Yu8fv_JeaT1GHFXqy816mDMewQYI98jvfXinvuGgjYC9lYxO6LgxgbOEk88dWdAxjJZv6YbNLitfvDnnGgLj_TAxZ8K8gI85NXaA933BM6yDjiMNoOnMUi_mct4Zs88bwTjRt123CklFdmxiuljlHYS3V-RoOgyi8Rd2WyRyOiaw1Ncg6wJy81r2K2ODOsSashwYv7mRB4ynPGTiSOtFqmphhMtW5hiR8w_njjZk0oZK5gLLeTKP6_2Z4qlaIvSR5z-AgPBZ3Q)
+
+
+### Video
+
+https://drive.google.com/file/d/18NcHJFeRSfQ9IOGx4iLDHjqH73XrxyQF/view?usp=sharing
+
 ### Welcome to Superjoin's hiring assignment! πŸš€
 
 ### Objective
@@ -25,7 +35,7 @@ Many businesses use Google Sheets for collaborative data management and database
 - Ensure the solution can handle large datasets and high-frequency updates without performance degradation.
 - Optimize for scalability and efficiency.
 
-## Submission ⏰
+### Submission ⏰
 The timeline for this submission is: **Next 2 days**
 
 Some things you might want to take care of:
@@ -44,11 +54,11 @@ Once you're done, make sure you **record a video** showing your project working
 
 We have a checklist at the bottom of this README file, which you should update as your progress with your assignment. It will help us evaluate your project.
 
-- [ ] My code's working just fine! πŸ₯³
-- [ ] I have recorded a video showing it working and embedded it in the README ▢️
-- [ ] I have tested all the normal working cases 😎
-- [ ] I have even solved some edge cases (brownie points) πŸ’ͺ
-- [ ] I added my very planned-out approach to the problem at the end of this README πŸ“œ
+- [ ] My code's working just fine! πŸ₯³
+- [ ] I have recorded a video showing it working and embedded it in the README ▢️
+- [ ] I have tested all the normal working cases 😎
+- [ ] I have even solved some edge cases (brownie points) πŸ’ͺ
+- [ ] I added my very planned-out approach to the problem at the end of this README πŸ“œ
 
 ## Got Questions❓
 Feel free to check the discussions tab, you might get some help there. Check out that tab before reaching out to us. Also, did you know, the internet is a great place to explore? πŸ˜›
@@ -58,4 +68,51 @@ We're available at techhiring@superjoin.ai for all queries. All the best ✨.
 
 ## Developer's Section
 
-*Add your video here, and your approach to the problem (optional). Leave some comments for us here if you want, we will be reading this :)*
+# Google Sheets & MySQL Sync with Kafka Integration
+
+This project implements real-time synchronization between Google Sheets and a MySQL database, using Kafka for scalability and conflict resolution. The Flask server is hosted via Waitress and exposed to the internet using NGROK. Google Apps Script detects changes in Google Sheets and triggers the appropriate operations (Create, Read, Update, Delete) in real time.
+
+## Architecture Overview
+
+- **Flask Server**: Built with Python's Flask framework, served via Waitress.
+- **NGROK**: Exposes the Flask server to the internet so it can receive requests from Google Sheets.
+- **Kafka**: Handles message passing for scalable synchronization between Google Sheets and the MySQL database, and resolves conflicts when concurrent updates occur.
+- **Google Apps Script**: Detects changes in Google Sheets and triggers CRUD operations accordingly.
+- **MySQL Database**: Stores the data synced from Google Sheets.
+
+## Key Features
+
+- **Real-Time Synchronization**: Changes made in Google Sheets (adding rows, updating values, etc.) are reflected in the MySQL database immediately.
+- **Scalability**: Kafka integration allows the system to scale horizontally, so multiple clients can interact with the database without performance degradation.
+- **Conflict Resolution**: Internal conflict-handling mechanisms resolve issues arising from concurrent edits.
+- **CRUD Operations**: Supports Create, Read, Update, and Delete operations seamlessly between Google Sheets and MySQL.
+
+## Challenges Faced
+
+One major roadblock was configuring Kafka, as it was my first time working with it. The remaining functionality was more straightforward, since I had prior experience implementing similar operations with Google Apps Script and Python during my internship at EKCS.
+
+## Personal Reflection
+
+Building this project was really fun, and I learned a lot about Kafka in the process.
+
+---
+
+## How to Use
+
+1. **Setup MySQL Database**: Ensure you have a running MySQL instance.
+2. **Flask Server**: Start the Flask server using Waitress and expose it via NGROK (see the sketch below).
+3. **Google Sheets**: Add the appropriate Apps Script to trigger updates on changes in Google Sheets.
+4. **Kafka**: Set up a Kafka broker and configure the consumer and producer in the Flask app.
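+
+As a minimal sketch of step 2 (assuming `app.py` is importable from the project root and Waitress is installed; host and port are placeholders):
+
+```python
+# serve.py -- run the Flask app under Waitress instead of app.run()
+from waitress import serve
+
+from app import app  # the Flask instance defined in app.py
+
+if __name__ == "__main__":
+    # Point ngrok at this port afterwards, e.g.: ngrok http 5000
+    serve(app, host="0.0.0.0", port=5000)
+```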
+
+## Checklist
+- [x] My code's working just fine! πŸ₯³
+- [x] I have recorded a video showing it working and embedded it in the README ▢️
+- [x] I have tested all the normal working cases 😎
+- [x] I have even solved some edge cases (brownie points) πŸ’ͺ
+- [x] I added my very planned-out approach to the problem at the end of this README πŸ“œ
+
+## Author
+
+Aavish Gilbert J
+Email: aavish.gilbert@gmail.com
+PES University, PES1UG21CS012
diff --git a/app.py b/app.py
new file mode 100644
index 000000000..8f50d4f6a
--- /dev/null
+++ b/app.py
@@ -0,0 +1,544 @@
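+# NOTE (assumed interpretation): the "A -> B" and "C -> D" labels used in the
+# comments below are not defined elsewhere in the repo. From the handlers, they
+# appear to mean:
+#   A -> B: Apps Script POSTs an edit to /kafka-publish-endpoint (A); a Kafka
+#           consumer applies it to MySQL via sync_sheet_from_json (B).
+#   C -> D: monitor_cells polls MySQL for rows written by sheet_sync_user and
+#           publishes them to cells_topic (C); a second consumer writes them to
+#           data.json, which Google Apps Script polls via /updateSheet (D).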
+# """ +# data = request.json +# if not data: +# return jsonify({"status": "error", "message": "Invalid payload"}), 400 + +# # Send a request and wait for a response +# response = kafka_handler.send_request(data, timeout=10) +# return jsonify(response) + +# # Start Flask server +# if __name__ == '__main__': +# app.run(host='0.0.0.0', port=5000) + + + +#========================================================================================================================== + + +# from flask import Flask, request, jsonify +# import logging +# import threading +# import mysql.connector +# import json +# import time +# from twoWayKafka import KafkaHandler +# from dbCode.mysqlScript import sync_sheet_from_json +# # Uncomment the following line if you need to use logUpdate +# # from updateSheet import logUpdate + +# logging.basicConfig(level=logging.INFO) + +# # --- Database Configurations --- + +# # MySQL Database Configuration +# db_config = { +# 'host': 'localhost', # MySQL server host +# 'user': 'root', # MySQL username +# 'password': 'Aavish@02', # MySQL password +# 'database': 'superjoin' # Database name +# } + +# # --- Kafka Configuration --- + +# kafka_config = { +# 'bootstrap_servers': 'localhost:9092', +# 'topic': 'my_topic', +# 'group_id': 'my_group', +# 'response_topic': 'response_topic' # Replace with your actual response topic +# } + +# # --- Flask Application --- + +# app = Flask(__name__) + +# # Create a KafkaHandler instance with response_topic for two-way communication +# kafka_handler = KafkaHandler( +# bootstrap_servers=kafka_config['bootstrap_servers'], +# topic=kafka_config['topic'], +# group_id=kafka_config['group_id'], +# response_topic=kafka_config['response_topic'] +# ) + +# # --- Function to Read and Write Last Processed ID --- + +# def read_last_id(): +# try: +# with open('last_id.txt', 'r') as f: +# return int(f.read()) +# except Exception: +# return 0 + +# def write_last_id(last_id): +# with open('last_id.txt', 'w') as f: +# f.write(str(last_id)) + +# # --- Cells Monitoring Function --- + +# def monitor_cells(kafka_handler, db_config): +# last_id = read_last_id() # Read from file or initialize to 0 +# while True: +# try: +# connection = mysql.connector.connect(**db_config) +# cursor = connection.cursor(dictionary=True) +# query = """ +# SELECT * FROM cells +# WHERE changed_by = 'sheet_sync_user@localhost' +# AND operation = 'insert' +# AND id > %s +# ORDER BY id ASC +# """ +# cursor.execute(query, (last_id,)) +# changes = cursor.fetchall() +# cursor.close() +# connection.close() +# if changes: +# for change in changes: +# # Log the addition +# logging.info(f"Cell added by 'sheet_sync_user': {change}") + +# message = { +# 'id': change['id'], +# 'sheet_id': change['sheet_id'], +# 'row_number': change['row_number'], +# 'column_number': change['column_number'], +# 'value': change['value'], +# 'operation': change['operation'], +# 'changed_by': change['changed_by'], +# 'changed_at': change['changed_at'].strftime('%Y-%m-%d %H:%M:%S'), +# 'is_current': change['is_current'] +# } +# # Publish message to Kafka using KafkaHandler +# kafka_handler.publish_message(message) +# logging.info(f"Sent message: {message}") + +# last_id = changes[-1]['id'] +# write_last_id(last_id) # Save the last processed ID +# else: +# time.sleep(1) +# except Exception as e: +# logging.error(f"Error in monitor_cells: {e}") +# time.sleep(1) + +# # Start the monitor_cells function in a background thread +# monitor_thread = threading.Thread(target=monitor_cells, args=(kafka_handler, db_config), 
+from flask import Flask, request, jsonify
+import logging
+import threading
+import mysql.connector
+import json
+import os
+import time
+from twoWayKafka import KafkaHandler
+from dbCode.mysqlScript import sync_sheet_from_json
+import requests
+
+logging.basicConfig(level=logging.INFO)
+
+# --- Database Configurations ---
+db_config = {
+    'host': 'localhost',
+    'user': 'root',
+    'password': 'Aavish@02',
+    'database': 'superjoin'
+}
+
+# --- Kafka Configuration ---
+kafka_config = {
+    'bootstrap_servers': 'localhost:9092',
+    'topic': 'my_topic',            # For A -> B communication
+    'group_id': 'my_group',
+    'response_topic': 'response_topic',
+    'cells_topic': 'cells_topic',   # New topic for C -> D communication
+    'cells_group_id': 'cells_group'
+}
+
+# --- Flask Application ---
+app = Flask(__name__)
+
+# Create KafkaHandler instances for both A -> B and C -> D communications
+kafka_handler = KafkaHandler(
+    bootstrap_servers=kafka_config['bootstrap_servers'],
+    topic=kafka_config['topic'],
+    group_id=kafka_config['group_id'],
+    response_topic=kafka_config['response_topic']
+)
+
+kafka_handler_cells = KafkaHandler(
+    bootstrap_servers=kafka_config['bootstrap_servers'],
+    topic=kafka_config['cells_topic'],
+    group_id=kafka_config['cells_group_id']
+)
+
+# --- Functions to Read and Write the Last Processed ID ---
+def read_last_id():
+    try:
+        with open('last_id.txt', 'r') as f:
+            return int(f.read())
+    except Exception:
+        return 0
+
+def write_last_id(last_id):
+    with open('last_id.txt', 'w') as f:
+        f.write(str(last_id))
+
+def update_sheet(input_data):
+    # Define the file name for the data
+    file_name = 'data.json'
+
+    # Create the structure to hold the changes for the current input
+    change_entry = {
+        "row_number": input_data['row_number'],
+        "column_number":
input_data['column_number'], + "value": input_data.get('value', ''), + "operation": input_data['operation'], + "changed_by": input_data['changed_by'], + "changed_at": input_data['changed_at'], + "is_current": input_data['is_current'] + } + + # Initialize an empty structure + data = {"sheets": []} + + # Load the existing file if it exists and contains valid JSON + if os.path.exists(file_name): + try: + with open(file_name, 'r') as file: + file_content = file.read().strip() + if file_content: + data = json.loads(file_content) + except json.JSONDecodeError: + print(f"Warning: {file_name} contains invalid JSON. Reinitializing.") + data = {"sheets": []} + + # Find the sheet in the JSON structure, or add a new sheet entry + sheet_found = False + for sheet in data['sheets']: + if sheet['sheet_id'] == input_data['sheet_id']: + sheet['changes'].append(change_entry) + sheet_found = True + break + + # If the sheet does not exist, create a new sheet entry + if not sheet_found: + new_sheet = { + "sheet_id": input_data['sheet_id'], + "changes": [change_entry] + } + data['sheets'].append(new_sheet) + + # Save the updated data back to the file + with open(file_name, 'w') as file: + json.dump(data, file, indent=4) + + return data + + + + +# --- Cells Monitoring Function (C) --- +def monitor_cells(kafka_handler_cells, db_config): + last_id = read_last_id() # Read from file or initialize to 0 + logging.info(f"Starting monitor_cells with last_id: {last_id}") + + while True: + try: + # Step 1: Connect to the MySQL database + logging.debug("Connecting to the database...") + connection = mysql.connector.connect(**db_config) + cursor = connection.cursor(dictionary=True) + + # Step 2: Prepare and execute the query + query = """ + SELECT * FROM cells + WHERE changed_by = 'sheet_sync_user@localhost' + AND operation = 'insert' + AND id > %s + ORDER BY id ASC + """ + logging.debug(f"Executing query with last_id: {last_id}") + cursor.execute(query, (last_id,)) + changes = cursor.fetchall() + + logging.debug(f"Query executed. Number of changes fetched: {len(changes)}") + cursor.close() + connection.close() + + # Step 3: Process the changes + if changes: + for change in changes: + # Debug each change being processed + logging.debug(f"Processing change: {change}") + + # Prepare the message to send to Kafka + message = { + 'id': change['id'], + 'sheet_id': change['sheet_id'], + 'row_number': change['row_number'], + 'column_number': change['column_number'], + 'value': change['value'], + 'operation': change['operation'], + 'changed_by': change['changed_by'], + 'changed_at': change['changed_at'].strftime('%Y-%m-%d %H:%M:%S'), + 'is_current': change['is_current'] + } + # Log the message being sent to Kafka + logging.info(f"Publishing message to Kafka: {message}") + kafka_handler_cells.publish_message(message) + logging.debug(f"Message sent successfully: {message}") + + # Update the last processed ID and store it + last_id = changes[-1]['id'] + logging.info(f"Updating last_id to: {last_id}") + write_last_id(last_id) + else: + # Log when no changes are found + logging.debug("No changes found. 
Sleeping for 1 second...")
+                time.sleep(1)
+
+        except mysql.connector.Error as db_error:
+            # Log database connection or query errors
+            logging.error(f"Database error in monitor_cells: {db_error}")
+            time.sleep(1)
+        except Exception as e:
+            # Catch all other errors
+            logging.error(f"Unexpected error in monitor_cells: {e}")
+            time.sleep(1)
+
+
+# Start the monitor_cells function in a background thread
+monitor_thread = threading.Thread(target=monitor_cells, args=(kafka_handler_cells, db_config), daemon=True)
+monitor_thread.start()
+
+# --- Kafka Consumer for C -> D (Listening on cells_topic) ---
+def run_cells_consumer(kafka_handler_cells):
+    """
+    Consumes messages from the Kafka cells_topic and processes them.
+    Acts as the D part of C -> D communication.
+    """
+    def process_cells_message(msg):
+        # Process the message (C -> D flow)
+        logging.info(f"Processing message from cells_topic: {msg}")
+
+        correlation_id = msg.get('correlation_id')
+
+        # Record the change in data.json so the Apps Script poller can pick it up
+        response_data = update_sheet(msg)
+        logging.info(f"Updated data.json for the Apps Script poller: {response_data}")
+        # Messages from monitor_cells carry no correlation_id, and this handler
+        # was constructed without a response_topic, so this branch is normally
+        # inert; the extra check guards against sending to a None topic.
+        if correlation_id and kafka_handler_cells.response_topic:
+            response = {
+                'status': 'success',
+                'correlation_id': correlation_id,
+                'result': 'Cells processed successfully'
+            }
+            # Send response back to the response_topic
+            kafka_handler_cells.producer.send(kafka_handler_cells.response_topic, response)
+            kafka_handler_cells.producer.flush()
+            logging.info(f"Response sent for correlation_id: {correlation_id}")
+
+    kafka_handler_cells.consume_messages(process_cells_message)
+
+# Start Kafka consumer for C -> D in a background thread
+cells_consumer_thread = threading.Thread(target=run_cells_consumer, args=(kafka_handler_cells,), daemon=True)
+cells_consumer_thread.start()
+
+# --- Kafka Consumer Thread Function for A -> B (Kafka sync) ---
+def run_consumer(kafka_handler):
+    def process_message(msg):
+        # Apply the sheet payload to MySQL
+        sync_sheet_from_json(mysql.connector.connect(**db_config), msg)
+
+        # Prepare response
+        correlation_id = msg.get('correlation_id')
+        if correlation_id:
+            response = {
+                'status': 'success',
+                'correlation_id': correlation_id,
+                'result': 'Data synchronized'
+            }
+            # Send response back to the response_topic
+            kafka_handler.producer.send(kafka_handler.response_topic, response)
+            kafka_handler.producer.flush()
+
+    kafka_handler.consume_messages(process_message)
+
+# Start Kafka consumer for A -> B in a background thread
+consumer_thread = threading.Thread(target=run_consumer, args=(kafka_handler,), daemon=True)
+consumer_thread.start()
+
+# --- Flask Route to Publish Message to Kafka and Wait for a Response (A -> B) ---
+@app.route('/kafka-publish-endpoint', methods=['POST'])
+def kafka_publish():
+    """
+    Endpoint to publish a message to Kafka and wait for a response.
+    Expects a JSON payload in the request.
+    """
+    data = request.json
+    if not data:
+        return jsonify({"status": "error", "message": "Invalid payload"}), 400
+
+    # Send a request and wait for a response
+    response = kafka_handler.send_request(data, timeout=10)
+    return jsonify(response)
+
+FILE_PATH = 'data.json'
+@app.route('/updateSheet', methods=['POST'])
+def read_json_from_file():
+    """
+    Endpoint to read stored JSON from a file and return it as a response.
+ """ + try: + # Open and read the JSON file + with open(FILE_PATH, 'r') as file: + data = json.load(file) # Parse the JSON content + # print(data) + # Return the JSON data as a response + return jsonify(data) + + except FileNotFoundError: + return jsonify({"status": "error", "message": "File not found"}), 404 + except json.JSONDecodeError: + return jsonify({"status": "error", "message": "Error decoding JSON"}), 400 + except Exception as e: + return jsonify({"status": "error", "message": str(e)}), 500 + +# Start Flask server +if __name__ == '__main__': + app.run(host='0.0.0.0', port=5000) diff --git a/commands b/commands new file mode 100644 index 000000000..97430d2d5 --- /dev/null +++ b/commands @@ -0,0 +1,7 @@ +#Start Kafka + +zookeeper-server-start.bat C:\kafka_2.13-3.8.0\config\zookeeper.properties +kafka-server-start.bat C:\kafka_2.13-3.8.0\config\server.properties + + +mysqladmin -u root -p shutdown \ No newline at end of file diff --git a/data.json b/data.json new file mode 100644 index 000000000..a707dc070 --- /dev/null +++ b/data.json @@ -0,0 +1,41 @@ +{ + "sheets": [ + { + "sheet_id": 1, + "changes": [ + { + "row_number": 2, + "column_number": 1, + "value": "python", + "operation": "insert", + "changed_by": "sheet_sync_user@localhost", + "changed_at": "2024-09-16 16:13:49", + "is_current": 1 + }, + { + "row_number": 1, + "column_number": 1, + "value": "Gilbert J", + "operation": "update", + "changed_by": "sheet_sync_user@localhost", + "changed_at": "2024-09-16 16:14:00", + "is_current": 1 + } + ] + }, + { + "sheet_id": 2, + "changes": [ + { + "row_number": 1, + "column_number": 1, + "value": "kafka", + "operation": "insert", + "changed_by": "sheet_sync_user@localhost", + "changed_at": "2024-09-16 16:13:49", + "is_current": 1 + } + ] + } + ] +} diff --git a/dbCode/mysqlScript.py b/dbCode/mysqlScript.py new file mode 100644 index 000000000..60653fed2 --- /dev/null +++ b/dbCode/mysqlScript.py @@ -0,0 +1,193 @@ +import mysql.connector +import json + +def sync_sheet_from_json(db, json_input): + cursor = db.cursor() + try: + # Start a transaction + db.start_transaction() + print("Transaction started.") + + # Parse the JSON input + data = json.loads(json_input) if isinstance(json_input, str) else json_input + print("JSON parsed successfully:", data) + + # Get or create the sheet + sheet_name = data["sheet_name"] + print(f"Sheet name: {sheet_name}") + sheet_id = get_or_create_sheet(db, cursor, sheet_name) + print(f"Sheet ID: {sheet_id}") + + # Process each cell in the input + for cell in data.get("cells", []): + row_num = cell.get("row") + col_num = cell.get("column") + value = cell.get("value") # Value can be None for deletion + operation = cell.get("operation", "upsert") + print(f"Processing cell - Row: {row_num}, Column: {col_num}, Value: {value}, Operation: {operation}") + + if operation == "delete": + if row_num is not None and col_num is not None: + print(f"Deleting cell at Row: {row_num}, Column: {col_num}") + delete_cell(cursor, sheet_id, row_num, col_num) + elif row_num is not None: + print(f"Deleting row: {row_num}") + delete_row_or_column(cursor, sheet_id, row_num=row_num) + elif col_num is not None: + print(f"Deleting column: {col_num}") + delete_row_or_column(cursor, sheet_id, col_num=col_num) + else: + print("No valid row or column specified for deletion.") + else: + print(f"Upserting cell at Row: {row_num}, Column: {col_num}, Value: {value}") + upsert_cell(cursor, sheet_id, row_num, col_num, value) + + # Commit the transaction + db.commit() + print("Transaction 
committed.") + + except mysql.connector.Error as err: + # Rollback transaction in case of any error + db.rollback() + print(f"Database Error during sync: {err}") + except json.JSONDecodeError as json_err: + print(f"JSON parsing error: {json_err}") + except Exception as e: + # Handle any other exceptions + db.rollback() + print(f"Unexpected error during sync: {e}") + finally: + cursor.close() + print("Cursor closed.") + +def get_or_create_sheet(db, cursor, sheet_name): + try: + print(f"Fetching sheet with name: {sheet_name}") + cursor.execute("SELECT id FROM sheets WHERE name = %s", (sheet_name,)) + sheet = cursor.fetchone() + + if sheet is None: + print(f"Sheet not found, creating new sheet: {sheet_name}") + cursor.execute("INSERT INTO sheets (name) VALUES (%s)", (sheet_name,)) + sheet_id = cursor.lastrowid + print(f"New sheet created with ID: {sheet_id}") + return sheet_id + print(f"Sheet found with ID: {sheet[0]}") + return sheet[0] + except mysql.connector.Error as err: + print(f"Error in get_or_create_sheet: {err}") + raise + except Exception as e: + print(f"Unexpected error in get_or_create_sheet: {e}") + raise + +def upsert_cell(cursor, sheet_id, row_num, col_num, value): + try: + if row_num is None or col_num is None: + print("Row number and column number must be provided for upsert operation.") + return + print(f"Processing upsert for cell at row {row_num}, column {col_num}") + + if value is None or value == "": + print(f"Value is None, deleting cell at row {row_num}, column {col_num}") + delete_cell(cursor, sheet_id, row_num, col_num) + return + + # Check if the cell exists + query = """ + SELECT id FROM cells + WHERE sheet_id = %s AND `row_number` = %s AND `column_number` = %s + """ + print(f"Executing query to check if cell exists with sheet_id={sheet_id}, row_number={row_num}, column_number={col_num}") + cursor.execute(query, (sheet_id, row_num, col_num)) + cell = cursor.fetchone() + + if cell: + print(f"Cell exists with ID {cell[0]}, updating value.") + update_query = """ + UPDATE cells + SET value = %s, updated_at = CURRENT_TIMESTAMP + WHERE id = %s + """ + print(f"Executing update query with value: {value}, id: {cell[0]}") + cursor.execute(update_query, (value, cell[0])) + print(f"Updated cell at row {row_num}, col {col_num} with value: {value}") + else: + print(f"Cell does not exist, inserting new cell.") + insert_query = """ + INSERT INTO cells (sheet_id, `row_number`, `column_number`, value) + VALUES (%s, %s, %s, %s) + """ + print(f"Executing insert query with sheet_id={sheet_id}, row_number={row_num}, column_number={col_num}, value={value}") + cursor.execute(insert_query, (sheet_id, row_num, col_num, value)) + print(f"Inserted new cell at row {row_num}, col {col_num} with value: {value}") + except mysql.connector.Error as err: + print(f"Error in upsert_cell: {err}") + raise + except Exception as e: + print(f"Unexpected error in upsert_cell: {e}") + raise + +def delete_cell(cursor, sheet_id, row_num, col_num): + try: + print(f"Deleting cell at row {row_num}, column {col_num} from sheet {sheet_id}") + delete_query = """ + DELETE FROM cells + WHERE sheet_id = %s AND `row_number` = %s AND `column_number` = %s + """ + cursor.execute(delete_query, (sheet_id, row_num, col_num)) + print(f"Deleted cell at row {row_num}, column {col_num} from sheet {sheet_id}") + except mysql.connector.Error as err: + print(f"Error in delete_cell: {err}") + raise + except Exception as e: + print(f"Unexpected error in delete_cell: {e}") + raise + +def delete_row_or_column(cursor, sheet_id, 
row_num=None, col_num=None):
+    try:
+        if row_num is not None:
+            print(f"Deleting row {row_num} from sheet {sheet_id}")
+            delete_query = "DELETE FROM cells WHERE sheet_id = %s AND `row_number` = %s"
+            cursor.execute(delete_query, (sheet_id, row_num))
+            print(f"Deleted row {row_num} from sheet {sheet_id}")
+
+        if col_num is not None:
+            print(f"Deleting column {col_num} from sheet {sheet_id}")
+            delete_query = "DELETE FROM cells WHERE sheet_id = %s AND `column_number` = %s"
+            cursor.execute(delete_query, (sheet_id, col_num))
+            print(f"Deleted column {col_num} from sheet {sheet_id}")
+
+    except mysql.connector.Error as err:
+        print(f"Error in delete_row_or_column: {err}")
+        raise
+    except Exception as e:
+        print(f"Unexpected error in delete_row_or_column: {e}")
+        raise
+
+# Example usage
+if __name__ == "__main__":
+    # Example JSON input
+    json_input = '''
+    {
+        "sheet_name": "Sheet2",
+        "cells": [
+            {"row": 3, "column": 3, "value": "bossman", "operation": "insert"},
+            {"row": 3, "column": 3, "operation": "delete"},
+            {"row": 2, "operation": "delete"},
+            {"column": 2, "operation": "delete"}
+        ]
+    }
+    '''
+
+    db = mysql.connector.connect(
+        host="localhost",
+        user="root",
+        password="your_mysql_password",  # Replace with your MySQL root password
+        database="google_sheet_mimic"
+    )
+
+    print("Starting sync process...")
+    sync_sheet_from_json(db, json_input)
+    print("Sync process complete.")
+    db.close()
diff --git a/last_id.txt b/last_id.txt
new file mode 100644
index 000000000..aaa6442fe
--- /dev/null
+++ b/last_id.txt
@@ -0,0 +1 @@
+41
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 000000000..8d1c8b69c
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,6 @@
+# Dependencies inferred from the imports in this repo; pin versions as needed.
+flask
+waitress
+kafka-python
+mysql-connector-python
+requests
diff --git a/twoWayKafka.py b/twoWayKafka.py
new file mode 100644
index 000000000..05cb9ca9f
--- /dev/null
+++ b/twoWayKafka.py
@@ -0,0 +1,120 @@
+import uuid
+import time
+from kafka import KafkaProducer, KafkaConsumer
+from kafka.errors import KafkaError
+import json
+import threading
+import logging
+
+class KafkaHandler:
+    def __init__(self, bootstrap_servers, topic, group_id=None, response_topic=None):
+        self.bootstrap_servers = bootstrap_servers
+        self.topic = topic
+        self.group_id = group_id
+        self.response_topic = response_topic
+
+        self.producer = KafkaProducer(
+            bootstrap_servers=self.bootstrap_servers,
+            value_serializer=lambda v: json.dumps(v).encode('utf-8')
+        )
+
+        # Dictionary to hold pending responses
+        self.pending_responses = {}
+        self.lock = threading.Lock()
+
+        if self.response_topic:
+            # Start a consumer thread to listen to responses
+            threading.Thread(target=self._start_response_consumer, daemon=True).start()
+
+    def publish_message(self, message):
+        """
+        Publishes a message to the Kafka topic.
+        """
+        try:
+            future = self.producer.send(self.topic, message)
+            result = future.get(timeout=10)
+            logging.info(f"Message sent successfully: {result}")
+            return {"status": "success", "result": str(result)}
+        except KafkaError as e:
+            logging.error(f"Failed to send message: {e}")
+            return {"status": "error", "message": str(e)}
+
+    def consume_messages(self, process_callback):
+        """
+        Consumes messages from the Kafka topic asynchronously.
+        Passes each message to the provided callback function.
+ """ + logging.info(f"Starting consumer for topic: {self.topic}") + try: + consumer = KafkaConsumer( + self.topic, + group_id=self.group_id, + bootstrap_servers=self.bootstrap_servers, + value_deserializer=lambda m: json.loads(m.decode('utf-8')), + auto_offset_reset='earliest', + enable_auto_commit=True, + ) + for message in consumer: + process_callback(message.value) + except KafkaError as e: + logging.error(f"Error consuming messages: {e}") + + def send_request(self, message, timeout=10): + """ + Sends a message and waits for a response. + """ + correlation_id = str(uuid.uuid4()) + message['correlation_id'] = correlation_id + + with self.lock: + self.pending_responses[correlation_id] = None + + try: + future = self.producer.send(self.topic, message) + future.get(timeout=10) + logging.info(f"Request sent with correlation_id: {correlation_id}") + except KafkaError as e: + logging.error(f"Failed to send request: {e}") + with self.lock: + del self.pending_responses[correlation_id] + return {"status": "error", "message": str(e)} + + # Wait for the response + start_time = time.time() + while True: + with self.lock: + response = self.pending_responses.get(correlation_id) + if response is not None: + with self.lock: + del self.pending_responses[correlation_id] + return response + elif time.time() - start_time > timeout: + with self.lock: + del self.pending_responses[correlation_id] + logging.error(f"Response timed out for correlation_id: {correlation_id}") + return {"status": "error", "message": "Response timed out"} + else: + time.sleep(0.1) + + def _start_response_consumer(self): + """ + Starts a consumer to listen for responses. + """ + logging.info(f"Starting response consumer for topic: {self.response_topic}") + try: + consumer = KafkaConsumer( + self.response_topic, + group_id=self.group_id + '_response' if self.group_id else None, + bootstrap_servers=self.bootstrap_servers, + value_deserializer=lambda m: json.loads(m.decode('utf-8')), + auto_offset_reset='earliest', + enable_auto_commit=True, + ) + for message in consumer: + response = message.value + correlation_id = response.get('correlation_id') + if correlation_id: + with self.lock: + self.pending_responses[correlation_id] = response + except KafkaError as e: + logging.error(f"Error consuming responses: {e}") \ No newline at end of file diff --git a/updateSheet.py b/updateSheet.py new file mode 100644 index 000000000..e69de29bb