您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 12162|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |正序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;
    $ u& J# g0 b* E& B2 ]$ P
  2. require_once __DIR__ . '/Workerman/Autoloader.php';
    6 N  D: n7 H4 [/ ]

  3. ' L. B/ v: H5 X
  4. $worker = new Worker();" o( }7 f4 I3 P
  5. // 4个进程" a1 B+ h5 c! K- d/ }. o  @
  6. $worker->count = 4;& c8 F" k+ U! i  \& C
  7. // 每个进程启动后在当前进程新增一个Worker监听/ a9 L& y' b* T+ k  x0 I
  8. $worker->onWorkerStart = function($worker)
    4 N( _$ w/ `: d  M
  9. {
    ' q0 N# {* ]: V$ N( v8 \
  10.     /**4 ^$ m% B  |! K2 S
  11.      * 4个进程启动的时候都创建2016端口的Worker: J7 o" P5 L5 S0 A* _
  12.      * 当执行到worker->listen()时会报Address already in use错误
    , P* ^8 a& s% e. N1 ^" x" i0 g6 L
  13.      * 如果worker->count=1则不会报错
    9 W( h5 Q, B, F+ w$ R' }
  14.      */  Y( M' ^9 k7 Q* l9 c$ P0 R( G$ p
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');; E3 N5 a8 B1 U: ]
  16.     $inner_worker->onMessage = 'on_message';5 Q4 q/ o4 }% j" A8 N( P
  17.     // 执行监听。这里会报Address already in use错误
    % q" ~# l4 g3 R+ |, Y* K' H
  18.     $inner_worker->listen();
      W/ e6 P% }& j5 o
  19. };
    & F; O. n5 t; D! a7 c

  20. 3 l; O7 d/ w4 K* w& A1 [  M: [
  21. $worker->onMessage = 'on_message';2 C/ V4 J* n, s/ r! }
  22. 0 o3 y7 R9 F% `  H& i  E6 @* O3 \5 z
  23. function on_message($connection, $data)
    ! e2 G& L1 O& B# ]; h$ \+ m( e7 w
  24. {
    7 T3 [" ]# m* t0 W# C/ G7 i7 a! ^3 w
  25.     $connection->send("hello\n");/ u3 K3 p- Y, [6 v7 O2 Y- l
  26. }
    3 v" ^" }; ^- @( _, D5 G
  27. 4 N$ |+ I& F5 s; v  _
  28. // 运行worker
      S' E- p8 a/ q" r2 z8 |  [' w
  29. Worker::runAll();
    " B  L0 l& ]% N; S8 M
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:
    % r: v4 @/ L; }' L
  31. . W1 o+ B" R2 W0 K
  32. use Workerman\Worker;
    . ]9 \# O, H! n! @/ j$ y6 z  \( ~( ^
  33. require_once './Workerman/Autoloader.php';
    ' @3 E1 q8 [: S7 s9 V0 k

  34. " l* A) t. S+ B+ L# z  u
  35. $worker = new Worker('text://0.0.0.0:2015');* H& D) x/ h- y
  36. // 4个进程! p; V- o& X, n/ F4 V* m
  37. $worker->count = 4;# r- m! I& i) ?8 F/ p: L, J5 p: `
  38. // 每个进程启动后在当前进程新增一个Worker监听
    7 ~9 G/ c1 q" C% y1 ]
  39. $worker->onWorkerStart = function($worker)  E! X/ ^2 N# j6 v
  40. {4 W! N2 t' f2 Z5 V" R
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');4 x+ Z+ w# }5 a
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0), j- ]; d) f, l* j, z  z; R
  43.     $inner_worker->reusePort = true;4 Z, v% f2 @' E( I. {" V! v0 Q( e
  44.     $inner_worker->onMessage = 'on_message';8 J" L( @3 J7 M3 y3 U0 G
  45.     // 执行监听。正常监听不会报错
    $ ~) `: x3 R0 Q+ u8 @  r; C. W
  46.     $inner_worker->listen();
    * H$ G! D+ u2 h% {0 B7 B8 ^
  47. };
    . h+ ^' E  I% o3 t( c( G$ O$ {3 y
  48. . }9 t0 e$ D* ~) |- F
  49. $worker->onMessage = 'on_message';4 _: a+ C( m3 k" B( z
  50. ) ~* Z, E/ n# k, C/ b: H2 V; B
  51. function on_message($connection, $data)
    - x8 N$ [- [2 g% v* t
  52. {: H) P: ^( O! p4 H
  53.     $connection->send("hello\n");
    4 U" _0 s3 ]9 ]) `* a% h4 ^
  54. }
    ' `# `% U" @6 I5 a/ n
  55. , @+ O6 T7 V% Q! p5 T" p
  56. // 运行worker; U' c. m* r, p
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    7 k1 h3 ]" `6 L* A
  2. use Workerman\Worker;, ~& [7 F: M) C9 Y. g& F1 T
  3. require_once './Workerman/Autoloader.php';5 G- x. v" Z% u6 W( y& ~4 w1 Y
  4. // 初始化一个worker容器,监听1234端口
    " {( `# ^2 z; S4 y- i
  5. $worker = new Worker('websocket://0.0.0.0:1234');
    1 O. {& ?3 [; u3 `
  6. 6 i9 f$ B$ n0 x- S3 j
  7. /*, ^8 B* n3 y) }' g. q& R
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误
    6 ?" _. g  O9 @  J9 \( s
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)  \" L, t8 ~5 R7 r8 O5 v, z7 |0 ^
  10. */" C/ Z- i5 x7 c/ R( N
  11. $worker->count = 1;0 B& C8 x6 |7 G( V/ J
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口8 i4 s# Q8 K! J6 ]* {& A
  13. $worker->onWorkerStart = function($worker)' G6 ~9 q( u9 X/ k2 L- x8 l
  14. {
    * U- A: e, e: ^0 a1 Y
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符- K, K: W0 S" b5 Q/ _
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    2 I, \/ i) U5 z9 O, R+ S+ J8 J( Z  M
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    " A$ w( m7 o, J  v
  18.     {
      t$ }/ W6 ]) F: m  q- C; Q
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据+ ]( W' q! r$ f
  20.         $data = json_decode($buffer, true);, ]: W* h* p, z$ f3 c
  21.         $uid = $data['uid'];7 Q( @; Z4 [. q
  22.         // 通过workerman,向uid的页面推送数据
    ) p# n0 Z% q# F( y$ Z% w6 P2 R4 l. \: h
  23.         $ret = sendMessageByUid($uid, $buffer);5 \$ L5 c9 c9 f7 [, T
  24.         // 返回推送结果
    1 m/ ~9 m+ `$ e2 R7 V, S% ^9 A/ A+ C; M
  25.         $connection->send($ret ? 'ok' : 'fail');
    4 p" [8 \; G( t5 O8 Y
  26.     };: K* X6 ~3 p7 Y( c! ]
  27.     // ## 执行监听 ##
    . x" C) K' d6 {& j) x4 r5 v
  28.     $inner_text_worker->listen();
    ! B) W* T+ f3 M4 X9 [
  29. };) u- d$ v9 p; ^5 L/ b+ J# D' M
  30. // 新增加一个属性,用来保存uid到connection的映射+ u3 Y$ `/ s3 ^6 H
  31. $worker->uidConnections = array();/ Y# L. \$ `6 _1 P) {
  32. // 当有客户端发来消息时执行的回调函数4 I& l& F+ \3 \1 b
  33. $worker->onMessage = function($connection, $data)
    - n- F1 L5 U: G9 U% `
  34. {
    8 {% m  S$ c4 T0 f$ o4 s
  35.     global $worker;+ F! i2 {4 ~5 J5 V/ o
  36.     // 判断当前客户端是否已经验证,既是否设置了uid
    : G! D2 O( m, G+ B' u9 V
  37.     if(!isset($connection->uid))
    5 V+ s1 c% l* b( c. a0 O! z
  38.     {( a4 f& m( k0 f5 ]$ g3 `9 N
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    # {8 {. i  r0 B
  40.        $connection->uid = $data;
    1 W8 q2 v! }, o
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,! H8 r/ Q) |  l# m4 Q
  42.         * 实现针对特定uid推送数据
    $ C5 _4 k0 V; ?1 ]# c" t! \
  43.         */
    & x5 `) F! B6 G4 b  m% W- I
  44.        $worker->uidConnections[$connection->uid] = $connection;7 z2 q% x* Y( e0 D' V7 U) I
  45.        return;
    / Y1 H5 \% v. K" ^* f
  46.     }- O6 b* B* {7 n% a8 [& }
  47. };
    ! q. L2 ?0 O' h9 c  `
  48. - V% n& d7 [; p* A& F; A. }6 i# L0 g
  49. // 当有客户端连接断开时
    0 P9 P7 P4 P- ]
  50. $worker->onClose = function($connection)
    9 p+ ?7 r. r4 ^) ?/ m4 I
  51. {0 T; ^; t0 T% Z) }( Z
  52.     global $worker;$ J+ {8 [, N' q- V2 }$ l9 c+ n
  53.     if(isset($connection->uid))- }2 |5 M7 l& G" }* ^
  54.     {" Q9 V8 y* i$ I' U6 j
  55.         // 连接断开时删除映射
    5 V! Q! T6 X; b4 P( G- ]
  56.         unset($worker->uidConnections[$connection->uid]);
    9 o( B. L3 r1 V7 R# \- _
  57.     }% o. j* g/ C- B) @' l
  58. };8 @9 L# n" |. J  Z9 _3 ^, c

  59. ) A, d0 u1 R8 }- E. B/ C
  60. // 向所有验证的用户推送数据$ d9 R( u5 C) @8 H) W
  61. function broadcast($message)
    9 Q6 p# Y, N" a+ H4 k+ W; g" G
  62. {
    3 ]; d. ^: @% V4 \0 |' r5 c$ n" x* j1 E
  63.    global $worker;
    & N) j4 H) y$ s2 F, \3 {
  64.    foreach($worker->uidConnections as $connection)& F1 z# Q! c; Y9 K, `
  65.    {
    + c2 P8 l5 j  H/ D
  66.         $connection->send($message);+ w3 |. N, a* \( V
  67.    }
    9 b" g; J' d- _8 D! A" \$ ~5 d
  68. }+ k6 h: _9 E% r+ J
  69. : Y' V* z* _! c9 N$ A( P" e2 e
  70. // 针对uid推送数据
    / t7 [3 `/ O5 T- d9 g* h
  71. function sendMessageByUid($uid, $message)- @; K! O; c: X# N" m( [
  72. {
    1 }3 s! E' s+ E$ {
  73.     global $worker;
    # P7 [2 p* e( }
  74.     if(isset($worker->uidConnections[$uid]))
    ) r# W# V; w) ?4 b! C/ }
  75.     {
    5 Z  L6 C. y' P/ R4 [6 Z
  76.         $connection = $worker->uidConnections[$uid];
    $ i  N6 Q) X9 H/ z8 P* L7 \( k
  77.         $connection->send($message);6 R" }1 k4 L& @7 H; g/ t
  78.         return true;* n3 ~5 x! i. I% i6 O
  79.     }
    0 X* [7 I- i( Q* [, X, P
  80.     return false;! ]: }2 V1 Z" o, H: B* I
  81. }
    . V8 f) y7 s9 }) c5 q& y
  82.   T' w3 K1 Q' R2 Y2 L( ?. e
  83. // 运行所有的worker1 {8 U! n* [- ^$ s6 W% |
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');
    % z, L1 h9 P/ b2 a5 S" a0 C
  2. ws.onopen = function(){- n: \. p' z$ q2 H: O( J6 \
  3.     var uid = 'uid1';
    " n3 ?2 ?/ z4 ?0 C* k$ T2 G
  4.     ws.send(uid);* o0 g% x2 h/ K* T# H- E4 e( ^
  5. };8 A- G  T7 z6 N5 E
  6. ws.onmessage = function(e){. f9 i- V9 ?4 L1 B- K: m% L
  7.     alert(e.data);
    2 O% Y5 D& o( t; h
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    " L; ~. N: Q4 j  L( ^1 {; L  Z4 G
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);  I2 U; f" |/ r+ s! X* V
  3. // 推送的数据,包含uid字段,表示是给这个uid推送* f# }$ [* A7 ]; r$ G+ Y
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');% T; b- Y8 e& |# O+ d1 I' d* G# `7 F
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符- m: Z3 [2 Y4 V8 m8 `2 F7 G
  6. fwrite($client, json_encode($data)."\n");
    # @+ u* O0 i0 Y5 }- h) l
  7. // 读取推送结果
    . v8 z$ ^1 r2 M7 Y2 S9 E
  8. echo fread($client, 8192);
复制代码
* G. x5 `8 X: |3 T

2 h2 B3 v* z; f7 \. t
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2024-12-22 17:36 , Processed in 0.133421 second(s), 20 queries .

Copyright © 2001-2024 Powered by cncml! X3.2. Theme By cncml!