Skip to content

Commit

Permalink
Merge pull request #111 from go-faster/feat/ytlocal
Browse files Browse the repository at this point in the history
feat(ytlocal): add functions to setup cluster and CHYT
  • Loading branch information
tdakkota authored Sep 6, 2023
2 parents d2f818f + 0ef18f1 commit 6351d02
Show file tree
Hide file tree
Showing 15 changed files with 1,399 additions and 25 deletions.
196 changes: 179 additions & 17 deletions cmd/ytlocal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import (
"github.com/testcontainers/testcontainers-go"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.ytsaurus.tech/yt/go/ypath"
"go.ytsaurus.tech/yt/go/yson"
"go.ytsaurus.tech/yt/go/yt"
"go.ytsaurus.tech/yt/go/yt/ythttp"
"golang.org/x/sync/errgroup"

"github.com/go-faster/oteldb/internal/ytlocal"
Expand Down Expand Up @@ -62,13 +65,20 @@ func initBaseConfigs(pa *ytlocal.PortAllocator, base ytlocal.BaseServer, targets
if err != nil {
return errors.Wrap(err, "allocate")
}
skynetPort, err := pa.Allocate()
if err != nil {
return errors.Wrap(err, "allocate")
}
b := ytlocal.BaseServer{
RPCPort: port,
MonitoringPort: rpcPort,
Logging: base.Logging,
AddressResolver: base.AddressResolver,
TimestampProvider: base.TimestampProvider,
ClusterConnection: base.ClusterConnection,
// Every node would try to bind port 10080, so assign each one an unused
// port to avoid "already used" errors.
SkynetHTTPPort: skynetPort,
}
*target = b
}
Expand Down Expand Up @@ -150,6 +160,17 @@ func main() {
zap.String("master", bin.Master),
)

chytExe, err := ytlocal.NewCHYTBinary(dir)
if err != nil {
return errors.Wrap(err, "new CHYT binary")
}
zctx.From(ctx).Info("Using CHYT binaries",
zap.String("controller", chytExe.Controller),
zap.String("trampoline", chytExe.Trampoline),
zap.String("tailer", chytExe.Tailer),
zap.String("server", chytExe.Server),
)

const clusterName = "test"
cellID := ytlocal.GenerateCellID(1, clusterName)

Expand All @@ -161,10 +182,26 @@ func main() {
if err != nil {
return errors.Wrap(err, "allocate")
}
chytControllerPort, err := pa.Allocate()
if err != nil {
return errors.Wrap(err, "allocate")
}

const localhost = "localhost"
var (
masterAddr = net.JoinHostPort(localhost, strconv.Itoa(masterPort))
httpProxyAddr = net.JoinHostPort(localhost, strconv.Itoa(arg.ProxyPort))
chytControllerAddr = net.JoinHostPort(localhost, strconv.Itoa(chytControllerPort))
)

zctx.From(ctx).Info(
"Using addrs",
zap.String("master", masterAddr),
zap.String("http_proxy", httpProxyAddr),
zap.String("chyt_controller", chytControllerAddr),
)

var (
masterAddr = net.JoinHostPort(localhost, strconv.Itoa(masterPort))
cfgTimestampProvider = ytlocal.Connection{
Addresses: []string{masterAddr},
SoftBackoffTime: 100,
Expand Down Expand Up @@ -285,20 +322,21 @@ func main() {
CellDirectory: cfgCellDirectory,
TimestampProvider: cfgTimestampProvider,
DiscoveryConnections: []ytlocal.Connection{
{
Addresses: []string{masterAddr},
},
cfgDiscoveryConnection,
},
}
cfgMaster = ytlocal.Master{
BaseServer: cfgBaseServer,

ChunkManger: cfgChunkManager,
ObjectService: cfgObjectService,
TimestampManager: cfgTimestampManager,
HiveManager: cfgHiveManager,
PrimaryMaster: cfgPrimaryMaster,
YPServiceDiscovery: cfgYPServiceDiscovery,
ChunkManger: cfgChunkManager,
ObjectService: cfgObjectService,
TimestampManager: cfgTimestampManager,
HiveManager: cfgHiveManager,
PrimaryMaster: cfgPrimaryMaster,
YPServiceDiscovery: cfgYPServiceDiscovery,
DiscoveryServer: ytlocal.DiscoveryConfig{
Addresses: cfgDiscoveryConnection.Addresses,
},
RPCDispatcher: cfgRPCDispatcher,
ChunkClientDispatcher: cfgChunkClientDispatcher,
TCPDispatcher: cfgTCPDispatcher,
Expand Down Expand Up @@ -359,16 +397,70 @@ func main() {
},
},
}

execTotalCPU = 4
cfgExecNode = ytlocal.ExecNode{
ExecAgent: ytlocal.ExecAgent{
SlotManager: ytlocal.SlotManager{
JobEnvironment: ytlocal.JobEnvironment{
StartUID: 19500,
Type: ytlocal.JobEnvironmentTypeSimple,
},
Locations: []ytlocal.SlotLocation{
{Path: filepath.Join(dir, "slots")},
},
},
JobController: ytlocal.JobController{
ResourceLimits: ytlocal.JobControllerResourceLimits{
UserSlots: execTotalCPU * 2,
},
},
},
ResourceLimits: ytlocal.ResourceLimits{
TotalCPU: float64(execTotalCPU),
TotalMemory: 4 * 1024 * 1024 * 1024,
},
DataNode: ytlocal.DataNodeOptions{
CacheLocations: []ytlocal.DiskLocation{
{Path: filepath.Join(dir, "chunk_cache")},
},
},
Flavors: []string{"exec"},
}

cfgHTTPProxy = ytlocal.HTTPProxy{
Port: arg.ProxyPort,
Coordinator: ytlocal.Coordinator{
Enable: true,
Announce: true,
ShowPorts: true,
PublicFQDN: net.JoinHostPort(localhost, strconv.Itoa(arg.ProxyPort)),
PublicFQDN: httpProxyAddr,
},
Driver: cfgDriver,
}

cfgQueryTracker = ytlocal.QueryTracker{
User: "query_tracker",
CreateStateTablesOnStartup: true,
}

cfgCHYTContoller = ytlocal.CHYTController{
Strawberry: ytlocal.Strawberry{
Root: "//sys/clickhouse/strawberry",
Stage: "production",
RobotUsername: "robot-chyt-controller",
},
LocationProxies: []string{
httpProxyAddr,
},
Controller: struct {
Resolver ytlocal.AddressResolver `yson:"address_resolver"`
}{
Resolver: cfgAddressResolver,
},
HTTPAPIEndpoint: chytControllerAddr,
DisableAPIAuth: true,
}
)
// Allocate ports and fill base config for all components, using
// cfgBaseServer as a template.
Expand All @@ -377,16 +469,25 @@ func main() {
&cfgControllerAgent.BaseServer,
&cfgNode.BaseServer,
&cfgHTTPProxy.BaseServer,
&cfgQueryTracker.BaseServer,
&cfgExecNode.BaseServer,
); err != nil {
return errors.Wrap(err, "init base configs")
}
var (
opt = ytlocal.Options{Binary: bin, Dir: dir}
master = ytlocal.NewComponent(opt, cfgMaster)
scheduler = ytlocal.NewComponent(opt, cfgScheduler)
agent = ytlocal.NewComponent(opt, cfgControllerAgent)
node = ytlocal.NewComponent(opt, cfgNode)
proxy = ytlocal.NewComponent(opt, cfgHTTPProxy)
opt = ytlocal.Options{Binary: bin, Dir: dir}
master = ytlocal.NewComponent(opt, cfgMaster)
scheduler = ytlocal.NewComponent(opt, cfgScheduler)
agent = ytlocal.NewComponent(opt, cfgControllerAgent)
node = ytlocal.NewComponent(opt, cfgNode)
proxy = ytlocal.NewComponent(opt, cfgHTTPProxy)
queryTracker = ytlocal.NewComponent(opt, cfgQueryTracker)
execNode = ytlocal.NewComponent(opt, cfgExecNode)
chytContoller = chytExe.ControllerServer(opt, cfgCHYTContoller)
chytWaiter = &waiterServer[ytlocal.CHYTController]{
done: make(chan struct{}),
server: chytContoller,
}
)
zctx.From(ctx).Info("Starting cluster",
zap.Int("master.rpc_port", master.Config.RPCPort),
Expand All @@ -399,7 +500,46 @@ func main() {
agent,
node,
proxy,
queryTracker,
execNode,
chytWaiter,
)
g.Go(func() error {
yc, err := ythttp.NewClient(&yt.Config{
Proxy: httpProxyAddr,
})
if err != nil {
return errors.Wrap(err, "create client")
}

lg := zctx.From(ctx)
if err := ytlocal.SetupMaster(ctx, yc, cfgMaster); err != nil {
return errors.Wrap(err, "setup master")
}
lg.Info("Master setup is complete")

if err := ytlocal.SetupQueryTracker(ctx, yc, clusterName); err != nil {
return errors.Wrap(err, "setup query trackers")
}
lg.Info("Query tracker setup is complete")

if _, err := yc.CreateNode(ctx,
ypath.Path(`//sys/clickhouse/strawberry/chyt`),
yt.NodeMap,
&yt.CreateNodeOptions{Recursive: true, IgnoreExisting: true},
); err != nil {
return errors.Wrap(err, "create clickhouse dir")
}
chytWaiter.Ready()
lg.Info("Run CHYT controller")

if err := chytExe.Setup(ctx, yc, chytControllerAddr, dir, ytlocal.CHYTInitCluster{Proxy: httpProxyAddr}); err != nil {
return errors.Wrap(err, "setup chyt")
}
lg.Info("CHYT cluster setup is complete")

return nil
})
return g.Wait()
},
}
Expand Down Expand Up @@ -787,3 +927,25 @@ Also, docker is required to run UI.
os.Exit(1)
}
}

