[lnkForumImage]
TotalShareware - Download Free Software

Confronta i prezzi di migliaia di prodotti.
Asp Forum
 Home | Login | Register | Search 


 

Forums >

comp.lang.ruby

writing pipe on Windows problem.

Heesob Park

5/6/2008 3:14:00 AM

Hi, all

I tested pipe with this code:

readPipe, writePipe = IO.pipe
t = Thread.new { puts "got #{readPipe.readline.length} bytes" }
sleep 1
puts "hello from main"
writePipe.puts "a"*2048
t.join


It works on Linux, but it blocks and fails to respond Ctrl-C on Windows.
How to avoid freezing on Windows?

Regards,
Park Heesob
--
Posted via http://www.ruby-....

1 Answer

Heesob Park

5/12/2008 1:51:00 AM

0

Hi,

Heesob Park wrote:
> Hi, all
>
> I tested pipe with this code:
>
> readPipe, writePipe = IO.pipe
> t = Thread.new { puts "got #{readPipe.readline.length} bytes" }
> sleep 1
> puts "hello from main"
> writePipe.puts "a"*2048
> t.join
>
>
> It works on Linux, but it blocks and fails to respond Ctrl-C on Windows.
> How to avoid freezing on Windows?
>
It seems nobody cares about pipe blocking on windows.:(
I made my own patch applied for win32.c of ruby1.8.6p114.
It is not complete yet, but it will solve blocking problem of pipe and
cosole input on windows.
I hope this give some inpiration to the win32 ruby maintainer.

Regards,
Park Heesob

--- win32.c.org 2008-04-29 14:44:41.000000000 +0900
+++ win32.c 2008-05-02 14:08:10.000000000 +0900
@@ -2046,6 +2050,114 @@
return fileset->fd_count;
}

+static HANDLE main_select_event=NULL;
+static HANDLE *select_thread_list=NULL;
+
+static DWORD WINAPI
+select_read_thread(PVOID argp)
+{
+ HANDLE fd_handle = (HANDLE)argp;
+ DWORD ret = 1;
+ DWORD mode = 0;
+
+ GetConsoleMode(fd_handle,&mode);
+ if(mode) {
+ /* Console Input */
+ DWORD num_events,num_events_read,pre_num=0;
+ INPUT_RECORD *input_record;
+ int i;
+
+ while(1) {
+ if(WaitForSingleObject(main_select_event,0)!=WAIT_TIMEOUT)
break;
+ GetNumberOfConsoleInputEvents(fd_handle, &num_events);
+ if(pre_num != num_events) {
+ input_record =
(INPUT_RECORD*)malloc(sizeof(INPUT_RECORD)*num_events);
+ PeekConsoleInput(fd_handle, input_record,
num_events,&num_events_read);
+ for(i=0;i<num_events;i++) {
+if(input_record[i].Event.KeyEvent.uChar.AsciiChar) return ret;
+ if ((input_record[i].EventType == KEY_EVENT) &&
input_record[i].Event.KeyEvent.bKeyDown) {
+if(input_record[i].Event.KeyEvent.uChar.AsciiChar==13) { /* carriage
return */
+ free(input_record);
+ return ret;
+ }
+ else
if(input_record[i].Event.KeyEvent.uChar.AsciiChar==26) { /* ^Z - end of
file */
+ free(input_record);
+ return ret;
+ }
+ }
+ }
+ free(input_record);
+ pre_num = num_events;
+ }
+ }
+ return ret;
+ } else {
+ DWORD state;
+ ret =
GetNamedPipeHandleState(fd_handle,&state,NULL,NULL,NULL,NULL,0);
+ if(ret) {
+ /* Pipe Input */
+ int bytes_avail=0;
+ while(1) {
+if(WaitForSingleObject(main_select_event,0)!=WAIT_TIMEOUT) break;
+ ret =
PeekNamedPipe(fd_handle,NULL,0,NULL,&bytes_avail,NULL);
+ if(bytes_avail>0) {
+ return bytes_avail;
+ }
+ }
+ }
+ return ret;
+ }
+ return ret;
+}
+
+typedef DWORD (WINAPI *PNtQueryInformationFile)(HANDLE, PVOID,
PVOID,DWORD, DWORD );
+#define FilePipeLocalInformation 24
+typedef struct _FILE_PIPE_LOCAL_INFORMATION {
+ ULONG NamedPipeType;
+ ULONG NamedPipeConfiguration;
+ ULONG MaximumInstances;
+ ULONG CurrentInstances;
+ ULONG InboundQuota;
+ ULONG ReadDataAvailable;
+ ULONG OutboundQuota;
+ ULONG WriteQuotaAvailable;
+ ULONG NamedPipeState;
+ ULONG NamedPipeEnd;
+} FILE_PIPE_LOCAL_INFORMATION, *PFILE_PIPE_LOCAL_INFORMATION;
+static PNtQueryInformationFile NtQueryInformationFile=NULL;
+
+static DWORD WINAPI
+select_write_thread(PVOID argp)
+{
+ HANDLE fd_handle = (HANDLE)argp;
+ DWORD ret = 1;
+ DWORD mode = 0;
+
+ GetConsoleMode(fd_handle,&mode);
+ if(mode) {
+ /* Console Output */
+ return ret;
+ } else {
+ DWORD state;
+ ret =
GetNamedPipeHandleState(fd_handle,&state,NULL,NULL,NULL,NULL,0);
+ if(ret) {
+ /* Pipe output */
+ int bytes_written=0;
+ int bytes_avail=0;
+ DWORD iob[2];
+ FILE_PIPE_LOCAL_INFORMATION fpli = {0};
+ if(NtQueryInformationFile) {
+ while(1) {
+ if(WaitForSingleObject(main_select_event,0)!=WAIT_TIMEOUT) break;
+ ret =
NtQueryInformationFile(fd_handle,iob,&fpli,sizeof(fpli),FilePipeLocalInformation);
+ if(fpli.OutboundQuota==fpli.WriteQuotaAvailable) return
ret;
+ }
+ }
+ }
+ }
+ return ret;
+}
+
long
rb_w32_select (int nfds, fd_set *rd, fd_set *wr, fd_set *ex,
struct timeval *timeout)
@@ -2074,6 +2187,86 @@
file_nfds += extract_file_fd(wr, &file_wr);
if (file_nfds)
{
+ DWORD val,exit_code;
+ int i;
+ if(!NtQueryInformationFile)
+ NtQueryInformationFile = (PNtQueryInformationFile)
+GetProcAddress(GetModuleHandle("ntdll.dll"),"NtQueryInformationFile");
+ main_select_event = CreateEvent(NULL,TRUE,FALSE,NULL);
+ if(main_select_event == NULL)
+ {
+ printf("CreateEvent failed (%d)\n", GetLastError());
+ return -1;
+ }
+ select_thread_list = malloc(sizeof(HANDLE)*(file_nfds+1));
+ for(i=0; i<file_nfds; i++)
+ {
+ if(i<file_rd.fd_count) {
+ select_thread_list[i] =
CreateThread(NULL,0,select_read_thread,
+ (PVOID)file_rd.fd_array[i],0,&val);
+ }
+ else {
+ select_thread_list[i] =
CreateThread(NULL,0,select_write_thread,
+ (PVOID)file_wr.fd_array[i-file_rd.fd_count],0,&val);
+ }
+ if (select_thread_list[i] == NULL)
+ {
+ printf("CreateThread failed (%d)\n", GetLastError());
+ return -1;
+ }
+ }
+
+ select_thread_list[file_nfds] = interrupted_event;
+ if(timeout)
+ r =
WaitForMultipleObjects(file_nfds+1,select_thread_list,FALSE,
+ timeout->tv_sec * 1000 + timeout->tv_usec / 1000);
+ else
+ r =
WaitForMultipleObjects(file_nfds+1,select_thread_list,FALSE,
+ INFINITE);
+
+ if (!SetEvent(main_select_event) )
+ {
+ printf("SetEvent failed (%d)\n", GetLastError());
+ return -1;
+ }
+ /* thread cleanup */
+ for(i=0;i<file_nfds;i++) {
+ while(1) {
+ exit_code = 0;
+ GetExitCodeThread(select_thread_list[i],&exit_code);
+ if(exit_code!=STILL_ACTIVE) {
+ CloseHandle(select_thread_list[i]);
+ break;
+ }
+ }
+ }
+ free(select_thread_list);
+ CloseHandle(main_select_event);
+
+ if(r==WAIT_TIMEOUT) { /* timeout */
+ FD_ZERO(&file_rd);
+ FD_ZERO(&file_wr);
+ if(rd) *rd = file_rd;
+ if(wr) *wr = file_wr;
+ return 0;
+ } else if((r -= WAIT_OBJECT_0) == file_nfds) { /* interrupt */
+ FD_ZERO(&file_rd);
+ FD_ZERO(&file_wr);
+ if(rd) *rd = file_rd;
+ if(wr) *wr = file_wr;
+ return -1;
+ } else if(r<file_rd.fd_count) { /* read ready */
+ FD_ZERO(&file_wr);
+ if(rd) *rd = file_rd;
+ if(wr) *wr = file_wr;
+ return 1;
+ } else { /* write ready */
+ FD_ZERO(&file_rd);
+ if(rd) *rd = file_rd;
+ if(wr) *wr = file_wr;
+ return 1;
+ }
+
// assume normal files are always readable/writable
// fake read/write fd_set and return value
if (rd) *rd = file_rd;


--
Posted via http://www.ruby-....