From eefd950ec0f219bc26bf2b188659bbce99681e86 Mon Sep 17 00:00:00 2001 From: "github-classroom[bot]" <66690702+github-classroom[bot]@users.noreply.github.com> Date: Sat, 14 Sep 2024 14:07:36 +0000 Subject: [PATCH 01/17] GitHub Classroom Feedback --- .github/.keep | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 .github/.keep diff --git a/.github/.keep b/.github/.keep new file mode 100644 index 000000000..e69de29bb From 327f7f5c34e4f31905dc4460db9c97c789c8f98f Mon Sep 17 00:00:00 2001 From: "github-classroom[bot]" <66690702+github-classroom[bot]@users.noreply.github.com> Date: Sat, 14 Sep 2024 14:07:36 +0000 Subject: [PATCH 02/17] Setting up GitHub Classroom Feedback From 12487de4aa6ae75f9c4f410c976708d6fa96b66d Mon Sep 17 00:00:00 2001 From: "github-classroom[bot]" <66690702+github-classroom[bot]@users.noreply.github.com> Date: Sat, 14 Sep 2024 14:07:38 +0000 Subject: [PATCH 03/17] add deadline --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index f0aad2ebc..615a8542f 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ +[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/AHFn7Vbn) # Superjoin Hiring Assignment ### Welcome to Superjoin's hiring assignment! πŸš€ From c0780c35e614958508653d14d5103b21aff101f0 Mon Sep 17 00:00:00 2001 From: AAVISH GILBERT J <71066342+Aavish-Gilbert-J@users.noreply.github.com> Date: Sat, 14 Sep 2024 23:25:02 +0530 Subject: [PATCH 04/17] Update README.md - Architecture --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index 615a8542f..900913b2e 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,12 @@ [![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/AHFn7Vbn) # Superjoin Hiring Assignment +### Planned Architecture + 
+[![](https://mermaid.ink/img/pako:eNqVVV1r2zAU_StCUEhpQt79MGiTrrAta1Zn7MUQFPnGFrElV5I3Qsl_n2QpjmyUkfkl4X6ce-7V0dUHpiIHnGBaEaWWjBSS1BlH5uss6EWIooK0BNAKfTiP_WbKmpZEE5SgL-nr94vrQfDnnOmNZEUBcnIfeBTwfC2UfoP3FpQ--04ZD2s-Nk1KJWt0WPAhFzZzAvcD2wuMTE27q5gqfzY50bARX8n-QAYcqOCqraFzuCh1hUcXMuDgwVegFCkgBhtzlYTnFSwE31eM6mvVfhGmpclOQf4GOZj18skkc6CaCR7gzuujeq-2bddEaG_MoAqDFXGpI6fbojvUbXeEoZN6iluTK6p2VM4eXmygEkj-WYp64Bk1tzqmP74tn0ZdxeXjSHcZgzp70LRcmGEWV09s7Rr_31o-Td1e7u7O3wzkr4Y5Yqtb5IW7EHXdckbJZYaDmzSbfQpkniB3ZWyZHj9As-BOjBHcC4wFdWEJWjulItcg0sJ7NqJhFE288H1bzjXmtHCK9il7IZHPmvdSDuie5Yu8fv_JeaT1GHFXqy816mDMewQYI98jvfXinvuGgjYC9lYxO6LgxgbOEk88dWdAxjJZv6YbNLitfvDnnGgLj_TAxZ8K8gI85NXaA933BM6yDjiMNoOnMUi_mct4Zs88bwTjRt123CklFdmxiuljlHYS3V-RoOgyi8Rd2WyRyOiaw1Ncg6wJy81r2K2ODOsSashwYv7mRB4ynPGTiSOtFqmphhMtW5hiR8w_njjZk0oZK5gLLeTKP6_2Z4qlaIvSR5z-AgPBZ3Q?type=png)](https://mermaid.live/edit#pako:eNqVVV1r2zAU_StCUEhpQt79MGiTrrAta1Zn7MUQFPnGFrElV5I3Qsl_n2QpjmyUkfkl4X6ce-7V0dUHpiIHnGBaEaWWjBSS1BlH5uss6EWIooK0BNAKfTiP_WbKmpZEE5SgL-nr94vrQfDnnOmNZEUBcnIfeBTwfC2UfoP3FpQ--04ZD2s-Nk1KJWt0WPAhFzZzAvcD2wuMTE27q5gqfzY50bARX8n-QAYcqOCqraFzuCh1hUcXMuDgwVegFCkgBhtzlYTnFSwE31eM6mvVfhGmpclOQf4GOZj18skkc6CaCR7gzuujeq-2bddEaG_MoAqDFXGpI6fbojvUbXeEoZN6iluTK6p2VM4eXmygEkj-WYp64Bk1tzqmP74tn0ZdxeXjSHcZgzp70LRcmGEWV09s7Rr_31o-Td1e7u7O3wzkr4Y5Yqtb5IW7EHXdckbJZYaDmzSbfQpkniB3ZWyZHj9As-BOjBHcC4wFdWEJWjulItcg0sJ7NqJhFE288H1bzjXmtHCK9il7IZHPmvdSDuie5Yu8fv_JeaT1GHFXqy816mDMewQYI98jvfXinvuGgjYC9lYxO6LgxgbOEk88dWdAxjJZv6YbNLitfvDnnGgLj_TAxZ8K8gI85NXaA933BM6yDjiMNoOnMUi_mct4Zs88bwTjRt123CklFdmxiuljlHYS3V-RoOgyi8Rd2WyRyOiaw1Ncg6wJy81r2K2ODOsSashwYv7mRB4ynPGTiSOtFqmphhMtW5hiR8w_njjZk0oZK5gLLeTKP6_2Z4qlaIvSR5z-AgPBZ3Q) + + + ### Welcome to Superjoin's hiring assignment! πŸš€ ### Objective From c301225597f879d55894d95a400937d2fa414048 Mon Sep 17 00:00:00 2001 From: AAVISH GILBERT J Date: Sun, 15 Sep 2024 01:52:50 +0530 Subject: [PATCH 05/17] Kafka 1 --- .gitignore | 2 ++ kafkaImplementation.py | 62 ++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 1 + 3 files changed, 65 insertions(+) create mode 100644 .gitignore create mode 100644 kafkaImplementation.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..2a1b221eb --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +venv1 +__pycache__ \ No newline at end of file diff --git a/kafkaImplementation.py b/kafkaImplementation.py new file mode 100644 index 000000000..d40069b36 --- /dev/null +++ b/kafkaImplementation.py @@ -0,0 +1,62 @@ +from kafka import KafkaProducer, KafkaConsumer +from kafka.errors import KafkaError +import json + +class Kafka: + def __init__(self, bootstrap_servers, topic): + self.bootstrap_servers = bootstrap_servers + self.topic = topic + self.producer = KafkaProducer( + bootstrap_servers=self.bootstrap_servers, + value_serializer=lambda v: json.dumps(v).encode('utf-8') + ) + self.consumer = KafkaConsumer( + self.topic, + bootstrap_servers=self.bootstrap_servers, + value_deserializer=lambda m: json.loads(m.decode('utf-8')) + ) + + def publishMessage(self, message): + """ + Publishes a message to the Kafka topic. + """ + try: + future = self.producer.send(self.topic, message) + result = future.get(timeout=10) + print(f"Message sent successfully: {result}") + except KafkaError as e: + print(f"Failed to send message: {e}") + + def consumeMessage(self): + """ + Consumes messages from the Kafka topic. 
+ """ + print(f"Consuming messages from topic: {self.topic}") + try: + for message in self.consumer: + print(f"Received message: {message.value}") + self.handleConflicts(message.value) + except KafkaError as e: + print(f"Error consuming messages: {e}") + + def handleConflicts(self, message): + """ + Handles conflicts based on message content. Custom conflict resolution logic can be added here. + """ + # Example conflict resolution logic (you can expand this logic based on your requirements) + if "conflict" in message: + print("Conflict detected, handling conflict...") + # Add conflict resolution logic here + resolved_message = self.resolveConflict(message) + print(f"Resolved message: {resolved_message}") + # Publish resolved message or take further action + else: + print("No conflict detected.") + + def resolveConflict(self, message): + """ + Resolves conflicts in the message (example logic). + """ + # Example resolution: append '_resolved' to the conflicting data + message['data'] = message.get('data', '') + '_resolved' + return message diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 000000000..8d1c8b69c --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ + From 3906083b5790820d8db37a7d80896aacccba0112 Mon Sep 17 00:00:00 2001 From: AAVISH GILBERT J Date: Sun, 15 Sep 2024 12:03:27 +0530 Subject: [PATCH 06/17] Appscript Code --- AppscriptCode/AppScript.gs | 86 ++++++++++++++++ AppscriptCode/GoogleScript.gs | 75 ++++++++++++++ app.py | 180 ++++++++++++++++++++++++++++++++++ commands | 7 ++ 4 files changed, 348 insertions(+) create mode 100644 AppscriptCode/AppScript.gs create mode 100644 AppscriptCode/GoogleScript.gs create mode 100644 app.py create mode 100644 commands diff --git a/AppscriptCode/AppScript.gs b/AppscriptCode/AppScript.gs new file mode 100644 index 000000000..1d7f9ca09 --- /dev/null +++ b/AppscriptCode/AppScript.gs @@ -0,0 +1,86 @@ +function doPost(e) { + try { + // Extract the POST payload + var jsonData = JSON.parse(e.postData.contents); + + // Process the data, for example, updating the sheet with the new values + var sheet = SpreadsheetApp.getActiveSpreadsheet().getSheetByName(jsonData.sheetName); + var row = jsonData.row; + var column = jsonData.column; + var value = jsonData.value; + + // Update the Google Sheet with the new data + sheet.getRange(row, column).setValue(value); + + // Optionally, publish the update to Kafka + publishUpdateToKafka(jsonData); + + // Return success response + return ContentService.createTextOutput(JSON.stringify({status: 'success'})).setMimeType(ContentService.MimeType.JSON); + + } catch (error) { + Logger.log('Error in doPost: ' + error.toString()); + return ContentService.createTextOutput(JSON.stringify({status: 'error', message: error.toString()})).setMimeType(ContentService.MimeType.JSON); + } +} + +function doGet(e) { + try { + // For example, retrieve data from a specific sheet + var sheet = SpreadsheetApp.getActiveSpreadsheet().getSheetByName('Sheet1'); // Change the sheet name as required + var data = sheet.getDataRange().getValues(); + + // Convert data to JSON format + var jsonData = JSON.stringify(data); + + // Return the data as JSON + return ContentService.createTextOutput(jsonData).setMimeType(ContentService.MimeType.JSON); + + } catch (error) { + Logger.log('Error in doGet: ' + error.toString()); + return ContentService.createTextOutput(JSON.stringify({status: 'error', message: error.toString()})).setMimeType(ContentService.MimeType.JSON); + } +} + +function publishUpdateToKafka(data) { + var 
kafkaUrl = 'https://example.com/kafka-publish-endpoint'; // Replace with your Kafka publish endpoint + + var options = { + 'method': 'post', + 'contentType': 'application/json', + 'payload': JSON.stringify(data) + }; + + try { + var response = UrlFetchApp.fetch(kafkaUrl, options); + Logger.log('Published update to Kafka: ' + response.getContentText()); + } catch (error) { + Logger.log('Error publishing update to Kafka: ' + error.toString()); + } +} + + +function consumeKafkaUpdates() { + var kafkaUrl = 'https://example.com/kafka-consume-endpoint'; // Replace with your Kafka consume endpoint + + try { + var response = UrlFetchApp.fetch(kafkaUrl); + var updates = JSON.parse(response.getContentText()); + + // Process the updates + updates.forEach(function(update) { + var sheet = SpreadsheetApp.getActiveSpreadsheet().getSheetByName(update.sheetName); + var row = update.row; + var column = update.column; + var value = update.value; + + // Update the Google Sheet with the consumed Kafka data + sheet.getRange(row, column).setValue(value); + }); + + Logger.log('Consumed updates from Kafka'); + + } catch (error) { + Logger.log('Error consuming updates from Kafka: ' + error.toString()); + } +} diff --git a/AppscriptCode/GoogleScript.gs b/AppscriptCode/GoogleScript.gs new file mode 100644 index 000000000..ca8932e8e --- /dev/null +++ b/AppscriptCode/GoogleScript.gs @@ -0,0 +1,75 @@ +function onEditTrigger(e) { + // Check if the edited range contains data + if (!e || !e.range) return; + + // Get the sheet and range information + var sheet = e.range.getSheet(); + var range = e.range; + var sheetName = sheet.getName(); + var rowStart = range.getRow(); + var colStart = range.getColumn(); + var numRows = range.getNumRows(); + var numCols = range.getNumColumns(); + + var modifiedCells = []; + + // Iterate over all modified cells + for (var i = 0; i < numRows; i++) { + for (var j = 0; j < numCols; j++) { + var cellRow = rowStart + i; + var cellCol = colStart + j; + var editedValue = sheet.getRange(cellRow, cellCol).getValue(); + var oldValue = e.oldValue; // Get the old value before the edit + + // Determine the type of modification (created, updated, deleted) + var action = ''; + if (!oldValue && editedValue) { + action = 'created'; + } else if (oldValue && !editedValue) { + action = 'deleted'; + } else if (oldValue !== editedValue) { + action = 'updated'; + } + + if (action) { + // Collect information about the modified cell + modifiedCells.push({ + sheetName: sheetName, + row: cellRow, + column: cellCol, + oldValue: oldValue || null, + newValue: editedValue || null, + action: action + }); + } + } + } + + // If any cells were modified, send the data + if (modifiedCells.length > 0) { + Logger.log(modifiedCells); + sendPostRequest(modifiedCells); + } +} + +function sendPostRequest(modifiedCells) { + var url = 'https://example.com/your-backend-endpoint'; // Replace with your backend endpoint + + var data = { + timestamp: new Date().toISOString(), + modifiedCells: modifiedCells + }; + + var options = { + 'method': 'post', + 'contentType': 'application/json', + 'payload': JSON.stringify(data) + }; + + try { + var response = UrlFetchApp.fetch(url, options); + Logger.log('POST request sent successfully: ' + response.getContentText()); + } catch (error) { + Logger.log('Error sending POST request: ' + error.toString()); + } +} diff --git a/app.py b/app.py new file mode 100644 index 000000000..b01a35d66 --- /dev/null +++ b/app.py @@ -0,0 +1,180 @@ +from flask import Flask, request, jsonify +import mysql.connector 
+import psycopg2 +from kafka import KafkaProducer, KafkaConsumer +import json + +app = Flask(__name__) + +# Kafka Configuration +KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092'] +KAFKA_TOPIC = 'sync_topic' + +producer = KafkaProducer( + bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, + value_serializer=lambda v: json.dumps(v).encode('utf-8') +) + +consumer = KafkaConsumer( + KAFKA_TOPIC, + bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, + value_deserializer=lambda m: json.loads(m.decode('utf-8')) +) + +# MySQL Database Configuration +mysql_db = mysql.connector.connect( + host="localhost", # MySQL server host + user="root", # MySQL username (in this case, 'root') + password="Aavish@02", # MySQL root password + database="superjoin" # Database name +) + +# PostgreSQL Database Configuration +postgres_db = psycopg2.connect( + host="localhost", + user="postgres", + password="Aavish@02", + database="superjoin" +) + + +@app.route('/mysql_update', methods=['POST']) +def mysql_update(): + """ + Endpoint to update MySQL database, and if the record doesn't exist, create it dynamically. + """ + try: + data = request.json + cursor = mysql_db.cursor() + + # Check if the record exists based on 'id' + check_query = "SELECT COUNT(*) FROM your_table WHERE id = %s" + cursor.execute(check_query, (data['id'],)) + result = cursor.fetchone() + + if result[0] > 0: + # If the record exists, perform an UPDATE + update_query = "UPDATE your_table SET column_name = %s WHERE id = %s" + cursor.execute(update_query, (data['value'], data['id'])) + else: + # If the record doesn't exist, perform an INSERT + insert_query = "INSERT INTO your_table (id, column_name) VALUES (%s, %s)" + cursor.execute(insert_query, (data['id'], data['value'])) + + mysql_db.commit() + + # Send the update to Kafka + sendToKafka(data) + + return jsonify({"status": "success", "message": "MySQL database updated"}), 200 + + except Exception as e: + return jsonify({"status": "error", "message": str(e)}), 500 + + + +@app.route('/postgres_update', methods=['POST']) +def postgres_update(): + """ + Endpoint to update PostgreSQL database. + """ + try: + data = request.json + cursor = postgres_db.cursor() + query = "UPDATE your_table SET column_name = %s WHERE id = %s" + cursor.execute(query, (data['value'], data['id'])) + postgres_db.commit() + + # Send the update to Kafka + sendToKafka(data) + + return jsonify({"status": "success", "message": "PostgreSQL database updated"}), 200 + except Exception as e: + return jsonify({"status": "error", "message": str(e)}), 500 + + +@app.route('/sync_google_sheet', methods=['POST']) +def sync_google_sheet(): + """ + Endpoint to synchronize data from Google Sheets to the databases. 
+ """ + try: + data = request.json + table = data.get("table") + row_data = data.get("data") + + # Sync data to MySQL + cursor_mysql = mysql_db.cursor() + query_mysql = "INSERT INTO {} (columns) VALUES (%s, %s, ...)".format(table) + cursor_mysql.execute(query_mysql, tuple(row_data)) + mysql_db.commit() + + # Sync data to PostgreSQL + cursor_postgres = postgres_db.cursor() + query_postgres = "INSERT INTO {} (columns) VALUES (%s, %s, ...)".format(table) + cursor_postgres.execute(query_postgres, tuple(row_data)) + postgres_db.commit() + + # Send the sync data to Kafka + sendToKafka(data) + + return jsonify({"status": "success", "message": "Google Sheets data synchronized"}), 200 + except Exception as e: + return jsonify({"status": "error", "message": str(e)}), 500 + + +@app.route('/conflict_resolution', methods=['POST']) +def conflict_resolution(): + """ + Endpoint to resolve data conflicts between systems. + """ + try: + data = request.json + resolved_data = handleConflicts(data) + return jsonify({"status": "success", "resolved_data": resolved_data}), 200 + except Exception as e: + return jsonify({"status": "error", "message": str(e)}), 500 + + +def sendToKafka(data): + """ + Sends data to a Kafka topic. + """ + try: + producer.send(KAFKA_TOPIC, data) + producer.flush() + print("Data sent to Kafka successfully") + except Exception as e: + print(f"Error sending data to Kafka: {e}") + + +def readFromKafka(): + """ + Reads data from a Kafka topic. + """ + try: + for message in consumer: + print(f"Consumed message from Kafka: {message.value}") + handleConflicts(message.value) + except Exception as e: + print(f"Error consuming Kafka message: {e}") + + +def handleConflicts(data): + """ + Handles conflicts in the data. + """ + # Example conflict resolution logic + if data.get("conflict"): + # Resolve the conflict by modifying the data + data["resolved"] = True + data["new_value"] = data.get("value", 0) * 2 # Custom conflict resolution logic + print("Conflict resolved:", data) + return data + else: + print("No conflict detected.") + return data + + +if __name__ == '__main__': + app.run(host='0.0.0.0', port=5000, debug=True) diff --git a/commands b/commands new file mode 100644 index 000000000..97430d2d5 --- /dev/null +++ b/commands @@ -0,0 +1,7 @@ +#Start Kafka + +zookeeper-server-start.bat C:\kafka_2.13-3.8.0\config\zookeeper.properties +kafka-server-start.bat C:\kafka_2.13-3.8.0\config\server.properties + + +mysqladmin -u root -p shutdown \ No newline at end of file From c1ffe96f54e8ef0ec8a6624f61d614dd9fb7b146 Mon Sep 17 00:00:00 2001 From: AAVISH GILBERT J Date: Sun, 15 Sep 2024 14:25:30 +0530 Subject: [PATCH 07/17] mysql Support --- app.py | 22 ++----- dbCode/mysqlScript.py | 142 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 18 deletions(-) create mode 100644 dbCode/mysqlScript.py diff --git a/app.py b/app.py index b01a35d66..abcc1fbad 100644 --- a/app.py +++ b/app.py @@ -4,6 +4,8 @@ from kafka import KafkaProducer, KafkaConsumer import json +from dbCode.mysqlScript import sync_sheet_from_json + app = Flask(__name__) # Kafka Configuration @@ -45,26 +47,10 @@ def mysql_update(): """ try: data = request.json - cursor = mysql_db.cursor() - - # Check if the record exists based on 'id' - check_query = "SELECT COUNT(*) FROM your_table WHERE id = %s" - cursor.execute(check_query, (data['id'],)) - result = cursor.fetchone() - - if result[0] > 0: - # If the record exists, perform an UPDATE - update_query = "UPDATE your_table SET column_name = %s WHERE id = %s" - 
cursor.execute(update_query, (data['value'], data['id'])) - else: - # If the record doesn't exist, perform an INSERT - insert_query = "INSERT INTO your_table (id, column_name) VALUES (%s, %s)" - cursor.execute(insert_query, (data['id'], data['value'])) - - mysql_db.commit() + sync_sheet_from_json(mysql_db, data) # Send the update to Kafka - sendToKafka(data) + # sendToKafka(data) return jsonify({"status": "success", "message": "MySQL database updated"}), 200 diff --git a/dbCode/mysqlScript.py b/dbCode/mysqlScript.py new file mode 100644 index 000000000..82bee7613 --- /dev/null +++ b/dbCode/mysqlScript.py @@ -0,0 +1,142 @@ +import mysql.connector +import json + +def sync_sheet_from_json(db, json_input): + try: + # Start a transaction + cursor = db.cursor() + + # Parse the JSON + data = json.loads(json_input) + + # Get or create the sheet + sheet_name = data["sheet_name"] + sheet_id = get_or_create_sheet(db, sheet_name) + + # Process each cell in the input + for cell in data["cells"]: + row_num = cell["row"] + col_num = cell["column"] + value = cell.get("value") # Value can be None for deletion + + if cell.get("operation") == "delete": + if "row" in cell: + delete_row_or_column(db, sheet_id, row_num=row_num) + if "column" in cell: + delete_row_or_column(db, sheet_id, col_num=col_num) + else: + # Handle insert or update + upsert_cell(db, sheet_id, row_num, col_num, value) + + # Commit the transaction + db.commit() + + except mysql.connector.Error as err: + # Rollback transaction in case of any error + db.rollback() + return f"Error: {err}" + except json.JSONDecodeError as json_err: + print(f"JSON parsing error: {json_err}") + except Exception as e: + # Handle any other exceptions + db.rollback() + return f"Unexpected error: {e}" + finally: + cursor.close() + +# Function to find or create a sheet with exception handling +def get_or_create_sheet(db, sheet_name): + try: + cursor = db.cursor() + cursor.execute("SELECT id FROM sheets WHERE name = %s", (sheet_name,)) + sheet = cursor.fetchone() + + if sheet is None: + cursor.execute("INSERT INTO sheets (name) VALUES (%s)", (sheet_name,)) + db.commit() + return cursor.lastrowid + return sheet[0] + except mysql.connector.Error as err: + db.rollback() + return f"Error in get_or_create_sheet: {err}" + finally: + cursor.close() + +# Function to update or insert cell data with exception handling +def upsert_cell(db, sheet_id, row_num, col_num, value): + try: + cursor = db.cursor() + + # Check if the cell exists + cursor.execute(""" + SELECT id FROM cells + WHERE sheet_id = %s AND row_number = %s AND column_number = %s + """, (sheet_id, row_num, col_num)) + cell = cursor.fetchone() + + if cell: + # Update the existing cell value + if value is not None: + cursor.execute(""" + UPDATE cells + SET value = %s + WHERE id = %s + """, (value, cell[0])) + else: + # Set the value to NULL (for delete operation) + cursor.execute(""" + UPDATE cells + SET value = NULL + WHERE id = %s + """, (cell[0],)) + else: + # Insert new cell data + cursor.execute(""" + INSERT INTO cells (sheet_id, row_number, column_number, value) + VALUES (%s, %s, %s, %s) + """, (sheet_id, row_num, col_num, value)) + + db.commit() + except mysql.connector.Error as err: + db.rollback() + return f"Error in upsert_cell: {err}" + finally: + cursor.close() + +# Function to handle deleting a row or column with exception handling +def delete_row_or_column(db, sheet_id, row_num=None, col_num=None): + try: + cursor = db.cursor() + + if row_num is not None: + cursor.execute("DELETE FROM cells WHERE 
sheet_id = %s AND row_number = %s", (sheet_id, row_num)) + if col_num is not None: + cursor.execute("DELETE FROM cells WHERE sheet_id = %s AND column_number = %s", (sheet_id, col_num)) + + db.commit() + except mysql.connector.Error as err: + db.rollback() + return f"Error in delete_row_or_column: {err}" + finally: + cursor.close() + + + +# # Example JSON input +# json_input = ''' +# { +# "sheet_name": "Sheet1", +# "cells": [ +# {"row": 1, "column": 1, "value": "Product A"}, +# {"row": 1, "column": 2, "value": "Description A"}, +# {"row": 2, "column": 1, "value": "Product B"}, +# {"row": 2, "column": 2, "value": null, "operation": "delete"}, # Delete value from this cell +# {"row": 3, "column": 1, "operation": "delete"} # Delete entire row +# ] +# } +# ''' + +# Sync the data +# sync_sheet_from_json(db, json_input) + + From 92b821ea2839f4ba2e080a17597baf5c3c63611e Mon Sep 17 00:00:00 2001 From: AAVISH GILBERT J Date: Sun, 15 Sep 2024 23:20:28 +0530 Subject: [PATCH 08/17] Final - Sheet to MySQL Pipeline --- app.py | 418 +++++++++++++++++++++++++++-------------- dbCode/mysqlScript.py | 235 ++++++++++++++--------- kafkaImplementation.py | 80 +++++--- 3 files changed, 482 insertions(+), 251 deletions(-) diff --git a/app.py b/app.py index abcc1fbad..6f4439824 100644 --- a/app.py +++ b/app.py @@ -1,34 +1,246 @@ +# from flask import Flask, request, jsonify +# from kafka import KafkaProducer, KafkaConsumer +# from kafka.errors import KafkaError +# import json +# import logging +# import threading +# import mysql.connector +# import psycopg2 +# from dbCode.mysqlScript import sync_sheet_from_json +# logging.basicConfig(level=logging.INFO) + + +# # MySQL Database Configuration +# mysql_db = mysql.connector.connect( +# host="localhost", # MySQL server host +# user="root", # MySQL username (in this case, 'root') +# password="Aavish@02", # MySQL root password +# database="google_sheet_mimic" # Database name +# ) + +# # PostgreSQL Database Configuration +# postgres_db = psycopg2.connect( +# host="localhost", +# user="postgres", +# password="Aavish@02", +# database="superjoin" +# ) + + +# class KafkaHandler: +# def __init__(self, bootstrap_servers, topic, group_id=None): +# self.bootstrap_servers = bootstrap_servers +# self.topic = topic +# self.group_id = group_id + +# self.producer = KafkaProducer( +# bootstrap_servers=self.bootstrap_servers, +# value_serializer=lambda v: json.dumps(v).encode('utf-8') +# ) +# # Remove KafkaConsumer initialization from here + +# def publish_message(self, message): +# """ +# Publishes a message to the Kafka topic. +# """ +# try: +# future = self.producer.send(self.topic, message) +# result = future.get(timeout=10) +# logging.info(f"Message sent successfully: {result}") +# return {"status": "success", "result": str(result)} +# except KafkaError as e: +# logging.error(f"Failed to send message: {e}") +# return {"status": "error", "message": str(e)} + +# def consume_messages(self, process_callback): +# """ +# Initializes the consumer and consumes messages from the Kafka topic asynchronously. +# Passes each message to the provided callback function (e.g., sync_sheet_from_json). 
+# """ +# logging.info(f"Starting consumer for topic: {self.topic}") +# try: +# consumer = KafkaConsumer( +# self.topic, +# group_id=self.group_id, +# bootstrap_servers=self.bootstrap_servers, +# value_deserializer=lambda m: json.loads(m.decode('utf-8')), +# auto_offset_reset='earliest', # Start from the earliest message if offset is not committed +# enable_auto_commit=True, # Automatically commit message offset after processing +# ) +# for message in consumer: +# process_callback(message.value) +# except KafkaError as e: +# logging.error(f"Error consuming messages: {e}") + +# # Function to run Kafka consumer in a separate thread +# def run_consumer(kafka_handler): +# kafka_handler.consume_messages(lambda msg: sync_sheet_from_json(mysql_db, msg)) + +# # Create Flask App +# app = Flask(__name__) + +# # Create a KafkaHandler instance +# kafka_handler = KafkaHandler(bootstrap_servers='localhost:9092', topic='my_topic', group_id='my_group') + +# # Start Kafka consumer in a background thread +# consumer_thread = threading.Thread(target=run_consumer, args=(kafka_handler,), daemon=True) +# consumer_thread.start() + +# # Flask route to publish message to Kafka +# @app.route('/kafka-publish-endpoint', methods=['POST']) +# def kafka_publish(): +# """ +# Endpoint to publish a message to Kafka. +# Expects a JSON payload in the request. +# """ +# data = request.json +# if not data: +# return jsonify({"status": "error", "message": "Invalid payload"}), 400 + +# response = kafka_handler.publish_message(data) +# return jsonify(response) + +# # Start Flask server +# if __name__ == '__main__': +# app.run(host='0.0.0.0', port=5000) + from flask import Flask, request, jsonify +import json +import logging +import threading +import uuid +import time import mysql.connector import psycopg2 from kafka import KafkaProducer, KafkaConsumer -import json - +from kafka.errors import KafkaError from dbCode.mysqlScript import sync_sheet_from_json -app = Flask(__name__) - -# Kafka Configuration -KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092'] -KAFKA_TOPIC = 'sync_topic' - -producer = KafkaProducer( - bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, - value_serializer=lambda v: json.dumps(v).encode('utf-8') -) - -consumer = KafkaConsumer( - KAFKA_TOPIC, - bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, - value_deserializer=lambda m: json.loads(m.decode('utf-8')) -) +logging.basicConfig(level=logging.INFO) + +# --- KafkaHandler Class with Two-Way Message Passing --- + +class KafkaHandler: + def __init__(self, bootstrap_servers, topic, group_id=None, response_topic=None): + self.bootstrap_servers = bootstrap_servers + self.topic = topic + self.group_id = group_id + self.response_topic = response_topic + + self.producer = KafkaProducer( + bootstrap_servers=self.bootstrap_servers, + value_serializer=lambda v: json.dumps(v).encode('utf-8') + ) + + # Dictionary to hold pending responses + self.pending_responses = {} + self.lock = threading.Lock() + + if self.response_topic: + # Start a consumer thread to listen to responses + threading.Thread(target=self._start_response_consumer, daemon=True).start() + + def publish_message(self, message): + """ + Publishes a message to the Kafka topic. 
+ """ + try: + future = self.producer.send(self.topic, message) + result = future.get(timeout=10) + logging.info(f"Message sent successfully: {result}") + return {"status": "success", "result": str(result)} + except KafkaError as e: + logging.error(f"Failed to send message: {e}") + return {"status": "error", "message": str(e)} + + def consume_messages(self, process_callback): + """ + Consumes messages from the Kafka topic asynchronously. + Passes each message to the provided callback function. + """ + logging.info(f"Starting consumer for topic: {self.topic}") + try: + consumer = KafkaConsumer( + self.topic, + group_id=self.group_id, + bootstrap_servers=self.bootstrap_servers, + value_deserializer=lambda m: json.loads(m.decode('utf-8')), + auto_offset_reset='earliest', + enable_auto_commit=True, + ) + for message in consumer: + process_callback(message.value) + except KafkaError as e: + logging.error(f"Error consuming messages: {e}") + + def send_request(self, message, timeout=10): + """ + Sends a message and waits for a response. + """ + correlation_id = str(uuid.uuid4()) + message['correlation_id'] = correlation_id + + with self.lock: + self.pending_responses[correlation_id] = None + + try: + future = self.producer.send(self.topic, message) + future.get(timeout=10) + logging.info(f"Request sent with correlation_id: {correlation_id}") + except KafkaError as e: + logging.error(f"Failed to send request: {e}") + with self.lock: + del self.pending_responses[correlation_id] + return {"status": "error", "message": str(e)} + + # Wait for the response + start_time = time.time() + while True: + with self.lock: + response = self.pending_responses.get(correlation_id) + if response is not None: + with self.lock: + del self.pending_responses[correlation_id] + return response + elif time.time() - start_time > timeout: + with self.lock: + del self.pending_responses[correlation_id] + logging.error(f"Response timed out for correlation_id: {correlation_id}") + return {"status": "error", "message": "Response timed out"} + else: + time.sleep(0.1) + + def _start_response_consumer(self): + """ + Starts a consumer to listen for responses. 
+ """ + logging.info(f"Starting response consumer for topic: {self.response_topic}") + try: + consumer = KafkaConsumer( + self.response_topic, + group_id=self.group_id + '_response' if self.group_id else None, + bootstrap_servers=self.bootstrap_servers, + value_deserializer=lambda m: json.loads(m.decode('utf-8')), + auto_offset_reset='earliest', + enable_auto_commit=True, + ) + for message in consumer: + response = message.value + correlation_id = response.get('correlation_id') + if correlation_id: + with self.lock: + self.pending_responses[correlation_id] = response + except KafkaError as e: + logging.error(f"Error consuming responses: {e}") + +# --- Database Configurations --- # MySQL Database Configuration mysql_db = mysql.connector.connect( host="localhost", # MySQL server host - user="root", # MySQL username (in this case, 'root') - password="Aavish@02", # MySQL root password - database="superjoin" # Database name + user="root", # MySQL username + password="Aavish@02", # MySQL password + database="google_sheet_mimic" # Database name ) # PostgreSQL Database Configuration @@ -39,128 +251,60 @@ database="superjoin" ) +# --- Kafka Consumer Thread Function --- + +def run_consumer(kafka_handler): + def process_message(msg): + # Process the message + sync_sheet_from_json(mysql_db, msg) + + # Prepare response + correlation_id = msg.get('correlation_id') + if correlation_id: + response = { + 'status': 'success', + 'correlation_id': correlation_id, + 'result': 'Data synchronized' + } + # Send response back to the response_topic + kafka_handler.producer.send(kafka_handler.response_topic, response) + kafka_handler.producer.flush() + + kafka_handler.consume_messages(process_message) + + + +# --- Flask Application --- -@app.route('/mysql_update', methods=['POST']) -def mysql_update(): - """ - Endpoint to update MySQL database, and if the record doesn't exist, create it dynamically. - """ - try: - data = request.json - sync_sheet_from_json(mysql_db, data) - - # Send the update to Kafka - # sendToKafka(data) - - return jsonify({"status": "success", "message": "MySQL database updated"}), 200 - - except Exception as e: - return jsonify({"status": "error", "message": str(e)}), 500 - - - -@app.route('/postgres_update', methods=['POST']) -def postgres_update(): - """ - Endpoint to update PostgreSQL database. - """ - try: - data = request.json - cursor = postgres_db.cursor() - query = "UPDATE your_table SET column_name = %s WHERE id = %s" - cursor.execute(query, (data['value'], data['id'])) - postgres_db.commit() - - # Send the update to Kafka - sendToKafka(data) - - return jsonify({"status": "success", "message": "PostgreSQL database updated"}), 200 - except Exception as e: - return jsonify({"status": "error", "message": str(e)}), 500 - - -@app.route('/sync_google_sheet', methods=['POST']) -def sync_google_sheet(): - """ - Endpoint to synchronize data from Google Sheets to the databases. 
- """ - try: - data = request.json - table = data.get("table") - row_data = data.get("data") - - # Sync data to MySQL - cursor_mysql = mysql_db.cursor() - query_mysql = "INSERT INTO {} (columns) VALUES (%s, %s, ...)".format(table) - cursor_mysql.execute(query_mysql, tuple(row_data)) - mysql_db.commit() - - # Sync data to PostgreSQL - cursor_postgres = postgres_db.cursor() - query_postgres = "INSERT INTO {} (columns) VALUES (%s, %s, ...)".format(table) - cursor_postgres.execute(query_postgres, tuple(row_data)) - postgres_db.commit() - - # Send the sync data to Kafka - sendToKafka(data) - - return jsonify({"status": "success", "message": "Google Sheets data synchronized"}), 200 - except Exception as e: - return jsonify({"status": "error", "message": str(e)}), 500 - - -@app.route('/conflict_resolution', methods=['POST']) -def conflict_resolution(): - """ - Endpoint to resolve data conflicts between systems. - """ - try: - data = request.json - resolved_data = handleConflicts(data) - return jsonify({"status": "success", "resolved_data": resolved_data}), 200 - except Exception as e: - return jsonify({"status": "error", "message": str(e)}), 500 - - -def sendToKafka(data): - """ - Sends data to a Kafka topic. - """ - try: - producer.send(KAFKA_TOPIC, data) - producer.flush() - print("Data sent to Kafka successfully") - except Exception as e: - print(f"Error sending data to Kafka: {e}") - +app = Flask(__name__) -def readFromKafka(): - """ - Reads data from a Kafka topic. - """ - try: - for message in consumer: - print(f"Consumed message from Kafka: {message.value}") - handleConflicts(message.value) - except Exception as e: - print(f"Error consuming Kafka message: {e}") +# Create a KafkaHandler instance with response_topic for two-way communication +kafka_handler = KafkaHandler( + bootstrap_servers='localhost:9092', + topic='my_topic', + group_id='my_group', + response_topic='response_topic' # Replace with your actual response topic +) +# Start Kafka consumer in a background thread +consumer_thread = threading.Thread(target=run_consumer, args=(kafka_handler,), daemon=True) +consumer_thread.start() -def handleConflicts(data): +# Flask route to publish message to Kafka and wait for a response +@app.route('/kafka-publish-endpoint', methods=['POST']) +def kafka_publish(): """ - Handles conflicts in the data. + Endpoint to publish a message to Kafka and wait for a response. + Expects a JSON payload in the request. 
""" - # Example conflict resolution logic - if data.get("conflict"): - # Resolve the conflict by modifying the data - data["resolved"] = True - data["new_value"] = data.get("value", 0) * 2 # Custom conflict resolution logic - print("Conflict resolved:", data) - return data - else: - print("No conflict detected.") - return data + data = request.json + if not data: + return jsonify({"status": "error", "message": "Invalid payload"}), 400 + # Send a request and wait for a response + response = kafka_handler.send_request(data, timeout=10) + return jsonify(response) +# Start Flask server if __name__ == '__main__': - app.run(host='0.0.0.0', port=5000, debug=True) + app.run(host='0.0.0.0', port=5000) diff --git a/dbCode/mysqlScript.py b/dbCode/mysqlScript.py index 82bee7613..60653fed2 100644 --- a/dbCode/mysqlScript.py +++ b/dbCode/mysqlScript.py @@ -2,141 +2,192 @@ import json def sync_sheet_from_json(db, json_input): + cursor = db.cursor() try: # Start a transaction - cursor = db.cursor() - - # Parse the JSON - data = json.loads(json_input) - + db.start_transaction() + print("Transaction started.") + + # Parse the JSON input + data = json.loads(json_input) if isinstance(json_input, str) else json_input + print("JSON parsed successfully:", data) + # Get or create the sheet sheet_name = data["sheet_name"] - sheet_id = get_or_create_sheet(db, sheet_name) - + print(f"Sheet name: {sheet_name}") + sheet_id = get_or_create_sheet(db, cursor, sheet_name) + print(f"Sheet ID: {sheet_id}") + # Process each cell in the input - for cell in data["cells"]: - row_num = cell["row"] - col_num = cell["column"] + for cell in data.get("cells", []): + row_num = cell.get("row") + col_num = cell.get("column") value = cell.get("value") # Value can be None for deletion - - if cell.get("operation") == "delete": - if "row" in cell: - delete_row_or_column(db, sheet_id, row_num=row_num) - if "column" in cell: - delete_row_or_column(db, sheet_id, col_num=col_num) + operation = cell.get("operation", "upsert") + print(f"Processing cell - Row: {row_num}, Column: {col_num}, Value: {value}, Operation: {operation}") + + if operation == "delete": + if row_num is not None and col_num is not None: + print(f"Deleting cell at Row: {row_num}, Column: {col_num}") + delete_cell(cursor, sheet_id, row_num, col_num) + elif row_num is not None: + print(f"Deleting row: {row_num}") + delete_row_or_column(cursor, sheet_id, row_num=row_num) + elif col_num is not None: + print(f"Deleting column: {col_num}") + delete_row_or_column(cursor, sheet_id, col_num=col_num) + else: + print("No valid row or column specified for deletion.") else: - # Handle insert or update - upsert_cell(db, sheet_id, row_num, col_num, value) - + print(f"Upserting cell at Row: {row_num}, Column: {col_num}, Value: {value}") + upsert_cell(cursor, sheet_id, row_num, col_num, value) + # Commit the transaction db.commit() + print("Transaction committed.") except mysql.connector.Error as err: # Rollback transaction in case of any error db.rollback() - return f"Error: {err}" + print(f"Database Error during sync: {err}") except json.JSONDecodeError as json_err: print(f"JSON parsing error: {json_err}") except Exception as e: # Handle any other exceptions db.rollback() - return f"Unexpected error: {e}" + print(f"Unexpected error during sync: {e}") finally: cursor.close() + print("Cursor closed.") -# Function to find or create a sheet with exception handling -def get_or_create_sheet(db, sheet_name): +def get_or_create_sheet(db, cursor, sheet_name): try: - cursor = db.cursor() + 
print(f"Fetching sheet with name: {sheet_name}") cursor.execute("SELECT id FROM sheets WHERE name = %s", (sheet_name,)) sheet = cursor.fetchone() if sheet is None: + print(f"Sheet not found, creating new sheet: {sheet_name}") cursor.execute("INSERT INTO sheets (name) VALUES (%s)", (sheet_name,)) - db.commit() - return cursor.lastrowid + sheet_id = cursor.lastrowid + print(f"New sheet created with ID: {sheet_id}") + return sheet_id + print(f"Sheet found with ID: {sheet[0]}") return sheet[0] except mysql.connector.Error as err: - db.rollback() - return f"Error in get_or_create_sheet: {err}" - finally: - cursor.close() + print(f"Error in get_or_create_sheet: {err}") + raise + except Exception as e: + print(f"Unexpected error in get_or_create_sheet: {e}") + raise -# Function to update or insert cell data with exception handling -def upsert_cell(db, sheet_id, row_num, col_num, value): +def upsert_cell(cursor, sheet_id, row_num, col_num, value): try: - cursor = db.cursor() + if row_num is None or col_num is None: + print("Row number and column number must be provided for upsert operation.") + return + print(f"Processing upsert for cell at row {row_num}, column {col_num}") + + if value is None or value == "": + print(f"Value is None, deleting cell at row {row_num}, column {col_num}") + delete_cell(cursor, sheet_id, row_num, col_num) + return # Check if the cell exists - cursor.execute(""" + query = """ SELECT id FROM cells - WHERE sheet_id = %s AND row_number = %s AND column_number = %s - """, (sheet_id, row_num, col_num)) + WHERE sheet_id = %s AND `row_number` = %s AND `column_number` = %s + """ + print(f"Executing query to check if cell exists with sheet_id={sheet_id}, row_number={row_num}, column_number={col_num}") + cursor.execute(query, (sheet_id, row_num, col_num)) cell = cursor.fetchone() if cell: - # Update the existing cell value - if value is not None: - cursor.execute(""" - UPDATE cells - SET value = %s - WHERE id = %s - """, (value, cell[0])) - else: - # Set the value to NULL (for delete operation) - cursor.execute(""" - UPDATE cells - SET value = NULL - WHERE id = %s - """, (cell[0],)) + print(f"Cell exists with ID {cell[0]}, updating value.") + update_query = """ + UPDATE cells + SET value = %s, updated_at = CURRENT_TIMESTAMP + WHERE id = %s + """ + print(f"Executing update query with value: {value}, id: {cell[0]}") + cursor.execute(update_query, (value, cell[0])) + print(f"Updated cell at row {row_num}, col {col_num} with value: {value}") else: - # Insert new cell data - cursor.execute(""" - INSERT INTO cells (sheet_id, row_number, column_number, value) + print(f"Cell does not exist, inserting new cell.") + insert_query = """ + INSERT INTO cells (sheet_id, `row_number`, `column_number`, value) VALUES (%s, %s, %s, %s) - """, (sheet_id, row_num, col_num, value)) - - db.commit() + """ + print(f"Executing insert query with sheet_id={sheet_id}, row_number={row_num}, column_number={col_num}, value={value}") + cursor.execute(insert_query, (sheet_id, row_num, col_num, value)) + print(f"Inserted new cell at row {row_num}, col {col_num} with value: {value}") except mysql.connector.Error as err: - db.rollback() - return f"Error in upsert_cell: {err}" - finally: - cursor.close() + print(f"Error in upsert_cell: {err}") + raise + except Exception as e: + print(f"Unexpected error in upsert_cell: {e}") + raise -# Function to handle deleting a row or column with exception handling -def delete_row_or_column(db, sheet_id, row_num=None, col_num=None): +def delete_cell(cursor, sheet_id, row_num, 
col_num): try: - cursor = db.cursor() - - if row_num is not None: - cursor.execute("DELETE FROM cells WHERE sheet_id = %s AND row_number = %s", (sheet_id, row_num)) - if col_num is not None: - cursor.execute("DELETE FROM cells WHERE sheet_id = %s AND column_number = %s", (sheet_id, col_num)) - - db.commit() + print(f"Deleting cell at row {row_num}, column {col_num} from sheet {sheet_id}") + delete_query = """ + DELETE FROM cells + WHERE sheet_id = %s AND `row_number` = %s AND `column_number` = %s + """ + cursor.execute(delete_query, (sheet_id, row_num, col_num)) + print(f"Deleted cell at row {row_num}, column {col_num} from sheet {sheet_id}") except mysql.connector.Error as err: - db.rollback() - return f"Error in delete_row_or_column: {err}" - finally: - cursor.close() - - - -# # Example JSON input -# json_input = ''' -# { -# "sheet_name": "Sheet1", -# "cells": [ -# {"row": 1, "column": 1, "value": "Product A"}, -# {"row": 1, "column": 2, "value": "Description A"}, -# {"row": 2, "column": 1, "value": "Product B"}, -# {"row": 2, "column": 2, "value": null, "operation": "delete"}, # Delete value from this cell -# {"row": 3, "column": 1, "operation": "delete"} # Delete entire row -# ] -# } -# ''' + print(f"Error in delete_cell: {err}") + raise + except Exception as e: + print(f"Unexpected error in delete_cell: {e}") + raise -# Sync the data -# sync_sheet_from_json(db, json_input) +def delete_row_or_column(cursor, sheet_id, row_num=None, col_num=None): + try: + if row_num is not None: + print(f"Deleting row {row_num} from sheet {sheet_id}") + delete_query = "DELETE FROM cells WHERE sheet_id = %s AND `row_number` = %s" + cursor.execute(delete_query, (sheet_id, row_num)) + print(f"Deleted row {row_num} from sheet {sheet_id}") + if col_num is not None: + print(f"Deleting column {col_num} from sheet {sheet_id}") + delete_query = "DELETE FROM cells WHERE sheet_id = %s AND `column_number` = %s" + cursor.execute(delete_query, (sheet_id, col_num)) + print(f"Deleted column {col_num} from sheet {sheet_id}") + except mysql.connector.Error as err: + print(f"Error in delete_row_or_column: {err}") + raise + except Exception as e: + print(f"Unexpected error in delete_row_or_column: {e}") + raise + +# Example usage +if __name__ == "__main__": + # Example JSON input + json_input = ''' + { + "sheet_name": "Sheet2", + "cells": [ + {"row": 3, "column": 3, "value": "bossman", "operation": "insert"}, + {"row": 3, "column": 3, "operation": "delete"}, + {"row": 2, "operation": "delete"}, + {"column": 2, "operation": "delete"} + ] + } + ''' + + db = mysql.connector.connect( + host="localhost", + user="root", + password="your_mysql_password", # Replace with your MySQL root password + database="google_sheet_mimic" + ) + + print("Starting sync process...") + sync_sheet_from_json(db, json_input) + print("Sync process complete.") + db.close() diff --git a/kafkaImplementation.py b/kafkaImplementation.py index d40069b36..752b00e50 100644 --- a/kafkaImplementation.py +++ b/kafkaImplementation.py @@ -1,62 +1,98 @@ from kafka import KafkaProducer, KafkaConsumer from kafka.errors import KafkaError import json +import logging +from concurrent.futures import ThreadPoolExecutor +import signal +import sys -class Kafka: - def __init__(self, bootstrap_servers, topic): +logging.basicConfig(level=logging.INFO) + +class KafkaHandler: + def __init__(self, bootstrap_servers, topic, group_id=None): self.bootstrap_servers = bootstrap_servers self.topic = topic + self.group_id = group_id + self.executor = ThreadPoolExecutor(max_workers=10) # 
Use a thread pool for async message consumption + self.producer = KafkaProducer( bootstrap_servers=self.bootstrap_servers, value_serializer=lambda v: json.dumps(v).encode('utf-8') ) + self.consumer = KafkaConsumer( self.topic, + group_id=self.group_id, bootstrap_servers=self.bootstrap_servers, - value_deserializer=lambda m: json.loads(m.decode('utf-8')) + value_deserializer=lambda m: json.loads(m.decode('utf-8')), + auto_offset_reset='earliest', # Start from the earliest message if offset is not committed + enable_auto_commit=True, # Automatically commit message offset after processing ) - - def publishMessage(self, message): + + # Graceful shutdown on SIGINT (Ctrl+C) + signal.signal(signal.SIGINT, self.shutdown) + + def publish_message(self, message): """ Publishes a message to the Kafka topic. """ try: future = self.producer.send(self.topic, message) result = future.get(timeout=10) - print(f"Message sent successfully: {result}") + logging.info(f"Message sent successfully: {result}") except KafkaError as e: - print(f"Failed to send message: {e}") + logging.error(f"Failed to send message: {e}") - def consumeMessage(self): + def consume_messages(self): """ - Consumes messages from the Kafka topic. + Consumes messages from the Kafka topic asynchronously. """ - print(f"Consuming messages from topic: {self.topic}") + logging.info(f"Consuming messages from topic: {self.topic}") try: for message in self.consumer: - print(f"Received message: {message.value}") - self.handleConflicts(message.value) + # Submit message processing to the thread pool to avoid blocking the main thread + self.executor.submit(self.process_message, message.value) except KafkaError as e: - print(f"Error consuming messages: {e}") + logging.error(f"Error consuming messages: {e}") + + def process_message(self, message): + """ + Processes the Kafka message and handles conflicts if necessary. + """ + logging.info(f"Processing message: {message}") + self.handle_conflicts(message) - def handleConflicts(self, message): + def handle_conflicts(self, message): """ Handles conflicts based on message content. Custom conflict resolution logic can be added here. """ # Example conflict resolution logic (you can expand this logic based on your requirements) if "conflict" in message: - print("Conflict detected, handling conflict...") - # Add conflict resolution logic here - resolved_message = self.resolveConflict(message) - print(f"Resolved message: {resolved_message}") - # Publish resolved message or take further action + logging.warning("Conflict detected, handling conflict...") + resolved_message = self.resolve_conflict(message) + logging.info(f"Resolved message: {resolved_message}") + self.publish_message(resolved_message) else: - print("No conflict detected.") - - def resolveConflict(self, message): + logging.info("No conflict detected.") + + def resolve_conflict(self, message): """ Resolves conflicts in the message (example logic). """ # Example resolution: append '_resolved' to the conflicting data message['data'] = message.get('data', '') + '_resolved' return message + + def shutdown(self, signum, frame): + """ + Gracefully shutdown consumer and producer. 
+ """ + logging.info("Shutting down Kafka producer and consumer...") + self.consumer.close() + self.producer.close() + self.executor.shutdown(wait=False) + sys.exit(0) + +# Usage +kafka = KafkaHandler(bootstrap_servers='localhost:9092', topic='my_topic', group_id='my_group') +kafka.consume_messages() From e3a946031c72c102d86fdf92443ae9c12c45655c Mon Sep 17 00:00:00 2001 From: AAVISH GILBERT J Date: Mon, 16 Sep 2024 13:53:00 +0530 Subject: [PATCH 09/17] oneway --- AppscriptCode/AppScript.gs | 86 --------- AppscriptCode/sheetToDb.gs | 101 +++++++++++ app.py | 355 +++++++++++++++---------------------- kafkaImplementation.py | 98 ---------- last_id.txt | 1 + twoWayKafka.py | 120 +++++++++++++ updateSheet.py | 62 +++++++ 7 files changed, 428 insertions(+), 395 deletions(-) delete mode 100644 AppscriptCode/AppScript.gs create mode 100644 AppscriptCode/sheetToDb.gs delete mode 100644 kafkaImplementation.py create mode 100644 last_id.txt create mode 100644 twoWayKafka.py create mode 100644 updateSheet.py diff --git a/AppscriptCode/AppScript.gs b/AppscriptCode/AppScript.gs deleted file mode 100644 index 1d7f9ca09..000000000 --- a/AppscriptCode/AppScript.gs +++ /dev/null @@ -1,86 +0,0 @@ -function doPost(e) { - try { - // Extract the POST payload - var jsonData = JSON.parse(e.postData.contents); - - // Process the data, for example, updating the sheet with the new values - var sheet = SpreadsheetApp.getActiveSpreadsheet().getSheetByName(jsonData.sheetName); - var row = jsonData.row; - var column = jsonData.column; - var value = jsonData.value; - - // Update the Google Sheet with the new data - sheet.getRange(row, column).setValue(value); - - // Optionally, publish the update to Kafka - publishUpdateToKafka(jsonData); - - // Return success response - return ContentService.createTextOutput(JSON.stringify({status: 'success'})).setMimeType(ContentService.MimeType.JSON); - - } catch (error) { - Logger.log('Error in doPost: ' + error.toString()); - return ContentService.createTextOutput(JSON.stringify({status: 'error', message: error.toString()})).setMimeType(ContentService.MimeType.JSON); - } -} - -function doGet(e) { - try { - // For example, retrieve data from a specific sheet - var sheet = SpreadsheetApp.getActiveSpreadsheet().getSheetByName('Sheet1'); // Change the sheet name as required - var data = sheet.getDataRange().getValues(); - - // Convert data to JSON format - var jsonData = JSON.stringify(data); - - // Return the data as JSON - return ContentService.createTextOutput(jsonData).setMimeType(ContentService.MimeType.JSON); - - } catch (error) { - Logger.log('Error in doGet: ' + error.toString()); - return ContentService.createTextOutput(JSON.stringify({status: 'error', message: error.toString()})).setMimeType(ContentService.MimeType.JSON); - } -} - -function publishUpdateToKafka(data) { - var kafkaUrl = 'https://example.com/kafka-publish-endpoint'; // Replace with your Kafka publish endpoint - - var options = { - 'method': 'post', - 'contentType': 'application/json', - 'payload': JSON.stringify(data) - }; - - try { - var response = UrlFetchApp.fetch(kafkaUrl, options); - Logger.log('Published update to Kafka: ' + response.getContentText()); - } catch (error) { - Logger.log('Error publishing update to Kafka: ' + error.toString()); - } -} - - -function consumeKafkaUpdates() { - var kafkaUrl = 'https://example.com/kafka-consume-endpoint'; // Replace with your Kafka consume endpoint - - try { - var response = UrlFetchApp.fetch(kafkaUrl); - var updates = JSON.parse(response.getContentText()); - - // 
Process the updates - updates.forEach(function(update) { - var sheet = SpreadsheetApp.getActiveSpreadsheet().getSheetByName(update.sheetName); - var row = update.row; - var column = update.column; - var value = update.value; - - // Update the Google Sheet with the consumed Kafka data - sheet.getRange(row, column).setValue(value); - }); - - Logger.log('Consumed updates from Kafka'); - - } catch (error) { - Logger.log('Error consuming updates from Kafka: ' + error.toString()); - } -} diff --git a/AppscriptCode/sheetToDb.gs b/AppscriptCode/sheetToDb.gs new file mode 100644 index 000000000..6c48c32a6 --- /dev/null +++ b/AppscriptCode/sheetToDb.gs @@ -0,0 +1,101 @@ +function onEditTrigger(e) { + Logger.log("Triggered onEdit"); + + // Check if the edited range contains data + if (!e || !e.range) { + Logger.log("No range or event detected"); + return; + } + + // Get the sheet and range information + var sheet = e.range.getSheet(); + var range = e.range; + var sheetName = sheet.getName(); + var rowStart = range.getRow(); + var colStart = range.getColumn(); + var numRows = range.getNumRows(); + var numCols = range.getNumColumns(); + + Logger.log("Sheet Name: " + sheetName); + Logger.log("Row Start: " + rowStart + ", Column Start: " + colStart); + Logger.log("Number of Rows: " + numRows + ", Number of Columns: " + numCols); + + var modifiedCells = []; + + // Iterate over all modified cells + for (var i = 0; i < numRows; i++) { + for (var j = 0; j < numCols; j++) { + var cellRow = rowStart + i; + var cellCol = colStart + j; + var editedValue = sheet.getRange(cellRow, cellCol).getValue(); + var oldValue = e.oldValue; // Get the old value before the edit + + Logger.log("Processing cell: (" + cellRow + ", " + cellCol + ")"); + Logger.log("Old Value: " + oldValue + ", New Value: " + editedValue); + + // Determine the type of modification (inserted, updated, deleted) + var action = ''; + if (!oldValue && editedValue) { + action = 'insert'; + Logger.log("Action: insert"); + } else if (oldValue && !editedValue) { + action = 'delete'; // Set action to delete if the cell is cleared + Logger.log("Action: delete"); + } else if (oldValue !== editedValue) { + action = 'update'; + Logger.log("Action: update"); + } + + if (action) { + // Collect information about the modified cell + var cellModification = { + row: cellRow, + column: cellCol, + value: action === 'delete' ? 
null : editedValue, // If delete, set value to null + operation: action + }; + + Logger.log("Cell Modification: " + JSON.stringify(cellModification)); + + modifiedCells.push(cellModification); + } + } + } + + // Build the final JSON object + if (modifiedCells.length > 0) { + var jsonPayload = { + sheet_name: sheetName, + cells: modifiedCells + }; + + Logger.log("JSON Payload: " + JSON.stringify(jsonPayload)); // Log the JSON object + + sendPostRequest(jsonPayload); // Function to send the JSON payload to the server + } else { + Logger.log("No modifications detected"); + } +} + +// Function to send the JSON data via POST request to Kafka endpoint +function sendPostRequest(payload) { + Logger.log("Publishing to Kafka"); + + var url = "https://unique-powerful-husky.ngrok-free.app/kafka-publish-endpoint"; // Kafka publishing endpoint + + var options = { + method: "POST", + contentType: "application/json", + payload: JSON.stringify(payload) + }; + + try { + var response = UrlFetchApp.fetch(url, options); + Logger.log("Kafka Publish Response: " + response.getContentText()); + } catch (error) { + Logger.log("Error in Kafka publish request: " + error.toString()); + } +} + + + diff --git a/app.py b/app.py index 6f4439824..aec15f86b 100644 --- a/app.py +++ b/app.py @@ -1,262 +1,204 @@ + # from flask import Flask, request, jsonify -# from kafka import KafkaProducer, KafkaConsumer -# from kafka.errors import KafkaError -# import json # import logging # import threading # import mysql.connector # import psycopg2 +# from twoWayKafka import KafkaHandler # from dbCode.mysqlScript import sync_sheet_from_json +# from updateSheet import logUpdate + # logging.basicConfig(level=logging.INFO) +# # --- Database Configurations --- + # # MySQL Database Configuration # mysql_db = mysql.connector.connect( # host="localhost", # MySQL server host -# user="root", # MySQL username (in this case, 'root') -# password="Aavish@02", # MySQL root password -# database="google_sheet_mimic" # Database name +# user="root", # MySQL username +# password="Aavish@02", # MySQL password +# database="superjoin" # Database name # ) -# # PostgreSQL Database Configuration -# postgres_db = psycopg2.connect( -# host="localhost", -# user="postgres", -# password="Aavish@02", -# database="superjoin" -# ) -# class KafkaHandler: -# def __init__(self, bootstrap_servers, topic, group_id=None): -# self.bootstrap_servers = bootstrap_servers -# self.topic = topic -# self.group_id = group_id - -# self.producer = KafkaProducer( -# bootstrap_servers=self.bootstrap_servers, -# value_serializer=lambda v: json.dumps(v).encode('utf-8') -# ) -# # Remove KafkaConsumer initialization from here - -# def publish_message(self, message): -# """ -# Publishes a message to the Kafka topic. -# """ -# try: -# future = self.producer.send(self.topic, message) -# result = future.get(timeout=10) -# logging.info(f"Message sent successfully: {result}") -# return {"status": "success", "result": str(result)} -# except KafkaError as e: -# logging.error(f"Failed to send message: {e}") -# return {"status": "error", "message": str(e)} - -# def consume_messages(self, process_callback): -# """ -# Initializes the consumer and consumes messages from the Kafka topic asynchronously. -# Passes each message to the provided callback function (e.g., sync_sheet_from_json). 
-# """ -# logging.info(f"Starting consumer for topic: {self.topic}") -# try: -# consumer = KafkaConsumer( -# self.topic, -# group_id=self.group_id, -# bootstrap_servers=self.bootstrap_servers, -# value_deserializer=lambda m: json.loads(m.decode('utf-8')), -# auto_offset_reset='earliest', # Start from the earliest message if offset is not committed -# enable_auto_commit=True, # Automatically commit message offset after processing -# ) -# for message in consumer: -# process_callback(message.value) -# except KafkaError as e: -# logging.error(f"Error consuming messages: {e}") - -# # Function to run Kafka consumer in a separate thread + +# # --- Kafka Consumer Thread Function --- + # def run_consumer(kafka_handler): -# kafka_handler.consume_messages(lambda msg: sync_sheet_from_json(mysql_db, msg)) +# def process_message(msg): +# # Process the message +# sync_sheet_from_json(mysql_db, msg) + +# # Prepare response +# correlation_id = msg.get('correlation_id') +# if correlation_id: +# response = { +# 'status': 'success', +# 'correlation_id': correlation_id, +# 'result': 'Data synchronized' +# } +# # Send response back to the response_topic +# kafka_handler.producer.send(kafka_handler.response_topic, response) +# kafka_handler.producer.flush() + +# kafka_handler.consume_messages(process_message) + + + +# # --- Flask Application --- -# # Create Flask App # app = Flask(__name__) -# # Create a KafkaHandler instance -# kafka_handler = KafkaHandler(bootstrap_servers='localhost:9092', topic='my_topic', group_id='my_group') +# # Create a KafkaHandler instance with response_topic for two-way communication +# kafka_handler = KafkaHandler( +# bootstrap_servers='localhost:9092', +# topic='my_topic', +# group_id='my_group', +# response_topic='response_topic' # Replace with your actual response topic +# ) # # Start Kafka consumer in a background thread # consumer_thread = threading.Thread(target=run_consumer, args=(kafka_handler,), daemon=True) # consumer_thread.start() -# # Flask route to publish message to Kafka +# # db_trigger = threading.Thread(target=logUpdate, daemon=True) +# # db_trigger.start() + + +# # Flask route to publish message to Kafka and wait for a response # @app.route('/kafka-publish-endpoint', methods=['POST']) # def kafka_publish(): # """ -# Endpoint to publish a message to Kafka. +# Endpoint to publish a message to Kafka and wait for a response. # Expects a JSON payload in the request. 
# """ # data = request.json # if not data: # return jsonify({"status": "error", "message": "Invalid payload"}), 400 -# response = kafka_handler.publish_message(data) +# # Send a request and wait for a response +# response = kafka_handler.send_request(data, timeout=10) # return jsonify(response) # # Start Flask server # if __name__ == '__main__': # app.run(host='0.0.0.0', port=5000) - from flask import Flask, request, jsonify -import json import logging import threading -import uuid -import time import mysql.connector -import psycopg2 -from kafka import KafkaProducer, KafkaConsumer -from kafka.errors import KafkaError +import json +import time +from twoWayKafka import KafkaHandler from dbCode.mysqlScript import sync_sheet_from_json +# Uncomment the following line if you need to use logUpdate +# from updateSheet import logUpdate logging.basicConfig(level=logging.INFO) -# --- KafkaHandler Class with Two-Way Message Passing --- +# --- Database Configurations --- -class KafkaHandler: - def __init__(self, bootstrap_servers, topic, group_id=None, response_topic=None): - self.bootstrap_servers = bootstrap_servers - self.topic = topic - self.group_id = group_id - self.response_topic = response_topic +# MySQL Database Configuration +db_config = { + 'host': 'localhost', # MySQL server host + 'user': 'root', # MySQL username + 'password': 'Aavish@02', # MySQL password + 'database': 'superjoin' # Database name +} + +# --- Kafka Configuration --- + +kafka_config = { + 'bootstrap_servers': 'localhost:9092', + 'topic': 'my_topic', + 'group_id': 'my_group', + 'response_topic': 'response_topic' # Replace with your actual response topic +} - self.producer = KafkaProducer( - bootstrap_servers=self.bootstrap_servers, - value_serializer=lambda v: json.dumps(v).encode('utf-8') - ) +# --- Flask Application --- - # Dictionary to hold pending responses - self.pending_responses = {} - self.lock = threading.Lock() +app = Flask(__name__) - if self.response_topic: - # Start a consumer thread to listen to responses - threading.Thread(target=self._start_response_consumer, daemon=True).start() +# Create a KafkaHandler instance with response_topic for two-way communication +kafka_handler = KafkaHandler( + bootstrap_servers=kafka_config['bootstrap_servers'], + topic=kafka_config['topic'], + group_id=kafka_config['group_id'], + response_topic=kafka_config['response_topic'] +) - def publish_message(self, message): - """ - Publishes a message to the Kafka topic. - """ - try: - future = self.producer.send(self.topic, message) - result = future.get(timeout=10) - logging.info(f"Message sent successfully: {result}") - return {"status": "success", "result": str(result)} - except KafkaError as e: - logging.error(f"Failed to send message: {e}") - return {"status": "error", "message": str(e)} - - def consume_messages(self, process_callback): - """ - Consumes messages from the Kafka topic asynchronously. - Passes each message to the provided callback function. - """ - logging.info(f"Starting consumer for topic: {self.topic}") - try: - consumer = KafkaConsumer( - self.topic, - group_id=self.group_id, - bootstrap_servers=self.bootstrap_servers, - value_deserializer=lambda m: json.loads(m.decode('utf-8')), - auto_offset_reset='earliest', - enable_auto_commit=True, - ) - for message in consumer: - process_callback(message.value) - except KafkaError as e: - logging.error(f"Error consuming messages: {e}") - - def send_request(self, message, timeout=10): - """ - Sends a message and waits for a response. 
- """ - correlation_id = str(uuid.uuid4()) - message['correlation_id'] = correlation_id - - with self.lock: - self.pending_responses[correlation_id] = None +# --- Function to Read and Write Last Processed ID --- - try: - future = self.producer.send(self.topic, message) - future.get(timeout=10) - logging.info(f"Request sent with correlation_id: {correlation_id}") - except KafkaError as e: - logging.error(f"Failed to send request: {e}") - with self.lock: - del self.pending_responses[correlation_id] - return {"status": "error", "message": str(e)} - - # Wait for the response - start_time = time.time() - while True: - with self.lock: - response = self.pending_responses.get(correlation_id) - if response is not None: - with self.lock: - del self.pending_responses[correlation_id] - return response - elif time.time() - start_time > timeout: - with self.lock: - del self.pending_responses[correlation_id] - logging.error(f"Response timed out for correlation_id: {correlation_id}") - return {"status": "error", "message": "Response timed out"} - else: - time.sleep(0.1) +def read_last_id(): + try: + with open('last_id.txt', 'r') as f: + return int(f.read()) + except Exception: + return 0 - def _start_response_consumer(self): - """ - Starts a consumer to listen for responses. - """ - logging.info(f"Starting response consumer for topic: {self.response_topic}") - try: - consumer = KafkaConsumer( - self.response_topic, - group_id=self.group_id + '_response' if self.group_id else None, - bootstrap_servers=self.bootstrap_servers, - value_deserializer=lambda m: json.loads(m.decode('utf-8')), - auto_offset_reset='earliest', - enable_auto_commit=True, - ) - for message in consumer: - response = message.value - correlation_id = response.get('correlation_id') - if correlation_id: - with self.lock: - self.pending_responses[correlation_id] = response - except KafkaError as e: - logging.error(f"Error consuming responses: {e}") +def write_last_id(last_id): + with open('last_id.txt', 'w') as f: + f.write(str(last_id)) -# --- Database Configurations --- +# --- Cells Monitoring Function --- -# MySQL Database Configuration -mysql_db = mysql.connector.connect( - host="localhost", # MySQL server host - user="root", # MySQL username - password="Aavish@02", # MySQL password - database="google_sheet_mimic" # Database name -) +def monitor_cells(kafka_handler, db_config): + last_id = read_last_id() # Read from file or initialize to 0 + while True: + try: + connection = mysql.connector.connect(**db_config) + cursor = connection.cursor(dictionary=True) + query = """ + SELECT * FROM cells + WHERE changed_by = 'sheet_sync_user@localhost' + AND operation = 'insert' + AND id > %s + ORDER BY id ASC + """ + cursor.execute(query, (last_id,)) + changes = cursor.fetchall() + cursor.close() + connection.close() + if changes: + for change in changes: + # Log the addition + logging.info(f"Cell added by 'sheet_sync_user': {change}") + + message = { + 'id': change['id'], + 'sheet_id': change['sheet_id'], + 'row_number': change['row_number'], + 'column_number': change['column_number'], + 'value': change['value'], + 'operation': change['operation'], + 'changed_by': change['changed_by'], + 'changed_at': change['changed_at'].strftime('%Y-%m-%d %H:%M:%S'), + 'is_current': change['is_current'] + } + # Publish message to Kafka using KafkaHandler + kafka_handler.publish_message(message) + logging.info(f"Sent message: {message}") + + last_id = changes[-1]['id'] + write_last_id(last_id) # Save the last processed ID + else: + time.sleep(1) + except 
Exception as e: + logging.error(f"Error in monitor_cells: {e}") + time.sleep(1) -# PostgreSQL Database Configuration -postgres_db = psycopg2.connect( - host="localhost", - user="postgres", - password="Aavish@02", - database="superjoin" -) +# Start the monitor_cells function in a background thread +monitor_thread = threading.Thread(target=monitor_cells, args=(kafka_handler, db_config), daemon=True) +monitor_thread.start() # --- Kafka Consumer Thread Function --- def run_consumer(kafka_handler): def process_message(msg): # Process the message - sync_sheet_from_json(mysql_db, msg) + sync_sheet_from_json(mysql.connector.connect(**db_config), msg) # Prepare response correlation_id = msg.get('correlation_id') @@ -272,25 +214,16 @@ def process_message(msg): kafka_handler.consume_messages(process_message) - - -# --- Flask Application --- - -app = Flask(__name__) - -# Create a KafkaHandler instance with response_topic for two-way communication -kafka_handler = KafkaHandler( - bootstrap_servers='localhost:9092', - topic='my_topic', - group_id='my_group', - response_topic='response_topic' # Replace with your actual response topic -) - # Start Kafka consumer in a background thread consumer_thread = threading.Thread(target=run_consumer, args=(kafka_handler,), daemon=True) consumer_thread.start() -# Flask route to publish message to Kafka and wait for a response +# Uncomment the following lines if you need to use logUpdate +# db_trigger = threading.Thread(target=logUpdate, daemon=True) +# db_trigger.start() + +# --- Flask Route to Publish Message to Kafka and Wait for a Response --- + @app.route('/kafka-publish-endpoint', methods=['POST']) def kafka_publish(): """ diff --git a/kafkaImplementation.py b/kafkaImplementation.py deleted file mode 100644 index 752b00e50..000000000 --- a/kafkaImplementation.py +++ /dev/null @@ -1,98 +0,0 @@ -from kafka import KafkaProducer, KafkaConsumer -from kafka.errors import KafkaError -import json -import logging -from concurrent.futures import ThreadPoolExecutor -import signal -import sys - -logging.basicConfig(level=logging.INFO) - -class KafkaHandler: - def __init__(self, bootstrap_servers, topic, group_id=None): - self.bootstrap_servers = bootstrap_servers - self.topic = topic - self.group_id = group_id - self.executor = ThreadPoolExecutor(max_workers=10) # Use a thread pool for async message consumption - - self.producer = KafkaProducer( - bootstrap_servers=self.bootstrap_servers, - value_serializer=lambda v: json.dumps(v).encode('utf-8') - ) - - self.consumer = KafkaConsumer( - self.topic, - group_id=self.group_id, - bootstrap_servers=self.bootstrap_servers, - value_deserializer=lambda m: json.loads(m.decode('utf-8')), - auto_offset_reset='earliest', # Start from the earliest message if offset is not committed - enable_auto_commit=True, # Automatically commit message offset after processing - ) - - # Graceful shutdown on SIGINT (Ctrl+C) - signal.signal(signal.SIGINT, self.shutdown) - - def publish_message(self, message): - """ - Publishes a message to the Kafka topic. - """ - try: - future = self.producer.send(self.topic, message) - result = future.get(timeout=10) - logging.info(f"Message sent successfully: {result}") - except KafkaError as e: - logging.error(f"Failed to send message: {e}") - - def consume_messages(self): - """ - Consumes messages from the Kafka topic asynchronously. 
- """ - logging.info(f"Consuming messages from topic: {self.topic}") - try: - for message in self.consumer: - # Submit message processing to the thread pool to avoid blocking the main thread - self.executor.submit(self.process_message, message.value) - except KafkaError as e: - logging.error(f"Error consuming messages: {e}") - - def process_message(self, message): - """ - Processes the Kafka message and handles conflicts if necessary. - """ - logging.info(f"Processing message: {message}") - self.handle_conflicts(message) - - def handle_conflicts(self, message): - """ - Handles conflicts based on message content. Custom conflict resolution logic can be added here. - """ - # Example conflict resolution logic (you can expand this logic based on your requirements) - if "conflict" in message: - logging.warning("Conflict detected, handling conflict...") - resolved_message = self.resolve_conflict(message) - logging.info(f"Resolved message: {resolved_message}") - self.publish_message(resolved_message) - else: - logging.info("No conflict detected.") - - def resolve_conflict(self, message): - """ - Resolves conflicts in the message (example logic). - """ - # Example resolution: append '_resolved' to the conflicting data - message['data'] = message.get('data', '') + '_resolved' - return message - - def shutdown(self, signum, frame): - """ - Gracefully shutdown consumer and producer. - """ - logging.info("Shutting down Kafka producer and consumer...") - self.consumer.close() - self.producer.close() - self.executor.shutdown(wait=False) - sys.exit(0) - -# Usage -kafka = KafkaHandler(bootstrap_servers='localhost:9092', topic='my_topic', group_id='my_group') -kafka.consume_messages() diff --git a/last_id.txt b/last_id.txt new file mode 100644 index 000000000..62f945751 --- /dev/null +++ b/last_id.txt @@ -0,0 +1 @@ +6 \ No newline at end of file diff --git a/twoWayKafka.py b/twoWayKafka.py new file mode 100644 index 000000000..05cb9ca9f --- /dev/null +++ b/twoWayKafka.py @@ -0,0 +1,120 @@ +import uuid +import time +from kafka import KafkaProducer, KafkaConsumer +from kafka.errors import KafkaError +import json +import threading +import logging + +class KafkaHandler: + def __init__(self, bootstrap_servers, topic, group_id=None, response_topic=None): + self.bootstrap_servers = bootstrap_servers + self.topic = topic + self.group_id = group_id + self.response_topic = response_topic + + self.producer = KafkaProducer( + bootstrap_servers=self.bootstrap_servers, + value_serializer=lambda v: json.dumps(v).encode('utf-8') + ) + + # Dictionary to hold pending responses + self.pending_responses = {} + self.lock = threading.Lock() + + if self.response_topic: + # Start a consumer thread to listen to responses + threading.Thread(target=self._start_response_consumer, daemon=True).start() + + def publish_message(self, message): + """ + Publishes a message to the Kafka topic. + """ + try: + future = self.producer.send(self.topic, message) + result = future.get(timeout=10) + logging.info(f"Message sent successfully: {result}") + return {"status": "success", "result": str(result)} + except KafkaError as e: + logging.error(f"Failed to send message: {e}") + return {"status": "error", "message": str(e)} + + def consume_messages(self, process_callback): + """ + Consumes messages from the Kafka topic asynchronously. + Passes each message to the provided callback function. 
+ """ + logging.info(f"Starting consumer for topic: {self.topic}") + try: + consumer = KafkaConsumer( + self.topic, + group_id=self.group_id, + bootstrap_servers=self.bootstrap_servers, + value_deserializer=lambda m: json.loads(m.decode('utf-8')), + auto_offset_reset='earliest', + enable_auto_commit=True, + ) + for message in consumer: + process_callback(message.value) + except KafkaError as e: + logging.error(f"Error consuming messages: {e}") + + def send_request(self, message, timeout=10): + """ + Sends a message and waits for a response. + """ + correlation_id = str(uuid.uuid4()) + message['correlation_id'] = correlation_id + + with self.lock: + self.pending_responses[correlation_id] = None + + try: + future = self.producer.send(self.topic, message) + future.get(timeout=10) + logging.info(f"Request sent with correlation_id: {correlation_id}") + except KafkaError as e: + logging.error(f"Failed to send request: {e}") + with self.lock: + del self.pending_responses[correlation_id] + return {"status": "error", "message": str(e)} + + # Wait for the response + start_time = time.time() + while True: + with self.lock: + response = self.pending_responses.get(correlation_id) + if response is not None: + with self.lock: + del self.pending_responses[correlation_id] + return response + elif time.time() - start_time > timeout: + with self.lock: + del self.pending_responses[correlation_id] + logging.error(f"Response timed out for correlation_id: {correlation_id}") + return {"status": "error", "message": "Response timed out"} + else: + time.sleep(0.1) + + def _start_response_consumer(self): + """ + Starts a consumer to listen for responses. + """ + logging.info(f"Starting response consumer for topic: {self.response_topic}") + try: + consumer = KafkaConsumer( + self.response_topic, + group_id=self.group_id + '_response' if self.group_id else None, + bootstrap_servers=self.bootstrap_servers, + value_deserializer=lambda m: json.loads(m.decode('utf-8')), + auto_offset_reset='earliest', + enable_auto_commit=True, + ) + for message in consumer: + response = message.value + correlation_id = response.get('correlation_id') + if correlation_id: + with self.lock: + self.pending_responses[correlation_id] = response + except KafkaError as e: + logging.error(f"Error consuming responses: {e}") \ No newline at end of file diff --git a/updateSheet.py b/updateSheet.py new file mode 100644 index 000000000..a51346e91 --- /dev/null +++ b/updateSheet.py @@ -0,0 +1,62 @@ +import mysql.connector +import requests +import time + +# Configuration +DB_CONFIG = { + 'host': 'localhost', + 'user': 'sheet_sync_user', + 'password': 'Aavish@02', + 'database': 'google_sheet_mimic' +} + +APP_SCRIPT_URL = 'https://script.google.com/macros/s/AKfycbx0Ffi116BJDJPlH6bLHKJAGb8yNnU-By2Fp0kLySJ1ruJt-jV7nhlpyAwWGGKKwW7UCg/exec' + +def fetch_unprocessed_changes(db_conn): + cursor = db_conn.cursor(dictionary=True) + query = "SELECT * FROM cell_changes WHERE processed = 0 ORDER BY changed_at ASC" + cursor.execute(query) + changes = cursor.fetchall() + cursor.close() + return changes + +def mark_changes_as_processed(db_conn, change_ids): + cursor = db_conn.cursor() + format_strings = ','.join(['%s'] * len(change_ids)) + query = "UPDATE cell_changes SET processed = 1 WHERE id IN (%s)" % format_strings + cursor.execute(query, tuple(change_ids)) + db_conn.commit() + cursor.close() + +def logUpdate(): + db_conn = mysql.connector.connect(**DB_CONFIG) + + try: + while True: + changes = fetch_unprocessed_changes(db_conn) + if changes: + # Send changes to 
Apps Script + payload = {'changes': changes} + response = requests.post(APP_SCRIPT_URL, json=payload) + + if response.status_code == 200: + print("Changes sent successfully.") + # Mark changes as processed + change_ids = [change['id'] for change in changes] + mark_changes_as_processed(db_conn, change_ids) + else: + print(f"Failed to send changes. Status code: {response.status_code}") + print(f"Response: {response.text}") + else: + print("No new changes.") + + # Wait before checking again + time.sleep(2) # Adjust the interval as needed + + except KeyboardInterrupt: + print("Stopping the monitoring script.") + finally: + db_conn.close() + +if __name__ == "__main__": + logUpdate() From 4fd47a4c0948d13be3f5cfa821fa48ec57914db3 Mon Sep 17 00:00:00 2001 From: AAVISH GILBERT J Date: Mon, 16 Sep 2024 14:44:39 +0530 Subject: [PATCH 10/17] Two Way Kafka Configured --- app.py | 277 +++++++++++++++++++++++++++++++++++++++++++++------- last_id.txt | 2 +- 2 files changed, 245 insertions(+), 34 deletions(-) diff --git a/app.py b/app.py index aec15f86b..361214ee8 100644 --- a/app.py +++ b/app.py @@ -85,6 +85,170 @@ # # Start Flask server # if __name__ == '__main__': # app.run(host='0.0.0.0', port=5000) + + + +#========================================================================================================================== + + +# from flask import Flask, request, jsonify +# import logging +# import threading +# import mysql.connector +# import json +# import time +# from twoWayKafka import KafkaHandler +# from dbCode.mysqlScript import sync_sheet_from_json +# # Uncomment the following line if you need to use logUpdate +# # from updateSheet import logUpdate + +# logging.basicConfig(level=logging.INFO) + +# # --- Database Configurations --- + +# # MySQL Database Configuration +# db_config = { +# 'host': 'localhost', # MySQL server host +# 'user': 'root', # MySQL username +# 'password': 'Aavish@02', # MySQL password +# 'database': 'superjoin' # Database name +# } + +# # --- Kafka Configuration --- + +# kafka_config = { +# 'bootstrap_servers': 'localhost:9092', +# 'topic': 'my_topic', +# 'group_id': 'my_group', +# 'response_topic': 'response_topic' # Replace with your actual response topic +# } + +# # --- Flask Application --- + +# app = Flask(__name__) + +# # Create a KafkaHandler instance with response_topic for two-way communication +# kafka_handler = KafkaHandler( +# bootstrap_servers=kafka_config['bootstrap_servers'], +# topic=kafka_config['topic'], +# group_id=kafka_config['group_id'], +# response_topic=kafka_config['response_topic'] +# ) + +# # --- Function to Read and Write Last Processed ID --- + +# def read_last_id(): +# try: +# with open('last_id.txt', 'r') as f: +# return int(f.read()) +# except Exception: +# return 0 + +# def write_last_id(last_id): +# with open('last_id.txt', 'w') as f: +# f.write(str(last_id)) + +# # --- Cells Monitoring Function --- + +# def monitor_cells(kafka_handler, db_config): +# last_id = read_last_id() # Read from file or initialize to 0 +# while True: +# try: +# connection = mysql.connector.connect(**db_config) +# cursor = connection.cursor(dictionary=True) +# query = """ +# SELECT * FROM cells +# WHERE changed_by = 'sheet_sync_user@localhost' +# AND operation = 'insert' +# AND id > %s +# ORDER BY id ASC +# """ +# cursor.execute(query, (last_id,)) +# changes = cursor.fetchall() +# cursor.close() +# connection.close() +# if changes: +# for change in changes: +# # Log the addition +# logging.info(f"Cell added by 'sheet_sync_user': {change}") + +# message = { +# 
'id': change['id'], +# 'sheet_id': change['sheet_id'], +# 'row_number': change['row_number'], +# 'column_number': change['column_number'], +# 'value': change['value'], +# 'operation': change['operation'], +# 'changed_by': change['changed_by'], +# 'changed_at': change['changed_at'].strftime('%Y-%m-%d %H:%M:%S'), +# 'is_current': change['is_current'] +# } +# # Publish message to Kafka using KafkaHandler +# kafka_handler.publish_message(message) +# logging.info(f"Sent message: {message}") + +# last_id = changes[-1]['id'] +# write_last_id(last_id) # Save the last processed ID +# else: +# time.sleep(1) +# except Exception as e: +# logging.error(f"Error in monitor_cells: {e}") +# time.sleep(1) + +# # Start the monitor_cells function in a background thread +# monitor_thread = threading.Thread(target=monitor_cells, args=(kafka_handler, db_config), daemon=True) +# monitor_thread.start() + +# # --- Kafka Consumer Thread Function --- + +# def run_consumer(kafka_handler): +# def process_message(msg): +# # Process the message +# sync_sheet_from_json(mysql.connector.connect(**db_config), msg) + +# # Prepare response +# correlation_id = msg.get('correlation_id') +# if correlation_id: +# response = { +# 'status': 'success', +# 'correlation_id': correlation_id, +# 'result': 'Data synchronized' +# } +# # Send response back to the response_topic +# kafka_handler.producer.send(kafka_handler.response_topic, response) +# kafka_handler.producer.flush() + +# kafka_handler.consume_messages(process_message) + +# # Start Kafka consumer in a background thread +# consumer_thread = threading.Thread(target=run_consumer, args=(kafka_handler,), daemon=True) +# consumer_thread.start() + +# # Uncomment the following lines if you need to use logUpdate +# # db_trigger = threading.Thread(target=logUpdate, daemon=True) +# # db_trigger.start() + +# # --- Flask Route to Publish Message to Kafka and Wait for a Response --- + +# @app.route('/kafka-publish-endpoint', methods=['POST']) +# def kafka_publish(): +# """ +# Endpoint to publish a message to Kafka and wait for a response. +# Expects a JSON payload in the request. 
+# """ +# data = request.json +# if not data: +# return jsonify({"status": "error", "message": "Invalid payload"}), 400 + +# # Send a request and wait for a response +# response = kafka_handler.send_request(data, timeout=10) +# return jsonify(response) + +# # Start Flask server +# if __name__ == '__main__': +# app.run(host='0.0.0.0', port=5000) + + from flask import Flask, request, jsonify import logging import threading @@ -93,35 +257,31 @@ import time from twoWayKafka import KafkaHandler from dbCode.mysqlScript import sync_sheet_from_json -# Uncomment the following line if you need to use logUpdate -# from updateSheet import logUpdate logging.basicConfig(level=logging.INFO) # --- Database Configurations --- - -# MySQL Database Configuration db_config = { - 'host': 'localhost', # MySQL server host - 'user': 'root', # MySQL username - 'password': 'Aavish@02', # MySQL password - 'database': 'superjoin' # Database name + 'host': 'localhost', + 'user': 'root', + 'password': 'Aavish@02', + 'database': 'superjoin' } # --- Kafka Configuration --- - kafka_config = { 'bootstrap_servers': 'localhost:9092', - 'topic': 'my_topic', + 'topic': 'my_topic', # For A -> B communication 'group_id': 'my_group', - 'response_topic': 'response_topic' # Replace with your actual response topic + 'response_topic': 'response_topic', + 'cells_topic': 'cells_topic', # New topic for C -> D communication + 'cells_group_id': 'cells_group' } # --- Flask Application --- - app = Flask(__name__) -# Create a KafkaHandler instance with response_topic for two-way communication +# Create KafkaHandler instances for both A -> B and C -> D communications kafka_handler = KafkaHandler( bootstrap_servers=kafka_config['bootstrap_servers'], topic=kafka_config['topic'], @@ -129,8 +289,13 @@ response_topic=kafka_config['response_topic'] ) -# --- Function to Read and Write Last Processed ID --- +kafka_handler_cells = KafkaHandler( + bootstrap_servers=kafka_config['bootstrap_servers'], + topic=kafka_config['cells_topic'], + group_id=kafka_config['cells_group_id'] +) +# --- Function to Read and Write Last Processed ID --- def read_last_id(): try: with open('last_id.txt', 'r') as f: @@ -142,14 +307,19 @@ def write_last_id(last_id): with open('last_id.txt', 'w') as f: f.write(str(last_id)) -# --- Cells Monitoring Function --- - -def monitor_cells(kafka_handler, db_config): +# --- Cells Monitoring Function (C) --- +def monitor_cells(kafka_handler_cells, db_config): last_id = read_last_id() # Read from file or initialize to 0 + logging.info(f"Starting monitor_cells with last_id: {last_id}") + while True: try: + # Step 1: Connect to the MySQL database + logging.debug("Connecting to the database...") connection = mysql.connector.connect(**db_config) cursor = connection.cursor(dictionary=True) + + # Step 2: Prepare and execute the query query = """ SELECT * FROM cells WHERE changed_by = 'sheet_sync_user@localhost' @@ -157,15 +327,21 @@ def monitor_cells(kafka_handler, db_config): AND id > %s ORDER BY id ASC """ + logging.debug(f"Executing query with last_id: {last_id}") cursor.execute(query, (last_id,)) changes = cursor.fetchall() + + logging.debug(f"Query executed. 
Number of changes fetched: {len(changes)}") cursor.close() connection.close() + + # Step 3: Process the changes if changes: for change in changes: - # Log the addition - logging.info(f"Cell added by 'sheet_sync_user': {change}") + # Debug each change being processed + logging.debug(f"Processing change: {change}") + # Prepare the message to send to Kafka message = { 'id': change['id'], 'sheet_id': change['sheet_id'], @@ -177,24 +353,64 @@ def monitor_cells(kafka_handler, db_config): 'changed_at': change['changed_at'].strftime('%Y-%m-%d %H:%M:%S'), 'is_current': change['is_current'] } - # Publish message to Kafka using KafkaHandler - kafka_handler.publish_message(message) - logging.info(f"Sent message: {message}") + # Log the message being sent to Kafka + logging.info(f"Publishing message to Kafka: {message}") + kafka_handler_cells.publish_message(message) + logging.debug(f"Message sent successfully: {message}") + # Update the last processed ID and store it last_id = changes[-1]['id'] - write_last_id(last_id) # Save the last processed ID + logging.info(f"Updating last_id to: {last_id}") + write_last_id(last_id) else: + # Log when no changes are found + logging.debug("No changes found. Sleeping for 1 second...") time.sleep(1) + + except mysql.connector.Error as db_error: + # Log database connection or query errors + logging.error(f"Database error in monitor_cells: {db_error}") + time.sleep(1) except Exception as e: - logging.error(f"Error in monitor_cells: {e}") + # Catch all other errors + logging.error(f"Unexpected error in monitor_cells: {e}") time.sleep(1) + # Start the monitor_cells function in a background thread -monitor_thread = threading.Thread(target=monitor_cells, args=(kafka_handler, db_config), daemon=True) +monitor_thread = threading.Thread(target=monitor_cells, args=(kafka_handler_cells, db_config), daemon=True) monitor_thread.start() -# --- Kafka Consumer Thread Function --- +# --- Kafka Consumer for C -> D (Listening on cells_topic) --- +def run_cells_consumer(kafka_handler_cells): + """ + Consumes messages from the Kafka cells_topic and processes them. + Acts as the D part of C -> D communication. 
+ """ + def process_cells_message(msg): + # Process the message (C -> D flow) + logging.info(f"Processing message from cells_topic: {msg}") + + # Simulate processing the request and send a response + correlation_id = msg.get('correlation_id') + if correlation_id: + response = { + 'status': 'success', + 'correlation_id': correlation_id, + 'result': 'Cells processed successfully' + } + # Send response back to the response_topic + kafka_handler_cells.producer.send(kafka_handler_cells.response_topic, response) + kafka_handler_cells.producer.flush() + logging.info(f"Response sent for correlation_id: {correlation_id}") + + kafka_handler_cells.consume_messages(process_cells_message) +# Start Kafka consumer for C -> D in a background thread +cells_consumer_thread = threading.Thread(target=run_cells_consumer, args=(kafka_handler_cells,), daemon=True) +cells_consumer_thread.start() + +# --- Kafka Consumer Thread Function for A -> B (Kafka sync) --- def run_consumer(kafka_handler): def process_message(msg): # Process the message @@ -214,16 +430,11 @@ def process_message(msg): kafka_handler.consume_messages(process_message) -# Start Kafka consumer in a background thread +# Start Kafka consumer for A -> B in a background thread consumer_thread = threading.Thread(target=run_consumer, args=(kafka_handler,), daemon=True) consumer_thread.start() -# Uncomment the following lines if you need to use logUpdate -# db_trigger = threading.Thread(target=logUpdate, daemon=True) -# db_trigger.start() - -# --- Flask Route to Publish Message to Kafka and Wait for a Response --- - +# --- Flask Route to Publish Message to Kafka and Wait for a Response (A -> B) --- @app.route('/kafka-publish-endpoint', methods=['POST']) def kafka_publish(): """ diff --git a/last_id.txt b/last_id.txt index 62f945751..da2d3988d 100644 --- a/last_id.txt +++ b/last_id.txt @@ -1 +1 @@ -6 \ No newline at end of file +14 \ No newline at end of file From 067a9d8020a87b935bda4b239f2809acf8917539 Mon Sep 17 00:00:00 2001 From: AAVISH GILBERT J <71066342+Aavish-Gilbert-J@users.noreply.github.com> Date: Mon, 16 Sep 2024 17:57:37 +0530 Subject: [PATCH 11/17] Update README.md --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 900913b2e..ec3983112 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,9 @@ 
[![](https://mermaid.ink/img/pako:eNqVVV1r2zAU_StCUEhpQt79MGiTrrAta1Zn7MUQFPnGFrElV5I3Qsl_n2QpjmyUkfkl4X6ce-7V0dUHpiIHnGBaEaWWjBSS1BlH5uss6EWIooK0BNAKfTiP_WbKmpZEE5SgL-nr94vrQfDnnOmNZEUBcnIfeBTwfC2UfoP3FpQ--04ZD2s-Nk1KJWt0WPAhFzZzAvcD2wuMTE27q5gqfzY50bARX8n-QAYcqOCqraFzuCh1hUcXMuDgwVegFCkgBhtzlYTnFSwE31eM6mvVfhGmpclOQf4GOZj18skkc6CaCR7gzuujeq-2bddEaG_MoAqDFXGpI6fbojvUbXeEoZN6iluTK6p2VM4eXmygEkj-WYp64Bk1tzqmP74tn0ZdxeXjSHcZgzp70LRcmGEWV09s7Rr_31o-Td1e7u7O3wzkr4Y5Yqtb5IW7EHXdckbJZYaDmzSbfQpkniB3ZWyZHj9As-BOjBHcC4wFdWEJWjulItcg0sJ7NqJhFE288H1bzjXmtHCK9il7IZHPmvdSDuie5Yu8fv_JeaT1GHFXqy816mDMewQYI98jvfXinvuGgjYC9lYxO6LgxgbOEk88dWdAxjJZv6YbNLitfvDnnGgLj_TAxZ8K8gI85NXaA933BM6yDjiMNoOnMUi_mct4Zs88bwTjRt123CklFdmxiuljlHYS3V-RoOgyi8Rd2WyRyOiaw1Ncg6wJy81r2K2ODOsSashwYv7mRB4ynPGTiSOtFqmphhMtW5hiR8w_njjZk0oZK5gLLeTKP6_2Z4qlaIvSR5z-AgPBZ3Q?type=png)](https://mermaid.live/edit#pako:eNqVVV1r2zAU_StCUEhpQt79MGiTrrAta1Zn7MUQFPnGFrElV5I3Qsl_n2QpjmyUkfkl4X6ce-7V0dUHpiIHnGBaEaWWjBSS1BlH5uss6EWIooK0BNAKfTiP_WbKmpZEE5SgL-nr94vrQfDnnOmNZEUBcnIfeBTwfC2UfoP3FpQ--04ZD2s-Nk1KJWt0WPAhFzZzAvcD2wuMTE27q5gqfzY50bARX8n-QAYcqOCqraFzuCh1hUcXMuDgwVegFCkgBhtzlYTnFSwE31eM6mvVfhGmpclOQf4GOZj18skkc6CaCR7gzuujeq-2bddEaG_MoAqDFXGpI6fbojvUbXeEoZN6iluTK6p2VM4eXmygEkj-WYp64Bk1tzqmP74tn0ZdxeXjSHcZgzp70LRcmGEWV09s7Rr_31o-Td1e7u7O3wzkr4Y5Yqtb5IW7EHXdckbJZYaDmzSbfQpkniB3ZWyZHj9As-BOjBHcC4wFdWEJWjulItcg0sJ7NqJhFE288H1bzjXmtHCK9il7IZHPmvdSDuie5Yu8fv_JeaT1GHFXqy816mDMewQYI98jvfXinvuGgjYC9lYxO6LgxgbOEk88dWdAxjJZv6YbNLitfvDnnGgLj_TAxZ8K8gI85NXaA933BM6yDjiMNoOnMUi_mct4Zs88bwTjRt123CklFdmxiuljlHYS3V-RoOgyi8Rd2WyRyOiaw1Ncg6wJy81r2K2ODOsSashwYv7mRB4ynPGTiSOtFqmphhMtW5hiR8w_njjZk0oZK5gLLeTKP6_2Z4qlaIvSR5z-AgPBZ3Q) +### DEMO Video +https://drive.google.com/file/d/18NcHJFeRSfQ9IOGx4iLDHjqH73XrxyQF/view?usp=sharing + ### Welcome to Superjoin's hiring assignment! 
πŸš€ From 81c1dfa062e017d5b09a763118284e9c542668e9 Mon Sep 17 00:00:00 2001 From: AAVISH GILBERT J Date: Mon, 16 Sep 2024 17:58:09 +0530 Subject: [PATCH 12/17] Final Commit --- app.py | 92 +++++++++++++++++++++++++++++++++++++++++++++++++- data.json | 41 ++++++++++++++++++++++ last_id.txt | 2 +- updateSheet.py | 62 ---------------------------------- 4 files changed, 133 insertions(+), 64 deletions(-) create mode 100644 data.json diff --git a/app.py b/app.py index 361214ee8..8f50d4f6a 100644 --- a/app.py +++ b/app.py @@ -248,7 +248,7 @@ # if __name__ == '__main__': # app.run(host='0.0.0.0', port=5000) - +#========================================================================================== from flask import Flask, request, jsonify import logging import threading @@ -257,6 +257,7 @@ import time from twoWayKafka import KafkaHandler from dbCode.mysqlScript import sync_sheet_from_json +import requests logging.basicConfig(level=logging.INFO) @@ -307,6 +308,69 @@ def write_last_id(last_id): with open('last_id.txt', 'w') as f: f.write(str(last_id)) +import json +import os + +import json +import os + +import json +import os + +def update_sheet(input_data): + # Define the file name for the data + file_name = 'data.json' + + # Create the structure to hold the changes for the current input + change_entry = { + "row_number": input_data['row_number'], + "column_number": input_data['column_number'], + "value": input_data.get('value', ''), + "operation": input_data['operation'], + "changed_by": input_data['changed_by'], + "changed_at": input_data['changed_at'], + "is_current": input_data['is_current'] + } + + # Initialize an empty structure + data = {"sheets": []} + + # Load the existing file if it exists and contains valid JSON + if os.path.exists(file_name): + try: + with open(file_name, 'r') as file: + file_content = file.read().strip() + if file_content: + data = json.loads(file_content) + except json.JSONDecodeError: + print(f"Warning: {file_name} contains invalid JSON. Reinitializing.") + data = {"sheets": []} + + # Find the sheet in the JSON structure, or add a new sheet entry + sheet_found = False + for sheet in data['sheets']: + if sheet['sheet_id'] == input_data['sheet_id']: + sheet['changes'].append(change_entry) + sheet_found = True + break + + # If the sheet does not exist, create a new sheet entry + if not sheet_found: + new_sheet = { + "sheet_id": input_data['sheet_id'], + "changes": [change_entry] + } + data['sheets'].append(new_sheet) + + # Save the updated data back to the file + with open(file_name, 'w') as file: + json.dump(data, file, indent=4) + + return data + + + + # --- Cells Monitoring Function (C) --- def monitor_cells(kafka_handler_cells, db_config): last_id = read_last_id() # Read from file or initialize to 0 @@ -393,6 +457,11 @@ def process_cells_message(msg): # Simulate processing the request and send a response correlation_id = msg.get('correlation_id') + + + logging.info("++++++++++++++++++++++++++") + response_data = update_sheet(msg) + logging.info(f"Response from Google Apps Script: {response_data}") if correlation_id: response = { 'status': 'success', @@ -449,6 +518,27 @@ def kafka_publish(): response = kafka_handler.send_request(data, timeout=10) return jsonify(response) +FILE_PATH = 'data.json' +@app.route('/updateSheet', methods=['POST']) +def read_json_from_file(): + """ + Endpoint to read stored JSON from a file and return it as a response. 
+ """ + try: + # Open and read the JSON file + with open(FILE_PATH, 'r') as file: + data = json.load(file) # Parse the JSON content + # print(data) + # Return the JSON data as a response + return jsonify(data) + + except FileNotFoundError: + return jsonify({"status": "error", "message": "File not found"}), 404 + except json.JSONDecodeError: + return jsonify({"status": "error", "message": "Error decoding JSON"}), 400 + except Exception as e: + return jsonify({"status": "error", "message": str(e)}), 500 + # Start Flask server if __name__ == '__main__': app.run(host='0.0.0.0', port=5000) diff --git a/data.json b/data.json new file mode 100644 index 000000000..a707dc070 --- /dev/null +++ b/data.json @@ -0,0 +1,41 @@ +{ + "sheets": [ + { + "sheet_id": 1, + "changes": [ + { + "row_number": 2, + "column_number": 1, + "value": "python", + "operation": "insert", + "changed_by": "sheet_sync_user@localhost", + "changed_at": "2024-09-16 16:13:49", + "is_current": 1 + }, + { + "row_number": 1, + "column_number": 1, + "value": "Gilbert J", + "operation": "update", + "changed_by": "sheet_sync_user@localhost", + "changed_at": "2024-09-16 16:14:00", + "is_current": 1 + } + ] + }, + { + "sheet_id": 2, + "changes": [ + { + "row_number": 1, + "column_number": 1, + "value": "kafka", + "operation": "insert", + "changed_by": "sheet_sync_user@localhost", + "changed_at": "2024-09-16 16:13:49", + "is_current": 1 + } + ] + } + ] +} diff --git a/last_id.txt b/last_id.txt index da2d3988d..aaa6442fe 100644 --- a/last_id.txt +++ b/last_id.txt @@ -1 +1 @@ -14 \ No newline at end of file +41 \ No newline at end of file diff --git a/updateSheet.py b/updateSheet.py index a51346e91..e69de29bb 100644 --- a/updateSheet.py +++ b/updateSheet.py @@ -1,62 +0,0 @@ -import mysql.connector -import requests -import time - -# Configuration -DB_CONFIG = { - 'host': 'localhost', - 'user': 'sheet_sync_user', - 'password': 'Aavish@02', - 'database': 'google_sheet_mimic' -} - -APP_SCRIPT_URL = 'https://script.google.com/macros/s/AKfycbx0Ffi116BJDJPlH6bLHKJAGb8yNnU-By2Fp0kLySJ1ruJt-jV7nhlpyAwWGGKKwW7UCg/exec' - -def fetch_unprocessed_changes(db_conn): - cursor = db_conn.cursor(dictionary=True) - query = "SELECT * FROM cell_changes WHERE processed = 0 ORDER BY changed_at ASC" - cursor.execute(query) - changes = cursor.fetchall() - cursor.close() - return changes - -def mark_changes_as_processed(db_conn, change_ids): - cursor = db_conn.cursor() - format_strings = ','.join(['%s'] * len(change_ids)) - query = "UPDATE cell_changes SET processed = 1 WHERE id IN (%s)" % format_strings - cursor.execute(query, tuple(change_ids)) - db_conn.commit() - cursor.close() - -def logUpdate(): - db_conn = mysql.connector.connect(**DB_CONFIG) - - try: - while True: - changes = fetch_unprocessed_changes(db_conn) - if changes: - # Send changes to Apps Script - payload = {'changes': changes} - response = requests.post(APP_SCRIPT_URL, json=payload) - - if response.status_code == 200: - print("Changes sent successfully.") - # Mark changes as processed - change_ids = [change['id'] for change in changes] - mark_changes_as_processed(db_conn, change_ids) - else: - print(f"Failed to send changes. 
Status code: {response.status_code}") - print(f"Response: {response.text}") - else: - print("No new changes.") - - # Wait before checking again - time.sleep(2) # Adjust the interval as needed - - except KeyboardInterrupt: - print("Stopping the monitoring script.") - finally: - db_conn.close() - -if __name__ == "__main__": - logUpdate() From d9d1a6af18eea604a95d1894329cb7644824c5f1 Mon Sep 17 00:00:00 2001 From: AAVISH GILBERT J <71066342+Aavish-Gilbert-J@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:15:48 +0530 Subject: [PATCH 13/17] Update README.md --- README.md | 45 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index ec3983112..11fe9f682 100644 --- a/README.md +++ b/README.md @@ -6,8 +6,6 @@ [![](https://mermaid.ink/img/pako:eNqVVV1r2zAU_StCUEhpQt79MGiTrrAta1Zn7MUQFPnGFrElV5I3Qsl_n2QpjmyUkfkl4X6ce-7V0dUHpiIHnGBaEaWWjBSS1BlH5uss6EWIooK0BNAKfTiP_WbKmpZEE5SgL-nr94vrQfDnnOmNZEUBcnIfeBTwfC2UfoP3FpQ--04ZD2s-Nk1KJWt0WPAhFzZzAvcD2wuMTE27q5gqfzY50bARX8n-QAYcqOCqraFzuCh1hUcXMuDgwVegFCkgBhtzlYTnFSwE31eM6mvVfhGmpclOQf4GOZj18skkc6CaCR7gzuujeq-2bddEaG_MoAqDFXGpI6fbojvUbXeEoZN6iluTK6p2VM4eXmygEkj-WYp64Bk1tzqmP74tn0ZdxeXjSHcZgzp70LRcmGEWV09s7Rr_31o-Td1e7u7O3wzkr4Y5Yqtb5IW7EHXdckbJZYaDmzSbfQpkniB3ZWyZHj9As-BOjBHcC4wFdWEJWjulItcg0sJ7NqJhFE288H1bzjXmtHCK9il7IZHPmvdSDuie5Yu8fv_JeaT1GHFXqy816mDMewQYI98jvfXinvuGgjYC9lYxO6LgxgbOEk88dWdAxjJZv6YbNLitfvDnnGgLj_TAxZ8K8gI85NXaA933BM6yDjiMNoOnMUi_mct4Zs88bwTjRt123CklFdmxiuljlHYS3V-RoOgyi8Rd2WyRyOiaw1Ncg6wJy81r2K2ODOsSashwYv7mRB4ynPGTiSOtFqmphhMtW5hiR8w_njjZk0oZK5gLLeTKP6_2Z4qlaIvSR5z-AgPBZ3Q?type=png)](https://mermaid.live/edit#pako:eNqVVV1r2zAU_StCUEhpQt79MGiTrrAta1Zn7MUQFPnGFrElV5I3Qsl_n2QpjmyUkfkl4X6ce-7V0dUHpiIHnGBaEaWWjBSS1BlH5uss6EWIooK0BNAKfTiP_WbKmpZEE5SgL-nr94vrQfDnnOmNZEUBcnIfeBTwfC2UfoP3FpQ--04ZD2s-Nk1KJWt0WPAhFzZzAvcD2wuMTE27q5gqfzY50bARX8n-QAYcqOCqraFzuCh1hUcXMuDgwVegFCkgBhtzlYTnFSwE31eM6mvVfhGmpclOQf4GOZj18skkc6CaCR7gzuujeq-2bddEaG_MoAqDFXGpI6fbojvUbXeEoZN6iluTK6p2VM4eXmygEkj-WYp64Bk1tzqmP74tn0ZdxeXjSHcZgzp70LRcmGEWV09s7Rr_31o-Td1e7u7O3wzkr4Y5Yqtb5IW7EHXdckbJZYaDmzSbfQpkniB3ZWyZHj9As-BOjBHcC4wFdWEJWjulItcg0sJ7NqJhFE288H1bzjXmtHCK9il7IZHPmvdSDuie5Yu8fv_JeaT1GHFXqy816mDMewQYI98jvfXinvuGgjYC9lYxO6LgxgbOEk88dWdAxjJZv6YbNLitfvDnnGgLj_TAxZ8K8gI85NXaA933BM6yDjiMNoOnMUi_mct4Zs88bwTjRt123CklFdmxiuljlHYS3V-RoOgyi8Rd2WyRyOiaw1Ncg6wJy81r2K2ODOsSashwYv7mRB4ynPGTiSOtFqmphhMtW5hiR8w_njjZk0oZK5gLLeTKP6_2Z4qlaIvSR5z-AgPBZ3Q) -### DEMO Video -https://drive.google.com/file/d/18NcHJFeRSfQ9IOGx4iLDHjqH73XrxyQF/view?usp=sharing ### Welcome to Superjoin's hiring assignment! πŸš€ @@ -54,11 +52,11 @@ Once you're done, make sure you **record a video** showing your project working. We have a checklist at the bottom of this README file, which you should update as your progress with your assignment. It will help us evaluate your project. -- [ ] My code's working just fine! πŸ₯³ -- [ ] I have recorded a video showing it working and embedded it in the README ▢️ -- [ ] I have tested all the normal working cases 😎 -- [ ] I have even solved some edge cases (brownie points) πŸ’ͺ -- [ ] I added my very planned-out approach to the problem at the end of this README πŸ“œ +- [YES] My code's working just fine! 
πŸ₯³ +- [YES] I have recorded a video showing it working and embedded it in the README ▢️ +- [YES] I have tested all the normal working cases 😎 +- [YES] I have even solved some edge cases (brownie points) πŸ’ͺ +- [YES] I added my very planned-out approach to the problem at the end of this README πŸ“œ ## Got Questions❓ Feel free to check the discussions tab, you might get some help there. Check out that tab before reaching out to us. Also, did you know, the internet is a great place to explore? πŸ˜› @@ -68,4 +66,35 @@ We're available at techhiring@superjoin.ai for all queries. All the best ✨. ## Developer's Section -*Add your video here, and your approach to the problem (optional). Leave some comments for us here if you want, we will be reading this :)* +### DEMO Video +https://drive.google.com/file/d/18NcHJFeRSfQ9IOGx4iLDHjqH73XrxyQF/view?usp=sharing + +##Google Sheets & MySQL Sync with Kafka Integration +This project implements a real-time synchronization between Google Sheets and a MySQL database, leveraging Kafka for scalability and conflict resolution. The Flask server is hosted via Waitress and exposed to the internet using NGROK. Google Apps Script handles the detection of changes on Google Sheets, triggering the appropriate operations (Create, Read, Update, Delete) in real time. + +##Architecture Overview +Flask Server: Built using Python's Flask framework, served via Waitress. +NGROK: Used to expose the Flask server to the internet for handling requests from Google Sheets. +Kafka: Handles message passing for scalable synchronization between the Google Sheets and MySQL database. It also resolves conflicts when concurrent updates occur. +Google Apps Script: Detects changes in Google Sheets (onEdit) and triggers CRUD operations accordingly. +MySQL Database: Stores the data synced from the Google Sheets. +##Key Features +#Real-Time Synchronization: The system ensures that any changes made in Google Sheets (adding rows, updating values, etc.) are reflected in the MySQL database instantly. +Scalability: Kafka integration allows the system to scale horizontally, ensuring that multiple clients can interact with the database without performance degradation. +#Conflict Resolution: Internal conflict-handling mechanisms resolve issues arising from concurrent edits. +#CRUD Operations: Supports Create, Read, Update, and Delete operations seamlessly between Google Sheets and MySQL. +##Challenges Faced +One major roadblock encountered was configuring Kafka, as it was my first time working with it. The rest of the functionalities were more straightforward since I had prior experience implementing similar operations using Google Apps Script and Python during my internship at EKCS. + +##Personal Reflection +Had my entire weekend occupied with it and was really fun, I learned a lot about Kafka in the process. + +##How to Use +Setup MySQL Database: Ensure you have a running MySQL instance. +Flask Server: Start the Flask server using Waitress and expose it via NGROK. +Google Sheets: Add the appropriate Apps Script to trigger updates on changes in Google Sheets. +Kafka: Set up a Kafka broker and configure the consumer and producer in the Flask app. 
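The setup steps above can be sanity-checked end to end without Google Sheets by posting a payload in the same shape that AppscriptCode/sheetToDb.gs builds. Below is a minimal sketch, assuming the Flask app from app.py is running locally on port 5000 with the Kafka broker and MySQL up; the sheet name and cell values are placeholders, not values from the project.

```python
# Hypothetical smoke test: mimics the POST that the Apps Script onEditTrigger
# sends, but targets the local Flask server directly instead of the ngrok URL.
import requests

payload = {
    "sheet_name": "Sheet1",          # placeholder sheet name
    "cells": [
        {"row": 1, "column": 1, "value": "hello", "operation": "insert"}
    ]
}

# /kafka-publish-endpoint forwards the payload via KafkaHandler.send_request and
# blocks until a response arrives on response_topic or the timeout expires.
resp = requests.post(
    "http://localhost:5000/kafka-publish-endpoint",
    json=payload,
    timeout=15,
)
print(resp.status_code, resp.json())
```

A successful round trip should return a JSON body with status "success"; if the Kafka consumer is not running, send_request returns a timeout error instead.
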
+##Author +Aavish Gilbert J +Email: aavish.gilbert@gmail.com +PES University, PES1UG21CS012 From eda13d6b31a27d59723bfe053964286814c37f98 Mon Sep 17 00:00:00 2001 From: AAVISH GILBERT J <71066342+Aavish-Gilbert-J@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:17:56 +0530 Subject: [PATCH 14/17] Update README.md --- README.md | 65 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 37 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 11fe9f682..fc5df3624 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ Many businesses use Google Sheets for collaborative data management and database - Ensure the solution can handle large datasets and high-frequency updates without performance degradation. - Optimize for scalability and efficiency. -## Submission ⏰ +### Submission ⏰ The timeline for this submission is: **Next 2 days** Some things you might want to take care of: @@ -66,35 +66,44 @@ We're available at techhiring@superjoin.ai for all queries. All the best ✨. ## Developer's Section -### DEMO Video -https://drive.google.com/file/d/18NcHJFeRSfQ9IOGx4iLDHjqH73XrxyQF/view?usp=sharing +# Google Sheets & MySQL Sync with Kafka Integration -##Google Sheets & MySQL Sync with Kafka Integration This project implements a real-time synchronization between Google Sheets and a MySQL database, leveraging Kafka for scalability and conflict resolution. The Flask server is hosted via Waitress and exposed to the internet using NGROK. Google Apps Script handles the detection of changes on Google Sheets, triggering the appropriate operations (Create, Read, Update, Delete) in real time. -##Architecture Overview -Flask Server: Built using Python's Flask framework, served via Waitress. -NGROK: Used to expose the Flask server to the internet for handling requests from Google Sheets. -Kafka: Handles message passing for scalable synchronization between the Google Sheets and MySQL database. It also resolves conflicts when concurrent updates occur. -Google Apps Script: Detects changes in Google Sheets (onEdit) and triggers CRUD operations accordingly. -MySQL Database: Stores the data synced from the Google Sheets. -##Key Features -#Real-Time Synchronization: The system ensures that any changes made in Google Sheets (adding rows, updating values, etc.) are reflected in the MySQL database instantly. -Scalability: Kafka integration allows the system to scale horizontally, ensuring that multiple clients can interact with the database without performance degradation. -#Conflict Resolution: Internal conflict-handling mechanisms resolve issues arising from concurrent edits. -#CRUD Operations: Supports Create, Read, Update, and Delete operations seamlessly between Google Sheets and MySQL. -##Challenges Faced +## Architecture Overview + +- **Flask Server**: Built using Python's Flask framework, served via Waitress. +- **NGROK**: Used to expose the Flask server to the internet for handling requests from Google Sheets. +- **Kafka**: Handles message passing for scalable synchronization between the Google Sheets and MySQL database. It also resolves conflicts when concurrent updates occur. +- **Google Apps Script**: Detects changes in Google Sheets (onEdit) and triggers CRUD operations accordingly. +- **MySQL Database**: Stores the data synced from the Google Sheets. + +## Key Features + +- **Real-Time Synchronization**: The system ensures that any changes made in Google Sheets (adding rows, updating values, etc.) are reflected in the MySQL database instantly. 
+- **Scalability**: Kafka integration allows the system to scale horizontally, ensuring that multiple clients can interact with the database without performance degradation. +- **Conflict Resolution**: Internal conflict-handling mechanisms resolve issues arising from concurrent edits. +- **CRUD Operations**: Supports Create, Read, Update, and Delete operations seamlessly between Google Sheets and MySQL. + +## Challenges Faced + One major roadblock encountered was configuring Kafka, as it was my first time working with it. The rest of the functionalities were more straightforward since I had prior experience implementing similar operations using Google Apps Script and Python during my internship at EKCS. -##Personal Reflection -Had my entire weekend occupied with it and was really fun, I learned a lot about Kafka in the process. - -##How to Use -Setup MySQL Database: Ensure you have a running MySQL instance. -Flask Server: Start the Flask server using Waitress and expose it via NGROK. -Google Sheets: Add the appropriate Apps Script to trigger updates on changes in Google Sheets. -Kafka: Set up a Kafka broker and configure the consumer and producer in the Flask app. -##Author -Aavish Gilbert J -Email: aavish.gilbert@gmail.com -PES University, PES1UG21CS012 +## Personal Reflection + +Building this project was really fun and I learned a lot about Kafka in the process. + +--- + +## How to Use + +1. **Setup MySQL Database**: Ensure you have a running MySQL instance. +2. **Flask Server**: Start the Flask server using Waitress and expose it via NGROK. +3. **Google Sheets**: Add the appropriate Apps Script to trigger updates on changes in Google Sheets. +4. **Kafka**: Set up a Kafka broker and configure the consumer and producer in the Flask app. + +## Author + +Aavish Gilbert J +Email: aavish.gilbert@gmail.com +PES University, Roll Number: PES1UG21CS012 From 4613c2927b64e8d255ed58cf01b7ec69c04f9ca7 Mon Sep 17 00:00:00 2001 From: AAVISH GILBERT J <71066342+Aavish-Gilbert-J@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:19:09 +0530 Subject: [PATCH 15/17] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index fc5df3624..45d9ec270 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ This project implements a real-time synchronization between Google Sheets and a - **Flask Server**: Built using Python's Flask framework, served via Waitress. - **NGROK**: Used to expose the Flask server to the internet for handling requests from Google Sheets. - **Kafka**: Handles message passing for scalable synchronization between the Google Sheets and MySQL database. It also resolves conflicts when concurrent updates occur. -- **Google Apps Script**: Detects changes in Google Sheets (onEdit) and triggers CRUD operations accordingly. +- **Google Apps Script**: Detects changes in Google Sheets and triggers CRUD operations accordingly. - **MySQL Database**: Stores the data synced from the Google Sheets. 
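To make the Kafka component listed above more concrete, here is a rough sketch of the request/response pattern implemented in twoWayKafka.py, assuming a broker on localhost:9092 and the topic names configured in app.py; the group id and payload are illustrative only.

```python
# Requester side of the two-way flow: send_request attaches a correlation_id,
# publishes to the request topic, and waits for the matching reply that the
# consumer in app.py publishes back on response_topic.
from twoWayKafka import KafkaHandler

handler = KafkaHandler(
    bootstrap_servers="localhost:9092",
    topic="my_topic",
    group_id="demo_group",            # illustrative group id
    response_topic="response_topic",
)

reply = handler.send_request(
    {"sheet_name": "Sheet1", "cells": []},   # placeholder payload
    timeout=10,
)
print(reply)  # e.g. {'status': 'success', ...} or a timeout error dict
```

The consumer side (run_consumer in app.py) echoes the correlation_id back on response_topic so the blocked caller can match the reply to its request; without that consumer running, send_request simply times out.
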
## Key Features @@ -106,4 +106,4 @@ Building this project was really fun and I learned a lot about Kafka in the proc Aavish Gilbert J Email: aavish.gilbert@gmail.com -PES University, Roll Number: PES1UG21CS012 +PES University, PES1UG21CS012 From 1af486dfb639b48db5fdbdf919b9ddbcf1c458b3 Mon Sep 17 00:00:00 2001 From: AAVISH GILBERT J <71066342+Aavish-Gilbert-J@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:20:14 +0530 Subject: [PATCH 16/17] Update README.md --- README.md | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 45d9ec270..4af7d8cd3 100644 --- a/README.md +++ b/README.md @@ -52,11 +52,11 @@ Once you're done, make sure you **record a video** showing your project working. We have a checklist at the bottom of this README file, which you should update as your progress with your assignment. It will help us evaluate your project. -- [YES] My code's working just fine! πŸ₯³ -- [YES] I have recorded a video showing it working and embedded it in the README ▢️ -- [YES] I have tested all the normal working cases 😎 -- [YES] I have even solved some edge cases (brownie points) πŸ’ͺ -- [YES] I added my very planned-out approach to the problem at the end of this README πŸ“œ +- [] My code's working just fine! πŸ₯³ +- [] I have recorded a video showing it working and embedded it in the README ▢️ +- [] I have tested all the normal working cases 😎 +- [] I have even solved some edge cases (brownie points) πŸ’ͺ +- [] I added my very planned-out approach to the problem at the end of this README πŸ“œ ## Got Questions❓ Feel free to check the discussions tab, you might get some help there. Check out that tab before reaching out to us. Also, did you know, the internet is a great place to explore? πŸ˜› @@ -102,6 +102,13 @@ Building this project was really fun and I learned a lot about Kafka in the proc 3. **Google Sheets**: Add the appropriate Apps Script to trigger updates on changes in Google Sheets. 4. **Kafka**: Set up a Kafka broker and configure the consumer and producer in the Flask app. +## Checklist +- [YES] My code's working just fine! 
πŸ₯³ +- [YES] I have recorded a video showing it working and embedded it in the README ▢️ +- [YES] I have tested all the normal working cases 😎 +- [YES] I have even solved some edge cases (brownie points) πŸ’ͺ +- [YES] I added my very planned-out approach to the problem at the end of this README πŸ“œ + ## Author Aavish Gilbert J From cf941638c5afeeb251349942ce3c7e19a6479e4d Mon Sep 17 00:00:00 2001 From: AAVISH GILBERT J <71066342+Aavish-Gilbert-J@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:45:30 +0530 Subject: [PATCH 17/17] Added Video --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 4af7d8cd3..f22ce1961 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,9 @@ [![](https://mermaid.ink/img/pako:eNqVVV1r2zAU_StCUEhpQt79MGiTrrAta1Zn7MUQFPnGFrElV5I3Qsl_n2QpjmyUkfkl4X6ce-7V0dUHpiIHnGBaEaWWjBSS1BlH5uss6EWIooK0BNAKfTiP_WbKmpZEE5SgL-nr94vrQfDnnOmNZEUBcnIfeBTwfC2UfoP3FpQ--04ZD2s-Nk1KJWt0WPAhFzZzAvcD2wuMTE27q5gqfzY50bARX8n-QAYcqOCqraFzuCh1hUcXMuDgwVegFCkgBhtzlYTnFSwE31eM6mvVfhGmpclOQf4GOZj18skkc6CaCR7gzuujeq-2bddEaG_MoAqDFXGpI6fbojvUbXeEoZN6iluTK6p2VM4eXmygEkj-WYp64Bk1tzqmP74tn0ZdxeXjSHcZgzp70LRcmGEWV09s7Rr_31o-Td1e7u7O3wzkr4Y5Yqtb5IW7EHXdckbJZYaDmzSbfQpkniB3ZWyZHj9As-BOjBHcC4wFdWEJWjulItcg0sJ7NqJhFE288H1bzjXmtHCK9il7IZHPmvdSDuie5Yu8fv_JeaT1GHFXqy816mDMewQYI98jvfXinvuGgjYC9lYxO6LgxgbOEk88dWdAxjJZv6YbNLitfvDnnGgLj_TAxZ8K8gI85NXaA933BM6yDjiMNoOnMUi_mct4Zs88bwTjRt123CklFdmxiuljlHYS3V-RoOgyi8Rd2WyRyOiaw1Ncg6wJy81r2K2ODOsSashwYv7mRB4ynPGTiSOtFqmphhMtW5hiR8w_njjZk0oZK5gLLeTKP6_2Z4qlaIvSR5z-AgPBZ3Q?type=png)](https://mermaid.live/edit#pako:eNqVVV1r2zAU_StCUEhpQt79MGiTrrAta1Zn7MUQFPnGFrElV5I3Qsl_n2QpjmyUkfkl4X6ce-7V0dUHpiIHnGBaEaWWjBSS1BlH5uss6EWIooK0BNAKfTiP_WbKmpZEE5SgL-nr94vrQfDnnOmNZEUBcnIfeBTwfC2UfoP3FpQ--04ZD2s-Nk1KJWt0WPAhFzZzAvcD2wuMTE27q5gqfzY50bARX8n-QAYcqOCqraFzuCh1hUcXMuDgwVegFCkgBhtzlYTnFSwE31eM6mvVfhGmpclOQf4GOZj18skkc6CaCR7gzuujeq-2bddEaG_MoAqDFXGpI6fbojvUbXeEoZN6iluTK6p2VM4eXmygEkj-WYp64Bk1tzqmP74tn0ZdxeXjSHcZgzp70LRcmGEWV09s7Rr_31o-Td1e7u7O3wzkr4Y5Yqtb5IW7EHXdckbJZYaDmzSbfQpkniB3ZWyZHj9As-BOjBHcC4wFdWEJWjulItcg0sJ7NqJhFE288H1bzjXmtHCK9il7IZHPmvdSDuie5Yu8fv_JeaT1GHFXqy816mDMewQYI98jvfXinvuGgjYC9lYxO6LgxgbOEk88dWdAxjJZv6YbNLitfvDnnGgLj_TAxZ8K8gI85NXaA933BM6yDjiMNoOnMUi_mct4Zs88bwTjRt123CklFdmxiuljlHYS3V-RoOgyi8Rd2WyRyOiaw1Ncg6wJy81r2K2ODOsSashwYv7mRB4ynPGTiSOtFqmphhMtW5hiR8w_njjZk0oZK5gLLeTKP6_2Z4qlaIvSR5z-AgPBZ3Q) +### Video +https://drive.google.com/file/d/18NcHJFeRSfQ9IOGx4iLDHjqH73XrxyQF/view?usp=sharing ### Welcome to Superjoin's hiring assignment! πŸš€