add tool to convert CSV format file to binary file required by GeminiGraph #27

Open
wants to merge 13 commits into base: master
6 changes: 5 additions & 1 deletion Makefile
@@ -1,5 +1,9 @@
# A Makefile is the build script that the make command reads and executes.
# It is not portable across platforms; if portability is needed, use CMake and write a CMakeLists.txt
# to build cross-platform or to generate a platform-specific Makefile.

ROOT_DIR= $(shell pwd)
TARGETS= toolkits/bc toolkits/bfs toolkits/cc toolkits/pagerank toolkits/sssp
TARGETS= toolkits/bc toolkits/bfs toolkits/cc toolkits/pagerank toolkits/sssp toolkits/convert_csv_to_binary
MACROS=
# MACROS= -D PRINT_DEBUG_MESSAGES

3 changes: 1 addition & 2 deletions README.md
@@ -21,8 +21,7 @@ The input parameters of these applications are as follows:
./toolkits/bc [path] [vertices] [root]
```

*[path]* gives the path of an input graph, i.e. a file stored on a *shared* file system, consisting of *|E|* \<source vertex id, destination vertex id, edge data\> tuples in binary.
*[vertices]* gives the number of vertices *|V|*. Vertex IDs are represented with 32-bit integers and edge data can be omitted for unweighted graphs (e.g. the above applications except SSSP).
*[path]* gives the path of an input graph, i.e. a file stored on a *shared* file system, consisting of *|E|* \<source vertex id, destination vertex id, edge data\> tuples in binary. Sample data in CSV format is provided in `sample-data`; run `./toolkits/convert_csv_to_binary ./sample-data/sample.csv` to convert it to the binary format (the output is written next to the input, with `.bin` appended to the file name). *[vertices]* gives the number of vertices *|V|*. Vertex IDs are represented with 32-bit integers and edge data can be omitted for unweighted graphs (e.g. the above applications except SSSP).
Note: CC makes the input graph undirected by adding a reversed edge to the graph for each loaded one; SSSP uses *float* as the type of weights.
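
As a rough illustration of the layout described above, the following sketch writes unweighted edges as consecutive pairs of 32-bit vertex IDs (8 bytes per edge) and reads them back. The names `RawEdge`, `write_edges`, and `read_edges` are hypothetical and not part of GeminiGraph; the sketch assumes native byte order and a file with no header.

```cpp
#include <cassert>
#include <cstdint>
#include <fstream>
#include <string>
#include <vector>

// Hypothetical record for an unweighted edge: two 32-bit vertex IDs, no padding.
struct RawEdge {
    std::uint32_t src;
    std::uint32_t dst;
};
static_assert(sizeof(RawEdge) == 8, "an unweighted edge is expected to occupy 8 bytes");

// Write edges back to back in native byte order, with no header or separators.
inline void write_edges(const std::string& path, const std::vector<RawEdge>& edges) {
    std::ofstream out(path, std::ios::out | std::ios::binary | std::ios::trunc);
    assert(out && "cannot open output file");
    if (!edges.empty()) {
        out.write(reinterpret_cast<const char*>(edges.data()),
                  static_cast<std::streamsize>(edges.size() * sizeof(RawEdge)));
    }
}

// Read the whole file back, one 8-byte record at a time.
inline std::vector<RawEdge> read_edges(const std::string& path) {
    std::ifstream in(path, std::ios::in | std::ios::binary);
    std::vector<RawEdge> edges;
    RawEdge e;
    while (in.read(reinterpret_cast<char*>(&e), sizeof(e))) {
        edges.push_back(e);
    }
    return edges;
}
```

A weighted graph (e.g. for SSSP) would add a `float` after the two IDs in each record, so the record size would differ from the 8 bytes assumed here.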

If Slurm is installed on the cluster, you may run jobs like this, e.g. 20 iterations of PageRank on the *twitter-2010* graph:
2 changes: 2 additions & 0 deletions core/atomic.hpp
@@ -14,6 +14,8 @@ Copyright (c) 2015-2016 Xiaowei Zhu, Tsinghua University
limitations under the License.
*/

// Defines atomic operations that use compare-and-swap (CAS) to update the target value.
// In essence, these wrap compiler builtins such as __sync_bool_compare_and_swap.
#ifndef ATOMIC_HPP
#define ATOMIC_HPP

68 changes: 51 additions & 17 deletions core/bitmap.hpp
@@ -14,50 +14,84 @@ Copyright (c) 2015-2016 Xiaowei Zhu, Tsinghua University
limitations under the License.
*/

// A hand-rolled bitmap implementation
#include <stddef.h>
#ifndef BITMAP_HPP
#define BITMAP_HPP

#define WORD_OFFSET(i) ((i) >> 6)
#define BIT_OFFSET(i) ((i) & 0x3f)
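// A bitmap packs data into individual bits. C++ has no addressable single-bit type (even bool occupies at least one byte),
// so bits are usually stored in an array of 32-bit ints, 64-bit longs, or similar, each element holding many bits.
// To locate the bit with index x,
// integer division first selects the array element, and the remainder then
// selects the position inside that element. For example, with an int32 array named array, bit 2237
// falls in element 2237 / 32 = 69, i.e. array[69], and the position within it is
// 2237 % 32 = 29, i.e. bit 29 of the integer array[69].
//
#define WORD_OFFSET(i) ((i) >> 6) // divide by 64: one unsigned long holds 64 bits (on LP64 platforms)
#define BIT_OFFSET(i) ((i)&0x3f) // AND with 0011 1111, i.e. i modulo 64: the bit's offset within its word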

/**
* Stores the bitmap in an array of unsigned long, represented as a pointer plus a size
*
*/
class Bitmap {
public:
size_t size;
unsigned long * data;
Bitmap() : size(0), data(NULL) { }
unsigned long *data;

// Constructors using member initializer list syntax
Bitmap() : size(0), data(NULL) {}
Bitmap(size_t size) : size(size) {
data = new unsigned long [WORD_OFFSET(size)+1];
data = new unsigned long[WORD_OFFSET(size) + 1];
clear();
}
~Bitmap() {
delete [] data;
}

// Destructor: frees the data array
~Bitmap() { delete[] data; }

/**
* Sets every array element to 0
*/
void clear() {
size_t bm_size = WORD_OFFSET(size);
#pragma omp parallel for
for (size_t i=0;i<=bm_size;i++) {
#pragma omp parallel for
for (size_t i = 0; i <= bm_size; i++) {
data[i] = 0;
}
}

/**
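* Sets all bits in [0, size) to 1: every full word becomes 0xffffffffffffffff (all 64 bits set), and only the valid low bits of the last word are set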
*/
void fill() {
size_t bm_size = WORD_OFFSET(size);
#pragma omp parallel for
for (size_t i=0;i<bm_size;i++) {
#pragma omp parallel for
for (size_t i = 0; i < bm_size; i++) {
data[i] = 0xffffffffffffffff;
}
data[bm_size] = 0;
for (size_t i=(bm_size<<6);i<size;i++) {
for (size_t i = (bm_size << 6); i < size; i++) {
data[bm_size] |= 1ul << BIT_OFFSET(i);
}
}

/**
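* Reads the bit at the given index; returns 0 if the bit is 0, otherwise a nonzero value (the bit's mask, not necessarily 1)
* @param i index of the bit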
*/
unsigned long get_bit(size_t i) {
return data[WORD_OFFSET(i)] & (1ul<<BIT_OFFSET(i));
return data[WORD_OFFSET(i)] & (1ul << BIT_OFFSET(i));
}

/**
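* Sets the bit at the given index to 1, using an atomic fetch-and-or (__sync_fetch_and_or) so that concurrent updates are safe
* @param i index of the bit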
*/
void set_bit(size_t i) {
__sync_fetch_and_or(data+WORD_OFFSET(i), 1ul<<BIT_OFFSET(i));
__sync_fetch_and_or(data + WORD_OFFSET(i), 1ul << BIT_OFFSET(i));
}
};

typedef Bitmap VertexSubset;

// This does not need to live here, since type.hpp already collects the shared type definitions
// typedef Bitmap VertexSubset;
#endif
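
To make the word/bit arithmetic in `core/bitmap.hpp` concrete, here is a small self-contained sketch that uses 64-bit words, as the macros do. `word_offset`, `bit_offset`, and `MiniBitmap` are illustrative names and not part of this PR; unlike `Bitmap::set_bit`, this version is single-threaded and not atomic.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Index of the 64-bit word that holds bit i (i / 64).
constexpr std::size_t word_offset(std::size_t i) { return i >> 6; }
// Position of bit i inside its word (i % 64).
constexpr std::size_t bit_offset(std::size_t i) { return i & 0x3f; }

// Illustrative, single-threaded bitmap; not the Bitmap class from core/bitmap.hpp.
struct MiniBitmap {
    std::size_t size;
    std::vector<std::uint64_t> words;

    explicit MiniBitmap(std::size_t n) : size(n), words(word_offset(n) + 1, 0) {}

    void set_bit(std::size_t i) {
        assert(i < words.size() * 64);
        words[word_offset(i)] |= std::uint64_t{1} << bit_offset(i);
    }

    bool get_bit(std::size_t i) const {
        return ((words[word_offset(i)] >> bit_offset(i)) & 1u) != 0;
    }

    // Set bits [0, size) only: full words become all ones,
    // and the last word is filled bit by bit up to size.
    void fill() {
        const std::size_t full = word_offset(size);
        for (std::size_t w = 0; w < full; ++w) words[w] = ~std::uint64_t{0};
        words[full] = 0;
        for (std::size_t i = full << 6; i < size; ++i) set_bit(i);
    }

    // Number of set bits, counted one bit at a time for clarity.
    std::size_t count() const {
        std::size_t n = 0;
        for (std::size_t i = 0; i < words.size() * 64; ++i) n += get_bit(i) ? 1 : 0;
        return n;
    }
};
```

With 64-bit words, bit 2237 lands in word 2237 / 64 = 34 at position 2237 % 64 = 61, which is the 64-bit counterpart of the int32 example in the comment.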
8 changes: 7 additions & 1 deletion core/constants.hpp
@@ -14,10 +14,16 @@ Copyright (c) 2015-2016 Xiaowei Zhu, Tsinghua University
limitations under the License.
*/

// Defines the constants used throughout the code
#ifndef CONSTANTS_HPP
#define CONSTANTS_HPP

#define CHUNKSIZE (1<<20)
// CHUNKSIZE is the size of one partitioning chunk
// 1 << 20 => 1048576, i.e. on the order of a million
#define CHUNKSIZE (1 << 20)

// PAGESIZE is the size of a single page
// 1 << 12 => 4096
#define PAGESIZE (1<<12)

#endif
6 changes: 5 additions & 1 deletion core/filesystem.hpp
@@ -23,7 +23,11 @@ Copyright (c) 2015-2016 Xiaowei Zhu, Tsinghua University
#include <errno.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <string>

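// Simple helpers that check whether a file exists and how large it is.
// Note that the input file is in a packed binary format with no gaps,
// which is why the file size obtained here can be used in later calculations.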
inline bool file_exists(std::string filename) {
struct stat st;
return stat(filename.c_str(), &st)==0;
@@ -35,4 +39,4 @@ inline long file_size(std::string filename) {
return st.st_size;
}

#endif
#endif
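
Because the binary input has no header or separators, the number of edges can be derived from the file size alone. The sketch below illustrates that calculation under the assumption of 4-byte vertex IDs and an optional fixed-size weight; `edge_unit_size` and `count_edges` are hypothetical helpers, not functions from this repository.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <sys/stat.h>

// Bytes per edge record: two 32-bit vertex IDs plus an optional weight of weight_bytes.
constexpr std::size_t edge_unit_size(std::size_t weight_bytes) {
    return 2 * sizeof(std::uint32_t) + weight_bytes;
}

// Returns the number of edges in the file, or -1 if the file cannot be
// stat'ed or its size is not a whole multiple of the record size.
inline long count_edges(const std::string& filename, std::size_t weight_bytes) {
    assert(!filename.empty());
    struct stat st;
    if (stat(filename.c_str(), &st) != 0) return -1;
    const long unit = static_cast<long>(edge_unit_size(weight_bytes));
    if (st.st_size % unit != 0) return -1;
    return static_cast<long>(st.st_size / unit);
}
```

For an unweighted graph the record size is 8 bytes; with a `float` weight it is 12 bytes, so passing the wrong `weight_bytes` usually shows up as a size that is not a whole multiple of the record size.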
14 changes: 12 additions & 2 deletions core/mpi.hpp
@@ -21,7 +21,10 @@ Copyright (c) 2015-2016 Xiaowei Zhu, Tsinghua University
#include <stdlib.h>
#include <assert.h>
#include <mpi.h>

/**
* Returns the MPI datatype that corresponds to the template type T
* Only supports types such as char, unsigned char, int, long, unsigned long, float, and double
*/
template <typename T>
MPI_Datatype get_mpi_data_type() {
if (std::is_same<T, char>::value) {
@@ -46,11 +49,14 @@ MPI_Datatype get_mpi_data_type() {
}
}

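// Holds the MPI-related state; MPI can be thought of as forming a cluster
// partition_id is this node's index in the cluster, starting from 0
// partitions is the number of nodes in the cluster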
class MPI_Instance {
int partition_id;
int partitions;
public:
MPI_Instance(int * argc, char *** argv) {
MPI_Instance(int * argc, char *** argv) { // pointers to the argc and argv of main()
int provided;
MPI_Init_thread(argc, argv, MPI_THREAD_MULTIPLE, &provided);
MPI_Comm_rank(MPI_COMM_WORLD, &partition_id);
@@ -73,9 +79,13 @@ class MPI_Instance {
}
#endif
}

// Destructor: performs cleanup
~MPI_Instance() {
MPI_Finalize();
}

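// Pause point: partition 0 waits for input via getchar(); this serves as a synchronization point, like the barrier step in the BSP model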
void pause() {
if (partition_id==0) {
getchar();
8 changes: 7 additions & 1 deletion core/time.hpp
@@ -19,9 +19,15 @@ Copyright (c) 2015-2016 Xiaowei Zhu, Tsinghua University

#include <sys/time.h>

/**
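* Returns the current time in seconds
* tv_sec is the number of seconds since 1970-01-01 00:00:00 UTC at the time of the call; tv_usec is the additional microseconds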
*/
inline double get_time() {
struct timeval tv;
gettimeofday(&tv, NULL);
// C++11 discourages using NULL for null pointers; nullptr is preferred
// gettimeofday(&tv, NULL);
gettimeofday(&tv, nullptr);
return tv.tv_sec + (tv.tv_usec / 1e6);
}
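
As a side note on the arithmetic in `get_time`, the sketch below shows the same seconds-plus-microseconds conversion in isolation, together with one possible monotonic alternative based on `std::chrono::steady_clock`. Both `to_seconds` and `monotonic_seconds` are hypothetical names and are not part of this PR; the monotonic variant is only a suggestion for measuring elapsed intervals, since wall-clock time can jump when the system clock is adjusted.

```cpp
#include <cassert>
#include <chrono>

// Convert a (seconds, microseconds) pair, as stored in struct timeval, to seconds.
inline double to_seconds(long sec, long usec) {
    assert(usec >= 0 && usec < 1000000);
    return static_cast<double>(sec) + static_cast<double>(usec) / 1e6;
}

// Seconds since an arbitrary fixed point; only differences between two calls are meaningful.
inline double monotonic_seconds() {
    using clock = std::chrono::steady_clock;
    const std::chrono::duration<double> since_epoch = clock::now().time_since_epoch();
    return since_epoch.count();
}
```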

16 changes: 15 additions & 1 deletion core/type.hpp
@@ -14,23 +14,32 @@ Copyright (c) 2015-2016 Xiaowei Zhu, Tsinghua University
limitations under the License.
*/

// core/type.hpp defines vertex and edge IDs, adjacency structures, and related types
#ifndef TYPE_HPP
#define TYPE_HPP

#include <stdint.h>
#include "bitmap.hpp"

struct Empty { };
typedef Bitmap VertexSubset;

// Fixed-width unsigned integer types such as uint32_t
// give good portability (a plain int may be 2 bytes, 4 bytes, etc., depending on the platform)
typedef uint32_t VertexId;
typedef uint64_t EdgeId;

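// A template is used here: EdgeData stands for whatever data is attached to an edge,
// for example the edge weight in shortest-path computations.
// This is similar to generics in Java.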
template <typename EdgeData>
struct EdgeUnit {
VertexId src;
VertexId dst;
EdgeData edge_data;
} __attribute__((packed));

// An edge without data; for example, PageRank input edges have only a source and a destination, and no data
template <>
struct EdgeUnit <Empty> {
VertexId src;
@@ -40,12 +49,14 @@ struct EdgeUnit <Empty> {
};
} __attribute__((packed));

// One adjacency entry: the vertex ID at the other end of the edge plus the data carried by the edge
template <typename EdgeData>
struct AdjUnit {
VertexId neighbour;
EdgeData edge_data;
} __attribute__((packed));

// One adjacency entry with only the neighbour's vertex ID and no edge data, e.g. for PageRank input
template <>
struct AdjUnit <Empty> {
union {
@@ -54,17 +65,20 @@ struct AdjUnit <Empty> {
};
} __attribute__((packed));

// Compressed adjacency index unit; presumably best understood together with the related structures, such as the adjacency matrix
struct CompressedAdjIndexUnit {
EdgeId index;
VertexId vertex;
} __attribute__((packed));

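// Adjacency list of a vertex, i.e. all of its adjacent edges
// Two pointers mark the beginning and the end of the list
// Data members are initialized with member initializer list syntax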
template <typename EdgeData>
struct VertexAdjList {
AdjUnit<EdgeData> * begin;
AdjUnit<EdgeData> * end;
VertexAdjList() : begin(nullptr), end(nullptr) { }
VertexAdjList(AdjUnit<EdgeData> * begin, AdjUnit<EdgeData> * end) : begin(begin), end(end) { }
};

#endif
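
One way to read `VertexAdjList` is as a half-open pointer range `[begin, end)` into a contiguous array of adjacency entries, in the style of a compressed sparse row (CSR) layout. The sketch below builds such a layout for an unweighted graph. It is an assumption about how these types could fit together, not code from GeminiGraph; `Csr`, `build_csr`, and `adj_list` are illustrative names, and the simplified `AdjUnit` and `VertexAdjList` here only mirror the shape of the ones in `core/type.hpp`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

typedef std::uint32_t VertexId;
typedef std::uint64_t EdgeId;

// Simplified adjacency entry without edge data (unweighted graph).
struct AdjUnit {
    VertexId neighbour;
};

// Half-open range [begin, end) over one vertex's outgoing adjacency entries.
struct VertexAdjList {
    const AdjUnit* begin;
    const AdjUnit* end;
};

// CSR layout: index[v] .. index[v + 1] delimits the entries of vertex v in adj.
struct Csr {
    std::vector<EdgeId> index;
    std::vector<AdjUnit> adj;
};

inline Csr build_csr(VertexId vertices,
                     const std::vector<std::pair<VertexId, VertexId>>& edges) {
    Csr g;
    g.index.assign(static_cast<std::size_t>(vertices) + 1, 0);
    for (const auto& e : edges) {
        assert(e.first < vertices && e.second < vertices);
        g.index[e.first + 1]++;  // count the out-degree of each source vertex
    }
    for (std::size_t v = 0; v < vertices; ++v) {
        g.index[v + 1] += g.index[v];  // prefix sums turn degrees into offsets
    }
    g.adj.resize(edges.size());
    std::vector<EdgeId> next(g.index.begin(), g.index.end() - 1);  // next free slot per vertex
    for (const auto& e : edges) {
        g.adj[next[e.first]++] = AdjUnit{e.second};
    }
    return g;
}

inline VertexAdjList adj_list(const Csr& g, VertexId v) {
    return VertexAdjList{g.adj.data() + g.index[v], g.adj.data() + g.index[v + 1]};
}
```

For the six edges in `sample-data/sample.csv` with four vertices, vertex 0 gets a range of three entries (neighbours 1, 2, 3) and vertex 3 gets an empty range, because the sample lists each edge in one direction only.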
6 changes: 6 additions & 0 deletions sample-data/sample.csv
@@ -0,0 +1,6 @@
0,1
0,2
0,3
1,2
1,3
2,3
64 changes: 64 additions & 0 deletions toolkits/convert_csv_to_binary.cpp
@@ -0,0 +1,64 @@
#include <chrono>
#include <cstdint>
#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <set>
#include <limits>

template<typename VALUE>
void log(std::string msg, VALUE value) {
std::cout << msg << value << std::endl;
}

int main(int argc, char** argv)
{
if (argc < 2) {
std::cout << "Usage: " << *argv << " path/to/your/csv/file" << std::endl;
return -1;
}

std::cout << "processing..." << std::endl;
using clock = std::chrono::high_resolution_clock;
clock::time_point start_time = clock::now();

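std::ifstream input_csv(argv[1]);
if (!input_csv) {
// Fail early instead of silently producing an empty output file
std::cerr << "cannot open input file: " << argv[1] << std::endl;
return 1;
}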

std::string output_binary_file_name = std::string(argv[1]) + ".bin";
std::fstream output_binary(output_binary_file_name, std::ios::out | std::ios::binary);

std::uint32_t min_vertex_id = UINT32_MAX;
std::uint32_t max_vertex_id = 0;
std::uint32_t edges = 0;
std::set<std::uint32_t> record;
std::string line;

while(std::getline(input_csv, line)) {
edges++;
std::istringstream words(line);
std::string word;
while (getline(words, word, ',')) {
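// stoul instead of stoi: stoi throws std::out_of_range for IDs above INT_MAX
std::uint32_t num = static_cast<std::uint32_t>(std::stoul(word));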
record.insert(num);
max_vertex_id = num > max_vertex_id ? num : max_vertex_id;
min_vertex_id = num < min_vertex_id ? num : min_vertex_id;
output_binary.write((char*)&num, sizeof(num));
}
}

std::string is_id_continuous = (record.size() == (max_vertex_id - min_vertex_id + 1)) ? "yes" : "no";
clock::time_point end_time = clock::now();
clock::duration duration = end_time - start_time;

log("output file: ", output_binary_file_name);
log("time cost (ms): ", std::chrono::duration_cast<std::chrono::milliseconds>(duration).count());
log("edges: ", edges);
log("vertices: ", record.size());
log("min vertex id: ", min_vertex_id);
log("max vertex id: ", max_vertex_id);
log("Are all ids continuous: ", is_id_continuous);

input_csv.close();
output_binary.close();
}
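
The conversion loop writes every comma-separated field as a 32-bit integer, so a malformed line or a third column (for example a weight) would end up in the output without any warning. A stricter per-line parser could look like the sketch below. `ParsedEdge`, `parse_vertex_id`, and `parse_edge_line` are hypothetical helpers, not part of this PR; they accept only lines of the form `src,dst` made of decimal digits that fit in 32 bits.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <sstream>
#include <stdexcept>
#include <string>

struct ParsedEdge {
    std::uint32_t src;
    std::uint32_t dst;
};

// Parse one decimal field into a 32-bit vertex ID; returns false if the field
// is empty, contains a non-digit character, or does not fit in 32 bits.
inline bool parse_vertex_id(const std::string& field, std::uint32_t& out) {
    if (field.empty()) return false;
    for (char c : field) {
        if (c < '0' || c > '9') return false;  // rejects signs, spaces, '\r', decimals
    }
    try {
        const unsigned long value = std::stoul(field);
        if (value > std::numeric_limits<std::uint32_t>::max()) return false;
        out = static_cast<std::uint32_t>(value);
        return true;
    } catch (const std::out_of_range&) {
        return false;  // too large even for unsigned long
    }
}

// Parse a "src,dst" line; returns false unless there are exactly two valid fields.
inline bool parse_edge_line(const std::string& line, ParsedEdge& edge) {
    std::istringstream fields(line);
    std::string src, dst, extra;
    if (!std::getline(fields, src, ',')) return false;
    if (!std::getline(fields, dst, ',')) return false;
    if (std::getline(fields, extra, ',')) return false;  // more than two columns
    return parse_vertex_id(src, edge.src) && parse_vertex_id(dst, edge.dst);
}
```

Lines that fail to parse could then be reported with their line number rather than written to the binary file. Note that this sketch also rejects lines with Windows line endings, since the trailing `\r` is not a digit.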