From fa881ea15ce17d8bd96c2b881fd12a0fc3100147 Mon Sep 17 00:00:00 2001 From: Raj Patil Date: Thu, 22 Aug 2024 19:04:42 +0530 Subject: [PATCH] updates --- Content/20230717162224-concurrency.org | 177 ++++++++++++++ Content/20240205171209-go.org | 3 + Content/20240726181733-cloud_native.org | 288 ++++++++++++++++++++++- Content/20240821102020-autocomplete.org | 5 + Content/20240822172256-rate_limiting.org | 9 + 5 files changed, 481 insertions(+), 1 deletion(-) create mode 100644 Content/20240821102020-autocomplete.org create mode 100644 Content/20240822172256-rate_limiting.org diff --git a/Content/20230717162224-concurrency.org b/Content/20230717162224-concurrency.org index 895c584..17368a2 100644 --- a/Content/20230717162224-concurrency.org +++ b/Content/20230717162224-concurrency.org @@ -4,6 +4,183 @@ #+title: Concurrency #+filetags: :programming: +* Abstract +- Important aspect when it comes to optimizing the [[id:314236f7-81ae-48b7-b62b-dc822119180e][design of a system]]. +- I'll be initiating the initial deep dive into low level concurrency primitives and extensively studying patterns in [[id:ad4ba668-b2ec-47b1-9214-2284aedaceba][Golang]] +* Patterns +** Fan-In +:PROPERTIES: +:ID: 400e0635-f797-4abd-b6b5-d6379634b430 +:END: +- multiplexes multiple input channels onto one output channel +- 3 components: + - Sources : set of one or more input channels with the same type, accepted by funnel + - Destination : an output channel of the same type as Sources. Created and provided by funnel + - Funnel : Accepts sources and immediately returns Destination. Any input from any Sources will be output by Destination +*** Code +- Funnel is variadic on sources (channels of a type (string for this instance)) + #+begin_src go :exports both +package main + +import ( + "sync" + "time" + "fmt" +) +func Funnel(sources ...<-chan string ) <-chan string { + dest := make(chan string) //the shared destination channel + + var wg sync.WaitGroup // to close dest when all sources are closed + + wg.Add(len(sources)) // setting size of WaitGroup + + for _, ch := range sources { + go func(c <-chan string){ // start goroutine for each source + defer wg.Done() //notify wg when closing + for n := range c { + dest <- n + } + }(ch) + } + + go func(){ //init goroutine for closing dest + wg.Wait() // wati + close(dest) + }() + + return dest +} + +func main() { + sources := make([]<-chan string, 0) //empty channel slice + + for i:= 0; i<3; i++ { + ch := make(chan string) + sources = append(sources, ch) + + go func(chanNum int) { // run goroutine for each channel + defer close(ch) + + for i:= 1; i<= 5; i++ { + ch <- fmt.Sprintf("sending %d from chan %d", i, chanNum) + time.Sleep(time.Second ) + } + }(i) + } + + dest := Funnel(sources...) + for d := range dest { + fmt.Println(d) + } +} + #+end_src + + #+RESULTS: + #+begin_example + sending 1 from chan 0 + sending 1 from chan 2 + sending 1 from chan 1 + sending 2 from chan 0 + sending 2 from chan 2 + sending 2 from chan 1 + sending 3 from chan 2 + sending 3 from chan 0 + sending 3 from chan 1 + sending 4 from chan 1 + sending 4 from chan 0 + sending 4 from chan 2 + sending 5 from chan 2 + sending 5 from chan 0 + sending 5 from chan 1 + #+end_example + + +- Note that the sends were from multiple sources but the Prints are from the destination channel + +** Fan-Out +:PROPERTIES: +:ID: 59c59a72-789c-4812-b2e8-18b1731511df +:END: +- evenly distributes messages from an input channel to multiple output channels +- 3 components + - Source : input channel, accepted by split + - Destinations : output channels, of the same type as that of source; provided by split + - split : accepts source and returns destination + +*** Code +- two possible implementations: + - single source to destination writer that round-robins across destinations + - delays, if the current destination is unable to receive + - needs just one goroutine + - multiple destination readers that compete for source's output + - slightly more complex but decreased likelihood of holdups due to a single worker + - chances of a destination reader starving + - showing example for this +#+begin_src go :exports both +package main + +import ( + "fmt" + "sync" +) + +func Split(source <-chan string, n int) []<-chan string { + dests := make([]<-chan string, 0) //creating destinations' channel slice + + for i:= 0; i= retries { + return response, err + } + log.Printf("Attempt %d failed; retrying in %v", r + 1, delay) + + select { + case <- time.After(delay): + case <- ctx.Done(): + return "", ctx.Err() + } + } + } +} + #+end_src + + - emulating an erroneous function to try out retry + + #+begin_src go +var count int + +func EmulateTransientError(ctx context.Context) (string, error) { + count++ + + if count <= 3 { + return "intentional fail", errors.New("error") + } else { + return "success", nil + } +} + +func main() { + r := Retry(EmulateTransientError, 5, 2*time.Second) + + res, err := r(context.Background()) + + fmt.Println(res,err) +} + #+ +*** Throttle +:PROPERTIES: +:ID: f437c67e-a680-4400-8640-1fd32cc9e363 +:END: +- limits the frequency of a funciton call to some maximum number of invocations per minute +- See [[id:a9f836f0-d43d-4e97-96fc-06f75e982d15][Rate Limiting]] Algorithms +- diff w/ [[id:d78e2fbe-8c51-489c-b97c-74b01a0abcb6][Debounce]] + - debounce collates clusters of calls (across flexible durations) into representative boundary calls + - throttle limits the amount of calls in a relatively fixed duration +- 2 components + - Effector : the function being regulated + - Throttle : the enwrapping closure over Effector : implementing the rate limiting layer + + #+begin_src go +type Effector func(context.Context) (string, error) + #+end_src + + #+begin_src go +func Throttle(e Effector, max uint, refill uint, d time.Duration) Effector { + var tokens = max + var once sync.Once + + return func(ctx context.Context) (string, error) { + if ctx.Err() != nil { + return "", ctx.Err() + } + } + + once.Do(func() { + ticker := time.NewTicker(d) + + go func() { + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <- ticker.C: + t := tokens + refill + if t > max { + t = max + } + tokens = t + } + } + }() + }) + + if tokens <= 0 { + return "", fmt.Errorf("too many calls") + } + + tokens-- + + return e(ctx) +} + #+end_src + +*** Timeout +:PROPERTIES: +:ID: f58d7534-8fef-4af0-bf8f-45cf50375e93 +:END: +- allows a process to stop waiting for an answer once it's clear that an answer may not be coming +- 3 components: + - client : calls a slow function + - slowfunction : a long running function + - timeout : wrapper over slow function +- straightforward if a function utilizes context.Context in golang, + #+begin_src go +ctx := context.Background() +ctxt,cancel := context.WithTimeout(ctx, 10*time.Second) +defer cancel() +result, err := SomeFunction(ctxt) + #+end_src +- This isn't usually the case though + - build a closure that respects the context in such a case, followed by a select over your injected timeout context and the result of a goroutined slow function +- will need to convert the slow function into a context respecting wrapper as follows + +#+begin_src go +type SlowFunction func(string) (string, error) + +type WithContext func(context.Context, string) (string, error) + +func Timeout(f SlowFunction) WithContext { + return func(ctx context.Context, arg string) (string error) { + chres := make(chan string) //channel for results + cherr := make(chan string) //channel for errors + + go func() { + res, err := f(arg) //dispatch slow function + chres <- res + cherr <- err + }() + + select { + case res := <-chres://if done before timeout + return res, <-cherr + case <-ctx.Done()://in case of timeout + return "",ctx.Err() + } + } +} +#+end_src + +- finally, using Timeout will be like +#+begin_src go +func main() { + ctx := context.Background() + ctxt, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + timeout := Timeout(Slow) + res, err := timeout(ctxt, "some input") + + fmt.Println(res, err) +} +#+end_src +- an alternative to using context.Context (context.Context is the preferred method btw), is using the time.After function : https://pkg.go.dev/time#After +** [[id:618d0535-411d-4c36-b176-84413ec8bfc1][Concurrency]] Patterns * CNCF (Cloud Native Computing Foundation) :PROPERTIES: :ID: 56e931a4-16af-4eba-bcd0-c8f0b9566153 diff --git a/Content/20240821102020-autocomplete.org b/Content/20240821102020-autocomplete.org new file mode 100644 index 0000000..5a7f17d --- /dev/null +++ b/Content/20240821102020-autocomplete.org @@ -0,0 +1,5 @@ +:PROPERTIES: +:ID: e0b818a4-972a-43e7-922f-e3e7a47af4d1 +:END: +#+title: AutoComplete +#+filetags: :programming: diff --git a/Content/20240822172256-rate_limiting.org b/Content/20240822172256-rate_limiting.org new file mode 100644 index 0000000..a9b62be --- /dev/null +++ b/Content/20240822172256-rate_limiting.org @@ -0,0 +1,9 @@ +:PROPERTIES: +:ID: a9f836f0-d43d-4e97-96fc-06f75e982d15 +:END: +#+title: Rate Limiting +#+filetags: :programming: + +* Algorithms +** Token Bucket +- https://en.wikipedia.org/wiki/Token_bucket