forked from WiscADSL/MadFS
-
Notifications
You must be signed in to change notification settings - Fork 0
/
read.h
146 lines (124 loc) · 4.59 KB
/
read.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#pragma once
#include "tx.h"
namespace madfs::dram {
// Read transaction. exec() copies up to `count` bytes starting at `offset`
// into the caller-supplied buffer `buf`, then checks the entries reachable
// from `state.cursor` for conflicts and re-copies every block that
// handle_conflict() reports through `redo_image`.
class ReadTx : public Tx {
protected:
// Destination buffer of the read; the pointer itself is never reseated.
char* const buf;
public:
// Builds a read transaction for `count` bytes at `offset` of `file`.
// file/count/offset are forwarded to the Tx base; `buf` receives the data.
ReadTx(File* file, char* buf, size_t count, size_t offset)
: Tx(file, count, offset), buf(buf) {
lock->rdlock(); // nop lock is used by default
}
// Variant for a read that carries a caller-supplied `state` and `ticket`.
// It marks the transaction as offset-dependent, which makes exec() skip
// blk_table->update() and instead validate `ticket` through offset_mgr
// after the copy.
ReadTx(File* file, char* buf, size_t count, size_t offset, FileState state,
uint64_t ticket)
: ReadTx(file, buf, count, offset) {
is_offset_depend = true;
this->state = state;
this->ticket = ticket;
}
// Runs the read.
// Returns the number of bytes read as ssize_t: `count` after it has been
// trimmed to state.file_size, or 0 when `offset` is at or past
// state.file_size.
ssize_t exec() {
// Per-thread scratch vector reused across calls. A zero entry is treated as
// "nothing to redo for this block" below, and every entry that is consumed
// is reset to 0 so the vector is clean for the next call.
static thread_local std::vector<LogicalBlockIdx> redo_image;
// NOTE(review): the matching start of READ_TX_CTOR is not in this file —
// presumably it is started before/inside construction; confirm.
timer.stop<Event::READ_TX_CTOR>();
// Byte offset of the read inside its first block (mask form; assumes
// BLOCK_SIZE is a power of two — TODO confirm).
size_t first_block_offset = offset & (BLOCK_SIZE - 1);
// Bytes available in the first block from that offset, capped at `count`.
size_t first_block_size = BLOCK_SIZE - first_block_offset;
if (first_block_size > count) first_block_size = count;
// resize() only value-fills newly added elements with 0; existing elements
// are expected to already be 0 (see the resets in the redo loop).
redo_image.resize(num_blocks, 0);
{
TimerGuard<Event::READ_TX_UPDATE> timer_guard;
// Offset-dependent transactions were given their state by the caller, so
// only the plain variant asks the block table to update `state`.
if (!is_offset_depend) blk_table->update(&state);
}
// reach EOF
if (offset >= state.file_size) {
count = 0;
goto done;
}
if (offset + count > state.file_size) { // partial read; recalculate end_*
count = state.file_size - offset;
end_offset = offset + count;
// end_vidx is used as an exclusive bound by the loops below.
end_vidx = BLOCK_SIZE_TO_IDX(ALIGN_UP(end_offset, BLOCK_SIZE));
}
// copy the blocks
{
TimerGuard<Event::READ_TX_COPY> timer_guard;
// `addr` points at the start of the current run of source bytes;
// `contiguous_bytes` is the length of that run accumulated so far.
const char* addr =
mem_table->lidx_to_addr_ro(blk_table->vidx_to_lidx(begin_vidx))
->data_ro();
addr += first_block_offset;
size_t contiguous_bytes = first_block_size;
size_t buf_offset = 0;
for (VirtualBlockIdx vidx = begin_vidx + 1; vidx < end_vidx; ++vidx) {
const pmem::Block* curr_block =
mem_table->lidx_to_addr_ro(blk_table->vidx_to_lidx(vidx));
// If this block's data starts exactly where the current run ends, extend
// the run instead of issuing a separate memcpy.
if (addr + contiguous_bytes == curr_block->data_ro()) {
contiguous_bytes += BLOCK_SIZE;
continue;
}
// Run is broken: flush what has been accumulated and start a new run at
// the current block.
dram::memcpy(buf + buf_offset, addr, contiguous_bytes);
buf_offset += contiguous_bytes;
contiguous_bytes = BLOCK_SIZE;
addr = curr_block->data_ro();
}
// Flush the last run; the run is counted in whole blocks, so clamp it to the
// bytes that are still owed to the caller.
dram::memcpy(buf + buf_offset, addr,
std::min(contiguous_bytes, count - buf_offset));
}
// Validation phase; re-entered once via `goto redo` when the ticket check
// below fails.
redo:
timer.start<Event::READ_TX_VALIDATE>();
while (true) {
// check the tail is still tail
if (bool success = state.cursor.handle_overflow(mem_table); !success) {
break;
}
pmem::TxEntry curr_entry = state.cursor.get_entry();
if (!curr_entry.is_valid()) break;
// then scan the log and build redo_image; if no redo needed, we are done
if (!handle_conflict(curr_entry, begin_vidx, end_vidx - 1, redo_image))
break;
// redo: re-copy every block that has a non-zero entry in redo_image,
// clearing each entry as it is consumed.
LogicalBlockIdx redo_lidx;
// first handle the first block (which might not be full block)
redo_lidx = redo_image[0];
if (redo_lidx != 0) {
const pmem::Block* curr_block = mem_table->lidx_to_addr_ro(redo_lidx);
// NOTE(review): first_block_size was capped against the untrimmed `count`;
// if the EOF trimming above made `count` smaller than it, this copies more
// than `count` bytes into buf — confirm that is intended.
dram::memcpy(buf, curr_block->data_ro() + first_block_offset,
first_block_size);
redo_image[0] = 0;
}
// buf_offset advances for every block, redone or not, so it always holds
// the position in buf of the block about to be examined.
size_t buf_offset = first_block_size;
// then handle middle full blocks (which might not exist)
VirtualBlockIdx curr_vidx;
for (curr_vidx = begin_vidx + 1; curr_vidx < end_vidx - 1; ++curr_vidx) {
redo_lidx = redo_image[curr_vidx - begin_vidx];
if (redo_lidx != 0) {
const pmem::Block* curr_block = mem_table->lidx_to_addr_ro(redo_lidx);
dram::memcpy(buf + buf_offset, curr_block->data_ro(), BLOCK_SIZE);
redo_image[curr_vidx - begin_vidx] = 0;
}
buf_offset += BLOCK_SIZE;
}
// last handle the last block (which might not be full block)
// Skipped for a single-block read, where the first block is also the last.
// After the loop above curr_vidx is the index of the last block
// (end_vidx - 1) whenever this branch is taken.
if (begin_vidx != end_vidx - 1) {
redo_lidx = redo_image[curr_vidx - begin_vidx];
if (redo_lidx != 0) {
const pmem::Block* curr_block = mem_table->lidx_to_addr_ro(redo_lidx);
dram::memcpy(buf + buf_offset, curr_block->data_ro(),
count - buf_offset);
redo_image[curr_vidx - begin_vidx] = 0;
}
}
}
// we actually don't care what's the previous tx's tail, because we will
// need to validate against the latest tail anyway
if (is_offset_depend) {
if (!offset_mgr->validate(ticket, state.cursor)) {
// we don't need to revalidate after redo
// Clearing the flag guarantees this branch is taken at most once.
is_offset_depend = false;
goto redo;
}
}
timer.stop<Event::READ_TX_VALIDATE>();
// Common exit, also reached directly from the EOF check above.
done:
allocator->tx_block.pin(state.get_tx_block_idx());
return static_cast<ssize_t>(count);
}
};
} // namespace madfs::dram