Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
rajp152k committed Aug 22, 2024
1 parent da1132b commit fa881ea
Show file tree
Hide file tree
Showing 5 changed files with 481 additions and 1 deletion.
177 changes: 177 additions & 0 deletions Content/20230717162224-concurrency.org
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,183 @@
#+title: Concurrency
#+filetags: :programming:

* Abstract
- Important aspect when it comes to optimizing the [[id:314236f7-81ae-48b7-b62b-dc822119180e][design of a system]].
- I'll be initiating the initial deep dive into low level concurrency primitives and extensively studying patterns in [[id:ad4ba668-b2ec-47b1-9214-2284aedaceba][Golang]]
* Patterns
** Fan-In
:PROPERTIES:
:ID: 400e0635-f797-4abd-b6b5-d6379634b430
:END:
- multiplexes multiple input channels onto one output channel
- 3 components:
- Sources : set of one or more input channels with the same type, accepted by funnel
- Destination : an output channel of the same type as Sources. Created and provided by funnel
- Funnel : Accepts sources and immediately returns Destination. Any input from any Sources will be output by Destination
*** Code
- Funnel is variadic on sources (channels of a type (string for this instance))
#+begin_src go :exports both
package main

import (
"sync"
"time"
"fmt"
)
func Funnel(sources ...<-chan string ) <-chan string {
dest := make(chan string) //the shared destination channel

var wg sync.WaitGroup // to close dest when all sources are closed

wg.Add(len(sources)) // setting size of WaitGroup

for _, ch := range sources {
go func(c <-chan string){ // start goroutine for each source
defer wg.Done() //notify wg when closing
for n := range c {
dest <- n
}
}(ch)
}

go func(){ //init goroutine for closing dest
wg.Wait() // wati
close(dest)
}()

return dest
}

func main() {
sources := make([]<-chan string, 0) //empty channel slice

for i:= 0; i<3; i++ {
ch := make(chan string)
sources = append(sources, ch)

go func(chanNum int) { // run goroutine for each channel
defer close(ch)

for i:= 1; i<= 5; i++ {
ch <- fmt.Sprintf("sending %d from chan %d", i, chanNum)
time.Sleep(time.Second )
}
}(i)
}

dest := Funnel(sources...)
for d := range dest {
fmt.Println(d)
}
}
#+end_src

#+RESULTS:
#+begin_example
sending 1 from chan 0
sending 1 from chan 2
sending 1 from chan 1
sending 2 from chan 0
sending 2 from chan 2
sending 2 from chan 1
sending 3 from chan 2
sending 3 from chan 0
sending 3 from chan 1
sending 4 from chan 1
sending 4 from chan 0
sending 4 from chan 2
sending 5 from chan 2
sending 5 from chan 0
sending 5 from chan 1
#+end_example


- Note that the sends were from multiple sources but the Prints are from the destination channel

** Fan-Out
:PROPERTIES:
:ID: 59c59a72-789c-4812-b2e8-18b1731511df
:END:
- evenly distributes messages from an input channel to multiple output channels
- 3 components
- Source : input channel, accepted by split
- Destinations : output channels, of the same type as that of source; provided by split
- split : accepts source and returns destination

*** Code
- two possible implementations:
- single source to destination writer that round-robins across destinations
- delays, if the current destination is unable to receive
- needs just one goroutine
- multiple destination readers that compete for source's output
- slightly more complex but decreased likelihood of holdups due to a single worker
- chances of a destination reader starving
- showing example for this
#+begin_src go :exports both
package main

import (
"fmt"
"sync"
)

func Split(source <-chan string, n int) []<-chan string {
dests := make([]<-chan string, 0) //creating destinations' channel slice

for i:= 0; i<n ; i ++ {
ch := make(chan string)// creating n destinations
dests = append(dests, ch)

go func(chanNum int) {
defer close(ch)
// dedicated goroutine for each channel
// that competes for reads from the source

for val := range source {
ch <- fmt.Sprintf("dest chan # %d intook { %s }",i, val)
}
}(i)
}

return dests
}


func main(){
source := make(chan string) //The input channel
dests := Split(source, 5) // Retrieve 5 output channels

go func() {
for i := 1; i<= 10; i++ { //sending into source
source <- fmt.Sprintf("%d from source chan", i)
}
close(source) //closing when done
}()

var wg sync.WaitGroup // wait until all dests close

wg.Add(len(dests))

for i, ch := range dests {
go func(i int, d <- chan string) {
defer wg.Done()

for val := range d {
fmt.Println(val)
}
}(i, ch)
}
wg.Wait()
}
#+end_src
** Future-Promise
:PROPERTIES:
:ID: 0e346737-3bff-4f35-b08c-9c8c2f163a3d
:END:
- provides a placeholder for a value that is not yet known
* Major Flavors
- [[id:3b44673f-5e7c-4b96-8ef2-1d68f5131173][Actor-Model Computation]]
- [[id:0458f827-5634-41e0-b261-dfc5cb2d2389][CSP: Communicating Sequential Processes]]

* Resources
** [[id:64bfc13e-1b7c-4cbe-ba0e-9d17ebaacef1][BOOK: Cloud Native Go]] : Chapter 4
3 changes: 3 additions & 0 deletions Content/20240205171209-go.org
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,6 @@ func main() {
:END:

** BOOK: Cloud Native Go
:PROPERTIES:
:ID: 64bfc13e-1b7c-4cbe-ba0e-9d17ebaacef1
:END:
Loading

0 comments on commit fa881ea

Please sign in to comment.