From e26d04aabca88ad3635d5fa717de22a6c6695918 Mon Sep 17 00:00:00 2001
From: drfok
Date: Thu, 28 Jul 2022 15:37:43 -0700
Subject: [PATCH 1/8] Loose Local Implementation of LZA

---
 granule_ingester/README.md | 70 +++++++++++++++++--
 granule_ingester/conda-requirements.txt | 5 ++
 .../granule_ingester/processors/__init__.py | 1 +
 .../processors/reading_processors/__init__.py | 1 +
 .../granule_ingester/writers/__init__.py | 2 +
 5 files changed, 74 insertions(+), 5 deletions(-)

diff --git a/granule_ingester/README.md b/granule_ingester/README.md
index 1339835..41d3953 100644
--- a/granule_ingester/README.md
+++ b/granule_ingester/README.md
@@ -9,18 +9,78 @@ data to Cassandra and Solr.
 
 ## Prerequisites
 
-Python 3.7
+Python 3.7 (`conda install -c anaconda python=3.7` in a conda environment)
 
 ## Building the service
 From `incubator-sdap-ingester`, run:
 
-    $ cd common && python setup.py install
-    $ cd ../granule_ingester && python setup.py install
-
+    cd common && python setup.py install
+    cd ../granule_ingester && python setup.py install
+
+## Install nexusproto
+
+Clone the repo:
+
+    git clone https://github.com/apache/incubator-sdap-nexusproto.git
+
+From `incubator-sdap-nexusproto`, run:
+
+    cd build/python/nexusproto && python setup.py install
+
 ## Launching the service
 From `incubator-sdap-ingester`, run:
 
-    $ python granule_ingester/granule_ingester/main.py -h
+    python granule_ingester/granule_ingester/main.py -h
+
+To run the service successfully, you will need working Cassandra, Solr,
+and RabbitMQ connections. Make sure to provide their respective credentials.
+
+### How to Launch the Service with Port-Forwarded Instances on `bigdata`
+
+**NOTE: Run each of the following in a separate terminal**
+
+#### Cassandra
+Connect to bigdata:
+
+    ssh -L 9042:localhost:9042 bigdata
+
+Log in, then run:
+
+    ssh sdeploy@bigdata
+
+Now port-forward the service:
+
+    kubectl port-forward svc/nexus-cassandra -n sdap 9042:9042
+
+#### Solr
+Connect to bigdata:
+
+    ssh -L 8983:localhost:8984 bigdata
+
+Log in, then run:
+
+    ssh sdeploy@bigdata
+
+Now port-forward the service:
+
+    kubectl port-forward svc/nexus-solr-svc -n sdap 8984:8983
+
+#### RabbitMQ
+Connect to bigdata:
+
+    ssh -L 5672:localhost:5672 bigdata
+
+Log in, then run:
+
+    ssh sdeploy@bigdata
+
+Now port-forward the service:
+
+    kubectl port-forward svc/rabbitmq -n sdap 5672:5672
+
+From `incubator-sdap-ingester`, run:
+
+    python granule_ingester/granule_ingester/main.py --cassandra-username=cassandra --cassandra-password=cassandra
 
 ## Running the tests
 From `incubator-sdap-ingester`, run:
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index a1e4206..25cfd1d 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -8,4 +8,9 @@ pyyaml==5.3.1
 aiohttp==3.6.2
 tenacity
 requests==2.27.1
+<<<<<<< HEAD
+=======
+numcodecs==0.9.1
+zarr
+>>>>>>> 7c1367b (Loose Local Implementation of LZA)
diff --git a/granule_ingester/granule_ingester/processors/__init__.py b/granule_ingester/granule_ingester/processors/__init__.py
index 174a833..8da7bba 100644
--- a/granule_ingester/granule_ingester/processors/__init__.py
+++ b/granule_ingester/granule_ingester/processors/__init__.py
@@ -5,3 +5,4 @@
 from granule_ingester.processors.kelvintocelsius import KelvinToCelsius
 from granule_ingester.processors.Subtract180FromLongitude import Subtract180FromLongitude
 from granule_ingester.processors.ForceAscendingLatitude import ForceAscendingLatitude
+from granule_ingester.processors.NetCDFProcessor import NetCDFProcessor
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/__init__.py b/granule_ingester/granule_ingester/processors/reading_processors/__init__.py
index 84cd8f0..7e0b9e4 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/__init__.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/__init__.py
@@ -5,3 +5,4 @@
 from granule_ingester.processors.reading_processors.SwathReadingProcessor import SwathReadingProcessor
 from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor
 from granule_ingester.processors.reading_processors.TimeSeriesReadingProcessor import TimeSeriesReadingProcessor
+from granule_ingester.processors.reading_processors.LazyLoadProcessor import LazyLoadProcessor
diff --git a/granule_ingester/granule_ingester/writers/__init__.py b/granule_ingester/granule_ingester/writers/__init__.py
index 9323d8c..ce4d80a 100644
--- a/granule_ingester/granule_ingester/writers/__init__.py
+++ b/granule_ingester/granule_ingester/writers/__init__.py
@@ -2,3 +2,5 @@
 from granule_ingester.writers.MetadataStore import MetadataStore
 from granule_ingester.writers.SolrStore import SolrStore
 from granule_ingester.writers.CassandraStore import CassandraStore
+from granule_ingester.writers.netCDF_Store import netCDF_Store
+from granule_ingester.writers.LocalStore import LocalStore
\ No newline at end of file
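The launch instructions above assume all three backing services are reachable on localhost once the tunnels are up. A minimal pre-flight sketch, not part of the patch, that checks the three ports the README uses before starting the ingester:

    # Hypothetical connectivity check; hosts/ports mirror the README's port-forwards.
    import socket

    services = {'Cassandra': 9042, 'Solr': 8983, 'RabbitMQ': 5672}
    for name, port in services.items():
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(2)
            status = 'open' if sock.connect_ex(('localhost', port)) == 0 else 'unreachable'
            print(f'{name}: localhost:{port} is {status}')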
From 278401715476164f5287249c10cacfd50d0757f7 Mon Sep 17 00:00:00 2001
From: drfok
Date: Thu, 28 Jul 2022 16:05:14 -0700
Subject: [PATCH 2/8] Adding Missing Files Loose Implementation

---
 granule_ingester/conda-requirements.txt | 3 --
 .../granule_ingester/writers/LocalStore.py | 41 ++++++++++++++++++
 .../granule_ingester/writers/netCDF_Store.py | 13 ++++++
 .../TROPOMI_methane_mixing_ratio_20200801.nc | Bin 0 -> 63118 bytes
 .../TROPOMI_methane_mixing_ratio_20200802.nc | Bin 0 -> 63118 bytes
 .../tests/writers/test_LocalStore.py | 30 +++++++++++++
 6 files changed, 84 insertions(+), 3 deletions(-)
 create mode 100644 granule_ingester/granule_ingester/writers/LocalStore.py
 create mode 100644 granule_ingester/granule_ingester/writers/netCDF_Store.py
 create mode 100644 granule_ingester/tests/granules/TROPOMI_methane_mixing_ratio_20200801.nc
 create mode 100644 granule_ingester/tests/granules/TROPOMI_methane_mixing_ratio_20200802.nc
 create mode 100644 granule_ingester/tests/writers/test_LocalStore.py

diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index 25cfd1d..7febdaf 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -8,9 +8,6 @@ pyyaml==5.3.1
 aiohttp==3.6.2
 tenacity
 requests==2.27.1
-<<<<<<< HEAD
-=======
 numcodecs==0.9.1
 zarr
->>>>>>> 7c1367b (Loose Local Implementation of LZA)
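With the stray conflict markers removed, numcodecs and zarr become real dependencies; they are what the new LocalStore writer (next diff) builds its compressor from. A short sketch of how the compressor parameters used in the tests map onto numcodecs; the only assumption is that the integer shuffle flag 1 equals numcodecs' Blosc.SHUFFLE constant, which it does:

    # Blosc(cname='blosclz', clevel=9, shuffle=1) are the values the tests pass.
    import numpy as np
    from numcodecs import Blosc

    compressor = Blosc(cname='blosclz', clevel=9, shuffle=Blosc.SHUFFLE)  # shuffle == 1
    raw = np.random.rand(100, 100).astype('float32').tobytes()
    print(f'{len(raw)} raw bytes -> {len(compressor.encode(raw))} compressed bytes')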
diff --git a/granule_ingester/granule_ingester/writers/LocalStore.py b/granule_ingester/granule_ingester/writers/LocalStore.py
new file mode 100644
index 0000000..e738e67
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/LocalStore.py
@@ -0,0 +1,36 @@
+# LocalStore
+# Purpose: writes a granule (as an xarray Dataset) into a local Zarr store
+# Written by: Ricky Fok
+import logging
+import traceback
+from typing import Tuple
+
+import xarray as xr
+from numcodecs import Blosc
+
+from granule_ingester.writers import netCDF_Store
+
+logger = logging.getLogger(__name__)
+
+class LocalStore(netCDF_Store):
+    def __init__(self, store_path: str):
+        self.store_path = store_path
+
+    #async def health_check(self) -> bool:
+        #try:
+            #session = self._get_session()
+            #session.shutdown()
+            #return True
+        #except Exception:
+            #raise CassandraFailedHealthCheckError("Cannot connect to Cassandra!")
+
+    def save_data(self, ds: xr.Dataset, cname: str, clevel: int, shuffle: int, chunkShape: Tuple[int, int, int]) -> None:
+        compressor = Blosc(cname=cname, clevel=clevel, shuffle=shuffle)
+        # @TODO be able to customize encoding for each data variable
+        encoding = {vname: {'compressor': compressor, 'chunks': chunkShape} for vname in ds.data_vars}
+
+        try:
+            ds.to_zarr(self.store_path, encoding=encoding, consolidated=True, mode='w')
+            # @TODO update metadata in master Zarr and place error checks for invalid paths.
+        except Exception:
+            logger.error(traceback.format_exc())
\ No newline at end of file
diff --git a/granule_ingester/granule_ingester/writers/netCDF_Store.py b/granule_ingester/granule_ingester/writers/netCDF_Store.py
new file mode 100644
index 0000000..277e70d
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/netCDF_Store.py
@@ -0,0 +1,13 @@
+from abc import ABC, abstractmethod
+
+import netCDF4
+
+from granule_ingester.healthcheck import HealthCheck
+
+
+class netCDF_Store(ABC):
+
+    @abstractmethod
+    def save_data(self, ds: netCDF4._netCDF4) -> None:
+        pass
+
\ No newline at end of file
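A quick way to confirm what save_data wrote is to open the store back up. A sketch, with the store path and variable name taken from the tests added later in this patch:

    import xarray as xr

    # consolidated=True works because save_data writes consolidated metadata
    ds = xr.open_zarr('granule_ingester/tests/local_zarr_store/TROPOMI_MASTER.zarr', consolidated=True)
    print(list(ds.data_vars))                                 # e.g. ['methane_mixing_ratio']
    print(ds['methane_mixing_ratio'].encoding.get('chunks'))  # should echo the chunkShape used on write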
diff --git a/granule_ingester/tests/granules/TROPOMI_methane_mixing_ratio_20200801.nc b/granule_ingester/tests/granules/TROPOMI_methane_mixing_ratio_20200801.nc
new file mode 100644
index 0000000000000000000000000000000000000000..43dd0dac453f829fa4078ae78e714165c8becf50
GIT binary patch
literal 63118
[binary test-granule data omitted]

diff --git a/granule_ingester/tests/granules/TROPOMI_methane_mixing_ratio_20200802.nc b/granule_ingester/tests/granules/TROPOMI_methane_mixing_ratio_20200802.nc
new file mode 100644
index 0000000000000000000000000000000000000000..00ef97b4861ae8e3bbae356e2e449ef5b6f46a41
GIT binary patch
literal 63118
[binary test-granule data omitted]
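The two fixtures are small single-day TROPOMI methane granules. A sketch for inspecting one of them; the dimension and variable names shown are the ones the test below passes to the reading processor:

    import xarray as xr

    ds = xr.open_dataset('granule_ingester/tests/granules/TROPOMI_methane_mixing_ratio_20200801.nc')
    print(ds.dims)             # expected to include time, lat, lon
    print(list(ds.data_vars))  # expected to include 'methane_mixing_ratio'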
diff --git a/granule_ingester/tests/writers/test_LocalStore.py b/granule_ingester/tests/writers/test_LocalStore.py
new file mode 100644
index 0000000..1d86f57
--- /dev/null
+++ b/granule_ingester/tests/writers/test_LocalStore.py
@@ -0,0 +1,30 @@
+import unittest
+from os import path
+import xarray as xr
+#from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.processors.reading_processors import LazyLoadProcessor
+from granule_ingester.writers import LocalStore
+from dask.diagnostics import ProgressBar
+
+def main():
+    # variable is a JSON list literal so the reading processor can json.loads() it
+    reading_processor_TROPOMI = LazyLoadProcessor(variable='["methane_mixing_ratio"]',
+                                                  latitude='lat',
+                                                  longitude='lon',
+                                                  time='time')
+    granule_path_TROPOMI = path.join(path.dirname(__file__), '../granules/TROPOMI_methane_mixing_ratio_20200801.nc')
+    #granule_path_OBP = path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc')
+
+    store_path = path.join(path.dirname(__file__), '../local_zarr_store/TROPOMI_MASTER.zarr')
+    zarr_writer_TROPOMI = LocalStore(store_path)
+
+    cdfDS = xr.open_dataset(granule_path_TROPOMI)
+    chunkShape = (1, 5, 5)
+    processed_granule = reading_processor_TROPOMI.process(cdfDS)
+    with ProgressBar():
+        zarr_writer_TROPOMI.save_data(ds=processed_granule, cname='blosclz',
+                                      clevel=9, shuffle=1, chunkShape=chunkShape)
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
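The script above runs the whole write path but never asserts anything. A hedged sketch of the same flow restructured as a unittest case; paths and parameters are the script's own, and the .zmetadata check assumes consolidated output:

    import unittest
    from os import path

    import xarray as xr

    from granule_ingester.processors.reading_processors import LazyLoadProcessor
    from granule_ingester.writers import LocalStore


    class TestLocalStore(unittest.TestCase):
        def test_save_data_writes_zarr(self):
            granule = path.join(path.dirname(__file__), '../granules/TROPOMI_methane_mixing_ratio_20200801.nc')
            store = path.join(path.dirname(__file__), '../local_zarr_store/TROPOMI_MASTER.zarr')
            processor = LazyLoadProcessor(variable='["methane_mixing_ratio"]',
                                          latitude='lat', longitude='lon', time='time')
            ds = processor.process(xr.open_dataset(granule))
            LocalStore(store).save_data(ds=ds, cname='blosclz', clevel=9,
                                        shuffle=1, chunkShape=(1, 5, 5))
            # consolidated metadata marker written by ds.to_zarr(consolidated=True)
            self.assertTrue(path.exists(path.join(store, '.zmetadata')))


    if __name__ == '__main__':
        unittest.main()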
From 8d6eca59f49d472b9d6f1f17619208cc6f19d0a2 Mon Sep 17 00:00:00 2001
From: drfok
Date: Mon, 1 Aug 2022 17:42:18 -0700
Subject: [PATCH 3/8] Initial Ingester Processor | New Ingestion History Generated

---
 .../ZarrSolrIngestionHistory.py | 176 ++++++++
 exampleConfigMap.yml | 424 ++++++++++++++++++
 .../granule_loaders/GranuleLoader.py | 5 +-
 .../processors/ZarrProcessor.py | 24 +
 .../granule_ingester/processors/__init__.py | 1 +
 .../GridZarrReadingProcessor.py | 63 +++
 .../TileReadingProcessor.py | 7 +-
 .../ZarrReadingProcessor.py | 55 +++
 .../processors/reading_processors/__init__.py | 1 +
 .../granule_ingester/writers/SolrStore.py | 22 +
 .../granule_ingester/writers/__init__.py | 3 +-
 .../granule_ingester/writers/netCDF_Store.py | 4 +-
 .../tests/writers/test_LocalStore.py | 70 ++-
 13 files changed, 844 insertions(+), 11 deletions(-)
 create mode 100644 collection_manager/collection_manager/services/history_manager/ZarrSolrIngestionHistory.py
 create mode 100644 exampleConfigMap.yml
 create mode 100644 granule_ingester/granule_ingester/processors/ZarrProcessor.py
 create mode 100644 granule_ingester/granule_ingester/processors/reading_processors/GridZarrReadingProcessor.py
 create mode 100644 granule_ingester/granule_ingester/processors/reading_processors/ZarrReadingProcessor.py

diff --git a/collection_manager/collection_manager/services/history_manager/ZarrSolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/ZarrSolrIngestionHistory.py
new file mode 100644
index 0000000..675331f
--- /dev/null
+++ b/collection_manager/collection_manager/services/history_manager/ZarrSolrIngestionHistory.py
@@ -0,0 +1,176 @@
+import hashlib
+import logging
+
+import pysolr
+import requests
+from collection_manager.services.history_manager.IngestionHistory import (IngestionHistory, IngestionHistoryBuilder)
+from collection_manager.entities.Collection import Collection
+from collections import defaultdict
+from common.async_utils.AsyncUtils import run_in_executor
+from typing import Awaitable, Callable, Dict, List, Optional, Set
+
+logging.getLogger("pysolr").setLevel(logging.WARNING)
+logger = logging.getLogger(__name__)
+
+
+def doc_key(dataset_id, file_name):
+    return hashlib.sha1(f'{dataset_id}{file_name}'.encode('utf-8')).hexdigest()
+
+
+class ZarrSolrIngestionHistoryBuilder(IngestionHistoryBuilder):
+    def __init__(self, solr_url: str, signature_fun=None):
+        self._solr_url = solr_url
+        self._signature_fun = signature_fun
+
+    def build(self, dataset_id: str):
+        return ZarrSolrIngestionHistory(solr_url=self._solr_url,
+                                        dataset_id=dataset_id,
+                                        signature_fun=self._signature_fun)
+
+
+class ZarrSolrIngestionHistory(IngestionHistory):
+    # TODO change names for zarrgranules and zarrdatasets
+    _granule_collection_name = "zarrgranules"
+    _dataset_collection_name = "zarrdatasets"
+    _req_session = None
+
+    # NOTE: collections_path is optional so the builder above, which does not supply it, still works
+    def __init__(self, solr_url: str, dataset_id: str, signature_fun=None, collections_path: str = None):
+        try:
+            self._solr_url = solr_url
+            self._url_prefix = f"{solr_url.strip('/')}/solr"
+            # TODO check method
+            self._create_collection_if_needed()
+            self.collections_path = collections_path
+            # collections keyed by directory path
+            self.collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
+            self._solr_granules = pysolr.Solr(f"{self._url_prefix}/{self._granule_collection_name}")
+            self._solr_datasets = pysolr.Solr(f"{self._url_prefix}/{self._dataset_collection_name}")
+            self._dataset_id = dataset_id
+            self._signature_fun = signature_fun
+            self._latest_ingested_file_update = self._get_latest_file_update()
+        except requests.exceptions.RequestException:
+            raise DatasetIngestionHistorySolrException(f"solr instance unreachable {solr_url}")
+
+    def __del__(self):
+        if self._req_session:
+            self._req_session.close()
+
+    @run_in_executor
+    def _push_record(self, file_name, signature):  # granule-level JSON entry
+        hash_id = doc_key(self._dataset_id, file_name)
+        self._solr_granules.delete(q=f"id:{hash_id}")
+        self._solr_granules.add([{
+            'id': hash_id,
+            'dataset_s': self._dataset_id,
+            'granule_s': file_name,
+            'granule_signature_s': signature}])
+        self._solr_granules.commit()
+        return None
+
+    @run_in_executor
+    def _save_latest_timestamp(self):  # dataset-level JSON entry
+        if self._solr_datasets:
+            self._solr_datasets.delete(q=f"id:{self._dataset_id}")
+            self._solr_datasets.add([{
+                'id': self._dataset_id,
+                'dataset_s': self._dataset_id,
+                'latest_update_l': self._latest_ingested_file_update}])
+            self._solr_datasets.commit()
+        #{
+            #"id": "MUR25-JPL-L4-GLOB-v04.2",
+            #"latest_update_l": 1637629358,
+            #"_version_": 1718445323844583426,
+            #"dataset_s": "MUR25-JPL-L4-GLOB-v04.2",
+            #"variables": [{
#"name_s": "analysed_sst", + #"fill_d": -32768 + #}], + #"s3_uri_s": "s3://cdms-dev-zarr/MUR25-JPL-L4-GLOB-v04.2/", + #"public_b": false, + #"type_s": "gridded", + #"chunk_shape": [30, 120, 240] + #} + + def _get_latest_file_update(self): + results = self._solr_datasets.search(q=f"id:{self._dataset_id}") + if results: + return results.docs[0]['latest_update_l'] + else: + return None + + @run_in_executor + def _get_signature(self, file_name): + hash_id = doc_key(self._dataset_id, file_name) + results = self._solr_granules.search(q=f"id:{hash_id}") + if results: + return results.docs[0]['granule_signature_s'] + else: + return None + + # TODO check relation and see if need to replace collection path + def _create_collection_if_needed(self): + try: + if not self._req_session: + self._req_session = requests.session() + + payload = {'action': 'CLUSTERSTATUS'} + collections_endpoint = f"{self._url_prefix}/admin/collections" + result = self._req_session.get(collections_endpoint, params=payload) + response = result.json() + node_number = len(response['cluster']['live_nodes']) + + existing_collections = response['cluster']['collections'].keys() + + if self._granule_collection_name not in existing_collections: + # Create collection + payload = {'action': 'CREATE', + 'name': self._granule_collection_name, + 'numShards': node_number + } + result = self._req_session.get(collections_endpoint, params=payload) + response = result.json() + logger.info(f"solr collection created {response}") + + # Update schema + schema_endpoint = f"{self._url_prefix}/{self._granule_collection_name}/schema" + self._add_field(schema_endpoint, "dataset_s", "string") + self._add_field(schema_endpoint, "granule_s", "string") + self._add_field(schema_endpoint, "granule_signature_s", "string") + + if self._dataset_collection_name not in existing_collections: + # Create collection + payload = {'action': 'CREATE', + 'name': self._dataset_collection_name, + 'numShards': node_number + } + result = self._req_session.get(collections_endpoint, params=payload) + response = result.json() + logger.info(f"solr collection created {response}") + + # Update schema + schema_endpoint = f"{self._url_prefix}/{self._dataset_collection_name}/schema" + self._add_field(schema_endpoint, "dataset_s", "string") + self._add_field(schema_endpoint, "latest_update_l", "TrieLongField") + + except requests.exceptions.RequestException as e: + logger.error(f"solr instance unreachable {self._solr_url}") + raise e + + def _add_field(self, schema_url, field_name, field_type): + """ + Helper to add a string field in a solr schema + :param schema_url: + :param field_name: + :param field_type + :return: + """ + add_field_payload = { + "add-field": { + "name": field_name, + "type": field_type, + "stored": False + } + } + return self._req_session.post(schema_url, data=str(add_field_payload).encode('utf-8')) + + +class DatasetIngestionHistorySolrException(Exception): + pass diff --git a/exampleConfigMap.yml b/exampleConfigMap.yml new file mode 100644 index 0000000..c147a86 --- /dev/null +++ b/exampleConfigMap.yml @@ -0,0 +1,424 @@ +apiVersion: v1 +data: + collections.yml: |2 + - id: ECCO_v4_r4_EVELMASS_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/EVELMASSv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + depth: Z + time: time + variable: EVELMASS + slices: + time: 1 + i: 30 + j: 30 + k: 1 + + - id: ECCO_v4_r4_ETAN_latlon + path: 
/data/datasets/ecco-distribution-archive/nexus-ingest/ETANv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: ETAN + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_EXFqnet_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/EXFqnetv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: EXFqnet + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_EXFtaue_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/EXFtauev4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: EXFtaue + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_EXFtaun_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/EXFtaunv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: EXFtaun + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_OBP_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/OBPv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: OBP + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_OBPNOPAB_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/OBPNOPABv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: OBPNOPAB + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_oceQnet_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/oceQnetv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: oceQnet + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_oceTAUE_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/oceTAUEv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: oceTAUE + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_oceTAUN_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/oceTAUNv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: oceTAUN + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_PHIBOT_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/PHIBOTv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: PHIBOT + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_SALT_SURFACE_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/SALTv4r4_layer_0/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: SALT + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_SIarea_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/SIareav4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: 
Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: SIarea + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_SIheff_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/SIheffv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: SIheff + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_SIhsnow_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/SIhsnowv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: SIhsnow + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_SSH_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/SSHv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: SSH + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_SSHDYN_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/SSHDYNv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: SSHDYN + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_TFLUX_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/TFLUXv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: TFLUX + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_THETA_SURFACE_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/THETAv4r4_layer_0/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + time: time + variable: THETA + slices: + time: 1 + i: 30 + j: 30 + + - id: ECCO_v4_r4_UVELMASS_latlon + path: /data/datasets/ecco-distribution-archive/nexus-ingest/UVELMASSv4r4/*.nc + priority: 6 + forward-processing-priority: 7 + projection: Grid + dimensionNames: + latitude: latitude + longitude: longitude + depth: Z + time: time + variable: UVELMASS + slices: + time: 1 + i: 30 + j: 30 + k: 1 + + - id: eccov4r4-obp + path: /data/datasets/ECCOv4r4/nctiles_daily/OBP/*.nc + priority: 6 + forward-processing-priority: 7 + projection: ECCO + dimensionNames: + latitude: YC + longitude: XC + time: time + tile: tile + variable: OBP + slices: + time: 1 + tile: 1 + i: 30 + j: 30 + + - id: mur25-jpl-l4-glob-v41-analysed-sst + path: /data/datasets/MUR25_L4_GLOB_V4.1/daily_data/*.nc + priority: 2 + forward-processing-priority: 5 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: analysed_sst + slices: + time: 1 + lat: 30 + lon: 30 + + + - id: smap-l2b-sss-daily-smap-sss + path: /data/datasets/SMAP_L2B_SSS/daily_data/SMAP_L2B_SSS_*.h5 + priority: 4 + forward-processing-priority: 5 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: row_time + variable: smap_sss + slices: + phony_dim_0: 30 + phony_dim_1: 30 + + - id: modis-l3-aqua-11um-v2014-0-4km-daily-sst + path: /data/datasets/MODIS_L3_AQUA_11UM_V2014.0_4KM_DAILY/daily_data/A*.L3m_DAY_SST_sst_4km.nc + priority: 3 + forward-processing-priority: 5 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + variable: sst + slices: + lat: 30 + lon: 30 + + - id: 
avhrr-l4-glob-v2-daily-ncei-ghrsst-sstblend-avhrr-oi-glob-v020-fv020 + path: /data/datasets/AVHRR_L4_GLOB_V2/daily_data/*-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc + priority: 3 + forward-processing-priority: 5 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + time: time + variable: analysed_sst + slices: + lat: 30 + lon: 30 + time: 1 + + - id: avhrr-l4-glob-v2-clim-seasonal-ncei-ghrsst-sstblend-avhrr-oi-glob-v020-fv020 + path: /data/datasets/AVHRR_L4_GLOB_V2/climatology_seasonal/ncei_*.nc + priority: 1 + forward-processing-priority: 5 + projection: Grid + dimensionNames: + latitude: lat + longitude: lon + variable: mean_sst + slices: + lat: 30 + lon: 30 + + - id: HLS-test + path: /data/datasets/HLS/HLS.S30.T06VUM.2015319.v1.4.h5 + priority: 1 + projection: Grid + dimensionNames: + latitude: YDim_Grid + longitude: XDim_Grid + variable: B01 + slices: + YDim_Grid: 30 + XDim_Grid: 30 +kind: ConfigMap +metadata: + creationTimestamp: "2021-03-01T17:33:54Z" + managedFields: + - apiVersion: v1 + fieldsType: FieldsV1 + fieldsV1: + f:data: + .: {} + f:collections.yml: {} + manager: kubectl-create + operation: Update + time: "2021-03-01T17:33:54Z" + name: collections-config + namespace: default + resourceVersion: "72403500" + selfLink: /api/v1/namespaces/default/configmaps/collections-config + uid: ec7ba7cd-07ba-464f-b30d-89d2585f8f42 diff --git a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py index 6377de0..b80e176 100644 --- a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py +++ b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py @@ -16,6 +16,7 @@ import logging import os import tempfile +from typing import Tuple from urllib import parse import aioboto3 @@ -40,8 +41,8 @@ async def __aenter__(self): async def __aexit__(self, type, value, traceback): if self._granule_temp_file: self._granule_temp_file.close() - - async def open(self) -> (xr.Dataset, str): + # @TODO causes error. Test to see if Tuple change works with code + async def open(self) -> Tuple[xr.Dataset, str]: resource_url = parse.urlparse(self._resource) if resource_url.scheme == 's3': # We need to save a reference to the temporary granule file so we can delete it when the context manager diff --git a/granule_ingester/granule_ingester/processors/ZarrProcessor.py b/granule_ingester/granule_ingester/processors/ZarrProcessor.py new file mode 100644 index 0000000..6d5e59e --- /dev/null +++ b/granule_ingester/granule_ingester/processors/ZarrProcessor.py @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+from nexusproto.serialization import from_shaped_array, to_shaped_array
+import xarray as xr
+
+
+# TODO: make this an informal interface, not an abstract class
+class ZarrProcessor:
+
+    def process(self, ds: xr.Dataset, *args, **kwargs) -> None:
+        pass
\ No newline at end of file
diff --git a/granule_ingester/granule_ingester/processors/__init__.py b/granule_ingester/granule_ingester/processors/__init__.py
index 8da7bba..8b0c2ac 100644
--- a/granule_ingester/granule_ingester/processors/__init__.py
+++ b/granule_ingester/granule_ingester/processors/__init__.py
@@ -6,3 +6,4 @@
 from granule_ingester.processors.Subtract180FromLongitude import Subtract180FromLongitude
 from granule_ingester.processors.ForceAscendingLatitude import ForceAscendingLatitude
 from granule_ingester.processors.NetCDFProcessor import NetCDFProcessor
+from granule_ingester.processors.ZarrProcessor import ZarrProcessor
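ZarrProcessor.process is a bare hook: unlike TileProcessor it takes a whole xarray Dataset rather than a tile. A hypothetical concrete step, mirroring the existing KelvinToCelsius tile processor but written against the new interface (the class below is illustrative and not part of the patch):

    import xarray as xr

    from granule_ingester.processors import ZarrProcessor


    class KelvinToCelsiusZarr(ZarrProcessor):
        def process(self, ds: xr.Dataset, *args, **kwargs) -> xr.Dataset:
            # Convert any data variable whose units attribute says Kelvin.
            for name, var in ds.data_vars.items():
                if var.attrs.get('units', '').lower() in ('k', 'kelvin'):
                    ds[name] = var - 273.15
                    ds[name].attrs['units'] = 'celsius'
            return ds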
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/GridZarrReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/GridZarrReadingProcessor.py
new file mode 100644
index 0000000..ed74461
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/reading_processors/GridZarrReadingProcessor.py
@@ -0,0 +1,63 @@
+from typing import Dict
+
+import cftime
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from granule_ingester.processors.reading_processors.ZarrReadingProcessor import ZarrReadingProcessor
+
+
+class GridZarrReadingProcessor(ZarrReadingProcessor):
+    def __init__(self, variable, latitude, longitude, depth=None, time=None, **kwargs):
+        super().__init__(variable, latitude, longitude, **kwargs)
+        if isinstance(variable, list) and len(variable) != 1:
+            raise RuntimeError(f'GridZarrReadingProcessor does not support multiple variables: {variable}')
+        self.depth = depth
+        self.time = time
+
+    # TODO generate after L2 v L4 meeting
+    def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
+        pass
+
+# def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
+    #data_variable = self.variable[0] if isinstance(self.variable, list) else self.variable
+    #new_tile = nexusproto.GridTile()
+
+    #lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
+    #lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
+    #lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN)
+    #lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN)
+
+    #data_subset = ds[data_variable][type(self)._slices_for_variable(ds[data_variable],
+                                                                    #dimensions_to_slices)]
+    #data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN)
+
+    #if self.depth:
+        #depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
+                                                                      #dimensions_to_slices).items())[0]
+        #depth_slice_len = depth_slice.stop - depth_slice.start
+        #if depth_slice_len > 1:
+            #raise RuntimeError(
+                #"Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
+                                                                                            #dim_len=depth_slice_len))
+        #new_tile.depth = ds[self.depth][depth_slice].item()
+
+    #if self.time:
+        #time_slice = dimensions_to_slices[self.time]
+        #time_slice_len = time_slice.stop - time_slice.start
+        #if time_slice_len > 1:
+            #raise RuntimeError(
+                #"Time slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=self.time,
+                                                                                           #dim_len=time_slice_len))
+        #if isinstance(ds[self.time][time_slice.start].item(), cftime.datetime):
+            #ds[self.time] = ds.indexes[self.time].to_datetimeindex()
+        #new_tile.time = int(ds[self.time][time_slice.start].item() / 1e9)
+
+    #new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
+    #new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
+    #new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
+
+    #input_tile.tile.grid_tile.CopyFrom(new_tile)
+    #return input_tile
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index 68561e2..e3fa8ec 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -32,6 +32,7 @@ class TileReadingProcessor(TileProcessor, ABC):
 
     def __init__(self, variable: Union[str, list], latitude: str, longitude: str, *args, **kwargs):
         try:
+            # TODO variable is sometimes passed in as a plain Python list, which json.loads() cannot parse
             self.variable = json.loads(variable)
         except Exception as e:
             logger.exception(f'failed to convert literal list to python list. using as a single variable: {variable}')
@@ -54,11 +55,7 @@ def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
             return self._generate_tile(dataset, dimensions_to_slices, output_tile)
         except Exception as e:
             raise TileProcessingError(f"Could not generate tiles from the granule because of the following error: {e}.")
-
-    @abstractmethod
-    def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
-        pass
-
+
     @classmethod
     def _parse_input(cls, the_input_tile, temp_dir):
         specs = the_input_tile.summary.section_spec
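The TODO in TileReadingProcessor's __init__ is the crux of the '[methane_mixing_ratio]' confusion in the tests: json.loads only parses JSON strings, so the variable argument must be a JSON list literal, not Python list syntax. Concretely:

    import json

    print(json.loads('["methane_mixing_ratio", "lat"]'))  # ['methane_mixing_ratio', 'lat']

    try:
        json.loads('[methane_mixing_ratio]')  # unquoted names are not valid JSON
    except json.JSONDecodeError:
        # The reading processors catch this and fall back to treating the whole
        # string as one variable name, which is rarely what was intended.
        print('fell back to the raw string')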
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/ZarrReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/ZarrReadingProcessor.py
new file mode 100644
index 0000000..d1f0b14
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/reading_processors/ZarrReadingProcessor.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import json
+import logging
+from abc import ABC, abstractmethod
+from typing import Dict, Union
+
+import numpy as np
+import xarray as xr
+
+
+from granule_ingester.processors.ZarrProcessor import ZarrProcessor
+
+logger = logging.getLogger(__name__)
+
+
+class ZarrReadingProcessor(ZarrProcessor, ABC):
+
+    def __init__(self, variable: Union[str, list], latitude: str, longitude: str, *args, **kwargs):
+        try:
+            # TODO variable is sometimes passed in as a plain Python list, which json.loads() cannot parse
+            self.variable = json.loads(variable)
+        except Exception as e:
+            logger.exception(f'failed to convert literal list to python list. using as a single variable: {variable}')
+            self.variable = variable
+        if isinstance(self.variable, list) and len(self.variable) < 1:
+            logger.error(f'variable list is empty: {self.variable}')
+            raise RuntimeError(f'variable list is empty: {self.variable}')
+        self.latitude = latitude
+        self.longitude = longitude
+
+    @abstractmethod
+    def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
+        pass
+
+    @staticmethod
+    def _convert_to_timestamp(times: xr.DataArray) -> xr.DataArray:
+        if times.dtype == np.float32:
+            return times
+        epoch = np.datetime64(datetime.datetime(1970, 1, 1, 0, 0, 0))
+        return ((times - epoch) / 1e9).astype(int)
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/__init__.py b/granule_ingester/granule_ingester/processors/reading_processors/__init__.py
index 7e0b9e4..095aa0b 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/__init__.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/__init__.py
@@ -6,3 +6,4 @@
 from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor
 from granule_ingester.processors.reading_processors.TimeSeriesReadingProcessor import TimeSeriesReadingProcessor
 from granule_ingester.processors.reading_processors.LazyLoadProcessor import LazyLoadProcessor
+from granule_ingester.processors.reading_processors.ZarrReadingProcessor import ZarrReadingProcessor
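_convert_to_timestamp relies on numpy's datetime64 arithmetic being in nanoseconds, which holds for xarray's default time decoding. A quick check of that assumption:

    import datetime

    import numpy as np
    import xarray as xr

    times = xr.DataArray(np.array(['2020-08-01T00:00:00'], dtype='datetime64[ns]'))
    epoch = np.datetime64(datetime.datetime(1970, 1, 1))
    seconds = ((times - epoch) / 1e9).astype(int)
    print(int(seconds[0]))  # 1596240000, i.e. 2020-08-01T00:00:00Z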
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 5c5f088..2ee1f81 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -16,6 +16,7 @@
 import asyncio
 import functools
 import json
+import netCDF4
 import logging
 from asyncio import AbstractEventLoop
 from datetime import datetime
@@ -111,6 +112,12 @@ async def save_metadata(self, nexus_tile: NexusTile) -> None:
         logger.debug(f'solr_doc: {solr_doc}')
         await self._save_document(solr_doc)
 
+    async def save_metadata_cdfDS(self, ds: netCDF4.Dataset) -> None:
+        # TODO _build_solr_doc still expects a NexusTile; adapt it for netCDF datasets
+        solr_doc = self._build_solr_doc(ds)
+        logger.debug(f'solr_doc: {solr_doc}')
+        await self._save_document(solr_doc)
+
     @run_in_executor
     def _save_document(self, doc: dict):
         try:
@@ -120,6 +127,22 @@ def _save_document(self, doc: dict):
             raise SolrLostConnectionError(f'Lost connection to Solr, and cannot save tiles. cause: {e}')
 
     def _build_solr_doc(self, tile: NexusTile) -> Dict:
+
+        #{
+        #"id": "MUR25-JPL-L4-GLOB-v04.2",
+        #"latest_update_l": 1637629358,
+        #"_version_": 1718445323844583426,
+        #"dataset_s": "MUR25-JPL-L4-GLOB-v04.2",
+        #"variables": [{
+            #"name_s": "analysed_sst",
+            #"fill_d": -32768
+        #}],
+        #"s3_uri_s": "s3://cdms-dev-zarr/MUR25-JPL-L4-GLOB-v04.2/",
+        #"public_b": false,
+        #"type_s": "gridded",
+        #"chunk_shape": [30, 120, 240]
+        #}
+
         summary: TileSummary = tile.summary
         bbox: TileSummary.BBox = summary.bbox
         stats: TileSummary.DataStats = summary.stats
diff --git a/granule_ingester/granule_ingester/writers/__init__.py b/granule_ingester/granule_ingester/writers/__init__.py
index ce4d80a..9ab0b60 100644
--- a/granule_ingester/granule_ingester/writers/__init__.py
+++ b/granule_ingester/granule_ingester/writers/__init__.py
@@ -3,4 +3,4 @@
 from granule_ingester.writers.SolrStore import SolrStore
 from granule_ingester.writers.CassandraStore import CassandraStore
 from granule_ingester.writers.netCDF_Store import netCDF_Store
-from granule_ingester.writers.LocalStore import LocalStore
\ No newline at end of file
+from granule_ingester.writers.LocalStore import LocalStore
diff --git a/granule_ingester/granule_ingester/writers/netCDF_Store.py b/granule_ingester/granule_ingester/writers/netCDF_Store.py
index 277e70d..ede59c2 100644
--- a/granule_ingester/granule_ingester/writers/netCDF_Store.py
+++ b/granule_ingester/granule_ingester/writers/netCDF_Store.py
@@ -1,6 +1,6 @@
 from abc import ABC, abstractmethod
 
-import netCDF4
+import xarray as xr
 
 from granule_ingester.healthcheck import HealthCheck
 
@@ -8,6 +8,6 @@ class netCDF_Store(ABC):
 
     @abstractmethod
-    def save_data(self, ds: netCDF4._netCDF4) -> None:
+    def save_data(self, ds: xr.Dataset) -> None:
         pass
 
\ No newline at end of file
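The commented document shape in _build_solr_doc (and the ZarrSolrIngestionHistory writer earlier in this patch) is plain Solr. A rough pysolr sketch of pushing and reading back a dataset-level entry; the URL and collection name are assumptions matching the defaults used elsewhere in the patch:

    import pysolr

    solr = pysolr.Solr('http://localhost:8983/solr/zarrdatasets')
    solr.add([{
        'id': 'MUR25-JPL-L4-GLOB-v04.2',
        'dataset_s': 'MUR25-JPL-L4-GLOB-v04.2',
        'latest_update_l': 1637629358,
    }])
    solr.commit()
    for doc in solr.search('id:MUR25-JPL-L4-GLOB-v04.2'):
        print(doc)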
diff --git a/granule_ingester/tests/writers/test_LocalStore.py b/granule_ingester/tests/writers/test_LocalStore.py
index 1d86f57..a585679 100644
--- a/granule_ingester/tests/writers/test_LocalStore.py
+++ b/granule_ingester/tests/writers/test_LocalStore.py
@@ -1,12 +1,80 @@
 import unittest
 from os import path
 import xarray as xr
+import netCDF4
+import json
+from typing import Dict
+from datetime import datetime
+from pathlib import Path
 #from nexusproto import DataTile_pb2 as nexusproto
 
 from granule_ingester.processors.reading_processors import LazyLoadProcessor
 from granule_ingester.writers import LocalStore
 from dask.diagnostics import ProgressBar
 
+# NOTE: scaffolding copied from SolrStore._build_solr_doc; the references to
+# tile, TileSummary, and the self helpers are not wired up in this module yet.
+def _build_solr_doc(self, ds: netCDF4.Dataset) -> Dict:
+    summary: TileSummary = tile.summary
+    bbox: TileSummary.BBox = summary.bbox
+    stats: TileSummary.DataStats = summary.stats
+
+    min_time = datetime.strftime(datetime.utcfromtimestamp(stats.min_time), self.iso)
+    max_time = datetime.strftime(datetime.utcfromtimestamp(stats.max_time), self.iso)
+    day_of_year = datetime.utcfromtimestamp(stats.min_time).timetuple().tm_yday
+    geo = self.determine_geo(bbox)
+
+    granule_file_name: str = Path(summary.granule).name  # get base filename
+
+    tile_type = tile.tile.WhichOneof("tile_type")
+    tile_data = getattr(tile.tile, tile_type)
+
+    var_names = json.loads(summary.data_var_name)
+    standard_names = []
+    if summary.standard_name:
+        standard_names = json.loads(summary.standard_name)
+    if not isinstance(var_names, list):
+        var_names = [var_names]
+    if not isinstance(standard_names, list):
+        standard_names = [standard_names]
+
+    input_document = {
+        'table_s': self.TABLE_NAME,
+        'geo': geo,
+        'id': summary.tile_id,
+        'solr_id_s': '{ds_name}!{tile_id}'.format(ds_name=summary.dataset_name, tile_id=summary.tile_id),
+        'sectionSpec_s': summary.section_spec,
+        'dataset_s': summary.dataset_name,
+        'granule_s': granule_file_name,
+        'tile_var_name_ss': var_names,
+        'day_of_year_i': day_of_year,
+        'tile_min_lon': bbox.lon_min,
+        'tile_max_lon': bbox.lon_max,
+        'tile_min_lat': bbox.lat_min,
+        'tile_max_lat': bbox.lat_max,
+        'tile_depth': tile_data.depth,
+        'tile_min_time_dt': min_time,
+        'tile_max_time_dt': max_time,
+        'tile_min_val_d': stats.min,
+        'tile_max_val_d': stats.max,
+        'tile_avg_val_d': stats.mean,
+        'tile_count_i': int(stats.count)
+    }
+
+    for var_name, standard_name in zip(var_names, standard_names):
+        if standard_name:
+            input_document[f'{var_name}.tile_standard_name_s'] = standard_name
+
+    ecco_tile_id = getattr(tile_data, 'tile', None)
+    if ecco_tile_id:
+        input_document['ecco_tile'] = ecco_tile_id
+
+    for attribute in summary.global_attributes:
+        input_document[attribute.getName()] = attribute.getValues(
+            0) if attribute.getValuesCount() == 1 else attribute.getValuesList()
+
+    return input_document
+
 def main():
     # variable is a JSON list literal so the reading processor can json.loads() it
     reading_processor_TROPOMI = LazyLoadProcessor(variable='["methane_mixing_ratio"]',
@@ -25,5 +93,7 @@ def main():
         zarr_writer_TROPOMI.save_data(ds=processed_granule, cname='blosclz',
                                       clevel=9, shuffle=1, chunkShape=chunkShape)
 
+
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()
+    
\ No newline at end of file

From 6933b4eec549a0fe62b8b687f9f0befe0b463754 Mon Sep 17 00:00:00 2001
From: drfok
Date: Fri, 19 Aug 2022 09:26:59 -0700
Subject: [PATCH 4/8] CM and GI Zarr Work, minor bug fixes, dependency updates

---
 .../services/CollectionWatcher.py | 2 +-
 .../services/ZarrCollectionProcessor.py | 112 +++++
 .../collection_manager/services/__init__.py | 1 +
 .../ZarrSolrIngestionHistory.py | 86 ++--
 .../services/history_manager/__init__.py | 1 +
 .../collection_manager/zarr_main.py | 100 +++++
 collection_manager/requirements.txt | 5 +-
 .../tests/resources/zarr_collections.yml | 50 +++
 .../test_SolrIngestionHistory.py | 44 --
 exampleConfigMap.yml | 424 ------------------
 granule_ingester/conda-requirements.txt | 2 +-
 .../granule_loaders/GranuleLoader.py | 2 +-
 .../granule_ingester/pipeline/ZarrPipeline.py | 211 +++++++++
 .../processors/ZarrProcessor.py | 100 ++++-
 .../GridZarrReadingProcessor.py | 63 ---
 .../ZarrReadingProcessor.py | 55 ---
 .../processors/reading_processors/__init__.py | 2 -
 17 files changed, 636 insertions(+), 624 deletions(-)
 create mode 100644 collection_manager/collection_manager/services/ZarrCollectionProcessor.py
 create mode 100644 collection_manager/collection_manager/zarr_main.py
 create mode 100644 collection_manager/tests/resources/zarr_collections.yml
 delete mode 100644 collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py
 delete mode 100644 exampleConfigMap.yml
 create mode 100644 granule_ingester/granule_ingester/pipeline/ZarrPipeline.py
 delete mode 100644 granule_ingester/granule_ingester/processors/reading_processors/GridZarrReadingProcessor.py
 delete mode 100644 granule_ingester/granule_ingester/processors/reading_processors/ZarrReadingProcessor.py

diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py
index 1226351..4debac7 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -30,7
+30,7 @@ def __init__(self, s3_bucket: Optional[str] = None, collections_refresh_interval: float = 30): if not os.path.isabs(collections_path): - raise RelativePathError("Collections config path must be an absolute path.") + raise RelativePathError("Collections config path must be an absolute path.") self._collections_path = collections_path self._granule_updated_callback = granule_updated_callback diff --git a/collection_manager/collection_manager/services/ZarrCollectionProcessor.py b/collection_manager/collection_manager/services/ZarrCollectionProcessor.py new file mode 100644 index 0000000..4bc439e --- /dev/null +++ b/collection_manager/collection_manager/services/ZarrCollectionProcessor.py @@ -0,0 +1,112 @@ +import logging +import os.path +from glob import glob +from typing import Dict +from datetime import datetime + +import yaml +from collection_manager.entities import Collection +from collection_manager.services import MessagePublisher +from collection_manager.services.history_manager import (GranuleStatus, + IngestionHistory) +from collection_manager.services.history_manager.IngestionHistory import \ + ZarrIngestionHistoryBuilder + +logger = logging.getLogger(__name__) + +SUPPORTED_FILE_EXTENSIONS = ['.nc', '.nc4', '.h5'] + +# TODO generate tests for Zarr Collection Processor +class ZarrCollectionProcessor: + + def __init__(self, message_publisher: MessagePublisher, history_manager_builder: ZarrIngestionHistoryBuilder): + self._publisher = message_publisher + self._history_manager_builder = history_manager_builder + self._history_manager_cache: Dict[str, IngestionHistory] = {} + + async def process_granule(self, granule: str, modified_time: int, collection: Collection): + """ + Determine whether a granule needs to be ingested, and if so publish a RabbitMQ message for it. + :param granule: A path to a granule file + :param collection: A Collection against which to evaluate the granule + :return: None + """ + if not self._file_supported(granule): + return + + history_manager = self._get_history_manager(collection.dataset_id) + granule_status = await history_manager.get_granule_status(granule, + modified_time, + collection.date_from, + collection.date_to) + + if granule_status is GranuleStatus.DESIRED_FORWARD_PROCESSING: + logger.info(f"New granule '{granule}' detected for forward-processing ingestion " + f"in collection '{collection.dataset_id}'.") + if collection.forward_processing_priority is not None: + use_priority = collection.forward_processing_priority + else: + use_priority = collection.historical_priority + elif granule_status is GranuleStatus.DESIRED_HISTORICAL: + logger.info(f"New granule '{granule}' detected for historical ingestion in collection " + f"'{collection.dataset_id}'.") + use_priority = collection.historical_priority + else: + logger.debug(f"Granule '{granule}' detected but has already been ingested in " + f"collection '{collection.dataset_id}'. 
Skipping.") + return + + dataset_config = self._generate_ingestion_message(granule, collection) + await self._publisher.publish_message(body=dataset_config, priority=use_priority) + await history_manager.push(granule, modified_time) + + @staticmethod + def _file_supported(file_path: str): + ext = os.path.splitext(file_path)[-1] + return ext in SUPPORTED_FILE_EXTENSIONS + + def _get_history_manager(self, dataset_id: str) -> IngestionHistory: + if dataset_id not in self._history_manager_cache: + self._history_manager_cache[dataset_id] = self._history_manager_builder.build(dataset_id=dataset_id) + return self._history_manager_cache[dataset_id] + + + @staticmethod + def _get_default_processors(collection: Collection): + processors = [ + { + 'name': collection.projection, + **dict(collection.dimension_names), + }, + {'name': 'emptyTileFilter'}, + {'name': 'subtract180FromLongitude'} + ] + + if collection.projection == 'Grid': + processors.append({'name': 'forceAscendingLatitude'}) + processors.append({'name': 'kelvinToCelsius'}) + processors.append({ + 'name': 'tileSummary', + 'dataset_name': collection.dataset_id + }) + processors.append({'name': 'generateTileId'}) + + return processors + + + @staticmethod + def _generate_ingestion_message(granule_path: str, collection: Collection) -> str: + + config_dict = { + 'granule': { + 'resource': granule_path + }, + 'slicer': { + 'name': 'sliceFileByStepSize', + 'dimension_step_sizes': dict(collection.slices) + }, + 'processors': ZarrCollectionProcessor._get_default_processors(collection) + } + config_str = yaml.dump(config_dict) + logger.debug(f"Templated dataset config:\n{config_str}") + return config_str diff --git a/collection_manager/collection_manager/services/__init__.py b/collection_manager/collection_manager/services/__init__.py index 553e1b7..e51f5bc 100644 --- a/collection_manager/collection_manager/services/__init__.py +++ b/collection_manager/collection_manager/services/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. 
diff --git a/collection_manager/collection_manager/services/__init__.py b/collection_manager/collection_manager/services/__init__.py
index 553e1b7..e51f5bc 100644
--- a/collection_manager/collection_manager/services/__init__.py
+++ b/collection_manager/collection_manager/services/__init__.py
@@ -14,6 +14,7 @@
 #  limitations under the License.
 from .CollectionProcessor import CollectionProcessor
+from .ZarrCollectionProcessor import ZarrCollectionProcessor
 from .CollectionWatcher import CollectionWatcher
 from .MessagePublisher import MessagePublisher
 from .S3Observer import S3Observer
diff --git a/collection_manager/collection_manager/services/history_manager/ZarrSolrIngestionHistory.py b/collection_manager/collection_manager/services/history_manager/ZarrSolrIngestionHistory.py
index 675331f..8e277f8 100644
--- a/collection_manager/collection_manager/services/history_manager/ZarrSolrIngestionHistory.py
+++ b/collection_manager/collection_manager/services/history_manager/ZarrSolrIngestionHistory.py
@@ -1,5 +1,7 @@
 import hashlib
 import logging
+from collections import defaultdict
+from collection_manager.entities import Collection
 
 import pysolr
 import requests
@@ -9,6 +11,8 @@
 from common.async_utils.AsyncUtils import run_in_executor
 from typing import Awaitable, Callable, Dict, List, Optional, Set
 
+import yaml
+
 logging.getLogger("pysolr").setLevel(logging.WARNING)
 logger = logging.getLogger(__name__)
 
@@ -18,41 +22,70 @@ def doc_key(dataset_id, file_name):
 
 
 class ZarrSolrIngestionHistoryBuilder(IngestionHistoryBuilder):
-    def __init__(self, solr_url: str, signature_fun=None):
+    def __init__(self, solr_url: str, collections_path: str, signature_fun=None):
         self._solr_url = solr_url
         self._signature_fun = signature_fun
+        self.collections_path = collections_path
 
     def build(self, dataset_id: str):
         return ZarrSolrIngestionHistory(solr_url=self._solr_url,
                                         dataset_id=dataset_id,
+                                        collections_path=self.collections_path,
                                         signature_fun=self._signature_fun)
 
 
-class ZarrSolrIngestionHistory(IngestionHistory):
-    # TODO change names for zarrgranules and zarrdatasets
+class ZarrSolrIngestionHistory(IngestionHistory):
     _granule_collection_name = "zarrgranules"
     _dataset_collection_name = "zarrdatasets"
     _req_session = None
 
-    def __init__(self, collections_path: str, solr_url: str, dataset_id: str, signature_fun=None):
+    def __init__(self, solr_url: str, dataset_id: str, collections_path: str, signature_fun=None):
         try:
             self._url_prefix = f"{solr_url.strip('/')}/solr"
             # TODO check method
             self._create_collection_if_needed()
             self.collections_path = collections_path
-            # TODO check if this works
-            self.collections_by_dir = Dict[collections_path, Set[Collection]] = defaultdict(set)
+            self.collections_by_dir: Dict[str, Set[Collection]] = defaultdict(set)
             self._solr_granules = pysolr.Solr(f"{self._url_prefix}/{self._granule_collection_name}")
             self._solr_datasets = pysolr.Solr(f"{self._url_prefix}/{self._dataset_collection_name}")
             self._dataset_id = dataset_id
             self._signature_fun = signature_fun
             self._latest_ingested_file_update = self._get_latest_file_update()
+            self._solr_url = solr_url
         except requests.exceptions.RequestException:
             raise DatasetIngestionHistorySolrException(f"solr instance unreachable {solr_url}")
 
     def __del__(self):
         self._req_session.close()
 
+    def load_dataset_metadata(self):  # retrieve metadata for this dataset from the config yaml
+        # runs synchronously; it is only called from methods already wrapped in run_in_executor
+        try:
+            with open(self.collections_path, 'r') as f:
+                collections_yaml = yaml.load(f, Loader=yaml.FullLoader)
+            self.collections_by_dir.clear()
+            for collection_dict in collections_yaml['collections']:
+                try:
+                    Collection.from_dict(collection_dict)  # validates the entry
+                    if collection_dict['id'] == self._dataset_id:
+                        # return the raw dict; callers index it by the YAML keys
+                        return collection_dict
+                except Exception:
+                    logger.exception("failed to parse collection entry")  # TODO add error handling
+        except Exception:
+            logger.exception(f"failed to read collections config {self.collections_path}")  # TODO add error handling
+
+        return None
+
+    def retrieve_variable_lists(self, collection):  # returns a list of dicts, one per variable, with its fill value
+        var_arr = [{"name_s": collection['dimensionNames']['variable'],
+                    "fill_d": collection['dimensionNames']['fill_value']}]
+        return var_arr
+
+    def retrieve_chunk_size(self, collection):
+        chunkSize = [collection['slices']['time'], collection['slices']['lat'],
+                     collection['slices']['lon']]
+        return chunkSize
+
     @run_in_executor
     def _push_record(self, file_name, signature):  # granule-level JSON entry
         hash_id = doc_key(self._dataset_id, file_name)
@@ -68,26 +101,20 @@ def _push_record(self, file_name, signature):  # granule-level JSON entry
     @run_in_executor
     def _save_latest_timestamp(self):  # dataset-level JSON entry
         if self._solr_datasets:
+            collection = self.load_dataset_metadata()
             self._solr_datasets.delete(q=f"id:{self._dataset_id}")
             self._solr_datasets.add([{
                 'id': self._dataset_id,
+                'latest_update_l': self._latest_ingested_file_update,
                 'dataset_s': self._dataset_id,
-                'latest_update_l': self._latest_ingested_file_update}])
-        self._solr_datasets.commit()
-        #{
-            #"id": "MUR25-JPL-L4-GLOB-v04.2",
-            #"latest_update_l": 1637629358,
-            #"_version_": 1718445323844583426,
-            #"dataset_s": "MUR25-JPL-L4-GLOB-v04.2",
-            #"variables": [{
-                #"name_s": "analysed_sst",
-                #"fill_d": -32768
-            #}],
-            #"s3_uri_s": "s3://cdms-dev-zarr/MUR25-JPL-L4-GLOB-v04.2/",
-            #"public_b": false,
-            #"type_s": "gridded",
-            #"chunk_shape": [30, 120, 240]
-        #}
+                'variables': self.retrieve_variable_lists(collection),
+                's3_uri_s': collection['path'],
+                'public_b': False,  # TODO support for public buckets, make this dynamic
+                'type_s': collection['projection'],
+                'chunk_shape': self.retrieve_chunk_size(collection)}])
+            self._solr_datasets.commit()
 
     def _get_latest_file_update(self):
         results = self._solr_datasets.search(q=f"id:{self._dataset_id}")
@@ -104,8 +131,7 @@ def _get_signature(self, file_name):
             return results.docs[0]['granule_signature_s']
         else:
             return None
-
-    # TODO check relation and see if need to replace collection path
+
     def _create_collection_if_needed(self):
         try:
             if not self._req_session:
@@ -147,9 +173,13 @@ def _create_collection_if_needed(self):
 
             # Update schema
             schema_endpoint = f"{self._url_prefix}/{self._dataset_collection_name}/schema"
+            self._add_field(schema_endpoint, "latest_update_l", "TrieLongField")  # TODO TrieLongField is deprecated
             self._add_field(schema_endpoint, "dataset_s", "string")
-            self._add_field(schema_endpoint, "latest_update_l", "TrieLongField")
-
+            self._add_field(schema_endpoint, "variables", "list")
+            self._add_field(schema_endpoint, "s3_uri_s", "string")
+            self._add_field(schema_endpoint, "public_b", "bool")
+            self._add_field(schema_endpoint, "type_s", "string")
+            self._add_field(schema_endpoint, "chunk_shape", "list")
         except requests.exceptions.RequestException as e:
             logger.error(f"solr instance unreachable {self._solr_url}")
             raise e
@@ -174,3 +204,4 @@ def _add_field(self, schema_url, field_name, field_type):
 
 class DatasetIngestionHistorySolrException(Exception):
     pass
+
\ No newline at end of file
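The dataset-level document that `_save_latest_timestamp` writes to the `zarrdatasets` collection can be spot-checked with pysolr. A minimal sketch, assuming Solr is reachable on localhost:8983 (for example through the port-forwarding described in the granule_ingester README) and using an illustrative dataset id:

    import pysolr

    solr = pysolr.Solr('http://localhost:8983/solr/zarrdatasets')
    for doc in solr.search(q='id:AVHRR_OI-NCEI-L4-GLOB-v2.0'):
        # each hit should carry dataset_s, latest_update_l, variables,
        # s3_uri_s, public_b, type_s, and chunk_shape
        print(doc)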
diff --git a/collection_manager/collection_manager/services/history_manager/__init__.py b/collection_manager/collection_manager/services/history_manager/__init__.py
index 0f6133b..fc14b10 100644
--- a/collection_manager/collection_manager/services/history_manager/__init__.py
+++ b/collection_manager/collection_manager/services/history_manager/__init__.py
@@ -2,3 +2,4 @@
 from .IngestionHistory import GranuleStatus
 from .IngestionHistory import IngestionHistory, md5sum_from_filepath
 from .SolrIngestionHistory import SolrIngestionHistory, SolrIngestionHistoryBuilder
+from .ZarrSolrIngestionHistory import ZarrSolrIngestionHistory, ZarrSolrIngestionHistoryBuilder
diff --git a/collection_manager/collection_manager/zarr_main.py b/collection_manager/collection_manager/zarr_main.py
new file mode 100644
index 0000000..a85c8bf
--- /dev/null
+++ b/collection_manager/collection_manager/zarr_main.py
@@ -0,0 +1,100 @@
+import argparse
+import asyncio
+import logging
+import os
+
+from collection_manager.services import (CollectionWatcher, MessagePublisher,
+                                         ZarrCollectionProcessor)
+from collection_manager.services.history_manager import (
+    FileIngestionHistoryBuilder, ZarrSolrIngestionHistoryBuilder,
+    md5sum_from_filepath)
+
+logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s")
+logging.getLogger("pika").setLevel(logging.WARNING)
+logger = logging.getLogger(__name__)
+
+
+def check_path(path) -> str:
+    if not os.path.isabs(path):
+        raise argparse.ArgumentTypeError("Paths must be absolute.")
+    return path
+
+
+def get_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(description="Watch the filesystem for new granules, and publish messages to "
+                                                 "RabbitMQ whenever they become available.")
+    parser.add_argument("--collections-path",
+                        help="Absolute path to collections configuration file",
+                        metavar="PATH",
+                        required=True)
+    history_group = parser.add_mutually_exclusive_group(required=True)
+    history_group.add_argument("--history-path",
+                               metavar="PATH",
+                               help="Absolute path to ingestion history local directory")
+    history_group.add_argument("--history-url",
+                               metavar="URL",
+                               help="URL to ingestion history solr database")
+    parser.add_argument('--rabbitmq-host',
+                        default='localhost',
+                        metavar='HOST',
+                        help='RabbitMQ hostname to connect to. (Default: "localhost")')
+    parser.add_argument('--rabbitmq-username',
+                        default='guest',
+                        metavar='USERNAME',
+                        help='RabbitMQ username. (Default: "guest")')
+    parser.add_argument('--rabbitmq-password',
+                        default='guest',
+                        metavar='PASSWORD',
+                        help='RabbitMQ password. (Default: "guest")')
+    parser.add_argument('--rabbitmq-queue',
+                        default="nexus",
+                        metavar="QUEUE",
+                        help='Name of the RabbitMQ queue to consume from. (Default: "nexus")')
+    parser.add_argument('--refresh',
+                        default='30',
+                        metavar="INTERVAL",
+                        help='Number of seconds after which to reload the collections config file. (Default: 30)')
+    parser.add_argument('--s3-bucket',
+                        metavar='S3-BUCKET',
+                        help='Optional name of an AWS S3 bucket where granules are stored. If this option is set, then all collections to be scanned must have their granules on S3, not the local filesystem.')
+
+    return parser.parse_args()
+
+
+async def main():
+    try:
+        options = get_args()
+
+        signature_fun = None if options.s3_bucket else md5sum_from_filepath
+
+        if options.history_path:
+            history_manager_builder = FileIngestionHistoryBuilder(history_path=options.history_path,
+                                                                  signature_fun=signature_fun)
+        else:
+            history_manager_builder = ZarrSolrIngestionHistoryBuilder(solr_url=options.history_url,
+                                                                      collections_path=options.collections_path, signature_fun=signature_fun)
+        async with MessagePublisher(host=options.rabbitmq_host,
+                                    username=options.rabbitmq_username,
+                                    password=options.rabbitmq_password,
+                                    queue=options.rabbitmq_queue) as publisher:
+            collection_processor = ZarrCollectionProcessor(message_publisher=publisher,
+                                                           history_manager_builder=history_manager_builder)
+            collection_watcher = CollectionWatcher(collections_path=options.collections_path,
+                                                   granule_updated_callback=collection_processor.process_granule,
+                                                   collections_refresh_interval=int(options.refresh),
+                                                   s3_bucket=options.s3_bucket)
+
+            await collection_watcher.start_watching()
+            while True:
+                try:
+                    await asyncio.sleep(1)
+                except KeyboardInterrupt:
+                    return
+
+    except Exception as e:
+        logger.exception(e)
+        return
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
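Once RabbitMQ and the history Solr are reachable, `zarr_main.py` can be launched from `incubator-sdap-ingester` much like the other entry points; a hypothetical invocation (paths and URLs illustrative):

    python collection_manager/collection_manager/zarr_main.py \
        --collections-path /absolute/path/to/zarr_collections.yml \
        --history-url http://localhost:8983 \
        --rabbitmq-host localhost \
        --rabbitmq-queue nexus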
diff --git a/collection_manager/requirements.txt b/collection_manager/requirements.txt
index c4b6323..9e14605 100644
--- a/collection_manager/requirements.txt
+++ b/collection_manager/requirements.txt
@@ -1,9 +1,10 @@
 PyYAML==5.3.1
-pystache==0.5.4
+pystache==0.6.0
 pysolr==3.9.0
 watchdog==0.10.2
 requests==2.23.0
 tenacity==6.2.0
 aioboto3==8.0.5
 aiohttp==3.7.2
-aio-pika==6.7.1
\ No newline at end of file
+aio-pika==6.7.1
+#setuptools==57.5.0
\ No newline at end of file
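Note that the `slices` block in each entry of the test config below doubles as the Zarr chunk shape recorded by `ZarrSolrIngestionHistory.retrieve_chunk_size`, in (time, lat, lon) order. A minimal sketch of that mapping, using an illustrative entry:

    import yaml

    # hypothetical single entry, mirroring zarr_collections.yml below
    entry = yaml.safe_load('''
    id: AVHRR_OI-NCEI-L4-GLOB-v2.0
    slices:
      time: 1
      lat: 30
      lon: 30
    ''')

    # same (time, lat, lon) ordering as retrieve_chunk_size
    chunk_shape = [entry['slices'][dim] for dim in ('time', 'lat', 'lon')]
    print(chunk_shape)  # [1, 30, 30]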
diff --git a/collection_manager/tests/resources/zarr_collections.yml b/collection_manager/tests/resources/zarr_collections.yml
new file mode 100644
index 0000000..376245f
--- /dev/null
+++ b/collection_manager/tests/resources/zarr_collections.yml
@@ -0,0 +1,50 @@
+collections:
+  - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_LAND
+    path: /opt/data/grace/*land*.nc
+    priority: 1
+    forward-processing-priority: 5
+    projection: Grid
+    dimensionNames:
+      latitude: lat
+      longitude: lon
+      time: time
+      variable: lwe_thickness
+      fill_value: -99999 # need to change
+    slices:
+      time: 1
+      lat: 30
+      lon: 30
+
+
+  - id: TELLUS_GRACE_MASCON_CRI_GRID_RL05_V2_OCEAN
+    path: /opt/data/grace/*ocean*.nc
+    priority: 2
+    forward-processing-priority: 6
+    projection: Grid
+    dimensionNames:
+      latitude: lat
+      longitude: lon
+      time: time
+      variable: lwe_thickness
+      fill_value: -99999 # need to change
+    slices:
+      time: 1
+      lat: 30
+      lon: 30
+
+
+  - id: AVHRR_OI-NCEI-L4-GLOB-v2.0
+    path: /opt/data/avhrr/*.nc
+    priority: 1
+    projection: Grid
+    dimensionNames:
+      latitude: lat
+      longitude: lon
+      time: time
+      variable: analysed_sst
+      fill_value: -32768 # need to change
+    slices:
+      time: 1
+      lat: 30
+      lon: 30
+
diff --git a/collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py b/collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py
deleted file mode 100644
index deab42d..0000000
--- a/collection_manager/tests/services/history_manager/test_SolrIngestionHistory.py
+++ /dev/null
@@ -1,44 +0,0 @@
-import unittest
-from collection_manager.services.history_manager import SolrIngestionHistory
-
-SOLR_URL = "http://localhost:8984/solr"
-DATASET_ID = "zobi_la_mouche"
-
-
-# TODO: mock solr and fix these tests
-class
TestSolrIngestionHistory(unittest.TestCase): - @unittest.skip("does not work without a solr server for history_manager") - def test_get(self): - ingestion_history = SolrIngestionHistory(SOLR_URL, DATASET_ID) - - ingestion_history.push("blue", "12weeukrhbwerqu7wier") - - result = ingestion_history.get("blue") - - self.assertEqual(result.docs[0]['dataset_s'], "zobi_la_mouche") - self.assertEqual(result.docs[0]['granule_s'], "blue") - self.assertEqual(result.docs[0]['granule_md5sum_s'], "12weeukrhbwerqu7wier") - - @unittest.skip("does not work without a solr server for history_manager") - def test_get_md5sum(self): - ingestion_history = SolrIngestionHistory(SOLR_URL, DATASET_ID) - - ingestion_history.push("blue", "12weeukrhbwerqu7wier") - - result = ingestion_history.get_md5sum("blue") - - self.assertEqual(result, "12weeukrhbwerqu7wier") - - @unittest.skip("does not work without a solr server for history_manager") - def test_get_missing_md5sum(self): - ingestion_history = SolrIngestionHistory(SOLR_URL, DATASET_ID) - - ingestion_history.push("blue", "12weeukrhbwerqu7wier") - - result = ingestion_history.get_md5sum("green") - - self.assertEqual(result, None) - - -if __name__ == '__main__': - unittest.main() diff --git a/exampleConfigMap.yml b/exampleConfigMap.yml deleted file mode 100644 index c147a86..0000000 --- a/exampleConfigMap.yml +++ /dev/null @@ -1,424 +0,0 @@ -apiVersion: v1 -data: - collections.yml: |2 - - id: ECCO_v4_r4_EVELMASS_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/EVELMASSv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - depth: Z - time: time - variable: EVELMASS - slices: - time: 1 - i: 30 - j: 30 - k: 1 - - - id: ECCO_v4_r4_ETAN_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/ETANv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: ETAN - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_EXFqnet_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/EXFqnetv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: EXFqnet - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_EXFtaue_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/EXFtauev4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: EXFtaue - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_EXFtaun_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/EXFtaunv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: EXFtaun - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_OBP_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/OBPv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: OBP - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_OBPNOPAB_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/OBPNOPABv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: 
time - variable: OBPNOPAB - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_oceQnet_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/oceQnetv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: oceQnet - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_oceTAUE_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/oceTAUEv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: oceTAUE - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_oceTAUN_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/oceTAUNv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: oceTAUN - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_PHIBOT_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/PHIBOTv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: PHIBOT - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_SALT_SURFACE_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/SALTv4r4_layer_0/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: SALT - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_SIarea_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/SIareav4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: SIarea - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_SIheff_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/SIheffv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: SIheff - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_SIhsnow_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/SIhsnowv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: SIhsnow - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_SSH_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/SSHv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: SSH - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_SSHDYN_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/SSHDYNv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: SSHDYN - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_TFLUX_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/TFLUXv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: TFLUX - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_THETA_SURFACE_latlon - path: 
/data/datasets/ecco-distribution-archive/nexus-ingest/THETAv4r4_layer_0/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - time: time - variable: THETA - slices: - time: 1 - i: 30 - j: 30 - - - id: ECCO_v4_r4_UVELMASS_latlon - path: /data/datasets/ecco-distribution-archive/nexus-ingest/UVELMASSv4r4/*.nc - priority: 6 - forward-processing-priority: 7 - projection: Grid - dimensionNames: - latitude: latitude - longitude: longitude - depth: Z - time: time - variable: UVELMASS - slices: - time: 1 - i: 30 - j: 30 - k: 1 - - - id: eccov4r4-obp - path: /data/datasets/ECCOv4r4/nctiles_daily/OBP/*.nc - priority: 6 - forward-processing-priority: 7 - projection: ECCO - dimensionNames: - latitude: YC - longitude: XC - time: time - tile: tile - variable: OBP - slices: - time: 1 - tile: 1 - i: 30 - j: 30 - - - id: mur25-jpl-l4-glob-v41-analysed-sst - path: /data/datasets/MUR25_L4_GLOB_V4.1/daily_data/*.nc - priority: 2 - forward-processing-priority: 5 - projection: Grid - dimensionNames: - latitude: lat - longitude: lon - time: time - variable: analysed_sst - slices: - time: 1 - lat: 30 - lon: 30 - - - - id: smap-l2b-sss-daily-smap-sss - path: /data/datasets/SMAP_L2B_SSS/daily_data/SMAP_L2B_SSS_*.h5 - priority: 4 - forward-processing-priority: 5 - projection: Grid - dimensionNames: - latitude: lat - longitude: lon - time: row_time - variable: smap_sss - slices: - phony_dim_0: 30 - phony_dim_1: 30 - - - id: modis-l3-aqua-11um-v2014-0-4km-daily-sst - path: /data/datasets/MODIS_L3_AQUA_11UM_V2014.0_4KM_DAILY/daily_data/A*.L3m_DAY_SST_sst_4km.nc - priority: 3 - forward-processing-priority: 5 - projection: Grid - dimensionNames: - latitude: lat - longitude: lon - variable: sst - slices: - lat: 30 - lon: 30 - - - id: avhrr-l4-glob-v2-daily-ncei-ghrsst-sstblend-avhrr-oi-glob-v020-fv020 - path: /data/datasets/AVHRR_L4_GLOB_V2/daily_data/*-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc - priority: 3 - forward-processing-priority: 5 - projection: Grid - dimensionNames: - latitude: lat - longitude: lon - time: time - variable: analysed_sst - slices: - lat: 30 - lon: 30 - time: 1 - - - id: avhrr-l4-glob-v2-clim-seasonal-ncei-ghrsst-sstblend-avhrr-oi-glob-v020-fv020 - path: /data/datasets/AVHRR_L4_GLOB_V2/climatology_seasonal/ncei_*.nc - priority: 1 - forward-processing-priority: 5 - projection: Grid - dimensionNames: - latitude: lat - longitude: lon - variable: mean_sst - slices: - lat: 30 - lon: 30 - - - id: HLS-test - path: /data/datasets/HLS/HLS.S30.T06VUM.2015319.v1.4.h5 - priority: 1 - projection: Grid - dimensionNames: - latitude: YDim_Grid - longitude: XDim_Grid - variable: B01 - slices: - YDim_Grid: 30 - XDim_Grid: 30 -kind: ConfigMap -metadata: - creationTimestamp: "2021-03-01T17:33:54Z" - managedFields: - - apiVersion: v1 - fieldsType: FieldsV1 - fieldsV1: - f:data: - .: {} - f:collections.yml: {} - manager: kubectl-create - operation: Update - time: "2021-03-01T17:33:54Z" - name: collections-config - namespace: default - resourceVersion: "72403500" - selfLink: /api/v1/namespaces/default/configmaps/collections-config - uid: ec7ba7cd-07ba-464f-b30d-89d2585f8f42 diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt index 7febdaf..98aec50 100644 --- a/granule_ingester/conda-requirements.txt +++ b/granule_ingester/conda-requirements.txt @@ -8,6 +8,6 @@ pyyaml==5.3.1 aiohttp==3.6.2 tenacity requests==2.27.1 - +pathtools==0.1.2 numcodecs==0.9.1 zarr diff --git 
a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py index b80e176..53519c1 100644 --- a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py +++ b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py @@ -41,7 +41,7 @@ async def __aenter__(self): async def __aexit__(self, type, value, traceback): if self._granule_temp_file: self._granule_temp_file.close() - # @TODO causes error. Test to see if Tuple change works with code + # @TODO causes error. Test to see if this Tuple change works with code async def open(self) -> Tuple[xr.Dataset, str]: resource_url = parse.urlparse(self._resource) if resource_url.scheme == 's3': diff --git a/granule_ingester/granule_ingester/pipeline/ZarrPipeline.py b/granule_ingester/granule_ingester/pipeline/ZarrPipeline.py new file mode 100644 index 0000000..bb8a95e --- /dev/null +++ b/granule_ingester/granule_ingester/pipeline/ZarrPipeline.py @@ -0,0 +1,211 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import pickle +import time +from multiprocessing import Manager +from typing import List + +import xarray as xr +import yaml + +from aiomultiprocess import Pool +from aiomultiprocess.types import ProxyException +from granule_ingester.exceptions import PipelineBuildingError +from granule_ingester.granule_loaders import GranuleLoader +from granule_ingester.pipeline.Modules import \ + modules as processor_module_mappings +from granule_ingester.processors.TileProcessor import TileProcessor +from granule_ingester.slicers import TileSlicer +from granule_ingester.writers import DataStore, MetadataStore +from nexusproto import DataTile_pb2 as nexusproto +from tblib import pickling_support + +logger = logging.getLogger(__name__) + +# The aiomultiprocessing library has a bug where it never closes out the pool if there are more than a certain +# number of items to process. The exact number is unknown, but 2**8-1 is safe. +MAX_CHUNK_SIZE = 2 ** 8 - 1 + +_worker_data_store: DataStore = None +_worker_metadata_store: MetadataStore = None +_worker_processor_list: List[TileProcessor] = None +_worker_dataset = None +_shared_memory = None + + +#def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory): + #global _worker_data_store + #global _worker_metadata_store + #global _worker_processor_list + #global _worker_dataset + #global _shared_memory + + ## _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process; + ## however, these sockets will be automatically closed by the OS once the worker processes die so no need to worry. 
+ #_worker_data_store = data_store_factory() + #_worker_metadata_store = metadata_store_factory() + #_worker_processor_list = processor_list + #_worker_dataset = dataset + #_shared_memory = shared_memory + + +#async def _process_tile_in_worker(serialized_input_tile: str): + #try: + #logger.debug(f'serialized_input_tile: {serialized_input_tile}') + #input_tile = nexusproto.NexusTile.FromString(serialized_input_tile) + #logger.debug(f'_recurse params: _worker_processor_list = {_worker_processor_list}, _worker_dataset = {_worker_dataset}, input_tile = {input_tile}') + #processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile) + + #if processed_tile: + #await _worker_data_store.save_data(processed_tile) + #await _worker_metadata_store.save_metadata(processed_tile) + #except Exception as e: + #pickling_support.install(e) + #_shared_memory.error = pickle.dumps(e) + #raise + + +def _recurse(processor_list: List[TileProcessor], + dataset: xr.Dataset, + input_tile: nexusproto.NexusTile) -> nexusproto.NexusTile: + if len(processor_list) == 0: + return input_tile + output_tile = processor_list[0].process(tile=input_tile, dataset=dataset) + return _recurse(processor_list[1:], dataset, output_tile) if output_tile else None + +class Pipeline: + def __init__(self, + granule_loader: GranuleLoader, + slicer: TileSlicer, + data_store_factory, + metadata_store_factory, + tile_processors: List[TileProcessor], + max_concurrency: int): + self._granule_loader = granule_loader + self._tile_processors = tile_processors + self._slicer = slicer + self._data_store_factory = data_store_factory + self._metadata_store_factory = metadata_store_factory + self._max_concurrency = max_concurrency + + # Create a SyncManager so that we can to communicate exceptions from the + # worker processes back to the main process. 
+ self._manager = Manager() + + def __del__(self): + self._manager.shutdown() + + @classmethod + def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16): + logger.debug(f'config_str: {config_str}') + try: + config = yaml.load(config_str, yaml.FullLoader) + cls._validate_config(config) + return cls._build_pipeline(config, + data_store_factory, + metadata_store_factory, + processor_module_mappings, + max_concurrency) + + except yaml.scanner.ScannerError: + raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.") + + # TODO: this method should validate the config against an actual schema definition + @staticmethod + def _validate_config(config: dict): + if type(config) is not dict: + raise PipelineBuildingError("Cannot build pipeline; the pipeline configuration that " + + "was received is not valid YAML.") + + @classmethod + def _build_pipeline(cls, + config: dict, + data_store_factory, + metadata_store_factory, + module_mappings: dict, + max_concurrency: int): + try: + granule_loader = GranuleLoader(**config['granule']) + + slicer_config = config['slicer'] + slicer = cls._parse_module(slicer_config, module_mappings) + + tile_processors = [] + for processor_config in config['processors']: + module = cls._parse_module(processor_config, module_mappings) + tile_processors.append(module) + + return cls(granule_loader, + slicer, + data_store_factory, + metadata_store_factory, + tile_processors, + max_concurrency) + except PipelineBuildingError: + raise + except KeyError as e: + raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.") + except Exception as e: + logger.exception(e) + raise PipelineBuildingError(f"Cannot build pipeline because of the following error: {e}") + + @classmethod + def _parse_module(cls, module_config: dict, module_mappings: dict): + module_name = module_config.pop('name') + try: + module_class = module_mappings[module_name] + logger.debug("Loaded processor {}.".format(module_class)) + processor_module = module_class(**module_config) + except KeyError: + raise PipelineBuildingError(f"'{module_name}' is not a valid processor.") + except Exception as e: + raise PipelineBuildingError(f"Parsing module '{module_name}' failed because of the following error: {e}") + + return processor_module + + async def run(self): + async with self._granule_loader as (dataset, granule_name): + start = time.perf_counter() + + shared_memory = self._manager.Namespace() + async with Pool(initializer=_init_worker, + initargs=(self._tile_processors, + dataset, + self._data_store_factory, + self._metadata_store_factory, + shared_memory), + maxtasksperchild=self._max_concurrency, + childconcurrency=self._max_concurrency) as pool: + serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in + self._slicer.generate_tiles(dataset, granule_name)] + # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that + # a queue can't have more than 2**15-1 tasks. So, we have to batch it. 
+ for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE): + try: + await pool.map(_process_tile_in_worker, chunk) + except ProxyException: + pool.terminate() + # Give the shared memory manager some time to write the exception + # await asyncio.sleep(1) + raise pickle.loads(shared_memory.error) + + end = time.perf_counter() + logger.info("Pipeline finished in {} seconds".format(end - start)) + + @staticmethod + def _chunk_list(items, chunk_size: int): + return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)] diff --git a/granule_ingester/granule_ingester/processors/ZarrProcessor.py b/granule_ingester/granule_ingester/processors/ZarrProcessor.py index 6d5e59e..da5e673 100644 --- a/granule_ingester/granule_ingester/processors/ZarrProcessor.py +++ b/granule_ingester/granule_ingester/processors/ZarrProcessor.py @@ -12,13 +12,103 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import datetime +import json +import logging +from abc import ABC, abstractmethod +from time import time +from typing import Dict, Union, List -from nexusproto.serialization import from_shaped_array, to_shaped_array -import xarray as xr +import numpy as np +import xarray as xr +from nexusproto import DataTile_pb2 as nexusproto + +from granule_ingester.exceptions import TileProcessingError +from granule_ingester.processors.TileProcessor import TileProcessor + +logger = logging.getLogger(__name__) -# TODO: make this an informal interface, not an abstract class class ZarrProcessor(): + + def __init__(self, variable: Union[str, list], latitude: str, longitude: str, time=None, *args, **kwargs): + try: + # TODO variable in test cases are being passed in as just lists, and is not passable through json.loads() + self.variable = json.loads(variable) + except Exception as e: + logger.exception(f'failed to convert literal list to python list. using as a single variable: {variable}') + self.variable = variable + if isinstance(self.variable, list) and len(self.variable) < 1: + logger.error(f'variable list is empty: {self}') + raise RuntimeError(f'variable list is empty: {self.variable}') + self.latitude = latitude + self.longitude = longitude + self.time = time + + + def process(self, granule: xr.Dataset, process_list: List, *args, **kwargs): + for processes in process_list['processors']: + logger.debug(f'Reading Processor: {type(processes)}') + try: + # grab ingestion message's processors + processName = processes['name'] + # TODO process granule via methods in this class + except Exception as e: + raise TileProcessingError(f"Could not generate tiles from the granule because of the following error: {e}.") + # returns granule. 
Passed into writers, which then push into S3
+        return granule
+
+    @classmethod
+    def _parse_input(cls, the_input_tile, temp_dir):
+        specs = the_input_tile.summary.section_spec
+        tile_specifications = cls._convert_spec_to_slices(specs)
+
+        file_path = the_input_tile.summary.granule
+        file_path = file_path[len('file:'):] if file_path.startswith('file:') else file_path
+
+        return tile_specifications, file_path
+
+    @staticmethod
+    def _slices_for_variable(variable: xr.DataArray, dimension_to_slice: Dict[str, slice]) -> Dict[str, slice]:
+        return {dim_name: dimension_to_slice[dim_name] for dim_name in variable.dims}
+
+    @staticmethod
+    def _convert_spec_to_slices(spec):
+        dim_to_slice = {}
+        for dimension in spec.split(','):
+            name, start, stop = dimension.split(':')
+            dim_to_slice[name] = slice(int(start), int(stop))
+
+        return dim_to_slice
+
+    @staticmethod
+    def _convert_to_timestamp(times: xr.DataArray) -> xr.DataArray:
+        if times.dtype == np.float32:
+            return times
+        epoch = np.datetime64(datetime.datetime(1970, 1, 1, 0, 0, 0))
+        return ((times - epoch) / 1e9).astype(int)
 
-    def process(self, ds: xr.Dataset, *args, **kwargs) -> None:
-        pass
\ No newline at end of file
+    # TODO passing in granule with a single variable or with all? NEEDS TESTING
+    def kelvinToCelsius(self, granule: xr.Dataset):
+        for dataVar in self.variable:
+            logger.debug(f'converting kelvin to celsius for variable {dataVar}')
+            data_var = granule[dataVar]
+
+            var_celsius = data_var - 273.15
+            var_attrs = data_var.attrs
+            var_celsius.attrs = var_attrs
+            var_celsius.attrs['units'] = 'celsius'
+            granule[dataVar] = var_celsius
+
+    # TODO needs testing
+    def forceAscendingLatitude(self, granule: xr.Dataset):
+        # sortby returns a new Dataset, so it must be returned to take effect
+        return granule.sortby('lat', ascending=True)
+
+    # TODO the methods below need implementation
+    def GenerateTileId(self, granule: xr.Dataset):
+        pass
+
+    def Subtract180FromLongitude(self, granule: xr.Dataset):
+        pass
+
+
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/GridZarrReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/GridZarrReadingProcessor.py
deleted file mode 100644
index ed74461..0000000
--- a/granule_ingester/granule_ingester/processors/reading_processors/GridZarrReadingProcessor.py
+++ /dev/null
@@ -1,63 +0,0 @@
-from typing import Dict
-
-import cftime
-import numpy as np
-import xarray as xr
-from nexusproto import DataTile_pb2 as nexusproto
-from nexusproto.serialization import to_shaped_array
-
-from granule_ingester.processors.reading_processors.ZarrReadingProcessor import ZarrReadingProcessor
-
-
-class GridZarrReadingProcessor(ZarrReadingProcessor):
-    def __init__(self, variable, latitude, longitude, depth=None, time=None, **kwargs):
-        super().__init__(variable, latitude, longitude, **kwargs)
-        if isinstance(variable, list) and len(variable) != 1:
-            raise RuntimeError(f'TimeSeriesReadingProcessor does not support multiple variable: {variable}')
-        self.depth = depth
-        self.time = time
-
-    # TODO generate after L2 v L4 meeting
-    def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
-        pass
-
-# def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
-    #data_variable = self.variable[0] if isinstance(self.variable, list) else self.variable
-    #new_tile = nexusproto.GridTile()
-
-    #lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
-    #lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
-
#lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN) - #lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN) - - #data_subset = ds[data_variable][type(self)._slices_for_variable(ds[data_variable], - #dimensions_to_slices)] - #data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN) - - #if self.depth: - #depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth], - #dimensions_to_slices).items())[0] - #depth_slice_len = depth_slice.stop - depth_slice.start - #if depth_slice_len > 1: - #raise RuntimeError( - #"Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim, - #dim_len=depth_slice_len)) - #new_tile.depth = ds[self.depth][depth_slice].item() - - #if self.time: - #time_slice = dimensions_to_slices[self.time] - #time_slice_len = time_slice.stop - time_slice.start - #if time_slice_len > 1: - #raise RuntimeError( - #"Time slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=self.time, - #dim_len=time_slice_len)) - #if isinstance(ds[self.time][time_slice.start].item(), cftime.datetime): - #ds[self.time] = ds.indexes[self.time].to_datetimeindex() - #new_tile.time = int(ds[self.time][time_slice.start].item() / 1e9) - - #new_tile.latitude.CopyFrom(to_shaped_array(lat_subset)) - #new_tile.longitude.CopyFrom(to_shaped_array(lon_subset)) - #new_tile.variable_data.CopyFrom(to_shaped_array(data_subset)) - - #input_tile.tile.grid_tile.CopyFrom(new_tile) - #return input_tile diff --git a/granule_ingester/granule_ingester/processors/reading_processors/ZarrReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/ZarrReadingProcessor.py deleted file mode 100644 index d1f0b14..0000000 --- a/granule_ingester/granule_ingester/processors/reading_processors/ZarrReadingProcessor.py +++ /dev/null @@ -1,55 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import datetime -import json -import logging -from abc import ABC, abstractmethod -from typing import Dict, Union - -import numpy as np -import xarray as xr - - -from granule_ingester.processors.ZarrProcessor import ZarrProcessor - -logger = logging.getLogger(__name__) - - -class ZarrReadingProcessor(ZarrProcessor, ABC): - - def __init__(self, variable: Union[str, list], latitude: str, longitude: str, *args, **kwargs): - try: - # TODO variable in test cases are being passed in as just lists, and is not passable through json.loads() - self.variable = json.loads(variable) - except Exception as e: - logger.exception(f'failed to convert literal list to python list. 
using as a single variable: {variable}') - self.variable = variable - if isinstance(self.variable, list) and len(self.variable) < 1: - logger.error(f'variable list is empty: {ZarrReadingProcessor}') - raise RuntimeError(f'variable list is empty: {self.variable}') - self.latitude = latitude - self.longitude = longitude - - @abstractmethod - def process(self, tile, dataset: xr.Dataset, *args, **kwargs): - pass - - @staticmethod - def _convert_to_timestamp(times: xr.DataArray) -> xr.DataArray: - if times.dtype == np.float32: - return times - epoch = np.datetime64(datetime.datetime(1970, 1, 1, 0, 0, 0)) - return ((times - epoch) / 1e9).astype(int) diff --git a/granule_ingester/granule_ingester/processors/reading_processors/__init__.py b/granule_ingester/granule_ingester/processors/reading_processors/__init__.py index 095aa0b..84cd8f0 100644 --- a/granule_ingester/granule_ingester/processors/reading_processors/__init__.py +++ b/granule_ingester/granule_ingester/processors/reading_processors/__init__.py @@ -5,5 +5,3 @@ from granule_ingester.processors.reading_processors.SwathReadingProcessor import SwathReadingProcessor from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor from granule_ingester.processors.reading_processors.TimeSeriesReadingProcessor import TimeSeriesReadingProcessor -from granule_ingester.processors.reading_processors.LazyLoadProcessor import LazyLoadProcessor -from granule_ingester.processors.reading_processors.ZarrReadingProcessor import ZarrReadingProcessor From d6dfd7bc41f1acb5a13aabeae53888cb945f98a7 Mon Sep 17 00:00:00 2001 From: drfok Date: Fri, 19 Aug 2022 09:49:17 -0700 Subject: [PATCH 5/8] Local store test comment out + comments --- .../tests/writers/test_LocalStore.py | 110 +++++++++--------- 1 file changed, 56 insertions(+), 54 deletions(-) diff --git a/granule_ingester/tests/writers/test_LocalStore.py b/granule_ingester/tests/writers/test_LocalStore.py index a585679..da54960 100644 --- a/granule_ingester/tests/writers/test_LocalStore.py +++ b/granule_ingester/tests/writers/test_LocalStore.py @@ -11,69 +11,69 @@ from granule_ingester.processors.reading_processors import LazyLoadProcessor from granule_ingester.writers import LocalStore -from dask.diagnostics import ProgressBar -def _build_solr_doc(self, ds: netCDF4._netCDF4) -> Dict: - summary: TileSummary = tile.summary - bbox: TileSummary.BBox = summary.bbox - stats: TileSummary.DataStats = summary.stats +#def _build_solr_doc(self, ds: netCDF4._netCDF4) -> Dict: + #summary: TileSummary = tile.summary + #bbox: TileSummary.BBox = summary.bbox + #stats: TileSummary.DataStats = summary.stats - min_time = datetime.strftime(datetime.utcfromtimestamp(stats.min_time), self.iso) - max_time = datetime.strftime(datetime.utcfromtimestamp(stats.max_time), self.iso) - day_of_year = datetime.utcfromtimestamp(stats.min_time).timetuple().tm_yday - geo = self.determine_geo(bbox) + #min_time = datetime.strftime(datetime.utcfromtimestamp(stats.min_time), self.iso) + #max_time = datetime.strftime(datetime.utcfromtimestamp(stats.max_time), self.iso) + #day_of_year = datetime.utcfromtimestamp(stats.min_time).timetuple().tm_yday + #geo = self.determine_geo(bbox) - granule_file_name: str = Path(summary.granule).name # get base filename + #granule_file_name: str = Path(summary.granule).name # get base filename - tile_type = tile.tile.WhichOneof("tile_type") - tile_data = getattr(tile.tile, tile_type) + #tile_type = tile.tile.WhichOneof("tile_type") + #tile_data = getattr(tile.tile, tile_type) - 
var_names = json.loads(summary.data_var_name) - standard_names = [] - if summary.standard_name: - standard_names = json.loads(summary.standard_name) - if not isinstance(var_names, list): - var_names = [var_names] - if not isinstance(standard_names, list): - standard_names = [standard_names] + #var_names = json.loads(summary.data_var_name) + #standard_names = [] + #if summary.standard_name: + #standard_names = json.loads(summary.standard_name) + #if not isinstance(var_names, list): + #var_names = [var_names] + #if not isinstance(standard_names, list): + #standard_names = [standard_names] - input_document = { - 'table_s': self.TABLE_NAME, - 'geo': geo, - 'id': summary.tile_id, - 'solr_id_s': '{ds_name}!{tile_id}'.format(ds_name=summary.dataset_name, tile_id=summary.tile_id), - 'sectionSpec_s': summary.section_spec, - 'dataset_s': summary.dataset_name, - 'granule_s': granule_file_name, - 'tile_var_name_ss': var_names, - 'day_of_year_i': day_of_year, - 'tile_min_lon': bbox.lon_min, - 'tile_max_lon': bbox.lon_max, - 'tile_min_lat': bbox.lat_min, - 'tile_max_lat': bbox.lat_max, - 'tile_depth': tile_data.depth, - 'tile_min_time_dt': min_time, - 'tile_max_time_dt': max_time, - 'tile_min_val_d': stats.min, - 'tile_max_val_d': stats.max, - 'tile_avg_val_d': stats.mean, - 'tile_count_i': int(stats.count) - } + #input_document = { + #'table_s': self.TABLE_NAME, + #'geo': geo, + #'id': summary.tile_id, + #'solr_id_s': '{ds_name}!{tile_id}'.format(ds_name=summary.dataset_name, tile_id=summary.tile_id), + #'sectionSpec_s': summary.section_spec, + #'dataset_s': summary.dataset_name, + #'granule_s': granule_file_name, + #'tile_var_name_ss': var_names, + #'day_of_year_i': day_of_year, + #'tile_min_lon': bbox.lon_min, + #'tile_max_lon': bbox.lon_max, + #'tile_min_lat': bbox.lat_min, + #'tile_max_lat': bbox.lat_max, + #'tile_depth': tile_data.depth, + #'tile_min_time_dt': min_time, + #'tile_max_time_dt': max_time, + #'tile_min_val_d': stats.min, + #'tile_max_val_d': stats.max, + #'tile_avg_val_d': stats.mean, + #'tile_count_i': int(stats.count) + #} - for var_name, standard_name in zip(var_names, standard_names): - if standard_name: - input_document[f'{var_name}.tile_standard_name_s'] = standard_name + #for var_name, standard_name in zip(var_names, standard_names): + #if standard_name: + #input_document[f'{var_name}.tile_standard_name_s'] = standard_name - ecco_tile_id = getattr(tile_data, 'tile', None) - if ecco_tile_id: - input_document['ecco_tile'] = ecco_tile_id + #ecco_tile_id = getattr(tile_data, 'tile', None) + #if ecco_tile_id: + #input_document['ecco_tile'] = ecco_tile_id - for attribute in summary.global_attributes: - input_document[attribute.getName()] = attribute.getValues( - 0) if attribute.getValuesCount() == 1 else attribute.getValuesList() + #for attribute in summary.global_attributes: + #input_document[attribute.getName()] = attribute.getValues( + #0) if attribute.getValuesCount() == 1 else attribute.getValuesList() - return input_document + #return input_document +# testing local store LLZA def main(): reading_processor_TROPOMI = LazyLoadProcessor(variable='[methane_mixing_ratio]', latitude='lat', @@ -88,9 +88,11 @@ def main(): cdfDS = xr.open_dataset(granule_path_TROPOMI) chunkShape = (1,5,5) processed_granule = reading_processor_TROPOMI.process(cdfDS) - with ProgressBar(): - zarr_writer_TROPOMI.save_data(ds=processed_granule, cname='blosclz', + zarr_writer_TROPOMI.save_data(ds=processed_granule, cname='blosclz', clevel=9, shuffle=1, chunkShape=chunkShape) + print(xr.open_zarr(store_path)) 
+
+    # NOTE: zarr file is stored in test directory "local_zarr_store"
 
 
 if __name__ == "__main__":

From 57f935574dd635a142436fb918e9eb78a91263af Mon Sep 17 00:00:00 2001
From: drfok
Date: Fri, 19 Aug 2022 09:57:01 -0700
Subject: [PATCH 6/8] remove ZarrPipeline (inadequate implementation)

---
 .../granule_ingester/pipeline/ZarrPipeline.py | 211 ------------------
 1 file changed, 211 deletions(-)
 delete mode 100644 granule_ingester/granule_ingester/pipeline/ZarrPipeline.py

diff --git a/granule_ingester/granule_ingester/pipeline/ZarrPipeline.py b/granule_ingester/granule_ingester/pipeline/ZarrPipeline.py
deleted file mode 100644
index bb8a95e..0000000
--- a/granule_ingester/granule_ingester/pipeline/ZarrPipeline.py
+++ /dev/null
@@ -1,211 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import pickle
-import time
-from multiprocessing import Manager
-from typing import List
-
-import xarray as xr
-import yaml
-
-from aiomultiprocess import Pool
-from aiomultiprocess.types import ProxyException
-from granule_ingester.exceptions import PipelineBuildingError
-from granule_ingester.granule_loaders import GranuleLoader
-from granule_ingester.pipeline.Modules import \
-    modules as processor_module_mappings
-from granule_ingester.processors.TileProcessor import TileProcessor
-from granule_ingester.slicers import TileSlicer
-from granule_ingester.writers import DataStore, MetadataStore
-from nexusproto import DataTile_pb2 as nexusproto
-from tblib import pickling_support
-
-logger = logging.getLogger(__name__)
-
-# The aiomultiprocessing library has a bug where it never closes out the pool if there are more than a certain
-# number of items to process. The exact number is unknown, but 2**8-1 is safe.
-MAX_CHUNK_SIZE = 2 ** 8 - 1
-
-_worker_data_store: DataStore = None
-_worker_metadata_store: MetadataStore = None
-_worker_processor_list: List[TileProcessor] = None
-_worker_dataset = None
-_shared_memory = None
-
-
-#def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory, shared_memory):
-    #global _worker_data_store
-    #global _worker_metadata_store
-    #global _worker_processor_list
-    #global _worker_dataset
-    #global _shared_memory
-
-    ## _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process;
-    ## however, these sockets will be automatically closed by the OS once the worker processes die so no need to worry.
- #_worker_data_store = data_store_factory() - #_worker_metadata_store = metadata_store_factory() - #_worker_processor_list = processor_list - #_worker_dataset = dataset - #_shared_memory = shared_memory - - -#async def _process_tile_in_worker(serialized_input_tile: str): - #try: - #logger.debug(f'serialized_input_tile: {serialized_input_tile}') - #input_tile = nexusproto.NexusTile.FromString(serialized_input_tile) - #logger.debug(f'_recurse params: _worker_processor_list = {_worker_processor_list}, _worker_dataset = {_worker_dataset}, input_tile = {input_tile}') - #processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile) - - #if processed_tile: - #await _worker_data_store.save_data(processed_tile) - #await _worker_metadata_store.save_metadata(processed_tile) - #except Exception as e: - #pickling_support.install(e) - #_shared_memory.error = pickle.dumps(e) - #raise - - -def _recurse(processor_list: List[TileProcessor], - dataset: xr.Dataset, - input_tile: nexusproto.NexusTile) -> nexusproto.NexusTile: - if len(processor_list) == 0: - return input_tile - output_tile = processor_list[0].process(tile=input_tile, dataset=dataset) - return _recurse(processor_list[1:], dataset, output_tile) if output_tile else None - -class Pipeline: - def __init__(self, - granule_loader: GranuleLoader, - slicer: TileSlicer, - data_store_factory, - metadata_store_factory, - tile_processors: List[TileProcessor], - max_concurrency: int): - self._granule_loader = granule_loader - self._tile_processors = tile_processors - self._slicer = slicer - self._data_store_factory = data_store_factory - self._metadata_store_factory = metadata_store_factory - self._max_concurrency = max_concurrency - - # Create a SyncManager so that we can to communicate exceptions from the - # worker processes back to the main process. 
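The relay described in the comment above is easier to follow in isolation. Below is a minimal, self-contained sketch of the same idea — a worker pickles its exception, traceback included via `tblib`, into a `Manager` namespace so the parent process can re-raise it — assuming nothing beyond the stdlib and `tblib`; the names `worker` and `shared` are illustrative:

    # Standalone illustration of the exception relay this (now removed) pipeline used.
    import pickle
    from multiprocessing import Manager, Process

    from tblib import pickling_support


    def worker(shared):
        try:
            raise ValueError("failure inside the worker process")
        except Exception as e:
            pickling_support.install(e)  # make the exception's traceback picklable
            shared.error = pickle.dumps(e)


    if __name__ == "__main__":
        with Manager() as manager:
            shared = manager.Namespace()
            shared.error = None
            p = Process(target=worker, args=(shared,))
            p.start()
            p.join()
            if shared.error:
                # Re-raises in the parent with the worker's original traceback attached.
                raise pickle.loads(shared.error)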
- self._manager = Manager() - - def __del__(self): - self._manager.shutdown() - - @classmethod - def from_string(cls, config_str: str, data_store_factory, metadata_store_factory, max_concurrency: int = 16): - logger.debug(f'config_str: {config_str}') - try: - config = yaml.load(config_str, yaml.FullLoader) - cls._validate_config(config) - return cls._build_pipeline(config, - data_store_factory, - metadata_store_factory, - processor_module_mappings, - max_concurrency) - - except yaml.scanner.ScannerError: - raise PipelineBuildingError("Cannot build pipeline because of a syntax error in the YAML.") - - # TODO: this method should validate the config against an actual schema definition - @staticmethod - def _validate_config(config: dict): - if type(config) is not dict: - raise PipelineBuildingError("Cannot build pipeline; the pipeline configuration that " + - "was received is not valid YAML.") - - @classmethod - def _build_pipeline(cls, - config: dict, - data_store_factory, - metadata_store_factory, - module_mappings: dict, - max_concurrency: int): - try: - granule_loader = GranuleLoader(**config['granule']) - - slicer_config = config['slicer'] - slicer = cls._parse_module(slicer_config, module_mappings) - - tile_processors = [] - for processor_config in config['processors']: - module = cls._parse_module(processor_config, module_mappings) - tile_processors.append(module) - - return cls(granule_loader, - slicer, - data_store_factory, - metadata_store_factory, - tile_processors, - max_concurrency) - except PipelineBuildingError: - raise - except KeyError as e: - raise PipelineBuildingError(f"Cannot build pipeline because {e} is missing from the YAML.") - except Exception as e: - logger.exception(e) - raise PipelineBuildingError(f"Cannot build pipeline because of the following error: {e}") - - @classmethod - def _parse_module(cls, module_config: dict, module_mappings: dict): - module_name = module_config.pop('name') - try: - module_class = module_mappings[module_name] - logger.debug("Loaded processor {}.".format(module_class)) - processor_module = module_class(**module_config) - except KeyError: - raise PipelineBuildingError(f"'{module_name}' is not a valid processor.") - except Exception as e: - raise PipelineBuildingError(f"Parsing module '{module_name}' failed because of the following error: {e}") - - return processor_module - - async def run(self): - async with self._granule_loader as (dataset, granule_name): - start = time.perf_counter() - - shared_memory = self._manager.Namespace() - async with Pool(initializer=_init_worker, - initargs=(self._tile_processors, - dataset, - self._data_store_factory, - self._metadata_store_factory, - shared_memory), - maxtasksperchild=self._max_concurrency, - childconcurrency=self._max_concurrency) as pool: - serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in - self._slicer.generate_tiles(dataset, granule_name)] - # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that - # a queue can't have more than 2**15-1 tasks. So, we have to batch it. 
-                for chunk in self._chunk_list(serialized_tiles, MAX_CHUNK_SIZE):
-                    try:
-                        await pool.map(_process_tile_in_worker, chunk)
-                    except ProxyException:
-                        pool.terminate()
-                        # Give the shared memory manager some time to write the exception
-                        # await asyncio.sleep(1)
-                        raise pickle.loads(shared_memory.error)
-
-            end = time.perf_counter()
-            logger.info("Pipeline finished in {} seconds".format(end - start))
-
-    @staticmethod
-    def _chunk_list(items, chunk_size: int):
-        return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

From aceb3451cb582aaaa57e9faeb8484e5f479d73b0 Mon Sep 17 00:00:00 2001
From: drfok
Date: Fri, 19 Aug 2022 10:08:26 -0700
Subject: [PATCH 7/8] Zarr collection processor dependency fix

---
 .../collection_manager/services/ZarrCollectionProcessor.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/collection_manager/collection_manager/services/ZarrCollectionProcessor.py b/collection_manager/collection_manager/services/ZarrCollectionProcessor.py
index 4bc439e..734e252 100644
--- a/collection_manager/collection_manager/services/ZarrCollectionProcessor.py
+++ b/collection_manager/collection_manager/services/ZarrCollectionProcessor.py
@@ -10,16 +10,16 @@
 from collection_manager.services.history_manager import (GranuleStatus,
                                                          IngestionHistory)
 from collection_manager.services.history_manager.IngestionHistory import \
-    ZarrIngestionHistoryBuilder
+    IngestionHistoryBuilder

 logger = logging.getLogger(__name__)

 SUPPORTED_FILE_EXTENSIONS = ['.nc', '.nc4', '.h5']


-# TODO generate tests for Zarr Collection Processor
+# TODO generate tests for ZarrCollectionProcessor
 class ZarrCollectionProcessor:
-    def __init__(self, message_publisher: MessagePublisher, history_manager_builder: ZarrIngestionHistoryBuilder):
+    def __init__(self, message_publisher: MessagePublisher, history_manager_builder: IngestionHistoryBuilder):
         self._publisher = message_publisher
         self._history_manager_builder = history_manager_builder
         self._history_manager_cache: Dict[str, IngestionHistory] = {}

From fba74815923064e4b6ef1a4ad039463603ef8a67 Mon Sep 17 00:00:00 2001
From: drfok
Date: Fri, 19 Aug 2022 10:27:04 -0700
Subject: [PATCH 8/8] README updates

---
 granule_ingester/README.md | 54 --------------------------------------
 1 file changed, 54 deletions(-)

diff --git a/granule_ingester/README.md b/granule_ingester/README.md
index 41d3953..786224c 100644
--- a/granule_ingester/README.md
+++ b/granule_ingester/README.md
@@ -35,60 +35,6 @@ From `incubator-sdap-ingester`, run:
 In order to successfully run the service, you will need to have a Cassandra,
 Solr, and RabbitMQ connection. Make sure to provide their respective credentials.
-### How to Launch Service with Port-Forwarded Instances in `bigdata` - -**NOTE: Run each of the following in different terminals** - -#### Cassandra -Connect to bigdata - - ssh -L 9042:localhost:9042 bigdata - -Log in and then run the following - - ssh sdeploy@bigdata - -Now, port forward the service - - kubectl port-forward svc/nexus-cassandra -n sdap 9042:9042 - -#### Solr -Connect to bigdata - - ssh -L 8983:localhost:8984 bigdata - -Log in and then run the following - - ssh sdeploy@bigdata - -Now, port forward the service - - kubectl port-forward svc/nexus-solr-svc -n sdap 8984:8983 - -#### RabbitMQ -Connect to bigdata - - ssh -L 5672:localhost:5672 bigdata - -Log in and then run the following - - ssh sdeploy@bigdata - -Now, port forward the service - - kubectl port-forward svc/rabbitmq -n sdap 5672:5672 - -From `incubator-sdap-ingester`, run: - - python granule_ingester/granule_ingester/main.py --cassandra-username=cassandra --cassandra-password=cassandra - -## Running the tests -From `incubator-sdap-ingester`, run: - - $ cd common && python setup.py install - $ cd ../granule_ingester && python setup.py install - $ pip install pytest && pytest - ## Building the Docker image From `incubator-sdap-ingester`, run: