diff --git a/bower.json b/bower.json index d3911ef..c1ffdbd 100644 --- a/bower.json +++ b/bower.json @@ -1,33 +1,34 @@ { - "name": "purescript-redis-client", - "license": [ - "BSD-3-Clause" - ], - "repository": { - "type": "git", - "url": "https://github.com/paluh/purescript-redis-client.git" - }, - "ignore": [ - "**/.*", - "node_modules", - "bower_components", - "output" - ], - "dependencies": { - "purescript-aff": "^v5.1.2", - "purescript-arrays": "^v5.3.1", - "purescript-bytestrings": "^v8.0.0", - "purescript-console": "^v4.4.0", - "purescript-effect": "^v2.0.1", - "purescript-either": "^v4.1.1", - "purescript-int-53": "^v4.0.0", - "purescript-maybe": "^v4.0.1", - "purescript-nullable": "^v4.1.1", - "purescript-prelude": "^v4.1.1", - "purescript-transformers": "^v4.2.0" - }, - "devDependencies": { - "purescript-psci-support": "^v4.0.0", - "purescript-test-unit": "^v15.0.0" - } + "name": "purescript-redis-client", + "license": [ + "BSD-3-Clause" + ], + "repository": { + "type": "git", + "url": "https://github.com/paluh/purescript-redis-client.git" + }, + "ignore": [ + "**/.*", + "node_modules", + "bower_components", + "output" + ], + "dependencies": { + "purescript-aff": "^v5.1.2", + "purescript-arrays": "^v5.3.1", + "purescript-bytestrings": "^v8.0.0", + "purescript-console": "^v4.4.0", + "purescript-effect": "^v2.0.1", + "purescript-either": "^v4.1.1", + "purescript-int-53": "^v4.0.0", + "purescript-maybe": "^v4.0.1", + "purescript-nullable": "^v4.1.1", + "purescript-prelude": "^v4.1.1", + "purescript-transformers": "^v4.2.0", + "purescript-node-streams": "^4.0.1" + }, + "devDependencies": { + "purescript-psci-support": "^v4.0.0", + "purescript-test-unit": "^v15.0.0" + } } diff --git a/src/Database/Redis.js b/src/Database/Redis.js index 03da61f..6eeeef2 100644 --- a/src/Database/Redis.js +++ b/src/Database/Redis.js @@ -624,3 +624,87 @@ exports.zscoreImpl = function(conn) { }; }; }; + +exports.scanStreamImpl = function(conn){ + return function(options){ + return function(tuple){ + return function (onError, onSuccess){ + var stream = conn.scanStream(options) + + // `resultKeys` is an array of strings representing key names. + // Note that resultKeys may contain 0 keys, and that it will sometimes + // contain duplicates due to SCAN's implementation in Redis. + // woody: Should I check for duplicates here or leave it to end user?? + stream.on("data", function(keys){ + onSuccess(tuple(keys)(stream)) + }) + + stream.on("error", onError) + + return function (cancelError, cancelerError, cancelerSuccess) { + cancelerSuccess() + } + } + } + } +} + +exports.hscanStreamImpl = function(conn){ + return function(options){ + return function(hash){ + return function(tuple){ + return function (onError, onSuccess){ + var stream = conn.hscanStream(hash,options) + + // `resultKeys` is an array of strings representing key names. + // Note that resultKeys may contain 0 keys, and that it will sometimes + // contain duplicates due to SCAN's implementation in Redis. + // woody: Should I check for duplicates here or leave it to end user?? + stream.on("data", function(result){ + var arr = []; + for (var i = 0; i < result.length; i += 2) { + arr.push({ key: result[i], value: result[i + 1] }); + } + onSuccess(tuple(arr)(stream)) + }) + + stream.on("error", onError) + + return function (cancelError, cancelerError, cancelerSuccess) { + cancelerSuccess() + } + } + } + } + } +} + +exports.zscanStreamImpl = function(conn){ + return function(options){ + return function(key){ + return function(tuple){ + return function (onError, onSuccess){ + var stream = conn.zscanStream(key,options) + + // `resultKeys` is an array of strings representing key names. + // Note that resultKeys may contain 0 keys, and that it will sometimes + // contain duplicates due to SCAN's implementation in Redis. + // woody: Should I check for duplicates here or leave it to end user?? + stream.on("data", function(result){ + var arr = []; + for (var i = 0; i < result.length; i += 2) { + arr.push({ member: result[i], value: parseFloat(result[i + 1]) }); + } + onSuccess(tuple(arr)(stream)) + }) + + stream.on("error", onError) + + return function (cancelError, cancelerError, cancelerSuccess) { + cancelerSuccess() + } + } + } + } + } +} diff --git a/src/Database/Redis.purs b/src/Database/Redis.purs index 6eb300d..6b3b822 100644 --- a/src/Database/Redis.purs +++ b/src/Database/Redis.purs @@ -6,6 +6,7 @@ module Database.Redis , IPFamily , negInf , posInf + , ScanStreamOptions , Write(..) , Zadd(..) , ZaddReturn(..) @@ -22,6 +23,7 @@ module Database.Redis , hget , hgetall , hset + , hscanStream , get , incr , keys @@ -33,6 +35,7 @@ module Database.Redis , rpop , rpush , set + , scanStream , withConnection , zadd , zcard @@ -45,6 +48,7 @@ module Database.Redis , zremrangebyrank , zremrangebyscore , zrevrangebyscore + , zscanStream , zscore ) where @@ -59,6 +63,8 @@ import Data.Maybe (Maybe(..)) import Data.NonEmpty (NonEmpty) import Data.Nullable (Nullable, toMaybe, toNullable) import Data.Tuple (Tuple(..)) +import Prim.Row (class Union) +import Node.Stream import Unsafe.Coerce (unsafeCoerce) -------------------------------------------------------------------------------- @@ -549,3 +555,51 @@ zscore -> ByteString -> Aff (Maybe Int53) zscore conn key = (toMaybe <$> _) <<< fromEffectFnAff <<< zscoreImpl conn key + +type ScanStreamOptions = + ( count :: Int + , match :: String + ) + +foreign import scanStreamImpl :: forall opts. + Connection + -> Record opts + -> (Array String -> Readable () -> Tuple (Array String) (Readable ())) + -> EffectFnAff (Tuple (Array String) (Readable ())) + +foreign import hscanStreamImpl :: forall opts. + Connection + -> Record opts + -> String + -> (Array {key :: String, value :: String} -> Readable () -> Tuple (Array {key :: String, value :: String}) (Readable ())) + -> EffectFnAff (Tuple (Array {key :: String, value :: String}) (Readable ())) + +foreign import zscanStreamImpl :: forall opts. + Connection + -> Record opts + -> String + -> (Array {member :: String, score :: Int} -> Readable () -> Tuple (Array {member :: String, score :: Int}) (Readable ())) + -> EffectFnAff (Tuple (Array {member :: String, score :: Int}) (Readable ())) + +scanStream :: forall options t. + Union options t ScanStreamOptions + => Connection + -> Record options + -> Aff (Tuple (Array String) (Readable ())) +scanStream redis options = fromEffectFnAff $ scanStreamImpl redis options Tuple + +hscanStream :: forall options t. + Union options t ScanStreamOptions + => Connection + -> Record options + -> String + -> Aff (Tuple (Array {key :: String, value :: String}) (Readable ())) +hscanStream redis options hash = fromEffectFnAff $ hscanStreamImpl redis options hash Tuple + +zscanStream :: forall options t. + Union options t ScanStreamOptions + => Connection + -> Record options + -> String + -> Aff (Tuple (Array {member :: String, score :: Int}) (Readable ())) +zscanStream redis options key = fromEffectFnAff $ zscanStreamImpl redis options key Tuple \ No newline at end of file diff --git a/test/Main.purs b/test/Main.purs index c7b801d..3dd6083 100644 --- a/test/Main.purs +++ b/test/Main.purs @@ -4,18 +4,19 @@ module Test.Main import Prelude -import Effect.Aff (Aff, Milliseconds(Milliseconds), delay, forkAff) -import Effect (Effect) import Control.Monad.Except (catchError, throwError) import Data.Array (drop, filter, fromFoldable, sort, sortWith, take) -import Data.ByteString (ByteString) +import Data.ByteString (ByteString, Encoding(..)) import Data.ByteString as ByteString import Data.Foldable (length) import Data.Int53 (fromInt) import Data.Maybe (Maybe(..)) import Data.NonEmpty (singleton, (:|)) +import Data.Tuple (fst) import Database.Redis (Connection, Expire(..), Write(..), ZscoreInterval(..), Config, defaultConfig, flushdb, keys, negInf, posInf, withConnection) import Database.Redis as Redis +import Effect (Effect) +import Effect.Aff (Aff, Milliseconds(Milliseconds), delay, forkAff) import Test.Unit (TestSuite, suite) import Test.Unit as Test.Unit import Test.Unit.Assert as Assert @@ -24,6 +25,9 @@ import Test.Unit.Main (runTest) b :: String -> ByteString b = ByteString.toUTF8 +text :: ByteString -> String +text = flip ByteString.toString UTF8 + test :: forall a . Config @@ -470,3 +474,40 @@ main = runTest $ do void $ Redis.rpush conn2 testList value1 v <- Redis.brpopIndef conn (singleton testList) Assert.equal v.value value1 + + suite "scan streams" do + test addr "keys" $ \conn -> do + void $ Redis.incr conn key1 + void $ Redis.incr conn key2 + got <- fst <$> Redis.scanStream conn {} + Assert.equal (sort [text key1, text key2]) (sort got) + + test addr "hash" $ \conn -> do + let + testHash = b "testHash" + value1 = { key: key1, value: b "val1" } + value2 = { key: key2, value: b "val2" } + + void $ Redis.hset conn testHash value1.key value1.value + void $ Redis.hset conn testHash value2.key value2.value + values <- fst <$> Redis.hscanStream conn {} (text testHash) + + Assert.equal + [text value1.value, text value2.value] + (map _.value <<< sortWith _.key $ values) + + test addr "sorted set" $ \conn -> do + let + testSet = b "testSet" + members = + {member: b "m1", score: 1 } :| [{ member: b "m2", score: 2 } , { member: b "m3", score: 3 }] + + void $ Redis.zadd + conn + testSet + (Redis.ZaddAll Redis.Added) + members + + values <- fst <$> Redis.zscanStream conn {} (text testSet) + + Assert.equal (map _.score $ fromFoldable members) (map _.score values) \ No newline at end of file