From 71cd6a84cc41728dd75b499b7870f78095dd382d Mon Sep 17 00:00:00 2001
From: Shelby Holden
Date: Wed, 10 Jul 2024 11:49:05 -0400
Subject: [PATCH] POC More complete e2e test wip

---
 .gitignore                                         |   2 +
 build.gradle                                       |  23 +-
 .../parquet/ParquetReaderWriterWithAvro.java       | 151 ++++++++++++
 .../service/parquet/azure_datetime.parquet         | Bin 0 -> 1624 bytes
 .../service/parquet/azure_original.parquet         | Bin 0 -> 9450 bytes
 .../terra/service/parquet/bq_original.parquet      | Bin 0 -> 8533 bytes
 .../resources/parquet/bq_original.parquet          | Bin 0 -> 8533 bytes
 .../ParquetReaderWriterWithAvroTest.java           | 229 ++++++++++++++++++
 8 files changed, 397 insertions(+), 8 deletions(-)
 create mode 100644 src/main/java/bio/terra/service/parquet/ParquetReaderWriterWithAvro.java
 create mode 100644 src/main/java/bio/terra/service/parquet/azure_datetime.parquet
 create mode 100644 src/main/java/bio/terra/service/parquet/azure_original.parquet
 create mode 100644 src/main/java/bio/terra/service/parquet/bq_original.parquet
 create mode 100644 src/main/resources/parquet/bq_original.parquet
 create mode 100644 src/test/java/bio/terra/service/parquet/ParquetReaderWriterWithAvroTest.java

diff --git a/.gitignore b/.gitignore
index 9da78c2942..4e31c9e179 100644
--- a/.gitignore
+++ b/.gitignore
@@ -66,3 +66,5 @@ gha-creds-*.json
 #tools output
 tools/setupResourceScripts/*_outputs.json
 tools/profileEndpoints/results_*.csv
+
+src/main/java/bio/terra/service/parquet/output/*
diff --git a/build.gradle b/build.gradle
index c42360de30..3f117d8a52 100644
--- a/build.gradle
+++ b/build.gradle
@@ -241,19 +241,26 @@ dependencies {
     // OpenTelemetry @WithSpan annotations:
     implementation 'io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:2.2.0'
-
-    testImplementation 'org.apache.parquet:parquet-common:1.12.0'
-    testImplementation 'org.apache.parquet:parquet-hadoop:1.12.0'
-    testImplementation 'org.apache.parquet:parquet-hadoop-bundle:1.12.0'
-    testImplementation 'org.apache.parquet:parquet-encoding:1.12.0'
-    testImplementation 'org.apache.parquet:parquet-column:1.12.0'
-    testImplementation ('org.apache.hadoop:hadoop-common:3.3.1') {
+    // Needed for HadoopInputFile.fromPath(path(signed uri))
+    // Old version is needed - https://stackoverflow.com/questions/66713254/spark-wasb-and-jetty-11
+    implementation 'org.eclipse.jetty:jetty-util:9.4.55.v20240627'
+
+    // Needed for org.apache.parquet.avro.AvroParquetReader & AvroParquetWriter
+    implementation 'org.apache.parquet:parquet-avro:1.12.0'
+    // Needed for hadoop.fs.Path & hadoop.conf.Configuration
+    implementation ('org.apache.hadoop:hadoop-common:3.3.1') {
        exclude group: 'com.sun.jersey', module: 'jersey-core'
        exclude group: 'com.sun.jersey', module: 'jersey-servlet'
        exclude group: 'com.sun.jersey', module: 'jersey-json'
        exclude group: 'com.sun.jersey', module: 'jersey-server'
     }
-    testImplementation ('org.apache.hadoop:hadoop-azure:3.3.1') {
+
+    implementation 'org.apache.parquet:parquet-common:1.12.0'
+    implementation 'org.apache.parquet:parquet-hadoop:1.12.0'
+    implementation 'org.apache.parquet:parquet-hadoop-bundle:1.12.0'
+    implementation 'org.apache.parquet:parquet-encoding:1.12.0'
+    implementation 'org.apache.parquet:parquet-column:1.12.0'
+    implementation ('org.apache.hadoop:hadoop-azure:3.3.1') {
        exclude group: 'com.sun.jersey', module: 'jersey-core'
        exclude group: 'com.sun.jersey', module: 'jersey-servlet'
        exclude group: 'com.sun.jersey', module: 'jersey-json'
diff --git a/src/main/java/bio/terra/service/parquet/ParquetReaderWriterWithAvro.java b/src/main/java/bio/terra/service/parquet/ParquetReaderWriterWithAvro.java
new file mode 100644
index 0000000000..4267b90799
--- /dev/null
+++ b/src/main/java/bio/terra/service/parquet/ParquetReaderWriterWithAvro.java
@@ -0,0 +1,151 @@
+package bio.terra.service.parquet;
+
+import bio.terra.model.TableDataType;
+import com.azure.storage.blob.BlobUrlParts;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
+
+public class ParquetReaderWriterWithAvro {
+
+  private ParquetReaderWriterWithAvro() {}
+
+  public static List<GenericData.Record> readFromParquet(InputFile inputFile) throws IOException {
+    List<GenericData.Record> records = new ArrayList<>();
+    try (ParquetReader<GenericData.Record> reader =
+        AvroParquetReader.<GenericData.Record>builder(inputFile).build()) {
+      GenericData.Record record;
+
+      while ((record = reader.read()) != null) {
+        records.add(record);
+      }
+      return records;
+    }
+  }
+
+  public static List<GenericData.Record> readFromParquet(Path filePath) throws IOException {
+    var config = new Configuration();
+    config.set("parquet.avro.readInt96AsFixed", "true");
+    InputFile inputFile = HadoopInputFile.fromPath(filePath, config);
+    return readFromParquet(inputFile);
+  }
+
+  public static void writeToParquet(
+      List<GenericData.Record> recordsToWrite,
+      List<String> columnNames,
+      Map<String, TableDataType> columnDataTypeMap,
+      OutputFile fileToWrite,
+      Schema schema,
+      Configuration config)
+      throws IOException {
+    try (ParquetWriter<GenericData.Record> writer =
+        AvroParquetWriter.<GenericData.Record>builder(fileToWrite)
+            .withSchema(schema)
+            .withConf(config)
+            .withCompressionCodec(CompressionCodecName.SNAPPY)
+            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+            .build()) {
+
+      for (GenericData.Record record : recordsToWrite) {
+        var newRecord = new GenericData.Record(schema);
+        for (var column : columnNames) {
+          switch (columnDataTypeMap.get(column)) {
+            case DATETIME, TIMESTAMP:
+              // Convert from the fixed-length (INT96) binary value to a long representing
+              // microseconds since epoch
+              GenericData.Fixed dtFixed = (GenericData.Fixed) record.get(column);
+              if (dtFixed == null) {
+                newRecord.put(column, null);
+              } else {
+                var bytes = dtFixed.bytes();
+                ByteBuffer bb = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
+                long timeOfDayNanos = bb.getLong();
+                var julianDay = bb.getInt();
+                // Given timeOfDayNanos and julianDay, convert to microseconds since epoch
+                // (2440588 is the Julian day number of 1970-01-01, the Unix epoch)
+                Long microSeconds =
+                    (long) (julianDay - 2440588) * 86400 * 1000000 + timeOfDayNanos / 1000;
+                newRecord.put(column, microSeconds);
+              }
+              break;
+            default:
+              newRecord.put(column, record.get(column));
+          }
+        }
+        writer.write(newRecord);
+      }
+    }
+  }
+
+  public static void simpleWriteToParquet(
+      List<GenericData.Record> recordsToWrite,
+      OutputFile fileToWrite,
+      Schema schema,
+      Configuration config)
+      throws IOException {
+    try (ParquetWriter<GenericData.Record> writer =
+        AvroParquetWriter.<GenericData.Record>builder(fileToWrite)
+            .withSchema(schema)
+            .withConf(config)
+            .withCompressionCodec(CompressionCodecName.SNAPPY)
+            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+            .build()) {
+
+      for (GenericData.Record record : recordsToWrite) {
+        writer.write(record);
+      }
+    }
+  }
+
+  // Pulled this code from ParquetUtils.java
+  // In a real implementation, we would probably want to do something a little more direct in the
+  // flight rather than producing the signed url and then breaking it back down
+  public static Configuration getConfigFromSignedUri(String signedUrl) {
+    BlobUrlParts blobUrlParts = BlobUrlParts.parse(signedUrl);
+    Configuration config = new Configuration();
+    config.set("parquet.avro.readInt96AsFixed", "true");
+    config.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem");
+    config.set(
+        "fs.azure.sas."
+            + blobUrlParts.getBlobContainerName()
+            + "."
+            + blobUrlParts.getAccountName()
+            + ".blob.core.windows.net",
+        blobUrlParts.getCommonSasQueryParameters().encode());
+    return config;
+  }
+
+  public static URI getURIFromSignedUrl(String signedUrl) {
+    BlobUrlParts blobUrlParts = BlobUrlParts.parse(signedUrl);
+    URI uri;
+    try {
+      uri =
+          new URI(
+              "wasbs://"
+                  + blobUrlParts.getBlobContainerName()
+                  + "@"
+                  + blobUrlParts.getAccountName()
+                  + ".blob.core.windows.net/"
+                  + blobUrlParts.getBlobName());
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+    return uri;
+  }
+}
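A quick standalone sanity check of the INT96 decoding in writeToParquet above (not part of the patch; the class name and the sample values are illustrative). The byte layout (8 bytes of time-of-day nanoseconds followed by 4 bytes of Julian day, little-endian) and the 2440588 epoch Julian day mirror the code above, and the result matches the 1657022401000000-microsecond value the tests below assert.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;

public class Int96ConversionCheck {
  public static void main(String[] args) {
    long timeOfDayNanos = 43_201L * 1_000_000_000L; // 12:00:01 into the day
    int julianDay = 2_459_766; // 2022-07-05 (2440588 = 1970-01-01)

    // Build the 12-byte INT96 value the way Parquet stores it (little-endian)
    ByteBuffer int96 = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
    int96.putLong(timeOfDayNanos).putInt(julianDay).flip();

    // Decode exactly as writeToParquet does
    long nanos = int96.getLong();
    int day = int96.getInt();
    long micros = (long) (day - 2440588) * 86400 * 1000000 + nanos / 1000;

    System.out.println(micros); // 1657022401000000
    System.out.println(Instant.ofEpochSecond(0, micros * 1000)); // 2022-07-05T12:00:01Z
  }
}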
diff --git a/src/main/java/bio/terra/service/parquet/azure_datetime.parquet b/src/main/java/bio/terra/service/parquet/azure_datetime.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..7a73450b2da73a5c4c03dba78617a84877abf877
GIT binary patch
literal 1624
[base85-encoded binary data omitted]

diff --git a/src/main/java/bio/terra/service/parquet/azure_original.parquet b/src/main/java/bio/terra/service/parquet/azure_original.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..dd35c736c07a81a9990ab5bcabb395d8f2b28eb4
GIT binary patch
literal 9450
[base85-encoded binary data omitted]

diff --git a/src/main/java/bio/terra/service/parquet/bq_original.parquet b/src/main/java/bio/terra/service/parquet/bq_original.parquet
new file mode 100644
GIT binary patch
literal 8533
[base85-encoded binary data omitted]

diff --git a/src/main/resources/parquet/bq_original.parquet b/src/main/resources/parquet/bq_original.parquet
new file mode 100644
GIT binary patch
literal 8533
[base85-encoded binary data omitted]
zy$_L>2C{fKlZy>zq>%-yZME!`=6tJpe7>=Ok(dxy2jC$Wq9p!b%aNG^d@&qBfS0}y zeZ48b!wedSK6|OX^3=o4<4I`iy(JarD=K}N_XKbWY?|N9y>+6Tz8~RE-X|)>Ga+|5@QbfEI zA?tCnwu|bjAI$*l2q$rp8XY-Cmjpo)kQkYC-W?&L`-jPoLs^V6xj~eXhN{ix;(V>S zcx-9eG1ja&OI{y3+_N?V3*cWz!{d0og5yrWe0Kz)Z{Yhie7}V+Od7?bp5sX|Izx)l zagc=jwINR=`V}IllUW>beP{@L$mT^_6T~%vtnDFJhqHa?1t=tjB`j4y88o)3m{de6 z5QsU6C#Ucj_zR5>MwZU;#H7FA6NT@<;y&l~=S`7;i3d+2@@t9-Z0>agwpuOgsgBsO z#m3S0E_=~lvX|SS)-mzC01sIoA@7a!V-|=Zg;NzAaWG7uJN@#WF9)dnX`JTN9A_ia zMhkFkw7p)yLai9xFT@dfel&|?uGUiyh<0PCOZ687@dAjQChw=S9u2P*!sB1S*pK&b z0Y-uYbLIvH#uPm;{mhaO-xJ^=Cq(iY-ZDNgN(JmfT@7BHNbGFCiH_RgO|C*Pb+*go zLZda`vg^FJ&xv#7e1^P|$rchQP?4KubaCGXqmg=J(GKFYx{q8L3+2Rl-rKoaRJr)r zwBPgCh!kfn>J_Z*^>%sQ`i)tfa6NChzSy+dyb4^eyOJdrvtSiJr4ago%5wH9I3&O- z&t1Iy+P&w0x(ftB$c1tMADDtj>TCk^uy97CIGamZ`Fo+q*$w6dcT*6A=o&e5GlOKN zT}b_WFD-s5z(bxLCzoyllzt{5tOHSAJ40QbV5p>|gQ}qWP4(_Ua`NE+$W%Yy!a$j$ z4Af?f^(i5*lK;|JiQSIf>4wp>pR*Au&RWzTSljD`_1=Dd_I9=fN{I&CD!6mqjj7YO zkt??X06(1&>Ve*-V1*s3zqsv@%7l7<0O zP-v|yK8xy2n~_ z`Ph=(YSj23T#bmU5%O7)oGE7gl>9xLlps(U(|Ec92OB;3_|I|U6#o;Q``|IEoC!R^ z-G+JC&WtY65T(&0TF5M5)g8>jz{GCy*--*COP8}u}?&wxr2;WUg z7pOQc-2a;Y;L+OS_L7AUAv~Om_^L!XRLN4`q+?t~gmok6=IazthoC)NnIJb@ zSp)*W$)Afu<~&$AOh;pqlb6$SQ6z^XmY;VP;}{V2Wx=`8$_wf26fAehha{GPJYqZ# zj*J7dC-Y-a9qshwYHTb4@^$4(6mLML5hzT869!>Qs^>OOi3Q$;H$jeSj=OJCQobu% zAQ@gokU3L;{58k{=)Xj4f>aWZV}QsG7xx~VOTZ<&GW96^>zXt0^a&(S334veP05j* z-_dia$}@XCukmk12DsYhPDMFAP zD-4k9B`Lq2^IozoxhdJHe35KlPDge$;}S0sF+t++g1W`z_=?qf;+Wl*Ybz@< records = readFromParquet(path); + assertSampleData(records); + } + + @Test + void readWriteToAzureStorageBlob() throws IOException { + var schema = sampleDataSchema(); + var recordsToWrite = sampleRecords(schema); + + // Write to azure storage blob + var signedUrl = buildSignedUrl(ShortUUID.get() + "_datetime.parquet"); + var writeToUri = ParquetReaderWriterWithAvro.getURIFromSignedUrl(signedUrl); + var config = ParquetReaderWriterWithAvro.getConfigFromSignedUri(signedUrl); + var path = new Path(writeToUri); + OutputFile outputFile = HadoopOutputFile.fromPath(path, config); + simpleWriteToParquet(recordsToWrite, outputFile, schema, config); + + // Read from azure storage blob + InputFile inputFile = HadoopInputFile.fromPath(path, config); + List records = readFromParquet(inputFile); + assertSampleData(records); + } + + @Test + void formatDateTimeField() throws IOException { + Path sourceFile = new Path(FOLDER_PATH + "azure_original.parquet"); + List records = ParquetReaderWriterWithAvro.readFromParquet(sourceFile); + + Schema nullType = Schema.create(Schema.Type.NULL); + Schema timestampMicroType = + LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); + Schema newSchema = + SchemaBuilder.record("newFormatSchema") + .fields() + .name("variant_id") + .type() + .nullable() + .stringType() + .noDefault() + .name("date_time_column") + .type(Schema.createUnion(nullType, timestampMicroType)) + .noDefault() + .endRecord(); + var columnNames = List.of("variant_id", "date_time_column"); + var columnDataTypeMap = + Map.of("variant_id", TableDataType.STRING, "date_time_column", TableDataType.DATETIME); + + // === write to local file === + var output = randomizedFilePath("datetime"); + var config = new Configuration(); + OutputFile outputFile = HadoopOutputFile.fromPath(output, config); + writeToParquet(records, columnNames, columnDataTypeMap, outputFile, newSchema, config); + + List formattedRecords = ParquetReaderWriterWithAvro.readFromParquet(output); + assertThat( + "datetime should now be long value representing microseconds since 
epoch", + formattedRecords.get(0).get("date_time_column"), + equalTo(1657022401000000L)); + assertThat( + "record has correct schema", + formattedRecords.get(0).getSchema().getField("date_time_column").schema().toString(), + containsString("timestamp-micros")); + } + + @Test + void e2eTestFormatDatetime() throws IOException { + // == local read == + Path azureSourceFile = new Path(FOLDER_PATH + "azure_original.parquet"); + List recordsToWrite = + ParquetReaderWriterWithAvro.readFromParquet(azureSourceFile); + // === generate signed url from snapshotExport === + // var signedUrlFromSnapshotExport = ""; + // InputFile azureInputFile = buildInputFileFromSignedUrl(signedUrlFromSnapshotExport); + // List recordsToWrite = + // ParquetReaderWriterWithAvro.readFromParquet(azureInputFile); + + Schema nullType = Schema.create(Schema.Type.NULL); + Schema timestampMicroType = + LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); + Schema newSchema = + SchemaBuilder.record("newFormatSchema") + .fields() + .name("variant_id") + .type() + .nullable() + .stringType() + .noDefault() + .name("date_time_column") + .type(Schema.createUnion(nullType, timestampMicroType)) + .noDefault() + .endRecord(); + var columnNames = List.of("variant_id", "date_time_column"); + var columnDataTypeMap = + Map.of("variant_id", TableDataType.STRING, "date_time_column", TableDataType.DATETIME); + + // Write to azure storage blob + + var signedUrl = buildSignedUrl(ShortUUID.get() + "_e2e.parquet"); + var writeToUri = ParquetReaderWriterWithAvro.getURIFromSignedUrl(signedUrl); + var config = ParquetReaderWriterWithAvro.getConfigFromSignedUri(signedUrl); + var path = new Path(writeToUri); + OutputFile outputFile = HadoopOutputFile.fromPath(path, config); + writeToParquet(recordsToWrite, columnNames, columnDataTypeMap, outputFile, newSchema, config); + + // Read from azure storage blob + InputFile inputFile = HadoopInputFile.fromPath(path, config); + List formattedRecords = + ParquetReaderWriterWithAvro.readFromParquet(inputFile); + assertThat( + "datetime should now be long value representing microseconds since epoch", + formattedRecords.get(0).get("date_time_column"), + equalTo(1657022401000000L)); + assertThat( + "record has correct schema", + formattedRecords.get(0).getSchema().getField("date_time_column").schema().toString(), + containsString("timestamp-micros")); + + // read from GCP output file and confirm the same results + Path gcpSourceFile = new Path(FOLDER_PATH + "bq_original.parquet"); + List gcpRecords = + ParquetReaderWriterWithAvro.readFromParquet(gcpSourceFile); + assertThat( + "record has correct schema", + gcpRecords.get(0).getSchema().getField("date_time_column").schema().toString(), + containsString("timestamp-micros")); + } + + private Path randomizedFilePath(String baseFileName) { + return new Path(OUTPUT_PATH + baseFileName + "_" + ShortUUID.get().toString() + ".parquet"); + } + + private Schema sampleDataSchema() { + var jsonSchema = + """ + { + "type" : "record", + "name" : "int_record", + "namespace" : "test", + "fields" : [ { + "name" : "c1", + "type" : [ "null", "long" ], + "default" : null + } ] + } + """; + return new Schema.Parser().parse(jsonSchema); + } + + private List sampleRecords(Schema schema) { + GenericData.Record record = new GenericData.Record(schema); + record.put("c1", 1); + GenericData.Record record2 = new GenericData.Record(schema); + record2.put("c1", 2); + return List.of(record, record2); + } + + private void assertSampleData(List records) { + 
assertThat("contains the right number of values", records, hasSize(2)); + assertThat("retrieves the right value", records.get(0).get("c1"), equalTo(1L)); + assertThat("retrieves the right value", records.get(1).get("c1"), equalTo(2L)); + } + + // We don't really need to build the signed URL, we could instead just pass these few bits of + // information to the method that builds the write configuration + private String buildSignedUrl(String fileName) { + var storageAccount = "shelbytestaccount"; + var filePath = "testfilesystem/oysters/"; + // Url signed from "testfilesystem" container + var defaultSasToken = ""; + // TODO: override this with your SAS token + var sasToken = defaultSasToken; + if (sasToken.equals(defaultSasToken)) { + throw new IllegalArgumentException("Please set a valid SAS token"); + } + return "https://%s.blob.core.windows.net/%s%s?%s" + .formatted(storageAccount, filePath, fileName, sasToken); + } + + private InputFile buildInputFileFromSignedUrl(String signedUrl) throws IOException { + var writeToUri = ParquetReaderWriterWithAvro.getURIFromSignedUrl(signedUrl); + var config = ParquetReaderWriterWithAvro.getConfigFromSignedUri(signedUrl); + var path = new Path(writeToUri); + return HadoopInputFile.fromPath(path, config); + } +}