1- use serde_json :: Value ;
1+ use std :: thread ;
22
3- use redismodule:: { Context , RedisError , RedisResult , RedisValue } ;
4- use redismodule:: { NextArg , REDIS_OK } ;
3+ use serde_json:: { Map , Value } ;
4+
5+ use redismodule:: { Context , NextArg , RedisError , RedisResult , RedisValue , REDIS_OK } ;
56
67use redisearch_api:: { Document , FieldType } ;
78
@@ -75,13 +76,11 @@ pub fn add_document(key: &str, index_name: &str, doc: &RedisJSON) -> RedisResult
7576
7677 let map = schema_map:: as_ref ( ) ;
7778
78- map. get ( index_name)
79- . ok_or ( "ERR no such index" . into ( ) )
80- . and_then ( |schema| {
81- let rsdoc = create_document ( key, schema, doc) ?;
82- schema. index . add_document ( & rsdoc) ?;
83- REDIS_OK
84- } )
79+ if let Some ( schema) = map. get ( index_name) {
80+ let rsdoc = create_document ( key, schema, doc) ?;
81+ schema. index . add_document ( & rsdoc) ?;
82+ }
83+ REDIS_OK
8584}
8685
8786fn create_document ( key : & str , schema : & Schema , doc : & RedisJSON ) -> Result < Document , Error > {
@@ -91,13 +90,14 @@ fn create_document(key: &str, schema: &Schema, doc: &RedisJSON) -> Result<Docume
9190 let rsdoc = Document :: create ( key, score) ;
9291
9392 for ( field_name, path) in fields {
94- let value = doc. get_doc ( & path) ?;
95-
96- match value {
97- Value :: String ( v) => rsdoc. add_field ( field_name, & v, FieldType :: FULLTEXT ) ,
98- Value :: Number ( v) => rsdoc. add_field ( field_name, & v. to_string ( ) , FieldType :: NUMERIC ) ,
99- Value :: Bool ( v) => rsdoc. add_field ( field_name, & v. to_string ( ) , FieldType :: TAG ) ,
100- _ => { }
93+ let results = doc. get_values ( path) ?;
94+ if let Some ( value) = results. first ( ) {
95+ match value {
96+ Value :: String ( v) => rsdoc. add_field ( field_name, & v, FieldType :: FULLTEXT ) ,
97+ Value :: Number ( v) => rsdoc. add_field ( field_name, & v. to_string ( ) , FieldType :: NUMERIC ) ,
98+ Value :: Bool ( v) => rsdoc. add_field ( field_name, & v. to_string ( ) , FieldType :: TAG ) ,
99+ _ => { }
100+ }
101101 }
102102 }
103103
@@ -120,14 +120,78 @@ where
120120 match subcommand. to_uppercase ( ) . as_str ( ) {
121121 "ADD" => {
122122 let path = args. next_string ( ) ?;
123- add_field ( & index_name, & field_name, & path)
123+ add_field ( & index_name, & field_name, & path) ?;
124+
125+ // TODO handle another "ADD" calls in prallel a running call
126+ thread:: spawn ( move || {
127+ let schema = if let Some ( stored_schema) = schema_map:: as_ref ( ) . get ( & index_name) {
128+ stored_schema
129+ } else {
130+ return ; // TODO handle this case
131+ } ;
132+
133+ let ctx = Context :: get_thread_safe_context ( ) ;
134+ let mut cursor: u64 = 0 ;
135+ loop {
136+ ctx. lock ( ) ;
137+ let res = scan_and_index ( & ctx, & schema, cursor) ;
138+ ctx. unlock ( ) ;
139+
140+ match res {
141+ Ok ( c) => cursor = c,
142+ Err ( e) => {
143+ eprintln ! ( "Err on index {:?}" , e) ; // TODO hadnle this better
144+ return ;
145+ }
146+ }
147+ if cursor == 0 {
148+ break ;
149+ }
150+ }
151+ } ) ;
152+
153+ REDIS_OK
124154 }
125155 //"DEL" => {}
126156 //"INFO" => {}
127157 _ => Err ( "ERR unknown subcommand - try `JSON.INDEX HELP`" . into ( ) ) ,
128158 }
129159}
130160
161+ fn scan_and_index ( ctx : & Context , schema : & Schema , cursor : u64 ) -> Result < u64 , RedisError > {
162+ let values = ctx. call ( "scan" , & [ & cursor. to_string ( ) ] ) ;
163+ match values {
164+ Ok ( RedisValue :: Array ( arr) ) => match ( arr. get ( 0 ) , arr. get ( 1 ) ) {
165+ ( Some ( RedisValue :: SimpleString ( next_cursor) ) , Some ( RedisValue :: Array ( keys) ) ) => {
166+ let cursor = next_cursor. parse ( ) . unwrap ( ) ;
167+ let res = keys. iter ( ) . try_for_each ( |k| {
168+ if let RedisValue :: SimpleString ( key) = k {
169+ ctx. open_key ( & key)
170+ . get_value :: < RedisJSON > ( & REDIS_JSON_TYPE )
171+ . and_then ( |doc| {
172+ if let Some ( data) = doc {
173+ if let Some ( index) = & data. index {
174+ if schema. name == * index {
175+ add_document ( key, index, data) ?;
176+ }
177+ }
178+ Ok ( ( ) )
179+ } else {
180+ Err ( "Error on get value from key" . into ( ) )
181+ }
182+ } )
183+ } else {
184+ Err ( "Error on parsing reply from scan" . into ( ) )
185+ }
186+ } ) ;
187+ res. map ( |_| cursor)
188+ }
189+ _ => Err ( "Error on parsing reply from scan" . into ( ) ) ,
190+ } ,
191+ _ => Err ( "Error on parsing reply from scan" . into ( ) ) ,
192+ }
193+ }
194+
131195// JSON.QGET <index> <query> <path>
132196pub fn qget < I > ( ctx : & Context , args : I ) -> RedisResult
133197where
@@ -145,18 +209,29 @@ where
145209 . ok_or ( "ERR no such index" . into ( ) )
146210 . map ( |schema| & schema. index )
147211 . and_then ( |index| {
148- let results: Result < Vec < _ > , RedisError > = index
149- . search ( & query) ?
150- . map ( |key| {
151- let key = ctx. open_key_writable ( & key) ;
152- let value = match key. get_value :: < RedisJSON > ( & REDIS_JSON_TYPE ) ? {
153- Some ( doc) => doc. to_string ( & path, Format :: JSON ) ?. into ( ) ,
154- None => RedisValue :: None ,
155- } ;
156- Ok ( value)
157- } )
158- . collect ( ) ;
159-
160- Ok ( results?. into ( ) )
212+ let result: Value =
213+ index
214+ . search ( & query) ?
215+ . try_fold ( Value :: Object ( Map :: new ( ) ) , |mut acc, key| {
216+ ctx. open_key ( & key)
217+ . get_value :: < RedisJSON > ( & REDIS_JSON_TYPE )
218+ . and_then ( |doc| {
219+ doc. map_or ( Ok ( Vec :: new ( ) ) , |data| {
220+ data. get_values ( & path)
221+ . map_err ( |e| e. into ( ) ) // Convert Error to RedisError
222+ . map ( |values| {
223+ values. into_iter ( ) . map ( |val| val. clone ( ) ) . collect ( )
224+ } )
225+ } )
226+ } )
227+ . map ( |r| {
228+ acc. as_object_mut ( )
229+ . unwrap ( )
230+ . insert ( key. to_string ( ) , Value :: Array ( r) ) ;
231+ acc
232+ } )
233+ } ) ?;
234+
235+ Ok ( RedisJSON :: serialize ( & result, Format :: JSON ) ?. into ( ) )
161236 } )
162237}
0 commit comments