Skip to content

Commit

Permalink
ElasticSearchWriter Plugin (#880)
Browse files Browse the repository at this point in the history
* initial commit for WriteLocalDatabase plugin

* feat: add identifier for document upsert
  • Loading branch information
mattcam authored Oct 28, 2024
1 parent 93789bb commit 0f84231
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from typing import Optional

from pydantic import field_validator
import json
from tracardi.domain.named_entity import NamedEntity
from tracardi.service.plugin.domain.config import PluginConfig

class Config(PluginConfig):
index: str
documents: str
source: NamedEntity
identifier: str
log: Optional[bool] = False

@field_validator("index")
@classmethod
def validate_index(cls, value):
if value is None or len(value) == 0:
raise ValueError("This field cannot be empty.")
return value

@field_validator("documents")
@classmethod
def validate_documents(cls, value):
if value is None or len(value) == 0:
raise ValueError("This field cannot be empty.")
return value
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import json

from tracardi.domain.value_object.bulk_insert_result import BulkInsertResult
from tracardi.service.notation.dict_traverser import DictTraverser

from tracardi.service.plugin.domain.register import Plugin, Spec, MetaData, Documentation, PortDoc, Form, FormGroup, \
FormField, FormComponent
from tracardi.service.plugin.runner import ActionRunner
from tracardi.service.plugin.domain.result import Result
from .model.config import Config
from tracardi.service.storage.elastic.interface import raw as raw_db


def validate(config: dict):
config = Config(**config)
return config

class WriteLocalDatabase(ActionRunner):

config: Config

async def set_up(self, init):
self.config = Config(**init)

async def run(self, payload: dict, in_edge=None) -> Result:

dot = self._get_dot_accessor(payload)

try:

index=dot[self.config.index]
documents=dot[self.config.documents]
identifier=dot[self.config.identifier]

if isinstance(documents, str):
documents = json.loads(documents)

if isinstance(documents, list) and identifier:
documents = [{identifier: item} for item in documents]

if identifier:
for item in documents:
if identifier in item:
item["_id"] = item[identifier]

result = await raw_db.bulk_upsert(
index=index,
data=documents
)

if isinstance(result, BulkInsertResult):
result_dict = result.dict()

except Exception as e:
self.console.error(str(e))
return Result(port="error", value={
"message": str(e)
})

return Result(port="result", value=result_dict)

def register() -> Plugin:
return Plugin(
start=False,
spec=Spec(
module=__name__,
className=WriteLocalDatabase.__name__,
inputs=["payload"],
outputs=["result", "error"],
version='1.0.3',
license="MIT",
author="Matt Cameron",
manual='write_data',
init={
"index": None,
"document": None
},
form=Form(
groups=[
FormGroup(
name="Configuration",
fields=[
FormField(
id="source",
name="Elasticsearch resource",
description="Please select your Elasticsearch resource.",
component=FormComponent(type="resource", props={"label": "Resource",
"tag": "elasticsearch"})
),
FormField(
id="index",
name="Index",
description="The index for where the documents will be upserted.",
component=FormComponent(type="dotPath", props={
"label": "Index"
})
),
FormField(
id="documents",
name="Documents",
description="The documents to be upserted/inserted.",
component=FormComponent(type="dotPath", props={
"label": "Documents"
})
),
FormField(
id="identifier",
name="Identifier",
description="The primary key to be used if documents are to be upserted.",
component=FormComponent(type="dotPath", props={
"label": "Documents"
})
),
]
)
]
)
),
metadata=MetaData(
name='Write data',
desc='Write local Elasticsearch database',
icon='elasticsearch',
group=["Databases"],
tags=['database', 'nosql', 'elastic'],
documentation=Documentation(
inputs={
"payload": PortDoc(desc="This port takes payload object.")
},
outputs={
"result": PortDoc(desc="This port returns result of upserting ElasticSearch instance."),
"error": PortDoc(desc="This port returns error if an error occurs.")
}
)
)
)
6 changes: 6 additions & 0 deletions tracardi/service/setup/setup_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,12 @@
resource=None)
),

"tracardi.process_engine.action.v1.connectors.elasticsearch.write_local.plugin": PluginMetadata(
test=PluginTest(
init={'index': '', 'documents':'', 'identifier':''},
resource=None)
),

"tracardi.process_engine.action.v1.connectors.ghost.plugin": PluginMetadata(
test=PluginTest(init={'uuid': ''},
resource={
Expand Down

0 comments on commit 0f84231

Please sign in to comment.