Skip to content

Commit

Permalink
GEOMESA-3315 Redis - add configuration for socket read timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz committed Dec 6, 2023
1 parent fa57984 commit d3e0a6d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 12 deletions.
2 changes: 2 additions & 0 deletions docs/user/redis/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Parameter Type Description
``redis.connection.pool.size`` Integer Max number of simultaneous connections to use
``redis.connection.pool.validate`` Boolean Test connections when borrowed from the pool. Connections may be closed due to
inactivity, which would cause a transient error if validation is disabled
``redis.connection.timeout`` String Timeout for socket connections to Redis. The timeout is specified as a duration,
e.g. ``10 seconds``. The default value is ``2 seconds``
``redis.pipeline.enabled`` Boolean Enable pipelining of query requests. This reduces network latency, but restricts
queries to a single execution thread
``geomesa.query.threads`` Integer The number of threads to use per query (if not pipelining)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import redis.clients.jedis.{Jedis, JedisPool}

import java.awt.RenderingHints
import java.net.URI
import scala.util.Try
import scala.util.{Failure, Success, Try}

class RedisDataStoreFactory extends DataStoreFactorySpi with LazyLogging {

Expand Down Expand Up @@ -66,6 +66,7 @@ object RedisDataStoreFactory extends GeoMesaDataStoreInfo with LazyLogging {
RedisUrlParam,
RedisCatalogParam,
PoolSizeParam,
SocketTimeoutParam,
QueryThreadsParam,
QueryTimeoutParam,
PipelineParam,
Expand All @@ -89,20 +90,29 @@ object RedisDataStoreFactory extends GeoMesaDataStoreInfo with LazyLogging {
*/
def buildConnection(params: java.util.Map[String, _]): JedisPool = {
ConnectionPoolParam.lookupOpt(params).getOrElse {
val url = RedisUrlParam.lookup(params)
val uri = {
val url = RedisUrlParam.lookup(params)
// if there is no protocol/port, or the url is a valid redis url, use as is
// else use the redis:// protocol to support databases, etc
val parsed =
if (url.indexOf(":") == -1) {
Try(new URI(url))
} else {
parse(url).orElse(parse(s"redis://$url"))
}
parsed match {
case Success(uri) => uri
case Failure(e) =>
throw new IllegalArgumentException(s"Could not create valid Redis connection URI from: $url", e)
}
}

val config = new GenericObjectPoolConfig[Jedis]()
PoolSizeParam.lookupOpt(params).foreach(s => config.setMaxTotal(s.intValue()))
config.setTestOnBorrow(TestConnectionParam.lookup(params))
// if there is no protocol/port, or the url is a valid redis url, use as is
// else use the redis:// protocol to support databases, etc
if (url.indexOf(":") == -1) {
new JedisPool(config, url)
} else {
val uri = parse(url).orElse(parse(s"redis://$url")).getOrElse {
throw new IllegalArgumentException(s"Could not create valid Redis connection URI from: $url")
}
new JedisPool(config, uri)
}
val timeout = SocketTimeoutParam.lookup(params).toMillis.toInt

new JedisPool(config, uri, timeout)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemPropert
import org.locationtech.geomesa.utils.geotools.GeoMesaParam
import redis.clients.jedis.JedisPool

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

package object data extends LazyLogging {
Expand Down Expand Up @@ -55,6 +57,14 @@ package object data extends LazyLogging {
supportsNiFiExpressions = true
)

val SocketTimeoutParam =
new GeoMesaParam[Duration](
"redis.connection.timeout",
"Connection socket timeout for calls to Redis",
default = Duration(redis.clients.jedis.Protocol.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS),
supportsNiFiExpressions = true
)

val TestConnectionParam =
new GeoMesaParam[java.lang.Boolean](
"redis.connection.pool.validate",
Expand Down

0 comments on commit d3e0a6d

Please sign in to comment.