Skip to content

Commit

Permalink
feat: mongodb extractor (#87)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrayFlash authored Jun 30, 2021
1 parent aa240c8 commit a48dbd5
Show file tree
Hide file tree
Showing 7 changed files with 378 additions and 1 deletion.
8 changes: 8 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
services:
mongo:
image: mongo:4.4.6
ports:
- 27017:27017
env:
MONGO_INITDB_ROOT_USERNAME: user
MONGO_INITDB_ROOT_PASSWORD: abcd
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
Expand Down
10 changes: 10 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: '3'
services:
mongo:
image: mongo:4.4.6
environment:
- MONGO_INITDB_ROOT_USERNAME=user
- MONGO_INITDB_ROOT_PASSWORD=abcd
restart: on-failure
ports:
- 27017:27017
11 changes: 11 additions & 0 deletions docs/guides/extractors.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,14 @@ Leaving `service_account_json` blank will default to [Google's default authentic

#### *Notes*
Leaving `service_account_json` blank will default to [Google's default authentication](https://cloud.google.com/docs/authentication/production#automatically). It is recommended if Meteor instance runs inside the same Google Cloud environment as the BigQuery project.

## MongoDB

`mongodb`

### Configs
| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :- |
| `user_id` | `string` | `user` | User ID to access the mongo server| *required* |
| `password` | `string` | `abcd` | Password for the Mongo Server | *required* |
| `host` | `string` | `localhost:27017` | The Host at which server is running | *required* |
92 changes: 92 additions & 0 deletions extractors/mongodb/extractor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package mongodb

import (
"context"
"errors"
"sort"
"time"

"github.com/mitchellh/mapstructure"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

type Extractor struct{}

type Config struct {
UserID string `mapstructure:"user_id"`
Password string `mapstructure:"password"`
Host string `mapstructure:"host"`
}

func (e *Extractor) Extract(configMap map[string]interface{}) (result []map[string]interface{}, err error) {
config, err := e.getConfig(configMap)
if err != nil {
return
}
err = e.validateConfig(config)
if err != nil {
return
}
uri := "mongodb://" + config.UserID + ":" + config.Password + "@" + config.Host
clientOptions := options.Client().ApplyURI(uri)
client, err := mongo.NewClient(clientOptions)
if err != nil {
return
}
ctx, _ := context.WithTimeout(context.Background(), 4*time.Second)
err = client.Connect(ctx)
if err != nil {
return
}
result, err = e.listCollections(client, ctx)
if err != nil {
return
}
return result, err
}

func (e *Extractor) listCollections(client *mongo.Client, ctx context.Context) (result []map[string]interface{}, err error) {
databases, err := client.ListDatabaseNames(ctx, bson.M{})
if err != nil {
return
}
sort.Strings(databases)
var collections []string
for _, db_name := range databases {
db := client.Database(db_name)
collections, err = db.ListCollectionNames(ctx, bson.D{})
if err != nil {
return
}
sort.Strings(collections)
for _, collection := range collections {
row := make(map[string]interface{})
row["collection_name"] = collection
row["database_name"] = db_name
count, _ := db.Collection(collection).EstimatedDocumentCount(ctx)
row["document_count"] = int(count)
result = append(result, row)
}
}
return result, err
}

func (e *Extractor) getConfig(configMap map[string]interface{}) (config Config, err error) {
err = mapstructure.Decode(configMap, &config)
return
}

func (e *Extractor) validateConfig(config Config) (err error) {
if config.UserID == "" {
return errors.New("user_id is required")
}
if config.Password == "" {
return errors.New("password is required")
}
if config.Host == "" {
return errors.New("host address is required")
}
return
}
181 changes: 181 additions & 0 deletions extractors/mongodb/extractor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package mongodb_test

import (
"context"
"log"
"testing"
"time"

"github.com/odpf/meteor/extractors/mongodb"
"github.com/stretchr/testify/assert"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

var testDB string = "MeteorMongoExtractorTest"

var posts = []interface{}{
bson.D{{"title", "World"}, {"body", "Hello World"}},
bson.D{{"title", "Mars"}, {"body", "Hello Mars"}},
bson.D{{"title", "Pluto"}, {"body", "Hello Pluto"}},
}

var connections = []interface{}{
bson.D{{"name", "Albert"}, {"relation", "mutual"}},
bson.D{{"name", "Josh"}, {"relation", "following"}},
bson.D{{"name", "Abish"}, {"relation", "follower"}},
}

var reach = []interface{}{
bson.D{{"views", "500"}, {"likes", "200"}, {"comments", "50"}},
bson.D{{"views", "400"}, {"likes", "100"}, {"comments", "5"}},
bson.D{{"views", "800"}, {"likes", "300"}, {"comments", "80"}},
}

func TestExtract(t *testing.T) {
t.Run("should return error if no user_id in config", func(t *testing.T) {
extractor := new(mongodb.Extractor)
_, err := extractor.Extract(map[string]interface{}{
"password": "abcd",
"host": "localhost:27017",
})

assert.NotNil(t, err)
})

t.Run("should return error if no password in config", func(t *testing.T) {
extractor := new(mongodb.Extractor)
_, err := extractor.Extract(map[string]interface{}{
"user_id": "Gaurav_Ubuntu",
"host": "localhost:27017",
})

assert.NotNil(t, err)
})

t.Run("should return error if no host in config", func(t *testing.T) {
extractor := new(mongodb.Extractor)
_, err := extractor.Extract(map[string]interface{}{
"user_id": "user",
"password": "abcd",
})

assert.NotNil(t, err)
})

t.Run("should return mockdata we generated with mongo running on localhost", func(t *testing.T) {
extractor := new(mongodb.Extractor)
uri := "mongodb://user:abcd@localhost:27017"
clientOptions := options.Client().ApplyURI(uri)

err := mockDataGenerator(clientOptions)
if err != nil {
t.Fatal(err)
}
result, err := extractor.Extract(map[string]interface{}{
"user_id": "user",
"password": "abcd",
"host": "localhost:27017",
})
if err != nil {
t.Fatal(err)
}
expected := getExpectedVal()
assert.Equal(t, result, expected)
})
}

func getExpectedVal() (expected []map[string]interface{}) {
expected = []map[string]interface{}{
{
"collection_name": "connection",
"database_name": testDB,
"document_count": 3,
},
{
"collection_name": "posts",
"database_name": testDB,
"document_count": 3,
},
{
"collection_name": "reach",
"database_name": testDB,
"document_count": 3,
},
{
"collection_name": "system.users",
"database_name": "admin",
"document_count": 1,
},
{
"collection_name": "system.version",
"database_name": "admin",
"document_count": 2,
},
{
"collection_name": "system.sessions",
"database_name": "config",
"document_count": 0,
},
}
return
}

func mockDataGenerator(clientOptions *options.ClientOptions) (err error) {
client, err := mongo.NewClient(clientOptions)
if err != nil {
log.Fatal(err)
return
}
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
err = client.Connect(ctx)
if err != nil {
return
}
db := client.Database("local")
_ = db.Collection("startup_log").Drop(ctx)
db = client.Database(testDB)
_ = db.Drop(ctx)
err = insertPosts(ctx, client)
if err != nil {
return
}
err = insertConnections(ctx, client)
if err != nil {
return
}
err = insertReach(ctx, client)
if err != nil {
return
}
client.Disconnect(ctx)
return
}

func insertPosts(ctx context.Context, client *mongo.Client) (err error) {
collection := client.Database(testDB).Collection("posts")
_, insertErr := collection.InsertMany(ctx, posts)
if insertErr != nil {
return insertErr
}
return
}

func insertConnections(ctx context.Context, client *mongo.Client) (err error) {
collection := client.Database(testDB).Collection("connection")
_, insertErr := collection.InsertMany(ctx, connections)
if insertErr != nil {
return insertErr
}
return
}

func insertReach(ctx context.Context, client *mongo.Client) (err error) {
collection := client.Database(testDB).Collection("reach")
_, insertErr := collection.InsertMany(ctx, reach)
if insertErr != nil {
return insertErr
}
return
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.16
require (
cloud.google.com/go/bigquery v1.8.0
cloud.google.com/go/storage v1.15.0 // indirect
github.com/aws/aws-sdk-go v1.38.35 // indirect
github.com/etsy/statsd v0.9.0
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
Expand All @@ -13,11 +14,11 @@ require (
github.com/kr/text v0.2.0 // indirect
github.com/mcuadros/go-defaults v1.2.0
github.com/mitchellh/mapstructure v1.4.1
github.com/pelletier/go-toml v1.7.0 // indirect
github.com/segmentio/kafka-go v0.4.16
github.com/spf13/viper v1.7.1
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.7.0
go.mongodb.org/mongo-driver v1.5.3
golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/net v0.0.0-20210505214959-0714010a04ed // indirect
Expand Down
Loading

0 comments on commit a48dbd5

Please sign in to comment.