Skip to content

Commit

Permalink
add mutext lock on params map
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-ext-simba-jl committed Sep 26, 2023
1 parent 741dc03 commit 8837b0e
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 2 deletions.
6 changes: 6 additions & 0 deletions chunk_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@ func (scd *snowflakeChunkDownloader) start() error {
// if the rowsetbase64 retrieved from the server is empty, move on to downloading chunks
var err error
var loc *time.Location
paramsMutex.Lock()
if scd.sc != nil && scd.sc.cfg != nil {
loc = getCurrentLocation(scd.sc.cfg.Params)
}
paramsMutex.Unlock()
firstArrowChunk := buildFirstArrowChunk(scd.RowSet.RowSetBase64, loc, scd.pool)
higherPrecision := higherPrecisionEnabled(scd.ctx)
scd.CurrentChunk, err = firstArrowChunk.decodeArrowChunk(scd.RowSet.RowType, higherPrecision)
Expand Down Expand Up @@ -271,9 +273,11 @@ func (scd *snowflakeChunkDownloader) startArrowBatches() error {
var err error
chunkMetaLen := len(scd.ChunkMetas)
var loc *time.Location
paramsMutex.Lock()
if scd.sc != nil && scd.sc.cfg != nil {
loc = getCurrentLocation(scd.sc.cfg.Params)
}
paramsMutex.Unlock()
firstArrowChunk := buildFirstArrowChunk(scd.RowSet.RowSetBase64, loc, scd.pool)
scd.FirstBatch = &ArrowBatch{
idx: 0,
Expand Down Expand Up @@ -430,9 +434,11 @@ func decodeChunk(scd *snowflakeChunkDownloader, idx int, bufStream *bufio.Reader
return err
}
var loc *time.Location
paramsMutex.Lock()
if scd.sc != nil && scd.sc.cfg != nil {
loc = getCurrentLocation(scd.sc.cfg.Params)
}
paramsMutex.Unlock()
arc := arrowResultChunk{
ipcReader,
0,
Expand Down
4 changes: 4 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ func (sc *snowflakeConn) cleanup() {
sc.rest.Client.CloseIdleConnections()
}
sc.rest = nil
paramsMutex.Lock()
sc.cfg = nil
paramsMutex.Unlock()
}

func (sc *snowflakeConn) Close() (err error) {
Expand Down Expand Up @@ -648,9 +650,11 @@ type snowflakeArrowStreamChunkDownloader struct {
}

func (scd *snowflakeArrowStreamChunkDownloader) Location() *time.Location {
paramsMutex.Lock()
if scd.sc != nil {
return getCurrentLocation(scd.sc.cfg.Params)
}
paramsMutex.Unlock()
return nil
}
func (scd *snowflakeArrowStreamChunkDownloader) TotalRows() int64 { return scd.Total }
Expand Down
2 changes: 0 additions & 2 deletions location.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,11 @@ func init() {
func getCurrentLocation(params map[string]*string) *time.Location {
loc := time.Now().Location()
var err error
paramsMutex.Lock()
if tz, ok := params["timezone"]; ok && tz != nil {
loc, err = time.LoadLocation(*tz)
if err != nil {
loc = time.Now().Location()
}
}
paramsMutex.Unlock()
return loc
}
2 changes: 2 additions & 0 deletions rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ type snowflakeRows struct {
}

func (rows *snowflakeRows) getLocation() *time.Location {
paramsMutex.Lock()
if rows.location == nil && rows.sc != nil && rows.sc.cfg != nil {
rows.location = getCurrentLocation(rows.sc.cfg.Params)
}
paramsMutex.Unlock()
return rows.location
}

Expand Down

0 comments on commit 8837b0e

Please sign in to comment.