-
Notifications
You must be signed in to change notification settings - Fork 0
/
fapi.py
110 lines (91 loc) · 3.23 KB
/
fapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import psycopg2
from fastapi import FastAPI
from typing import List
from pydantic import BaseModel
# Update connection string information
host = "pythonfastapi.postgres.database.azure.com"
dbname = "postgres"
user = "fastapi@pythonfastapi"
password = "apifast1234*"
sslmode = "require"
# FastAPI setup
app = FastAPI()
# Construct connection string
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)
# Function to establish a database connection
def get_db():
conn = psycopg2.connect(conn_string)
print("Connection established")
try:
yield conn
finally:
conn.close()
# Create a model for CRUD operations
class InventoryItem(BaseModel):
id: int
name: str
quantity: int
# API endpoint to create the table
@app.post("/create-table/")
async def create_table():
conn = psycopg2.connect(conn_string)
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS inventory (id serial PRIMARY KEY, name VARCHAR(50), quantity INTEGER);")
conn.commit()
cursor.close()
conn.close()
return {"message": "Table created successfully"}
# API endpoint to insert data into the table
@app.post("/insert-data/")
async def insert_data(name: str, quantity: int):
conn = psycopg2.connect(conn_string)
cursor = conn.cursor()
cursor.execute("INSERT INTO inventory (name, quantity) VALUES (%s, %s);", (name, quantity))
conn.commit()
cursor.close()
conn.close()
return {"message": "Data inserted successfully"}
# Create endpoint for reading all items
@app.get("/items/", response_model=List[InventoryItem])
async def get_items():
conn = psycopg2.connect(conn_string)
cursor = conn.cursor()
cursor.execute("SELECT id, name, quantity FROM inventory;")
items = [InventoryItem(id=id, name=name, quantity=quantity) for id, name, quantity in cursor.fetchall()]
cursor.close()
conn.close()
return items
# Create endpoint for reading a single item by ID
@app.get("/items/{item_id}", response_model=InventoryItem)
async def read_item(item_id: int):
conn = psycopg2.connect(conn_string)
cursor = conn.cursor()
cursor.execute("SELECT id, name, quantity FROM inventory WHERE id = %s;", (item_id,))
result = cursor.fetchone()
cursor.close()
conn.close()
if result:
id, name, quantity = result
return InventoryItem(id=id, name=name, quantity=quantity) # Create a Pydantic model instance
else:
return None
# Create endpoint for updating an item
@app.put("/items/{item_id}/")
async def update_item(item_id: int, name: str, quantity: int):
conn = psycopg2.connect(conn_string)
cursor = conn.cursor()
cursor.execute("UPDATE inventory SET name = %s, quantity = %s WHERE id = %s;", (name, quantity, item_id))
conn.commit()
cursor.close()
conn.close()
return {"message": "Item updated successfully"}
# Create endpoint for deleting an item
@app.delete("/items/{item_id}/")
async def delete_item(item_id: int):
conn = psycopg2.connect(conn_string)
cursor = conn.cursor()
cursor.execute("DELETE FROM inventory WHERE id = %s;", (item_id,))
conn.commit()
cursor.close()
conn.close()
return {"message": "Item deleted successfully"}