Sunday, July 21, 2019

Handling zip files in Google Cloud Dataflow

Google Cloud Dataflow is useful for processing large amounts of data, but processing the files inside zip archives is a bit tricky. By design, Dataflow doesn't distribute the zip files in a bucket across multiple workers. Instead it allocates a single worker to unzip all the files and process them. That is not optimal for the amount of data we need to handle.

There is an alternate way to do this:
  1. Put all the zip file names (or wildcard patterns) in a text file. Note: the following approach assumes all the zip files live in a GCS path.
  2. Give the text file name to the pipeline.
  3. The first transform goes through the bucket, reads the zip file names, builds the absolute GCS file paths and outputs them to the next step in the pipeline.
  4. The next transform takes each GCS file path and, without caring about the file type, unzips the file and hands the contents over to the next step in the pipeline.
  5. Now, instead of one worker, you have N workers handling the zip files.
The code for expanding each entry into absolute GCS paths:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.List;

public class GetGCSUtilPath extends DoFn<String, String> {

    private static final Log log = LogFactory.getLog(GetGCSUtilPath.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            GcsUtil.GcsUtilFactory factory = new GcsUtil.GcsUtilFactory();
            GcsUtil util = factory.create(c.getPipelineOptions());
            String dataFolder = c.element();
            log.warn("Processing data folder - " + dataFolder);
            // Expand the wildcard (e.g. gs://bucket/path/*.zip) into individual file paths
            List<GcsPath> gcsPaths = util.expand(GcsPath.fromUri(dataFolder));
            for (GcsPath gcsp : gcsPaths) {
                c.output(gcsp.toString());
            }
        } catch (Exception ex) {
            log.error("Unable to process the file " + c.element(), ex);
        }
    }
}


Each line in the text file (the file we gave to the first step of the pipeline) comes in as an input element, and the output is the absolute GCS path of each zip file in the bucket.

The following code unzips each file, reads each entry as a string, and hands it over to the next transform in the pipeline:

@ProcessElement
public void processElement(ProcessContext c) {
    String p = c.element();
    GcsUtil.GcsUtilFactory factory = new GcsUtil.GcsUtilFactory();
    GcsUtil u = factory.create(c.getPipelineOptions());
    byte[] buffer = new byte[50 * 1024];
    ZipInputStream zis = null;
    ZipEntry ze = null;
    try {
        // Open the zip file as a stream directly from GCS
        SeekableByteChannel sek = u.open(GcsPath.fromUri(p));
        InputStream is = Channels.newInputStream(sek);
        BufferedInputStream bis = new BufferedInputStream(is);
        zis = new ZipInputStream(bis);
        ze = zis.getNextEntry();
        while (ze != null) {
            log.info("Unzipping file " + ze.getName());
            // Collect the raw bytes first so multi-byte UTF-8 characters are not
            // split across buffer boundaries
            ByteArrayOutputStream zipContents = new ByteArrayOutputStream();
            int len;
            while ((len = zis.read(buffer)) > 0) {
                zipContents.write(buffer, 0, len);
            }
            log.info("File " + p + " is opened successfully.");
            c.output(zipContents.toString("UTF-8"));
            TOTAL_JSON_FILES_COUNT.inc();
            ze = zis.getNextEntry();
        }
    } catch (Exception e) {
        log.error("An exception processing the zip file - " + c.element(), e);
    } finally {
        try {
            if (zis != null) {
                zis.closeEntry();
                zis.close();
            }
        } catch (Exception ex) {
            log.error("Unable to close the zip file: " + c.element(), ex);
        }
    }
}
Finally, wire them all together:

Pipeline pipeline = Pipeline.create(pipelineOptions);
try {
    pipeline.apply("Get Zip File folders",
            TextIO.read().from("gs://rchandra/input/listofzipfiles.txt"))
        .apply("Get GCS Path for each zip file", ParDo.of(new GetGCSUtilPath()))
        .apply("Reading Zip Files", ParDo.of(new ProcessZipFolder()))
        .apply("Process Json Files", ParDo.of(new ProcessJsonData()))
        .apply("Writing results",
            TextIO.write().withSuffix(".csv").withNumShards(10).to("gs://output/"));
    pipeline.run();
} catch (Exception ex) {
    log.error("Exception while running the zip files", ex);
}
The above code reads each zip file, unzips the JSON files inside it, processes them and finally writes the output as CSV files to the given output location.
A sample "listofzipfile.txt" can be like this:
gs://rchandra/input/bucket1/*.zip
gs://rchandra/input/bucket2/*.zip
gs://rchandra/input/bucket3/*.zip
gs://rchandra/input/bucket4/*.zip
gs://rchandra/input/bucket5/*.zip
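
The ProcessJsonData transform is not shown above, since its shape depends entirely on the JSON being processed. As a rough sketch only (the use of Gson, the "id"/"name" fields and the CSV layout below are assumptions, not part of the original pipeline), it could look something like this:

// Hypothetical sketch: parse each JSON document handed over by ProcessZipFolder
// and emit one CSV line. The fields and the use of Gson are assumptions.
public class ProcessJsonData extends DoFn<String, String> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            com.google.gson.JsonObject json =
                    new com.google.gson.JsonParser().parse(c.element()).getAsJsonObject();
            String id = json.get("id").getAsString();
            String name = json.get("name").getAsString();
            c.output(id + "," + name);
        } catch (Exception e) {
            // Ignore documents that fail to parse; a real pipeline might count or log them
        }
    }
}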

Saturday, July 13, 2019

Adding line numbers to a large number of entries in a file

After a looooong time...

I had a requirement to create a sequence file for a Map-Reduce job.  I had a large set of files (some thousands) in a folder and needed to create a keys.txt for the sequence file creator.

First I thought I needed to go the painful way of writing a Python module to get the list of files, add an index value while walking through the list, and then save the result in a file named keys.txt.

Then I found out that the good old editor vi can easily do this job for me.  Here are the steps:

1) Go to the folder where you have the large set of files
2) Use ls -1a > keys.txt to get just the file names.  One caveat: this will also add ., .. and keys.txt to the file list, and we need to remove them.
3) Open keys.txt in vi.
4) Remove the three lines containing the entries mentioned in step 2.
5) In vi command mode, enter the following:

%s/^/\=printf('%d,', line('.'))/

and press enter.

The above command will prepend the line number, followed by a comma, to each line. For example:
1,1.zip
2,2.zip
...
...
...
500000,500000.zip
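
If vi is not an option (say the list has to be generated on a machine without it), the same keys.txt can be produced with a few lines of code. A minimal Java sketch, with a hypothetical folder name:

import java.io.File;
import java.io.PrintWriter;

public class GenerateKeys {
    public static void main(String[] args) throws Exception {
        File folder = new File("C:\\data\\input");   // hypothetical folder containing the files
        File[] files = folder.listFiles();
        if (files == null) {
            throw new IllegalArgumentException("Not a folder: " + folder);
        }
        // Write "<index>,<filename>" lines, skipping keys.txt itself
        try (PrintWriter out = new PrintWriter(new File(folder, "keys.txt"))) {
            int index = 1;
            for (File f : files) {
                if (f.isFile() && !f.getName().equals("keys.txt")) {
                    out.println(index + "," + f.getName());
                    index++;
                }
            }
        }
    }
}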

And that is all folks....

Friday, July 14, 2017

Calculating HBase Tables disk space

If you want to get the size of your HBase tables, the simple way is to use the 'hadoop' command from your local Hadoop setup.  The command is:

hadoop --config <your hadoop/hbase config directory> fs -du -h <folder where the data resides>

For example:
hadoop --config c:\hadoop\staging fs -du  -h /hbase/data/default/

This will display all the tables under that 'default' folder (note: based on your setup, the data folder may vary, but you get the idea).  The first column is the actual table size across the region servers.  The second column shows the total size including HDFS replication, and it is the important number when deciding how much "total" space is required.

1.4 M    4.3 M    /hbase/data/default/Table1
79.9 M   239.8 M  /hbase/data/default/Table2
2.0 K    6.0 K    /hbase/data/default/Table3
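
If you prefer to pull the same numbers programmatically, the Hadoop FileSystem API exposes them through getContentSummary. A minimal sketch, assuming the same config directory and data folder as above (both are placeholders for your setup):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HBaseTableSizes {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Point at the same config directory you pass to "hadoop --config"
        conf.addResource(new Path("file:///c:/hadoop/staging/core-site.xml"));
        conf.addResource(new Path("file:///c:/hadoop/staging/hdfs-site.xml"));

        FileSystem fs = FileSystem.get(conf);
        for (FileStatus table : fs.listStatus(new Path("/hbase/data/default"))) {
            ContentSummary cs = fs.getContentSummary(table.getPath());
            // getLength()        -> actual data size (first column of -du)
            // getSpaceConsumed() -> size including replication (second column of -du)
            System.out.println(table.getPath().getName() + " : "
                    + cs.getLength() + " bytes / " + cs.getSpaceConsumed() + " bytes with replication");
        }
    }
}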

Wednesday, December 14, 2016

Cassandra:nodetool usage

Nodetool is an OOTB tool that ships with Cassandra.  It provides various options to manage keyspaces.  Here are a few useful ones I came across:

1) I had to copy a keyspace from one server to another.  Since the source data comes from multiple servers, it is essential to clean up the data after all the database and related files are copied.  For that, use nodetool, which usually resides in <cassandra-path>/bin.  To clean up, run the following command:

            nodetool -h <hostname of the cassandra> cleanup

Based on the amount of data to be cleaned, it will take some time.  To track the progress, you can use:

            nodetool compactionstats

This will print the following:

pending tasks: 1
          compaction type        keyspace               column family          completed         total   unit   progress
                  Cleanup   <keyspace-name>   <column-family-name>         1667500496    2004577146   bytes    83.18%

Note: the total column will vary based on the size that is being calculated in real time.

2) To get the statistics of all the keyspaces, especially to get a rough idea of how many keys per column family are being used:

           nodetool -h <hostname> cfstats


Friday, December 9, 2016

MSSQL: How to transfer a set of rows from one table to another?

My requirement was to transfer a specific set of rows from one table to another table, where both have exactly the same schema.  In MSSQL, follow these steps:

1) First get the column list from the destination table.  Use:

SELECT SUBSTRING(
    (SELECT ', ' + QUOTENAME(COLUMN_NAME)
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME = '<destination-table-name>'
        ORDER BY ORDINAL_POSITION
        FOR XML path('')),
    3,
    200000);

2) If the destination table has an identity column (for example, an ID primary key), then you need to turn on the identity_insert option:
   
     set identity_insert <destination-table> on;

3) Then run an insert...into...select query.  The select can have any valid condition to get the specific set of data.  A typical query can look like this:

insert into emails (<destination-table-column-list from the query in step 1>)
  SELECT <source-table-column-names>
  FROM <source-table> WHERE <any condition that will load only those records>;

4) Don't forget to turn off the identity_insert option you changed in step 2):
 
    set identity_insert dbo.emails off;
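
One thing to watch out for if you drive these statements from code instead of Management Studio: identity_insert is a session-level setting, so the on/off statements and the insert must all run on the same connection. A rough JDBC sketch (the connection string, column names, source table and condition below are placeholders, not from the original example):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CopyRows {
    public static void main(String[] args) throws Exception {
        // Placeholder connection string for the Microsoft JDBC driver
        String url = "jdbc:sqlserver://localhost;databaseName=mydb;user=sa;password=secret";
        try (Connection con = DriverManager.getConnection(url);
             Statement st = con.createStatement()) {
            // identity_insert is per-session, so keep all three statements on this connection
            st.execute("set identity_insert dbo.emails on");
            int copied = st.executeUpdate(
                "insert into dbo.emails ([Id], [Subject], [Body]) "      // column list from step 1
                + "select [Id], [Subject], [Body] from dbo.emails_archive "
                + "where [CreatedOn] >= '2016-01-01'");                  // any condition you need
            st.execute("set identity_insert dbo.emails off");
            System.out.println("Rows copied: " + copied);
        }
    }
}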

And that is all...

Wednesday, November 30, 2016

Installing Jupyter Notebook and Numpy in Windows

I had a hard time installing Jupyter Notebook.  My machine already had Python 2.7, and after installing Anaconda, running the 'jupyter notebook' command threw Python errors.

Then I tried uninstalling Python 2.7 and letting Anaconda install its own Python version.  No luck.

Then I finally removed all the Python installations but forgot to remove the path variables (more on this below).  I then installed Python 3.5 (take the latest 3.5 release).

Then, using pip3 install jupyter, the notebook installed successfully.

Then running 'jupyter notebook' gave the error message "Fatal Python error: Py_Initialize: unable to load the file system codec".  Upon checking the forums, one of the solutions was to check the PATH and PYTHONPATH environment variables.  Bingo... they were still referring to the older, now removed installations of Python.

After removing them, 'jupyter notebook' brought up the index page in the browser, but with a 404 error.

Again, the forums helped me.  It looks like there may be a bug in the Jupyter version I downloaded: if you run 'jupyter notebook' from the root folder (c:\), this error happens, possibly because the root folder contains hidden system files.  Switching to any other folder, say c:\tmp, and running the command brings up the page with the file contents of that folder.

Numpy is a bit different.  Since there is no installer provided by numpy.org, you first need to download the binary wheel for the Python version you're using.  Important: make sure you download the right version of the .whl file.  If you're using Python 3.5, look for the one whose name contains cp35.

Then go to the download folder and run pip install <whl file name>.  For example, pip3 install "numpy-1.11.2+mkl-cp35-cp35m-win32.whl".  This will install the numpy package.

That's all folks!

Wednesday, November 16, 2016

Using Jboss JConsole

If you're using JBoss as your app server, then you don't need to set up the JDK's JMX settings.  Instead, check the following: http://stackoverflow.com/questions/17105933/enabling-jmx-remote-in-jboss-6-1

Then run JBOSS_HOME/bin/jconsole.sh (or .bat for Windows).

In the "Remote" section enter the following:service:jmx:remoting-jmx://<server-name>:9999 (the default OOTB port number is 9999.  Check 'management-native' value in standalone.xml.

Then enter the username and password you created using add-user.sh (or .bat) in jboss/bin.
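
If you ever need to hit the same endpoint from code rather than from JConsole, the standard JMX connector API accepts the remoting-jmx URL, provided the JBoss client jar (jboss-client.jar, typically under JBOSS_HOME/bin/client) is on the classpath. A rough sketch with placeholder host and credentials:

import java.util.HashMap;
import java.util.Map;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JBossJmxClient {
    public static void main(String[] args) throws Exception {
        // Same URL you would paste into JConsole's "Remote" field
        JMXServiceURL url = new JMXServiceURL("service:jmx:remoting-jmx://myserver:9999");

        Map<String, Object> env = new HashMap<String, Object>();
        env.put(JMXConnector.CREDENTIALS, new String[] { "admin", "password" }); // from add-user.sh

        try (JMXConnector connector = JMXConnectorFactory.connect(url, env)) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            // Read a standard JVM MBean as a smoke test
            Object uptime = mbsc.getAttribute(new ObjectName("java.lang:type=Runtime"), "Uptime");
            System.out.println("Server JVM uptime (ms): " + uptime);
        }
    }
}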