// waiterServer wraps a ytlocal.Server and gates it behind the done channel:
// Run does not call the wrapped server's Run until Ready has closed done.
type waiterServer[T any] struct {
// done is closed by Ready; Run waits on it before running server.
done chan struct{}
// server is the wrapped server that Run and String delegate to.
server *ytlocal.Server[T]
}

// Ready signals that the wrapped server may start by closing the done
// channel, which unblocks a pending or future Run.
//
// Calling Ready again after a previous call has returned is a no-op rather
// than a "close of closed channel" panic. The check is not synchronized, so
// Ready must not be called from several goroutines concurrently.
func (s *waiterServer[T]) Ready() {
	select {
	case <-s.done:
		// Already closed by an earlier call; closing again would panic.
	default:
		close(s.done)
	}
}

// String returns the string form of the wrapped server, so the waiter is
// described exactly like the server it gates.
func (s *waiterServer[T]) String() string {
	inner := s.server
	return inner.String()
}

// Run blocks until either Ready has been called or ctx is done.
//
// If ctx finishes first, the wrapped server is never started and ctx.Err()
// is returned. Otherwise the call is handed over to the wrapped server's Run
// with the same ctx, and its result is returned.
func (s *waiterServer[T]) Run(ctx context.Context) error {
	// Gate: wait for the readiness signal, bailing out on cancellation.
	select {
	case <-s.done:
	case <-ctx.Done():
		return ctx.Err()
	}
	return s.server.Run(ctx)
}
20 changes: 20 additions & 0 deletions internal/ytlocal/_golden/data-node.yson
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
];
"resource_limits"={
"total_memory"=8388608000;
"node_dedicated_cpu"=0.000000;
};
"data_node"={
"store_locations"=[
Expand All @@ -82,5 +83,24 @@
"high_watermark"=8589934592;
};
];
"cache_locations"=[
];
"block_cache"={
"compressed_data"={
capacity=0;
};
"uncompressed_data"={
capacity=0;
};
};
"blocks_ext_cache"={
capacity=0;
};
"chunk_meta_cache"={
capacity=0;
};
"block_meta_cache"={
capacity=0;
};
};
}
29 changes: 29 additions & 0 deletions internal/ytlocal/_golden/exec-node-simple.yson
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
];
"resource_limits"={
"total_memory"=8388608000;
"node_dedicated_cpu"=0.000000;
};
"tablet_node"={
"versioned_chunk_meta_cache"={
Expand All @@ -82,5 +83,33 @@
};
];
};
"job_controller"={
"resource_limits"={
"user_slots"=0;
};
};
};
"data_node"={
"store_locations"=[
];
"cache_locations"=[
];
"block_cache"={
"compressed_data"={
capacity=0;
};
"uncompressed_data"={
capacity=0;
};
};
"blocks_ext_cache"={
capacity=0;
};
"chunk_meta_cache"={
capacity=0;
};
"block_meta_cache"={
capacity=0;
};
};
}
Loading

0 comments on commit 6351d02

Please sign in to comment.